From 0794873fcb0d4a9d592561b9ebd6e41136d9d0e4 Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Thu, 6 Apr 2023 17:46:22 +0200 Subject: [PATCH] Add scaling schedule controller to updating status Signed-off-by: Mikkel Oscar Lyderik Larsen --- docs/cluster_scaling_schedules_crd.yaml | 18 +- docs/rbac.yaml | 7 + docs/scaling_schedules_crd.yaml | 20 +- go.mod | 2 +- pkg/apis/zalando.org/v1/types.go | 20 ++ .../zalando.org/v1/zz_generated.deepcopy.go | 18 ++ .../zalando.org/v1/clusterscalingschedule.go | 16 + .../v1/fake/fake_clusterscalingschedule.go | 11 + .../v1/fake/fake_scalingschedule.go | 12 + .../typed/zalando.org/v1/scalingschedule.go | 17 + .../scheduledscaling/scheduled_scaling.go | 164 ++++++++++ .../scheduled_scaling_test.go | 299 ++++++++++++++++++ pkg/server/start.go | 8 + 13 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 pkg/controller/scheduledscaling/scheduled_scaling_test.go diff --git a/docs/cluster_scaling_schedules_crd.yaml b/docs/cluster_scaling_schedules_crd.yaml index 6dd58d6..24dad0d 100644 --- a/docs/cluster_scaling_schedules_crd.yaml +++ b/docs/cluster_scaling_schedules_crd.yaml @@ -15,7 +15,12 @@ spec: singular: clusterscalingschedule scope: Cluster versions: - - name: v1 + - additionalPrinterColumns: + - description: Whether one or more schedules are currently active. + jsonPath: .status.active + name: Active + type: boolean + name: v1 schema: openAPIV3Schema: description: ClusterScalingSchedule describes a cluster scoped time based @@ -119,11 +124,22 @@ spec: required: - schedules type: object + status: + description: ScalingScheduleStatus is the status section of the ScalingSchedule. + properties: + active: + default: false + description: Active is true if at least one of the schedules defined + in the scaling schedule is currently active. + type: boolean + type: object required: - spec type: object served: true storage: true + subresources: + status: {} status: acceptedNames: kind: "" diff --git a/docs/rbac.yaml b/docs/rbac.yaml index 9e6d765..33133a2 100644 --- a/docs/rbac.yaml +++ b/docs/rbac.yaml @@ -97,6 +97,13 @@ rules: - get - list - watch +- apiGroups: + - zalando.org + resources: + - clusterscalingschedules/status + - scalingschedules/status + verbs: + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/docs/scaling_schedules_crd.yaml b/docs/scaling_schedules_crd.yaml index 4b8448e..d22ee1a 100644 --- a/docs/scaling_schedules_crd.yaml +++ b/docs/scaling_schedules_crd.yaml @@ -9,13 +9,20 @@ metadata: spec: group: zalando.org names: + categories: + - all kind: ScalingSchedule listKind: ScalingScheduleList plural: scalingschedules singular: scalingschedule scope: Namespaced versions: - - name: v1 + - additionalPrinterColumns: + - description: Whether one or more schedules are currently active. + jsonPath: .status.active + name: Active + type: boolean + name: v1 schema: openAPIV3Schema: description: ScalingSchedule describes a namespaced time based metric to be @@ -119,11 +126,22 @@ spec: required: - schedules type: object + status: + description: ScalingScheduleStatus is the status section of the ScalingSchedule. + properties: + active: + default: false + description: Active is true if at least one of the schedules defined + in the scaling schedule is currently active. + type: boolean + type: object required: - spec type: object served: true storage: true + subresources: + status: {} status: acceptedNames: kind: "" diff --git a/go.mod b/go.mod index a25e7a5..bc7c1b5 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20230223125308-aff25efae501 golang.org/x/net v0.8.0 golang.org/x/oauth2 v0.6.0 + golang.org/x/sync v0.1.0 k8s.io/api v0.23.0 k8s.io/apimachinery v0.23.0 k8s.io/apiserver v0.23.0 @@ -93,7 +94,6 @@ require ( go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/term v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect diff --git a/pkg/apis/zalando.org/v1/types.go b/pkg/apis/zalando.org/v1/types.go index 0ef3a47..4feb7e5 100644 --- a/pkg/apis/zalando.org/v1/types.go +++ b/pkg/apis/zalando.org/v1/types.go @@ -13,11 +13,16 @@ import ( // ScalingSchedule describes a namespaced time based metric to be used // in autoscaling operations. // +k8s:deepcopy-gen=true +// +kubebuilder:resource:categories=all +// +kubebuilder:printcolumn:name="Active",type=boolean,JSONPath=`.status.active`,description="Whether one or more schedules are currently active." +// +kubebuilder:subresource:status type ScalingSchedule struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec ScalingScheduleSpec `json:"spec"` + // +optional + Status ScalingScheduleStatus `json:"status"` } // +genclient @@ -28,12 +33,17 @@ type ScalingSchedule struct { // ClusterScalingSchedule describes a cluster scoped time based metric // to be used in autoscaling operations. // +k8s:deepcopy-gen=true +// +kubebuilder:resource:categories=all +// +kubebuilder:printcolumn:name="Active",type=boolean,JSONPath=`.status.active`,description="Whether one or more schedules are currently active." +// +kubebuilder:subresource:status // +kubebuilder:resource:scope=Cluster type ClusterScalingSchedule struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec ScalingScheduleSpec `json:"spec"` + // +optional + Status ScalingScheduleStatus `json:"status"` } // ScalingScheduleSpec is the spec part of the ScalingSchedule. @@ -125,6 +135,16 @@ const ( // +kubebuilder:validation:Format="date-time" type ScheduleDate string +// ScalingScheduleStatus is the status section of the ScalingSchedule. +// +k8s:deepcopy-gen=true +type ScalingScheduleStatus struct { + // Active is true if at least one of the schedules defined in the + // scaling schedule is currently active. + // +kubebuilder:default:=false + // +optional + Active bool `json:"active"` +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // ScalingScheduleList is a list of namespaced scaling schedules. diff --git a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go index d1a7c82..a7536e3 100644 --- a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go @@ -31,6 +31,7 @@ func (in *ClusterScalingSchedule) DeepCopyInto(out *ClusterScalingSchedule) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status return } @@ -91,6 +92,7 @@ func (in *ScalingSchedule) DeepCopyInto(out *ScalingSchedule) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status return } @@ -173,6 +175,22 @@ func (in *ScalingScheduleSpec) DeepCopy() *ScalingScheduleSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ScalingScheduleStatus) DeepCopyInto(out *ScalingScheduleStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScalingScheduleStatus. +func (in *ScalingScheduleStatus) DeepCopy() *ScalingScheduleStatus { + if in == nil { + return nil + } + out := new(ScalingScheduleStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Schedule) DeepCopyInto(out *Schedule) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/zalando.org/v1/clusterscalingschedule.go b/pkg/client/clientset/versioned/typed/zalando.org/v1/clusterscalingschedule.go index 4550841..8ee15ed 100644 --- a/pkg/client/clientset/versioned/typed/zalando.org/v1/clusterscalingschedule.go +++ b/pkg/client/clientset/versioned/typed/zalando.org/v1/clusterscalingschedule.go @@ -40,6 +40,7 @@ type ClusterScalingSchedulesGetter interface { type ClusterScalingScheduleInterface interface { Create(ctx context.Context, clusterScalingSchedule *v1.ClusterScalingSchedule, opts metav1.CreateOptions) (*v1.ClusterScalingSchedule, error) Update(ctx context.Context, clusterScalingSchedule *v1.ClusterScalingSchedule, opts metav1.UpdateOptions) (*v1.ClusterScalingSchedule, error) + UpdateStatus(ctx context.Context, clusterScalingSchedule *v1.ClusterScalingSchedule, opts metav1.UpdateOptions) (*v1.ClusterScalingSchedule, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.ClusterScalingSchedule, error) @@ -128,6 +129,21 @@ func (c *clusterScalingSchedules) Update(ctx context.Context, clusterScalingSche return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *clusterScalingSchedules) UpdateStatus(ctx context.Context, clusterScalingSchedule *v1.ClusterScalingSchedule, opts metav1.UpdateOptions) (result *v1.ClusterScalingSchedule, err error) { + result = &v1.ClusterScalingSchedule{} + err = c.client.Put(). + Resource("clusterscalingschedules"). + Name(clusterScalingSchedule.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(clusterScalingSchedule). + Do(ctx). + Into(result) + return +} + // Delete takes name of the clusterScalingSchedule and deletes it. Returns an error if one occurs. func (c *clusterScalingSchedules) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return c.client.Delete(). diff --git a/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_clusterscalingschedule.go b/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_clusterscalingschedule.go index 9c2d4ec..2a82a16 100644 --- a/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_clusterscalingschedule.go +++ b/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_clusterscalingschedule.go @@ -96,6 +96,17 @@ func (c *FakeClusterScalingSchedules) Update(ctx context.Context, clusterScaling return obj.(*zalandoorgv1.ClusterScalingSchedule), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeClusterScalingSchedules) UpdateStatus(ctx context.Context, clusterScalingSchedule *zalandoorgv1.ClusterScalingSchedule, opts v1.UpdateOptions) (*zalandoorgv1.ClusterScalingSchedule, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(clusterscalingschedulesResource, "status", clusterScalingSchedule), &zalandoorgv1.ClusterScalingSchedule{}) + if obj == nil { + return nil, err + } + return obj.(*zalandoorgv1.ClusterScalingSchedule), err +} + // Delete takes name of the clusterScalingSchedule and deletes it. Returns an error if one occurs. func (c *FakeClusterScalingSchedules) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { _, err := c.Fake. diff --git a/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_scalingschedule.go b/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_scalingschedule.go index 98d4ba2..ef924ef 100644 --- a/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_scalingschedule.go +++ b/pkg/client/clientset/versioned/typed/zalando.org/v1/fake/fake_scalingschedule.go @@ -102,6 +102,18 @@ func (c *FakeScalingSchedules) Update(ctx context.Context, scalingSchedule *zala return obj.(*zalandoorgv1.ScalingSchedule), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeScalingSchedules) UpdateStatus(ctx context.Context, scalingSchedule *zalandoorgv1.ScalingSchedule, opts v1.UpdateOptions) (*zalandoorgv1.ScalingSchedule, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(scalingschedulesResource, "status", c.ns, scalingSchedule), &zalandoorgv1.ScalingSchedule{}) + + if obj == nil { + return nil, err + } + return obj.(*zalandoorgv1.ScalingSchedule), err +} + // Delete takes name of the scalingSchedule and deletes it. Returns an error if one occurs. func (c *FakeScalingSchedules) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { _, err := c.Fake. diff --git a/pkg/client/clientset/versioned/typed/zalando.org/v1/scalingschedule.go b/pkg/client/clientset/versioned/typed/zalando.org/v1/scalingschedule.go index e371e79..933b105 100644 --- a/pkg/client/clientset/versioned/typed/zalando.org/v1/scalingschedule.go +++ b/pkg/client/clientset/versioned/typed/zalando.org/v1/scalingschedule.go @@ -40,6 +40,7 @@ type ScalingSchedulesGetter interface { type ScalingScheduleInterface interface { Create(ctx context.Context, scalingSchedule *v1.ScalingSchedule, opts metav1.CreateOptions) (*v1.ScalingSchedule, error) Update(ctx context.Context, scalingSchedule *v1.ScalingSchedule, opts metav1.UpdateOptions) (*v1.ScalingSchedule, error) + UpdateStatus(ctx context.Context, scalingSchedule *v1.ScalingSchedule, opts metav1.UpdateOptions) (*v1.ScalingSchedule, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.ScalingSchedule, error) @@ -135,6 +136,22 @@ func (c *scalingSchedules) Update(ctx context.Context, scalingSchedule *v1.Scali return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *scalingSchedules) UpdateStatus(ctx context.Context, scalingSchedule *v1.ScalingSchedule, opts metav1.UpdateOptions) (result *v1.ScalingSchedule, err error) { + result = &v1.ScalingSchedule{} + err = c.client.Put(). + Namespace(c.ns). + Resource("scalingschedules"). + Name(scalingSchedule.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(scalingSchedule). + Do(ctx). + Into(result) + return +} + // Delete takes name of the scalingSchedule and deletes it. Returns an error if one occurs. func (c *scalingSchedules) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return c.client.Delete(). diff --git a/pkg/controller/scheduledscaling/scheduled_scaling.go b/pkg/controller/scheduledscaling/scheduled_scaling.go index 19c1c28..e665ecd 100644 --- a/pkg/controller/scheduledscaling/scheduled_scaling.go +++ b/pkg/controller/scheduledscaling/scheduled_scaling.go @@ -5,7 +5,12 @@ import ( "fmt" "time" + log "github.com/sirupsen/logrus" v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" + zalandov1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/typed/zalando.org/v1" + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -45,6 +50,165 @@ var ( // std lib. It's used mainly for test/mock purposes. type now func() time.Time +type scalingScheduleStore interface { + List() []interface{} +} + +type Controller struct { + client zalandov1.ZalandoV1Interface + scalingScheduleStore scalingScheduleStore + clusterScalingScheduleStore scalingScheduleStore + now now + defaultScalingWindow time.Duration + defaultTimeZone string +} + +func NewController(client zalandov1.ZalandoV1Interface, scalingScheduleStore, clusterScalingScheduleStore scalingScheduleStore, now now, defaultScalingWindow time.Duration, defaultTimeZone string) *Controller { + return &Controller{ + client: client, + scalingScheduleStore: scalingScheduleStore, + clusterScalingScheduleStore: clusterScalingScheduleStore, + now: now, + defaultScalingWindow: defaultScalingWindow, + defaultTimeZone: defaultTimeZone, + } +} + +func (c *Controller) Run(ctx context.Context) { + log.Info("Running Scaling Schedule Controller") + + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := c.runOnce(ctx) + if err != nil { + log.Errorf("failed to run scheduled scaling controller loop: %v", err) + } + case <-ctx.Done(): + return + } + } +} + +func (c *Controller) runOnce(ctx context.Context) error { + // ScalingSchedules + var scalingGroup errgroup.Group + scalingGroup.SetLimit(10) + + schedules := c.scalingScheduleStore.List() + for _, scheduleInterface := range schedules { + schedule, ok := scheduleInterface.(*v1.ScalingSchedule) + if !ok { + return ErrNotScalingScheduleFound + } + schedule = schedule.DeepCopy() + + scalingGroup.Go(func() error { + active, err := c.scheduleActive(schedule.Spec) + if err != nil { + log.Errorf("Failed to check for active schedules in ScalingSchedule %s/%s: %v", schedule.Namespace, schedule.Name, err) + return nil + } + + if active != schedule.Status.Active { + schedule.Status.Active = active + _, err := c.client.ScalingSchedules(schedule.Namespace).UpdateStatus(ctx, schedule, metav1.UpdateOptions{}) + if err != nil { + log.Errorf("Failed to update status for ScalingSchedule %s/%s: %v", schedule.Namespace, schedule.Name, err) + return nil + } + + status := "inactive" + if active { + status = "active" + } + + log.Infof("Marked Scaling Schedule %s/%s as %s", schedule.Namespace, schedule.Name, status) + } + return nil + }) + } + + err := scalingGroup.Wait() + if err != nil { + return fmt.Errorf("failed waiting for cluster scaling schedules: %w", err) + } + + // ClusterScalingSchedules + var clusterScalingGroup errgroup.Group + clusterScalingGroup.SetLimit(10) + + clusterschedules := c.clusterScalingScheduleStore.List() + for _, scheduleInterface := range clusterschedules { + schedule, ok := scheduleInterface.(*v1.ClusterScalingSchedule) + if !ok { + return ErrNotScalingScheduleFound + } + schedule = schedule.DeepCopy() + + clusterScalingGroup.Go(func() error { + active, err := c.scheduleActive(schedule.Spec) + if err != nil { + log.Errorf("Failed to check for active schedules in ClusterScalingSchedule %s: %v", schedule.Name, err) + return nil + } + + if active != schedule.Status.Active { + schedule.Status.Active = active + _, err := c.client.ClusterScalingSchedules().UpdateStatus(ctx, schedule, metav1.UpdateOptions{}) + if err != nil { + log.Errorf("Failed to update status for ClusterScalingSchedule %s: %v", schedule.Name, err) + return nil + } + + status := "inactive" + if active { + status = "active" + } + + log.Infof("Marked Cluster Scaling Schedule %s as %s", schedule.Name, status) + } + return nil + }) + } + + err = clusterScalingGroup.Wait() + if err != nil { + return fmt.Errorf("failed waiting for cluster scaling schedules: %w", err) + } + + return nil +} + +func (c *Controller) scheduleActive(spec v1.ScalingScheduleSpec) (bool, error) { + scalingWindowDuration := c.defaultScalingWindow + if spec.ScalingWindowDurationMinutes != nil { + scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute + } + if scalingWindowDuration < 0 { + return false, fmt.Errorf("scaling window duration cannot be negative: %d", scalingWindowDuration) + } + + for _, schedule := range spec.Schedules { + startTime, endTime, err := ScheduleStartEnd(c.now(), schedule, c.defaultTimeZone) + if err != nil { + return false, err + } + + scalingStart := startTime.Add(-scalingWindowDuration) + scalingEnd := endTime.Add(scalingWindowDuration) + + if Between(c.now(), scalingStart, scalingEnd) { + return true, nil + } + } + + return false, nil +} + func ScheduleStartEnd(now time.Time, schedule v1.Schedule, defaultTimeZone string) (time.Time, time.Time, error) { var startTime, endTime time.Time switch schedule.Type { diff --git a/pkg/controller/scheduledscaling/scheduled_scaling_test.go b/pkg/controller/scheduledscaling/scheduled_scaling_test.go new file mode 100644 index 0000000..32a3c1c --- /dev/null +++ b/pkg/controller/scheduledscaling/scheduled_scaling_test.go @@ -0,0 +1,299 @@ +package scheduledscaling + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" + scalingschedulefake "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake" + zalandov1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/typed/zalando.org/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + hHMMFormat = "15:04" +) + +type fakeClusterScalingScheduleStore struct { + client zalandov1.ZalandoV1Interface +} + +func (s fakeClusterScalingScheduleStore) List() []interface{} { + schedules, err := s.client.ClusterScalingSchedules().List(context.Background(), metav1.ListOptions{}) + if err != nil { + return nil + } + + objects := make([]interface{}, 0, len(schedules.Items)) + for _, schedule := range schedules.Items { + schedule := schedule + objects = append(objects, &schedule) + } + + return objects +} + +type fakeScalingScheduleStore struct { + client zalandov1.ZalandoV1Interface +} + +func (s fakeScalingScheduleStore) List() []interface{} { + schedules, err := s.client.ScalingSchedules(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{}) + if err != nil { + return nil + } + + objects := make([]interface{}, 0, len(schedules.Items)) + for _, schedule := range schedules.Items { + schedule := schedule + objects = append(objects, &schedule) + } + + return objects +} + +type schedule struct { + schedules []v1.Schedule + expectedActive bool + preActiveStatus bool +} + +func scheduleDate(date string) *v1.ScheduleDate { + d := v1.ScheduleDate(date) + return &d +} + +func TestRunOnce(t *testing.T) { + nowRFC3339 := "2009-11-10T23:00:00+01:00" // +01:00 is Berlin timezone (default) + nowTime, _ := time.Parse(time.RFC3339, nowRFC3339) + nowWeekday := v1.TuesdaySchedule + + // now func always return in UTC for test purposes + utc, _ := time.LoadLocation("UTC") + uTCNow := nowTime.In(utc) + // uTCNowRFC3339 := uTCNow.Format(time.RFC3339) + now := func() time.Time { + return uTCNow + } + // date := v1.ScheduleDate(schedule.date) + // endDate := v1.ScheduleDate(schedule.endDate) + for _, tc := range []struct { + msg string + schedules map[string]schedule + preActiveStatus bool + // scalingWindowDurationMinutes *int64 + // expectedValue int64 + // err error + // rampSteps int + }{ + { + msg: "OneTime Schedules", + schedules: map[string]schedule{ + "active": { + schedules: []v1.Schedule{ + { + Type: v1.OneTimeSchedule, + Date: scheduleDate(nowRFC3339), + DurationMinutes: 15, + }, + }, + expectedActive: true, + }, + "inactive": { + schedules: []v1.Schedule{ + { + Type: v1.OneTimeSchedule, + Date: scheduleDate(nowTime.Add(1 * time.Hour).Format(time.RFC3339)), + DurationMinutes: 15, + }, + }, + expectedActive: false, + }, + }, + }, + { + msg: "OneTime Schedules change active", + schedules: map[string]schedule{ + "active": { + schedules: []v1.Schedule{ + { + Type: v1.OneTimeSchedule, + Date: scheduleDate(nowRFC3339), + DurationMinutes: 15, + }, + }, + preActiveStatus: false, + expectedActive: true, + }, + "inactive": { + schedules: []v1.Schedule{ + { + Type: v1.OneTimeSchedule, + Date: scheduleDate(nowTime.Add(1 * time.Hour).Format(time.RFC3339)), + DurationMinutes: 15, + }, + }, + preActiveStatus: true, + expectedActive: false, + }, + }, + }, + { + msg: "Repeating Schedules", + schedules: map[string]schedule{ + "active": { + schedules: []v1.Schedule{ + { + Type: v1.RepeatingSchedule, + DurationMinutes: 15, + Period: &v1.SchedulePeriod{ + Days: []v1.ScheduleDay{nowWeekday}, + StartTime: nowTime.Format(hHMMFormat), + }, + }, + }, + expectedActive: true, + }, + "inactive": { + schedules: []v1.Schedule{ + { + Type: v1.RepeatingSchedule, + DurationMinutes: 15, + Period: &v1.SchedulePeriod{ + Days: []v1.ScheduleDay{nowWeekday}, + StartTime: nowTime.Add(1 * time.Hour).Format(hHMMFormat), + }, + }, + }, + expectedActive: false, + }, + }, + }, + { + msg: "Repeating Schedules change active", + schedules: map[string]schedule{ + "active": { + schedules: []v1.Schedule{ + { + Type: v1.RepeatingSchedule, + DurationMinutes: 15, + Period: &v1.SchedulePeriod{ + Days: []v1.ScheduleDay{nowWeekday}, + StartTime: nowTime.Format(hHMMFormat), + }, + }, + }, + preActiveStatus: false, + expectedActive: true, + }, + "inactive": { + schedules: []v1.Schedule{ + { + Type: v1.RepeatingSchedule, + DurationMinutes: 15, + Period: &v1.SchedulePeriod{ + Days: []v1.ScheduleDay{nowWeekday}, + StartTime: nowTime.Add(1 * time.Hour).Format(hHMMFormat), + }, + }, + }, + preActiveStatus: true, + expectedActive: false, + }, + }, + }, + } { + t.Run(tc.msg, func(t *testing.T) { + // setup fake client and cache + client := scalingschedulefake.NewSimpleClientset() + + clusterScalingSchedulesStore := fakeClusterScalingScheduleStore{ + client: client.ZalandoV1(), + } + + scalingSchedulesStore := fakeScalingScheduleStore{ + client: client.ZalandoV1(), + } + + // add schedules + err := applySchedules(client.ZalandoV1(), tc.schedules) + require.NoError(t, err) + + controller := NewController(client.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, now, 0, "Europe/Berlin") + + err = controller.runOnce(context.Background()) + require.NoError(t, err) + + // check schedule status + err = checkSchedules(t, client.ZalandoV1(), tc.schedules) + require.NoError(t, err) + + }) + } +} + +func applySchedules(client zalandov1.ZalandoV1Interface, schedules map[string]schedule) error { + for name, schedule := range schedules { + spec := v1.ScalingScheduleSpec{ + // ScalingWindowDurationMinutes *int64 `json:"scalingWindowDurationMinutes,omitempty"` + Schedules: schedule.schedules, + } + + status := v1.ScalingScheduleStatus{ + Active: schedule.preActiveStatus, + } + + scalingSchedule := &v1.ScalingSchedule{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + + Spec: spec, + Status: status, + } + + _, err := client.ScalingSchedules(scalingSchedule.Namespace).Create(context.Background(), scalingSchedule, metav1.CreateOptions{}) + if err != nil { + return err + } + + clusterScalingSchedule := &v1.ClusterScalingSchedule{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + + Spec: spec, + Status: status, + } + + _, err = client.ClusterScalingSchedules().Create(context.Background(), clusterScalingSchedule, metav1.CreateOptions{}) + if err != nil { + return err + } + } + + return nil +} + +func checkSchedules(t *testing.T, client zalandov1.ZalandoV1Interface, schedules map[string]schedule) error { + for name, expectedSchedule := range schedules { + scalingSchedule, err := client.ScalingSchedules("default").Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return err + } + + require.Equal(t, expectedSchedule.expectedActive, scalingSchedule.Status.Active) + + clusterScalingSchedule, err := client.ClusterScalingSchedules().Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + return err + } + + require.Equal(t, expectedSchedule.expectedActive, clusterScalingSchedule.Status.Active) + } + return nil +} diff --git a/pkg/server/start.go b/pkg/server/start.go index 358efaf..86388c5 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -34,6 +34,7 @@ import ( v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned" "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector" + "github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling" "github.com/zalando-incubator/kube-metrics-adapter/pkg/provider" "github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon" "golang.org/x/oauth2" @@ -313,6 +314,13 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct if err != nil { return fmt.Errorf("failed to register ScalingSchedule object collector plugin: %v", err) } + + // setup ScheduledScaling controller to continously update + // status of ScalingSchedule and ClusterScalingSchedule + // resources. + scheduledScalingController := scheduledscaling.NewController(scalingScheduleClient.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.DefaultTimeZone) + + go scheduledScalingController.Run(ctx) } hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs, o.MetricsTTL, o.GCInterval)