Scheduled scaling: add an optional scaling window

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
This commit is contained in:
Alexey Ermakov
2021-09-23 19:47:03 +02:00
parent 4ba6b66441
commit c5411c74b7
7 changed files with 200 additions and 80 deletions
+6
View File
@@ -37,6 +37,11 @@ spec:
spec: spec:
description: ScalingScheduleSpec is the spec part of the ScalingSchedule. description: ScalingScheduleSpec is the spec part of the ScalingSchedule.
properties: properties:
scalingWindowDurationMinutes:
description: Fade the scheduled values in and out over this many minutes.
If unset, the default per-cluster value will be used.
format: int64
type: integer
schedules: schedules:
description: Schedules is the list of schedules for this ScalingSchedule description: Schedules is the list of schedules for this ScalingSchedule
resource. All the schedules defined here will result on the value resource. All the schedules defined here will result on the value
@@ -96,6 +101,7 @@ spec:
value: value:
description: The metric value that will be returned for the description: The metric value that will be returned for the
defined schedule. defined schedule.
format: int64
type: integer type: integer
required: required:
- durationMinutes - durationMinutes
+6
View File
@@ -37,6 +37,11 @@ spec:
spec: spec:
description: ScalingScheduleSpec is the spec part of the ScalingSchedule. description: ScalingScheduleSpec is the spec part of the ScalingSchedule.
properties: properties:
scalingWindowDurationMinutes:
description: Fade the scheduled values in and out over this many minutes.
If unset, the default per-cluster value will be used.
format: int64
type: integer
schedules: schedules:
description: Schedules is the list of schedules for this ScalingSchedule description: Schedules is the list of schedules for this ScalingSchedule
resource. All the schedules defined here will result on the value resource. All the schedules defined here will result on the value
@@ -96,6 +101,7 @@ spec:
value: value:
description: The metric value that will be returned for the description: The metric value that will be returned for the
defined schedule. defined schedule.
format: int64
type: integer type: integer
required: required:
- durationMinutes - durationMinutes
+11 -1
View File
@@ -1,6 +1,8 @@
package v1 package v1
import ( import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
@@ -37,6 +39,10 @@ type ClusterScalingSchedule struct {
// ScalingScheduleSpec is the spec part of the ScalingSchedule. // ScalingScheduleSpec is the spec part of the ScalingSchedule.
// +k8s:deepcopy-gen=true // +k8s:deepcopy-gen=true
type ScalingScheduleSpec struct { type ScalingScheduleSpec struct {
// Fade the scheduled values in and out over this many minutes. If unset, the default per-cluster value will be used.
// +optional
ScalingWindowDurationMinutes *int64 `json:"scalingWindowDurationMinutes,omitempty"`
// Schedules is the list of schedules for this ScalingSchedule // Schedules is the list of schedules for this ScalingSchedule
// resource. All the schedules defined here will result on the value // resource. All the schedules defined here will result on the value
// to the same metric. New metrics require a new ScalingSchedule // to the same metric. New metrics require a new ScalingSchedule
@@ -59,7 +65,11 @@ type Schedule struct {
// returned for the defined schedule. // returned for the defined schedule.
DurationMinutes int `json:"durationMinutes"` DurationMinutes int `json:"durationMinutes"`
// The metric value that will be returned for the defined schedule. // The metric value that will be returned for the defined schedule.
Value int `json:"value"` Value int64 `json:"value"`
}
func (in Schedule) Duration() time.Duration {
return time.Duration(in.DurationMinutes) * time.Minute
} }
// Defines if the schedule is a OneTime schedule or // Defines if the schedule is a OneTime schedule or
@@ -147,6 +147,11 @@ func (in *ScalingScheduleList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ScalingScheduleSpec) DeepCopyInto(out *ScalingScheduleSpec) { func (in *ScalingScheduleSpec) DeepCopyInto(out *ScalingScheduleSpec) {
*out = *in *out = *in
if in.ScalingWindowDurationMinutes != nil {
in, out := &in.ScalingWindowDurationMinutes, &out.ScalingWindowDurationMinutes
*out = new(int64)
**out = **in
}
if in.Schedules != nil { if in.Schedules != nil {
in, out := &in.Schedules, &out.Schedules in, out := &in.Schedules, &out.Schedules
*out = make([]Schedule, len(*in)) *out = make([]Schedule, len(*in))
+67 -24
View File
@@ -3,6 +3,7 @@ package collector
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"time" "time"
v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
@@ -80,6 +81,7 @@ type Store interface {
type ScalingScheduleCollectorPlugin struct { type ScalingScheduleCollectorPlugin struct {
store Store store Store
now Now now Now
defaultScalingWindow time.Duration
} }
// ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics // ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
@@ -87,21 +89,24 @@ type ScalingScheduleCollectorPlugin struct {
type ClusterScalingScheduleCollectorPlugin struct { type ClusterScalingScheduleCollectorPlugin struct {
store Store store Store
now Now now Now
defaultScalingWindow time.Duration
} }
// NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin. // NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin.
func NewScalingScheduleCollectorPlugin(store Store, now Now) (*ScalingScheduleCollectorPlugin, error) { func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ScalingScheduleCollectorPlugin, error) {
return &ScalingScheduleCollectorPlugin{ return &ScalingScheduleCollectorPlugin{
store: store, store: store,
now: now, now: now,
defaultScalingWindow: defaultScalingWindow,
}, nil }, nil
} }
// NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin. // NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin.
func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterScalingScheduleCollectorPlugin, error) { func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ClusterScalingScheduleCollectorPlugin, error) {
return &ClusterScalingScheduleCollectorPlugin{ return &ClusterScalingScheduleCollectorPlugin{
store: store, store: store,
now: now, now: now,
defaultScalingWindow: defaultScalingWindow,
}, nil }, nil
} }
@@ -109,14 +114,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterSca
// specified HPA. It's the only required method to implement the // specified HPA. It's the only required method to implement the
// collector.CollectorPlugin interface. // collector.CollectorPlugin interface.
func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewScalingScheduleCollector(c.store, c.now, hpa, config, interval) return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
} }
// NewCollector initializes a new cluster wide scaling schedule // NewCollector initializes a new cluster wide scaling schedule
// collector from the specified HPA. It's the only required method to // collector from the specified HPA. It's the only required method to
// implement the collector.CollectorPlugin interface. // implement the collector.CollectorPlugin interface.
func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewClusterScalingScheduleCollector(c.store, c.now, hpa, config, interval) return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
} }
// ScalingScheduleCollector is a metrics collector for time based // ScalingScheduleCollector is a metrics collector for time based
@@ -142,10 +147,11 @@ type scalingScheduleCollector struct {
hpa *autoscalingv2.HorizontalPodAutoscaler hpa *autoscalingv2.HorizontalPodAutoscaler
interval time.Duration interval time.Duration
config MetricConfig config MetricConfig
defaultScalingWindow time.Duration
} }
// NewScalingScheduleCollector initializes a new ScalingScheduleCollector. // NewScalingScheduleCollector initializes a new ScalingScheduleCollector.
func NewScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) { func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
return &ScalingScheduleCollector{ return &ScalingScheduleCollector{
scalingScheduleCollector{ scalingScheduleCollector{
store: store, store: store,
@@ -155,12 +161,13 @@ func NewScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.Horizo
metric: config.Metric, metric: config.Metric,
interval: interval, interval: interval,
config: *config, config: *config,
defaultScalingWindow: defaultScalingWindow,
}, },
}, nil }, nil
} }
// NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector. // NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector.
func NewClusterScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) { func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
return &ClusterScalingScheduleCollector{ return &ClusterScalingScheduleCollector{
scalingScheduleCollector{ scalingScheduleCollector{
store: store, store: store,
@@ -170,6 +177,7 @@ func NewClusterScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2
metric: config.Metric, metric: config.Metric,
interval: interval, interval: interval,
config: *config, config: *config,
defaultScalingWindow: defaultScalingWindow,
}, },
}, nil }, nil
} }
@@ -188,7 +196,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) {
if !ok { if !ok {
return nil, ErrNotScalingScheduleFound return nil, ErrNotScalingScheduleFound
} }
return calculateMetrics(scalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric) return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
} }
// GetMetrics is the main implementation for collector.Collector interface // GetMetrics is the main implementation for collector.Collector interface
@@ -221,7 +229,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error
clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule) clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule)
} }
return calculateMetrics(clusterScalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric) return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
} }
// Interval returns the interval at which the collector should run. // Interval returns the interval at which the collector should run.
@@ -234,9 +242,17 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration {
return c.interval return c.interval
} }
func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) { func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
value := 0 scalingWindowDuration := defaultScalingWindow
for _, schedule := range schedules { if spec.ScalingWindowDurationMinutes != nil {
scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute
}
if scalingWindowDuration < 0 {
return nil, fmt.Errorf("scaling window duration cannot be negative")
}
value := int64(0)
for _, schedule := range spec.Schedules {
switch schedule.Type { switch schedule.Type {
case v1.RepeatingSchedule: case v1.RepeatingSchedule:
location, err := time.LoadLocation(schedule.Period.Timezone) location, err := time.LoadLocation(schedule.Period.Timezone)
@@ -269,9 +285,7 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
parsedStartTime.Nanosecond(), parsedStartTime.Nanosecond(),
location, location,
) )
if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value { value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
value = schedule.Value
}
break break
} }
} }
@@ -280,9 +294,8 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
if err != nil { if err != nil {
return nil, ErrInvalidScheduleDate return nil, ErrInvalidScheduleDate
} }
if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value {
value = schedule.Value value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
}
} }
} }
@@ -293,17 +306,47 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
Custom: custom_metrics.MetricValue{ Custom: custom_metrics.MetricValue{
DescribedObject: objectReference, DescribedObject: objectReference,
Timestamp: metav1.Time{Time: now}, Timestamp: metav1.Time{Time: now},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI), Value: *resource.NewMilliQuantity(value*1000, resource.DecimalSI),
Metric: custom_metrics.MetricIdentifier(metric), Metric: custom_metrics.MetricIdentifier(metric),
}, },
}, },
}, nil }, nil
} }
// within receive two time.Time and a number of minutes. It returns true func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, value int64) int64 {
// if the first given time, instant, is within the period of the second scaleUpStart := startTime.Add(-scalingWindowDuration)
// given time (start) plus the given number of minutes. endTime := startTime.Add(entryDuration)
func within(instant, start time.Time, minutes int) bool { scaleUpEnd := endTime.Add(scalingWindowDuration)
return (instant.After(start) || instant.Equal(start)) &&
instant.Before(start.Add(time.Duration(minutes)*time.Minute)) if between(timestamp, startTime, endTime) {
return value
}
if between(timestamp, scaleUpStart, startTime) {
return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, value)
}
if between(timestamp, endTime, scaleUpEnd) {
return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, value)
}
return 0
}
func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, value int64) int64 {
if scalingWindowDuration == 0 {
return 0
}
return int64(math.Ceil(math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration) * float64(value)))
}
func between(timestamp, start, end time.Time) bool {
if timestamp.Before(start) {
return false
}
return timestamp.Before(end)
}
func maxInt64(i1, i2 int64) int64 {
if i1 > i2 {
return i1
}
return i2
} }
@@ -12,7 +12,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
const hHMMFormat = "15:04" const (
hHMMFormat = "15:04"
defaultScalingWindowDuration = 1 * time.Minute
)
type schedule struct { type schedule struct {
kind string kind string
@@ -21,7 +24,7 @@ type schedule struct {
days []v1.ScheduleDay days []v1.ScheduleDay
timezone string timezone string
duration int duration int
value int value int64
} }
func TestScalingScheduleCollector(t *testing.T) { func TestScalingScheduleCollector(t *testing.T) {
@@ -37,10 +40,13 @@ func TestScalingScheduleCollector(t *testing.T) {
return uTCNow return uTCNow
} }
tenMinutes := int64(10)
for _, tc := range []struct { for _, tc := range []struct {
msg string msg string
schedules []schedule schedules []schedule
expectedValue int scalingWindowDurationMinutes *int64
expectedValue int64
err error err error
}{ }{
{ {
@@ -80,7 +86,45 @@ func TestScalingScheduleCollector(t *testing.T) {
expectedValue: 100, expectedValue: 100,
}, },
{ {
msg: "Return the default value (0) for one time config - 30 seconds after", msg: "Return the scaled value (67) for one time config - 20 seconds before starting",
schedules: []schedule{
{
date: nowTime.Add(time.Second * 20).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 67,
},
{
msg: "Return the scaled value (67) for one time config - 20 seconds after",
schedules: []schedule{
{
date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 20).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 67,
},
{
msg: "Return the scaled value (95) for one time config with a custom scaling window - 30 seconds before starting",
scalingWindowDurationMinutes: &tenMinutes,
schedules: []schedule{
{
date: nowTime.Add(time.Second * 30).Format(time.RFC3339),
kind: "OneTime",
duration: 45,
value: 100,
},
},
expectedValue: 95,
},
{
msg: "Return the scaled value (95) for one time config with a custom scaling window - 30 seconds after",
scalingWindowDurationMinutes: &tenMinutes,
schedules: []schedule{ schedules: []schedule{
{ {
date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 30).Format(time.RFC3339), date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 30).Format(time.RFC3339),
@@ -89,7 +133,7 @@ func TestScalingScheduleCollector(t *testing.T) {
value: 100, value: 100,
}, },
}, },
expectedValue: 0, expectedValue: 95,
}, },
{ {
msg: "Return the default value (0) for one time config not started yet (20 minutes before)", msg: "Return the default value (0) for one time config not started yet (20 minutes before)",
@@ -428,16 +472,16 @@ func TestScalingScheduleCollector(t *testing.T) {
namespace := "default" namespace := "default"
schedules := getSchedules(tc.schedules) schedules := getSchedules(tc.schedules)
store := newMockStore(scalingScheduleName, namespace, schedules) store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules)
plugin, err := NewScalingScheduleCollectorPlugin(store, now) plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
clusterStore := newClusterMockStore(scalingScheduleName, schedules) clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now) clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, schedules) clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now) clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
hpa := makeScalingScheduleHPA(namespace, scalingScheduleName) hpa := makeScalingScheduleHPA(namespace, scalingScheduleName)
@@ -505,14 +549,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) {
make(map[string]interface{}), make(map[string]interface{}),
getByKeyFn, getByKeyFn,
} }
plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now) plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
clusterStore := mockStore{ clusterStore := mockStore{
make(map[string]interface{}), make(map[string]interface{}),
getByKeyFn, getByKeyFn,
} }
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now) clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName") hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@@ -567,10 +611,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) {
}, },
} }
plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now) plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now) clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
require.NoError(t, err) require.NoError(t, err)
hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName") hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@@ -615,7 +659,7 @@ func getByKeyFn(d map[string]interface{}, key string) (item interface{}, exists
return item, exists, nil return item, exists, nil
} }
func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore { func newMockStore(name, namespace string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
return mockStore{ return mockStore{
map[string]interface{}{ map[string]interface{}{
fmt.Sprintf("%s/%s", namespace, name): &v1.ScalingSchedule{ fmt.Sprintf("%s/%s", namespace, name): &v1.ScalingSchedule{
@@ -623,6 +667,7 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
Name: name, Name: name,
}, },
Spec: v1.ScalingScheduleSpec{ Spec: v1.ScalingScheduleSpec{
ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
Schedules: schedules, Schedules: schedules,
}, },
}, },
@@ -631,7 +676,7 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
} }
} }
func newClusterMockStore(name string, schedules []v1.Schedule) mockStore { func newClusterMockStore(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
return mockStore{ return mockStore{
map[string]interface{}{ map[string]interface{}{
name: &v1.ClusterScalingSchedule{ name: &v1.ClusterScalingSchedule{
@@ -639,6 +684,7 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
Name: name, Name: name,
}, },
Spec: v1.ScalingScheduleSpec{ Spec: v1.ScalingScheduleSpec{
ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
Schedules: schedules, Schedules: schedules,
}, },
}, },
@@ -650,7 +696,7 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
// The cache.Store returns the v1.ClusterScalingSchedule items as // The cache.Store returns the v1.ClusterScalingSchedule items as
// v1.ScalingSchedule when it first lists it. When it's update it // v1.ScalingSchedule when it first lists it. When it's update it
// asserts it correctly to the v1.ClusterScalingSchedule type. // asserts it correctly to the v1.ClusterScalingSchedule type.
func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore { func newClusterMockStoreFirstRun(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
return mockStore{ return mockStore{
map[string]interface{}{ map[string]interface{}{
name: &v1.ScalingSchedule{ name: &v1.ScalingSchedule{
@@ -658,6 +704,7 @@ func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore
Name: name, Name: name,
}, },
Spec: v1.ScalingScheduleSpec{ Spec: v1.ScalingScheduleSpec{
ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
Schedules: schedules, Schedules: schedules,
}, },
}, },
+5 -2
View File
@@ -128,6 +128,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.") flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+ flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
"whether to enable time-based ScalingSchedule metrics") "whether to enable time-based ScalingSchedule metrics")
flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default scale-up/scale-down window duration for scheduled metrics")
return cmd return cmd
} }
@@ -293,7 +294,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
) )
go reflector.Run(ctx.Done()) go reflector.Run(ctx.Done())
clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now) clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
if err != nil { if err != nil {
return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err) return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
} }
@@ -302,7 +303,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err) return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
} }
plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now) plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
if err != nil { if err != nil {
return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err) return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
} }
@@ -428,4 +429,6 @@ type AdapterServerOptions struct {
GCInterval time.Duration GCInterval time.Duration
// Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling. // Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
ScalingScheduleMetrics bool ScalingScheduleMetrics bool
// Default scale-up/scale-down window duration for scheduled metrics
DefaultScheduledScalingWindow time.Duration
} }