mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-01-03 07:40:09 +00:00
Support for JSONPath expressions that return arrays of values (#85)
* This is the initial implementation of support for JSONPath expressions that return arrays of values instead of a single value. This extends the collector to define a few handy reducer functions that take in the slice of float64 and return a single value. It also allows the user to define which reducer function to use via the "metric-config.<metricType>.<metricName>.json-path/reducer-func" annotation, which can have the values of 'avg', 'min', 'max' and 'sum'. For instance, the Ruby puma webserver exposes metrics of the form of $.worker_status[*].last_status.pool_capacity that have to be consumed as an array of values to be properly targetted. Signed-off-by: Tomás Pinho <me@tomaspinho.com> * Renames "reducerFunc" to "aggregator" for consistency with other collectors. Renames the annotation from "metric-config.<metricType>.<metricName>.json-path/reducer-func" to "metric-config.<metricType>.<metricName>.json-path/aggregator". Signed-off-by: Tomás Pinho <me@tomaspinho.com> * Return error instead of defaulting to the avg aggregator, when no valid aggregator name was specified and the JSONPath value is a slice of numbers. Signed-off-by: Tomás Pinho <me@tomaspinho.com> * Fix index out of range on initialized output slice that was found while writing tests. Signed-off-by: Tomás Pinho <me@tomaspinho.com> * Add tests for all added functions + NewJSONPathMetricsGetter Signed-off-by: Tomás Pinho <me@tomaspinho.com> * Add documentation on the `aggregator` option. Signed-off-by: Tomás Pinho <me@tomaspinho.com> * reducer function -> aggregator function Signed-off-by: Tomás Pinho <me@tomaspinho.com> * Fix comment to account for returned error. Signed-off-by: Tomás Pinho <me@tomaspinho.com>
This commit is contained in:
Tomás Pinho
committed by
Sandor Szücs
parent
7d5e719eb0
commit
f6b2aede5b
@ -106,6 +106,7 @@ metadata:
|
|||||||
metric-config.pods.requests-per-second.json-path/path: /metrics
|
metric-config.pods.requests-per-second.json-path/path: /metrics
|
||||||
metric-config.pods.requests-per-second.json-path/port: "9090"
|
metric-config.pods.requests-per-second.json-path/port: "9090"
|
||||||
metric-config.pods.requests-per-second.json-path/scheme: "https"
|
metric-config.pods.requests-per-second.json-path/scheme: "https"
|
||||||
|
metric-config.pods.requests-per-second.json-path/aggregator: "max"
|
||||||
spec:
|
spec:
|
||||||
scaleTargetRef:
|
scaleTargetRef:
|
||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
@ -143,6 +144,11 @@ The other configuration options `path`, `port` and `scheme` specify where the me
|
|||||||
endpoint is exposed on the pod. The `path` and `port` options do not have default values
|
endpoint is exposed on the pod. The `path` and `port` options do not have default values
|
||||||
so they must be defined. The `scheme` is optional and defaults to `http`.
|
so they must be defined. The `scheme` is optional and defaults to `http`.
|
||||||
|
|
||||||
|
The `aggregator` configuration option specifies the aggregation function used to aggregate
|
||||||
|
values of JSONPath expressions that evaluate to arrays/slices of numbers.
|
||||||
|
It's optional but when the expression evaluates to an array/slice, it's absence will
|
||||||
|
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
|
||||||
|
|
||||||
## Prometheus collector
|
## Prometheus collector
|
||||||
|
|
||||||
The Prometheus collector is a generic collector which can map Prometheus
|
The Prometheus collector is a generic collector which can map Prometheus
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -17,10 +18,11 @@ import (
|
|||||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||||
// the json path query.
|
// the json path query.
|
||||||
type JSONPathMetricsGetter struct {
|
type JSONPathMetricsGetter struct {
|
||||||
jsonPath *jsonpath.Compiled
|
jsonPath *jsonpath.Compiled
|
||||||
scheme string
|
scheme string
|
||||||
path string
|
path string
|
||||||
port int
|
port int
|
||||||
|
aggregator string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||||
@ -52,6 +54,10 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
|||||||
getter.port = n
|
getter.port = n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if v, ok := config["aggregator"]; ok {
|
||||||
|
getter.aggregator = v
|
||||||
|
}
|
||||||
|
|
||||||
return getter, nil
|
return getter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,6 +89,12 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
|||||||
return float64(res), nil
|
return float64(res), nil
|
||||||
case float64:
|
case float64:
|
||||||
return res, nil
|
return res, nil
|
||||||
|
case []int:
|
||||||
|
return reduce(intsToFloat64s(res), g.aggregator)
|
||||||
|
case []float32:
|
||||||
|
return reduce(float32sToFloat64s(res), g.aggregator)
|
||||||
|
case []float64:
|
||||||
|
return reduce(res, g.aggregator)
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("unsupported type %T", res)
|
return 0, fmt.Errorf("unsupported type %T", res)
|
||||||
}
|
}
|
||||||
@ -131,3 +143,82 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
|
|||||||
|
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// intsToFloat64s will convert a slice of int to a slice of float64
|
||||||
|
func intsToFloat64s(in []int) (out []float64) {
|
||||||
|
out = []float64{}
|
||||||
|
for _, v := range in {
|
||||||
|
out = append(out, float64(v))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// float32sToFloat64s will convert a slice of float32 to a slice of float64
|
||||||
|
func float32sToFloat64s(in []float32) (out []float64) {
|
||||||
|
out = []float64{}
|
||||||
|
for _, v := range in {
|
||||||
|
out = append(out, float64(v))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
||||||
|
func reduce(values []float64, aggregator string) (float64, error) {
|
||||||
|
switch aggregator {
|
||||||
|
case "avg":
|
||||||
|
return avg(values), nil
|
||||||
|
case "min":
|
||||||
|
return min(values), nil
|
||||||
|
case "max":
|
||||||
|
return max(values), nil
|
||||||
|
case "sum":
|
||||||
|
return sum(values), nil
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// avg implements the average mathematical function over a slice of float64
|
||||||
|
func avg(values []float64) float64 {
|
||||||
|
sum := sum(values)
|
||||||
|
return sum / float64(len(values))
|
||||||
|
}
|
||||||
|
|
||||||
|
// min implements the absolute minimum mathematical function over a slice of float64
|
||||||
|
func min(values []float64) float64 {
|
||||||
|
// initialized with positive infinity, all finite numbers are smaller than it
|
||||||
|
curMin := math.Inf(1)
|
||||||
|
|
||||||
|
for _, v := range values {
|
||||||
|
if v < curMin {
|
||||||
|
curMin = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return curMin
|
||||||
|
}
|
||||||
|
|
||||||
|
// max implements the absolute maximum mathematical function over a slice of float64
|
||||||
|
func max(values []float64) float64 {
|
||||||
|
// initialized with negative infinity, all finite numbers are bigger than it
|
||||||
|
curMax := math.Inf(-1)
|
||||||
|
|
||||||
|
for _, v := range values {
|
||||||
|
if v > curMax {
|
||||||
|
curMax = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return curMax
|
||||||
|
}
|
||||||
|
|
||||||
|
// sum implements the summation mathematical function over a slice of float64
|
||||||
|
func sum(values []float64) float64 {
|
||||||
|
res := 0.0
|
||||||
|
|
||||||
|
for _, v := range values {
|
||||||
|
res += v
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
107
pkg/collector/json_path_collector_test.go
Normal file
107
pkg/collector/json_path_collector_test.go
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
package collector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/oliveagle/jsonpath"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||||
|
configNoAggregator := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
}
|
||||||
|
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
|
||||||
|
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
||||||
|
|
||||||
|
require.NoError(t, err1)
|
||||||
|
require.Equal(t, &JSONPathMetricsGetter{
|
||||||
|
jsonPath: jpath1,
|
||||||
|
scheme: "http",
|
||||||
|
path: "/metrics",
|
||||||
|
port: 9090,
|
||||||
|
}, getterNoAggregator)
|
||||||
|
|
||||||
|
configAggregator := map[string]string{
|
||||||
|
"json-key": "$.values",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
"aggregator": "avg",
|
||||||
|
}
|
||||||
|
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
|
||||||
|
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
||||||
|
|
||||||
|
require.NoError(t, err2)
|
||||||
|
require.Equal(t, &JSONPathMetricsGetter{
|
||||||
|
jsonPath: jpath2,
|
||||||
|
scheme: "http",
|
||||||
|
path: "/metrics",
|
||||||
|
port: 9090,
|
||||||
|
aggregator: "avg",
|
||||||
|
}, getterAggregator)
|
||||||
|
|
||||||
|
configErrorJSONPath := map[string]string{
|
||||||
|
"json-key": "{}",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
|
||||||
|
require.Error(t, err3)
|
||||||
|
|
||||||
|
configErrorPort := map[string]string{
|
||||||
|
"json-key": "$.values",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "a9090",
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
|
||||||
|
require.Error(t, err4)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIntsToFloat64s(t *testing.T) {
|
||||||
|
noInts := []int{}
|
||||||
|
noFloat64s := intsToFloat64s(noInts)
|
||||||
|
require.Equal(t, []float64{}, noFloat64s)
|
||||||
|
|
||||||
|
someInts := []int{1, 2, 3}
|
||||||
|
someFloat64s := intsToFloat64s(someInts)
|
||||||
|
require.Equal(t, []float64{1.0, 2.0, 3.0}, someFloat64s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFloat32sToFloat64s(t *testing.T) {
|
||||||
|
noFloat32s := []float32{}
|
||||||
|
noFloat64s := float32sToFloat64s(noFloat32s)
|
||||||
|
require.Equal(t, []float64{}, noFloat64s)
|
||||||
|
|
||||||
|
someFloat32s := []float32{1.0, 2.0, 3.0}
|
||||||
|
someFloat64s := float32sToFloat64s(someFloat32s)
|
||||||
|
require.Equal(t, []float64{1.0, 2.0, 3.0}, someFloat64s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReduce(t *testing.T) {
|
||||||
|
average, err1 := reduce([]float64{1, 2, 3}, "avg")
|
||||||
|
require.NoError(t, err1)
|
||||||
|
require.Equal(t, 2.0, average)
|
||||||
|
|
||||||
|
min, err2 := reduce([]float64{1, 2, 3}, "min")
|
||||||
|
require.NoError(t, err2)
|
||||||
|
require.Equal(t, 1.0, min)
|
||||||
|
|
||||||
|
max, err3 := reduce([]float64{1, 2, 3}, "max")
|
||||||
|
require.NoError(t, err3)
|
||||||
|
require.Equal(t, 3.0, max)
|
||||||
|
|
||||||
|
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
|
||||||
|
require.NoError(t, err4)
|
||||||
|
require.Equal(t, 6.0, sum)
|
||||||
|
|
||||||
|
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
|
||||||
|
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
|
||||||
|
}
|
Reference in New Issue
Block a user