diff --git a/docs/cluster_scaling_schedules_crd.yaml b/docs/cluster_scaling_schedules_crd.yaml index 16337ed..e2d6f1f 100644 --- a/docs/cluster_scaling_schedules_crd.yaml +++ b/docs/cluster_scaling_schedules_crd.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: clusterscalingschedules.zalando.org spec: group: zalando.org diff --git a/docs/scaling_schedules_crd.yaml b/docs/scaling_schedules_crd.yaml index adde7a8..2482307 100644 --- a/docs/scaling_schedules_crd.yaml +++ b/docs/scaling_schedules_crd.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: scalingschedules.zalando.org spec: group: zalando.org diff --git a/go.mod b/go.mod index 697b8a2..dc82e73 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b k8s.io/metrics v0.31.3 + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-tools v0.16.5 sigs.k8s.io/custom-metrics-apiserver v1.30.1-0.20241105195130-84dc8cfe2555 ) @@ -182,7 +183,6 @@ require ( k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kms v0.31.3 // indirect - k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect diff --git a/pkg/apis/zalando.org/v1/types.go b/pkg/apis/zalando.org/v1/types.go index f0e7615..ff1f470 100644 --- a/pkg/apis/zalando.org/v1/types.go +++ b/pkg/apis/zalando.org/v1/types.go @@ -6,6 +6,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// ScalingScheduler is an interface that represents a ScalingSchedule resource, +// namespaced or cluster wide. +type ScalingScheduler interface { + Identifier() string + ResourceSpec() ScalingScheduleSpec +} + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:object:root=true @@ -25,6 +32,17 @@ type ScalingSchedule struct { Status ScalingScheduleStatus `json:"status"` } +// Identifier returns the namespaced ScalingSchedule Identifier in the format +// `<namespace>/<name>`. +func (s *ScalingSchedule) Identifier() string { + return s.ObjectMeta.Namespace + "/" + s.ObjectMeta.Name +} + +// ResourceSpec returns the ScalingScheduleSpec of the ScalingSchedule. +func (s *ScalingSchedule) ResourceSpec() ScalingScheduleSpec { + return s.Spec +} + // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -46,6 +64,17 @@ type ClusterScalingSchedule struct { Status ScalingScheduleStatus `json:"status"` } +// Identifier returns the cluster-wide ClusterScalingSchedule Identifier in the format +// `<name>`. +func (s *ClusterScalingSchedule) Identifier() string { + return s.ObjectMeta.Name +} + +// ResourceSpec returns the ScalingScheduleSpec of the ClusterScalingSchedule. +func (s *ClusterScalingSchedule) ResourceSpec() ScalingScheduleSpec { + return s.Spec +} + // ScalingScheduleSpec is the spec part of the ScalingSchedule.
// +k8s:deepcopy-gen=true type ScalingScheduleSpec struct { diff --git a/pkg/controller/scheduledscaling/scaler.go b/pkg/controller/scheduledscaling/scaler.go new file mode 100644 index 0000000..869b37d --- /dev/null +++ b/pkg/controller/scheduledscaling/scaler.go @@ -0,0 +1,118 @@ +package scheduledscaling + +import ( + "fmt" + "time" + + "golang.org/x/net/context" + autoscalingv1 "k8s.io/api/autoscaling/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + cacheddiscovery "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" + scaleclient "k8s.io/client-go/scale" +) + +// TargetScaler is an interface for scaling a target referenced resource in an +// HPA to the desired replicas. +type TargetScaler interface { + Scale(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, replicas int32) error +} + +type hpaTargetScaler struct { + mapper apimeta.RESTMapper + scaleClient scaleclient.ScalesGetter +} + +// NewHPATargetScaler creates a new TargetScaler that can scale resources +// targeted by HPAs. It takes a Kubernetes client and a REST config and uses a +// restmapper to resolve the target reference API. +func NewHPATargetScaler(ctx context.Context, kubeClient kubernetes.Interface, cfg *rest.Config) (TargetScaler, error) { + cachedClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery()) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient) + go wait.Until(func() { + restMapper.Reset() + }, 30*time.Second, ctx.Done()) + + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(kubeClient.Discovery()) + scaleClient, err := scale.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + if err != nil { + return nil, fmt.Errorf("failed to create scale client: %w", err) + } + + return &hpaTargetScaler{ + mapper: restMapper, + scaleClient: scaleClient, + }, nil +} + +// Scale scales the target resource of the given HPA to the desired number of +// replicas. 
+func (s *hpaTargetScaler) Scale(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, replicas int32) error { + reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name) + + targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion) + if err != nil { + return fmt.Errorf("invalid API version '%s' in scale target reference: %w", hpa.Spec.ScaleTargetRef.APIVersion, err) + } + + targetGK := schema.GroupKind{ + Group: targetGV.Group, + Kind: hpa.Spec.ScaleTargetRef.Kind, + } + + mappings, err := s.mapper.RESTMappings(targetGK) + if err != nil { + return fmt.Errorf("unable to determine resource for scale target reference: %w", err) + } + + scale, targetGR, err := s.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings) + if err != nil { + return fmt.Errorf("failed to get scale subresource for %s: %w", reference, err) + } + + scale.Spec.Replicas = replicas + _, err = s.scaleClient.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to rescale %s: %w", reference, err) + } + + return nil +} + +// scaleForResourceMappings attempts to fetch the scale for the +// resource with the given name and namespace, trying each RESTMapping +// in turn until a working one is found. If none work, the first error +// is returned. It returns both the scale, as well as the group-resource from +// the working mapping. +// from: https://github.com/kubernetes/kubernetes/blob/c9092f69fc0c099062dd23cd6ee226bcd52ec790/pkg/controller/podautoscaler/horizontal.go#L1326-L1353 +func (s *hpaTargetScaler) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) { + var firstErr error + for i, mapping := range mappings { + targetGR := mapping.Resource.GroupResource() + scale, err := s.scaleClient.Scales(namespace).Get(ctx, targetGR, name, metav1.GetOptions{}) + if err == nil { + return scale, targetGR, nil + } + + // if this is the first error, remember it, + // then go on and try other mappings until we find a good one + if i == 0 { + firstErr = err + } + } + + // make sure we handle an empty set of mappings + if firstErr == nil { + firstErr = fmt.Errorf("unrecognized resource") + } + + return nil, schema.GroupResource{}, firstErr +} diff --git a/pkg/controller/scheduledscaling/scheduled_scaling.go b/pkg/controller/scheduledscaling/scheduled_scaling.go index e665ecd..d1b38aa 100644 --- a/pkg/controller/scheduledscaling/scheduled_scaling.go +++ b/pkg/controller/scheduledscaling/scheduled_scaling.go @@ -3,14 +3,20 @@ package scheduledscaling import ( "errors" "fmt" + "math" "time" log "github.com/sirupsen/logrus" v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" zalandov1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/typed/zalando.org/v1" + "github.com/zalando-incubator/kube-metrics-adapter/pkg/recorder" "golang.org/x/net/context" "golang.org/x/sync/errgroup" + autoscalingv2 "k8s.io/api/autoscaling/v2" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + kube_record "k8s.io/client-go/tools/record" ) const ( @@ -56,21 +62,29 @@ type scalingScheduleStore interface { type Controller struct { client zalandov1.ZalandoV1Interface + kubeClient kubernetes.Interface + scaler TargetScaler + recorder kube_record.EventRecorder 
scalingScheduleStore scalingScheduleStore clusterScalingScheduleStore scalingScheduleStore now now defaultScalingWindow time.Duration defaultTimeZone string + hpaTolerance float64 } -func NewController(client zalandov1.ZalandoV1Interface, scalingScheduleStore, clusterScalingScheduleStore scalingScheduleStore, now now, defaultScalingWindow time.Duration, defaultTimeZone string) *Controller { +func NewController(zclient zalandov1.ZalandoV1Interface, kubeClient kubernetes.Interface, scaler TargetScaler, scalingScheduleStore, clusterScalingScheduleStore scalingScheduleStore, now now, defaultScalingWindow time.Duration, defaultTimeZone string, hpaThreshold float64) *Controller { return &Controller{ - client: client, + client: zclient, + kubeClient: kubeClient, + scaler: scaler, + recorder: recorder.CreateEventRecorder(kubeClient), scalingScheduleStore: scalingScheduleStore, clusterScalingScheduleStore: clusterScalingScheduleStore, now: now, defaultScalingWindow: defaultScalingWindow, defaultTimeZone: defaultTimeZone, + hpaTolerance: hpaThreshold, } } @@ -93,26 +107,23 @@ func (c *Controller) Run(ctx context.Context) { } } -func (c *Controller) runOnce(ctx context.Context) error { +func (c *Controller) updateStatus(ctx context.Context, schedules []*v1.ScalingSchedule, clusterschedules []*v1.ClusterScalingSchedule) error { // ScalingSchedules var scalingGroup errgroup.Group scalingGroup.SetLimit(10) - schedules := c.scalingScheduleStore.List() - for _, scheduleInterface := range schedules { - schedule, ok := scheduleInterface.(*v1.ScalingSchedule) - if !ok { - return ErrNotScalingScheduleFound - } + for _, schedule := range schedules { schedule = schedule.DeepCopy() scalingGroup.Go(func() error { - active, err := c.scheduleActive(schedule.Spec) + activeSchedules, err := c.activeSchedules(schedule.Spec) if err != nil { log.Errorf("Failed to check for active schedules in ScalingSchedule %s/%s: %v", schedule.Namespace, schedule.Name, err) return nil } + active := len(activeSchedules) > 0 + if active != schedule.Status.Active { schedule.Status.Active = active _, err := c.client.ScalingSchedules(schedule.Namespace).UpdateStatus(ctx, schedule, metav1.UpdateOptions{}) @@ -141,21 +152,18 @@ func (c *Controller) runOnce(ctx context.Context) error { var clusterScalingGroup errgroup.Group clusterScalingGroup.SetLimit(10) - clusterschedules := c.clusterScalingScheduleStore.List() - for _, scheduleInterface := range clusterschedules { - schedule, ok := scheduleInterface.(*v1.ClusterScalingSchedule) - if !ok { - return ErrNotScalingScheduleFound - } + for _, schedule := range clusterschedules { schedule = schedule.DeepCopy() clusterScalingGroup.Go(func() error { - active, err := c.scheduleActive(schedule.Spec) + activeSchedules, err := c.activeSchedules(schedule.Spec) if err != nil { log.Errorf("Failed to check for active schedules in ClusterScalingSchedule %s: %v", schedule.Name, err) return nil } + active := len(activeSchedules) > 0 + if active != schedule.Status.Active { schedule.Status.Active = active _, err := c.client.ClusterScalingSchedules().UpdateStatus(ctx, schedule, metav1.UpdateOptions{}) @@ -183,30 +191,210 @@ func (c *Controller) runOnce(ctx context.Context) error { return nil } -func (c *Controller) scheduleActive(spec v1.ScalingScheduleSpec) (bool, error) { +func (c *Controller) runOnce(ctx context.Context) error { + schedulesInterface := c.scalingScheduleStore.List() + namespacedSchedules := make([]*v1.ScalingSchedule, 0, len(schedulesInterface)) + schedules := make([]v1.ScalingScheduler, 0) + 
for _, scheduleInterface := range schedulesInterface { + schedule, ok := scheduleInterface.(*v1.ScalingSchedule) + if !ok { + return ErrNotScalingScheduleFound + } + namespacedSchedules = append(namespacedSchedules, schedule) + schedules = append(schedules, schedule) + } + + clusterschedulesInterface := c.clusterScalingScheduleStore.List() + clusterschedules := make([]*v1.ClusterScalingSchedule, 0, len(clusterschedulesInterface)) + for _, scheduleInterface := range clusterschedulesInterface { + schedule, ok := scheduleInterface.(*v1.ClusterScalingSchedule) + if !ok { + return ErrNotScalingScheduleFound + } + clusterschedules = append(clusterschedules, schedule) + schedules = append(schedules, schedule) + } + + err := c.updateStatus(ctx, namespacedSchedules, clusterschedules) + if err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + + log.Info("Adjusting scaling") + err = c.adjustScaling(ctx, schedules) + if err != nil { + return fmt.Errorf("failed to adjust scaling: %w", err) + } + + return nil +} + +// activeScheduledScaling returns a map of the current active schedules and the +// max value per schedule. +func (c *Controller) activeScheduledScaling(schedules []v1.ScalingScheduler) map[string]int64 { + currentActiveSchedules := make(map[string]int64) + + for _, schedule := range schedules { + activeSchedules, err := c.activeSchedules(schedule.ResourceSpec()) + if err != nil { + log.Errorf("Failed to check for active schedules in ScalingSchedule %s: %v", schedule.Identifier(), err) + continue + } + + if len(activeSchedules) == 0 { + continue + } + + maxValue := int64(0) + for _, activeSchedule := range activeSchedules { + if activeSchedule.Value > maxValue { + maxValue = activeSchedule.Value + } + } + currentActiveSchedules[schedule.Identifier()] = maxValue + } + + return currentActiveSchedules +} + +// adjustHPAScaling adjusts the scaling for a single HPA based on the active +// scaling schedules. An adjustment is made if the current HPA scale is below +// the desired and the change is within the HPA tolerance. +func (c *Controller) adjustHPAScaling(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, activeSchedules map[string]int64) error { + current := int64(hpa.Status.CurrentReplicas) + if current == 0 { + return nil + } + + highestExpected, highestObject := highestActiveSchedule(hpa, activeSchedules) + + highestExpected = int64(math.Min(float64(highestExpected), float64(hpa.Spec.MaxReplicas))) + + var change float64 + if highestExpected > current { + change = (float64(highestExpected) - float64(current)) / float64(current) + } + + if change > 0 && change <= c.hpaTolerance { + err := c.scaler.Scale(ctx, hpa, int32(highestExpected)) + if err != nil { + reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name) + log.Errorf("Failed to scale target %s for HPA %s/%s: %v", reference, hpa.Namespace, hpa.Name, err) + return nil + } + + scheduleRef := highestObject.Name + if highestObject.Kind == "ScalingSchedule" { + scheduleRef = hpa.Namespace + "/" + scheduleRef + } + + c.recorder.Eventf( + hpa, + corev1.EventTypeNormal, + "ScalingAdjusted", + "Scaling schedule '%s' adjusted replicas %d -> %d based on metric: %s", + highestObject.Kind, + current, + highestExpected, + scheduleRef, + ) + } + return nil +} + +// highestActiveSchedule returns the highest active schedule value and +// corresponding object. 
+func highestActiveSchedule(hpa *autoscalingv2.HorizontalPodAutoscaler, activeSchedules map[string]int64) (int64, autoscalingv2.CrossVersionObjectReference) { + var highestExpected int64 + var highestObject autoscalingv2.CrossVersionObjectReference + for _, metric := range hpa.Spec.Metrics { + if metric.Type != autoscalingv2.ObjectMetricSourceType { + continue + } + + switch metric.Object.DescribedObject.Kind { + case "ClusterScalingSchedule", "ScalingSchedule": + default: + continue + } + + scheduleName := metric.Object.DescribedObject.Name + + target := int64(metric.Object.Target.AverageValue.MilliValue() / 1000) + if target == 0 { + continue + } + + var value int64 + switch metric.Object.DescribedObject.Kind { + case "ScalingSchedule": + value = activeSchedules[hpa.Namespace+"/"+scheduleName] + case "ClusterScalingSchedule": + value = activeSchedules[scheduleName] + } + + expected := int64(math.Ceil(float64(value) / float64(target))) + if expected > highestExpected { + highestExpected = expected + highestObject = metric.Object.DescribedObject + } + } + + return highestExpected, highestObject +} + +func (c *Controller) adjustScaling(ctx context.Context, schedules []v1.ScalingScheduler) error { + currentActiveSchedules := c.activeScheduledScaling(schedules) + + hpas, err := c.kubeClient.AutoscalingV2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list HPAs: %w", err) + } + + var hpaGroup errgroup.Group + hpaGroup.SetLimit(10) + + for _, hpa := range hpas.Items { + hpa := hpa.DeepCopy() + + hpaGroup.Go(func() error { + return c.adjustHPAScaling(ctx, hpa, currentActiveSchedules) + }) + } + + err = hpaGroup.Wait() + if err != nil { + return fmt.Errorf("failed to wait for handling of HPAs: %w", err) + } + + return nil +} + +func (c *Controller) activeSchedules(spec v1.ScalingScheduleSpec) ([]v1.Schedule, error) { scalingWindowDuration := c.defaultScalingWindow if spec.ScalingWindowDurationMinutes != nil { scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute } if scalingWindowDuration < 0 { - return false, fmt.Errorf("scaling window duration cannot be negative: %d", scalingWindowDuration) + return nil, fmt.Errorf("scaling window duration cannot be negative: %d", scalingWindowDuration) } + activeSchedules := make([]v1.Schedule, 0, len(spec.Schedules)) for _, schedule := range spec.Schedules { startTime, endTime, err := ScheduleStartEnd(c.now(), schedule, c.defaultTimeZone) if err != nil { - return false, err + return nil, err } scalingStart := startTime.Add(-scalingWindowDuration) scalingEnd := endTime.Add(scalingWindowDuration) if Between(c.now(), scalingStart, scalingEnd) { - return true, nil + activeSchedules = append(activeSchedules, schedule) } } - return false, nil + return activeSchedules, nil } func ScheduleStartEnd(now time.Time, schedule v1.Schedule, defaultTimeZone string) (time.Time, time.Time, error) { diff --git a/pkg/controller/scheduledscaling/scheduled_scaling_test.go b/pkg/controller/scheduledscaling/scheduled_scaling_test.go index 32a3c1c..b379f9c 100644 --- a/pkg/controller/scheduledscaling/scheduled_scaling_test.go +++ b/pkg/controller/scheduledscaling/scheduled_scaling_test.go @@ -2,15 +2,24 @@ package scheduledscaling import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/require" v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1" scalingschedulefake 
"github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake" + zfake "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake" zalandov1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/typed/zalando.org/v1" + appsv1 "k8s.io/api/apps/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/ptr" ) const ( @@ -222,7 +231,7 @@ func TestRunOnce(t *testing.T) { err := applySchedules(client.ZalandoV1(), tc.schedules) require.NoError(t, err) - controller := NewController(client.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, now, 0, "Europe/Berlin") + controller := NewController(client.ZalandoV1(), fake.NewSimpleClientset(), nil, scalingSchedulesStore, clusterScalingSchedulesStore, now, 0, "Europe/Berlin", 0.10) err = controller.runOnce(context.Background()) require.NoError(t, err) @@ -297,3 +306,142 @@ func checkSchedules(t *testing.T, client zalandov1.ZalandoV1Interface, schedules } return nil } + +type mockScaler struct { + client kubernetes.Interface +} + +func (s *mockScaler) Scale(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, replicas int32) error { + switch hpa.Spec.ScaleTargetRef.Kind { + case "Deployment": + deployment, err := s.client.AppsV1().Deployments(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + deployment.Spec.Replicas = &replicas + _, err = s.client.AppsV1().Deployments(hpa.Namespace).Update(ctx, deployment, metav1.UpdateOptions{}) + if err != nil { + return err + } + default: + return fmt.Errorf("unsupported kind %s", hpa.Spec.ScaleTargetRef.Kind) + } + + return nil +} + +func TestAdjustScaling(t *testing.T) { + for _, tc := range []struct { + msg string + currentReplicas int32 + desiredReplicas int32 + targetValue int64 + }{ + { + msg: "current less than 10%% below desired", + currentReplicas: 95, // 5.3% increase to desired + desiredReplicas: 100, + targetValue: 10, // 1000/10 = 100 + }, + { + msg: "current more than 10%% below desired, no adjustment", + currentReplicas: 90, // 11% increase to desired + desiredReplicas: 90, + targetValue: 10, // 1000/10 = 100 + }, + } { + t.Run(tc.msg, func(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + scalingScheduleClient := zfake.NewSimpleClientset() + controller := NewController( + scalingScheduleClient.ZalandoV1(), + kubeClient, + &mockScaler{client: kubeClient}, + nil, + nil, + time.Now, + time.Hour, + "Europe/Berlin", + 0.10, + ) + + scheduleDate := v1.ScheduleDate(time.Now().Add(-10 * time.Minute).Format(time.RFC3339)) + clusterScalingSchedules := []v1.ScalingScheduler{ + &v1.ClusterScalingSchedule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "schedule-1", + }, + Spec: v1.ScalingScheduleSpec{ + Schedules: []v1.Schedule{ + { + Type: v1.OneTimeSchedule, + Date: &scheduleDate, + DurationMinutes: 15, + Value: 1000, + }, + }, + }, + }, + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deployment-1", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: ptr.To(tc.currentReplicas), + }, + } + + _, err := kubeClient.AppsV1().Deployments("default").Create(context.Background(), deployment, metav1.CreateOptions{}) + require.NoError(t, err) + + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: "hpa-1", + }, + Spec: v2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: v2.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "deployment-1", + }, + MinReplicas: ptr.To(int32(1)), + MaxReplicas: 1000, + Metrics: []v2.MetricSpec{ + { + Type: v2.ObjectMetricSourceType, + Object: &v2.ObjectMetricSource{ + DescribedObject: v2.CrossVersionObjectReference{ + APIVersion: "zalando.org/v1", + Kind: "ClusterScalingSchedule", + Name: "schedule-1", + }, + Target: v2.MetricTarget{ + Type: v2.AverageValueMetricType, + AverageValue: resource.NewQuantity(tc.targetValue, resource.DecimalSI), + }, + }, + }, + }, + }, + } + + hpa, err = kubeClient.AutoscalingV2().HorizontalPodAutoscalers("default").Create(context.Background(), hpa, metav1.CreateOptions{}) + require.NoError(t, err) + + hpa.Status.CurrentReplicas = tc.currentReplicas + _, err = kubeClient.AutoscalingV2().HorizontalPodAutoscalers("default").UpdateStatus(context.Background(), hpa, metav1.UpdateOptions{}) + require.NoError(t, err) + + err = controller.adjustScaling(context.Background(), clusterScalingSchedules) + require.NoError(t, err) + + deployment, err = kubeClient.AppsV1().Deployments("default").Get(context.Background(), "deployment-1", metav1.GetOptions{}) + require.NoError(t, err) + + require.Equal(t, tc.desiredReplicas, ptr.Deref(deployment.Spec.Replicas, 0)) + }) + } +} diff --git a/pkg/server/start.go b/pkg/server/start.go index fea0002..77eeb35 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -138,6 +138,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules") flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.") flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.") + flags.Float64Var(&o.HorizontalPodAutoscalerTolerance, "horizontal-pod-autoscaler-tolerance", 0.1, "The HPA tolerance also configured in the HPA controller.") flags.StringVar(&o.ExternalRPSMetricName, "external-rps-metric-name", o.ExternalRPSMetricName, ""+ "The name of the metric that should be used to query prometheus for RPS per hostname.") flags.BoolVar(&o.ExternalRPSMetrics, "external-rps-metrics", o.ExternalRPSMetrics, ""+ @@ -367,10 +368,25 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct return fmt.Errorf("failed to register ScalingSchedule object collector plugin: %v", err) } + scaler, err := scheduledscaling.NewHPATargetScaler(ctx, client, clientConfig) + if err != nil { + return fmt.Errorf("unable to create HPA target scaler: %w", err) + } + // setup ScheduledScaling controller to continuously update // status of ScalingSchedule and ClusterScalingSchedule // resources. 
- scheduledScalingController := scheduledscaling.NewController(scalingScheduleClient.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.DefaultTimeZone) + scheduledScalingController := scheduledscaling.NewController( + scalingScheduleClient.ZalandoV1(), + client, + scaler, + scalingSchedulesStore, + clusterScalingSchedulesStore, + time.Now, + o.DefaultScheduledScalingWindow, + o.DefaultTimeZone, + o.HorizontalPodAutoscalerTolerance, + ) go scheduledScalingController.Run(ctx) } @@ -501,6 +517,9 @@ type AdapterServerOptions struct { RampSteps int // Default time zone to use for ScalingSchedules. DefaultTimeZone string + // The HPA tolerance also configured in the HPA controller. + // kube-controller-manager flag: --horizontal-pod-autoscaler-tolerance= + HorizontalPodAutoscalerTolerance float64 // Feature flag to enable external rps metric collector ExternalRPSMetrics bool // Name of the Prometheus metric that stores RPS by hostname for external RPS metrics.
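
For readers of this diff: the activeSchedules helper above treats a schedule as active not only between its start and end time, but for the whole ramp window around it, from start minus the scaling window to end plus the scaling window, where the window is either the resource's scalingWindowDurationMinutes or the --scaling-schedule-default-scaling-window default (10 minutes). The sketch below restates that check as a standalone program; the helper name isActive is invented for the example, and the real code delegates the boundary comparison to the package's Between helper, whose exact inclusivity is defined elsewhere in the package.

```go
package main

import (
	"fmt"
	"time"
)

// isActive reports whether now falls inside [start-scalingWindow, end+scalingWindow],
// i.e. whether the schedule should currently contribute a metric value.
func isActive(now, start, end time.Time, scalingWindow time.Duration) bool {
	scalingStart := start.Add(-scalingWindow)
	scalingEnd := end.Add(scalingWindow)
	return !now.Before(scalingStart) && !now.After(scalingEnd)
}

func main() {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	end := start.Add(15 * time.Minute) // durationMinutes: 15

	// 5 minutes before the schedule starts, but already inside the 10 minute ramp-up window.
	fmt.Println(isActive(start.Add(-5*time.Minute), start, end, 10*time.Minute)) // true
	// 30 minutes after the schedule ended, well past the ramp-down window.
	fmt.Println(isActive(end.Add(30*time.Minute), start, end, 10*time.Minute)) // false
}
```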
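The actual pre-scaling decision sits in highestActiveSchedule and adjustHPAScaling: for every HPA Object metric that points at a ScalingSchedule (looked up by `<namespace>/<name>`) or a ClusterScalingSchedule (looked up by `<name>`, matching the new Identifier() methods), the implied replica count is ceil(schedule value / target AverageValue), capped at the HPA's maxReplicas; the target is then scaled directly only when it is below that count and the relative change is still within the HPA tolerance (--horizontal-pod-autoscaler-tolerance, default 0.1), i.e. a gap too small for the HPA controller itself to act on. A minimal sketch of just this arithmetic, with invented helper names (desiredReplicas, shouldAdjust):

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas is what a single active schedule implies for an HPA:
// ceil(scheduleValue / targetAverageValue), capped at the HPA's maxReplicas.
func desiredReplicas(scheduleValue, targetAverageValue, maxReplicas int64) int64 {
	expected := int64(math.Ceil(float64(scheduleValue) / float64(targetAverageValue)))
	return int64(math.Min(float64(expected), float64(maxReplicas)))
}

// shouldAdjust reports whether the controller would scale the target directly:
// only when the target is below the desired count and the relative change is
// within the tolerance, i.e. too small for the HPA controller itself to act on.
func shouldAdjust(current, desired int64, tolerance float64) bool {
	if current == 0 || desired <= current {
		return false
	}
	change := (float64(desired) - float64(current)) / float64(current)
	return change <= tolerance
}

func main() {
	// Schedule value 1000 and an HPA object metric target (AverageValue) of 10
	// translate into 100 desired replicas.
	desired := desiredReplicas(1000, 10, 1000)

	fmt.Println(desired, shouldAdjust(95, desired, 0.10)) // 100 true  (~5.3% gap, pre-scaled)
	fmt.Println(desired, shouldAdjust(90, desired, 0.10)) // 100 false (~11.1% gap, left to the HPA)
}
```

With the values used in TestAdjustScaling, 95 current replicas are bumped to 100 (about a 5.3% change), while 90 current replicas are left to the regular HPA loop (about 11.1%, above the tolerance).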
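Finally, a rough usage sketch of the exported NewHPATargetScaler from scaler.go, mirroring the wiring added to pkg/server/start.go. The in-cluster REST config and the log-and-exit error handling are assumptions of this example; in the adapter the resulting scaler is handed to scheduledscaling.NewController together with the ScalingSchedule stores, the default scaling window, the default time zone and the tolerance flag.

```go
package main

import (
	"context"
	"log"

	"github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	ctx := context.Background()

	cfg, err := rest.InClusterConfig() // assumption: running inside the cluster
	if err != nil {
		log.Fatalf("failed to load REST config: %v", err)
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("failed to create Kubernetes client: %v", err)
	}

	// The scaler resolves an HPA's scaleTargetRef through a cached RESTMapper
	// and updates the /scale subresource of the referenced resource.
	scaler, err := scheduledscaling.NewHPATargetScaler(ctx, kubeClient, cfg)
	if err != nil {
		log.Fatalf("failed to create HPA target scaler: %v", err)
	}

	_ = scaler // in start.go this is handed to scheduledscaling.NewController(...)
}
```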