Create ScalingSchedule collector

This commit adds two new collectors to the adapter:
- ClusterScalingScheduleCollector; and
- ScalingScheduleCollector

Also, it introduces the required collectors plugins, initialization
logic in the server startup, documentation and deployment example
(including the helm chart). A new config flag is created,
`-scaling-schedule`, and allows to enable and to disable the collection
of such metrics. It's disabled by default.

This collectors are the required logic to utilise the CRDs introduced in
the #284 pull request. It makes use of the kubernetes go-client
implementations of a [Store][0] and [Reflector][1].

[0]: https://pkg.go.dev/k8s.io/client-go/tools/cache#Store
[1]: https://pkg.go.dev/k8s.io/client-go/tools/cache#Reflector

Signed-off-by: Jonathan Juares Beber <jonathanbeber@gmail.com>
This commit is contained in:
Jonathan Juares Beber
2021-05-21 09:00:39 +02:00
parent 7a68304389
commit a382dbfe7b
17 changed files with 1838 additions and 5 deletions
+52
View File
@@ -18,6 +18,7 @@ package server
import (
"context"
"errors"
"fmt"
"net"
"net/http"
@@ -31,15 +32,19 @@ import (
"github.com/spf13/cobra"
"github.com/zalando-incubator/cluster-lifecycle-manager/pkg/credentials-loader/platformiam"
generatedopenapi "github.com/zalando-incubator/kube-metrics-adapter/pkg/api/generated/openapi"
v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/provider"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
"golang.org/x/oauth2"
"k8s.io/apimachinery/pkg/fields"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
)
@@ -118,6 +123,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"disregard failing to create collectors for incompatible HPAs")
flags.DurationVar(&o.MetricsTTL, "metrics-ttl", 15*time.Minute, "TTL for metrics that are stored in in-memory cache.")
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
"whether to enable time-based ScalingSchedule metrics")
return cmd
}
@@ -245,6 +252,49 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
}
if o.ScalingScheduleMetrics {
scalingScheduleClient, err := versioned.NewForConfig(clientConfig)
if err != nil {
return errors.New("unable to create [Cluster]ScalingSchedule.zalando.org/v1 client")
}
clusterScalingSchedulesStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
clusterReflector := cache.NewReflector(
cache.NewListWatchFromClient(scalingScheduleClient.ZalandoV1().RESTClient(), "ClusterScalingSchedules", "", fields.Everything()),
&v1.ClusterScalingSchedule{},
clusterScalingSchedulesStore,
0,
)
go clusterReflector.Run(ctx.Done())
scalingSchedulesStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
cache.NewListWatchFromClient(scalingScheduleClient.ZalandoV1().RESTClient(), "ScalingSchedules", "", fields.Everything()),
&v1.ScalingSchedule{},
scalingSchedulesStore,
0,
)
go reflector.Run(ctx.Done())
clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now)
if err != nil {
return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
}
err = collectorFactory.RegisterObjectCollector("ClusterScalingSchedule", "", clusterPlugin)
if err != nil {
return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
}
plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now)
if err != nil {
return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
}
err = collectorFactory.RegisterObjectCollector("ScalingSchedule", "", plugin)
if err != nil {
return fmt.Errorf("failed to register ScalingSchedule object collector plugin: %v", err)
}
}
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs, o.MetricsTTL, o.GCInterval)
go hpaProvider.Run(ctx)
@@ -356,4 +406,6 @@ type AdapterServerOptions struct {
MetricsTTL time.Duration
// Interval to clean up metrics that are stored in in-memory cache
GCInterval time.Duration
// Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
ScalingScheduleMetrics bool
}