From f6b2aede5b7f747a649d36124e5ac0a6ada21124 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pinho?= Date: Thu, 24 Oct 2019 17:15:10 +0100 Subject: [PATCH] Support for JSONPath expressions that return arrays of values (#85) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * This is the initial implementation of support for JSONPath expressions that return arrays of values instead of a single value. This extends the collector to define a few handy reducer functions that take in the slice of float64 and return a single value. It also allows the user to define which reducer function to use via the "metric-config...json-path/reducer-func" annotation, which can have the values of 'avg', 'min', 'max' and 'sum'. For instance, the Ruby puma webserver exposes metrics of the form of $.worker_status[*].last_status.pool_capacity that have to be consumed as an array of values to be properly targetted. Signed-off-by: Tomás Pinho * Renames "reducerFunc" to "aggregator" for consistency with other collectors. Renames the annotation from "metric-config...json-path/reducer-func" to "metric-config...json-path/aggregator". Signed-off-by: Tomás Pinho * Return error instead of defaulting to the avg aggregator, when no valid aggregator name was specified and the JSONPath value is a slice of numbers. Signed-off-by: Tomás Pinho * Fix index out of range on initialized output slice that was found while writing tests. Signed-off-by: Tomás Pinho * Add tests for all added functions + NewJSONPathMetricsGetter Signed-off-by: Tomás Pinho * Add documentation on the `aggregator` option. Signed-off-by: Tomás Pinho * reducer function -> aggregator function Signed-off-by: Tomás Pinho * Fix comment to account for returned error. Signed-off-by: Tomás Pinho --- README.md | 6 ++ pkg/collector/json_path_collector.go | 99 +++++++++++++++++++- pkg/collector/json_path_collector_test.go | 107 ++++++++++++++++++++++ 3 files changed, 208 insertions(+), 4 deletions(-) create mode 100644 pkg/collector/json_path_collector_test.go diff --git a/README.md b/README.md index 275bc68..7856327 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,7 @@ metadata: metric-config.pods.requests-per-second.json-path/path: /metrics metric-config.pods.requests-per-second.json-path/port: "9090" metric-config.pods.requests-per-second.json-path/scheme: "https" + metric-config.pods.requests-per-second.json-path/aggregator: "max" spec: scaleTargetRef: apiVersion: apps/v1 @@ -143,6 +144,11 @@ The other configuration options `path`, `port` and `scheme` specify where the me endpoint is exposed on the pod. The `path` and `port` options do not have default values so they must be defined. The `scheme` is optional and defaults to `http`. +The `aggregator` configuration option specifies the aggregation function used to aggregate +values of JSONPath expressions that evaluate to arrays/slices of numbers. +It's optional but when the expression evaluates to an array/slice, it's absence will +produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`. + ## Prometheus collector The Prometheus collector is a generic collector which can map Prometheus diff --git a/pkg/collector/json_path_collector.go b/pkg/collector/json_path_collector.go index d7164c4..e6e818f 100644 --- a/pkg/collector/json_path_collector.go +++ b/pkg/collector/json_path_collector.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" "net/url" "strconv" @@ -17,10 +18,11 @@ import ( // querying the pods metrics endpoint and lookup the metric value as defined by // the json path query. type JSONPathMetricsGetter struct { - jsonPath *jsonpath.Compiled - scheme string - path string - port int + jsonPath *jsonpath.Compiled + scheme string + path string + port int + aggregator string } // NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter. @@ -52,6 +54,10 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, getter.port = n } + if v, ok := config["aggregator"]; ok { + getter.aggregator = v + } + return getter, nil } @@ -83,6 +89,12 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) { return float64(res), nil case float64: return res, nil + case []int: + return reduce(intsToFloat64s(res), g.aggregator) + case []float32: + return reduce(float32sToFloat64s(res), g.aggregator) + case []float64: + return reduce(res, g.aggregator) default: return 0, fmt.Errorf("unsupported type %T", res) } @@ -131,3 +143,82 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro return data, nil } + +// intsToFloat64s will convert a slice of int to a slice of float64 +func intsToFloat64s(in []int) (out []float64) { + out = []float64{} + for _, v := range in { + out = append(out, float64(v)) + } + return +} + +// float32sToFloat64s will convert a slice of float32 to a slice of float64 +func float32sToFloat64s(in []float32) (out []float64) { + out = []float64{} + for _, v := range in { + out = append(out, float64(v)) + } + return +} + +// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned. +func reduce(values []float64, aggregator string) (float64, error) { + switch aggregator { + case "avg": + return avg(values), nil + case "min": + return min(values), nil + case "max": + return max(values), nil + case "sum": + return sum(values), nil + default: + return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator) + } +} + +// avg implements the average mathematical function over a slice of float64 +func avg(values []float64) float64 { + sum := sum(values) + return sum / float64(len(values)) +} + +// min implements the absolute minimum mathematical function over a slice of float64 +func min(values []float64) float64 { + // initialized with positive infinity, all finite numbers are smaller than it + curMin := math.Inf(1) + + for _, v := range values { + if v < curMin { + curMin = v + } + } + + return curMin +} + +// max implements the absolute maximum mathematical function over a slice of float64 +func max(values []float64) float64 { + // initialized with negative infinity, all finite numbers are bigger than it + curMax := math.Inf(-1) + + for _, v := range values { + if v > curMax { + curMax = v + } + } + + return curMax +} + +// sum implements the summation mathematical function over a slice of float64 +func sum(values []float64) float64 { + res := 0.0 + + for _, v := range values { + res += v + } + + return res +} diff --git a/pkg/collector/json_path_collector_test.go b/pkg/collector/json_path_collector_test.go new file mode 100644 index 0000000..994694e --- /dev/null +++ b/pkg/collector/json_path_collector_test.go @@ -0,0 +1,107 @@ +package collector + +import ( + "testing" + + "github.com/oliveagle/jsonpath" + "github.com/stretchr/testify/require" +) + +func TestNewJSONPathMetricsGetter(t *testing.T) { + configNoAggregator := map[string]string{ + "json-key": "$.value", + "scheme": "http", + "path": "/metrics", + "port": "9090", + } + jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"]) + getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator) + + require.NoError(t, err1) + require.Equal(t, &JSONPathMetricsGetter{ + jsonPath: jpath1, + scheme: "http", + path: "/metrics", + port: 9090, + }, getterNoAggregator) + + configAggregator := map[string]string{ + "json-key": "$.values", + "scheme": "http", + "path": "/metrics", + "port": "9090", + "aggregator": "avg", + } + jpath2, _ := jsonpath.Compile(configAggregator["json-key"]) + getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator) + + require.NoError(t, err2) + require.Equal(t, &JSONPathMetricsGetter{ + jsonPath: jpath2, + scheme: "http", + path: "/metrics", + port: 9090, + aggregator: "avg", + }, getterAggregator) + + configErrorJSONPath := map[string]string{ + "json-key": "{}", + "scheme": "http", + "path": "/metrics", + "port": "9090", + } + + _, err3 := NewJSONPathMetricsGetter(configErrorJSONPath) + require.Error(t, err3) + + configErrorPort := map[string]string{ + "json-key": "$.values", + "scheme": "http", + "path": "/metrics", + "port": "a9090", + } + + _, err4 := NewJSONPathMetricsGetter(configErrorPort) + require.Error(t, err4) +} + +func TestIntsToFloat64s(t *testing.T) { + noInts := []int{} + noFloat64s := intsToFloat64s(noInts) + require.Equal(t, []float64{}, noFloat64s) + + someInts := []int{1, 2, 3} + someFloat64s := intsToFloat64s(someInts) + require.Equal(t, []float64{1.0, 2.0, 3.0}, someFloat64s) +} + +func TestFloat32sToFloat64s(t *testing.T) { + noFloat32s := []float32{} + noFloat64s := float32sToFloat64s(noFloat32s) + require.Equal(t, []float64{}, noFloat64s) + + someFloat32s := []float32{1.0, 2.0, 3.0} + someFloat64s := float32sToFloat64s(someFloat32s) + require.Equal(t, []float64{1.0, 2.0, 3.0}, someFloat64s) +} + +func TestReduce(t *testing.T) { + average, err1 := reduce([]float64{1, 2, 3}, "avg") + require.NoError(t, err1) + require.Equal(t, 2.0, average) + + min, err2 := reduce([]float64{1, 2, 3}, "min") + require.NoError(t, err2) + require.Equal(t, 1.0, min) + + max, err3 := reduce([]float64{1, 2, 3}, "max") + require.NoError(t, err3) + require.Equal(t, 3.0, max) + + sum, err4 := reduce([]float64{1, 2, 3}, "sum") + require.NoError(t, err4) + require.Equal(t, 6.0, sum) + + _, err5 := reduce([]float64{1, 2, 3}, "inexistent_function") + require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function") +}