Compare commits

...

3 Commits

Author SHA1 Message Date
Arjun
c9fa15c7d4 Updated the tests (#103)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-02-04 09:48:50 +01:00
Arjun
e3330dcf43 Reuse the HTTP client for scraping pods (#102)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-01-30 17:49:22 +01:00
Tomás Pinho
8e4662b26c Permit disregarding incompatible HPAs (#95)
* This commit adds a --disregard-incompatible-hpas that makes the HPA
provider stop erroring out when a collector cannot be created for a
metric in a HPA. Useful when kube-metrics-adapter runs alongside another
metrics provider. Fixes issue #94.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Make tests pass

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Wraps the Plugin Not Found error in a new type that can be checked by the caller of a function to determine if its contents should be logged or added as an event to the HPA, when this HPA is incompatible.
The disregardIncompatibleHPAs is now targetting only the log or addition of the same event.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Invert if expression to select when we should log
CreateNewMetricsCollector errors: don't log when both conditions are true - it's not a PluginNotFoundError
and disregardIncompatibleHPAs flag is set to true. This way, if an error
is NOT PluginNotFoundError it will always be logged, and when it IS
PluginNotFoundError it will only be logged when
disregardIncompatibleHPAs is false.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Remove redundant "whether to"

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add test case for updating HPAs via HPA Provider while disregarding
incompatible HPAs.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2020-01-30 11:33:15 +01:00
8 changed files with 129 additions and 33 deletions

View File

@ -18,6 +18,7 @@ package main
import (
"flag"
_ "net/http/pprof"
"os"
"runtime"

View File

@ -46,6 +46,14 @@ type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
type PluginNotFoundError struct {
metricTypeName MetricTypeName
}
func (p *PluginNotFoundError) Error() string {
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
if metricCollector == "" {
c.podsPlugins.Any = plugin
@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
}
}
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
}
type MetricTypeName struct {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"strconv"
@ -23,11 +24,13 @@ type JSONPathMetricsGetter struct {
path string
port int
aggregator string
client *http.Client
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
getter := &JSONPathMetricsGetter{}
httpClient := defaultHTTPClient()
getter := &JSONPathMetricsGetter{client: httpClient}
if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
@ -61,11 +64,26 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
return getter, nil
}
func defaultHTTPClient() *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 15 * time.Second,
}
return client
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
}
@ -122,16 +140,11 @@ func castSlice(in []interface{}) ([]float64, error) {
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
httpClient := &http.Client{
Timeout: 15 * time.Second,
Transport: &http.Transport{},
}
if scheme == "" {
scheme = "http"
}
@ -147,7 +160,7 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return nil, err
}
resp, err := httpClient.Do(request)
resp, err := g.client.Do(request)
if err != nil {
return nil, err
}

View File

@ -7,6 +7,13 @@ import (
"github.com/stretchr/testify/require"
)
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
require.Equal(t, first.jsonPath, second.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
@ -18,7 +25,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
require.Equal(t, &JSONPathMetricsGetter{
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
@ -36,7 +43,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
require.Equal(t, &JSONPathMetricsGetter{
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",

View File

@ -2,6 +2,7 @@ package collector
import (
"fmt"
"net/http"
"time"
log "github.com/sirupsen/logrus"
@ -37,6 +38,7 @@ type PodCollector struct {
metricType autoscalingv2.MetricSourceType
interval time.Duration
logger *log.Entry
httpClient *http.Client
}
type PodMetricsGetter interface {

View File

@ -2,6 +2,7 @@ package provider
import (
"context"
"errors"
"reflect"
"sync"
"time"
@ -49,16 +50,17 @@ var (
// HPAProvider is a base provider for initializing metric collectors based on
// HPA resources.
type HPAProvider struct {
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
disregardIncompatibleHPAs bool
}
// metricCollection is a container for sending collected metrics across a
@ -69,7 +71,7 @@ type metricCollection struct {
}
// NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
metricsc := make(chan metricCollection)
return &HPAProvider{
@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
metricStore: NewMetricStore(func() time.Time {
return time.Now().UTC().Add(15 * time.Minute)
}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
}
}
@ -154,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
interval = p.collectorInterval
}
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
if err != nil {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
}
cache = false
continue
}
p.logger.Infof("Adding new metrics collector: %T", collector)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
p.logger.Infof("Adding new metrics collector: %T", c)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
}
newHPAs++

View File

@ -77,7 +77,7 @@ func TestUpdateHPAs(t *testing.T) {
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
@ -94,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
require.Len(t, provider.collectorScheduler.table, 1)
}
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
// Test HPAProvider with disregardIncompatibleHPAs = true
value := resource.MustParse("1k")
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{},
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.ExternalMetricSourceType,
External: &autoscaling.ExternalMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "some-other-metric",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
}

View File

@ -110,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable AWS external metrics")
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
"disregard failing to create collectors for incompatible HPAs")
return cmd
}
@ -228,7 +229,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
}
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory)
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
go hpaProvider.Run(ctx)
@ -332,4 +333,7 @@ type AdapterServerOptions struct {
MetricsAddress string
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
SkipperBackendWeightAnnotation []string
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
// kube-metrics-adapter beside another Metrics Provider
DisregardIncompatibleHPAs bool
}