Adjust HPA when schedule scaling change is below tolerance (#765)

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Mikkel Oscar Lyderik Larsen 2024-12-10 16:48:34 +01:00 committed by GitHub
parent 48f0df44d9
commit ca9031228b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 529 additions and 27 deletions


@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.15.0
+    controller-gen.kubebuilder.io/version: v0.16.5
  name: clusterscalingschedules.zalando.org
spec:
  group: zalando.org


@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.15.0
+    controller-gen.kubebuilder.io/version: v0.16.5
  name: scalingschedules.zalando.org
spec:
  group: zalando.org

go.mod

@@ -26,6 +26,7 @@ require (
    k8s.io/klog v1.0.0
    k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b
    k8s.io/metrics v0.31.3
+   k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
    sigs.k8s.io/controller-tools v0.16.5
    sigs.k8s.io/custom-metrics-apiserver v1.30.1-0.20241105195130-84dc8cfe2555
)
@@ -182,7 +183,6 @@ require (
    k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 // indirect
    k8s.io/klog/v2 v2.130.1 // indirect
    k8s.io/kms v0.31.3 // indirect
-   k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
    sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 // indirect
    sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
    sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
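
k8s.io/utils moves from the indirect to the direct require block because the new tests use the k8s.io/utils/ptr helpers. For reference, a minimal standalone illustration (not code from this commit):

package main

import (
    "fmt"

    "k8s.io/utils/ptr"
)

func main() {
    replicas := ptr.To(int32(3))        // *int32 pointing at the value 3
    fmt.Println(ptr.Deref(replicas, 0)) // prints 3; Deref returns 0 if the pointer is nil
}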


@@ -6,6 +6,13 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

+// ScalingScheduler is an interface that represents a ScalingSchedule resource,
+// namespaced or cluster wide.
+type ScalingScheduler interface {
+    Identifier() string
+    ResourceSpec() ScalingScheduleSpec
+}
+
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
@@ -25,6 +32,17 @@ type ScalingSchedule struct {
    Status ScalingScheduleStatus `json:"status"`
}

+// Identifier returns the namespaced ScalingSchedule identifier in the format
+// `<namespace>/<name>`.
+func (s *ScalingSchedule) Identifier() string {
+    return s.ObjectMeta.Namespace + "/" + s.ObjectMeta.Name
+}
+
+// ResourceSpec returns the ScalingScheduleSpec of the ScalingSchedule.
+func (s *ScalingSchedule) ResourceSpec() ScalingScheduleSpec {
+    return s.Spec
+}
+
// +genclient
// +genclient:nonNamespaced
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -46,6 +64,17 @@ type ClusterScalingSchedule struct {
    Status ScalingScheduleStatus `json:"status"`
}

+// Identifier returns the cluster-wide ClusterScalingSchedule identifier in the
+// format `<name>`.
+func (s *ClusterScalingSchedule) Identifier() string {
+    return s.ObjectMeta.Name
+}
+
+// ResourceSpec returns the ScalingScheduleSpec of the ClusterScalingSchedule.
+func (s *ClusterScalingSchedule) ResourceSpec() ScalingScheduleSpec {
+    return s.Spec
+}
+
// ScalingScheduleSpec is the spec part of the ScalingSchedule.
// +k8s:deepcopy-gen=true
type ScalingScheduleSpec struct {
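
The new ScalingScheduler interface gives namespaced ScalingSchedules and cluster-wide ClusterScalingSchedules a common shape (a stable identifier plus the spec), which is what lets callers push both kinds through the same code path. A minimal illustration; the helper and resource names below are made up, not from this commit:

package main

import (
    "fmt"

    v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// printSchedules only relies on the ScalingScheduler interface, so it works
// for both resource kinds without type switches.
func printSchedules(schedulers []v1.ScalingScheduler) {
    for _, s := range schedulers {
        spec := s.ResourceSpec()
        fmt.Printf("%s has %d schedule(s)\n", s.Identifier(), len(spec.Schedules))
    }
}

func main() {
    printSchedules([]v1.ScalingScheduler{
        &v1.ScalingSchedule{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "daily-peak"}},
        &v1.ClusterScalingSchedule{ObjectMeta: metav1.ObjectMeta{Name: "cluster-wide-peak"}},
    })
}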


@@ -0,0 +1,118 @@
package scheduledscaling

import (
    "fmt"
    "time"

    "golang.org/x/net/context"
    autoscalingv1 "k8s.io/api/autoscaling/v1"
    autoscalingv2 "k8s.io/api/autoscaling/v2"
    apimeta "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/wait"
    cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/restmapper"
    "k8s.io/client-go/scale"
    scaleclient "k8s.io/client-go/scale"
)

// TargetScaler is an interface for scaling a target referenced resource in an
// HPA to the desired replicas.
type TargetScaler interface {
    Scale(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, replicas int32) error
}

type hpaTargetScaler struct {
    mapper      apimeta.RESTMapper
    scaleClient scaleclient.ScalesGetter
}

// NewHPATargetScaler creates a new TargetScaler that can scale resources
// targeted by HPAs. It takes a Kubernetes client and a REST config and uses a
// restmapper to resolve the target reference API.
func NewHPATargetScaler(ctx context.Context, kubeClient kubernetes.Interface, cfg *rest.Config) (TargetScaler, error) {
    cachedClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
    restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
    go wait.Until(func() {
        restMapper.Reset()
    }, 30*time.Second, ctx.Done())

    scaleKindResolver := scale.NewDiscoveryScaleKindResolver(kubeClient.Discovery())
    scaleClient, err := scale.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
    if err != nil {
        return nil, fmt.Errorf("failed to create scale client: %w", err)
    }

    return &hpaTargetScaler{
        mapper:      restMapper,
        scaleClient: scaleClient,
    }, nil
}

// Scale scales the target resource of the given HPA to the desired number of
// replicas.
func (s *hpaTargetScaler) Scale(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, replicas int32) error {
    reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

    targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
    if err != nil {
        return fmt.Errorf("invalid API version '%s' in scale target reference: %w", hpa.Spec.ScaleTargetRef.APIVersion, err)
    }

    targetGK := schema.GroupKind{
        Group: targetGV.Group,
        Kind:  hpa.Spec.ScaleTargetRef.Kind,
    }

    mappings, err := s.mapper.RESTMappings(targetGK)
    if err != nil {
        return fmt.Errorf("unable to determine resource for scale target reference: %w", err)
    }

    scale, targetGR, err := s.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
    if err != nil {
        return fmt.Errorf("failed to get scale subresource for %s: %w", reference, err)
    }

    scale.Spec.Replicas = replicas

    _, err = s.scaleClient.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
    if err != nil {
        return fmt.Errorf("failed to rescale %s: %w", reference, err)
    }

    return nil
}

// scaleForResourceMappings attempts to fetch the scale for the
// resource with the given name and namespace, trying each RESTMapping
// in turn until a working one is found. If none work, the first error
// is returned. It returns both the scale, as well as the group-resource from
// the working mapping.
// from: https://github.com/kubernetes/kubernetes/blob/c9092f69fc0c099062dd23cd6ee226bcd52ec790/pkg/controller/podautoscaler/horizontal.go#L1326-L1353
func (s *hpaTargetScaler) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) {
    var firstErr error
    for i, mapping := range mappings {
        targetGR := mapping.Resource.GroupResource()
        scale, err := s.scaleClient.Scales(namespace).Get(ctx, targetGR, name, metav1.GetOptions{})
        if err == nil {
            return scale, targetGR, nil
        }

        // if this is the first error, remember it,
        // then go on and try other mappings until we find a good one
        if i == 0 {
            firstErr = err
        }
    }

    // make sure we handle an empty set of mappings
    if firstErr == nil {
        firstErr = fmt.Errorf("unrecognized resource")
    }

    return nil, schema.GroupResource{}, firstErr
}

File diff suppressed because it is too large.
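
The controller changes themselves live in the suppressed diff above. Judging from the new NewController signature and the TestAdjustScaling cases below, the idea is: derive the replica count a schedule implies from the HPA's AverageValue target, and if the increase relative to the HPA's current replicas falls within the HPA tolerance (where the HPA controller would not react), scale the target directly via the TargetScaler. A simplified sketch of that check, assuming the imports already used in this package plus math, with all names illustrative rather than taken from the suppressed code:

// adjustForTolerance is an illustrative sketch of the tolerance check the
// tests below exercise; it is not the code from the suppressed diff. It
// assumes the schedule's current value has already been computed.
func adjustForTolerance(ctx context.Context, scaler TargetScaler, hpa *autoscalingv2.HorizontalPodAutoscaler, scheduleValue int64, tolerance float64) error {
    for _, metric := range hpa.Spec.Metrics {
        // Only look at Object metrics backed by a (Cluster)ScalingSchedule.
        if metric.Type != autoscalingv2.ObjectMetricSourceType || metric.Object == nil {
            continue
        }
        kind := metric.Object.DescribedObject.Kind
        if kind != "ScalingSchedule" && kind != "ClusterScalingSchedule" {
            continue
        }
        target := metric.Object.Target.AverageValue
        if target == nil || target.Value() == 0 {
            continue
        }

        // Replicas the schedule implies, e.g. 1000 / 10 = 100 in the tests below.
        desired := int64(math.Ceil(float64(scheduleValue) / float64(target.Value())))
        current := int64(hpa.Status.CurrentReplicas)
        if current == 0 || desired <= current {
            continue
        }

        // Below the HPA tolerance the HPA controller would ignore the change,
        // so scale the target directly.
        if float64(desired-current)/float64(current) <= tolerance {
            return scaler.Scale(ctx, hpa, int32(desired))
        }
    }
    return nil
}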


@@ -2,15 +2,24 @@ package scheduledscaling

import (
    "context"
+   "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
    v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
    scalingschedulefake "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake"
+   zfake "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake"
    zalandov1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/typed/zalando.org/v1"
+   appsv1 "k8s.io/api/apps/v1"
+   autoscalingv2 "k8s.io/api/autoscaling/v2"
+   v2 "k8s.io/api/autoscaling/v2"
    corev1 "k8s.io/api/core/v1"
+   "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+   "k8s.io/client-go/kubernetes"
+   "k8s.io/client-go/kubernetes/fake"
+   "k8s.io/utils/ptr"
)

const (
@@ -222,7 +231,7 @@ func TestRunOnce(t *testing.T) {
            err := applySchedules(client.ZalandoV1(), tc.schedules)
            require.NoError(t, err)

-           controller := NewController(client.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, now, 0, "Europe/Berlin")
+           controller := NewController(client.ZalandoV1(), fake.NewSimpleClientset(), nil, scalingSchedulesStore, clusterScalingSchedulesStore, now, 0, "Europe/Berlin", 0.10)

            err = controller.runOnce(context.Background())
            require.NoError(t, err)
@@ -297,3 +306,142 @@ func checkSchedules(t *testing.T, client zalandov1.ZalandoV1Interface, schedules
    }
    return nil
}
+
+type mockScaler struct {
+    client kubernetes.Interface
+}
+
+func (s *mockScaler) Scale(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, replicas int32) error {
+    switch hpa.Spec.ScaleTargetRef.Kind {
+    case "Deployment":
+        deployment, err := s.client.AppsV1().Deployments(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
+        if err != nil {
+            return err
+        }
+
+        deployment.Spec.Replicas = &replicas
+        _, err = s.client.AppsV1().Deployments(hpa.Namespace).Update(ctx, deployment, metav1.UpdateOptions{})
+        if err != nil {
+            return err
+        }
+    default:
+        return fmt.Errorf("unsupported kind %s", hpa.Spec.ScaleTargetRef.Kind)
+    }
+
+    return nil
+}
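
The mock satisfies the TargetScaler interface directly against the fake clientset, so the test below can assert on the Deployment's replica count without a real scale client or REST mapper.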
+
+func TestAdjustScaling(t *testing.T) {
+    for _, tc := range []struct {
+        msg             string
+        currentReplicas int32
+        desiredReplicas int32
+        targetValue     int64
+    }{
+        {
+            msg:             "current less than 10%% below desired",
+            currentReplicas: 95, // 5.3% increase to desired
+            desiredReplicas: 100,
+            targetValue:     10, // 1000/10 = 100
+        },
+        {
+            msg:             "current more than 10%% below desired, no adjustment",
+            currentReplicas: 90, // 11% increase to desired
+            desiredReplicas: 90,
+            targetValue:     10, // 1000/10 = 100
+        },
+    } {
+        t.Run(tc.msg, func(t *testing.T) {
+            kubeClient := fake.NewSimpleClientset()
+            scalingScheduleClient := zfake.NewSimpleClientset()
+            controller := NewController(
+                scalingScheduleClient.ZalandoV1(),
+                kubeClient,
+                &mockScaler{client: kubeClient},
+                nil,
+                nil,
+                time.Now,
+                time.Hour,
+                "Europe/Berlin",
+                0.10,
+            )
+
+            scheduleDate := v1.ScheduleDate(time.Now().Add(-10 * time.Minute).Format(time.RFC3339))
+            clusterScalingSchedules := []v1.ScalingScheduler{
+                &v1.ClusterScalingSchedule{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name: "schedule-1",
+                    },
+                    Spec: v1.ScalingScheduleSpec{
+                        Schedules: []v1.Schedule{
+                            {
+                                Type:            v1.OneTimeSchedule,
+                                Date:            &scheduleDate,
+                                DurationMinutes: 15,
+                                Value:           1000,
+                            },
+                        },
+                    },
+                },
+            }
+
+            deployment := &appsv1.Deployment{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "deployment-1",
+                },
+                Spec: appsv1.DeploymentSpec{
+                    Replicas: ptr.To(tc.currentReplicas),
+                },
+            }
+
+            _, err := kubeClient.AppsV1().Deployments("default").Create(context.Background(), deployment, metav1.CreateOptions{})
+            require.NoError(t, err)
+
+            hpa := &autoscalingv2.HorizontalPodAutoscaler{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "hpa-1",
+                },
+                Spec: v2.HorizontalPodAutoscalerSpec{
+                    ScaleTargetRef: v2.CrossVersionObjectReference{
+                        APIVersion: "apps/v1",
+                        Kind:       "Deployment",
+                        Name:       "deployment-1",
+                    },
+                    MinReplicas: ptr.To(int32(1)),
+                    MaxReplicas: 1000,
+                    Metrics: []v2.MetricSpec{
+                        {
+                            Type: v2.ObjectMetricSourceType,
+                            Object: &v2.ObjectMetricSource{
+                                DescribedObject: v2.CrossVersionObjectReference{
+                                    APIVersion: "zalando.org/v1",
+                                    Kind:       "ClusterScalingSchedule",
+                                    Name:       "schedule-1",
+                                },
+                                Target: v2.MetricTarget{
+                                    Type:         v2.AverageValueMetricType,
+                                    AverageValue: resource.NewQuantity(tc.targetValue, resource.DecimalSI),
+                                },
+                            },
+                        },
+                    },
+                },
+            }
+
+            hpa, err = kubeClient.AutoscalingV2().HorizontalPodAutoscalers("default").Create(context.Background(), hpa, metav1.CreateOptions{})
+            require.NoError(t, err)
+
+            hpa.Status.CurrentReplicas = tc.currentReplicas
+            _, err = kubeClient.AutoscalingV2().HorizontalPodAutoscalers("default").UpdateStatus(context.Background(), hpa, metav1.UpdateOptions{})
+            require.NoError(t, err)
+
+            err = controller.adjustScaling(context.Background(), clusterScalingSchedules)
+            require.NoError(t, err)
+
+            deployment, err = kubeClient.AppsV1().Deployments("default").Get(context.Background(), "deployment-1", metav1.GetOptions{})
+            require.NoError(t, err)
+
+            require.Equal(t, tc.desiredReplicas, ptr.Deref(deployment.Spec.Replicas, 0))
+        })
+    }
+}
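
The two cases work out as follows: a schedule value of 1000 against an AverageValue target of 10 implies 100 replicas. Going from 95 to 100 is a roughly 5.3% increase, inside the 10% tolerance, so the controller scales the Deployment itself; going from 90 to 100 is roughly 11.1%, outside the tolerance, so the adjustment is left to the HPA and the Deployment stays at 90.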


@@ -138,6 +138,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
    flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
    flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee that scaling isn't prevented from reaching the max value by the 10% minimum change rule.")
    flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.")
+   flags.Float64Var(&o.HorizontalPodAutoscalerTolerance, "horizontal-pod-autoscaler-tolerance", 0.1, "The HPA tolerance also configured in the HPA controller.")
    flags.StringVar(&o.ExternalRPSMetricName, "external-rps-metric-name", o.ExternalRPSMetricName, ""+
        "The name of the metric that should be used to query prometheus for RPS per hostname.")
    flags.BoolVar(&o.ExternalRPSMetrics, "external-rps-metrics", o.ExternalRPSMetrics, ""+
@@ -367,10 +368,25 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
            return fmt.Errorf("failed to register ScalingSchedule object collector plugin: %v", err)
        }

+       scaler, err := scheduledscaling.NewHPATargetScaler(ctx, client, clientConfig)
+       if err != nil {
+           return fmt.Errorf("unable to create HPA target scaler: %w", err)
+       }
+
        // setup ScheduledScaling controller to continuously update
        // status of ScalingSchedule and ClusterScalingSchedule
        // resources.
-       scheduledScalingController := scheduledscaling.NewController(scalingScheduleClient.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.DefaultTimeZone)
+       scheduledScalingController := scheduledscaling.NewController(
+           scalingScheduleClient.ZalandoV1(),
+           client,
+           scaler,
+           scalingSchedulesStore,
+           clusterScalingSchedulesStore,
+           time.Now,
+           o.DefaultScheduledScalingWindow,
+           o.DefaultTimeZone,
+           o.HorizontalPodAutoscalerTolerance,
+       )

        go scheduledScalingController.Run(ctx)
    }
@@ -501,6 +517,9 @@ type AdapterServerOptions struct {
    RampSteps int
    // Default time zone to use for ScalingSchedules.
    DefaultTimeZone string
+   // The HPA tolerance also configured in the HPA controller.
+   // kube-controller-manager flag: --horizontal-pod-autoscaler-tolerance=
+   HorizontalPodAutoscalerTolerance float64
    // Feature flag to enable external rps metric collector
    ExternalRPSMetrics bool
    // Name of the Prometheus metric that stores RPS by hostname for external RPS metrics.