Merge pull request #100 from affo/feat/influxdb-collector

feat(collector): add InfluxDB collector
Mikkel Oscar Lyderik Larsen
2020-01-24 10:56:52 +01:00
committed by GitHub
7 changed files with 546 additions and 2 deletions


@@ -337,6 +337,71 @@ instead of a total sum.
`<1.14`, it is not as precise as using `averageValue` and will not be supported
after Kubernetes `v1.16` is released according to the [support policy](https://kubernetes.io/docs/setup/release/version-skew-policy/).**
## InfluxDB collector
The InfluxDB collector maps [Flux](https://github.com/influxdata/flux) queries to metrics that can be used for scaling.
Note that the collector targets an [InfluxDB v2](https://v2.docs.influxdata.com/v2.0/get-started/) instance, which is why
only Flux is supported rather than InfluxQL.
### Supported metrics
| Metric | Description | Type | Kind | K8s Versions |
| ------------ | -------------- | ------- | -- | -- |
| `flux-query` | Generic metric that requires a user-defined query. | External | | `>=1.10` |
### Example: External Metric
This is an example of an HPA configured to get metrics based on a Flux query.
The query is defined in the annotation
`metric-config.external.flux-query.influxdb/queue_depth`,
where `queue_depth` is the query name that will be associated with the result of the query.
A matching `query-name` label must be defined in the `matchLabels` of the metric definition.
This allows multiple Flux queries to be associated with a single HPA.
```yaml
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  name: my-hpa
  annotations:
    # These annotations are optional.
    # If specified, they are used to configure the InfluxDB client,
    # overriding the values passed via the CLI flags. Respectively:
    # - --influxdb-address
    # - --influxdb-token
    # - --influxdb-org-id
    metric-config.external.flux-query.influxdb/address: "http://influxdbv2.my-namespace.svc"
    metric-config.external.flux-query.influxdb/token: "secret-token"
    metric-config.external.flux-query.influxdb/org-id: "deadbeef"
    # metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
    # <configKey> == query-name
    metric-config.external.flux-query.influxdb/queue_depth: |
      from(bucket: "apps")
        |> range(start: -30s)
        |> filter(fn: (r) => r._measurement == "queue_depth")
        |> group()
        |> max()
        // Rename "_value" to "metricvalue" so that the metrics server can unmarshal the result.
        |> rename(columns: {_value: "metricvalue"})
        |> keep(columns: ["metricvalue"])
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: queryd-v1
  minReplicas: 1
  maxReplicas: 4
  metrics:
  - type: External
    external:
      metricName: flux-query
      metricSelector:
        matchLabels:
          query-name: queue_depth
      targetValue: 1
```
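
The query must produce a table with a numeric `metricvalue` column, and the adapter reads only the first row of the result, which is why the example above ends with the `rename`/`keep` combination. As a rough sketch, using the same `influxdb-client-go` (v0.1.x) calls the new collector below relies on, and the placeholder address, token and org ID from the example, this is how such a query is run and unmarshaled:

```go
package main

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb-client-go"
)

// queryResult maps the "metricvalue" column produced by the Flux query.
type queryResult struct {
	MetricValue float64
}

func main() {
	client, err := influxdb.New("http://influxdbv2.my-namespace.svc", "secret-token")
	if err != nil {
		panic(err)
	}

	flux := `from(bucket: "apps")
  |> range(start: -30s)
  |> filter(fn: (r) => r._measurement == "queue_depth")
  |> group()
  |> max()
  |> rename(columns: {_value: "metricvalue"})
  |> keep(columns: ["metricvalue"])`

	res, err := client.QueryCSV(context.Background(), flux, "deadbeef")
	if err != nil {
		panic(err)
	}
	defer res.Close()

	// Only the first row is used, exactly like the collector.
	if res.Next() {
		qr := queryResult{}
		if err := res.Unmarshal(&qr); err != nil {
			panic(err)
		}
		fmt.Println("queue_depth =", qr.MetricValue)
	}
}
```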
## AWS collector
The AWS collector allows scaling based on external metrics exposed by AWS

go.mod

@@ -7,8 +7,7 @@ require (
	github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
	github.com/googleapis/gnostic v0.2.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
	github.com/imdario/mergo v0.3.6 // indirect
	github.com/inconshreveable/mousetrap v1.0.0 // indirect
	github.com/influxdata/influxdb-client-go v0.1.4
	github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
	github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
	github.com/prometheus/client_golang v0.9.2

go.sum

File diff suppressed because it is too large.


@@ -65,6 +65,27 @@ func TestParser(t *testing.T) {
			},
			PerReplica: false,
		},
		{
			Name: "influxdb metrics",
			Annotations: map[string]string{
				"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
				"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
				"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
				"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
				"metric-config.external.flux-query.influxdb/token":   "sEcr3TT0ken",
				"metric-config.external.flux-query.influxdb/org-id":  "deadbeef",
			},
			MetricName: "flux-query",
			MetricType: autoscalingv2.ExternalMetricSourceType,
			ExpectedConfig: map[string]string{
				"range1m": `from(bucket: "?") |> range(start: -1m)`,
				"range2m": `from(bucket: "?") |> range(start: -2m)`,
				"range3m": `from(bucket: "?") |> range(start: -3m)`,
				"address": "http://localhost:9999",
				"token":   "sEcr3TT0ken",
				"org-id":  "deadbeef",
			},
		},
	} {
		t.Run(tc.Name, func(t *testing.T) {
			hpaMap := make(AnnotationConfigMap)
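
The test above depends on the annotation parser flattening every `metric-config.<metricType>.<metricName>.<collectorName>/<configKey>` annotation into a plain `<configKey>` entry of the collector's config map. A minimal, hypothetical sketch of that flattening step (the `flattenAnnotations` helper is illustrative only, not the repository's actual parser):

```go
package main

import (
	"fmt"
	"strings"
)

// flattenAnnotations keeps only annotations whose prefix matches the given
// metric type, metric name and collector, and strips that prefix so the
// remaining <configKey> becomes the map key.
func flattenAnnotations(annotations map[string]string, metricType, metricName, collector string) map[string]string {
	prefix := fmt.Sprintf("metric-config.%s.%s.%s/", metricType, metricName, collector)
	config := map[string]string{}
	for k, v := range annotations {
		if strings.HasPrefix(k, prefix) {
			config[strings.TrimPrefix(k, prefix)] = v
		}
	}
	return config
}

func main() {
	annotations := map[string]string{
		"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
		"metric-config.external.flux-query.influxdb/org-id":  "deadbeef",
	}
	// Prints: map[org-id:deadbeef range2m:from(bucket: "?") |> range(start: -2m)]
	fmt.Println(flattenAnnotations(annotations, "external", "flux-query", "influxdb"))
}
```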


@@ -0,0 +1,152 @@
package collector

import (
	"context"
	"fmt"
	"time"

	"github.com/influxdata/influxdb-client-go"
	"k8s.io/api/autoscaling/v2beta2"
	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
	InfluxDBMetricName        = "flux-query"
	influxDBAddressKey        = "address"
	influxDBTokenKey          = "token"
	influxDBOrgIDKey          = "org-id"
	influxDBQueryNameLabelKey = "query-name"
)

type InfluxDBCollectorPlugin struct {
	kubeClient kubernetes.Interface
	address    string
	token      string
	orgID      string
}

func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
	return &InfluxDBCollectorPlugin{
		kubeClient: client,
		address:    address,
		token:      token,
		orgID:      orgID,
	}, nil
}

func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
	return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval)
}

type InfluxDBCollector struct {
	address        string
	token          string
	orgID          string
	influxDBClient *influxdb.Client
	interval       time.Duration
	metric         autoscalingv2.MetricIdentifier
	metricType     autoscalingv2.MetricSourceType
	query          string
}

func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
	collector := &InfluxDBCollector{
		interval:   interval,
		metric:     config.Metric,
		metricType: config.Type,
	}
	switch configType := config.Type; configType {
	case autoscalingv2.ObjectMetricSourceType:
		return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
	case autoscalingv2.ExternalMetricSourceType:
		// `metricSelector` is flattened into the MetricConfig.Config.
		queryName, ok := config.Config[influxDBQueryNameLabelKey]
		if !ok {
			return nil, fmt.Errorf("selector for Flux query is not specified, "+
				"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
		}
		if query, ok := config.Config[queryName]; ok {
			// TODO(affo): validate the query once this is done:
			//  https://github.com/influxdata/influxdb-client-go/issues/73.
			collector.query = query
		} else {
			return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
		}
	default:
		return nil, fmt.Errorf("unknown metric type: %v", configType)
	}
	// Use custom InfluxDB config if defined in HPA annotation.
	if v, ok := config.Config[influxDBAddressKey]; ok {
		address = v
	}
	if v, ok := config.Config[influxDBTokenKey]; ok {
		token = v
	}
	if v, ok := config.Config[influxDBOrgIDKey]; ok {
		orgID = v
	}
	influxDbClient, err := influxdb.New(address, token)
	if err != nil {
		return nil, err
	}
	collector.address = address
	collector.token = token
	collector.orgID = orgID
	collector.influxDBClient = influxDbClient
	return collector, nil
}

// queryResult is for unmarshaling the result from InfluxDB.
// The Flux query should make it so that the resulting table contains the column "metricvalue".
type queryResult struct {
	MetricValue float64
}

// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
	res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
	if err != nil {
		return resource.Quantity{}, err
	}
	defer res.Close()
	// Keeping just the first result.
	if res.Next() {
		qr := queryResult{}
		if err := res.Unmarshal(&qr); err != nil {
			return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
		}
		return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
	}
	if err := res.Err; err != nil {
		return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
	}
	return resource.Quantity{}, fmt.Errorf("empty result returned")
}

func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
	v, err := c.getValue()
	if err != nil {
		return nil, err
	}
	cm := CollectedMetric{
		Type: c.metricType,
		External: external_metrics.ExternalMetricValue{
			MetricName:   c.metric.Name,
			MetricLabels: c.metric.Selector.MatchLabels,
			Timestamp: metav1.Time{
				Time: time.Now().UTC(),
			},
			Value: v,
		},
	}
	return []CollectedMetric{cm}, nil
}

func (c *InfluxDBCollector) Interval() time.Duration {
	return c.interval
}
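
The value read from the `metricvalue` column is converted into a Kubernetes `resource.Quantity` at milli precision (see `getValue` above), so fractional results survive the conversion to the external metrics API. A minimal standalone sketch of just that conversion step, with an illustrative input value:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Suppose the Flux query returned a "metricvalue" of 1.5.
	metricValue := 1.5

	// Same conversion the collector performs: scale to milli-units and
	// build a decimal Quantity, so 1.5 is exposed as "1500m".
	q := resource.NewMilliQuantity(int64(metricValue*1000), resource.DecimalSI)
	fmt.Println(q.String()) // prints "1500m"
}
```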


@@ -0,0 +1,155 @@
package collector

import (
	"strings"
	"testing"
	"time"

	"k8s.io/api/autoscaling/v2beta2"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestInfluxDBCollector_New(t *testing.T) {
	t.Run("simple", func(t *testing.T) {
		m := &MetricConfig{
			MetricTypeName: MetricTypeName{
				Type: v2beta2.ExternalMetricSourceType,
				Metric: v2beta2.MetricIdentifier{
					Name: "flux-query",
					// This is actually useless, because the selector should be flattened in Config when parsing.
					Selector: &v1.LabelSelector{
						MatchLabels: map[string]string{
							"query-name": "range2m",
						},
					},
				},
			},
			CollectorName: "influxdb",
			Config: map[string]string{
				"range1m":    `from(bucket: "?") |> range(start: -1m)`,
				"range2m":    `from(bucket: "?") |> range(start: -2m)`,
				"range3m":    `from(bucket: "?") |> range(start: -3m)`,
				"query-name": "range2m",
			},
		}
		c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got, want := c.orgID, "deadbeef"; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
		if got, want := c.address, "http://localhost:9999"; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
		if got, want := c.token, "secret"; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
		if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
	})
	t.Run("override params", func(t *testing.T) {
		m := &MetricConfig{
			MetricTypeName: MetricTypeName{
				Type: v2beta2.ExternalMetricSourceType,
				Metric: v2beta2.MetricIdentifier{
					Name: "flux-query",
					Selector: &v1.LabelSelector{
						MatchLabels: map[string]string{
							"query-name": "range2m",
						},
					},
				},
			},
			CollectorName: "influxdb",
			Config: map[string]string{
				"range1m":    `from(bucket: "?") |> range(start: -1m)`,
				"range2m":    `from(bucket: "?") |> range(start: -2m)`,
				"range3m":    `from(bucket: "?") |> range(start: -3m)`,
				"address":    "http://localhost:9999",
				"token":      "sEcr3TT0ken",
				"org-id":     "deadbeef1234",
				"query-name": "range3m",
			},
		}
		c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got, want := c.orgID, "deadbeef1234"; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
		if got, want := c.address, "http://localhost:9999"; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
		if got, want := c.token, "sEcr3TT0ken"; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
		if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got {
			t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
		}
	})
	// Errors.
	for _, tc := range []struct {
		name            string
		mTypeName       MetricTypeName
		config          map[string]string
		errorStartsWith string
	}{
		{
			name: "object metric",
			mTypeName: MetricTypeName{
				Type: v2beta2.ObjectMetricSourceType,
			},
			errorStartsWith: "InfluxDB does not support object",
		},
		{
			name: "no selector",
			mTypeName: MetricTypeName{
				Type: v2beta2.ExternalMetricSourceType,
				Metric: v2beta2.MetricIdentifier{
					Name: "flux-query",
				},
			},
			// The selector should be flattened into the config by the parsing step, but it isn't.
			config: map[string]string{
				"range1m": `from(bucket: "?") |> range(start: -1m)`,
				"range2m": `from(bucket: "?") |> range(start: -2m)`,
				"range3m": `from(bucket: "?") |> range(start: -3m)`,
			},
			errorStartsWith: "selector for Flux query is not specified",
		},
		{
			name: "referencing non-existing query",
			mTypeName: MetricTypeName{
				Type: v2beta2.ExternalMetricSourceType,
				Metric: v2beta2.MetricIdentifier{
					Name: "flux-query",
				},
			},
			config: map[string]string{
				"range1m":    `from(bucket: "?") |> range(start: -1m)`,
				"range2m":    `from(bucket: "?") |> range(start: -2m)`,
				"range3m":    `from(bucket: "?") |> range(start: -3m)`,
				"query-name": "rangeXm",
			},
			errorStartsWith: "no Flux query defined for metric",
		},
	} {
		t.Run("error - "+tc.name, func(t *testing.T) {
			m := &MetricConfig{
				MetricTypeName: tc.mTypeName,
				CollectorName:  "influxdb",
				Config:         tc.config,
			}
			_, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
			if err == nil {
				t.Fatal("expected error got none")
			}
			if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) {
				t.Fatalf("%s should start with %s", got, want)
			}
		})
	}
}


@@ -88,6 +88,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"whether to enable External Metrics API")
flags.StringVar(&o.PrometheusServer, "prometheus-server", o.PrometheusServer, ""+
"url of prometheus server to query")
flags.StringVar(&o.InfluxDBAddress, "influxdb-address", o.InfluxDBAddress, ""+
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
"token for InfluxDB 2.x server to query")
flags.StringVar(&o.InfluxDBOrgID, "influxdb-org-id", o.InfluxDBOrgID, ""+
"organization ID for InfluxDB 2.x server to query")
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
"url of ZMON KariosDB endpoint to query for ZMON checks")
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
@@ -175,6 +181,14 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
		}
	}
	if o.InfluxDBAddress != "" {
		influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrgID)
		if err != nil {
			return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
		}
		collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
	}
	// register generic pod collector
	err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
	if err != nil {
@@ -290,6 +304,12 @@ type AdapterServerOptions struct {
	// PrometheusServer enables prometheus queries to the specified
	// server
	PrometheusServer string
	// InfluxDBAddress enables Flux queries to the specified InfluxDB instance
	InfluxDBAddress string
	// InfluxDBToken is the token used for querying InfluxDB
	InfluxDBToken string
	// InfluxDBOrgID is the organization ID used for querying InfluxDB
	InfluxDBOrgID string
	// ZMONKariosDBEndpoint enables ZMON check queries to the specified
	// kariosDB endpoint
	ZMONKariosDBEndpoint string