2020-01-21 10:13:27 +01:00
|
|
|
package collector
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
2023-10-24 11:54:43 +02:00
|
|
|
influxdb "github.com/influxdata/influxdb-client-go"
|
2023-05-03 23:01:19 +02:00
|
|
|
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
2020-01-21 10:13:27 +01:00
|
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
"k8s.io/metrics/pkg/apis/external_metrics"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2020-11-04 20:40:53 +01:00
|
|
|
InfluxDBMetricType = "influxdb"
|
|
|
|
InfluxDBMetricNameLegacy = "flux-query"
|
2020-01-21 10:13:27 +01:00
|
|
|
influxDBAddressKey = "address"
|
|
|
|
influxDBTokenKey = "token"
|
2020-02-21 18:26:46 +01:00
|
|
|
influxDBOrgKey = "org"
|
2020-01-21 10:13:27 +01:00
|
|
|
influxDBQueryNameLabelKey = "query-name"
|
|
|
|
)
|
|
|
|
|
|
|
|
type InfluxDBCollectorPlugin struct {
|
|
|
|
kubeClient kubernetes.Interface
|
|
|
|
address string
|
|
|
|
token string
|
2020-02-21 18:26:46 +01:00
|
|
|
org string
|
2020-01-21 10:13:27 +01:00
|
|
|
}
|
|
|
|
|
2020-02-21 18:26:46 +01:00
|
|
|
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org string) (*InfluxDBCollectorPlugin, error) {
|
2020-01-21 10:13:27 +01:00
|
|
|
return &InfluxDBCollectorPlugin{
|
|
|
|
kubeClient: client,
|
|
|
|
address: address,
|
|
|
|
token: token,
|
2020-02-21 18:26:46 +01:00
|
|
|
org: org,
|
2020-01-21 10:13:27 +01:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2024-05-21 14:00:31 +02:00
|
|
|
func (p *InfluxDBCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
|
|
|
return NewInfluxDBCollector(ctx, hpa, p.address, p.token, p.org, config, interval)
|
2020-01-21 10:13:27 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
type InfluxDBCollector struct {
|
2020-01-24 09:54:35 +01:00
|
|
|
address string
|
|
|
|
token string
|
2020-02-21 18:26:46 +01:00
|
|
|
org string
|
2020-01-24 09:54:35 +01:00
|
|
|
|
2024-04-09 11:46:02 +02:00
|
|
|
influxDBClient influxdb.Client
|
2020-01-21 10:13:27 +01:00
|
|
|
interval time.Duration
|
|
|
|
metric autoscalingv2.MetricIdentifier
|
|
|
|
metricType autoscalingv2.MetricSourceType
|
|
|
|
query string
|
2021-02-19 11:11:29 +01:00
|
|
|
namespace string
|
2020-01-21 10:13:27 +01:00
|
|
|
}
|
|
|
|
|
2024-05-21 14:00:31 +02:00
|
|
|
func NewInfluxDBCollector(_ context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
|
2020-01-21 10:13:27 +01:00
|
|
|
collector := &InfluxDBCollector{
|
|
|
|
interval: interval,
|
|
|
|
metric: config.Metric,
|
|
|
|
metricType: config.Type,
|
2021-02-19 11:11:29 +01:00
|
|
|
namespace: hpa.Namespace,
|
2020-01-21 10:13:27 +01:00
|
|
|
}
|
|
|
|
switch configType := config.Type; configType {
|
|
|
|
case autoscalingv2.ObjectMetricSourceType:
|
|
|
|
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
|
|
|
|
case autoscalingv2.ExternalMetricSourceType:
|
|
|
|
// `metricSelector` is flattened into the MetricConfig.Config.
|
|
|
|
queryName, ok := config.Config[influxDBQueryNameLabelKey]
|
|
|
|
if !ok {
|
2020-01-24 09:54:35 +01:00
|
|
|
return nil, fmt.Errorf("selector for Flux query is not specified, "+
|
2020-01-21 10:13:27 +01:00
|
|
|
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
|
|
|
|
}
|
|
|
|
if query, ok := config.Config[queryName]; ok {
|
2020-01-24 09:07:20 +01:00
|
|
|
// TODO(affo): validate the query once this is done:
|
|
|
|
// https://github.com/influxdata/influxdb-client-go/issues/73.
|
2020-01-21 10:13:27 +01:00
|
|
|
collector.query = query
|
|
|
|
} else {
|
|
|
|
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unknown metric type: %v", configType)
|
|
|
|
}
|
|
|
|
// Use custom InfluxDB config if defined in HPA annotation.
|
2020-01-24 09:07:20 +01:00
|
|
|
if v, ok := config.Config[influxDBAddressKey]; ok {
|
2020-01-21 10:13:27 +01:00
|
|
|
address = v
|
|
|
|
}
|
2020-01-24 09:07:20 +01:00
|
|
|
if v, ok := config.Config[influxDBTokenKey]; ok {
|
2020-01-21 10:13:27 +01:00
|
|
|
token = v
|
|
|
|
}
|
2020-02-21 18:26:46 +01:00
|
|
|
if v, ok := config.Config[influxDBOrgKey]; ok {
|
|
|
|
org = v
|
2020-01-21 10:13:27 +01:00
|
|
|
}
|
2023-10-24 11:54:43 +02:00
|
|
|
influxDbClient := influxdb.NewClient(address, token)
|
2020-01-24 09:54:35 +01:00
|
|
|
collector.address = address
|
|
|
|
collector.token = token
|
2020-02-21 18:26:46 +01:00
|
|
|
collector.org = org
|
2024-04-09 11:46:02 +02:00
|
|
|
collector.influxDBClient = influxDbClient
|
2020-01-21 10:13:27 +01:00
|
|
|
return collector, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// queryResult is for unmarshaling the result from InfluxDB.
|
|
|
|
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
|
|
|
|
type queryResult struct {
|
|
|
|
MetricValue float64
|
|
|
|
}
|
|
|
|
|
|
|
|
// getValue returns the first result gathered from an InfluxDB instance.
|
2024-05-21 14:00:31 +02:00
|
|
|
func (c *InfluxDBCollector) getValue(ctx context.Context) (resource.Quantity, error) {
|
2024-04-09 11:46:02 +02:00
|
|
|
queryAPI := c.influxDBClient.QueryAPI(c.org)
|
2024-05-21 14:00:31 +02:00
|
|
|
res, err := queryAPI.Query(ctx, c.query)
|
2020-01-21 10:13:27 +01:00
|
|
|
if err != nil {
|
|
|
|
return resource.Quantity{}, err
|
|
|
|
}
|
|
|
|
defer res.Close()
|
|
|
|
// Keeping just the first result.
|
|
|
|
if res.Next() {
|
|
|
|
qr := queryResult{}
|
|
|
|
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
|
|
|
|
}
|
2023-10-24 11:54:43 +02:00
|
|
|
if err := res.Err(); err != nil {
|
2020-01-21 10:13:27 +01:00
|
|
|
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
|
|
|
|
}
|
|
|
|
return resource.Quantity{}, fmt.Errorf("empty result returned")
|
|
|
|
}
|
|
|
|
|
2024-05-21 14:00:31 +02:00
|
|
|
func (c *InfluxDBCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
|
|
|
|
v, err := c.getValue(ctx)
|
2020-01-21 10:13:27 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
cm := CollectedMetric{
|
2021-02-19 11:11:29 +01:00
|
|
|
Namespace: c.namespace,
|
|
|
|
Type: c.metricType,
|
2020-01-21 10:13:27 +01:00
|
|
|
External: external_metrics.ExternalMetricValue{
|
|
|
|
MetricName: c.metric.Name,
|
|
|
|
MetricLabels: c.metric.Selector.MatchLabels,
|
|
|
|
Timestamp: metav1.Time{
|
|
|
|
Time: time.Now().UTC(),
|
|
|
|
},
|
|
|
|
Value: v,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
return []CollectedMetric{cm}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *InfluxDBCollector) Interval() time.Duration {
|
|
|
|
return c.interval
|
|
|
|
}
|