diff --git a/README.md b/README.md index 73c2e41..cf90cbb 100644 --- a/README.md +++ b/README.md @@ -751,12 +751,12 @@ spec: - type: External external: metric: - name: my-nakadi-consumer - selector: - matchLabels: - type: nakadi - subscription-id: "708095f6-cece-4d02-840e-ee488d710b29" - metric-type: "consumer-lag-seconds|unconsumed-events" + name: my-nakadi-consumer + selector: + matchLabels: + type: nakadi + subscription-id: "708095f6-cece-4d02-840e-ee488d710b29" + metric-type: "consumer-lag-seconds|unconsumed-events" target: # value is compatible with the consumer-lag-seconds metric type. # It describes the amount of consumer lag in seconds before scaling @@ -805,6 +805,27 @@ with more consumers. For this case you should also account for the average time for processing an event when defining the target. +As an alternative to defining `subscription-id`, you can also filter based on +`owning-application`, `event-types` and `consumer-group`: + +```yaml +metrics: +- type: External + external: + metric: + name: my-nakadi-consumer + selector: + matchLabels: + type: nakadi + owning-application: "example-app" + # comma-separated list of event types + event-types: "example-event-type,example-event-type2" + consumer-group: "abcd1234" + metric-type: "consumer-lag-seconds|unconsumed-events" +``` + +This is useful in dynamic environments where the subscription ID might not be +known before deployment time (e.g. because it's created by the same deployment). ## HTTP Collector diff --git a/pkg/collector/nakadi_collector.go b/pkg/collector/nakadi_collector.go index 2ced5c4..12c27a5 100644 --- a/pkg/collector/nakadi_collector.go +++ b/pkg/collector/nakadi_collector.go @@ -3,6 +3,7 @@ package collector import ( "context" "fmt" + "strings" "time" "github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" @@ -17,6 +18,9 @@ const ( // subscriptions. 
NakadiMetricType = "nakadi" nakadiSubscriptionIDKey = "subscription-id" + nakadiOwningApplicationKey = "owning-application" + nakadiConsumerGroupKey = "consumer-group" + nakadiEventTypesKey = "event-types" nakadiMetricTypeKey = "metric-type" nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds" nakadiMetricTypeUnconsumedEvents = "unconsumed-events" @@ -43,26 +47,21 @@ func (c *NakadiCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscali // NakadiCollector defines a collector that is able to collect metrics from // Nakadi. type NakadiCollector struct { - nakadi nakadi.Nakadi - interval time.Duration - subscriptionID string - nakadiMetricType string - metric autoscalingv2.MetricIdentifier - metricType autoscalingv2.MetricSourceType - namespace string + nakadi nakadi.Nakadi + interval time.Duration + subscriptionFilter *nakadi.SubscriptionFilter + nakadiMetricType string + metric autoscalingv2.MetricIdentifier + metricType autoscalingv2.MetricSourceType + namespace string } // NewNakadiCollector initializes a new NakadiCollector. 
-func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { +func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { if config.Metric.Selector == nil { return nil, fmt.Errorf("selector for nakadi is not specified") } - subscriptionID, ok := config.Config[nakadiSubscriptionIDKey] - if !ok { - return nil, fmt.Errorf("subscription-id not specified on metric") - } - metricType, ok := config.Config[nakadiMetricTypeKey] if !ok { return nil, fmt.Errorf("metric-type not specified on metric") @@ -72,14 +71,40 @@ func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalin return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType) } + // Either subscription-id or filtering via owning-application, + // event-types, and consumer-group is supported. If all are defined + // then only subscription-id is used and the rest of the fields are + // ignored. 
+ subscriptionFilter := &nakadi.SubscriptionFilter{} + if subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]; ok { + subscriptionFilter.SubscriptionID = subscriptionID + } + + if owningApplication, ok := config.Config[nakadiOwningApplicationKey]; ok { + subscriptionFilter.OwningApplication = owningApplication + } + + if nakadiEventTypes, ok := config.Config[nakadiEventTypesKey]; ok { + eventTypes := strings.Split(nakadiEventTypes, ",") + subscriptionFilter.EventTypes = eventTypes + } + + if consumerGroup, ok := config.Config[nakadiConsumerGroupKey]; ok { + subscriptionFilter.ConsumerGroup = consumerGroup + } + + if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" || len(subscriptionFilter.EventTypes) == 0 || subscriptionFilter.ConsumerGroup == "") { + return nil, fmt.Errorf("either subscription-id or all of [owning-application, event-types, consumer-group] must be specified on the metric") + } + return &NakadiCollector{ - nakadi: nakadi, - interval: interval, - subscriptionID: subscriptionID, - nakadiMetricType: metricType, - metric: config.Metric, - metricType: config.Type, - namespace: hpa.Namespace, + nakadi: nakadiClient, + interval: interval, + subscriptionFilter: subscriptionFilter, + nakadiMetricType: metricType, + metric: config.Metric, + metricType: config.Type, + namespace: hpa.Namespace, }, nil } @@ -89,12 +114,12 @@ func (c *NakadiCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, er var err error switch c.nakadiMetricType { case nakadiMetricTypeConsumerLagSeconds: - value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionID) + value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionFilter) if err != nil { return nil, err } case nakadiMetricTypeUnconsumedEvents: - value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionID) + value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionFilter) if err != nil { return nil, err } diff --git a/pkg/nakadi/nakadi.go b/pkg/nakadi/nakadi.go index 
a4e9728..b00a517 100644 --- a/pkg/nakadi/nakadi.go +++ b/pkg/nakadi/nakadi.go @@ -12,8 +12,8 @@ import ( // Nakadi defines an interface for talking to the Nakadi API. type Nakadi interface { - ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) - UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) + ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error) + UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error) } // Client defines client for interfacing with the Nakadi API. @@ -30,8 +30,8 @@ func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client { } } -func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) { - stats, err := c.stats(ctx, subscriptionID) +func (c *Client) ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error) { + stats, err := c.stats(ctx, filter) if err != nil { return 0, err } @@ -46,8 +46,8 @@ func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) return maxConsumerLagSeconds, nil } -func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) { - stats, err := c.stats(ctx, subscriptionID) +func (c *Client) UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error) { + stats, err := c.stats(ctx, filter) if err != nil { return 0, err } @@ -62,6 +62,90 @@ func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (i return unconsumedEvents, nil } +type SubscriptionFilter struct { + SubscriptionID string + OwningApplication string + EventTypes []string + ConsumerGroup string +} + +func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, href string) ([]string, error) { + endpoint, err := url.Parse(c.nakadiEndpoint) + if err != nil { + return nil, err + } + + if href != "" { + endpoint, err = url.Parse(c.nakadiEndpoint + href) + if err != nil { + 
return nil, fmt.Errorf("[nakadi subscriptions] failed to parse URL with href: %w", err) + } + } else { + endpoint.Path = "/subscriptions" + q := endpoint.Query() + if filter.OwningApplication != "" { + q.Set("owning_application", filter.OwningApplication) + } + for _, eventType := range filter.EventTypes { + q.Add("event_type", eventType) + } + if filter.ConsumerGroup != "" { + q.Set("consumer_group", filter.ConsumerGroup) + } + endpoint.RawQuery = q.Encode() + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) + if err != nil { + return nil, fmt.Errorf("[nakadi subscriptions] failed to create request: %w", err) + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("[nakadi subscriptions] failed to make request: %w", err) + } + defer resp.Body.Close() + + d, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("[nakadi subscriptions] unexpected response code: %d (%s)", resp.StatusCode, string(d)) + } + + var subscriptionsResp struct { + Items []struct { + ID string `json:"id"` + } + Links struct { + Next struct { + Href string `json:"href"` + } `json:"next"` + } `json:"_links"` + } + err = json.Unmarshal(d, &subscriptionsResp) + if err != nil { + return nil, err + } + + var subscriptions []string + for _, item := range subscriptionsResp.Items { + subscriptions = append(subscriptions, item.ID) + } + + if subscriptionsResp.Links.Next.Href != "" { + nextSubscriptions, err := c.subscriptions(ctx, nil, subscriptionsResp.Links.Next.Href) + if err != nil { + return nil, fmt.Errorf("[nakadi subscriptions] failed to get next subscriptions: %w", err) + } + subscriptions = append(subscriptions, nextSubscriptions...) 
+ } + + return subscriptions, nil +} + type statsResp struct { Items []statsEventType `json:"items"` } @@ -80,45 +164,68 @@ type statsPartition struct { AssignmentType string `json:"assignment_type"` } -// stats returns the Nakadi stats for a given subscription ID. +// stats returns the Nakadi stats for a given subscription filter, which can +// include the subscription ID or a filter combination of [owning-application, +// event-types, consumer-group]. // // https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get -func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) { +func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]statsEventType, error) { + var subscriptionIDs []string + if filter.SubscriptionID == "" { + subscriptions, err := c.subscriptions(ctx, filter, "") + if err != nil { + return nil, fmt.Errorf("[nakadi stats] failed to get subscriptions: %w", err) + } + subscriptionIDs = subscriptions + } else { + subscriptionIDs = []string{filter.SubscriptionID} + } + endpoint, err := url.Parse(c.nakadiEndpoint) if err != nil { - return nil, err + return nil, fmt.Errorf("[nakadi stats] failed to parse URL %q: %w", c.nakadiEndpoint, err) } - endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) + var stats []statsEventType + for _, subscriptionID := range subscriptionIDs { + endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) - q := endpoint.Query() - q.Set("show_time_lag", "true") - endpoint.RawQuery = q.Encode() + q := endpoint.Query() + q.Set("show_time_lag", "true") + endpoint.RawQuery = q.Encode() - resp, err := c.http.Get(endpoint.String()) - if err != nil { - return nil, err - } - defer resp.Body.Close() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) + if err != nil { + return nil, fmt.Errorf("[nakadi stats] failed to create request: %w", err) + } - d, err := io.ReadAll(resp.Body) - if err != nil { - return 
nil, err + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("[nakadi stats] failed to make request: %w", err) + } + defer resp.Body.Close() + + d, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) + } + + var result statsResp + err = json.Unmarshal(d, &result) + if err != nil { + return nil, err + } + + if len(result.Items) == 0 { + return nil, errors.New("[nakadi stats] expected at least 1 event-type, 0 returned") + } + + stats = append(stats, result.Items...) } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) - } - - var result statsResp - err = json.Unmarshal(d, &result) - if err != nil { - return nil, err - } - - if len(result.Items) == 0 { - return nil, errors.New("expected at least 1 event-type, 0 returned") - } - - return result.Items, nil + return stats, nil } diff --git a/pkg/nakadi/nakadi_test.go b/pkg/nakadi/nakadi_test.go index 2a14e0a..01e2f7c 100644 --- a/pkg/nakadi/nakadi_test.go +++ b/pkg/nakadi/nakadi_test.go @@ -12,18 +12,42 @@ import ( func TestQuery(tt *testing.T) { client := &http.Client{} + + subscriptionsResponseBody := `{ + "items": [ + { + "id": "id_1" + }, + { + "id": "id_2" + } + ], + "_links": { + "next": { + "href": "/subscriptions?event_type=example-event&owning_application=example-app&offset=20&limit=20" + } + } +}` + + subscriptionsResponseBodyNoNext := `{ + "items": [], + "_links": {} +}` + for _, ti := range []struct { - msg string - status int - responseBody string - err error - unconsumedEvents int64 - consumerLagSeconds int64 + msg string + status int + subscriptionIDResponseBody string + subscriptionFilter *SubscriptionFilter + err error + unconsumedEvents int64 + consumerLagSeconds int64 }{ { - msg: "test getting a single event-type", - status: 
http.StatusOK, - responseBody: `{ + msg: "test getting a single event-type", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{ "items": [ { "event_type": "example-event", @@ -52,9 +76,10 @@ func TestQuery(tt *testing.T) { consumerLagSeconds: 2, }, { - msg: "test getting multiple event-types", - status: http.StatusOK, - responseBody: `{ + msg: "test getting multiple event-types", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{ "items": [ { "event_type": "example-event", @@ -104,38 +129,92 @@ func TestQuery(tt *testing.T) { consumerLagSeconds: 6, }, { - msg: "test call with invalid response", - status: http.StatusInternalServerError, - responseBody: `{"error": 500}`, - err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"), + msg: "test call with invalid response", + status: http.StatusInternalServerError, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{"error": 500}`, + err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"), }, { - msg: "test getting back a single data point", - status: http.StatusOK, - responseBody: `{ + msg: "test getting back no data points", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{ "items": [] }`, - err: errors.New("expected at least 1 event-type, 0 returned"), + err: errors.New("[nakadi stats] expected at least 1 event-type, 0 returned"), + }, + { + msg: "test filtering by owning_application and event_type", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{OwningApplication: "example-app", EventTypes: []string{"example-event"}, ConsumerGroup: "example-group"}, + subscriptionIDResponseBody: `{ + "items": [ + { + "event_type": "example-event", + "partitions": [ + { + "partition": "0", + "state": 
"assigned", + "unconsumed_events": 4, + "consumer_lag_seconds": 2, + "stream_id": "example-id", + "assignment_type": "auto" + }, + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 5, + "consumer_lag_seconds": 1, + "stream_id": "example-id", + "assignment_type": "auto" + } + ] + } + ] + }`, + unconsumedEvents: 18, + consumerLagSeconds: 2, }, } { tt.Run(ti.msg, func(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(ti.status) - _, err := w.Write([]byte(ti.responseBody)) + mux := http.NewServeMux() + mux.HandleFunc("/subscriptions", func(w http.ResponseWriter, r *http.Request) { + offset := r.URL.Query().Get("offset") + if offset != "" { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(subscriptionsResponseBodyNoNext)) assert.NoError(t, err) - }), - ) + return + } + + owningApplication := r.URL.Query().Get("owning_application") + eventTypes := r.URL.Query()["event_type"] + consumerGroup := r.URL.Query().Get("consumer_group") + + assert.Equal(t, ti.subscriptionFilter.OwningApplication, owningApplication) + assert.Equal(t, ti.subscriptionFilter.EventTypes, eventTypes) + assert.Equal(t, ti.subscriptionFilter.ConsumerGroup, consumerGroup) + + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(subscriptionsResponseBody)) + assert.NoError(t, err) + }) + mux.HandleFunc("/subscriptions/{id}/stats", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(ti.status) + _, err := w.Write([]byte(ti.subscriptionIDResponseBody)) + assert.NoError(t, err) + }) + ts := httptest.NewServer(mux) defer ts.Close() nakadiClient := NewNakadiClient(ts.URL, client) - consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id") + consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), ti.subscriptionFilter) assert.Equal(t, ti.err, err) assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds) - unconsumedEvents, err := 
nakadiClient.UnconsumedEvents(context.Background(), "id") + unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), ti.subscriptionFilter) assert.Equal(t, ti.err, err) assert.Equal(t, ti.unconsumedEvents, unconsumedEvents) }) } - }