kube-metrics-adapter/pkg/collector/influxdb_collector.go

151 lines
4.6 KiB
Go
Raw Permalink Normal View History

package collector
import (
"context"
"fmt"
"time"
influxdb "github.com/influxdata/influxdb-client-go"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
InfluxDBMetricType = "influxdb"
InfluxDBMetricNameLegacy = "flux-query"
influxDBAddressKey = "address"
influxDBTokenKey = "token"
influxDBOrgKey = "org"
influxDBQueryNameLabelKey = "query-name"
)
type InfluxDBCollectorPlugin struct {
kubeClient kubernetes.Interface
address string
token string
org string
}
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org string) (*InfluxDBCollectorPlugin, error) {
return &InfluxDBCollectorPlugin{
kubeClient: client,
address: address,
token: token,
org: org,
}, nil
}
func (p *InfluxDBCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(ctx, hpa, p.address, p.token, p.org, config, interval)
}
type InfluxDBCollector struct {
address string
token string
org string
influxDBClient influxdb.Client
interval time.Duration
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
query string
namespace string
}
func NewInfluxDBCollector(_ context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
metricType: config.Type,
namespace: hpa.Namespace,
}
switch configType := config.Type; configType {
case autoscalingv2.ObjectMetricSourceType:
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
case autoscalingv2.ExternalMetricSourceType:
// `metricSelector` is flattened into the MetricConfig.Config.
queryName, ok := config.Config[influxDBQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("selector for Flux query is not specified, "+
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
}
if query, ok := config.Config[queryName]; ok {
// TODO(affo): validate the query once this is done:
// https://github.com/influxdata/influxdb-client-go/issues/73.
collector.query = query
} else {
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
}
default:
return nil, fmt.Errorf("unknown metric type: %v", configType)
}
// Use custom InfluxDB config if defined in HPA annotation.
if v, ok := config.Config[influxDBAddressKey]; ok {
address = v
}
if v, ok := config.Config[influxDBTokenKey]; ok {
token = v
}
if v, ok := config.Config[influxDBOrgKey]; ok {
org = v
}
influxDbClient := influxdb.NewClient(address, token)
collector.address = address
collector.token = token
collector.org = org
collector.influxDBClient = influxDbClient
return collector, nil
}
// queryResult is for unmarshaling the result from InfluxDB.
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
type queryResult struct {
MetricValue float64
}
// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue(ctx context.Context) (resource.Quantity, error) {
queryAPI := c.influxDBClient.QueryAPI(c.org)
res, err := queryAPI.Query(ctx, c.query)
if err != nil {
return resource.Quantity{}, err
}
defer res.Close()
// Keeping just the first result.
if res.Next() {
qr := queryResult{}
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
}
if err := res.Err(); err != nil {
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
}
return resource.Quantity{}, fmt.Errorf("empty result returned")
}
func (c *InfluxDBCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
v, err := c.getValue(ctx)
if err != nil {
return nil, err
}
cm := CollectedMetric{
Namespace: c.namespace,
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now().UTC(),
},
Value: v,
},
}
return []CollectedMetric{cm}, nil
}
func (c *InfluxDBCollector) Interval() time.Duration {
return c.interval
}