diff --git a/README.md b/README.md index 275bc68..7856327 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,7 @@ metadata: metric-config.pods.requests-per-second.json-path/path: /metrics metric-config.pods.requests-per-second.json-path/port: "9090" metric-config.pods.requests-per-second.json-path/scheme: "https" + metric-config.pods.requests-per-second.json-path/aggregator: "max" spec: scaleTargetRef: apiVersion: apps/v1 @@ -143,6 +144,11 @@ The other configuration options `path`, `port` and `scheme` specify where the me endpoint is exposed on the pod. The `path` and `port` options do not have default values so they must be defined. The `scheme` is optional and defaults to `http`. +The `aggregator` configuration option specifies the aggregation function used to aggregate +values of JSONPath expressions that evaluate to arrays/slices of numbers. +It's optional but when the expression evaluates to an array/slice, it's absence will +produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`. + ## Prometheus collector The Prometheus collector is a generic collector which can map Prometheus diff --git a/pkg/collector/json_path_collector.go b/pkg/collector/json_path_collector.go index d7164c4..e6e818f 100644 --- a/pkg/collector/json_path_collector.go +++ b/pkg/collector/json_path_collector.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" "net/url" "strconv" @@ -17,10 +18,11 @@ import ( // querying the pods metrics endpoint and lookup the metric value as defined by // the json path query. type JSONPathMetricsGetter struct { - jsonPath *jsonpath.Compiled - scheme string - path string - port int + jsonPath *jsonpath.Compiled + scheme string + path string + port int + aggregator string } // NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter. @@ -52,6 +54,10 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, getter.port = n } + if v, ok := config["aggregator"]; ok { + getter.aggregator = v + } + return getter, nil } @@ -83,6 +89,12 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) { return float64(res), nil case float64: return res, nil + case []int: + return reduce(intsToFloat64s(res), g.aggregator) + case []float32: + return reduce(float32sToFloat64s(res), g.aggregator) + case []float64: + return reduce(res, g.aggregator) default: return 0, fmt.Errorf("unsupported type %T", res) } @@ -131,3 +143,82 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro return data, nil } + +// intsToFloat64s will convert a slice of int to a slice of float64 +func intsToFloat64s(in []int) (out []float64) { + out = []float64{} + for _, v := range in { + out = append(out, float64(v)) + } + return +} + +// float32sToFloat64s will convert a slice of float32 to a slice of float64 +func float32sToFloat64s(in []float32) (out []float64) { + out = []float64{} + for _, v := range in { + out = append(out, float64(v)) + } + return +} + +// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned. +func reduce(values []float64, aggregator string) (float64, error) { + switch aggregator { + case "avg": + return avg(values), nil + case "min": + return min(values), nil + case "max": + return max(values), nil + case "sum": + return sum(values), nil + default: + return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator) + } +} + +// avg implements the average mathematical function over a slice of float64 +func avg(values []float64) float64 { + sum := sum(values) + return sum / float64(len(values)) +} + +// min implements the absolute minimum mathematical function over a slice of float64 +func min(values []float64) float64 { + // initialized with positive infinity, all finite numbers are smaller than it + curMin := math.Inf(1) + + for _, v := range values { + if v < curMin { + curMin = v + } + } + + return curMin +} + +// max implements the absolute maximum mathematical function over a slice of float64 +func max(values []float64) float64 { + // initialized with negative infinity, all finite numbers are bigger than it + curMax := math.Inf(-1) + + for _, v := range values { + if v > curMax { + curMax = v + } + } + + return curMax +} + +// sum implements the summation mathematical function over a slice of float64 +func sum(values []float64) float64 { + res := 0.0 + + for _, v := range values { + res += v + } + + return res +} diff --git a/pkg/collector/json_path_collector_test.go b/pkg/collector/json_path_collector_test.go new file mode 100644 index 0000000..994694e --- /dev/null +++ b/pkg/collector/json_path_collector_test.go @@ -0,0 +1,107 @@ +package collector + +import ( + "testing" + + "github.com/oliveagle/jsonpath" + "github.com/stretchr/testify/require" +) + +func TestNewJSONPathMetricsGetter(t *testing.T) { + configNoAggregator := map[string]string{ + "json-key": "$.value", + "scheme": "http", + "path": "/metrics", + "port": "9090", + } + jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"]) + getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator) + + require.NoError(t, err1) + require.Equal(t, &JSONPathMetricsGetter{ + jsonPath: jpath1, + scheme: "http", + path: "/metrics", + port: 9090, + }, getterNoAggregator) + + configAggregator := map[string]string{ + "json-key": "$.values", + "scheme": "http", + "path": "/metrics", + "port": "9090", + "aggregator": "avg", + } + jpath2, _ := jsonpath.Compile(configAggregator["json-key"]) + getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator) + + require.NoError(t, err2) + require.Equal(t, &JSONPathMetricsGetter{ + jsonPath: jpath2, + scheme: "http", + path: "/metrics", + port: 9090, + aggregator: "avg", + }, getterAggregator) + + configErrorJSONPath := map[string]string{ + "json-key": "{}", + "scheme": "http", + "path": "/metrics", + "port": "9090", + } + + _, err3 := NewJSONPathMetricsGetter(configErrorJSONPath) + require.Error(t, err3) + + configErrorPort := map[string]string{ + "json-key": "$.values", + "scheme": "http", + "path": "/metrics", + "port": "a9090", + } + + _, err4 := NewJSONPathMetricsGetter(configErrorPort) + require.Error(t, err4) +} + +func TestIntsToFloat64s(t *testing.T) { + noInts := []int{} + noFloat64s := intsToFloat64s(noInts) + require.Equal(t, []float64{}, noFloat64s) + + someInts := []int{1, 2, 3} + someFloat64s := intsToFloat64s(someInts) + require.Equal(t, []float64{1.0, 2.0, 3.0}, someFloat64s) +} + +func TestFloat32sToFloat64s(t *testing.T) { + noFloat32s := []float32{} + noFloat64s := float32sToFloat64s(noFloat32s) + require.Equal(t, []float64{}, noFloat64s) + + someFloat32s := []float32{1.0, 2.0, 3.0} + someFloat64s := float32sToFloat64s(someFloat32s) + require.Equal(t, []float64{1.0, 2.0, 3.0}, someFloat64s) +} + +func TestReduce(t *testing.T) { + average, err1 := reduce([]float64{1, 2, 3}, "avg") + require.NoError(t, err1) + require.Equal(t, 2.0, average) + + min, err2 := reduce([]float64{1, 2, 3}, "min") + require.NoError(t, err2) + require.Equal(t, 1.0, min) + + max, err3 := reduce([]float64{1, 2, 3}, "max") + require.NoError(t, err3) + require.Equal(t, 3.0, max) + + sum, err4 := reduce([]float64{1, 2, 3}, "sum") + require.NoError(t, err4) + require.Equal(t, 6.0, sum) + + _, err5 := reduce([]float64{1, 2, 3}, "inexistent_function") + require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function") +}