Mirror of https://github.com/zalando-incubator/kube-metrics-adapter.git, synced 2025-06-24 20:42:19 +00:00
Compare commits
26 Commits
Commit SHAs:

- e600557636
- 0f06db7cdf
- 1c9038b2cc
- fd4ead837e
- f46f801811
- 4acdf72ef7
- e04cd10bfc
- 8fe330941a
- 0730c6ef1e
- c5411c74b7
- 4ba6b66441
- 0dbe769e1b
- 582a78d4ec
- 481df883e5
- 318d47e05e
- 190f0db092
- c618494177
- a08860d06c
- 6b0afe5180
- 9722e38f62
- 35aa03c771
- 34c833e04a
- c93c1dd7bb
- 59d39bc51c
- 4066e450e5
- 8415373f3d
README.md
@@ -306,20 +306,27 @@ for this object. But to satisfy the schema we specify a dummy pod called `dummy-

## Skipper collector

The skipper collector is a simple wrapper around the Prometheus collector to
-make it easy to define an HPA for scaling based on ingress metrics when
+make it easy to define an HPA for scaling based on [Ingress][ingress] or
+[RouteGroup][routegroup] metrics when
[skipper](https://github.com/zalando/skipper) is used as the ingress
implementation in your cluster. It assumes you are collecting Prometheus
metrics from skipper and it provides the correct Prometheus queries out of the
box so users don't have to define those manually.

[ingress]: https://kubernetes.io/docs/concepts/services-networking/ingress/
[routegroup]: https://opensource.zalando.com/skipper/kubernetes/routegroups/

### Supported metrics

| Metric | Description | Type | Kind | K8s Versions |
| ----------- | -------------- | ------ | ---- | ---- |
-| `requests-per-second` | Scale based on requests per second for a certain ingress. | Object | `Ingress` | `>=1.14` |
+| `requests-per-second` | Scale based on requests per second for a certain ingress or routegroup. | Object | `Ingress`, `RouteGroup` | `>=1.14` |

### Example

#### Ingress

This is an example of an HPA that will scale based on `requests-per-second` for
an ingress called `myapp`.
@@ -339,11 +346,48 @@ spec:
  - type: Object
    object:
      describedObject:
-        apiVersion: extensions/v1beta1
+        apiVersion: networking.k8s.io/v1
        kind: Ingress
        name: myapp
      metric:
        name: requests-per-second
        selector:
          matchLabels:
            backend: backend1 # optional backend
      target:
        averageValue: "10"
        type: AverageValue
```

#### RouteGroup

This is an example of an HPA that will scale based on `requests-per-second` for
a routegroup called `myapp`.

```yaml
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: myapp-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: myapp
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Object
    object:
      describedObject:
        apiVersion: zalando.org/v1
        kind: RouteGroup
        name: myapp
      metric:
        name: requests-per-second
        selector:
          matchLabels:
            backend: backend1 # optional backend
      target:
        averageValue: "10"
        type: AverageValue
```
@@ -351,15 +395,14 @@ spec:

### Metric weighting based on backend

-Skipper supports sending traffic to different backend based on annotations
-present on the `Ingress` object. When the metric name is specified without a
-backend as `requests-per-second` then the number of replicas will be calculated
-based on the full traffic served by that ingress. If however only the traffic
-being routed to a specific backend should be used then the backend name can be
-specified as a metric name like `requests-per-second,backend1` which would
-return the requests-per-second being sent to the `backend1`. The ingress
-annotation where the backend weights can be obtained can be specified through
-the flag `--skipper-backends-annotation`.
+Skipper supports sending traffic to different backends based on annotations
+present on the `Ingress` object, or weights on the RouteGroup backends. By
+default the number of replicas will be calculated based on the full traffic
+served by that ingress/routegroup. If however only the traffic being routed to
+a specific backend should be used then the backend name can be specified via
+the `backend` label under `matchLabels` for the metric. The ingress annotation
+where the backend weights can be obtained can be specified through the flag
+`--skipper-backends-annotation`.

## InfluxDB collector
@@ -677,12 +720,63 @@ The `ScalingSchedule` and `ClusterScalingSchedule` collectors allow
collecting time-based metrics from the respective CRD objects specified
in the HPA.

These collectors are disabled by default; you have to start the server
with the `--scaling-schedule` flag to enable them. Remember to deploy the CRDs
`ScalingSchedule` and `ClusterScalingSchedule` and allow the service
account used by the server to read, watch and list them.

### Supported metrics

| Metric | Description | Type | K8s Versions |
| ---------- | -------------- | ------- | -- |
| ObjectName | The metric is calculated and stored for each `ScalingSchedule` and `ClusterScalingSchedule` referenced in the HPAs | `ScalingSchedule` and `ClusterScalingSchedule` | `>=1.16` |

### Ramp-up and ramp-down feature

To avoid abrupt scaling due to time-based metrics, the `ScalingSchedule`
collector can ramp the metric up and down over a specific period of time.
The duration of the scaling window can be configured individually in the
`[Cluster]ScalingSchedule` object, via the option
`scalingWindowDurationMinutes`, or globally for all scheduled events, and
defaults to a globally configured value if not specified. The default for
the latter is set to 10 minutes, but can be changed using the
`--scaling-schedule-default-scaling-window` flag.

This spreads the scale events around, creating less load on the other
components, and helping the rest of the metrics (like the CPU ones) to
adjust as well.

The [HPA algorithm][algo-details] does not make changes if the metric
change is smaller than the tolerance specified by the
`horizontal-pod-autoscaler-tolerance` flag:

> We'll skip scaling if the ratio is sufficiently close to 1.0 (within a
> globally-configurable tolerance, from the
> `--horizontal-pod-autoscaler-tolerance` flag, which defaults to 0.1).

With that in mind, the ramp-up and ramp-down feature divides the scaling
over the specified period of time into buckets, trying to achieve changes
bigger than the configured tolerance. The number of buckets defaults to
10 and can be configured by the `--scaling-schedule-ramp-steps` flag.
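For illustration (assuming the defaults of 10 ramp steps and a 10-minute
scaling window, and a schedule value of 10000): six minutes into the
ramp-up window the elapsed fraction is 0.6, so the returned metric is
floor(0.6 × 10) × (10000 / 10) = 6000, and just before the schedule starts
the collector returns at most 9000, i.e. 90% of the configured value.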
**Important**: note that the ramp-up and ramp-down feature can lead to
deployments achieving less than the specified number of pods, due to the
HPA 10% change rule and the ceiling function applied to the desired
number of pods (check the [algorithm details][algo-details]). It
varies with the configured metric for `ScalingSchedule` events, the
number of pods and the configured `horizontal-pod-autoscaler-tolerance`
flag of your kubernetes installation. [This gist][gist] contains the code to
simulate the situations a deployment with different numbers of pods and a
metric of 10000 can face, with 10 buckets (max of 90% of the metric
returned) and 5 buckets (max of 80% of the metric returned). The ramp-up
and ramp-down feature can be disabled by setting
`--scaling-schedule-default-scaling-window` to 0, and abrupt scalings can
be handled via [scaling policies][policies].

[algo-details]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#algorithm-details
[gist]: https://gist.github.com/jonathanbeber/37f1f918ab7ef6101c6ce56cc2cef3a2
[policies]: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#scaling-policies

### Example

This is an example of using the ScalingSchedule collectors to collect

@@ -783,8 +877,3 @@ Note that these number of pods are just considering these custom
metrics, the normal HPA behavior still applies, such as: in case of
multiple metrics the biggest number of pods is the utilized one, HPA max
and min replica configuration, autoscaling policies, etc.

-These collectors are disabled by default, you have to start the server
-with the `--scaling-schedule` flag to enable it. Remember to deploy the CRDs
-`ScalingSchedule` and `ClusterScalingSchedule` and allow the service
-account used by the server to read, watch and list them.
@@ -37,6 +37,11 @@ spec:
      spec:
        description: ScalingScheduleSpec is the spec part of the ScalingSchedule.
        properties:
+          scalingWindowDurationMinutes:
+            description: Fade the scheduled values in and out over this many minutes.
+              If unset, the default per-cluster value will be used.
+            format: int64
+            type: integer
          schedules:
            description: Schedules is the list of schedules for this ScalingSchedule
              resource. All the schedules defined here will result on the value

@@ -96,6 +101,7 @@ spec:
              value:
                description: The metric value that will be returned for the
                  defined schedule.
+                format: int64
                type: integer
            required:
            - durationMinutes
@@ -64,12 +64,23 @@ rules:
  - statefulsets
  verbs:
  - get
# only relevant if running with the flag:
# --skipper-ingress-metrics
- apiGroups:
  - extensions
  - networking.k8s.io
  resources:
  - ingresses
  verbs:
  - get
# only relevant if running with the flag:
# --skipper-routegroup-metrics
- apiGroups:
  - zalando.org
  resources:
  - routegroups
  verbs:
  - get
- apiGroups:
  - autoscaling
  resources:
@@ -37,6 +37,11 @@ spec:
      spec:
        description: ScalingScheduleSpec is the spec part of the ScalingSchedule.
        properties:
+          scalingWindowDurationMinutes:
+            description: Fade the scheduled values in and out over this many minutes.
+              If unset, the default per-cluster value will be used.
+            format: int64
+            type: integer
          schedules:
            description: Schedules is the list of schedules for this ScalingSchedule
              resource. All the schedules defined here will result on the value

@@ -96,6 +101,7 @@ spec:
              value:
                description: The metric value that will be returned for the
                  defined schedule.
+                format: int64
                type: integer
            required:
            - durationMinutes
go.mod
@@ -1,32 +1,32 @@
module github.com/zalando-incubator/kube-metrics-adapter

require (
    github.com/NYTimes/gziphandler v1.0.1 // indirect
-   github.com/aws/aws-sdk-go v1.39.2
+   github.com/aws/aws-sdk-go v1.40.22
    github.com/go-openapi/spec v0.20.3
    github.com/influxdata/influxdb-client-go v0.2.0
    github.com/influxdata/line-protocol v0.0.0-20201012155213-5f565037cbc9 // indirect
    github.com/kubernetes-sigs/custom-metrics-apiserver v0.0.0-20201216091021-1b9fa998bbaa
    github.com/mailru/easyjson v0.7.7 // indirect
-   github.com/prometheus/client_golang v1.10.0
-   github.com/prometheus/common v0.25.0
+   github.com/prometheus/client_golang v1.11.0
+   github.com/prometheus/common v0.26.0
    github.com/sirupsen/logrus v1.8.1
    github.com/spf13/cobra v1.1.3
    github.com/spyzhov/ajson v0.4.2
    github.com/stretchr/testify v1.7.0
    github.com/szuecs/routegroup-client v0.18.3
    github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
-   golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392 // indirect
+   go.uber.org/zap v1.13.0 // indirect
    golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
    gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
-   k8s.io/api v0.20.5
-   k8s.io/apimachinery v0.20.5
-   k8s.io/apiserver v0.20.5
-   k8s.io/client-go v0.20.5
-   k8s.io/code-generator v0.20.5
-   k8s.io/component-base v0.20.5
+   k8s.io/api v0.21.5
+   k8s.io/apimachinery v0.21.5
+   k8s.io/apiserver v0.21.5
+   k8s.io/client-go v0.21.5
+   k8s.io/code-generator v0.21.5
+   k8s.io/component-base v0.21.5
    k8s.io/klog v1.0.0
-   k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd
-   k8s.io/metrics v0.20.5
+   k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7
+   k8s.io/metrics v0.21.5
    sigs.k8s.io/controller-tools v0.5.0
)
File diff suppressed because it is too large
@@ -1,6 +1,8 @@
package v1

import (
+   "time"
+
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -37,6 +39,10 @@ type ClusterScalingSchedule struct {
// ScalingScheduleSpec is the spec part of the ScalingSchedule.
// +k8s:deepcopy-gen=true
type ScalingScheduleSpec struct {
+   // Fade the scheduled values in and out over this many minutes. If unset, the default per-cluster value will be used.
+   // +optional
+   ScalingWindowDurationMinutes *int64 `json:"scalingWindowDurationMinutes,omitempty"`
+
    // Schedules is the list of schedules for this ScalingSchedule
    // resource. All the schedules defined here will result on the value
    // to the same metric. New metrics require a new ScalingSchedule

@@ -59,7 +65,11 @@ type Schedule struct {
    // returned for the defined schedule.
    DurationMinutes int `json:"durationMinutes"`
    // The metric value that will be returned for the defined schedule.
-   Value int `json:"value"`
+   Value int64 `json:"value"`
}

+func (in Schedule) Duration() time.Duration {
+   return time.Duration(in.DurationMinutes) * time.Minute
+}
+
// Defines if the schedule is a OneTime schedule or

@@ -119,11 +129,11 @@ type ScalingScheduleList struct {
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

-// ScalingScheduleList is a list of cluster scoped scaling schedules.
+// ClusterScalingScheduleList is a list of cluster scoped scaling schedules.
// +k8s:deepcopy-gen=true
type ClusterScalingScheduleList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`

-   Items []ScalingSchedule `json:"items"`
+   Items []ClusterScalingSchedule `json:"items"`
}
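A minimal sketch of how the new fields are used when building a spec in Go (illustrative only; the import path is the one used elsewhere in this diff, and the schedule's type/date fields are omitted for brevity):

```go
package main

import (
	"fmt"

	v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
)

func main() {
	window := int64(10) // per-object ramp-up/ramp-down window, in minutes
	spec := v1.ScalingScheduleSpec{
		ScalingWindowDurationMinutes: &window,
		Schedules: []v1.Schedule{
			{
				// Type and date/period fields omitted here; see the CRD schema above.
				DurationMinutes: 45,  // the schedule is active for 45 minutes
				Value:           100, // metric value returned while active
			},
		},
	}
	fmt.Println(*spec.ScalingWindowDurationMinutes, spec.Schedules[0].Value)
}
```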
@@ -1,3 +1,4 @@
+//go:build !ignore_autogenerated
// +build !ignore_autogenerated

/*

@@ -58,7 +59,7 @@ func (in *ClusterScalingScheduleList) DeepCopyInto(out *ClusterScalingScheduleLi
    in.ListMeta.DeepCopyInto(&out.ListMeta)
    if in.Items != nil {
        in, out := &in.Items, &out.Items
-       *out = make([]ScalingSchedule, len(*in))
+       *out = make([]ClusterScalingSchedule, len(*in))
        for i := range *in {
            (*in)[i].DeepCopyInto(&(*out)[i])
        }

@@ -147,6 +148,11 @@ func (in *ScalingScheduleList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ScalingScheduleSpec) DeepCopyInto(out *ScalingScheduleSpec) {
    *out = *in
+   if in.ScalingWindowDurationMinutes != nil {
+       in, out := &in.ScalingWindowDurationMinutes, &out.ScalingWindowDurationMinutes
+       *out = new(int64)
+       **out = **in
+   }
    if in.Schedules != nil {
        in, out := &in.Schedules, &out.Schedules
        *out = make([]Schedule, len(*in))
@@ -247,13 +247,19 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
        }

        if metric.Type == autoscalingv2.ExternalMetricSourceType &&
-           metric.External.Metric.Selector != nil &&
-           metric.External.Metric.Selector.MatchLabels != nil {
+           metric.External.Metric.Selector != nil {
            for k, v := range metric.External.Metric.Selector.MatchLabels {
                config.Config[k] = v
            }
        }

+       if metric.Type == autoscalingv2.ObjectMetricSourceType &&
+           metric.Object.Metric.Selector != nil {
+           for k, v := range metric.Object.Metric.Selector.MatchLabels {
+               config.Config[k] = v
+           }
+       }
+
        annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
        if present {
            config.CollectorType = annotationConfigs.CollectorType
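(This change is what connects the `backend: backend1` label from the Skipper examples above to the collector: `matchLabels` on an Object metric are now copied into the collector's `config.Config`, where the Skipper collector looks up the `backend` key, as seen further down in this diff.)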
@@ -72,10 +72,25 @@ func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
        return 0, err
    }

-   if len(nodes) != 1 {
+   if len(nodes) == 0 {
        return 0, fmt.Errorf("unexpected json: expected single numeric or array value")
    }

+   if len(nodes) > 1 {
+       if g.aggregator == nil {
+           return 0, fmt.Errorf("no aggregator function has been specified")
+       }
+       values := make([]float64, 0, len(nodes))
+       for _, node := range nodes {
+           v, err := node.GetNumeric()
+           if err != nil {
+               return 0, fmt.Errorf("unexpected json: did not find numeric or array value '%s': %w", nodes, err)
+           }
+           values = append(values, v)
+       }
+       return g.aggregator(values...), nil
+   }
+
    node := nodes[0]
    if node.IsArray() {
        if g.aggregator == nil {

@@ -51,6 +51,13 @@ func TestJSONPathMetricsGetter(t *testing.T) {
            result:       5,
            aggregator:   Average,
        },
+       {
+           name:         "glob array query",
+           jsonResponse: []byte(`{"worker_status":[{"last_status":{"backlog":3}},{"last_status":{"backlog":7}}]}`),
+           jsonPath:     "$.worker_status.[*].last_status.backlog",
+           result:       5,
+           aggregator:   Average,
+       },
        {
            name:         "json path not resulting in array or number should lead to error",
            jsonResponse: []byte(`{"metric.value":5}`),
@@ -3,6 +3,7 @@ package collector
import (
    "errors"
    "fmt"
+   "math"
    "time"

    v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"

@@ -78,30 +79,38 @@ type Store interface {
// ScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting ScalingSchedule configured metrics.
type ScalingScheduleCollectorPlugin struct {
-   store Store
-   now   Now
+   store                Store
+   now                  Now
+   defaultScalingWindow time.Duration
+   rampSteps            int
}

// ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting ClusterScalingSchedule configured metrics.
type ClusterScalingScheduleCollectorPlugin struct {
-   store Store
-   now   Now
+   store                Store
+   now                  Now
+   defaultScalingWindow time.Duration
+   rampSteps            int
}

// NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin.
-func NewScalingScheduleCollectorPlugin(store Store, now Now) (*ScalingScheduleCollectorPlugin, error) {
+func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ScalingScheduleCollectorPlugin, error) {
    return &ScalingScheduleCollectorPlugin{
-       store: store,
-       now:   now,
+       store:                store,
+       now:                  now,
+       defaultScalingWindow: defaultScalingWindow,
+       rampSteps:            rampSteps,
    }, nil
}

// NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin.
-func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterScalingScheduleCollectorPlugin, error) {
+func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ClusterScalingScheduleCollectorPlugin, error) {
    return &ClusterScalingScheduleCollectorPlugin{
-       store: store,
-       now:   now,
+       store:                store,
+       now:                  now,
+       defaultScalingWindow: defaultScalingWindow,
+       rampSteps:            rampSteps,
    }, nil
}

@@ -109,14 +118,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now) (*ClusterSca
// specified HPA. It's the only required method to implement the
// collector.CollectorPlugin interface.
func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
-   return NewScalingScheduleCollector(c.store, c.now, hpa, config, interval)
+   return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
}

// NewCollector initializes a new cluster wide scaling schedule
// collector from the specified HPA. It's the only required method to
// implement the collector.CollectorPlugin interface.
func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
-   return NewClusterScalingScheduleCollector(c.store, c.now, hpa, config, interval)
+   return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
}

// ScalingScheduleCollector is a metrics collector for time based

@@ -135,41 +144,47 @@ type ClusterScalingScheduleCollector struct {
// struct used by both ClusterScalingScheduleCollector and the
// ScalingScheduleCollector.
type scalingScheduleCollector struct {
-   store           Store
-   now             Now
-   metric          autoscalingv2.MetricIdentifier
-   objectReference custom_metrics.ObjectReference
-   hpa             *autoscalingv2.HorizontalPodAutoscaler
-   interval        time.Duration
-   config          MetricConfig
+   store                Store
+   now                  Now
+   metric               autoscalingv2.MetricIdentifier
+   objectReference      custom_metrics.ObjectReference
+   hpa                  *autoscalingv2.HorizontalPodAutoscaler
+   interval             time.Duration
+   config               MetricConfig
+   defaultScalingWindow time.Duration
+   rampSteps            int
}

// NewScalingScheduleCollector initializes a new ScalingScheduleCollector.
-func NewScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
+func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
    return &ScalingScheduleCollector{
        scalingScheduleCollector{
-           store:           store,
-           now:             now,
-           objectReference: config.ObjectReference,
-           hpa:             hpa,
-           metric:          config.Metric,
-           interval:        interval,
-           config:          *config,
+           store:                store,
+           now:                  now,
+           objectReference:      config.ObjectReference,
+           hpa:                  hpa,
+           metric:               config.Metric,
+           interval:             interval,
+           config:               *config,
+           defaultScalingWindow: defaultScalingWindow,
+           rampSteps:            rampSteps,
        },
    }, nil
}

// NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector.
-func NewClusterScalingScheduleCollector(store Store, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
+func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
    return &ClusterScalingScheduleCollector{
        scalingScheduleCollector{
-           store:           store,
-           now:             now,
-           objectReference: config.ObjectReference,
-           hpa:             hpa,
-           metric:          config.Metric,
-           interval:        interval,
-           config:          *config,
+           store:                store,
+           now:                  now,
+           objectReference:      config.ObjectReference,
+           hpa:                  hpa,
+           metric:               config.Metric,
+           interval:             interval,
+           config:               *config,
+           defaultScalingWindow: defaultScalingWindow,
+           rampSteps:            rampSteps,
        },
    }, nil
}

@@ -188,7 +203,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) {
    if !ok {
        return nil, ErrNotScalingScheduleFound
    }
-   return calculateMetrics(scalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric)
+   return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
}

// GetMetrics is the main implementation for collector.Collector interface

@@ -221,7 +236,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error
        clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule)
    }

-   return calculateMetrics(clusterScalingSchedule.Spec.Schedules, c.now(), c.objectReference, c.metric)
+   return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
}

// Interval returns the interval at which the collector should run.

@@ -234,9 +249,17 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration {
    return c.interval
}

-func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
-   value := 0
-   for _, schedule := range schedules {
+func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, rampSteps int, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
+   scalingWindowDuration := defaultScalingWindow
+   if spec.ScalingWindowDurationMinutes != nil {
+       scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute
+   }
+   if scalingWindowDuration < 0 {
+       return nil, fmt.Errorf("scaling window duration cannot be negative")
+   }
+
+   value := int64(0)
+   for _, schedule := range spec.Schedules {
        switch schedule.Type {
        case v1.RepeatingSchedule:
            location, err := time.LoadLocation(schedule.Period.Timezone)

@@ -269,9 +292,7 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
                parsedStartTime.Nanosecond(),
                location,
            )
-           if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value {
-               value = schedule.Value
-           }
+           value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
            break
        }
    }

@@ -280,9 +301,8 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
            if err != nil {
                return nil, ErrInvalidScheduleDate
            }
-           if within(now, scheduledTime, schedule.DurationMinutes) && schedule.Value > value {
-               value = schedule.Value
-           }
+
+           value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
        }
    }

@@ -293,17 +313,56 @@ func calculateMetrics(schedules []v1.Schedule, now time.Time, objectReference cu
            Custom: custom_metrics.MetricValue{
                DescribedObject: objectReference,
                Timestamp:       metav1.Time{Time: now},
-               Value:           *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
+               Value:           *resource.NewMilliQuantity(value*1000, resource.DecimalSI),
                Metric:          custom_metrics.MetricIdentifier(metric),
            },
        },
    }, nil
}

-// within receive two time.Time and a number of minutes. It returns true
-// if the first given time, instant, is within the period of the second
-// given time (start) plus the given number of minutes.
-func within(instant, start time.Time, minutes int) bool {
-   return (instant.After(start) || instant.Equal(start)) &&
-       instant.Before(start.Add(time.Duration(minutes)*time.Minute))
-}
+func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
+   scaleUpStart := startTime.Add(-scalingWindowDuration)
+   endTime := startTime.Add(entryDuration)
+   scaleUpEnd := endTime.Add(scalingWindowDuration)
+
+   if between(timestamp, startTime, endTime) {
+       return value
+   }
+   if between(timestamp, scaleUpStart, startTime) {
+       return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, rampSteps, value)
+   }
+   if between(timestamp, endTime, scaleUpEnd) {
+       return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, rampSteps, value)
+   }
+   return 0
+}
+
+// The HPA has a rule to do not scale up or down if the change in the
+// metric is less than 10% (by default) of the current value. We will
+// use buckets of time using the floor of each as the returned metric.
+// Any config greater or equal to 10 buckets must guarantee changes
+// bigger than 10%.
+func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
+   if scalingWindowDuration == 0 {
+       return 0
+   }
+
+   steps := float64(rampSteps)
+
+   requiredPercentage := math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration)
+   return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps))
+}
+
+func between(timestamp, start, end time.Time) bool {
+   if timestamp.Before(start) {
+       return false
+   }
+   return timestamp.Before(end)
+}
+
+func maxInt64(i1, i2 int64) int64 {
+   if i1 > i2 {
+       return i1
+   }
+   return i2
+}
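To make the bucket arithmetic above concrete, here is a small, self-contained sketch (not part of the adapter; it simply restates the `scaledValue` formula with the default 10-minute window and 10 steps):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// bucketedValue restates the scaledValue logic above: the elapsed fraction of
// the scaling window is floored into rampSteps buckets, and each bucket is
// worth value/rampSteps.
func bucketedValue(elapsed, window time.Duration, rampSteps int, value int64) int64 {
	if window == 0 {
		return 0
	}
	steps := float64(rampSteps)
	fraction := math.Abs(float64(elapsed)) / float64(window)
	return int64(math.Floor(fraction*steps) * (float64(value) / steps))
}

func main() {
	window := 10 * time.Minute
	for _, elapsed := range []time.Duration{time.Minute, 6 * time.Minute, 9*time.Minute + 59*time.Second} {
		// Prints 1000, 6000 and 9000 for a schedule value of 10000.
		fmt.Println(bucketedValue(elapsed, window, 10, 10000))
	}
}
```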
@@ -12,7 +12,11 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

-const hHMMFormat = "15:04"
+const (
+   hHMMFormat                   = "15:04"
+   defaultScalingWindowDuration = 1 * time.Minute
+   defaultRampSteps             = 10
+)

type schedule struct {
    kind string

@@ -21,7 +25,7 @@ type schedule struct {
    days     []v1.ScheduleDay
    timezone string
    duration int
-   value    int
+   value    int64
}

func TestScalingScheduleCollector(t *testing.T) {

@@ -37,11 +41,15 @@ func TestScalingScheduleCollector(t *testing.T) {
        return uTCNow
    }

+   tenMinutes := int64(10)
+
    for _, tc := range []struct {
-       msg           string
-       schedules     []schedule
-       expectedValue int
-       err           error
+       msg                          string
+       schedules                    []schedule
+       scalingWindowDurationMinutes *int64
+       expectedValue                int64
+       err                          error
+       rampSteps                    int
    }{
        {
            msg: "Return the right value for one time config",

@@ -80,7 +88,70 @@ func TestScalingScheduleCollector(t *testing.T) {
            expectedValue: 100,
        },
        {
-           msg: "Return the default value (0) for one time config - 30 seconds after",
+           msg: "Return the scaled value (60) for one time config - 20 seconds before starting",
+           schedules: []schedule{
+               {
+                   date:     nowTime.Add(time.Second * 20).Format(time.RFC3339),
+                   kind:     "OneTime",
+                   duration: 45,
+                   value:    100,
+               },
+           },
+           expectedValue: 60,
+       },
+       {
+           msg: "Return the scaled value (60) for one time config - 20 seconds after",
+           schedules: []schedule{
+               {
+                   date:     nowTime.Add(-time.Minute * 45).Add(-time.Second * 20).Format(time.RFC3339),
+                   kind:     "OneTime",
+                   duration: 45,
+                   value:    100,
+               },
+           },
+           expectedValue: 60,
+       },
+       {
+           msg: "10 steps (default) return 90% of the metric, even 1 second before",
+           schedules: []schedule{
+               {
+                   date:     nowTime.Add(time.Second * 1).Format(time.RFC3339),
+                   kind:     "OneTime",
+                   duration: 45,
+                   value:    100,
+               },
+           },
+           expectedValue: 90,
+       },
+       {
+           msg: "5 steps return 80% of the metric, even 1 second before",
+           schedules: []schedule{
+               {
+                   date:     nowTime.Add(time.Second * 1).Format(time.RFC3339),
+                   kind:     "OneTime",
+                   duration: 45,
+                   value:    100,
+               },
+           },
+           expectedValue: 80,
+           rampSteps:     5,
+       },
+       {
+           msg:                          "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds before starting",
+           scalingWindowDurationMinutes: &tenMinutes,
+           schedules: []schedule{
+               {
+                   date:     nowTime.Add(time.Second * 30).Format(time.RFC3339),
+                   kind:     "OneTime",
+                   duration: 45,
+                   value:    100,
+               },
+           },
+           expectedValue: 90,
+       },
+       {
+           msg:                          "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds after",
+           scalingWindowDurationMinutes: &tenMinutes,
            schedules: []schedule{
                {
                    date: nowTime.Add(-time.Minute * 45).Add(-time.Second * 30).Format(time.RFC3339),

@@ -89,7 +160,7 @@ func TestScalingScheduleCollector(t *testing.T) {
                    value: 100,
                },
            },
-           expectedValue: 0,
+           expectedValue: 90,
        },
        {
            msg: "Return the default value (0) for one time config not started yet (20 minutes before)",

@@ -427,17 +498,22 @@ func TestScalingScheduleCollector(t *testing.T) {
            scalingScheduleName := "my_scaling_schedule"
            namespace := "default"

+           rampSteps := tc.rampSteps
+           if rampSteps == 0 {
+               rampSteps = defaultRampSteps
+           }
+
            schedules := getSchedules(tc.schedules)
-           store := newMockStore(scalingScheduleName, namespace, schedules)
-           plugin, err := NewScalingScheduleCollectorPlugin(store, now)
+           store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules)
+           plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration, rampSteps)
            require.NoError(t, err)

-           clusterStore := newClusterMockStore(scalingScheduleName, schedules)
-           clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now)
+           clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
+           clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration, rampSteps)
            require.NoError(t, err)

-           clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, schedules)
-           clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now)
+           clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
+           clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration, rampSteps)
            require.NoError(t, err)

            hpa := makeScalingScheduleHPA(namespace, scalingScheduleName)

@@ -505,14 +581,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) {
        make(map[string]interface{}),
        getByKeyFn,
    }
-   plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now)
+   plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
    require.NoError(t, err)

    clusterStore := mockStore{
        make(map[string]interface{}),
        getByKeyFn,
    }
-   clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now)
+   clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration, defaultRampSteps)
    require.NoError(t, err)

    hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")

@@ -567,10 +643,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) {
        },
    }

-   plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now)
+   plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
    require.NoError(t, err)

-   clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now)
+   clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
    require.NoError(t, err)

    hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")

@@ -615,7 +691,7 @@ func getByKeyFn(d map[string]interface{}, key string) (item interface{}, exists
    return item, exists, nil
}

-func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
+func newMockStore(name, namespace string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
    return mockStore{
        map[string]interface{}{
            fmt.Sprintf("%s/%s", namespace, name): &v1.ScalingSchedule{

@@ -623,7 +699,8 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
                Name: name,
            },
            Spec: v1.ScalingScheduleSpec{
-               Schedules: schedules,
+               ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
+               Schedules:                    schedules,
            },
        },
    },

@@ -631,7 +708,7 @@ func newMockStore(name, namespace string, schedules []v1.Schedule) mockStore {
    }
}

-func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
+func newClusterMockStore(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
    return mockStore{
        map[string]interface{}{
            name: &v1.ClusterScalingSchedule{

@@ -639,7 +716,8 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
                Name: name,
            },
            Spec: v1.ScalingScheduleSpec{
-               Schedules: schedules,
+               ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
+               Schedules:                    schedules,
            },
        },
    },

@@ -650,7 +728,7 @@ func newClusterMockStore(name string, schedules []v1.Schedule) mockStore {
// The cache.Store returns the v1.ClusterScalingSchedule items as
// v1.ScalingSchedule when it first lists it. When it's update it
// asserts it correctly to the v1.ClusterScalingSchedule type.
-func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore {
+func newClusterMockStoreFirstRun(name string, scalingWindowDurationMinutes *int64, schedules []v1.Schedule) mockStore {
    return mockStore{
        map[string]interface{}{
            name: &v1.ScalingSchedule{

@@ -658,7 +736,8 @@ func newClusterMockStoreFirstRun(name string, schedules []v1.Schedule) mockStore
                Name: name,
            },
            Spec: v1.ScalingScheduleSpec{
-               Schedules: schedules,
+               ScalingWindowDurationMinutes: scalingWindowDurationMinutes,
+               Schedules:                    schedules,
            },
        },
    },
@@ -10,6 +10,8 @@ import (
    "strings"
    "time"

+   rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
+   rginterface "github.com/szuecs/routegroup-client/client/clientset/versioned"
    autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -31,14 +33,16 @@ var (
// collectors for getting skipper ingress metrics.
type SkipperCollectorPlugin struct {
    client             kubernetes.Interface
+   rgClient           rginterface.Interface
    plugin             CollectorPlugin
    backendAnnotations []string
}

// NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin.
-func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) {
+func NewSkipperCollectorPlugin(client kubernetes.Interface, rgClient rginterface.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) {
    return &SkipperCollectorPlugin{
        client:             client,
+       rgClient:           rgClient,
        plugin:             prometheusPlugin,
        backendAnnotations: backendAnnotations,
    }, nil

@@ -47,14 +51,18 @@ func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *Pr
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
    if strings.HasPrefix(config.Metric.Name, rpsMetricName) {
-       backend := ""
-       if len(config.Metric.Name) > len(rpsMetricName) {
-           metricNameParts := strings.Split(config.Metric.Name, rpsMetricBackendSeparator)
-           if len(metricNameParts) == 2 {
-               backend = metricNameParts[1]
+       backend, ok := config.Config["backend"]
+       if !ok {
+           // TODO: remove the deprecated way of specifying
+           // optional backend at a later point in time.
+           if len(config.Metric.Name) > len(rpsMetricName) {
+               metricNameParts := strings.Split(config.Metric.Name, rpsMetricBackendSeparator)
+               if len(metricNameParts) == 2 {
+                   backend = metricNameParts[1]
+               }
            }
        }
-       return NewSkipperCollector(c.client, c.plugin, hpa, config, interval, c.backendAnnotations, backend)
+       return NewSkipperCollector(c.client, c.rgClient, c.plugin, hpa, config, interval, c.backendAnnotations, backend)
    }
    return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}

@@ -63,6 +71,7 @@ func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAu
// It depends on the prometheus collector for getting the metrics.
type SkipperCollector struct {
    client             kubernetes.Interface
+   rgClient           rginterface.Interface
    metric             autoscalingv2.MetricIdentifier
    objectReference    custom_metrics.ObjectReference
    hpa                *autoscalingv2.HorizontalPodAutoscaler

@@ -74,9 +83,10 @@ type SkipperCollector struct {
}

// NewSkipperCollector initializes a new SkipperCollector.
-func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
+func NewSkipperCollector(client kubernetes.Interface, rgClient rginterface.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
    return &SkipperCollector{
        client:          client,
+       rgClient:        rgClient,
        objectReference: config.ObjectReference,
        hpa:             hpa,
        metric:          config.Metric,

@@ -100,7 +110,7 @@ func getAnnotationWeight(backendWeights string, backend string) (float64, error)
    return 0, nil
}

-func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
+func getIngressWeight(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
    maxWeight := 0.0
    annotationsPresent := false

@@ -128,26 +138,65 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
    return 0.0, errBackendNameMissing
}

-// getCollector returns a collector for getting the metrics.
-func (c *SkipperCollector) getCollector() (Collector, error) {
-   ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(context.TODO(), c.objectReference.Name, metav1.GetOptions{})
-   if err != nil {
-       return nil, err
-   }
-
-   backendWeight, err := getWeights(ingress.Annotations, c.backendAnnotations, c.backend)
-   if err != nil {
-       return nil, err
-   }
-
+func getRouteGroupWeight(backends []rgv1.RouteGroupBackendReference, backendName string) (float64, error) {
+   if len(backends) <= 1 {
+       return 1.0, nil
+   }
+
+   if backendName == "" {
+       return 0.0, errBackendNameMissing
+   }
+
+   for _, backend := range backends {
+       if backend.BackendName == backendName {
+           return float64(backend.Weight) / 100.0, nil
+       }
+   }
+
+   return 0.0, nil
+}
+
+// getCollector returns a collector for getting the metrics.
+func (c *SkipperCollector) getCollector(ctx context.Context) (Collector, error) {
+   var escapedHostnames []string
+   var backendWeight float64
+   switch c.objectReference.Kind {
+   case "Ingress":
+       ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(ctx, c.objectReference.Name, metav1.GetOptions{})
+       if err != nil {
+           return nil, err
+       }
+
+       backendWeight, err = getIngressWeight(ingress.Annotations, c.backendAnnotations, c.backend)
+       if err != nil {
+           return nil, err
+       }
+
+       for _, rule := range ingress.Spec.Rules {
+           escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
+       }
+   case "RouteGroup":
+       routegroup, err := c.rgClient.ZalandoV1().RouteGroups(c.objectReference.Namespace).Get(ctx, c.objectReference.Name, metav1.GetOptions{})
+       if err != nil {
+           return nil, err
+       }
+
+       backendWeight, err = getRouteGroupWeight(routegroup.Spec.DefaultBackends, c.backend)
+       if err != nil {
+           return nil, err
+       }
+
+       for _, host := range routegroup.Spec.Hosts {
+           escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(host, ".", "_", -1)))
+       }
+   default:
+       return nil, fmt.Errorf("unknown skipper resource kind %s for resource %s/%s", c.objectReference.Kind, c.objectReference.Namespace, c.objectReference.Name)
+   }
+
    config := c.config

-   var escapedHostnames []string
-   for _, rule := range ingress.Spec.Rules {
-       escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
-   }
-
    if len(escapedHostnames) == 0 {
-       return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
+       return nil, fmt.Errorf("no hosts defined on %s %s/%s, unable to create collector", c.objectReference.Kind, c.objectReference.Namespace, c.objectReference.Name)
    }

    config.Config = map[string]string{

@@ -165,7 +214,7 @@ func (c *SkipperCollector) getCollector() (Collector, error) {

// GetMetrics gets skipper metrics from prometheus.
func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) {
-   collector, err := c.getCollector()
+   collector, err := c.getCollector(context.TODO())
    if err != nil {
        return nil, err
    }
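For the RouteGroup weighting above, here is a self-contained sketch of the resulting behaviour (using a stand-in type for `rgv1.RouteGroupBackendReference` and assuming, as the division by 100 suggests, that weights are percentages of the total traffic):

```go
package main

import "fmt"

// backendRef mirrors the relevant fields of rgv1.RouteGroupBackendReference
// used above (an illustrative stand-in, not the real type).
type backendRef struct {
	BackendName string
	Weight      int
}

// routeGroupWeight restates getRouteGroupWeight from the diff above.
func routeGroupWeight(backends []backendRef, backendName string) (float64, error) {
	if len(backends) <= 1 {
		return 1.0, nil
	}
	if backendName == "" {
		return 0.0, fmt.Errorf("backend name missing")
	}
	for _, b := range backends {
		if b.BackendName == backendName {
			// Weights are treated as percentages of the total traffic.
			return float64(b.Weight) / 100.0, nil
		}
	}
	return 0.0, nil
}

func main() {
	backends := []backendRef{
		{BackendName: "backend1", Weight: 80},
		{BackendName: "backend2", Weight: 20},
	}
	w, _ := routeGroupWeight(backends, "backend1")
	fmt.Println(w) // 0.8: requests-per-second for backend1 is 80% of the total
}
```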
File diff suppressed because it is too large
@@ -82,6 +82,16 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
            Resource: "ingresses",
            Group:    group,
        }
+   case "RouteGroup":
+       group := "zalando.org"
+       gv, err := schema.ParseGroupVersion(value.DescribedObject.APIVersion)
+       if err == nil {
+           group = gv.Group
+       }
+       groupResource = schema.GroupResource{
+           Resource: "routegroups",
+           Group:    group,
+       }
    case "ScalingSchedule":
        group := "zalando.org"
        gv, err := schema.ParseGroupVersion(value.DescribedObject.APIVersion)
@@ -30,6 +30,7 @@ import (
    "github.com/kubernetes-sigs/custom-metrics-apiserver/pkg/cmd/server"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/spf13/cobra"
+   rg "github.com/szuecs/routegroup-client/client/clientset/versioned"
    "github.com/zalando-incubator/cluster-lifecycle-manager/pkg/credentials-loader/platformiam"
    generatedopenapi "github.com/zalando-incubator/kube-metrics-adapter/pkg/api/generated/openapi"
    v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"

@@ -113,6 +114,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
        "path to the credentials dir where tokens are stored")
    flags.BoolVar(&o.SkipperIngressMetrics, "skipper-ingress-metrics", o.SkipperIngressMetrics, ""+
        "whether to enable skipper ingress metrics")
+   flags.BoolVar(&o.SkipperRouteGroupMetrics, "skipper-routegroup-metrics", o.SkipperRouteGroupMetrics, ""+
+       "whether to enable skipper routegroup metrics")
    flags.StringArrayVar(&o.SkipperBackendWeightAnnotation, "skipper-backends-annotation", o.SkipperBackendWeightAnnotation, ""+
        "the annotation to get backend weights so that the returned metric can be weighted")
    flags.BoolVar(&o.AWSExternalMetrics, "aws-external-metrics", o.AWSExternalMetrics, ""+

@@ -125,6 +128,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
    flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
    flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
        "whether to enable time-based ScalingSchedule metrics")
+   flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
+   flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee we won't avoid reaching the max scaling due to the 10% minimum change rule.")
    return cmd
}

@@ -170,6 +175,11 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
        return fmt.Errorf("failed to initialize new client: %v", err)
    }

+   rgClient, err := rg.NewForConfig(clientConfig)
+   if err != nil {
+       return fmt.Errorf("failed to initialize RouteGroup client: %v", err)
+   }
+
    collectorFactory := collector.NewCollectorFactory()

    if o.PrometheusServer != "" {

@@ -186,15 +196,24 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
        collectorFactory.RegisterExternalCollector([]string{collector.PrometheusMetricType, collector.PrometheusMetricNameLegacy}, promPlugin)

        // skipper collector can only be enabled if prometheus is.
-       if o.SkipperIngressMetrics {
-           skipperPlugin, err := collector.NewSkipperCollectorPlugin(client, promPlugin, o.SkipperBackendWeightAnnotation)
+       if o.SkipperIngressMetrics || o.SkipperRouteGroupMetrics {
+           skipperPlugin, err := collector.NewSkipperCollectorPlugin(client, rgClient, promPlugin, o.SkipperBackendWeightAnnotation)
            if err != nil {
                return fmt.Errorf("failed to initialize skipper collector plugin: %v", err)
            }

-           err = collectorFactory.RegisterObjectCollector("Ingress", "", skipperPlugin)
-           if err != nil {
-               return fmt.Errorf("failed to register skipper collector plugin: %v", err)
+           if o.SkipperIngressMetrics {
+               err = collectorFactory.RegisterObjectCollector("Ingress", "", skipperPlugin)
+               if err != nil {
+                   return fmt.Errorf("failed to register skipper Ingress collector plugin: %v", err)
+               }
+           }
+
+           if o.SkipperRouteGroupMetrics {
+               err = collectorFactory.RegisterObjectCollector("RouteGroup", "", skipperPlugin)
+               if err != nil {
+                   return fmt.Errorf("failed to register skipper RouteGroup collector plugin: %v", err)
+               }
            }
        }
    }

@@ -276,7 +295,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
            )
            go reflector.Run(ctx.Done())

-           clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now)
+           clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
            if err != nil {
                return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
            }

@@ -285,7 +304,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
            return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
        }

-       plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now)
+       plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
        if err != nil {
            return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
        }

@@ -390,6 +409,9 @@ type AdapterServerOptions struct {
    // SkipperIngressMetrics switches on support for skipper ingress based
    // metric collection.
    SkipperIngressMetrics bool
+   // SkipperRouteGroupMetrics switches on support for skipper routegroup
+   // based metric collection.
+   SkipperRouteGroupMetrics bool
    // AWSExternalMetrics switches on support for getting external metrics
    // from AWS.
    AWSExternalMetrics bool

@@ -408,4 +430,8 @@ type AdapterServerOptions struct {
    GCInterval time.Duration
    // Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
    ScalingScheduleMetrics bool
+   // Default ramp-up/ramp-down window duration for scheduled metrics
+   DefaultScheduledScalingWindow time.Duration
+   // Number of steps utilized during the rampup and rampdown for scheduled metrics
+   RampSteps int
}