diff --git a/pkg/collector/influxdb_collector.go b/pkg/collector/influxdb_collector.go index ba90186..43ae0c8 100644 --- a/pkg/collector/influxdb_collector.go +++ b/pkg/collector/influxdb_collector.go @@ -39,22 +39,23 @@ func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org } func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { - return NewInfluxDBCollector(p.address, p.orgID, p.token, hpa, config, interval) + return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval) } type InfluxDBCollector struct { + address string + token string + orgID string + influxDBClient *influxdb.Client - hpa *autoscalingv2.HorizontalPodAutoscaler interval time.Duration metric autoscalingv2.MetricIdentifier metricType autoscalingv2.MetricSourceType query string - orgID string } -func NewInfluxDBCollector(address string, token string, orgID string, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) { +func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) { collector := &InfluxDBCollector{ - hpa: hpa, interval: interval, metric: config.Metric, metricType: config.Type, @@ -66,7 +67,7 @@ func NewInfluxDBCollector(address string, token string, orgID string, hpa *autos // `metricSelector` is flattened into the MetricConfig.Config. queryName, ok := config.Config[influxDBQueryNameLabelKey] if !ok { - return nil, fmt.Errorf("selector for Flux query is not specified,"+ + return nil, fmt.Errorf("selector for Flux query is not specified, "+ "please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey) } if query, ok := config.Config[queryName]; ok { @@ -93,8 +94,10 @@ func NewInfluxDBCollector(address string, token string, orgID string, hpa *autos if err != nil { return nil, err } - collector.influxDBClient = influxDbClient + collector.address = address + collector.token = token collector.orgID = orgID + collector.influxDBClient = influxDbClient return collector, nil } diff --git a/pkg/collector/influxdb_collector_test.go b/pkg/collector/influxdb_collector_test.go new file mode 100644 index 0000000..5e59967 --- /dev/null +++ b/pkg/collector/influxdb_collector_test.go @@ -0,0 +1,155 @@ +package collector + +import ( + "strings" + "testing" + "time" + + "k8s.io/api/autoscaling/v2beta2" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestInfluxDBCollector_New(t *testing.T) { + t.Run("simple", func(t *testing.T) { + m := &MetricConfig{ + MetricTypeName: MetricTypeName{ + Type: v2beta2.ExternalMetricSourceType, + Metric: v2beta2.MetricIdentifier{ + Name: "flux-query", + // This is actually useless, because the selector should be flattened in Config when parsing. + Selector: &v1.LabelSelector{ + MatchLabels: map[string]string{ + "query-name": "range2m", + }, + }, + }, + }, + CollectorName: "influxdb", + Config: map[string]string{ + "range1m": `from(bucket: "?") |> range(start: -1m)`, + "range2m": `from(bucket: "?") |> range(start: -2m)`, + "range3m": `from(bucket: "?") |> range(start: -3m)`, + "query-name": "range2m", + }, + } + c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got, want := c.orgID, "deadbeef"; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + if got, want := c.address, "http://localhost:9999"; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + if got, want := c.token, "secret"; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + }) + t.Run("override params", func(t *testing.T) { + m := &MetricConfig{ + MetricTypeName: MetricTypeName{ + Type: v2beta2.ExternalMetricSourceType, + Metric: v2beta2.MetricIdentifier{ + Name: "flux-query", + Selector: &v1.LabelSelector{ + MatchLabels: map[string]string{ + "query-name": "range2m", + }, + }, + }, + }, + CollectorName: "influxdb", + Config: map[string]string{ + "range1m": `from(bucket: "?") |> range(start: -1m)`, + "range2m": `from(bucket: "?") |> range(start: -2m)`, + "range3m": `from(bucket: "?") |> range(start: -3m)`, + "address": "http://localhost:9999", + "token": "sEcr3TT0ken", + "org-id": "deadbeef1234", + "query-name": "range3m", + }, + } + c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got, want := c.orgID, "deadbeef1234"; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + if got, want := c.address, "http://localhost:9999"; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + if got, want := c.token, "sEcr3TT0ken"; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got { + t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got) + } + }) + // Errors. + for _, tc := range []struct { + name string + mTypeName MetricTypeName + config map[string]string + errorStartsWith string + }{ + { + name: "object metric", + mTypeName: MetricTypeName{ + Type: v2beta2.ObjectMetricSourceType, + }, + errorStartsWith: "InfluxDB does not support object", + }, + { + name: "no selector", + mTypeName: MetricTypeName{ + Type: v2beta2.ExternalMetricSourceType, + Metric: v2beta2.MetricIdentifier{ + Name: "flux-query", + }, + }, + // The selector should be flattened into the config by the parsing step, but it isn't. + config: map[string]string{ + "range1m": `from(bucket: "?") |> range(start: -1m)`, + "range2m": `from(bucket: "?") |> range(start: -2m)`, + "range3m": `from(bucket: "?") |> range(start: -3m)`, + }, + errorStartsWith: "selector for Flux query is not specified", + }, + { + name: "referencing non-existing query", + mTypeName: MetricTypeName{ + Type: v2beta2.ExternalMetricSourceType, + Metric: v2beta2.MetricIdentifier{ + Name: "flux-query", + }, + }, + config: map[string]string{ + "range1m": `from(bucket: "?") |> range(start: -1m)`, + "range2m": `from(bucket: "?") |> range(start: -2m)`, + "range3m": `from(bucket: "?") |> range(start: -3m)`, + "query-name": "rangeXm", + }, + errorStartsWith: "no Flux query defined for metric", + }, + } { + t.Run("error - "+tc.name, func(t *testing.T) { + m := &MetricConfig{ + MetricTypeName: tc.mTypeName, + CollectorName: "influxdb", + Config: tc.config, + } + _, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second) + if err == nil { + t.Fatal("expected error got none") + } + if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) { + t.Fatalf("%s should start with %s", got, want) + } + }) + } +}