mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-01-24 08:41:00 +00:00
Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
120950078c | ||
|
0790bc351a | ||
|
f6b2aede5b | ||
|
7d5e719eb0 | ||
|
7497a61a2c |
@ -106,6 +106,7 @@ metadata:
|
||||
metric-config.pods.requests-per-second.json-path/path: /metrics
|
||||
metric-config.pods.requests-per-second.json-path/port: "9090"
|
||||
metric-config.pods.requests-per-second.json-path/scheme: "https"
|
||||
metric-config.pods.requests-per-second.json-path/aggregator: "max"
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
@ -143,6 +144,11 @@ The other configuration options `path`, `port` and `scheme` specify where the me
|
||||
endpoint is exposed on the pod. The `path` and `port` options do not have default values
|
||||
so they must be defined. The `scheme` is optional and defaults to `http`.
|
||||
|
||||
The `aggregator` configuration option specifies the aggregation function used to aggregate
|
||||
values of JSONPath expressions that evaluate to arrays/slices of numbers.
|
||||
It's optional but when the expression evaluates to an array/slice, it's absence will
|
||||
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
|
||||
|
||||
## Prometheus collector
|
||||
|
||||
The Prometheus collector is a generic collector which can map Prometheus
|
||||
|
@ -213,7 +213,9 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
|
||||
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
|
||||
metric.External.Metric.Selector != nil &&
|
||||
metric.External.Metric.Selector.MatchLabels != nil {
|
||||
config.Config = metric.External.Metric.Selector.MatchLabels
|
||||
for k, v := range metric.External.Metric.Selector.MatchLabels {
|
||||
config.Config[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@ -17,10 +18,11 @@ import (
|
||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||
// the json path query.
|
||||
type JSONPathMetricsGetter struct {
|
||||
jsonPath *jsonpath.Compiled
|
||||
scheme string
|
||||
path string
|
||||
port int
|
||||
jsonPath *jsonpath.Compiled
|
||||
scheme string
|
||||
path string
|
||||
port int
|
||||
aggregator string
|
||||
}
|
||||
|
||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||
@ -28,12 +30,12 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
||||
getter := &JSONPathMetricsGetter{}
|
||||
|
||||
if v, ok := config["json-key"]; ok {
|
||||
pat, err := jsonpath.Compile(v)
|
||||
path, err := jsonpath.Compile(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
|
||||
}
|
||||
|
||||
getter.jsonPath = pat
|
||||
getter.jsonPath = path
|
||||
}
|
||||
|
||||
if v, ok := config["scheme"]; ok {
|
||||
@ -52,6 +54,10 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
||||
getter.port = n
|
||||
}
|
||||
|
||||
if v, ok := config["aggregator"]; ok {
|
||||
getter.aggregator = v
|
||||
}
|
||||
|
||||
return getter, nil
|
||||
}
|
||||
|
||||
@ -83,11 +89,38 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
||||
return float64(res), nil
|
||||
case float64:
|
||||
return res, nil
|
||||
case []interface{}:
|
||||
s, err := castSlice(res)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return reduce(s, g.aggregator)
|
||||
default:
|
||||
return 0, fmt.Errorf("unsupported type %T", res)
|
||||
}
|
||||
}
|
||||
|
||||
// castSlice takes a slice of interface and returns a slice of float64 if all
|
||||
// values in slice were castable, else returns an error
|
||||
func castSlice(in []interface{}) ([]float64, error) {
|
||||
out := []float64{}
|
||||
|
||||
for _, v := range in {
|
||||
switch v := v.(type) {
|
||||
case int:
|
||||
out = append(out, float64(v))
|
||||
case float32:
|
||||
out = append(out, float64(v))
|
||||
case float64:
|
||||
out = append(out, v)
|
||||
default:
|
||||
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
||||
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||
if pod.Status.PodIP == "" {
|
||||
@ -131,3 +164,64 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
||||
func reduce(values []float64, aggregator string) (float64, error) {
|
||||
switch aggregator {
|
||||
case "avg":
|
||||
return avg(values), nil
|
||||
case "min":
|
||||
return min(values), nil
|
||||
case "max":
|
||||
return max(values), nil
|
||||
case "sum":
|
||||
return sum(values), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
|
||||
}
|
||||
}
|
||||
|
||||
// avg implements the average mathematical function over a slice of float64
|
||||
func avg(values []float64) float64 {
|
||||
sum := sum(values)
|
||||
return sum / float64(len(values))
|
||||
}
|
||||
|
||||
// min implements the absolute minimum mathematical function over a slice of float64
|
||||
func min(values []float64) float64 {
|
||||
// initialized with positive infinity, all finite numbers are smaller than it
|
||||
curMin := math.Inf(1)
|
||||
|
||||
for _, v := range values {
|
||||
if v < curMin {
|
||||
curMin = v
|
||||
}
|
||||
}
|
||||
|
||||
return curMin
|
||||
}
|
||||
|
||||
// max implements the absolute maximum mathematical function over a slice of float64
|
||||
func max(values []float64) float64 {
|
||||
// initialized with negative infinity, all finite numbers are bigger than it
|
||||
curMax := math.Inf(-1)
|
||||
|
||||
for _, v := range values {
|
||||
if v > curMax {
|
||||
curMax = v
|
||||
}
|
||||
}
|
||||
|
||||
return curMax
|
||||
}
|
||||
|
||||
// sum implements the summation mathematical function over a slice of float64
|
||||
func sum(values []float64) float64 {
|
||||
res := 0.0
|
||||
|
||||
for _, v := range values {
|
||||
res += v
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
105
pkg/collector/json_path_collector_test.go
Normal file
105
pkg/collector/json_path_collector_test.go
Normal file
@ -0,0 +1,105 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/oliveagle/jsonpath"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||
configNoAggregator := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
|
||||
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
||||
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath1,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
}, getterNoAggregator)
|
||||
|
||||
configAggregator := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"aggregator": "avg",
|
||||
}
|
||||
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
|
||||
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
||||
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath2,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
aggregator: "avg",
|
||||
}, getterAggregator)
|
||||
|
||||
configErrorJSONPath := map[string]string{
|
||||
"json-key": "{}",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
|
||||
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
|
||||
require.Error(t, err3)
|
||||
|
||||
configErrorPort := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "a9090",
|
||||
}
|
||||
|
||||
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
|
||||
require.Error(t, err4)
|
||||
}
|
||||
|
||||
func TestCastSlice(t *testing.T) {
|
||||
res1, err1 := castSlice([]interface{}{1, 2, 3})
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
|
||||
|
||||
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
|
||||
|
||||
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
|
||||
require.NoError(t, err3)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
|
||||
|
||||
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
|
||||
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
|
||||
require.Equal(t, []float64(nil), res4)
|
||||
}
|
||||
|
||||
func TestReduce(t *testing.T) {
|
||||
average, err1 := reduce([]float64{1, 2, 3}, "avg")
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, 2.0, average)
|
||||
|
||||
min, err2 := reduce([]float64{1, 2, 3}, "min")
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, 1.0, min)
|
||||
|
||||
max, err3 := reduce([]float64{1, 2, 3}, "max")
|
||||
require.NoError(t, err3)
|
||||
require.Equal(t, 3.0, max)
|
||||
|
||||
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
|
||||
require.NoError(t, err4)
|
||||
require.Equal(t, 6.0, sum)
|
||||
|
||||
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
|
||||
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
|
||||
}
|
Reference in New Issue
Block a user