From fd4ead837e5506c0a363ad182045a8b89f023205 Mon Sep 17 00:00:00 2001 From: Jonathan Juares Beber Date: Mon, 18 Oct 2021 18:31:55 +0200 Subject: [PATCH] Make the number of ramp steps configurable In #371 we introduced steps to make the scaling up possible even when the HPA forces a 10% change. The problem is that 10% might not be sufficient for some specific scaling scenarios. For example, a an application targeting 12 pods and using a ScalingSchedule with the value of 10000 to achieve that, will require a target of 833. With 10 ramp steps the 90% bucket will return a metric of 9000 and the HPA calculates (9000/833) 10.8 pods, rounding to 11 pods. Once the metric reaches the time to return 100% it will won't be effective, since the change of the current number of pods (11) and the desired one (12) is less than 10%. This commit does not try to tackle this problem completely, since the 10% rule is not fixed, might change among different clusters and is also dependent on the value given to each ScalingSchedule. Therefore, this commit makes the number of ramp steps configurable via the `--scaling-schedule-ramp-steps` config flag, defaulting to 10. Signed-off-by: Jonathan Juares Beber --- README.md | 56 +++++++++++++++++-- pkg/collector/scaling_schedule_collector.go | 49 +++++++++------- .../scaling_schedule_collector_test.go | 46 ++++++++++++--- pkg/server/start.go | 11 ++-- 4 files changed, 126 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index c6e788a..2a4afe8 100644 --- a/README.md +++ b/README.md @@ -720,12 +720,63 @@ The `ScalingSchedule` and `ClusterScalingSchedule` collectors allow collecting time-based metrics from the respective CRD objects specified in the HPA. +These collectors are disabled by default, you have to start the server +with the `--scaling-schedule` flag to enable it. Remember to deploy the CRDs +`ScalingSchedule` and `ClusterScalingSchedule` and allow the service +account used by the server to read, watch and list them. + ### Supported metrics | Metric | Description | Type | K8s Versions | | ---------- | -------------- | ------- | -- | | ObjectName | The metric is calculated and stored for each `ScalingSchedule` and `ClusterScalingSchedule` referenced in the HPAs | `ScalingSchedule` and `ClusterScalingSchedule` | `>=1.16` | +### Ramp-up and ramp-down feature + +To avoid abrupt scaling due to time based metrics,the `SchalingSchedule` +collector has a feature of ramp-up and ramp-down the metric over a +specific period of time. The duration of the scaling window can be +configured individually in the `[Cluster]ScalingSchedule` object, via +the option `scalingWindowDurationMinutes` or globally for all scheduled +events, and defaults to a globally configured value if not specified. +The default for the latter is set to 10 minutes, but can be changed +using the `--scaling-schedule-default-scaling-window` flag. + +This spreads the scale events around, creating less load on the other +components, and helping the rest of the metrics (like the CPU ones) to +adjust as well. + +The [HPA algorithm][algo-details] does not make changes if the metric +change is less than the specified by the +`horizontal-pod-autoscaler-tolerance` flag: + +> We'll skip scaling if the ratio is sufficiently close to 1.0 (within a +> globally-configurable tolerance, from the +> `--horizontal-pod-autoscaler-tolerance` flag, which defaults to 0.1. + +With that in mind, the ramp-up and ramp-down feature divides the scaling +over the specified period of time in buckets, trying to achieve changes +bigger than the configured tolerance. The number of buckets defaults to +10 and can be configured by the `--scaling-schedule-ramp-steps` flag. + +**Important**: note that the ramp-up and ramp-down feature can lead to +deployments achieving less than the specified number of pods, due to the +HPA 10% change rule and the ceiling function applied to the desired +number of the pods (check the [algorithm details][algo-details]). It +varies with the configured metric for `ScalingSchedule` events, the +number of pods and the configured `horizontal-pod-autoscaler-tolerance` +flag of your kubernetes installation. [This gist][gist] contains the code to +simulate the situations a deployment with different number of pods, with +a metric of 10000 can face with 10 buckets (max of 90% of the metric +returned) and 5 buckets (max of 80% of the metric returned). The ramp-up +and ramp-down feature can be disabled by setting +`--scaling-schedule-default-scaling-window` to 0 and abrupt scalings can +be handled via [scaling policies][policies]. + +[algo-details]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#algorithm-details +[gist]: https://gist.github.com/jonathanbeber/37f1f918ab7ef6101c6ce56cc2cef3a2 +[policies]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#scaling-policies + ### Example This is an example of using the ScalingSchedule collectors to collect @@ -826,8 +877,3 @@ Note that these number of pods are just considering these custom metrics, the normal HPA behavior still applies, such as: in case of multiple metrics the biggest number of pods is the utilized one, HPA max and min replica configuration, autoscaling policies, etc. - -These collectors are disabled by default, you have to start the server -with the `--scaling-schedule` flag to enable it. Remember to deploy the CRDs -`ScalingSchedule` and `ClusterScalingSchedule` and allow the service -account used by the server to read, watch and list them. diff --git a/pkg/collector/scaling_schedule_collector.go b/pkg/collector/scaling_schedule_collector.go index 3dba87b..ae87c4f 100644 --- a/pkg/collector/scaling_schedule_collector.go +++ b/pkg/collector/scaling_schedule_collector.go @@ -82,6 +82,7 @@ type ScalingScheduleCollectorPlugin struct { store Store now Now defaultScalingWindow time.Duration + rampSteps int } // ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics @@ -90,23 +91,26 @@ type ClusterScalingScheduleCollectorPlugin struct { store Store now Now defaultScalingWindow time.Duration + rampSteps int } // NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin. -func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ScalingScheduleCollectorPlugin, error) { +func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ScalingScheduleCollectorPlugin, error) { return &ScalingScheduleCollectorPlugin{ store: store, now: now, defaultScalingWindow: defaultScalingWindow, + rampSteps: rampSteps, }, nil } // NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin. -func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ClusterScalingScheduleCollectorPlugin, error) { +func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ClusterScalingScheduleCollectorPlugin, error) { return &ClusterScalingScheduleCollectorPlugin{ store: store, now: now, defaultScalingWindow: defaultScalingWindow, + rampSteps: rampSteps, }, nil } @@ -114,14 +118,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScali // specified HPA. It's the only required method to implement the // collector.CollectorPlugin interface. func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { - return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval) + return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval) } // NewCollector initializes a new cluster wide scaling schedule // collector from the specified HPA. It's the only required method to // implement the collector.CollectorPlugin interface. func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { - return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval) + return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval) } // ScalingScheduleCollector is a metrics collector for time based @@ -148,10 +152,11 @@ type scalingScheduleCollector struct { interval time.Duration config MetricConfig defaultScalingWindow time.Duration + rampSteps int } // NewScalingScheduleCollector initializes a new ScalingScheduleCollector. -func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) { +func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) { return &ScalingScheduleCollector{ scalingScheduleCollector{ store: store, @@ -162,12 +167,13 @@ func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration interval: interval, config: *config, defaultScalingWindow: defaultScalingWindow, + rampSteps: rampSteps, }, }, nil } // NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector. -func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) { +func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) { return &ClusterScalingScheduleCollector{ scalingScheduleCollector{ store: store, @@ -178,6 +184,7 @@ func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.D interval: interval, config: *config, defaultScalingWindow: defaultScalingWindow, + rampSteps: rampSteps, }, }, nil } @@ -196,7 +203,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) { if !ok { return nil, ErrNotScalingScheduleFound } - return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric) + return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric) } // GetMetrics is the main implementation for collector.Collector interface @@ -229,7 +236,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule) } - return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric) + return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric) } // Interval returns the interval at which the collector should run. @@ -242,7 +249,7 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration { return c.interval } -func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) { +func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, rampSteps int, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) { scalingWindowDuration := defaultScalingWindow if spec.ScalingWindowDurationMinutes != nil { scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute @@ -285,7 +292,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur parsedStartTime.Nanosecond(), location, ) - value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value)) + value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value)) break } } @@ -295,7 +302,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur return nil, ErrInvalidScheduleDate } - value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value)) + value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value)) } } @@ -313,7 +320,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur }, nil } -func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, value int64) int64 { +func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 { scaleUpStart := startTime.Add(-scalingWindowDuration) endTime := startTime.Add(entryDuration) scaleUpEnd := endTime.Add(scalingWindowDuration) @@ -322,23 +329,25 @@ func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time. return value } if between(timestamp, scaleUpStart, startTime) { - return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, value) + return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, rampSteps, value) } if between(timestamp, endTime, scaleUpEnd) { - return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, value) + return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, rampSteps, value) } return 0 } -func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, value int64) int64 { +// The HPA has a rule to do not scale up or down if the change in the +// metric is less than 10% (by default) of the current value. We will +// use buckets of time using the floor of each as the returned metric. +// Any config greater or equal to 10 buckets must guarantee changes +// bigger than 10%. +func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 { if scalingWindowDuration == 0 { return 0 } - // The HPA has a rule to do not scale up or down if the change in - // the metric is less than 10% of the current value. We will use 10 - // buckets of time using the floor of each. This value might be - // flexible one day, but for now it's fixed. - const steps float64 = 10 + + steps := float64(rampSteps) requiredPercentage := math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration) return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps)) diff --git a/pkg/collector/scaling_schedule_collector_test.go b/pkg/collector/scaling_schedule_collector_test.go index 305ebf6..aa9b0ed 100644 --- a/pkg/collector/scaling_schedule_collector_test.go +++ b/pkg/collector/scaling_schedule_collector_test.go @@ -15,6 +15,7 @@ import ( const ( hHMMFormat = "15:04" defaultScalingWindowDuration = 1 * time.Minute + defaultRampSteps = 10 ) type schedule struct { @@ -48,6 +49,7 @@ func TestScalingScheduleCollector(t *testing.T) { scalingWindowDurationMinutes *int64 expectedValue int64 err error + rampSteps int }{ { msg: "Return the right value for one time config", @@ -109,6 +111,31 @@ func TestScalingScheduleCollector(t *testing.T) { }, expectedValue: 60, }, + { + msg: "10 steps (default) return 90% of the metric, even 1 second before", + schedules: []schedule{ + { + date: nowTime.Add(time.Second * 1).Format(time.RFC3339), + kind: "OneTime", + duration: 45, + value: 100, + }, + }, + expectedValue: 90, + }, + { + msg: "5 steps return 80% of the metric, even 1 second before", + schedules: []schedule{ + { + date: nowTime.Add(time.Second * 1).Format(time.RFC3339), + kind: "OneTime", + duration: 45, + value: 100, + }, + }, + expectedValue: 80, + rampSteps: 5, + }, { msg: "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds before starting", scalingWindowDurationMinutes: &tenMinutes, @@ -471,17 +498,22 @@ func TestScalingScheduleCollector(t *testing.T) { scalingScheduleName := "my_scaling_schedule" namespace := "default" + rampSteps := tc.rampSteps + if rampSteps == 0 { + rampSteps = defaultRampSteps + } + schedules := getSchedules(tc.schedules) store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules) - plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration) + plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration, rampSteps) require.NoError(t, err) clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules) - clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration) + clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration, rampSteps) require.NoError(t, err) clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules) - clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration) + clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration, rampSteps) require.NoError(t, err) hpa := makeScalingScheduleHPA(namespace, scalingScheduleName) @@ -549,14 +581,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) { make(map[string]interface{}), getByKeyFn, } - plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration) + plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps) require.NoError(t, err) clusterStore := mockStore{ make(map[string]interface{}), getByKeyFn, } - clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration) + clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration, defaultRampSteps) require.NoError(t, err) hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName") @@ -611,10 +643,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) { }, } - plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration) + plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps) require.NoError(t, err) - clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration) + clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps) require.NoError(t, err) hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName") diff --git a/pkg/server/start.go b/pkg/server/start.go index 1b9de87..dd11287 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -128,7 +128,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.") flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+ "whether to enable time-based ScalingSchedule metrics") - flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default scale-up/scale-down window duration for scheduled metrics") + flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules") + flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.") return cmd } @@ -294,7 +295,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct ) go reflector.Run(ctx.Done()) - clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow) + clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps) if err != nil { return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err) } @@ -303,7 +304,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err) } - plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow) + plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps) if err != nil { return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err) } @@ -429,6 +430,8 @@ type AdapterServerOptions struct { GCInterval time.Duration // Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling. ScalingScheduleMetrics bool - // Default scale-up/scale-down window duration for scheduled metrics + // Default ramp-up/ramp-down window duration for scheduled metrics DefaultScheduledScalingWindow time.Duration + // Number of steps utilized during the rampup and rampdown for scheduled metrics + RampSteps int }