From c5411c74b77b015546d25c69d105acd8120b7380 Mon Sep 17 00:00:00 2001 From: Alexey Ermakov Date: Thu, 23 Sep 2021 19:47:03 +0200 Subject: [PATCH] Scheduled scaling: add an optional scaling window Signed-off-by: Alexey Ermakov --- docs/cluster_scaling_schedules_crd.yaml | 6 + docs/scaling_schedules_crd.yaml | 6 + pkg/apis/zalando.org/v1/types.go | 12 +- .../zalando.org/v1/zz_generated.deepcopy.go | 5 + pkg/collector/scaling_schedule_collector.go | 149 +++++++++++------- .../scaling_schedule_collector_test.go | 95 ++++++++--- pkg/server/start.go | 7 +- 7 files changed, 200 insertions(+), 80 deletions(-) diff --git a/docs/cluster_scaling_schedules_crd.yaml b/docs/cluster_scaling_schedules_crd.yaml index a5af588..2cfc3e2 100644 --- a/docs/cluster_scaling_schedules_crd.yaml +++ b/docs/cluster_scaling_schedules_crd.yaml @@ -37,6 +37,11 @@ spec: spec: description: ScalingScheduleSpec is the spec part of the ScalingSchedule. properties: + scalingWindowDurationMinutes: + description: Fade the scheduled values in and out over this many minutes. + If unset, the default per-cluster value will be used. + format: int64 + type: integer schedules: description: Schedules is the list of schedules for this ScalingSchedule resource. All the schedules defined here will result on the value @@ -96,6 +101,7 @@ spec: value: description: The metric value that will be returned for the defined schedule. + format: int64 type: integer required: - durationMinutes diff --git a/docs/scaling_schedules_crd.yaml b/docs/scaling_schedules_crd.yaml index 5f8c3a4..eb4d4d8 100644 --- a/docs/scaling_schedules_crd.yaml +++ b/docs/scaling_schedules_crd.yaml @@ -37,6 +37,11 @@ spec: spec: description: ScalingScheduleSpec is the spec part of the ScalingSchedule. properties: + scalingWindowDurationMinutes: + description: Fade the scheduled values in and out over this many minutes. + If unset, the default per-cluster value will be used. + format: int64 + type: integer schedules: description: Schedules is the list of schedules for this ScalingSchedule resource. All the schedules defined here will result on the value @@ -96,6 +101,7 @@ spec: value: description: The metric value that will be returned for the defined schedule. + format: int64 type: integer required: - durationMinutes diff --git a/pkg/apis/zalando.org/v1/types.go b/pkg/apis/zalando.org/v1/types.go index f64b51a..aa548c0 100644 --- a/pkg/apis/zalando.org/v1/types.go +++ b/pkg/apis/zalando.org/v1/types.go @@ -1,6 +1,8 @@ package v1 import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -37,6 +39,10 @@ type ClusterScalingSchedule struct { // ScalingScheduleSpec is the spec part of the ScalingSchedule. // +k8s:deepcopy-gen=true type ScalingScheduleSpec struct { + // Fade the scheduled values in and out over this many minutes. If unset, the default per-cluster value will be used. + // +optional + ScalingWindowDurationMinutes *int64 `json:"scalingWindowDurationMinutes,omitempty"` + // Schedules is the list of schedules for this ScalingSchedule // resource. All the schedules defined here will result on the value // to the same metric. New metrics require a new ScalingSchedule @@ -59,7 +65,11 @@ type Schedule struct { // returned for the defined schedule. DurationMinutes int `json:"durationMinutes"` // The metric value that will be returned for the defined schedule. - Value int `json:"value"` + Value int64 `json:"value"` +} + +func (in Schedule) Duration() time.Duration { + return time.Duration(in.DurationMinutes) * time.Minute } // Defines if the schedule is a OneTime schedule or diff --git a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go index 1a2687e..36d3f21 100644 --- a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go @@ -147,6 +147,11 @@ func (in *ScalingScheduleList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScalingScheduleSpec) DeepCopyInto(out *ScalingScheduleSpec) { *out = *in + if in.ScalingWindowDurationMinutes != nil { + in, out := &in.ScalingWindowDurationMinutes, &out.ScalingWindowDurationMinutes + *out = new(int64) + **out = **in + } if in.Schedules != nil { in, out := &in.Schedules, &out.Schedules *out = make([]Schedule, len(*in)) diff --git a/pkg/collector/scaling_schedule_collector.go b/pkg/collector/scaling_schedule_collector.go index 8281765..c4a963f 100644 --- a/pkg/collector/scaling_schedule_collector.go +++ b/pkg/collector/scaling_schedule_collector.go @@ -3,6 +3,7 @@ package collector import ( "errors" "fmt" + "math" "time" v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" @@ -78,30 +79,34 @@ type Store interface { // ScalingScheduleCollectorPlugin is a collector plugin for initializing metrics // collectors for getting ScalingSchedule configured metrics. type ScalingScheduleCollectorPlugin struct { - store Store - now Now + store Store + now Now + defaultScalingWindow time.Duration } // ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics // collectors for getting ClusterScalingSchedule configured metrics. type ClusterScalingScheduleCollectorPlugin struct { - store Store - now Now + store Store + now Now + defaultScalingWindow time.Duration } // NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin. -func NewScalingScheduleCollectorPlugin(store Store, now Now) (*ScalingScheduleCollectorPlugin, error) { +func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ScalingScheduleCollectorPlugin, error) { return &ScalingScheduleCollectorPlugin{ - store: store, - now: now, + store: store, + now: now, + defaultScalingWindow: defaultScalingWindow, }, nil } // NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin. -func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterScalingScheduleCollectorPlugin, error) { +func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ClusterScalingScheduleCollectorPlugin, error) { return &ClusterScalingScheduleCollectorPlugin{ - store: store, - now: now, + store: store, + now: now, + defaultScalingWindow: defaultScalingWindow, }, nil } @@ -109,14 +114,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterSca // specified HPA. It's the only required method to implement the // collector.CollectorPlugin interface. func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { - return NewScalingScheduleCollector(c.store, c.now, hpa, config, interval) + return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval) } // NewCollector initializes a new cluster wide scaling schedule // collector from the specified HPA. It's the only required method to // implement the collector.CollectorPlugin interface. func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { - return NewClusterScalingScheduleCollector(c.store, c.now, hpa, config, interval) + return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval) } // ScalingScheduleCollector is a metrics collector for time based @@ -135,41 +140,44 @@ type ClusterScalingScheduleCollector struct { // struct used by both ClusterScalingScheduleCollector and the // ScalingScheduleCollector. type scalingScheduleCollector struct { - store Store - now Now - metric autoscalingv2.MetricIdentifier - objectReference custom_metrics.ObjectReference - hpa *autoscalingv2.HorizontalPodAutoscaler - interval time.Duration - config MetricConfig + store Store + now Now + metric autoscalingv2.MetricIdentifier + objectReference custom_metrics.ObjectReference + hpa *autoscalingv2.HorizontalPodAutoscaler + interval time.Duration + config MetricConfig + defaultScalingWindow time.Duration } // NewScalingScheduleCollector initializes a new ScalingScheduleCollector. -func NewScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) { +func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) { return &ScalingScheduleCollector{ scalingScheduleCollector{ - store: store, - now: now, - objectReference: config.ObjectReference, - hpa: hpa, - metric: config.Metric, - interval: interval, - config: *config, + store: store, + now: now, + objectReference: config.ObjectReference, + hpa: hpa, + metric: config.Metric, + interval: interval, + config: *config, + defaultScalingWindow: defaultScalingWindow, }, }, nil } // NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector. -func NewClusterScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) { +func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) { return &ClusterScalingScheduleCollector{ scalingScheduleCollector{ - store: store, - now: now, - objectReference: config.ObjectReference, - hpa: hpa, - metric: config.Metric, - interval: interval, - config: *config, + store: store, + now: now, + objectReference: config.ObjectReference, + hpa: hpa, + metric: config.Metric, + interval: interval, + config: *config, + defaultScalingWindow: defaultScalingWindow, }, }, nil } @@ -188,7 +196,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) { if !ok { return nil, ErrNotScalingScheduleFound } - return calculateMetrics(scalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric) + return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric) } // GetMetrics is the main implementation for collector.Collector interface @@ -221,7 +229,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule) } - return calculateMetrics(clusterScalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric) + return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric) } // Interval returns the interval at which the collector should run. @@ -234,9 +242,17 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration { return c.interval } -func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) { - value := 0 - for _, schedule := range schedules { +func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) { + scalingWindowDuration := defaultScalingWindow + if spec.ScalingWindowDurationMinutes != nil { + scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute + } + if scalingWindowDuration < 0 { + return nil, fmt.Errorf("scaling window duration cannot be negative") + } + + value := int64(0) + for _, schedule := range spec.Schedules { switch schedule.Type { case v1.RepeatingSchedule: location, err := time.LoadLocation(schedule.Period.Timezone) @@ -269,9 +285,7 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu parsedStartTime.Nanosecond(), location, ) - if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value { - value = schedule.Value - } + value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value)) break } } @@ -280,9 +294,8 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu if err != nil { return nil, ErrInvalidScheduleDate } - if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value { - value = schedule.Value - } + + value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value)) } } @@ -293,17 +306,47 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu Custom: custom_metrics.MetricValue{ DescribedObject: objectReference, Timestamp: metav1.Time{Time: now}, - Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI), + Value: *resource.NewMilliQuantity(value*1000, resource.DecimalSI), Metric: custom_metrics.MetricIdentifier(metric), }, }, }, nil } -// within receive two time.Time and a number of minutes. It returns true -// if the first given time, instant, is within the period of the second -// given time (start) plus the given number of minutes. -func within(instant, start time.Time, minutes int) bool { - return (instant.After(start) || instant.Equal(start)) && - instant.Before(start.Add(time.Duration(minutes)*time.Minute)) +func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, value int64) int64 { + scaleUpStart := startTime.Add(-scalingWindowDuration) + endTime := startTime.Add(entryDuration) + scaleUpEnd := endTime.Add(scalingWindowDuration) + + if between(timestamp, startTime, endTime) { + return value + } + if between(timestamp, scaleUpStart, startTime) { + return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, value) + } + if between(timestamp, endTime, scaleUpEnd) { + return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, value) + } + return 0 +} + +func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, value int64) int64 { + if scalingWindowDuration == 0 { + return 0 + } + return int64(math.Ceil(math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration) * float64(value))) +} + +func between(timestamp, start, end time.Time) bool { + if timestamp.Before(start) { + return false + } + return timestamp.Before(end) +} + +func maxInt64(i1, i2 int64) int64 { + if i1 > i2 { + return i1 + } + return i2 } diff --git a/pkg/collector/scaling_schedule_collector_test.go b/pkg/collector/scaling_schedule_collector_test.go index e0f8fc5..91740c9 100644 --- a/pkg/collector/scaling_schedule_collector_test.go +++ b/pkg/collector/scaling_schedule_collector_test.go @@ -12,7 +12,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const hHMMFormat = "15:04" +const ( + hHMMFormat = "15:04" + defaultScalingWindowDuration = 1 * time.Minute +) type schedule struct { kind string @@ -21,7 +24,7 @@ type schedule struct { days []v1.ScheduleDay timezone string duration int - value int + value int64 } func TestScalingScheduleCollector(t *testing.T) { @@ -37,11 +40,14 @@ func TestScalingScheduleCollector(t *testing.T) { return uTCNow } + tenMinutes := int64(10) + for _, tc := range []struct { - msg string - schedules []schedule - expectedValue int - err error + msg string + schedules []schedule + scalingWindowDurationMinutes *int64 + expectedValue int64 + err error }{ { msg: "Return the right value for one time config", @@ -80,7 +86,45 @@ func TestScalingScheduleCollector(t *testing.T) { expectedValue: 100, }, { - msg: "Return the default value (0) for one time config - 30 seconds after", + msg: "Return the scaled value (67) for one time config - 20 seconds before starting", + schedules: []schedule{ + { + date: nowTime.Add(time.Second * 20).Format(time.RFC3339), + kind: "OneTime", + duration: 45, + value: 100, + }, + }, + expectedValue: 67, + }, + { + msg: "Return the scaled value (67) for one time config - 20 seconds after", + schedules: []schedule{ + { + date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 20).Format(time.RFC3339), + kind: "OneTime", + duration: 45, + value: 100, + }, + }, + expectedValue: 67, + }, + { + msg: "Return the scaled value (95) for one time config with a custom scaling window - 30 seconds before starting", + scalingWindowDurationMinutes: &tenMinutes, + schedules: []schedule{ + { + date: nowTime.Add(time.Second * 30).Format(time.RFC3339), + kind: "OneTime", + duration: 45, + value: 100, + }, + }, + expectedValue: 95, + }, + { + msg: "Return the scaled value (95) for one time config with a custom scaling window - 30 seconds after", + scalingWindowDurationMinutes: &tenMinutes, schedules: []schedule{ { date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 30).Format(time.RFC3339), @@ -89,7 +133,7 @@ func TestScalingScheduleCollector(t *testing.T) { value: 100, }, }, - expectedValue: 0, + expectedValue: 95, }, { msg: "Return the default value (0) for one time config not started yet (20 minutes before)", @@ -428,16 +472,16 @@ func TestScalingScheduleCollector(t *testing.T) { namespace := "default" schedules := getSchedules(tc.schedules) - store := newMockStore(scalingScheduleName, namespace, schedules) - plugin, err := NewScalingScheduleCollectorPlugin(store, now) + store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules) + plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration) require.NoError(t, err) - clusterStore := newClusterMockStore(scalingScheduleName, schedules) - clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now) + clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules) + clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration) require.NoError(t, err) - clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, schedules) - clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now) + clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules) + clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration) require.NoError(t, err) hpa := makeScalingScheduleHPA(namespace, scalingScheduleName) @@ -505,14 +549,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) { make(map[string]interface{}), getByKeyFn, } - plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now) + plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration) require.NoError(t, err) clusterStore := mockStore{ make(map[string]interface{}), getByKeyFn, } - clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now) + clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration) require.NoError(t, err) hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName") @@ -567,10 +611,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) { }, } - plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now) + plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration) require.NoError(t, err) - clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now) + clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration) require.NoError(t, err) hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName") @@ -615,7 +659,7 @@ func getByKeyFn(d map[string]interface{}, key string) (item interface{}, exists return item, exists, nil } -func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore { +func newMockStore(name, namespace string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore { return mockStore{ map[string]interface{}{ fmt.Sprintf("%s/%s", namespace, name): &v1.ScalingSchedule{ @@ -623,7 +667,8 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore { Name: name, }, Spec: v1.ScalingScheduleSpec{ - Schedules: schedules, + ScalingWindowDurationMinutes: scalingWindowDurationMinutes, + Schedules: schedules, }, }, }, @@ -631,7 +676,7 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore { } } -func newClusterMockStore(name string, schedules []v1.Schedule) mockStore { +func newClusterMockStore(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore { return mockStore{ map[string]interface{}{ name: &v1.ClusterScalingSchedule{ @@ -639,7 +684,8 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore { Name: name, }, Spec: v1.ScalingScheduleSpec{ - Schedules: schedules, + ScalingWindowDurationMinutes: scalingWindowDurationMinutes, + Schedules: schedules, }, }, }, @@ -650,7 +696,7 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore { // The cache.Store returns the v1.ClusterScalingSchedule items as // v1.ScalingSchedule when it first lists it. When it's update it // asserts it correctly to the v1.ClusterScalingSchedule type. -func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore { +func newClusterMockStoreFirstRun(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore { return mockStore{ map[string]interface{}{ name: &v1.ScalingSchedule{ @@ -658,7 +704,8 @@ func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore Name: name, }, Spec: v1.ScalingScheduleSpec{ - Schedules: schedules, + ScalingWindowDurationMinutes: scalingWindowDurationMinutes, + Schedules: schedules, }, }, }, diff --git a/pkg/server/start.go b/pkg/server/start.go index 11ea557..1b9de87 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -128,6 +128,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.") flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+ "whether to enable time-based ScalingSchedule metrics") + flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default scale-up/scale-down window duration for scheduled metrics") return cmd } @@ -293,7 +294,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct ) go reflector.Run(ctx.Done()) - clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now) + clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow) if err != nil { return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err) } @@ -302,7 +303,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err) } - plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now) + plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow) if err != nil { return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err) } @@ -428,4 +429,6 @@ type AdapterServerOptions struct { GCInterval time.Duration // Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling. ScalingScheduleMetrics bool + // Default scale-up/scale-down window duration for scheduled metrics + DefaultScheduledScalingWindow time.Duration }