Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
c9fa15c7d4 | ||
|
e3330dcf43 | ||
|
8e4662b26c |
@ -46,6 +46,14 @@ type CollectorPlugin interface {
|
||||
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||
}
|
||||
|
||||
type PluginNotFoundError struct {
|
||||
metricTypeName MetricTypeName
|
||||
}
|
||||
|
||||
func (p *PluginNotFoundError) Error() string {
|
||||
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
|
||||
}
|
||||
|
||||
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
||||
if metricCollector == "" {
|
||||
c.podsPlugins.Any = plugin
|
||||
@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
|
||||
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
|
||||
}
|
||||
|
||||
type MetricTypeName struct {
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@ -23,11 +24,13 @@ type JSONPathMetricsGetter struct {
|
||||
path string
|
||||
port int
|
||||
aggregator string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
|
||||
getter := &JSONPathMetricsGetter{}
|
||||
httpClient := defaultHTTPClient()
|
||||
getter := &JSONPathMetricsGetter{client: httpClient}
|
||||
|
||||
if v, ok := config["json-key"]; ok {
|
||||
path, err := jsonpath.Compile(v)
|
||||
@ -61,11 +64,26 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
||||
return getter, nil
|
||||
}
|
||||
|
||||
func defaultHTTPClient() *http.Client {
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 15 * time.Second,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 50,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
},
|
||||
Timeout: 15 * time.Second,
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||
// endpoint and extracting the desired value using the specified json path
|
||||
// query.
|
||||
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
||||
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
|
||||
data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -122,16 +140,11 @@ func castSlice(in []interface{}) ([]float64, error) {
|
||||
}
|
||||
|
||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
||||
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||
if pod.Status.PodIP == "" {
|
||||
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
|
||||
}
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: 15 * time.Second,
|
||||
Transport: &http.Transport{},
|
||||
}
|
||||
|
||||
if scheme == "" {
|
||||
scheme = "http"
|
||||
}
|
||||
@ -147,7 +160,7 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := httpClient.Do(request)
|
||||
resp, err := g.client.Do(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -7,6 +7,13 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
|
||||
require.Equal(t, first.jsonPath, second.jsonPath)
|
||||
require.Equal(t, first.scheme, second.scheme)
|
||||
require.Equal(t, first.path, second.path)
|
||||
require.Equal(t, first.port, second.port)
|
||||
}
|
||||
|
||||
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||
configNoAggregator := map[string]string{
|
||||
"json-key": "$.value",
|
||||
@ -18,7 +25,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
||||
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, &JSONPathMetricsGetter{
|
||||
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath1,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
@ -36,7 +43,7 @@ func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
||||
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, &JSONPathMetricsGetter{
|
||||
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath2,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
|
@ -2,6 +2,7 @@ package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -37,6 +38,7 @@ type PodCollector struct {
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
interval time.Duration
|
||||
logger *log.Entry
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
type PodMetricsGetter interface {
|
||||
|
@ -2,6 +2,7 @@ package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
@ -49,16 +50,17 @@ var (
|
||||
// HPAProvider is a base provider for initializing metric collectors based on
|
||||
// HPA resources.
|
||||
type HPAProvider struct {
|
||||
client kubernetes.Interface
|
||||
interval time.Duration
|
||||
collectorScheduler *CollectorScheduler
|
||||
collectorInterval time.Duration
|
||||
metricSink chan metricCollection
|
||||
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
||||
metricStore *MetricStore
|
||||
collectorFactory *collector.CollectorFactory
|
||||
recorder kube_record.EventRecorder
|
||||
logger *log.Entry
|
||||
client kubernetes.Interface
|
||||
interval time.Duration
|
||||
collectorScheduler *CollectorScheduler
|
||||
collectorInterval time.Duration
|
||||
metricSink chan metricCollection
|
||||
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
||||
metricStore *MetricStore
|
||||
collectorFactory *collector.CollectorFactory
|
||||
recorder kube_record.EventRecorder
|
||||
logger *log.Entry
|
||||
disregardIncompatibleHPAs bool
|
||||
}
|
||||
|
||||
// metricCollection is a container for sending collected metrics across a
|
||||
@ -69,7 +71,7 @@ type metricCollection struct {
|
||||
}
|
||||
|
||||
// NewHPAProvider initializes a new HPAProvider.
|
||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
|
||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
|
||||
metricsc := make(chan metricCollection)
|
||||
|
||||
return &HPAProvider{
|
||||
@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
|
||||
metricStore: NewMetricStore(func() time.Time {
|
||||
return time.Now().UTC().Add(15 * time.Minute)
|
||||
}),
|
||||
collectorFactory: collectorFactory,
|
||||
recorder: recorder.CreateEventRecorder(client),
|
||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||
collectorFactory: collectorFactory,
|
||||
recorder: recorder.CreateEventRecorder(client),
|
||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
|
||||
}
|
||||
}
|
||||
|
||||
@ -154,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
interval = p.collectorInterval
|
||||
}
|
||||
|
||||
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
||||
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
||||
if err != nil {
|
||||
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
||||
|
||||
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
|
||||
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
|
||||
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
||||
}
|
||||
|
||||
cache = false
|
||||
continue
|
||||
}
|
||||
|
||||
p.logger.Infof("Adding new metrics collector: %T", collector)
|
||||
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
|
||||
p.logger.Infof("Adding new metrics collector: %T", c)
|
||||
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
|
||||
}
|
||||
newHPAs++
|
||||
|
||||
|
@ -77,7 +77,7 @@ func TestUpdateHPAs(t *testing.T) {
|
||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||
require.NoError(t, err)
|
||||
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||
|
||||
err = provider.updateHPAs()
|
||||
@ -94,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
|
||||
|
||||
require.Len(t, provider.collectorScheduler.table, 1)
|
||||
}
|
||||
|
||||
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
||||
// Test HPAProvider with disregardIncompatibleHPAs = true
|
||||
|
||||
value := resource.MustParse("1k")
|
||||
|
||||
hpa := &autoscaling.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "hpa1",
|
||||
Namespace: "default",
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
Spec: autoscaling.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
|
||||
Kind: "Deployment",
|
||||
Name: "app",
|
||||
APIVersion: "apps/v1",
|
||||
},
|
||||
MinReplicas: &[]int32{1}[0],
|
||||
MaxReplicas: 10,
|
||||
Metrics: []autoscaling.MetricSpec{
|
||||
{
|
||||
Type: autoscaling.ExternalMetricSourceType,
|
||||
External: &autoscaling.ExternalMetricSource{
|
||||
Metric: autoscaling.MetricIdentifier{
|
||||
Name: "some-other-metric",
|
||||
},
|
||||
Target: autoscaling.MetricTarget{
|
||||
Type: autoscaling.AverageValueMetricType,
|
||||
AverageValue: &value,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
|
||||
var err error
|
||||
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
|
||||
require.NoError(t, err)
|
||||
|
||||
collectorFactory := collector.NewCollectorFactory()
|
||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||
require.NoError(t, err)
|
||||
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
|
||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||
|
||||
err = provider.updateHPAs()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
@ -110,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
"whether to enable AWS external metrics")
|
||||
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
|
||||
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
||||
|
||||
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
|
||||
"disregard failing to create collectors for incompatible HPAs")
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -228,7 +229,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
||||
}
|
||||
|
||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory)
|
||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
|
||||
|
||||
go hpaProvider.Run(ctx)
|
||||
|
||||
@ -332,4 +333,7 @@ type AdapterServerOptions struct {
|
||||
MetricsAddress string
|
||||
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
|
||||
SkipperBackendWeightAnnotation []string
|
||||
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
|
||||
// kube-metrics-adapter beside another Metrics Provider
|
||||
DisregardIncompatibleHPAs bool
|
||||
}
|
||||
|
Reference in New Issue
Block a user