Compare commits

...

17 Commits

Author SHA1 Message Date
c9fa15c7d4 Updated the tests (#103)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-02-04 09:48:50 +01:00
e3330dcf43 Reuse the HTTP client for scraping pods (#102)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-01-30 17:49:22 +01:00
8e4662b26c Permit disregarding incompatible HPAs (#95)
* This commit adds a --disregard-incompatible-hpas that makes the HPA
provider stop erroring out when a collector cannot be created for a
metric in a HPA. Useful when kube-metrics-adapter runs alongside another
metrics provider. Fixes issue #94.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Make tests pass

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Wraps the Plugin Not Found error in a new type that can be checked by the caller of a function to determine if its contents should be logged or added as an event to the HPA, when this HPA is incompatible.
The disregardIncompatibleHPAs is now targetting only the log or addition of the same event.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Invert if expression to select when we should log
CreateNewMetricsCollector errors: don't log when both conditions are true - it's not a PluginNotFoundError
and disregardIncompatibleHPAs flag is set to true. This way, if an error
is NOT PluginNotFoundError it will always be logged, and when it IS
PluginNotFoundError it will only be logged when
disregardIncompatibleHPAs is false.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Remove redundant "whether to"

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add test case for updating HPAs via HPA Provider while disregarding
incompatible HPAs.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2020-01-30 11:33:15 +01:00
9e211b181a Merge pull request #101 from zalando-incubator/update-to-v2beta2
Only support autoscaling/v2beta2
2020-01-29 16:47:15 +01:00
9d78fff1b5 Only support autoscaling/v2beta2
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-01-29 15:59:20 +01:00
1c6f9e2ea6 Merge pull request #100 from affo/feat/influxdb-collector
feat(collector): add InfluxDB collector
2020-01-24 10:56:52 +01:00
c0eda7cd1e adding tests for collector creation
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:54:35 +01:00
75f3e48f70 address szuecs review
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:07:56 +01:00
5b55bea994 feat(collector): add InfluxDB collector
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-22 10:09:29 +01:00
4412e3dca4 Merge pull request #92 from zalando-incubator/njuettner-patch
Updating golangci
2019-11-26 14:49:02 +01:00
8f9277258c Increase timeout for golangci-lint
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-11-25 11:36:18 +01:00
8c3fef45fd Updating golangci
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Signed-off-by: Nick Jüttner <nick@zalando.de>
2019-11-25 10:56:22 +01:00
120950078c Fix #89 by copying the MatchLabels map instead of referencing it. (#90)
Signed-off-by: Johann Fuechsl <johann@fuechsl.co>
2019-11-07 14:38:26 +01:00
0790bc351a This fixes an issue with the type switch that was never able to fall (#88)
into cases of []<number>, being <number> a number type such as int,
float32, float64. This is because Go can't type cast slices of
interface{} out right because it's impossible to know the true types of
the slice members beforehand.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-11-05 09:43:25 +01:00
f6b2aede5b Support for JSONPath expressions that return arrays of values (#85)
* This is the initial implementation of support for JSONPath expressions
that return arrays of values instead of a single value.

This extends the
collector to define a few handy reducer functions that take in the slice
of float64 and return a single value. It also allows the user to define
which reducer function to use via the
"metric-config.<metricType>.<metricName>.json-path/reducer-func"
annotation, which
can have the values of 'avg', 'min', 'max' and 'sum'.

For instance, the Ruby puma webserver exposes metrics of the form of $.worker_status[*].last_status.pool_capacity that have to be consumed as an array of values to be properly targetted.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Renames "reducerFunc" to "aggregator" for consistency with other
collectors. Renames the annotation from
"metric-config.<metricType>.<metricName>.json-path/reducer-func" to "metric-config.<metricType>.<metricName>.json-path/aggregator".

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Return error instead of defaulting to the avg aggregator, when no valid
aggregator name was specified and the JSONPath value is a slice of
numbers.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix index out of range on initialized output slice that was found while
writing tests.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add tests for all added functions + NewJSONPathMetricsGetter

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add documentation on the `aggregator` option.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* reducer function -> aggregator function

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix comment to account for returned error.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-10-24 18:15:10 +02:00
7d5e719eb0 Merge pull request #86 from pinkavaj/fix-var-name
Fix variable name typo
2019-10-24 10:16:58 +02:00
7497a61a2c Fix variable name typo
Signed-off-by: Jiri Pinkava <jiri.pinkava@rossum.ai>
2019-10-24 09:45:22 +02:00
20 changed files with 1006 additions and 933 deletions

View File

@ -1,7 +1,4 @@
run:
skip-files:
- "pkg/provider/generated.conversion.go"
- "pkg/provider/conversion.go"
linters-settings:
golint:
min-confidence: 0.9

View File

@ -5,7 +5,7 @@ go:
- "1.13.x"
env:
- GO111MODULE=on GOLANGCI_RELEASE="v1.16.0"
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
before_install:
- GO111MODULE=off go get github.com/mattn/goveralls
@ -13,8 +13,8 @@ before_install:
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
script:
- make check
- make test
- make build.docker
- make check
- roveralls
- goveralls -v -coverprofile=roveralls.coverprofile -service=travis-ci

View File

@ -19,7 +19,8 @@ test:
go test -v $(GOPKGS)
check:
golangci-lint run ./...
go mod download
golangci-lint run --timeout=2m ./...
build.local: build/$(BINARY)
build.linux: build/linux/$(BINARY)

203
README.md

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: custom-metrics-consumer
@ -25,25 +25,36 @@ spec:
# - type: Resource
# resource:
# name: cpu
# targetAverageUtilization: 50
# current:
# averageUtilization: 50
- type: Pods
pods:
metricName: queue-length
targetAverageValue: 1k
metric:
name: queue-length
target:
averageValue: 1k
type: AverageValue
- type: Object
object:
metricName: requests-per-second
target:
describedObject:
apiVersion: extensions/v1beta1
kind: Ingress
name: custom-metrics-consumer
averageValue: 10
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
metric:
name: requests-per-second
target:
averageValue: "10"
type: AverageValue
- type: External
external:
metricName: sqs-queue-length
metricSelector:
matchLabels:
queue-name: foobar
region: eu-central-1
targetAverageValue: 30
metric:
name: sqs-queue-length
selector:
matchLabels:
queue-name: foobar
region: eu-central-1
target:
averageValue: "30"
type: AverageValue

3
go.mod
View File

@ -7,8 +7,7 @@ require (
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/influxdb-client-go v0.1.4
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/prometheus/client_golang v0.9.2

132
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@ package main
import (
"flag"
_ "net/http/pprof"
"os"
"runtime"

View File

@ -65,6 +65,27 @@ func TestParser(t *testing.T) {
},
PerReplica: false,
},
{
Name: "influxdb metrics",
Annotations: map[string]string{
"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
"metric-config.external.flux-query.influxdb/token": "sEcr3TT0ken",
"metric-config.external.flux-query.influxdb/org-id": "deadbeef",
},
MetricName: "flux-query",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef",
},
},
} {
t.Run(tc.Name, func(t *testing.T) {
hpaMap := make(AnnotationConfigMap)

View File

@ -46,6 +46,14 @@ type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
type PluginNotFoundError struct {
metricTypeName MetricTypeName
}
func (p *PluginNotFoundError) Error() string {
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
if metricCollector == "" {
c.podsPlugins.Any = plugin
@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
}
}
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
}
type MetricTypeName struct {
@ -213,7 +221,9 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
config.Config = metric.External.Metric.Selector.MatchLabels
for k, v := range metric.External.Metric.Selector.MatchLabels {
config.Config[k] = v
}
}
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)

View File

@ -0,0 +1,152 @@
package collector
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go"
"k8s.io/api/autoscaling/v2beta2"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
InfluxDBMetricName = "flux-query"
influxDBAddressKey = "address"
influxDBTokenKey = "token"
influxDBOrgIDKey = "org-id"
influxDBQueryNameLabelKey = "query-name"
)
type InfluxDBCollectorPlugin struct {
kubeClient kubernetes.Interface
address string
token string
orgID string
}
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
return &InfluxDBCollectorPlugin{
kubeClient: client,
address: address,
token: token,
orgID: orgID,
}, nil
}
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval)
}
type InfluxDBCollector struct {
address string
token string
orgID string
influxDBClient *influxdb.Client
interval time.Duration
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
query string
}
func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
metricType: config.Type,
}
switch configType := config.Type; configType {
case autoscalingv2.ObjectMetricSourceType:
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
case autoscalingv2.ExternalMetricSourceType:
// `metricSelector` is flattened into the MetricConfig.Config.
queryName, ok := config.Config[influxDBQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("selector for Flux query is not specified, "+
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
}
if query, ok := config.Config[queryName]; ok {
// TODO(affo): validate the query once this is done:
// https://github.com/influxdata/influxdb-client-go/issues/73.
collector.query = query
} else {
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
}
default:
return nil, fmt.Errorf("unknown metric type: %v", configType)
}
// Use custom InfluxDB config if defined in HPA annotation.
if v, ok := config.Config[influxDBAddressKey]; ok {
address = v
}
if v, ok := config.Config[influxDBTokenKey]; ok {
token = v
}
if v, ok := config.Config[influxDBOrgIDKey]; ok {
orgID = v
}
influxDbClient, err := influxdb.New(address, token)
if err != nil {
return nil, err
}
collector.address = address
collector.token = token
collector.orgID = orgID
collector.influxDBClient = influxDbClient
return collector, nil
}
// queryResult is for unmarshaling the result from InfluxDB.
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
type queryResult struct {
MetricValue float64
}
// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
if err != nil {
return resource.Quantity{}, err
}
defer res.Close()
// Keeping just the first result.
if res.Next() {
qr := queryResult{}
if err := res.Unmarshal(&qr); err != nil {
return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
}
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
}
if err := res.Err; err != nil {
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
}
return resource.Quantity{}, fmt.Errorf("empty result returned")
}
func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.getValue()
if err != nil {
return nil, err
}
cm := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now().UTC(),
},
Value: v,
},
}
return []CollectedMetric{cm}, nil
}
func (c *InfluxDBCollector) Interval() time.Duration {
return c.interval
}

View File

@ -0,0 +1,155 @@
package collector
import (
"strings"
"testing"
"time"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestInfluxDBCollector_New(t *testing.T) {
t.Run("simple", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
// This is actually useless, because the selector should be flattened in Config when parsing.
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "range2m",
},
}
c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "secret"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
t.Run("override params", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef1234",
"query-name": "range3m",
},
}
c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef1234"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "sEcr3TT0ken"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
// Errors.
for _, tc := range []struct {
name string
mTypeName MetricTypeName
config map[string]string
errorStartsWith string
}{
{
name: "object metric",
mTypeName: MetricTypeName{
Type: v2beta2.ObjectMetricSourceType,
},
errorStartsWith: "InfluxDB does not support object",
},
{
name: "no selector",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
// The selector should be flattened into the config by the parsing step, but it isn't.
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
},
errorStartsWith: "selector for Flux query is not specified",
},
{
name: "referencing non-existing query",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "rangeXm",
},
errorStartsWith: "no Flux query defined for metric",
},
} {
t.Run("error - "+tc.name, func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: tc.mTypeName,
CollectorName: "influxdb",
Config: tc.config,
}
_, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err == nil {
t.Fatal("expected error got none")
}
if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) {
t.Fatalf("%s should start with %s", got, want)
}
})
}
}

View File

@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"strconv"
@ -17,23 +19,26 @@ import (
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
scheme string
path string
port int
jsonPath *jsonpath.Compiled
scheme string
path string
port int
aggregator string
client *http.Client
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
getter := &JSONPathMetricsGetter{}
httpClient := defaultHTTPClient()
getter := &JSONPathMetricsGetter{client: httpClient}
if v, ok := config["json-key"]; ok {
pat, err := jsonpath.Compile(v)
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = pat
getter.jsonPath = path
}
if v, ok := config["scheme"]; ok {
@ -52,14 +57,33 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter.port = n
}
if v, ok := config["aggregator"]; ok {
getter.aggregator = v
}
return getter, nil
}
func defaultHTTPClient() *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 15 * time.Second,
}
return client
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
}
@ -83,20 +107,42 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
return float64(res), nil
case float64:
return res, nil
case []interface{}:
s, err := castSlice(res)
if err != nil {
return 0, err
}
return reduce(s, g.aggregator)
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
out := []float64{}
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
httpClient := &http.Client{
Timeout: 15 * time.Second,
Transport: &http.Transport{},
return out, nil
}
// getPodMetrics returns the content of the pods metrics endpoint.
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
if scheme == "" {
@ -114,7 +160,7 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return nil, err
}
resp, err := httpClient.Do(request)
resp, err := g.client.Do(request)
if err != nil {
return nil, err
}
@ -131,3 +177,64 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return data, nil
}
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
func reduce(values []float64, aggregator string) (float64, error) {
switch aggregator {
case "avg":
return avg(values), nil
case "min":
return min(values), nil
case "max":
return max(values), nil
case "sum":
return sum(values), nil
default:
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
}
}
// avg implements the average mathematical function over a slice of float64
func avg(values []float64) float64 {
sum := sum(values)
return sum / float64(len(values))
}
// min implements the absolute minimum mathematical function over a slice of float64
func min(values []float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// max implements the absolute maximum mathematical function over a slice of float64
func max(values []float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// sum implements the summation mathematical function over a slice of float64
func sum(values []float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}

View File

@ -0,0 +1,112 @@
package collector
import (
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
require.Equal(t, first.jsonPath, second.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",
port: 9090,
aggregator: "avg",
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
require.Error(t, err4)
}
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
func TestReduce(t *testing.T) {
average, err1 := reduce([]float64{1, 2, 3}, "avg")
require.NoError(t, err1)
require.Equal(t, 2.0, average)
min, err2 := reduce([]float64{1, 2, 3}, "min")
require.NoError(t, err2)
require.Equal(t, 1.0, min)
max, err3 := reduce([]float64{1, 2, 3}, "max")
require.NoError(t, err3)
require.Equal(t, 3.0, max)
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
require.NoError(t, err4)
require.Equal(t, 6.0, sum)
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
}

View File

@ -2,6 +2,7 @@ package collector
import (
"fmt"
"net/http"
"time"
log "github.com/sirupsen/logrus"
@ -37,6 +38,7 @@ type PodCollector struct {
metricType autoscalingv2.MetricSourceType
interval time.Duration
logger *log.Entry
httpClient *http.Client
}
type PodMetricsGetter interface {

View File

@ -1,247 +0,0 @@
package provider
import (
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscaling "k8s.io/api/autoscaling/v2beta2"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
)
// from: https://github.com/kubernetes/kubernetes/blob/v1.14.4/pkg/apis/autoscaling/v2beta1/conversion.go
func Convert_autoscaling_MetricTarget_To_v2beta1_CrossVersionObjectReference(in *autoscaling.MetricTarget, out *autoscalingv2beta1.CrossVersionObjectReference, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_CrossVersionObjectReference_To_autoscaling_MetricTarget(in *autoscalingv2beta1.CrossVersionObjectReference, out *autoscaling.MetricTarget, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_ResourceMetricStatus_To_autoscaling_ResourceMetricStatus(in *autoscalingv2beta1.ResourceMetricStatus, out *autoscaling.ResourceMetricStatus, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.CurrentAverageUtilization
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
AverageValue: &averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricStatus_To_v2beta1_ResourceMetricStatus(in *autoscaling.ResourceMetricStatus, out *autoscalingv2beta1.ResourceMetricStatus, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.CurrentAverageUtilization = in.Current.AverageUtilization
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
return nil
}
func Convert_v2beta1_ResourceMetricSource_To_autoscaling_ResourceMetricSource(in *autoscalingv2beta1.ResourceMetricSource, out *autoscaling.ResourceMetricSource, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.TargetAverageUtilization
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if utilization == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.UtilizationMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricSource_To_v2beta1_ResourceMetricSource(in *autoscaling.ResourceMetricSource, out *autoscalingv2beta1.ResourceMetricSource, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.TargetAverageUtilization = in.Target.AverageUtilization
out.TargetAverageValue = in.Target.AverageValue
return nil
}
func Convert_autoscaling_ExternalMetricSource_To_v2beta1_ExternalMetricSource(in *autoscaling.ExternalMetricSource, out *autoscalingv2beta1.ExternalMetricSource, s conversion.Scope) error {
out.MetricName = in.Metric.Name
out.TargetValue = in.Target.Value
out.TargetAverageValue = in.Target.AverageValue
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricSource_To_autoscaling_ExternalMetricSource(in *autoscalingv2beta1.ExternalMetricSource, out *autoscaling.ExternalMetricSource, s conversion.Scope) error {
value := in.TargetValue
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if value == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.ValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricSource_To_v2beta1_ObjectMetricSource(in *autoscaling.ObjectMetricSource, out *autoscalingv2beta1.ObjectMetricSource, s conversion.Scope) error {
if in.Target.Value != nil {
out.TargetValue = *in.Target.Value
}
out.AverageValue = in.Target.AverageValue
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv2beta1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
var metricType autoscaling.MetricTargetType
if in.AverageValue == nil {
metricType = autoscaling.ValueMetricType
} else {
metricType = autoscaling.AverageValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: &in.TargetValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricSource_To_v2beta1_PodsMetricSource(in *autoscaling.PodsMetricSource, out *autoscalingv2beta1.PodsMetricSource, s conversion.Scope) error {
if in.Target.AverageValue != nil {
targetAverageValue := *in.Target.AverageValue
out.TargetAverageValue = targetAverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricSource_To_autoscaling_PodsMetricSource(in *autoscalingv2beta1.PodsMetricSource, out *autoscaling.PodsMetricSource, s conversion.Scope) error {
targetAverageValue := &in.TargetAverageValue
var metricType autoscaling.MetricTargetType
metricType = autoscaling.AverageValueMetricType
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: targetAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_ExternalMetricStatus_To_v2beta1_ExternalMetricStatus(in *autoscaling.ExternalMetricStatus, out *autoscalingv2beta1.ExternalMetricStatus, s conversion.Scope) error {
if &in.Current.AverageValue != nil {
out.CurrentAverageValue = in.Current.AverageValue
}
out.MetricName = in.Metric.Name
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricStatus_To_autoscaling_ExternalMetricStatus(in *autoscalingv2beta1.ExternalMetricStatus, out *autoscaling.ExternalMetricStatus, s conversion.Scope) error {
value := in.CurrentValue
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
Value: &value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricStatus_To_v2beta1_ObjectMetricStatus(in *autoscaling.ObjectMetricStatus, out *autoscalingv2beta1.ObjectMetricStatus, s conversion.Scope) error {
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
if in.Current.AverageValue != nil {
currentAverageValue := *in.Current.AverageValue
out.AverageValue = &currentAverageValue
}
return nil
}
func Convert_v2beta1_ObjectMetricStatus_To_autoscaling_ObjectMetricStatus(in *autoscalingv2beta1.ObjectMetricStatus, out *autoscaling.ObjectMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
Value: &in.CurrentValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricStatus_To_v2beta1_PodsMetricStatus(in *autoscaling.PodsMetricStatus, out *autoscalingv2beta1.PodsMetricStatus, s conversion.Scope) error {
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricStatus_To_autoscaling_PodsMetricStatus(in *autoscalingv2beta1.PodsMetricStatus, out *autoscaling.PodsMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
AverageValue: &in.CurrentAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@ package provider
import (
"context"
"errors"
"reflect"
"sync"
"time"
@ -49,16 +50,17 @@ var (
// HPAProvider is a base provider for initializing metric collectors based on
// HPA resources.
type HPAProvider struct {
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
disregardIncompatibleHPAs bool
}
// metricCollection is a container for sending collected metrics across a
@ -69,7 +71,7 @@ type metricCollection struct {
}
// NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
metricsc := make(chan metricCollection)
return &HPAProvider{
@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
metricStore: NewMetricStore(func() time.Time {
return time.Now().UTC().Add(15 * time.Minute)
}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
}
}
@ -116,7 +119,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
func (p *HPAProvider) updateHPAs() error {
p.logger.Info("Looking for HPAs")
hpas, err := p.client.AutoscalingV2beta1().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
return err
}
@ -125,15 +128,7 @@ func (p *HPAProvider) updateHPAs() error {
newHPAs := 0
for _, hpav1 := range hpas.Items {
hpav1 := hpav1
hpa := autoscalingv2.HorizontalPodAutoscaler{}
err := Convert_v2beta1_HorizontalPodAutoscaler_To_autoscaling_HorizontalPodAutoscaler(&hpav1, &hpa, nil)
if err != nil {
p.logger.Errorf("Failed to convert HPA to v2beta2: %v", err)
continue
}
for _, hpa := range hpas.Items {
resourceRef := resourceReference{
Name: hpa.Name,
Namespace: hpa.Namespace,
@ -162,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
interval = p.collectorInterval
}
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
if err != nil {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
}
cache = false
continue
}
p.logger.Infof("Adding new metrics collector: %T", collector)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
p.logger.Infof("Adding new metrics collector: %T", c)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
}
newHPAs++

View File

@ -7,8 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
autoscalingv1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
autoscaling "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
@ -16,7 +15,7 @@ import (
type mockCollectorPlugin struct{}
func (m mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
func (m mockCollectorPlugin) NewCollector(hpa *autoscaling.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
return mockCollector{}, nil
}
@ -33,7 +32,7 @@ func (c mockCollector) Interval() time.Duration {
func TestUpdateHPAs(t *testing.T) {
value := resource.MustParse("1k")
hpa := &autoscalingv1.HorizontalPodAutoscaler{
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
@ -43,20 +42,25 @@ func TestUpdateHPAs(t *testing.T) {
"metric-config.pods.requests-per-second.json-path/port": "9090",
},
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscalingv1.MetricSpec{
Metrics: []autoscaling.MetricSpec{
{
Type: autoscalingv1.PodsMetricSourceType,
Pods: &autoscalingv1.PodsMetricSource{
MetricName: "requests-per-second",
TargetAverageValue: value,
Type: autoscaling.PodsMetricSourceType,
Pods: &autoscaling.PodsMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "requests-per-second",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
@ -66,14 +70,14 @@ func TestUpdateHPAs(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
var err error
hpa, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Create(hpa)
hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
@ -82,7 +86,7 @@ func TestUpdateHPAs(t *testing.T) {
// update HPA
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
_, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Update(hpa)
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(hpa)
require.NoError(t, err)
err = provider.updateHPAs()
@ -90,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
require.Len(t, provider.collectorScheduler.table, 1)
}
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
// Test HPAProvider with disregardIncompatibleHPAs = true
value := resource.MustParse("1k")
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{},
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.ExternalMetricSourceType,
External: &autoscaling.ExternalMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "some-other-metric",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
}

View File

@ -88,6 +88,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable External Metrics API")
flags.StringVar(&o.PrometheusServer, "prometheus-server", o.PrometheusServer, ""+
"url of prometheus server to query")
flags.StringVar(&o.InfluxDBAddress, "influxdb-address", o.InfluxDBAddress, ""+
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
"token for InfluxDB 2.x server to query")
flags.StringVar(&o.InfluxDBOrgID, "influxdb-org-id", o.InfluxDBOrgID, ""+
"organization ID for InfluxDB 2.x server to query")
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
"url of ZMON KariosDB endpoint to query for ZMON checks")
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
@ -104,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable AWS external metrics")
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
"disregard failing to create collectors for incompatible HPAs")
return cmd
}
@ -175,6 +182,14 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
}
}
if o.InfluxDBAddress != "" {
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrgID)
if err != nil {
return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
}
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
}
// register generic pod collector
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
if err != nil {
@ -214,7 +229,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
}
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory)
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
go hpaProvider.Run(ctx)
@ -290,6 +305,12 @@ type AdapterServerOptions struct {
// PrometheusServer enables prometheus queries to the specified
// server
PrometheusServer string
// InfluxDBAddress enables Flux queries to the specified InfluxDB instance
InfluxDBAddress string
// InfluxDBToken is the token used for querying InfluxDB
InfluxDBToken string
// InfluxDBOrgID is the organization ID used for querying InfluxDB
InfluxDBOrgID string
// ZMONKariosDBEndpoint enables ZMON check queries to the specified
// kariosDB endpoint
ZMONKariosDBEndpoint string
@ -312,4 +333,7 @@ type AdapterServerOptions struct {
MetricsAddress string
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
SkipperBackendWeightAnnotation []string
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
// kube-metrics-adapter beside another Metrics Provider
DisregardIncompatibleHPAs bool
}