Compare commits

..

12 Commits

Author SHA1 Message Date
Arjun
c9fa15c7d4 Updated the tests (#103)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-02-04 09:48:50 +01:00
Arjun
e3330dcf43 Reuse the HTTP client for scraping pods (#102)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-01-30 17:49:22 +01:00
Tomás Pinho
8e4662b26c Permit disregarding incompatible HPAs (#95)
* This commit adds a --disregard-incompatible-hpas that makes the HPA
provider stop erroring out when a collector cannot be created for a
metric in a HPA. Useful when kube-metrics-adapter runs alongside another
metrics provider. Fixes issue #94.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Make tests pass

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Wraps the Plugin Not Found error in a new type that can be checked by the caller of a function to determine if its contents should be logged or added as an event to the HPA, when this HPA is incompatible.
The disregardIncompatibleHPAs is now targetting only the log or addition of the same event.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Invert if expression to select when we should log
CreateNewMetricsCollector errors: don't log when both conditions are true - it's not a PluginNotFoundError
and disregardIncompatibleHPAs flag is set to true. This way, if an error
is NOT PluginNotFoundError it will always be logged, and when it IS
PluginNotFoundError it will only be logged when
disregardIncompatibleHPAs is false.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Remove redundant "whether to"

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add test case for updating HPAs via HPA Provider while disregarding
incompatible HPAs.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2020-01-30 11:33:15 +01:00
Mikkel Oscar Lyderik Larsen
9e211b181a Merge pull request #101 from zalando-incubator/update-to-v2beta2
Only support autoscaling/v2beta2
2020-01-29 16:47:15 +01:00
Mikkel Oscar Lyderik Larsen
9d78fff1b5 Only support autoscaling/v2beta2
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-01-29 15:59:20 +01:00
Mikkel Oscar Lyderik Larsen
1c6f9e2ea6 Merge pull request #100 from affo/feat/influxdb-collector
feat(collector): add InfluxDB collector
2020-01-24 10:56:52 +01:00
Lorenzo Affetti
c0eda7cd1e adding tests for collector creation
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:54:35 +01:00
Lorenzo Affetti
75f3e48f70 address szuecs review
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:07:56 +01:00
Lorenzo Affetti
5b55bea994 feat(collector): add InfluxDB collector
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-22 10:09:29 +01:00
Nick Jüttner
4412e3dca4 Merge pull request #92 from zalando-incubator/njuettner-patch
Updating golangci
2019-11-26 14:49:02 +01:00
Mikkel Oscar Lyderik Larsen
8f9277258c Increase timeout for golangci-lint
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-11-25 11:36:18 +01:00
njuettner
8c3fef45fd Updating golangci
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Signed-off-by: Nick Jüttner <nick@zalando.de>
2019-11-25 10:56:22 +01:00
20 changed files with 793 additions and 927 deletions

View File

@@ -1,7 +1,4 @@
run:
skip-files:
- "pkg/provider/generated.conversion.go"
- "pkg/provider/conversion.go"
linters-settings:
golint:
min-confidence: 0.9

View File

@@ -5,7 +5,7 @@ go:
- "1.13.x"
env:
- GO111MODULE=on GOLANGCI_RELEASE="v1.16.0"
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
before_install:
- GO111MODULE=off go get github.com/mattn/goveralls
@@ -13,8 +13,8 @@ before_install:
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
script:
- make check
- make test
- make build.docker
- make check
- roveralls
- goveralls -v -coverprofile=roveralls.coverprofile -service=travis-ci

View File

@@ -19,7 +19,8 @@ test:
go test -v $(GOPKGS)
check:
golangci-lint run ./...
go mod download
golangci-lint run --timeout=2m ./...
build.local: build/$(BINARY)
build.linux: build/linux/$(BINARY)

197
README.md
View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: custom-metrics-consumer
@@ -25,25 +25,36 @@ spec:
# - type: Resource
# resource:
# name: cpu
# targetAverageUtilization: 50
# current:
# averageUtilization: 50
- type: Pods
pods:
metricName: queue-length
targetAverageValue: 1k
metric:
name: queue-length
target:
averageValue: 1k
type: AverageValue
- type: Object
object:
metricName: requests-per-second
target:
describedObject:
apiVersion: extensions/v1beta1
kind: Ingress
name: custom-metrics-consumer
averageValue: 10
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
metric:
name: requests-per-second
target:
averageValue: "10"
type: AverageValue
- type: External
external:
metricName: sqs-queue-length
metricSelector:
matchLabels:
queue-name: foobar
region: eu-central-1
targetAverageValue: 30
metric:
name: sqs-queue-length
selector:
matchLabels:
queue-name: foobar
region: eu-central-1
target:
averageValue: "30"
type: AverageValue

3
go.mod
View File

@@ -7,8 +7,7 @@ require (
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/influxdb-client-go v0.1.4
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/prometheus/client_golang v0.9.2

132
go.sum
View File

File diff suppressed because it is too large Load Diff

View File

@@ -18,6 +18,7 @@ package main
import (
"flag"
_ "net/http/pprof"
"os"
"runtime"

View File

@@ -65,6 +65,27 @@ func TestParser(t *testing.T) {
},
PerReplica: false,
},
{
Name: "influxdb metrics",
Annotations: map[string]string{
"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
"metric-config.external.flux-query.influxdb/token": "sEcr3TT0ken",
"metric-config.external.flux-query.influxdb/org-id": "deadbeef",
},
MetricName: "flux-query",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef",
},
},
} {
t.Run(tc.Name, func(t *testing.T) {
hpaMap := make(AnnotationConfigMap)

View File

@@ -46,6 +46,14 @@ type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
type PluginNotFoundError struct {
metricTypeName MetricTypeName
}
func (p *PluginNotFoundError) Error() string {
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
if metricCollector == "" {
c.podsPlugins.Any = plugin
@@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
}
}
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
}
type MetricTypeName struct {

View File

@@ -0,0 +1,152 @@
package collector
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go"
"k8s.io/api/autoscaling/v2beta2"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
InfluxDBMetricName = "flux-query"
influxDBAddressKey = "address"
influxDBTokenKey = "token"
influxDBOrgIDKey = "org-id"
influxDBQueryNameLabelKey = "query-name"
)
type InfluxDBCollectorPlugin struct {
kubeClient kubernetes.Interface
address string
token string
orgID string
}
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
return &InfluxDBCollectorPlugin{
kubeClient: client,
address: address,
token: token,
orgID: orgID,
}, nil
}
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval)
}
type InfluxDBCollector struct {
address string
token string
orgID string
influxDBClient *influxdb.Client
interval time.Duration
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
query string
}
func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
metricType: config.Type,
}
switch configType := config.Type; configType {
case autoscalingv2.ObjectMetricSourceType:
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
case autoscalingv2.ExternalMetricSourceType:
// `metricSelector` is flattened into the MetricConfig.Config.
queryName, ok := config.Config[influxDBQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("selector for Flux query is not specified, "+
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
}
if query, ok := config.Config[queryName]; ok {
// TODO(affo): validate the query once this is done:
// https://github.com/influxdata/influxdb-client-go/issues/73.
collector.query = query
} else {
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
}
default:
return nil, fmt.Errorf("unknown metric type: %v", configType)
}
// Use custom InfluxDB config if defined in HPA annotation.
if v, ok := config.Config[influxDBAddressKey]; ok {
address = v
}
if v, ok := config.Config[influxDBTokenKey]; ok {
token = v
}
if v, ok := config.Config[influxDBOrgIDKey]; ok {
orgID = v
}
influxDbClient, err := influxdb.New(address, token)
if err != nil {
return nil, err
}
collector.address = address
collector.token = token
collector.orgID = orgID
collector.influxDBClient = influxDbClient
return collector, nil
}
// queryResult is for unmarshaling the result from InfluxDB.
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
type queryResult struct {
MetricValue float64
}
// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
if err != nil {
return resource.Quantity{}, err
}
defer res.Close()
// Keeping just the first result.
if res.Next() {
qr := queryResult{}
if err := res.Unmarshal(&qr); err != nil {
return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
}
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
}
if err := res.Err; err != nil {
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
}
return resource.Quantity{}, fmt.Errorf("empty result returned")
}
func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.getValue()
if err != nil {
return nil, err
}
cm := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now().UTC(),
},
Value: v,
},
}
return []CollectedMetric{cm}, nil
}
func (c *InfluxDBCollector) Interval() time.Duration {
return c.interval
}

View File

@@ -0,0 +1,155 @@
package collector
import (
"strings"
"testing"
"time"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestInfluxDBCollector_New(t *testing.T) {
t.Run("simple", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
// This is actually useless, because the selector should be flattened in Config when parsing.
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "range2m",
},
}
c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "secret"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
t.Run("override params", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef1234",
"query-name": "range3m",
},
}
c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef1234"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "sEcr3TT0ken"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
// Errors.
for _, tc := range []struct {
name string
mTypeName MetricTypeName
config map[string]string
errorStartsWith string
}{
{
name: "object metric",
mTypeName: MetricTypeName{
Type: v2beta2.ObjectMetricSourceType,
},
errorStartsWith: "InfluxDB does not support object",
},
{
name: "no selector",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
// The selector should be flattened into the config by the parsing step, but it isn't.
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
},
errorStartsWith: "selector for Flux query is not specified",
},
{
name: "referencing non-existing query",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "rangeXm",
},
errorStartsWith: "no Flux query defined for metric",
},
} {
t.Run("error - "+tc.name, func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: tc.mTypeName,
CollectorName: "influxdb",
Config: tc.config,
}
_, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err == nil {
t.Fatal("expected error got none")
}
if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) {
t.Fatalf("%s should start with %s", got, want)
}
})
}
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"strconv"
@@ -23,11 +24,13 @@ type JSONPathMetricsGetter struct {
path string
port int
aggregator string
client *http.Client
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
getter := &JSONPathMetricsGetter{}
httpClient := defaultHTTPClient()
getter := &JSONPathMetricsGetter{client: httpClient}
if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
@@ -61,11 +64,26 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
return getter, nil
}
func defaultHTTPClient() *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 15 * time.Second,
}
return client
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
}
@@ -122,16 +140,11 @@ func castSlice(in []interface{}) ([]float64, error) {
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
httpClient := &http.Client{
Timeout: 15 * time.Second,
Transport: &http.Transport{},
}
if scheme == "" {
scheme = "http"
}
@@ -147,7 +160,7 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return nil, err
}
resp, err := httpClient.Do(request)
resp, err := g.client.Do(request)
if err != nil {
return nil, err
}

View File

@@ -7,6 +7,13 @@ import (
"github.com/stretchr/testify/require"
)
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
require.Equal(t, first.jsonPath, second.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
@@ -18,7 +25,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
require.Equal(t, &JSONPathMetricsGetter{
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
@@ -36,7 +43,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
require.Equal(t, &JSONPathMetricsGetter{
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",

View File

@@ -2,6 +2,7 @@ package collector
import (
"fmt"
"net/http"
"time"
log "github.com/sirupsen/logrus"
@@ -37,6 +38,7 @@ type PodCollector struct {
metricType autoscalingv2.MetricSourceType
interval time.Duration
logger *log.Entry
httpClient *http.Client
}
type PodMetricsGetter interface {

View File

@@ -1,247 +0,0 @@
package provider
import (
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscaling "k8s.io/api/autoscaling/v2beta2"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
)
// from: https://github.com/kubernetes/kubernetes/blob/v1.14.4/pkg/apis/autoscaling/v2beta1/conversion.go
func Convert_autoscaling_MetricTarget_To_v2beta1_CrossVersionObjectReference(in *autoscaling.MetricTarget, out *autoscalingv2beta1.CrossVersionObjectReference, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_CrossVersionObjectReference_To_autoscaling_MetricTarget(in *autoscalingv2beta1.CrossVersionObjectReference, out *autoscaling.MetricTarget, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_ResourceMetricStatus_To_autoscaling_ResourceMetricStatus(in *autoscalingv2beta1.ResourceMetricStatus, out *autoscaling.ResourceMetricStatus, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.CurrentAverageUtilization
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
AverageValue: &averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricStatus_To_v2beta1_ResourceMetricStatus(in *autoscaling.ResourceMetricStatus, out *autoscalingv2beta1.ResourceMetricStatus, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.CurrentAverageUtilization = in.Current.AverageUtilization
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
return nil
}
func Convert_v2beta1_ResourceMetricSource_To_autoscaling_ResourceMetricSource(in *autoscalingv2beta1.ResourceMetricSource, out *autoscaling.ResourceMetricSource, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.TargetAverageUtilization
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if utilization == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.UtilizationMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricSource_To_v2beta1_ResourceMetricSource(in *autoscaling.ResourceMetricSource, out *autoscalingv2beta1.ResourceMetricSource, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.TargetAverageUtilization = in.Target.AverageUtilization
out.TargetAverageValue = in.Target.AverageValue
return nil
}
func Convert_autoscaling_ExternalMetricSource_To_v2beta1_ExternalMetricSource(in *autoscaling.ExternalMetricSource, out *autoscalingv2beta1.ExternalMetricSource, s conversion.Scope) error {
out.MetricName = in.Metric.Name
out.TargetValue = in.Target.Value
out.TargetAverageValue = in.Target.AverageValue
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricSource_To_autoscaling_ExternalMetricSource(in *autoscalingv2beta1.ExternalMetricSource, out *autoscaling.ExternalMetricSource, s conversion.Scope) error {
value := in.TargetValue
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if value == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.ValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricSource_To_v2beta1_ObjectMetricSource(in *autoscaling.ObjectMetricSource, out *autoscalingv2beta1.ObjectMetricSource, s conversion.Scope) error {
if in.Target.Value != nil {
out.TargetValue = *in.Target.Value
}
out.AverageValue = in.Target.AverageValue
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv2beta1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
var metricType autoscaling.MetricTargetType
if in.AverageValue == nil {
metricType = autoscaling.ValueMetricType
} else {
metricType = autoscaling.AverageValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: &in.TargetValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricSource_To_v2beta1_PodsMetricSource(in *autoscaling.PodsMetricSource, out *autoscalingv2beta1.PodsMetricSource, s conversion.Scope) error {
if in.Target.AverageValue != nil {
targetAverageValue := *in.Target.AverageValue
out.TargetAverageValue = targetAverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricSource_To_autoscaling_PodsMetricSource(in *autoscalingv2beta1.PodsMetricSource, out *autoscaling.PodsMetricSource, s conversion.Scope) error {
targetAverageValue := &in.TargetAverageValue
var metricType autoscaling.MetricTargetType
metricType = autoscaling.AverageValueMetricType
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: targetAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_ExternalMetricStatus_To_v2beta1_ExternalMetricStatus(in *autoscaling.ExternalMetricStatus, out *autoscalingv2beta1.ExternalMetricStatus, s conversion.Scope) error {
if &in.Current.AverageValue != nil {
out.CurrentAverageValue = in.Current.AverageValue
}
out.MetricName = in.Metric.Name
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricStatus_To_autoscaling_ExternalMetricStatus(in *autoscalingv2beta1.ExternalMetricStatus, out *autoscaling.ExternalMetricStatus, s conversion.Scope) error {
value := in.CurrentValue
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
Value: &value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricStatus_To_v2beta1_ObjectMetricStatus(in *autoscaling.ObjectMetricStatus, out *autoscalingv2beta1.ObjectMetricStatus, s conversion.Scope) error {
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
if in.Current.AverageValue != nil {
currentAverageValue := *in.Current.AverageValue
out.AverageValue = &currentAverageValue
}
return nil
}
func Convert_v2beta1_ObjectMetricStatus_To_autoscaling_ObjectMetricStatus(in *autoscalingv2beta1.ObjectMetricStatus, out *autoscaling.ObjectMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
Value: &in.CurrentValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricStatus_To_v2beta1_PodsMetricStatus(in *autoscaling.PodsMetricStatus, out *autoscalingv2beta1.PodsMetricStatus, s conversion.Scope) error {
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricStatus_To_autoscaling_PodsMetricStatus(in *autoscalingv2beta1.PodsMetricStatus, out *autoscaling.PodsMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
AverageValue: &in.CurrentAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}

View File

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ package provider
import (
"context"
"errors"
"reflect"
"sync"
"time"
@@ -49,16 +50,17 @@ var (
// HPAProvider is a base provider for initializing metric collectors based on
// HPA resources.
type HPAProvider struct {
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
disregardIncompatibleHPAs bool
}
// metricCollection is a container for sending collected metrics across a
@@ -69,7 +71,7 @@ type metricCollection struct {
}
// NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
metricsc := make(chan metricCollection)
return &HPAProvider{
@@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
metricStore: NewMetricStore(func() time.Time {
return time.Now().UTC().Add(15 * time.Minute)
}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
}
}
@@ -116,7 +119,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
func (p *HPAProvider) updateHPAs() error {
p.logger.Info("Looking for HPAs")
hpas, err := p.client.AutoscalingV2beta1().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
return err
}
@@ -125,15 +128,7 @@ func (p *HPAProvider) updateHPAs() error {
newHPAs := 0
for _, hpav1 := range hpas.Items {
hpav1 := hpav1
hpa := autoscalingv2.HorizontalPodAutoscaler{}
err := Convert_v2beta1_HorizontalPodAutoscaler_To_autoscaling_HorizontalPodAutoscaler(&hpav1, &hpa, nil)
if err != nil {
p.logger.Errorf("Failed to convert HPA to v2beta2: %v", err)
continue
}
for _, hpa := range hpas.Items {
resourceRef := resourceReference{
Name: hpa.Name,
Namespace: hpa.Namespace,
@@ -162,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
interval = p.collectorInterval
}
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
if err != nil {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
}
cache = false
continue
}
p.logger.Infof("Adding new metrics collector: %T", collector)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
p.logger.Infof("Adding new metrics collector: %T", c)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
}
newHPAs++

View File

@@ -7,8 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
autoscalingv1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
autoscaling "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
@@ -16,7 +15,7 @@ import (
type mockCollectorPlugin struct{}
func (m mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
func (m mockCollectorPlugin) NewCollector(hpa *autoscaling.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
return mockCollector{}, nil
}
@@ -33,7 +32,7 @@ func (c mockCollector) Interval() time.Duration {
func TestUpdateHPAs(t *testing.T) {
value := resource.MustParse("1k")
hpa := &autoscalingv1.HorizontalPodAutoscaler{
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
@@ -43,20 +42,25 @@ func TestUpdateHPAs(t *testing.T) {
"metric-config.pods.requests-per-second.json-path/port": "9090",
},
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscalingv1.MetricSpec{
Metrics: []autoscaling.MetricSpec{
{
Type: autoscalingv1.PodsMetricSourceType,
Pods: &autoscalingv1.PodsMetricSource{
MetricName: "requests-per-second",
TargetAverageValue: value,
Type: autoscaling.PodsMetricSourceType,
Pods: &autoscaling.PodsMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "requests-per-second",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
@@ -66,14 +70,14 @@ func TestUpdateHPAs(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
var err error
hpa, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Create(hpa)
hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
@@ -82,7 +86,7 @@ func TestUpdateHPAs(t *testing.T) {
// update HPA
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
_, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Update(hpa)
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(hpa)
require.NoError(t, err)
err = provider.updateHPAs()
@@ -90,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
require.Len(t, provider.collectorScheduler.table, 1)
}
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
// Test HPAProvider with disregardIncompatibleHPAs = true
value := resource.MustParse("1k")
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{},
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.ExternalMetricSourceType,
External: &autoscaling.ExternalMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "some-other-metric",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
}

View File

@@ -88,6 +88,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable External Metrics API")
flags.StringVar(&o.PrometheusServer, "prometheus-server", o.PrometheusServer, ""+
"url of prometheus server to query")
flags.StringVar(&o.InfluxDBAddress, "influxdb-address", o.InfluxDBAddress, ""+
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
"token for InfluxDB 2.x server to query")
flags.StringVar(&o.InfluxDBOrgID, "influxdb-org-id", o.InfluxDBOrgID, ""+
"organization ID for InfluxDB 2.x server to query")
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
"url of ZMON KariosDB endpoint to query for ZMON checks")
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
@@ -104,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable AWS external metrics")
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
"disregard failing to create collectors for incompatible HPAs")
return cmd
}
@@ -175,6 +182,14 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
}
}
if o.InfluxDBAddress != "" {
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrgID)
if err != nil {
return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
}
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
}
// register generic pod collector
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
if err != nil {
@@ -214,7 +229,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
}
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory)
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
go hpaProvider.Run(ctx)
@@ -290,6 +305,12 @@ type AdapterServerOptions struct {
// PrometheusServer enables prometheus queries to the specified
// server
PrometheusServer string
// InfluxDBAddress enables Flux queries to the specified InfluxDB instance
InfluxDBAddress string
// InfluxDBToken is the token used for querying InfluxDB
InfluxDBToken string
// InfluxDBOrgID is the organization ID used for querying InfluxDB
InfluxDBOrgID string
// ZMONKariosDBEndpoint enables ZMON check queries to the specified
// kariosDB endpoint
ZMONKariosDBEndpoint string
@@ -312,4 +333,7 @@ type AdapterServerOptions struct {
MetricsAddress string
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
SkipperBackendWeightAnnotation []string
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
// kube-metrics-adapter beside another Metrics Provider
DisregardIncompatibleHPAs bool
}