mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2024-12-22 11:06:04 +00:00
feat(collector): add InfluxDB collector
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
This commit is contained in:
parent
4412e3dca4
commit
5b55bea994
65
README.md
65
README.md
@ -337,6 +337,71 @@ instead of a total sum.
|
||||
`<1.14`, it is not as percise as using `averageValue` and will not be supported
|
||||
after Kubernetes `v1.16` is released according to the [support policy](https://kubernetes.io/docs/setup/release/version-skew-policy/).**
|
||||
|
||||
## InfluxDB collector
|
||||
|
||||
The InfluxDB collector maps [Flux](https://github.com/influxdata/flux) queries to metrics that can be used for scaling.
|
||||
|
||||
Note that the collector targets an [InfluxDB v2](https://v2.docs.influxdata.com/v2.0/get-started/) instance, that's why
|
||||
we only support Flux instead of InfluxQL.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type | Kind | K8s Versions |
|
||||
| ------------ | -------------- | ------- | -- | -- |
|
||||
| `flux-query` | Generic metric which requires a user defined query. | External | | `>=1.10` |
|
||||
|
||||
### Example: External Metric
|
||||
|
||||
This is an example of an HPA configured to get metrics based on a Flux query.
|
||||
The query is defined in the annotation
|
||||
`metric-config.external.flux-query.influxdb/queue_depth`
|
||||
where `queue_depth` is the query name which will be associated with the result of the query.
|
||||
A matching `query-name` label must be defined in the `matchLabels` of the metric definition.
|
||||
This allows having multiple flux queries associated with a single HPA.
|
||||
|
||||
```yaml
|
||||
apiVersion: autoscaling/v2beta1
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: my-hpa
|
||||
annotations:
|
||||
# These annotations are optional.
|
||||
# If specified, then they are used for setting up the InfluxDB client properly,
|
||||
# instead of using the ones specified via CLI. Respectively:
|
||||
# - --influxdb-address
|
||||
# - --influxdb-token
|
||||
# - --influxdb-org-id
|
||||
metric-config.external.flux-query.influxdb/address: "http://influxdbv2.my-namespace.svc"
|
||||
metric-config.external.flux-query.influxdb/token: "secret-token"
|
||||
metric-config.external.flux-query.influxdb/org-id: "deadbeef"
|
||||
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
||||
# <configKey> == query-name
|
||||
metric-config.external.flux-query.influxdb/queue_depth: |
|
||||
from(bucket: "apps")
|
||||
|> range(start: -30s)
|
||||
|> filter(fn: (r) => r._measurement == "queue_depth")
|
||||
|> group()
|
||||
|> max()
|
||||
// Rename "_value" to "metricvalue" for letting the metrics server properly unmarshal the result.
|
||||
|> rename(columns: {_value: "metricvalue"})
|
||||
|> keep(columns: ["metricvalue"])
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: queryd-v1
|
||||
minReplicas: 1
|
||||
maxReplicas: 4
|
||||
metrics:
|
||||
- type: External
|
||||
external:
|
||||
metricName: flux-query
|
||||
metricSelector:
|
||||
matchLabels:
|
||||
query-name: queue_depth
|
||||
targetValue: 1
|
||||
```
|
||||
|
||||
## AWS collector
|
||||
|
||||
The AWS collector allows scaling based on external metrics exposed by AWS
|
||||
|
3
go.mod
3
go.mod
@ -7,8 +7,7 @@ require (
|
||||
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
|
||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/influxdata/influxdb-client-go v0.1.4
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
|
||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||
github.com/prometheus/client_golang v0.9.2
|
||||
|
@ -65,6 +65,27 @@ func TestParser(t *testing.T) {
|
||||
},
|
||||
PerReplica: false,
|
||||
},
|
||||
{
|
||||
Name: "influxdb metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
|
||||
"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
|
||||
"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
|
||||
"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
|
||||
"metric-config.external.flux-query.influxdb/token": "sEcr3TT0ken",
|
||||
"metric-config.external.flux-query.influxdb/org-id": "deadbeef",
|
||||
},
|
||||
MetricName: "flux-query",
|
||||
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"range1m": `from(bucket: "?") |> range(start: -1m)`,
|
||||
"range2m": `from(bucket: "?") |> range(start: -2m)`,
|
||||
"range3m": `from(bucket: "?") |> range(start: -3m)`,
|
||||
"address": "http://localhost:9999",
|
||||
"token": "sEcr3TT0ken",
|
||||
"org-id": "deadbeef",
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
hpaMap := make(AnnotationConfigMap)
|
||||
|
152
pkg/collector/influxdb_collector.go
Normal file
152
pkg/collector/influxdb_collector.go
Normal file
@ -0,0 +1,152 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb-client-go"
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
InfluxDBMetricName = "flux-query"
|
||||
influxDBAddressKey = "address"
|
||||
influxDBTokenKey = "token"
|
||||
influxDBOrgIDKey = "org-id"
|
||||
influxDBQueryNameLabelKey = "query-name"
|
||||
)
|
||||
|
||||
type InfluxDBCollectorPlugin struct {
|
||||
kubeClient kubernetes.Interface
|
||||
address string
|
||||
token string
|
||||
orgID string
|
||||
}
|
||||
|
||||
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
|
||||
return &InfluxDBCollectorPlugin{
|
||||
kubeClient: client,
|
||||
address: address,
|
||||
token: token,
|
||||
orgID: orgID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewInfluxDBCollector(p.address, p.orgID, p.token, hpa, config, interval)
|
||||
}
|
||||
|
||||
type InfluxDBCollector struct {
|
||||
influxDBClient *influxdb.Client
|
||||
hpa *autoscalingv2.HorizontalPodAutoscaler
|
||||
interval time.Duration
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
query string
|
||||
orgID string
|
||||
}
|
||||
|
||||
func NewInfluxDBCollector(address string, token string, orgID string, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
|
||||
collector := &InfluxDBCollector{
|
||||
hpa: hpa,
|
||||
interval: interval,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
}
|
||||
switch configType := config.Type; configType {
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
// `metricSelector` is flattened into the MetricConfig.Config.
|
||||
queryName, ok := config.Config[influxDBQueryNameLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("selector for Flux query is not specified,"+
|
||||
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
|
||||
}
|
||||
if query, ok := config.Config[queryName]; ok {
|
||||
// TODO(affo): validate the query.
|
||||
// This would mean depending on Flux and run a compilation.
|
||||
// Otherwise, the InfluxDB client should expose a method for compilation only.
|
||||
// Yeah, we could launch the query and if there is an error, check it was a compilation one
|
||||
// (by checking the message), but that seems quite dirty.
|
||||
collector.query = query
|
||||
} else {
|
||||
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown metric type: %v", configType)
|
||||
}
|
||||
// Use custom InfluxDB config if defined in HPA annotation.
|
||||
if v, found := config.Config[influxDBAddressKey]; found {
|
||||
address = v
|
||||
}
|
||||
if v, found := config.Config[influxDBTokenKey]; found {
|
||||
token = v
|
||||
}
|
||||
if v, found := config.Config[influxDBOrgIDKey]; found {
|
||||
orgID = v
|
||||
}
|
||||
influxDbClient, err := influxdb.New(address, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
collector.influxDBClient = influxDbClient
|
||||
collector.orgID = orgID
|
||||
return collector, nil
|
||||
}
|
||||
|
||||
// queryResult is for unmarshaling the result from InfluxDB.
|
||||
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
|
||||
type queryResult struct {
|
||||
MetricValue float64
|
||||
}
|
||||
|
||||
// getValue returns the first result gathered from an InfluxDB instance.
|
||||
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
|
||||
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
|
||||
if err != nil {
|
||||
return resource.Quantity{}, err
|
||||
}
|
||||
defer res.Close()
|
||||
// Keeping just the first result.
|
||||
if res.Next() {
|
||||
qr := queryResult{}
|
||||
if err := res.Unmarshal(&qr); err != nil {
|
||||
return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
|
||||
}
|
||||
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
|
||||
}
|
||||
if err := res.Err; err != nil {
|
||||
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
|
||||
}
|
||||
return resource.Quantity{}, fmt.Errorf("empty result returned")
|
||||
}
|
||||
|
||||
func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
v, err := c.getValue()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cm := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Value: v,
|
||||
},
|
||||
}
|
||||
return []CollectedMetric{cm}, nil
|
||||
}
|
||||
|
||||
func (c *InfluxDBCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
@ -88,6 +88,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
"whether to enable External Metrics API")
|
||||
flags.StringVar(&o.PrometheusServer, "prometheus-server", o.PrometheusServer, ""+
|
||||
"url of prometheus server to query")
|
||||
flags.StringVar(&o.InfluxDBAddress, "influxdb-address", o.InfluxDBAddress, ""+
|
||||
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
|
||||
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
|
||||
"token for InfluxDB 2.x server to query")
|
||||
flags.StringVar(&o.InfluxDBOrgID, "influxdb-org-id", o.InfluxDBOrgID, ""+
|
||||
"organization ID for InfluxDB 2.x server to query")
|
||||
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
|
||||
"url of ZMON KariosDB endpoint to query for ZMON checks")
|
||||
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
|
||||
@ -175,6 +181,14 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
}
|
||||
}
|
||||
|
||||
if o.InfluxDBAddress != "" {
|
||||
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrgID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
|
||||
}
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
|
||||
}
|
||||
|
||||
// register generic pod collector
|
||||
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
|
||||
if err != nil {
|
||||
@ -290,6 +304,12 @@ type AdapterServerOptions struct {
|
||||
// PrometheusServer enables prometheus queries to the specified
|
||||
// server
|
||||
PrometheusServer string
|
||||
// InfluxDBAddress enables Flux queries to the specified InfluxDB instance
|
||||
InfluxDBAddress string
|
||||
// InfluxDBToken is the token used for querying InfluxDB
|
||||
InfluxDBToken string
|
||||
// InfluxDBOrgID is the organization ID used for querying InfluxDB
|
||||
InfluxDBOrgID string
|
||||
// ZMONKariosDBEndpoint enables ZMON check queries to the specified
|
||||
// kariosDB endpoint
|
||||
ZMONKariosDBEndpoint string
|
||||
|
Loading…
x
Reference in New Issue
Block a user