From 8b1245cbc8813527ed04c854b01d46ae478ee621 Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Thu, 27 Mar 2025 13:41:13 +0100 Subject: [PATCH 1/6] Add support for filtering nakadi subscriptions Signed-off-by: Mikkel Oscar Lyderik Larsen --- README.md | 33 ++++-- pkg/collector/nakadi_collector.go | 68 ++++++++---- pkg/nakadi/nakadi.go | 173 ++++++++++++++++++++++++------ pkg/nakadi/nakadi_test.go | 135 ++++++++++++++++++----- pkg/provider/metric_store.go | 8 +- 5 files changed, 323 insertions(+), 94 deletions(-) diff --git a/README.md b/README.md index 73c2e41..4b7c720 100644 --- a/README.md +++ b/README.md @@ -751,12 +751,12 @@ spec: - type: External external: metric: - name: my-nakadi-consumer - selector: - matchLabels: - type: nakadi - subscription-id: "708095f6-cece-4d02-840e-ee488d710b29" - metric-type: "consumer-lag-seconds|unconsumed-events" + name: my-nakadi-consumer + selector: + matchLabels: + type: nakadi + subscription-id: "708095f6-cece-4d02-840e-ee488d710b29" + metric-type: "consumer-lag-seconds|unconsumed-events" target: # value is compatible with the consumer-lag-seconds metric type. # It describes the amount of consumer lag in seconds before scaling @@ -805,6 +805,27 @@ with more consumers. For this case you should also account for the average time for processing an event when defining the target. +Alternative to defining `subscription-id` you can also filter based on +`owning_application`, `event-types` and `consumer-group`: + +```yaml +metrics: +- type: External + external: + metric: + name: my-nakadi-consumer + selector: + matchLabels: + type: nakadi + owning-application: "example-app" + # comma separated list of event types + event-types: "example-event-type,example-event-type2" + consumer-group: "abcd1234" + metric-type: "consumer-lag-seconds|unconsumed-events" +``` + +This is useful in dynamic environments where the subscription ID might not be +known at deployment time. ## HTTP Collector diff --git a/pkg/collector/nakadi_collector.go b/pkg/collector/nakadi_collector.go index 2ced5c4..0e71e6a 100644 --- a/pkg/collector/nakadi_collector.go +++ b/pkg/collector/nakadi_collector.go @@ -3,6 +3,7 @@ package collector import ( "context" "fmt" + "strings" "time" "github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" @@ -17,6 +18,9 @@ const ( // subscriptions. NakadiMetricType = "nakadi" nakadiSubscriptionIDKey = "subscription-id" + nakadiOwningApplicationKey = "owning-application" + nakadiConsumerGroupKey = "consumer-group" + nakadiEventTypesKey = "event-types" nakadiMetricTypeKey = "metric-type" nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds" nakadiMetricTypeUnconsumedEvents = "unconsumed-events" @@ -43,26 +47,21 @@ func (c *NakadiCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscali // NakadiCollector defines a collector that is able to collect metrics from // Nakadi. type NakadiCollector struct { - nakadi nakadi.Nakadi - interval time.Duration - subscriptionID string - nakadiMetricType string - metric autoscalingv2.MetricIdentifier - metricType autoscalingv2.MetricSourceType - namespace string + nakadi nakadi.Nakadi + interval time.Duration + subscriptionFilter *nakadi.SubscriptionFilter + nakadiMetricType string + metric autoscalingv2.MetricIdentifier + metricType autoscalingv2.MetricSourceType + namespace string } // NewNakadiCollector initializes a new NakadiCollector. -func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { +func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { if config.Metric.Selector == nil { return nil, fmt.Errorf("selector for nakadi is not specified") } - subscriptionID, ok := config.Config[nakadiSubscriptionIDKey] - if !ok { - return nil, fmt.Errorf("subscription-id not specified on metric") - } - metricType, ok := config.Config[nakadiMetricTypeKey] if !ok { return nil, fmt.Errorf("metric-type not specified on metric") @@ -72,14 +71,39 @@ func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalin return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType) } + // Either subscription-id or filtering via owning-application, + // event-types, and consumer-group is supported. If all are defined + // then subscription-id is used. + subscriptionFilter := &nakadi.SubscriptionFilter{} + if subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]; ok { + subscriptionFilter.SubscriptionID = subscriptionID + } + + if owningApplication, ok := config.Config[nakadiOwningApplicationKey]; ok { + subscriptionFilter.OwningApplication = owningApplication + } + + if nakadiEventTypes, ok := config.Config[nakadiEventTypesKey]; ok { + eventTypes := strings.Split(nakadiEventTypes, ",") + subscriptionFilter.EventTypes = eventTypes + } + + if consumerGroup, ok := config.Config[nakadiConsumerGroupKey]; ok { + subscriptionFilter.ConsumerGroup = consumerGroup + } + + if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" && len(subscriptionFilter.EventTypes) == 0 && subscriptionFilter.ConsumerGroup == "") { + return nil, fmt.Errorf("subscription-id or owning-application, event-types, and consumer-group must be specified on the metric") + } + return &NakadiCollector{ - nakadi: nakadi, - interval: interval, - subscriptionID: subscriptionID, - nakadiMetricType: metricType, - metric: config.Metric, - metricType: config.Type, - namespace: hpa.Namespace, + nakadi: nakadiClient, + interval: interval, + subscriptionFilter: subscriptionFilter, + nakadiMetricType: metricType, + metric: config.Metric, + metricType: config.Type, + namespace: hpa.Namespace, }, nil } @@ -89,12 +113,12 @@ func (c *NakadiCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, er var err error switch c.nakadiMetricType { case nakadiMetricTypeConsumerLagSeconds: - value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionID) + value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionFilter) if err != nil { return nil, err } case nakadiMetricTypeUnconsumedEvents: - value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionID) + value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionFilter) if err != nil { return nil, err } diff --git a/pkg/nakadi/nakadi.go b/pkg/nakadi/nakadi.go index a4e9728..5f7e57f 100644 --- a/pkg/nakadi/nakadi.go +++ b/pkg/nakadi/nakadi.go @@ -12,8 +12,8 @@ import ( // Nakadi defines an interface for talking to the Nakadi API. type Nakadi interface { - ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) - UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) + ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error) + UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error) } // Client defines client for interfacing with the Nakadi API. @@ -30,8 +30,8 @@ func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client { } } -func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) { - stats, err := c.stats(ctx, subscriptionID) +func (c *Client) ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error) { + stats, err := c.stats(ctx, filter) if err != nil { return 0, err } @@ -46,8 +46,8 @@ func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) return maxConsumerLagSeconds, nil } -func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) { - stats, err := c.stats(ctx, subscriptionID) +func (c *Client) UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error) { + stats, err := c.stats(ctx, filter) if err != nil { return 0, err } @@ -62,6 +62,90 @@ func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (i return unconsumedEvents, nil } +type SubscriptionFilter struct { + SubscriptionID string + OwningApplication string + EventTypes []string + ConsumerGroup string +} + +func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, href string) ([]string, error) { + endpoint, err := url.Parse(c.nakadiEndpoint) + if err != nil { + return nil, err + } + + if href != "" { + endpoint, err = url.Parse(c.nakadiEndpoint + href) + if err != nil { + return nil, fmt.Errorf("failed to parse URL with href: %w", err) + } + } else { + endpoint.Path = "/subscriptions" + q := endpoint.Query() + if filter.OwningApplication != "" { + q.Set("owning_application", filter.OwningApplication) + } + for _, eventType := range filter.EventTypes { + q.Add("event_type", eventType) + } + if filter.ConsumerGroup != "" { + q.Set("consumer_group", filter.ConsumerGroup) + } + endpoint.RawQuery = q.Encode() + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to make request: %w", err) + } + defer resp.Body.Close() + + d, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("[nakadi subscriptions] unexpected response code: %d (%s)", resp.StatusCode, string(d)) + } + + var subscriptionsResp struct { + Items []struct { + ID string `json:"id"` + } + Links struct { + Next struct { + Href string `json:"href"` + } `json:"next"` + } `json:"_links"` + } + err = json.Unmarshal(d, &subscriptionsResp) + if err != nil { + return nil, err + } + + var subscriptions []string + for _, item := range subscriptionsResp.Items { + subscriptions = append(subscriptions, item.ID) + } + + if subscriptionsResp.Links.Next.Href != "" { + nextSubscriptions, err := c.subscriptions(ctx, nil, subscriptionsResp.Links.Next.Href) + if err != nil { + return nil, err + } + subscriptions = append(subscriptions, nextSubscriptions...) + } + + return subscriptions, nil +} + type statsResp struct { Items []statsEventType `json:"items"` } @@ -83,42 +167,63 @@ type statsPartition struct { // stats returns the Nakadi stats for a given subscription ID. // // https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get -func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) { +func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]statsEventType, error) { + var subscriptionIDs []string + if filter.SubscriptionID == "" { + subscriptions, err := c.subscriptions(ctx, filter, "") + if err != nil { + return nil, err + } + subscriptionIDs = subscriptions + } else { + subscriptionIDs = []string{filter.SubscriptionID} + } + endpoint, err := url.Parse(c.nakadiEndpoint) if err != nil { return nil, err } - endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) + var stats []statsEventType + for _, subscriptionID := range subscriptionIDs { + endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) - q := endpoint.Query() - q.Set("show_time_lag", "true") - endpoint.RawQuery = q.Encode() + q := endpoint.Query() + q.Set("show_time_lag", "true") + endpoint.RawQuery = q.Encode() - resp, err := c.http.Get(endpoint.String()) - if err != nil { - return nil, err - } - defer resp.Body.Close() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } - d, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to make request: %w", err) + } + defer resp.Body.Close() + + d, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) + } + + var result statsResp + err = json.Unmarshal(d, &result) + if err != nil { + return nil, err + } + + if len(result.Items) == 0 { + return nil, errors.New("expected at least 1 event-type, 0 returned") + } + + stats = append(stats, result.Items...) } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) - } - - var result statsResp - err = json.Unmarshal(d, &result) - if err != nil { - return nil, err - } - - if len(result.Items) == 0 { - return nil, errors.New("expected at least 1 event-type, 0 returned") - } - - return result.Items, nil + return stats, nil } diff --git a/pkg/nakadi/nakadi_test.go b/pkg/nakadi/nakadi_test.go index 2a14e0a..0337db3 100644 --- a/pkg/nakadi/nakadi_test.go +++ b/pkg/nakadi/nakadi_test.go @@ -12,18 +12,42 @@ import ( func TestQuery(tt *testing.T) { client := &http.Client{} + + subscriptionsResponseBody := `{ + "items": [ + { + "id": "id_1" + }, + { + "id": "id_2" + } + ], + "_links": { + "next": { + "href": "/subscriptions?event_type=example-event&owning_application=example-app&offset=20&limit=20" + } + } +}` + + subscriptionsResponseBodyNoNext := `{ + "items": [], + "_links": {} +}` + for _, ti := range []struct { - msg string - status int - responseBody string - err error - unconsumedEvents int64 - consumerLagSeconds int64 + msg string + status int + subscriptionIDResponseBody string + subscriptionFilter *SubscriptionFilter + err error + unconsumedEvents int64 + consumerLagSeconds int64 }{ { - msg: "test getting a single event-type", - status: http.StatusOK, - responseBody: `{ + msg: "test getting a single event-type", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{ "items": [ { "event_type": "example-event", @@ -52,9 +76,10 @@ func TestQuery(tt *testing.T) { consumerLagSeconds: 2, }, { - msg: "test getting multiple event-types", - status: http.StatusOK, - responseBody: `{ + msg: "test getting multiple event-types", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{ "items": [ { "event_type": "example-event", @@ -104,38 +129,92 @@ func TestQuery(tt *testing.T) { consumerLagSeconds: 6, }, { - msg: "test call with invalid response", - status: http.StatusInternalServerError, - responseBody: `{"error": 500}`, - err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"), + msg: "test call with invalid response", + status: http.StatusInternalServerError, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{"error": 500}`, + err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"), }, { - msg: "test getting back a single data point", - status: http.StatusOK, - responseBody: `{ + msg: "test getting back no data points", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"}, + subscriptionIDResponseBody: `{ "items": [] }`, err: errors.New("expected at least 1 event-type, 0 returned"), }, + { + msg: "test filtering by owning_application and event_type", + status: http.StatusOK, + subscriptionFilter: &SubscriptionFilter{OwningApplication: "example-app", EventTypes: []string{"example-event"}, ConsumerGroup: "example-group"}, + subscriptionIDResponseBody: `{ + "items": [ + { + "event_type": "example-event", + "partitions": [ + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 4, + "consumer_lag_seconds": 2, + "stream_id": "example-id", + "assignment_type": "auto" + }, + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 5, + "consumer_lag_seconds": 1, + "stream_id": "example-id", + "assignment_type": "auto" + } + ] + } + ] + }`, + unconsumedEvents: 18, + consumerLagSeconds: 2, + }, } { tt.Run(ti.msg, func(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(ti.status) - _, err := w.Write([]byte(ti.responseBody)) + mux := http.NewServeMux() + mux.HandleFunc("/subscriptions", func(w http.ResponseWriter, r *http.Request) { + offset := r.URL.Query().Get("offset") + if offset != "" { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(subscriptionsResponseBodyNoNext)) assert.NoError(t, err) - }), - ) + return + } + + owningApplication := r.URL.Query().Get("owning_application") + eventTypes := r.URL.Query()["event_type"] + consumerGroup := r.URL.Query().Get("consumer_group") + + assert.Equal(t, ti.subscriptionFilter.OwningApplication, owningApplication) + assert.Equal(t, ti.subscriptionFilter.EventTypes, eventTypes) + assert.Equal(t, ti.subscriptionFilter.ConsumerGroup, consumerGroup) + + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(subscriptionsResponseBody)) + assert.NoError(t, err) + }) + mux.HandleFunc("/subscriptions/{id}/stats", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(ti.status) + _, err := w.Write([]byte(ti.subscriptionIDResponseBody)) + assert.NoError(t, err) + }) + ts := httptest.NewServer(mux) defer ts.Close() nakadiClient := NewNakadiClient(ts.URL, client) - consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id") + consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), ti.subscriptionFilter) assert.Equal(t, ti.err, err) assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds) - unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id") + unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), ti.subscriptionFilter) assert.Equal(t, ti.err, err) assert.Equal(t, ti.unconsumedEvents, unconsumedEvents) }) } - } diff --git a/pkg/provider/metric_store.go b/pkg/provider/metric_store.go index 290a090..bd13cc7 100644 --- a/pkg/provider/metric_store.go +++ b/pkg/provider/metric_store.go @@ -140,7 +140,7 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) { labelsKey = hashLabelMap(selector.MatchLabels) } - metric := metricName(value.Metric.Name) + metric := metricName(strings.ToLower(value.Metric.Name)) namespace := objectNamespace(value.DescribedObject.Namespace) object := objectName(value.DescribedObject.Name) @@ -203,7 +203,7 @@ func (s *MetricStore) insertExternalMetric(namespace objectNamespace, metric ext labelsKey := hashLabelMap(metric.MetricLabels) - metricName := metricName(metric.MetricName) + metricName := metricName(strings.ToLower(metric.MetricName)) if metrics, ok := s.externalMetricsStore[namespace]; ok { if labels, ok := metrics[metricName]; ok { @@ -259,7 +259,7 @@ func (s *MetricStore) GetMetricsBySelector(_ context.Context, namespace objectNa s.RLock() defer s.RUnlock() - group2namespace, ok := s.customMetricsStore[metricName(info.Metric)] + group2namespace, ok := s.customMetricsStore[metricName(strings.ToLower(info.Metric))] if !ok { return &custom_metrics.MetricValueList{} } @@ -368,7 +368,7 @@ func (s *MetricStore) GetExternalMetric(_ context.Context, namespace objectNames defer s.RUnlock() if metrics, ok := s.externalMetricsStore[namespace]; ok { - if selectors, ok := metrics[metricName(info.Metric)]; ok { + if selectors, ok := metrics[metricName(strings.ToLower(info.Metric))]; ok { for _, sel := range selectors { if selector.Matches(labels.Set(sel.Value.MetricLabels)) { matchedMetrics = append(matchedMetrics, sel.Value) From 96a1315ad39acb12ca34e8023ef3dd7c99c43517 Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Tue, 15 Apr 2025 10:42:39 +0200 Subject: [PATCH 2/6] Git revert metric_store changes Signed-off-by: Mikkel Oscar Lyderik Larsen --- pkg/provider/metric_store.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/provider/metric_store.go b/pkg/provider/metric_store.go index bd13cc7..290a090 100644 --- a/pkg/provider/metric_store.go +++ b/pkg/provider/metric_store.go @@ -140,7 +140,7 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) { labelsKey = hashLabelMap(selector.MatchLabels) } - metric := metricName(strings.ToLower(value.Metric.Name)) + metric := metricName(value.Metric.Name) namespace := objectNamespace(value.DescribedObject.Namespace) object := objectName(value.DescribedObject.Name) @@ -203,7 +203,7 @@ func (s *MetricStore) insertExternalMetric(namespace objectNamespace, metric ext labelsKey := hashLabelMap(metric.MetricLabels) - metricName := metricName(strings.ToLower(metric.MetricName)) + metricName := metricName(metric.MetricName) if metrics, ok := s.externalMetricsStore[namespace]; ok { if labels, ok := metrics[metricName]; ok { @@ -259,7 +259,7 @@ func (s *MetricStore) GetMetricsBySelector(_ context.Context, namespace objectNa s.RLock() defer s.RUnlock() - group2namespace, ok := s.customMetricsStore[metricName(strings.ToLower(info.Metric))] + group2namespace, ok := s.customMetricsStore[metricName(info.Metric)] if !ok { return &custom_metrics.MetricValueList{} } @@ -368,7 +368,7 @@ func (s *MetricStore) GetExternalMetric(_ context.Context, namespace objectNames defer s.RUnlock() if metrics, ok := s.externalMetricsStore[namespace]; ok { - if selectors, ok := metrics[metricName(strings.ToLower(info.Metric))]; ok { + if selectors, ok := metrics[metricName(info.Metric)]; ok { for _, sel := range selectors { if selector.Matches(labels.Set(sel.Value.MetricLabels)) { matchedMetrics = append(matchedMetrics, sel.Value) From e16119f821d58e85eaef045f9c83a565a379a2c5 Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Tue, 1 Apr 2025 14:30:12 +0200 Subject: [PATCH 3/6] Update README.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Paŭlo Ebermann Signed-off-by: Mikkel Oscar Lyderik Larsen --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4b7c720..cf90cbb 100644 --- a/README.md +++ b/README.md @@ -825,7 +825,7 @@ metrics: ``` This is useful in dynamic environments where the subscription ID might not be -known at deployment time. +known before deployment time (e.g. because it's created by the same deployment). ## HTTP Collector From a1d90533e870bc42a2121ec3fbcfabf74e0baacd Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Tue, 15 Apr 2025 10:40:26 +0200 Subject: [PATCH 4/6] Address PR comments Signed-off-by: Mikkel Oscar Lyderik Larsen --- pkg/collector/nakadi_collector.go | 5 +++-- pkg/nakadi/nakadi.go | 22 ++++++++++++---------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/collector/nakadi_collector.go b/pkg/collector/nakadi_collector.go index 0e71e6a..13d2db6 100644 --- a/pkg/collector/nakadi_collector.go +++ b/pkg/collector/nakadi_collector.go @@ -73,7 +73,8 @@ func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *auto // Either subscription-id or filtering via owning-application, // event-types, and consumer-group is supported. If all are defined - // then subscription-id is used. + // then only subscription-id is used and the rest of the fields are + // ignored. subscriptionFilter := &nakadi.SubscriptionFilter{} if subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]; ok { subscriptionFilter.SubscriptionID = subscriptionID @@ -93,7 +94,7 @@ func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *auto } if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" && len(subscriptionFilter.EventTypes) == 0 && subscriptionFilter.ConsumerGroup == "") { - return nil, fmt.Errorf("subscription-id or owning-application, event-types, and consumer-group must be specified on the metric") + return nil, fmt.Errorf("either subscription-id or one of [owning-application, event-types, consumer-group] must be specified on the metric") } return &NakadiCollector{ diff --git a/pkg/nakadi/nakadi.go b/pkg/nakadi/nakadi.go index 5f7e57f..b00a517 100644 --- a/pkg/nakadi/nakadi.go +++ b/pkg/nakadi/nakadi.go @@ -78,7 +78,7 @@ func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, if href != "" { endpoint, err = url.Parse(c.nakadiEndpoint + href) if err != nil { - return nil, fmt.Errorf("failed to parse URL with href: %w", err) + return nil, fmt.Errorf("[nakadi subscriptions] failed to parse URL with href: %w", err) } } else { endpoint.Path = "/subscriptions" @@ -97,12 +97,12 @@ func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + return nil, fmt.Errorf("[nakadi subscriptions] failed to create request: %w", err) } resp, err := c.http.Do(req) if err != nil { - return nil, fmt.Errorf("failed to make request: %w", err) + return nil, fmt.Errorf("[nakadi subscriptions] failed to make request: %w", err) } defer resp.Body.Close() @@ -138,7 +138,7 @@ func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, if subscriptionsResp.Links.Next.Href != "" { nextSubscriptions, err := c.subscriptions(ctx, nil, subscriptionsResp.Links.Next.Href) if err != nil { - return nil, err + return nil, fmt.Errorf("[nakadi subscriptions] failed to get next subscriptions: %w", err) } subscriptions = append(subscriptions, nextSubscriptions...) } @@ -164,7 +164,9 @@ type statsPartition struct { AssignmentType string `json:"assignment_type"` } -// stats returns the Nakadi stats for a given subscription ID. +// stats returns the Nakadi stats for a given a subscription filter which can +// include the subscription ID or a filter combination of [owning-applicaiton, +// event-types, consumer-group].. // // https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]statsEventType, error) { @@ -172,7 +174,7 @@ func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]stats if filter.SubscriptionID == "" { subscriptions, err := c.subscriptions(ctx, filter, "") if err != nil { - return nil, err + return nil, fmt.Errorf("[nakadi stats] failed to get subscriptions: %w", err) } subscriptionIDs = subscriptions } else { @@ -181,7 +183,7 @@ func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]stats endpoint, err := url.Parse(c.nakadiEndpoint) if err != nil { - return nil, err + return nil, fmt.Errorf("[nakadi stats] failed to parse URL %q: %w", c.nakadiEndpoint, err) } var stats []statsEventType @@ -194,12 +196,12 @@ func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]stats req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + return nil, fmt.Errorf("[nakadi stats] failed to create request: %w", err) } resp, err := c.http.Do(req) if err != nil { - return nil, fmt.Errorf("failed to make request: %w", err) + return nil, fmt.Errorf("[nakadi stats] failed to make request: %w", err) } defer resp.Body.Close() @@ -219,7 +221,7 @@ func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]stats } if len(result.Items) == 0 { - return nil, errors.New("expected at least 1 event-type, 0 returned") + return nil, errors.New("[nakadi stats] expected at least 1 event-type, 0 returned") } stats = append(stats, result.Items...) From f0a07e0c647fd3db0f171cfcfe18c4a1cc878dbf Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Tue, 15 Apr 2025 10:59:33 +0200 Subject: [PATCH 5/6] Update pkg/collector/nakadi_collector.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Paŭlo Ebermann Signed-off-by: Mikkel Oscar Lyderik Larsen --- pkg/collector/nakadi_collector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/collector/nakadi_collector.go b/pkg/collector/nakadi_collector.go index 13d2db6..12c27a5 100644 --- a/pkg/collector/nakadi_collector.go +++ b/pkg/collector/nakadi_collector.go @@ -93,8 +93,8 @@ func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *auto subscriptionFilter.ConsumerGroup = consumerGroup } - if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" && len(subscriptionFilter.EventTypes) == 0 && subscriptionFilter.ConsumerGroup == "") { - return nil, fmt.Errorf("either subscription-id or one of [owning-application, event-types, consumer-group] must be specified on the metric") + if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" || len(subscriptionFilter.EventTypes) == 0 || subscriptionFilter.ConsumerGroup == "") { + return nil, fmt.Errorf("either subscription-id or all of [owning-application, event-types, consumer-group] must be specified on the metric") } return &NakadiCollector{ From 5418c64e8f43e35691aacde578a3fb2b57be2a20 Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Tue, 6 May 2025 14:24:27 +0200 Subject: [PATCH 6/6] Fix error string in test Signed-off-by: Mikkel Oscar Lyderik Larsen --- pkg/nakadi/nakadi_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/nakadi/nakadi_test.go b/pkg/nakadi/nakadi_test.go index 0337db3..01e2f7c 100644 --- a/pkg/nakadi/nakadi_test.go +++ b/pkg/nakadi/nakadi_test.go @@ -142,7 +142,7 @@ func TestQuery(tt *testing.T) { subscriptionIDResponseBody: `{ "items": [] }`, - err: errors.New("expected at least 1 event-type, 0 returned"), + err: errors.New("[nakadi stats] expected at least 1 event-type, 0 returned"), }, { msg: "test filtering by owning_application and event_type",