Compare commits

...

3 Commits

Author SHA1 Message Date
Arjun
c9fa15c7d4 Updated the tests (#103)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-02-04 09:48:50 +01:00
Arjun
e3330dcf43 Reuse the HTTP client for scraping pods (#102)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-01-30 17:49:22 +01:00
Tomás Pinho
8e4662b26c Permit disregarding incompatible HPAs (#95)
* This commit adds a --disregard-incompatible-hpas that makes the HPA
provider stop erroring out when a collector cannot be created for a
metric in a HPA. Useful when kube-metrics-adapter runs alongside another
metrics provider. Fixes issue #94.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Make tests pass

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Wraps the Plugin Not Found error in a new type that can be checked by the caller of a function to determine if its contents should be logged or added as an event to the HPA, when this HPA is incompatible.
The disregardIncompatibleHPAs is now targetting only the log or addition of the same event.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Invert if expression to select when we should log
CreateNewMetricsCollector errors: don't log when both conditions are true - it's not a PluginNotFoundError
and disregardIncompatibleHPAs flag is set to true. This way, if an error
is NOT PluginNotFoundError it will always be logged, and when it IS
PluginNotFoundError it will only be logged when
disregardIncompatibleHPAs is false.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Remove redundant "whether to"

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add test case for updating HPAs via HPA Provider while disregarding
incompatible HPAs.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2020-01-30 11:33:15 +01:00
8 changed files with 129 additions and 33 deletions

View File

@@ -18,6 +18,7 @@ package main
import ( import (
"flag" "flag"
_ "net/http/pprof"
"os" "os"
"runtime" "runtime"

View File

@@ -46,6 +46,14 @@ type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
} }
type PluginNotFoundError struct {
metricTypeName MetricTypeName
}
func (p *PluginNotFoundError) Error() string {
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error { func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
if metricCollector == "" { if metricCollector == "" {
c.podsPlugins.Any = plugin c.podsPlugins.Any = plugin
@@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
} }
} }
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName) return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
} }
type MetricTypeName struct { type MetricTypeName struct {

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math" "math"
"net"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
@@ -23,11 +24,13 @@ type JSONPathMetricsGetter struct {
path string path string
port int port int
aggregator string aggregator string
client *http.Client
} }
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter. // NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) { func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
getter := &JSONPathMetricsGetter{} httpClient := defaultHTTPClient()
getter := &JSONPathMetricsGetter{client: httpClient}
if v, ok := config["json-key"]; ok { if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v) path, err := jsonpath.Compile(v)
@@ -61,11 +64,26 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
return getter, nil return getter, nil
} }
func defaultHTTPClient() *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 15 * time.Second,
}
return client
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric // GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path // endpoint and extracting the desired value using the specified json path
// query. // query.
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) { func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port) data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -122,16 +140,11 @@ func castSlice(in []interface{}) ([]float64, error) {
} }
// getPodMetrics returns the content of the pods metrics endpoint. // getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) { func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" { if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace) return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
} }
httpClient := &http.Client{
Timeout: 15 * time.Second,
Transport: &http.Transport{},
}
if scheme == "" { if scheme == "" {
scheme = "http" scheme = "http"
} }
@@ -147,7 +160,7 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return nil, err return nil, err
} }
resp, err := httpClient.Do(request) resp, err := g.client.Do(request)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -7,6 +7,13 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
require.Equal(t, first.jsonPath, second.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewJSONPathMetricsGetter(t *testing.T) { func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{ configNoAggregator := map[string]string{
"json-key": "$.value", "json-key": "$.value",
@@ -18,7 +25,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator) getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1) require.NoError(t, err1)
require.Equal(t, &JSONPathMetricsGetter{ compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath1, jsonPath: jpath1,
scheme: "http", scheme: "http",
path: "/metrics", path: "/metrics",
@@ -36,7 +43,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator) getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2) require.NoError(t, err2)
require.Equal(t, &JSONPathMetricsGetter{ compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath2, jsonPath: jpath2,
scheme: "http", scheme: "http",
path: "/metrics", path: "/metrics",

View File

@@ -2,6 +2,7 @@ package collector
import ( import (
"fmt" "fmt"
"net/http"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -37,6 +38,7 @@ type PodCollector struct {
metricType autoscalingv2.MetricSourceType metricType autoscalingv2.MetricSourceType
interval time.Duration interval time.Duration
logger *log.Entry logger *log.Entry
httpClient *http.Client
} }
type PodMetricsGetter interface { type PodMetricsGetter interface {

View File

@@ -2,6 +2,7 @@ package provider
import ( import (
"context" "context"
"errors"
"reflect" "reflect"
"sync" "sync"
"time" "time"
@@ -49,16 +50,17 @@ var (
// HPAProvider is a base provider for initializing metric collectors based on // HPAProvider is a base provider for initializing metric collectors based on
// HPA resources. // HPA resources.
type HPAProvider struct { type HPAProvider struct {
client kubernetes.Interface client kubernetes.Interface
interval time.Duration interval time.Duration
collectorScheduler *CollectorScheduler collectorScheduler *CollectorScheduler
collectorInterval time.Duration collectorInterval time.Duration
metricSink chan metricCollection metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore metricStore *MetricStore
collectorFactory *collector.CollectorFactory collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder recorder kube_record.EventRecorder
logger *log.Entry logger *log.Entry
disregardIncompatibleHPAs bool
} }
// metricCollection is a container for sending collected metrics across a // metricCollection is a container for sending collected metrics across a
@@ -69,7 +71,7 @@ type metricCollection struct {
} }
// NewHPAProvider initializes a new HPAProvider. // NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider { func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
metricsc := make(chan metricCollection) metricsc := make(chan metricCollection)
return &HPAProvider{ return &HPAProvider{
@@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
metricStore: NewMetricStore(func() time.Time { metricStore: NewMetricStore(func() time.Time {
return time.Now().UTC().Add(15 * time.Minute) return time.Now().UTC().Add(15 * time.Minute)
}), }),
collectorFactory: collectorFactory, collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client), recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}), logger: log.WithFields(log.Fields{"provider": "hpa"}),
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
} }
} }
@@ -154,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
interval = p.collectorInterval interval = p.collectorInterval
} }
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval) c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
if err != nil { if err != nil {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
}
cache = false cache = false
continue continue
} }
p.logger.Infof("Adding new metrics collector: %T", collector) p.logger.Infof("Adding new metrics collector: %T", c)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector) p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
} }
newHPAs++ newHPAs++

View File

@@ -77,7 +77,7 @@ func TestUpdateHPAs(t *testing.T) {
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{}) err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err) require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory) provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink) provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs() err = provider.updateHPAs()
@@ -94,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
require.Len(t, provider.collectorScheduler.table, 1) require.Len(t, provider.collectorScheduler.table, 1)
} }
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
// Test HPAProvider with disregardIncompatibleHPAs = true
value := resource.MustParse("1k")
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{},
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.ExternalMetricSourceType,
External: &autoscaling.ExternalMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "some-other-metric",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
}

View File

@@ -110,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable AWS external metrics") "whether to enable AWS external metrics")
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1") flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics") flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
"disregard failing to create collectors for incompatible HPAs")
return cmd return cmd
} }
@@ -228,7 +229,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions)) collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
} }
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory) hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
go hpaProvider.Run(ctx) go hpaProvider.Run(ctx)
@@ -332,4 +333,7 @@ type AdapterServerOptions struct {
MetricsAddress string MetricsAddress string
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights // SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
SkipperBackendWeightAnnotation []string SkipperBackendWeightAnnotation []string
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
// kube-metrics-adapter beside another Metrics Provider
DisregardIncompatibleHPAs bool
} }