mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-08-01 00:05:43 +00:00
Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
c9fa15c7d4 | ||
![]() |
e3330dcf43 | ||
![]() |
8e4662b26c |
1
main.go
1
main.go
@@ -18,6 +18,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
|
@@ -46,6 +46,14 @@ type CollectorPlugin interface {
|
|||||||
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PluginNotFoundError struct {
|
||||||
|
metricTypeName MetricTypeName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PluginNotFoundError) Error() string {
|
||||||
|
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
||||||
if metricCollector == "" {
|
if metricCollector == "" {
|
||||||
c.podsPlugins.Any = plugin
|
c.podsPlugins.Any = plugin
|
||||||
@@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
|
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricTypeName struct {
|
type MetricTypeName struct {
|
||||||
|
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -23,11 +24,13 @@ type JSONPathMetricsGetter struct {
|
|||||||
path string
|
path string
|
||||||
port int
|
port int
|
||||||
aggregator string
|
aggregator string
|
||||||
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||||
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
|
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
|
||||||
getter := &JSONPathMetricsGetter{}
|
httpClient := defaultHTTPClient()
|
||||||
|
getter := &JSONPathMetricsGetter{client: httpClient}
|
||||||
|
|
||||||
if v, ok := config["json-key"]; ok {
|
if v, ok := config["json-key"]; ok {
|
||||||
path, err := jsonpath.Compile(v)
|
path, err := jsonpath.Compile(v)
|
||||||
@@ -61,11 +64,26 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
|||||||
return getter, nil
|
return getter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func defaultHTTPClient() *http.Client {
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 15 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
|
MaxIdleConns: 50,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
|
},
|
||||||
|
Timeout: 15 * time.Second,
|
||||||
|
}
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||||
// endpoint and extracting the desired value using the specified json path
|
// endpoint and extracting the desired value using the specified json path
|
||||||
// query.
|
// query.
|
||||||
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
||||||
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
|
data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -122,16 +140,11 @@ func castSlice(in []interface{}) ([]float64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
// getPodMetrics returns the content of the pods metrics endpoint.
|
||||||
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||||
if pod.Status.PodIP == "" {
|
if pod.Status.PodIP == "" {
|
||||||
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
|
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
httpClient := &http.Client{
|
|
||||||
Timeout: 15 * time.Second,
|
|
||||||
Transport: &http.Transport{},
|
|
||||||
}
|
|
||||||
|
|
||||||
if scheme == "" {
|
if scheme == "" {
|
||||||
scheme = "http"
|
scheme = "http"
|
||||||
}
|
}
|
||||||
@@ -147,7 +160,7 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := httpClient.Do(request)
|
resp, err := g.client.Do(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -7,6 +7,13 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
|
||||||
|
require.Equal(t, first.jsonPath, second.jsonPath)
|
||||||
|
require.Equal(t, first.scheme, second.scheme)
|
||||||
|
require.Equal(t, first.path, second.path)
|
||||||
|
require.Equal(t, first.port, second.port)
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||||
configNoAggregator := map[string]string{
|
configNoAggregator := map[string]string{
|
||||||
"json-key": "$.value",
|
"json-key": "$.value",
|
||||||
@@ -18,7 +25,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
|
|||||||
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
||||||
|
|
||||||
require.NoError(t, err1)
|
require.NoError(t, err1)
|
||||||
require.Equal(t, &JSONPathMetricsGetter{
|
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||||
jsonPath: jpath1,
|
jsonPath: jpath1,
|
||||||
scheme: "http",
|
scheme: "http",
|
||||||
path: "/metrics",
|
path: "/metrics",
|
||||||
@@ -36,7 +43,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
|
|||||||
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
||||||
|
|
||||||
require.NoError(t, err2)
|
require.NoError(t, err2)
|
||||||
require.Equal(t, &JSONPathMetricsGetter{
|
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||||
jsonPath: jpath2,
|
jsonPath: jpath2,
|
||||||
scheme: "http",
|
scheme: "http",
|
||||||
path: "/metrics",
|
path: "/metrics",
|
||||||
|
@@ -2,6 +2,7 @@ package collector
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -37,6 +38,7 @@ type PodCollector struct {
|
|||||||
metricType autoscalingv2.MetricSourceType
|
metricType autoscalingv2.MetricSourceType
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
|
httpClient *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
type PodMetricsGetter interface {
|
type PodMetricsGetter interface {
|
||||||
|
@@ -2,6 +2,7 @@ package provider
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -49,16 +50,17 @@ var (
|
|||||||
// HPAProvider is a base provider for initializing metric collectors based on
|
// HPAProvider is a base provider for initializing metric collectors based on
|
||||||
// HPA resources.
|
// HPA resources.
|
||||||
type HPAProvider struct {
|
type HPAProvider struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
collectorScheduler *CollectorScheduler
|
collectorScheduler *CollectorScheduler
|
||||||
collectorInterval time.Duration
|
collectorInterval time.Duration
|
||||||
metricSink chan metricCollection
|
metricSink chan metricCollection
|
||||||
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
||||||
metricStore *MetricStore
|
metricStore *MetricStore
|
||||||
collectorFactory *collector.CollectorFactory
|
collectorFactory *collector.CollectorFactory
|
||||||
recorder kube_record.EventRecorder
|
recorder kube_record.EventRecorder
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
|
disregardIncompatibleHPAs bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// metricCollection is a container for sending collected metrics across a
|
// metricCollection is a container for sending collected metrics across a
|
||||||
@@ -69,7 +71,7 @@ type metricCollection struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewHPAProvider initializes a new HPAProvider.
|
// NewHPAProvider initializes a new HPAProvider.
|
||||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
|
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
|
||||||
metricsc := make(chan metricCollection)
|
metricsc := make(chan metricCollection)
|
||||||
|
|
||||||
return &HPAProvider{
|
return &HPAProvider{
|
||||||
@@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
|
|||||||
metricStore: NewMetricStore(func() time.Time {
|
metricStore: NewMetricStore(func() time.Time {
|
||||||
return time.Now().UTC().Add(15 * time.Minute)
|
return time.Now().UTC().Add(15 * time.Minute)
|
||||||
}),
|
}),
|
||||||
collectorFactory: collectorFactory,
|
collectorFactory: collectorFactory,
|
||||||
recorder: recorder.CreateEventRecorder(client),
|
recorder: recorder.CreateEventRecorder(client),
|
||||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||||
|
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
|
|||||||
interval = p.collectorInterval
|
interval = p.collectorInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
|
||||||
|
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
|
||||||
|
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
|
||||||
|
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
cache = false
|
cache = false
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
p.logger.Infof("Adding new metrics collector: %T", collector)
|
p.logger.Infof("Adding new metrics collector: %T", c)
|
||||||
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
|
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
|
||||||
}
|
}
|
||||||
newHPAs++
|
newHPAs++
|
||||||
|
|
||||||
|
@@ -77,7 +77,7 @@ func TestUpdateHPAs(t *testing.T) {
|
|||||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
|
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
||||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
err = provider.updateHPAs()
|
err = provider.updateHPAs()
|
||||||
@@ -94,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
|
|||||||
|
|
||||||
require.Len(t, provider.collectorScheduler.table, 1)
|
require.Len(t, provider.collectorScheduler.table, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
||||||
|
// Test HPAProvider with disregardIncompatibleHPAs = true
|
||||||
|
|
||||||
|
value := resource.MustParse("1k")
|
||||||
|
|
||||||
|
hpa := &autoscaling.HorizontalPodAutoscaler{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "hpa1",
|
||||||
|
Namespace: "default",
|
||||||
|
Annotations: map[string]string{},
|
||||||
|
},
|
||||||
|
Spec: autoscaling.HorizontalPodAutoscalerSpec{
|
||||||
|
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
|
||||||
|
Kind: "Deployment",
|
||||||
|
Name: "app",
|
||||||
|
APIVersion: "apps/v1",
|
||||||
|
},
|
||||||
|
MinReplicas: &[]int32{1}[0],
|
||||||
|
MaxReplicas: 10,
|
||||||
|
Metrics: []autoscaling.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscaling.ExternalMetricSourceType,
|
||||||
|
External: &autoscaling.ExternalMetricSource{
|
||||||
|
Metric: autoscaling.MetricIdentifier{
|
||||||
|
Name: "some-other-metric",
|
||||||
|
},
|
||||||
|
Target: autoscaling.MetricTarget{
|
||||||
|
Type: autoscaling.AverageValueMetricType,
|
||||||
|
AverageValue: &value,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeClient := fake.NewSimpleClientset()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
collectorFactory := collector.NewCollectorFactory()
|
||||||
|
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
|
||||||
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
|
err = provider.updateHPAs()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
@@ -110,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
|||||||
"whether to enable AWS external metrics")
|
"whether to enable AWS external metrics")
|
||||||
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
|
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
|
||||||
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
||||||
|
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
|
||||||
|
"disregard failing to create collectors for incompatible HPAs")
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,7 +229,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
|||||||
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
||||||
}
|
}
|
||||||
|
|
||||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory)
|
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
|
||||||
|
|
||||||
go hpaProvider.Run(ctx)
|
go hpaProvider.Run(ctx)
|
||||||
|
|
||||||
@@ -332,4 +333,7 @@ type AdapterServerOptions struct {
|
|||||||
MetricsAddress string
|
MetricsAddress string
|
||||||
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
|
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
|
||||||
SkipperBackendWeightAnnotation []string
|
SkipperBackendWeightAnnotation []string
|
||||||
|
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
|
||||||
|
// kube-metrics-adapter beside another Metrics Provider
|
||||||
|
DisregardIncompatibleHPAs bool
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user