Compare commits

...

5 Commits

Author SHA1 Message Date
Johann Fuechsl
120950078c Fix #89 by copying the MatchLabels map instead of referencing it. (#90)
Signed-off-by: Johann Fuechsl <johann@fuechsl.co>
2019-11-07 14:38:26 +01:00
Tomás Pinho
0790bc351a This fixes an issue with the type switch that was never able to fall (#88)
into cases of []<number>, being <number> a number type such as int,
float32, float64. This is because Go can't type cast slices of
interface{} out right because it's impossible to know the true types of
the slice members beforehand.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-11-05 09:43:25 +01:00
Tomás Pinho
f6b2aede5b Support for JSONPath expressions that return arrays of values (#85)
* This is the initial implementation of support for JSONPath expressions
that return arrays of values instead of a single value.

This extends the
collector to define a few handy reducer functions that take in the slice
of float64 and return a single value. It also allows the user to define
which reducer function to use via the
"metric-config.<metricType>.<metricName>.json-path/reducer-func"
annotation, which
can have the values of 'avg', 'min', 'max' and 'sum'.

For instance, the Ruby puma webserver exposes metrics of the form of $.worker_status[*].last_status.pool_capacity that have to be consumed as an array of values to be properly targetted.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Renames "reducerFunc" to "aggregator" for consistency with other
collectors. Renames the annotation from
"metric-config.<metricType>.<metricName>.json-path/reducer-func" to "metric-config.<metricType>.<metricName>.json-path/aggregator".

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Return error instead of defaulting to the avg aggregator, when no valid
aggregator name was specified and the JSONPath value is a slice of
numbers.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix index out of range on initialized output slice that was found while
writing tests.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add tests for all added functions + NewJSONPathMetricsGetter

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add documentation on the `aggregator` option.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* reducer function -> aggregator function

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix comment to account for returned error.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-10-24 18:15:10 +02:00
Mikkel Oscar Lyderik Larsen
7d5e719eb0 Merge pull request #86 from pinkavaj/fix-var-name
Fix variable name typo
2019-10-24 10:16:58 +02:00
Jiri Pinkava
7497a61a2c Fix variable name typo
Signed-off-by: Jiri Pinkava <jiri.pinkava@rossum.ai>
2019-10-24 09:45:22 +02:00
4 changed files with 214 additions and 7 deletions

View File

@ -106,6 +106,7 @@ metadata:
metric-config.pods.requests-per-second.json-path/path: /metrics
metric-config.pods.requests-per-second.json-path/port: "9090"
metric-config.pods.requests-per-second.json-path/scheme: "https"
metric-config.pods.requests-per-second.json-path/aggregator: "max"
spec:
scaleTargetRef:
apiVersion: apps/v1
@ -143,6 +144,11 @@ The other configuration options `path`, `port` and `scheme` specify where the me
endpoint is exposed on the pod. The `path` and `port` options do not have default values
so they must be defined. The `scheme` is optional and defaults to `http`.
The `aggregator` configuration option specifies the aggregation function used to aggregate
values of JSONPath expressions that evaluate to arrays/slices of numbers.
It's optional but when the expression evaluates to an array/slice, it's absence will
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
## Prometheus collector
The Prometheus collector is a generic collector which can map Prometheus

View File

@ -213,7 +213,9 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
config.Config = metric.External.Metric.Selector.MatchLabels
for k, v := range metric.External.Metric.Selector.MatchLabels {
config.Config[k] = v
}
}
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"strconv"
@ -17,10 +18,11 @@ import (
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
scheme string
path string
port int
jsonPath *jsonpath.Compiled
scheme string
path string
port int
aggregator string
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
@ -28,12 +30,12 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter := &JSONPathMetricsGetter{}
if v, ok := config["json-key"]; ok {
pat, err := jsonpath.Compile(v)
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = pat
getter.jsonPath = path
}
if v, ok := config["scheme"]; ok {
@ -52,6 +54,10 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter.port = n
}
if v, ok := config["aggregator"]; ok {
getter.aggregator = v
}
return getter, nil
}
@ -83,11 +89,38 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
return float64(res), nil
case float64:
return res, nil
case []interface{}:
s, err := castSlice(res)
if err != nil {
return 0, err
}
return reduce(s, g.aggregator)
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
out := []float64{}
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
return out, nil
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
@ -131,3 +164,64 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return data, nil
}
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
func reduce(values []float64, aggregator string) (float64, error) {
switch aggregator {
case "avg":
return avg(values), nil
case "min":
return min(values), nil
case "max":
return max(values), nil
case "sum":
return sum(values), nil
default:
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
}
}
// avg implements the average mathematical function over a slice of float64
func avg(values []float64) float64 {
sum := sum(values)
return sum / float64(len(values))
}
// min implements the absolute minimum mathematical function over a slice of float64
func min(values []float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// max implements the absolute maximum mathematical function over a slice of float64
func max(values []float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// sum implements the summation mathematical function over a slice of float64
func sum(values []float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}

View File

@ -0,0 +1,105 @@
package collector
import (
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
require.Equal(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
require.Equal(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",
port: 9090,
aggregator: "avg",
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
require.Error(t, err4)
}
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
func TestReduce(t *testing.T) {
average, err1 := reduce([]float64{1, 2, 3}, "avg")
require.NoError(t, err1)
require.Equal(t, 2.0, average)
min, err2 := reduce([]float64{1, 2, 3}, "min")
require.NoError(t, err2)
require.Equal(t, 1.0, min)
max, err3 := reduce([]float64{1, 2, 3}, "max")
require.NoError(t, err3)
require.Equal(t, 3.0, max)
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
require.NoError(t, err4)
require.Equal(t, 6.0, sum)
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
}