diff --git a/README.md b/README.md
index c6e788a..2a4afe8 100644
--- a/README.md
+++ b/README.md
@@ -720,12 +720,63 @@
 The `ScalingSchedule` and `ClusterScalingSchedule` collectors allow
 collecting time-based metrics from the respective CRD objects specified
 in the HPA.
 
+These collectors are disabled by default; you have to start the server
+with the `--scaling-schedule` flag to enable them. Remember to deploy
+the CRDs `ScalingSchedule` and `ClusterScalingSchedule` and allow the
+service account used by the server to read, watch and list them.
+
 ### Supported metrics
 
 | Metric | Description | Type | K8s Versions |
 | ---------- | -------------- | ------- | -- |
 | ObjectName | The metric is calculated and stored for each `ScalingSchedule` and `ClusterScalingSchedule` referenced in the HPAs | `ScalingSchedule` and `ClusterScalingSchedule` | `>=1.16` |
 
+### Ramp-up and ramp-down feature
+
+To avoid abrupt scaling due to time-based metrics, the `ScalingSchedule`
+collector can ramp the metric up and down over a specific period of
+time. The duration of the scaling window can be configured individually
+in the `[Cluster]ScalingSchedule` object, via the option
+`scalingWindowDurationMinutes`, or globally for all scheduled events;
+objects that don't set the option fall back to the global value. The
+global default is 10 minutes and can be changed using the
+`--scaling-schedule-default-scaling-window` flag.
+
+This spreads the scale events out, creating less load on the other
+components and helping the rest of the metrics (like the CPU ones) to
+adjust as well.
+
+The [HPA algorithm][algo-details] does not make changes if the metric
+change is smaller than the tolerance specified by the
+`horizontal-pod-autoscaler-tolerance` flag:
+
+> We'll skip scaling if the ratio is sufficiently close to 1.0 (within a
+> globally-configurable tolerance, from the
+> `--horizontal-pod-autoscaler-tolerance` flag, which defaults to 0.1).
+
+With that in mind, the ramp-up and ramp-down feature divides the scaling
+over the specified period of time into buckets, trying to achieve
+changes bigger than the configured tolerance. The number of buckets
+defaults to 10 and can be configured by the
+`--scaling-schedule-ramp-steps` flag.
+
+**Important**: note that the ramp-up and ramp-down feature can lead to
+deployments achieving less than the specified number of pods, due to the
+HPA 10% change rule and the ceiling function applied to the desired
+number of pods (check the [algorithm details][algo-details]). This
+varies with the metric configured for the `ScalingSchedule` events, the
+number of pods, and the `horizontal-pod-autoscaler-tolerance` flag
+configured in your Kubernetes installation. [This gist][gist] contains
+the code to simulate the situations a deployment with different numbers
+of pods and a metric of 10000 can face, with 10 buckets (max of 90% of
+the metric returned) and 5 buckets (max of 80% of the metric returned).
+The ramp-up and ramp-down feature can be disabled by setting
+`--scaling-schedule-default-scaling-window` to 0; abrupt scaling can
+then be handled via [scaling policies][policies].
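+
+A minimal sketch of the bucketing described above (a simplified model,
+not the collector code itself; the real implementation also handles the
+ramp-down side of the window) for a metric of 10000 ramping up with the
+default 10 buckets over a 10 minute window:
+
+```go
+package main
+
+import (
+	"fmt"
+	"math"
+	"time"
+)
+
+// scaledValue divides the scaling window into rampSteps buckets and
+// returns the floor of the current bucket as a fraction of the metric.
+func scaledValue(elapsed, window time.Duration, rampSteps int, value int64) int64 {
+	if window == 0 {
+		return 0
+	}
+	steps := float64(rampSteps)
+	ratio := float64(elapsed) / float64(window)
+	return int64(math.Floor(ratio*steps) * (float64(value) / steps))
+}
+
+func main() {
+	window := 10 * time.Minute
+	for _, elapsed := range []time.Duration{0, 3 * time.Minute, 6 * time.Minute, window - time.Second} {
+		fmt.Printf("%v into the window: %d\n", elapsed, scaledValue(elapsed, window, 10, 10000))
+	}
+	// Prints 0, 3000, 6000 and, one second before the schedule starts,
+	// 9000: the 90% cap that motivates the caveat above.
+}
+```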
+
+[algo-details]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#algorithm-details
+[gist]: https://gist.github.com/jonathanbeber/37f1f918ab7ef6101c6ce56cc2cef3a2
+[policies]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#scaling-policies
+
 ### Example
 
 This is an example of using the ScalingSchedule collectors to collect
@@ -826,8 +877,3 @@
 Note that these number of pods are just considering these custom metrics, the normal
 HPA behavior still applies, such as: in case of multiple metrics the biggest number of
 pods is the utilized one, HPA max and min replica configuration, autoscaling policies, etc.
-
-These collectors are disabled by default, you have to start the server
-with the `--scaling-schedule` flag to enable it. Remember to deploy the CRDs
-`ScalingSchedule` and `ClusterScalingSchedule` and allow the service
-account used by the server to read, watch and list them.
diff --git a/pkg/collector/scaling_schedule_collector.go b/pkg/collector/scaling_schedule_collector.go
index 3dba87b..ae87c4f 100644
--- a/pkg/collector/scaling_schedule_collector.go
+++ b/pkg/collector/scaling_schedule_collector.go
@@ -82,6 +82,7 @@ type ScalingScheduleCollectorPlugin struct {
 	store                Store
 	now                  Now
 	defaultScalingWindow time.Duration
+	rampSteps            int
 }
 
 // ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
@@ -90,23 +91,26 @@ type ClusterScalingScheduleCollectorPlugin struct {
 	store                Store
 	now                  Now
 	defaultScalingWindow time.Duration
+	rampSteps            int
 }
 
 // NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin.
-func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ScalingScheduleCollectorPlugin, error) {
+func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ScalingScheduleCollectorPlugin, error) {
 	return &ScalingScheduleCollectorPlugin{
 		store:                store,
 		now:                  now,
 		defaultScalingWindow: defaultScalingWindow,
+		rampSteps:            rampSteps,
 	}, nil
 }
 
 // NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin.
-func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ClusterScalingScheduleCollectorPlugin, error) {
+func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ClusterScalingScheduleCollectorPlugin, error) {
 	return &ClusterScalingScheduleCollectorPlugin{
 		store:                store,
 		now:                  now,
 		defaultScalingWindow: defaultScalingWindow,
+		rampSteps:            rampSteps,
 	}, nil
 }
 
@@ -114,14 +118,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScali
 // specified HPA. It's the only required method to implement the
 // collector.CollectorPlugin interface.
 func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
-	return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
+	return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
 }
 
 // NewCollector initializes a new cluster wide scaling schedule
 // collector from the specified HPA. It's the only required method to
 // implement the collector.CollectorPlugin interface.
 func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
-	return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
+	return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
 }
 
 // ScalingScheduleCollector is a metrics collector for time based
@@ -148,10 +152,11 @@ type scalingScheduleCollector struct {
 	interval             time.Duration
 	config               MetricConfig
 	defaultScalingWindow time.Duration
+	rampSteps            int
 }
 
 // NewScalingScheduleCollector initializes a new ScalingScheduleCollector.
-func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
+func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
 	return &ScalingScheduleCollector{
 		scalingScheduleCollector{
 			store: store,
@@ -162,12 +167,13 @@ func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration
 			interval:             interval,
 			config:               *config,
 			defaultScalingWindow: defaultScalingWindow,
+			rampSteps:            rampSteps,
 		},
 	}, nil
 }
 
 // NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector.
-func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
+func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
 	return &ClusterScalingScheduleCollector{
 		scalingScheduleCollector{
 			store: store,
@@ -178,6 +184,7 @@ func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.D
 			interval:             interval,
 			config:               *config,
 			defaultScalingWindow: defaultScalingWindow,
+			rampSteps:            rampSteps,
 		},
 	}, nil
 }
 
@@ -196,7 +203,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) {
 	if !ok {
 		return nil, ErrNotScalingScheduleFound
 	}
-	return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
+	return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
 }
 
 // GetMetrics is the main implementation for collector.Collector interface
@@ -229,7 +236,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error
 		clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule)
 	}
 
-	return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
+	return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
 }
 
 // Interval returns the interval at which the collector should run.
@@ -242,7 +249,7 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration {
 	return c.interval
 }
 
-func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
+func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, rampSteps int, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
 	scalingWindowDuration := defaultScalingWindow
 	if spec.ScalingWindowDurationMinutes != nil {
 		scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute
@@ -285,7 +292,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
 				parsedStartTime.Nanosecond(),
 				location,
 			)
-			value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
+			value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
 			break
 		}
 	}
@@ -295,7 +302,7 @@
 			return nil, ErrInvalidScheduleDate
 		}
 
-		value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
+		value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
 		}
 	}
@@ -313,7 +320,7 @@
 	}, nil
 }
 
-func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, value int64) int64 {
+func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
 	scaleUpStart := startTime.Add(-scalingWindowDuration)
 	endTime := startTime.Add(entryDuration)
 	scaleUpEnd := endTime.Add(scalingWindowDuration)
@@ -322,23 +329,25 @@ func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.
 		return value
 	}
 	if between(timestamp, scaleUpStart, startTime) {
-		return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, value)
+		return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, rampSteps, value)
 	}
 	if between(timestamp, endTime, scaleUpEnd) {
-		return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, value)
+		return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, rampSteps, value)
 	}
 	return 0
 }
 
-func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, value int64) int64 {
+// The HPA has a rule not to scale up or down if the change in the
+// metric is less than 10% (by default) of the current value. We return
+// the metric in buckets of time, using the floor of each bucket, so
+// any config of 10 buckets or fewer guarantees changes bigger than
+// 10%.
+func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
 	if scalingWindowDuration == 0 {
 		return 0
 	}
-	// The HPA has a rule to do not scale up or down if the change in
-	// the metric is less than 10% of the current value. We will use 10
-	// buckets of time using the floor of each. This value might be
-	// flexible one day, but for now it's fixed.
-	const steps float64 = 10
+
+	steps := float64(rampSteps)
 	requiredPercentage := math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration)
 	return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps))
diff --git a/pkg/collector/scaling_schedule_collector_test.go b/pkg/collector/scaling_schedule_collector_test.go
index 305ebf6..aa9b0ed 100644
--- a/pkg/collector/scaling_schedule_collector_test.go
+++ b/pkg/collector/scaling_schedule_collector_test.go
@@ -15,6 +15,7 @@ import (
 const (
 	hHMMFormat                   = "15:04"
 	defaultScalingWindowDuration = 1 * time.Minute
+	defaultRampSteps             = 10
 )
 
 type schedule struct {
@@ -48,6 +49,7 @@ func TestScalingScheduleCollector(t *testing.T) {
 		scalingWindowDurationMinutes *int64
 		expectedValue                int64
 		err                          error
+		rampSteps                    int
 	}{
 		{
 			msg: "Return the right value for one time config",
@@ -109,6 +111,31 @@
 			},
 			expectedValue: 60,
 		},
+		{
+			msg: "10 steps (default) return 90% of the metric, even 1 second before",
+			schedules: []schedule{
+				{
+					date:     nowTime.Add(time.Second * 1).Format(time.RFC3339),
+					kind:     "OneTime",
+					duration: 45,
+					value:    100,
+				},
+			},
+			expectedValue: 90,
+		},
+		{
+			msg: "5 steps return 80% of the metric, even 1 second before",
+			schedules: []schedule{
+				{
+					date:     nowTime.Add(time.Second * 1).Format(time.RFC3339),
+					kind:     "OneTime",
+					duration: 45,
+					value:    100,
+				},
+			},
+			expectedValue: 80,
+			rampSteps:     5,
+		},
 		{
 			msg: "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds before starting",
 			scalingWindowDurationMinutes: &tenMinutes,
@@ -471,17 +498,22 @@
 			scalingScheduleName := "my_scaling_schedule"
 			namespace := "default"
 
+			rampSteps := tc.rampSteps
+			if rampSteps == 0 {
+				rampSteps = defaultRampSteps
+			}
+
 			schedules := getSchedules(tc.schedules)
 			store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules)
-			plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration)
+			plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration, rampSteps)
 			require.NoError(t, err)
 
 			clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
-			clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration)
+			clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration, rampSteps)
 			require.NoError(t, err)
 
 			clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
-			clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration)
+			clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration, rampSteps)
 			require.NoError(t, err)
 
 			hpa := makeScalingScheduleHPA(namespace, scalingScheduleName)
@@ -549,14 +581,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) {
 		make(map[string]interface{}),
 		getByKeyFn,
 	}
-	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
 	clusterStore := mockStore{
 		make(map[string]interface{}),
 		getByKeyFn,
 	}
-	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration)
+	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
 	hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@@ -611,10 +643,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) {
 		},
 	}
 
-	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
-	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
 	hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
diff --git a/pkg/server/start.go b/pkg/server/start.go
index 1b9de87..dd11287 100644
--- a/pkg/server/start.go
+++ b/pkg/server/start.go
@@ -128,7 +128,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
 	flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
 	flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
 		"whether to enable time-based ScalingSchedule metrics")
-	flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default scale-up/scale-down window duration for scheduled metrics")
+	flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default ramp-up and ramp-down window duration for ScalingSchedules")
+	flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to ramp up and ramp down ScalingSchedules. It's used to guarantee that the HPA's 10% minimum change rule won't prevent the scaling from reaching the maximum scheduled value.")
 	return cmd
 }
 
@@ -294,7 +295,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
 	)
 	go reflector.Run(ctx.Done())
 
-	clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
+	clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
 	if err != nil {
 		return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
 	}
@@ -303,7 +304,7 @@
 		return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
 	}
 
-	plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
+	plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
 	if err != nil {
 		return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
 	}
@@ -429,6 +430,8 @@ type AdapterServerOptions struct {
 	GCInterval time.Duration
 	// Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
 	ScalingScheduleMetrics bool
-	// Default scale-up/scale-down window duration for scheduled metrics
+	// Default ramp-up/ramp-down window duration for scheduled metrics
 	DefaultScheduledScalingWindow time.Duration
+	// Number of steps used during the ramp-up and ramp-down for scheduled metrics
+	RampSteps int
 }
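For embedding the collectors outside of the adapter binary, the updated
constructors take the ramp steps as the trailing argument. A minimal
sketch (the import path and the empty `Store` are placeholder
assumptions; in the adapter itself the store is fed by the reflector
wired up in `pkg/server/start.go`):

```go
package main

import (
	"log"
	"time"

	// Import path assumed from the repository layout.
	"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
)

func main() {
	// Assumed: a Store implementation backed by an informer/reflector.
	var store collector.Store

	// A 10 minute window and 10 ramp steps mirror the new flag defaults.
	plugin, err := collector.NewScalingScheduleCollectorPlugin(store, time.Now, 10*time.Minute, 10)
	if err != nil {
		log.Fatalf("unable to create ScalingScheduleCollector plugin: %v", err)
	}
	_ = plugin
}
```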