Compare commits

...

8 Commits

Author SHA1 Message Date
Jonathan Juares Beber
1c9038b2cc Merge pull request #374 from zalando-incubator/configurable-buckets
Make the number of ramp steps configurable
2021-10-25 10:21:32 +02:00
Jonathan Juares Beber
fd4ead837e Make the number of ramp steps configurable
In #371 we introduced steps to make the scaling up possible even when
the HPA forces a 10% change. The problem is that 10% might not be
sufficient for some specific scaling scenarios.

For example, a an application targeting 12 pods and using a
ScalingSchedule with the value of 10000 to achieve that, will require a
target of 833. With 10 ramp steps the 90% bucket will return a metric of
9000 and the HPA calculates (9000/833) 10.8 pods, rounding to 11 pods.
Once the metric reaches the time to return 100% it will won't be
effective, since the change of the current number of pods (11) and the
desired one (12) is less than 10%.

This commit does not try to tackle this problem completely, since the
10% rule is not fixed, might change among different clusters and is also
dependent on the value given to each ScalingSchedule. Therefore, this
commit makes the number of ramp steps configurable via the
`--scaling-schedule-ramp-steps` config flag, defaulting to 10.

Signed-off-by: Jonathan Juares Beber <jonathanbeber@gmail.com>
2021-10-22 15:35:11 +02:00
Mikkel Oscar Lyderik Larsen
f46f801811 Merge pull request #373 from zalando-incubator/json-path-array
Handle more complex array in json path
2021-10-19 10:04:04 +02:00
Mikkel Oscar Lyderik Larsen
4acdf72ef7 Handle more complex array in json path
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2021-10-14 09:34:15 +02:00
Jonathan Juares Beber
e04cd10bfc Merge pull request #371 from zalando-incubator/scaling-chunks
Use 10 buckets on ScalingSchedule ramp-up/down
2021-10-01 10:39:57 +02:00
Jonathan Juares Beber
8fe330941a Use 10 buckets on ScalingSchedule ramp-up/down
The HPA has a feature to do not scale up and down when the change in the
metric is less than 10%:

> We'll skip scaling if the ratio is sufficiently close to 1.0 (within a
> globally-configurable tolerance, from the
> `--horizontal-pod-autoscaler-tolerance` flag, which defaults to 0.1.

It could lead to pods scaling up to 10% less than the target for
ScalingSchedules and then not scaling to the actual value if the metric
calculated before was less than 10% of the target.

This commit uses 10 fixed buckets for scaling, this way we know the
metric returned during a scaling event is at least 10% more than a
previous one calculated during the period of ramp up. The same is valid
for the scaling down during a ramp-down

Signed-off-by: Jonathan Juares Beber <jonathanbeber@gmail.com>
2021-09-30 19:01:59 +02:00
aermakov-zalando
0730c6ef1e Merge pull request #370 from zalando-incubator/schedule-scaling-window
Scheduled scaling: scale up/down slowly
2021-09-24 15:47:44 +02:00
Alexey Ermakov
c5411c74b7 Scheduled scaling: add an optional scaling window
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2021-09-24 15:33:49 +02:00
10 changed files with 325 additions and 86 deletions

View File

@ -720,12 +720,63 @@ The `ScalingSchedule` and `ClusterScalingSchedule` collectors allow
collecting time-based metrics from the respective CRD objects specified
in the HPA.
These collectors are disabled by default, you have to start the server
with the `--scaling-schedule` flag to enable it. Remember to deploy the CRDs
`ScalingSchedule` and `ClusterScalingSchedule` and allow the service
account used by the server to read, watch and list them.
### Supported metrics
| Metric | Description | Type | K8s Versions |
| ---------- | -------------- | ------- | -- |
| ObjectName | The metric is calculated and stored for each `ScalingSchedule` and `ClusterScalingSchedule` referenced in the HPAs | `ScalingSchedule` and `ClusterScalingSchedule` | `>=1.16` |
### Ramp-up and ramp-down feature
To avoid abrupt scaling due to time based metrics,the `SchalingSchedule`
collector has a feature of ramp-up and ramp-down the metric over a
specific period of time. The duration of the scaling window can be
configured individually in the `[Cluster]ScalingSchedule` object, via
the option `scalingWindowDurationMinutes` or globally for all scheduled
events, and defaults to a globally configured value if not specified.
The default for the latter is set to 10 minutes, but can be changed
using the `--scaling-schedule-default-scaling-window` flag.
This spreads the scale events around, creating less load on the other
components, and helping the rest of the metrics (like the CPU ones) to
adjust as well.
The [HPA algorithm][algo-details] does not make changes if the metric
change is less than the specified by the
`horizontal-pod-autoscaler-tolerance` flag:
> We'll skip scaling if the ratio is sufficiently close to 1.0 (within a
> globally-configurable tolerance, from the
> `--horizontal-pod-autoscaler-tolerance` flag, which defaults to 0.1.
With that in mind, the ramp-up and ramp-down feature divides the scaling
over the specified period of time in buckets, trying to achieve changes
bigger than the configured tolerance. The number of buckets defaults to
10 and can be configured by the `--scaling-schedule-ramp-steps` flag.
**Important**: note that the ramp-up and ramp-down feature can lead to
deployments achieving less than the specified number of pods, due to the
HPA 10% change rule and the ceiling function applied to the desired
number of the pods (check the [algorithm details][algo-details]). It
varies with the configured metric for `ScalingSchedule` events, the
number of pods and the configured `horizontal-pod-autoscaler-tolerance`
flag of your kubernetes installation. [This gist][gist] contains the code to
simulate the situations a deployment with different number of pods, with
a metric of 10000 can face with 10 buckets (max of 90% of the metric
returned) and 5 buckets (max of 80% of the metric returned). The ramp-up
and ramp-down feature can be disabled by setting
`--scaling-schedule-default-scaling-window` to 0 and abrupt scalings can
be handled via [scaling policies][policies].
[algo-details]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#algorithm-details
[gist]: https://gist.github.com/jonathanbeber/37f1f918ab7ef6101c6ce56cc2cef3a2
[policies]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#scaling-policies
### Example
This is an example of using the ScalingSchedule collectors to collect
@ -826,8 +877,3 @@ Note that these number of pods are just considering these custom
metrics, the normal HPA behavior still applies, such as: in case of
multiple metrics the biggest number of pods is the utilized one, HPA max
and min replica configuration, autoscaling policies, etc.
These collectors are disabled by default, you have to start the server
with the `--scaling-schedule` flag to enable it. Remember to deploy the CRDs
`ScalingSchedule` and `ClusterScalingSchedule` and allow the service
account used by the server to read, watch and list them.

View File

@ -37,6 +37,11 @@ spec:
spec:
description: ScalingScheduleSpec is the spec part of the ScalingSchedule.
properties:
scalingWindowDurationMinutes:
description: Fade the scheduled values in and out over this many minutes.
If unset, the default per-cluster value will be used.
format: int64
type: integer
schedules:
description: Schedules is the list of schedules for this ScalingSchedule
resource. All the schedules defined here will result on the value
@ -96,6 +101,7 @@ spec:
value:
description: The metric value that will be returned for the
defined schedule.
format: int64
type: integer
required:
- durationMinutes

View File

@ -37,6 +37,11 @@ spec:
spec:
description: ScalingScheduleSpec is the spec part of the ScalingSchedule.
properties:
scalingWindowDurationMinutes:
description: Fade the scheduled values in and out over this many minutes.
If unset, the default per-cluster value will be used.
format: int64
type: integer
schedules:
description: Schedules is the list of schedules for this ScalingSchedule
resource. All the schedules defined here will result on the value
@ -96,6 +101,7 @@ spec:
value:
description: The metric value that will be returned for the
defined schedule.
format: int64
type: integer
required:
- durationMinutes

View File

@ -1,6 +1,8 @@
package v1
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -37,6 +39,10 @@ type ClusterScalingSchedule struct {
// ScalingScheduleSpec is the spec part of the ScalingSchedule.
// +k8s:deepcopy-gen=true
type ScalingScheduleSpec struct {
// Fade the scheduled values in and out over this many minutes. If unset, the default per-cluster value will be used.
// +optional
ScalingWindowDurationMinutes *int64 `json:"scalingWindowDurationMinutes,omitempty"`
// Schedules is the list of schedules for this ScalingSchedule
// resource. All the schedules defined here will result on the value
// to the same metric. New metrics require a new ScalingSchedule
@ -59,7 +65,11 @@ type Schedule struct {
// returned for the defined schedule.
DurationMinutes int `json:"durationMinutes"`
// The metric value that will be returned for the defined schedule.
Value int `json:"value"`
Value int64 `json:"value"`
}
func (in Schedule) Duration() time.Duration {
return time.Duration(in.DurationMinutes) * time.Minute
}
// Defines if the schedule is a OneTime schedule or

View File

@ -147,6 +147,11 @@ func (in *ScalingScheduleList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ScalingScheduleSpec) DeepCopyInto(out *ScalingScheduleSpec) {
*out = *in
if in.ScalingWindowDurationMinutes != nil {
in, out := &in.ScalingWindowDurationMinutes, &out.ScalingWindowDurationMinutes
*out = new(int64)
**out = **in
}
if in.Schedules != nil {
in, out := &in.Schedules, &out.Schedules
*out = make([]Schedule, len(*in))

View File

@ -72,10 +72,25 @@ func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
return 0, err
}
if len(nodes) != 1 {
if len(nodes) == 0 {
return 0, fmt.Errorf("unexpected json: expected single numeric or array value")
}
if len(nodes) > 1 {
if g.aggregator == nil {
return 0, fmt.Errorf("no aggregator function has been specified")
}
values := make([]float64, 0, len(nodes))
for _, node := range nodes {
v, err := node.GetNumeric()
if err != nil {
return 0, fmt.Errorf("unexpected json: did not find numeric or array value '%s': %w", nodes, err)
}
values = append(values, v)
}
return g.aggregator(values...), nil
}
node := nodes[0]
if node.IsArray() {
if g.aggregator == nil {

View File

@ -51,6 +51,13 @@ func TestJSONPathMetricsGetter(t *testing.T) {
result: 5,
aggregator: Average,
},
{
name: "glob array query",
jsonResponse: []byte(`{"worker_status":[{"last_status":{"backlog":3}},{"last_status":{"backlog":7}}]}`),
jsonPath: "$.worker_status.[*].last_status.backlog",
result: 5,
aggregator: Average,
},
{
name: "json path not resulting in array or number should lead to error",
jsonResponse: []byte(`{"metric.value":5}`),

View File

@ -3,6 +3,7 @@ package collector
import (
"errors"
"fmt"
"math"
"time"
v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
@ -78,30 +79,38 @@ type Store interface {
// ScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting ScalingSchedule configured metrics.
type ScalingScheduleCollectorPlugin struct {
store Store
now Now
store Store
now Now
defaultScalingWindow time.Duration
rampSteps int
}
// ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting ClusterScalingSchedule configured metrics.
type ClusterScalingScheduleCollectorPlugin struct {
store Store
now Now
store Store
now Now
defaultScalingWindow time.Duration
rampSteps int
}
// NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin.
func NewScalingScheduleCollectorPlugin(store Store, now Now) (*ScalingScheduleCollectorPlugin, error) {
func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ScalingScheduleCollectorPlugin, error) {
return &ScalingScheduleCollectorPlugin{
store: store,
now: now,
store: store,
now: now,
defaultScalingWindow: defaultScalingWindow,
rampSteps: rampSteps,
}, nil
}
// NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin.
func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterScalingScheduleCollectorPlugin, error) {
func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ClusterScalingScheduleCollectorPlugin, error) {
return &ClusterScalingScheduleCollectorPlugin{
store: store,
now: now,
store: store,
now: now,
defaultScalingWindow: defaultScalingWindow,
rampSteps: rampSteps,
}, nil
}
@ -109,14 +118,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterSca
// specified HPA. It's the only required method to implement the
// collector.CollectorPlugin interface.
func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewScalingScheduleCollector(c.store, c.now, hpa, config, interval)
return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
}
// NewCollector initializes a new cluster wide scaling schedule
// collector from the specified HPA. It's the only required method to
// implement the collector.CollectorPlugin interface.
func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewClusterScalingScheduleCollector(c.store, c.now, hpa, config, interval)
return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
}
// ScalingScheduleCollector is a metrics collector for time based
@ -135,41 +144,47 @@ type ClusterScalingScheduleCollector struct {
// struct used by both ClusterScalingScheduleCollector and the
// ScalingScheduleCollector.
type scalingScheduleCollector struct {
store Store
now Now
metric autoscalingv2.MetricIdentifier
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2.HorizontalPodAutoscaler
interval time.Duration
config MetricConfig
store Store
now Now
metric autoscalingv2.MetricIdentifier
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2.HorizontalPodAutoscaler
interval time.Duration
config MetricConfig
defaultScalingWindow time.Duration
rampSteps int
}
// NewScalingScheduleCollector initializes a new ScalingScheduleCollector.
func NewScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
return &ScalingScheduleCollector{
scalingScheduleCollector{
store: store,
now: now,
objectReference: config.ObjectReference,
hpa: hpa,
metric: config.Metric,
interval: interval,
config: *config,
store: store,
now: now,
objectReference: config.ObjectReference,
hpa: hpa,
metric: config.Metric,
interval: interval,
config: *config,
defaultScalingWindow: defaultScalingWindow,
rampSteps: rampSteps,
},
}, nil
}
// NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector.
func NewClusterScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
return &ClusterScalingScheduleCollector{
scalingScheduleCollector{
store: store,
now: now,
objectReference: config.ObjectReference,
hpa: hpa,
metric: config.Metric,
interval: interval,
config: *config,
store: store,
now: now,
objectReference: config.ObjectReference,
hpa: hpa,
metric: config.Metric,
interval: interval,
config: *config,
defaultScalingWindow: defaultScalingWindow,
rampSteps: rampSteps,
},
}, nil
}
@ -188,7 +203,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) {
if !ok {
return nil, ErrNotScalingScheduleFound
}
return calculateMetrics(scalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric)
return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
}
// GetMetrics is the main implementation for collector.Collector interface
@ -221,7 +236,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error
clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule)
}
return calculateMetrics(clusterScalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric)
return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
}
// Interval returns the interval at which the collector should run.
@ -234,9 +249,17 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration {
return c.interval
}
func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
value := 0
for _, schedule := range schedules {
func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, rampSteps int, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
scalingWindowDuration := defaultScalingWindow
if spec.ScalingWindowDurationMinutes != nil {
scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute
}
if scalingWindowDuration < 0 {
return nil, fmt.Errorf("scaling window duration cannot be negative")
}
value := int64(0)
for _, schedule := range spec.Schedules {
switch schedule.Type {
case v1.RepeatingSchedule:
location, err := time.LoadLocation(schedule.Period.Timezone)
@ -269,9 +292,7 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
parsedStartTime.Nanosecond(),
location,
)
if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value {
value = schedule.Value
}
value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
break
}
}
@ -280,9 +301,8 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
if err != nil {
return nil, ErrInvalidScheduleDate
}
if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value {
value = schedule.Value
}
value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
}
}
@ -293,17 +313,56 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
Custom: custom_metrics.MetricValue{
DescribedObject: objectReference,
Timestamp: metav1.Time{Time: now},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
Value: *resource.NewMilliQuantity(value*1000, resource.DecimalSI),
Metric: custom_metrics.MetricIdentifier(metric),
},
},
}, nil
}
// within receive two time.Time and a number of minutes. It returns true
// if the first given time, instant, is within the period of the second
// given time (start) plus the given number of minutes.
func within(instant, start time.Time, minutes int) bool {
return (instant.After(start) || instant.Equal(start)) &&
instant.Before(start.Add(time.Duration(minutes)*time.Minute))
func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
scaleUpStart := startTime.Add(-scalingWindowDuration)
endTime := startTime.Add(entryDuration)
scaleUpEnd := endTime.Add(scalingWindowDuration)
if between(timestamp, startTime, endTime) {
return value
}
if between(timestamp, scaleUpStart, startTime) {
return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, rampSteps, value)
}
if between(timestamp, endTime, scaleUpEnd) {
return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, rampSteps, value)
}
return 0
}
// The HPA has a rule to do not scale up or down if the change in the
// metric is less than 10% (by default) of the current value. We will
// use buckets of time using the floor of each as the returned metric.
// Any config greater or equal to 10 buckets must guarantee changes
// bigger than 10%.
func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
if scalingWindowDuration == 0 {
return 0
}
steps := float64(rampSteps)
requiredPercentage := math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration)
return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps))
}
func between(timestamp, start, end time.Time) bool {
if timestamp.Before(start) {
return false
}
return timestamp.Before(end)
}
func maxInt64(i1, i2 int64) int64 {
if i1 > i2 {
return i1
}
return i2
}

View File

@ -12,7 +12,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const hHMMFormat = "15:04"
const (
hHMMFormat = "15:04"
defaultScalingWindowDuration = 1 * time.Minute
defaultRampSteps = 10
)
type schedule struct {
kind string
@ -21,7 +25,7 @@ type schedule struct {
days []v1.ScheduleDay
timezone string
duration int
value int
value int64
}
func TestScalingScheduleCollector(t *testing.T) {
@ -37,11 +41,15 @@ func TestScalingScheduleCollector(t *testing.T) {
return uTCNow
}
tenMinutes := int64(10)
for _, tc := range []struct {
msg string
schedules []schedule
expectedValue int
err error
msg string
schedules []schedule
scalingWindowDurationMinutes *int64
expectedValue int64
err error
rampSteps int
}{
{
msg: "Return the right value for one time config",
@ -80,7 +88,70 @@ func TestScalingScheduleCollector(t *testing.T) {
expectedValue: 100,
},
{
msg: "Return the default value (0) for one time config - 30 seconds after",
msg: "Return the scaled value (60) for one time config - 20 seconds before starting",
schedules: []schedule{
{
date: nowTime.Add(time.Second * 20).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 60,
},
{
msg: "Return the scaled value (60) for one time config - 20 seconds after",
schedules: []schedule{
{
date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 20).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 60,
},
{
msg: "10 steps (default) return 90% of the metric, even 1 second before",
schedules: []schedule{
{
date: nowTime.Add(time.Second * 1).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 90,
},
{
msg: "5 steps return 80% of the metric, even 1 second before",
schedules: []schedule{
{
date: nowTime.Add(time.Second * 1).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 80,
rampSteps: 5,
},
{
msg: "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds before starting",
scalingWindowDurationMinutes: &tenMinutes,
schedules: []schedule{
{
date: nowTime.Add(time.Second * 30).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 90,
},
{
msg: "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds after",
scalingWindowDurationMinutes: &tenMinutes,
schedules: []schedule{
{
date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 30).Format(time.RFC3339),
@ -89,7 +160,7 @@ func TestScalingScheduleCollector(t *testing.T) {
value: 100,
},
},
expectedValue: 0,
expectedValue: 90,
},
{
msg: "Return the default value (0) for one time config not started yet (20 minutes before)",
@ -427,17 +498,22 @@ func TestScalingScheduleCollector(t *testing.T) {
scalingScheduleName := "my_scaling_schedule"
namespace := "default"
rampSteps := tc.rampSteps
if rampSteps == 0 {
rampSteps = defaultRampSteps
}
schedules := getSchedules(tc.schedules)
store := newMockStore(scalingScheduleName, namespace, schedules)
plugin, err := NewScalingScheduleCollectorPlugin(store, now)
store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules)
plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration, rampSteps)
require.NoError(t, err)
clusterStore := newClusterMockStore(scalingScheduleName, schedules)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now)
clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration, rampSteps)
require.NoError(t, err)
clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, schedules)
clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now)
clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration, rampSteps)
require.NoError(t, err)
hpa := makeScalingScheduleHPA(namespace, scalingScheduleName)
@ -505,14 +581,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) {
make(map[string]interface{}),
getByKeyFn,
}
plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now)
plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)
clusterStore := mockStore{
make(map[string]interface{}),
getByKeyFn,
}
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)
hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@ -567,10 +643,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) {
},
}
plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now)
plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)
hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@ -615,7 +691,7 @@ func getByKeyFn(d map[string]interface{}, key string) (item interface{}, exists
return item, exists, nil
}
func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
func newMockStore(name, namespace string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
return mockStore{
map[string]interface{}{
fmt.Sprintf("%s/%s", namespace, name): &v1.ScalingSchedule{
@ -623,7 +699,8 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
Name: name,
},
Spec: v1.ScalingScheduleSpec{
Schedules: schedules,
ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
Schedules: schedules,
},
},
},
@ -631,7 +708,7 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
}
}
func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
func newClusterMockStore(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
return mockStore{
map[string]interface{}{
name: &v1.ClusterScalingSchedule{
@ -639,7 +716,8 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
Name: name,
},
Spec: v1.ScalingScheduleSpec{
Schedules: schedules,
ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
Schedules: schedules,
},
},
},
@ -650,7 +728,7 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
// The cache.Store returns the v1.ClusterScalingSchedule items as
// v1.ScalingSchedule when it first lists it. When it's update it
// asserts it correctly to the v1.ClusterScalingSchedule type.
func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore {
func newClusterMockStoreFirstRun(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
return mockStore{
map[string]interface{}{
name: &v1.ScalingSchedule{
@ -658,7 +736,8 @@ func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore
Name: name,
},
Spec: v1.ScalingScheduleSpec{
Schedules: schedules,
ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
Schedules: schedules,
},
},
},

View File

@ -128,6 +128,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
"whether to enable time-based ScalingSchedule metrics")
flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.")
return cmd
}
@ -293,7 +295,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
)
go reflector.Run(ctx.Done())
clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now)
clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
if err != nil {
return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
}
@ -302,7 +304,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
}
plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now)
plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
if err != nil {
return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
}
@ -428,4 +430,8 @@ type AdapterServerOptions struct {
GCInterval time.Duration
// Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
ScalingScheduleMetrics bool
// Default ramp-up/ramp-down window duration for scheduled metrics
DefaultScheduledScalingWindow time.Duration
// Number of steps utilized during the rampup and rampdown for scheduled metrics
RampSteps int
}