Add support for filtering nakadi subscriptions

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
This commit is contained in:
Mikkel Oscar Lyderik Larsen
2025-03-27 13:41:13 +01:00
parent 763ba23fd9
commit 625a602f42
5 changed files with 323 additions and 94 deletions

View File

@ -751,12 +751,12 @@ spec:
- type: External
external:
metric:
name: my-nakadi-consumer
selector:
matchLabels:
type: nakadi
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
metric-type: "consumer-lag-seconds|unconsumed-events"
name: my-nakadi-consumer
selector:
matchLabels:
type: nakadi
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
metric-type: "consumer-lag-seconds|unconsumed-events"
target:
# value is compatible with the consumer-lag-seconds metric type.
# It describes the amount of consumer lag in seconds before scaling
@ -805,6 +805,27 @@ with more consumers.
For this case you should also account for the average time for processing an
event when defining the target.
As an alternative to defining `subscription-id`, you can also filter based on
`owning-application`, `event-types` and `consumer-group`:
```yaml
metrics:
- type: External
external:
metric:
name: my-nakadi-consumer
selector:
matchLabels:
type: nakadi
owning-application: "example-app"
# comma separated list of event types
event-types: "example-event-type,example-event-type2"
consumer-group: "abcd1234"
metric-type: "consumer-lag-seconds|unconsumed-events"
```
This is useful in dynamic environments where the subscription ID might not be
known at deployment time.
## HTTP Collector

View File

@ -3,6 +3,7 @@ package collector
import (
"context"
"fmt"
"strings"
"time"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
@ -17,6 +18,9 @@ const (
// subscriptions.
NakadiMetricType = "nakadi"
nakadiSubscriptionIDKey = "subscription-id"
nakadiOwningApplicationKey = "owning-application"
nakadiConsumerGroupKey = "consumer-group"
nakadiEventTypesKey = "event-types"
nakadiMetricTypeKey = "metric-type"
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
@ -43,26 +47,21 @@ func (c *NakadiCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscali
// NakadiCollector defines a collector that is able to collect metrics from
// Nakadi.
type NakadiCollector struct {
nakadi nakadi.Nakadi
interval time.Duration
subscriptionID string
nakadiMetricType string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
namespace string
nakadi nakadi.Nakadi
interval time.Duration
subscriptionFilter *nakadi.SubscriptionFilter
nakadiMetricType string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
namespace string
}
// NewNakadiCollector initializes a new NakadiCollector.
func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for nakadi is not specified")
}
subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
if !ok {
return nil, fmt.Errorf("subscription-id not specified on metric")
}
metricType, ok := config.Config[nakadiMetricTypeKey]
if !ok {
return nil, fmt.Errorf("metric-type not specified on metric")
@ -72,14 +71,39 @@ func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalin
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
}
// Either subscription-id or filtering via owning-application,
// event-types, and consumer-group is supported. If all are defined
// then subscription-id is used.
subscriptionFilter := &nakadi.SubscriptionFilter{}
if subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]; ok {
subscriptionFilter.SubscriptionID = subscriptionID
}
if owningApplication, ok := config.Config[nakadiOwningApplicationKey]; ok {
subscriptionFilter.OwningApplication = owningApplication
}
if nakadiEventTypes, ok := config.Config[nakadiEventTypesKey]; ok {
eventTypes := strings.Split(nakadiEventTypes, ",")
subscriptionFilter.EventTypes = eventTypes
}
if consumerGroup, ok := config.Config[nakadiConsumerGroupKey]; ok {
subscriptionFilter.ConsumerGroup = consumerGroup
}
if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" && len(subscriptionFilter.EventTypes) == 0 && subscriptionFilter.ConsumerGroup == "") {
return nil, fmt.Errorf("subscription-id or owning-application, event-types, and consumer-group must be specified on the metric")
}
return &NakadiCollector{
nakadi: nakadi,
interval: interval,
subscriptionID: subscriptionID,
nakadiMetricType: metricType,
metric: config.Metric,
metricType: config.Type,
namespace: hpa.Namespace,
nakadi: nakadiClient,
interval: interval,
subscriptionFilter: subscriptionFilter,
nakadiMetricType: metricType,
metric: config.Metric,
metricType: config.Type,
namespace: hpa.Namespace,
}, nil
}
@ -89,12 +113,12 @@ func (c *NakadiCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, er
var err error
switch c.nakadiMetricType {
case nakadiMetricTypeConsumerLagSeconds:
value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionID)
value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionFilter)
if err != nil {
return nil, err
}
case nakadiMetricTypeUnconsumedEvents:
value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionID)
value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionFilter)
if err != nil {
return nil, err
}

View File

@ -12,8 +12,8 @@ import (
// Nakadi defines an interface for talking to the Nakadi API.
type Nakadi interface {
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error)
UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error)
}
// Client defines client for interfacing with the Nakadi API.
@ -30,8 +30,8 @@ func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
}
}
func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
stats, err := c.stats(ctx, subscriptionID)
func (c *Client) ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error) {
stats, err := c.stats(ctx, filter)
if err != nil {
return 0, err
}
@ -46,8 +46,8 @@ func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string)
return maxConsumerLagSeconds, nil
}
func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
stats, err := c.stats(ctx, subscriptionID)
func (c *Client) UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error) {
stats, err := c.stats(ctx, filter)
if err != nil {
return 0, err
}
@ -62,6 +62,90 @@ func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (i
return unconsumedEvents, nil
}
// SubscriptionFilter selects the Nakadi subscription(s) to collect stats for.
//
// When SubscriptionID is non-empty the stats lookup uses it directly.
// Otherwise the remaining non-empty fields are sent as query parameters when
// listing subscriptions, and every subscription returned is queried.
type SubscriptionFilter struct {
	// SubscriptionID is the ID of a single subscription to query.
	SubscriptionID string
	// OwningApplication is sent as the `owning_application` query parameter.
	OwningApplication string
	// EventTypes are sent as repeated `event_type` query parameters, one per
	// entry.
	EventTypes []string
	// ConsumerGroup is sent as the `consumer_group` query parameter.
	ConsumerGroup string
}
// subscriptions returns the IDs of all Nakadi subscriptions matching filter,
// following the `_links.next.href` pagination links until a page reports no
// next link.
//
// When href is non-empty it is appended to the configured Nakadi endpoint and
// used as the first page to fetch; filter is not consulted in that case. When
// href is empty the listing starts at /subscriptions with the non-empty filter
// fields set as query parameters. A nil filter applies no filtering.
//
// An error is returned if a URL cannot be parsed, a request fails, a page
// responds with a status other than 200, a body is not valid JSON, or the
// server hands out a next link that has already been followed.
func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, href string) ([]string, error) {
	endpoint, err := url.Parse(c.nakadiEndpoint)
	if err != nil {
		return nil, err
	}

	if href == "" {
		endpoint.Path = "/subscriptions"
		q := endpoint.Query()
		if filter != nil {
			if filter.OwningApplication != "" {
				q.Set("owning_application", filter.OwningApplication)
			}
			for _, eventType := range filter.EventTypes {
				q.Add("event_type", eventType)
			}
			if filter.ConsumerGroup != "" {
				q.Set("consumer_group", filter.ConsumerGroup)
			}
		}
		endpoint.RawQuery = q.Encode()
	}

	var subscriptions []string
	// Pages are walked in a loop rather than recursively so that each
	// response body is closed before the next page is requested and the
	// stack does not grow with the number of pages.
	visited := make(map[string]struct{})
	for {
		if href != "" {
			// A repeated next link would otherwise paginate forever.
			if _, seen := visited[href]; seen {
				return nil, fmt.Errorf("[nakadi subscriptions] pagination loop detected at %q", href)
			}
			visited[href] = struct{}{}

			endpoint, err = url.Parse(c.nakadiEndpoint + href)
			if err != nil {
				return nil, fmt.Errorf("failed to parse URL with href: %w", err)
			}
		}

		ids, next, err := c.subscriptionsPage(ctx, endpoint.String())
		if err != nil {
			return nil, err
		}
		subscriptions = append(subscriptions, ids...)

		if next == "" {
			return subscriptions, nil
		}
		href = next
	}
}

// subscriptionsPage fetches a single page of the subscription listing from
// pageURL and returns the subscription IDs on that page together with the
// href of the next page ("" when the response carries none).
func (c *Client) subscriptionsPage(ctx context.Context, pageURL string) ([]string, string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, pageURL, nil)
	if err != nil {
		return nil, "", fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := c.http.Do(req)
	if err != nil {
		return nil, "", fmt.Errorf("failed to make request: %w", err)
	}
	defer resp.Body.Close()

	d, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, "", err
	}

	if resp.StatusCode != http.StatusOK {
		return nil, "", fmt.Errorf("[nakadi subscriptions] unexpected response code: %d (%s)", resp.StatusCode, string(d))
	}

	var page struct {
		Items []struct {
			ID string `json:"id"`
		} `json:"items"`
		Links struct {
			Next struct {
				Href string `json:"href"`
			} `json:"next"`
		} `json:"_links"`
	}
	if err := json.Unmarshal(d, &page); err != nil {
		return nil, "", err
	}

	ids := make([]string, 0, len(page.Items))
	for _, item := range page.Items {
		ids = append(ids, item.ID)
	}
	return ids, page.Links.Next.Href, nil
}
type statsResp struct {
Items []statsEventType `json:"items"`
}
@ -83,42 +167,63 @@ type statsPartition struct {
// stats returns the Nakadi stats for a given subscription ID.
//
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]statsEventType, error) {
var subscriptionIDs []string
if filter.SubscriptionID == "" {
subscriptions, err := c.subscriptions(ctx, filter, "")
if err != nil {
return nil, err
}
subscriptionIDs = subscriptions
} else {
subscriptionIDs = []string{filter.SubscriptionID}
}
endpoint, err := url.Parse(c.nakadiEndpoint)
if err != nil {
return nil, err
}
endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)
var stats []statsEventType
for _, subscriptionID := range subscriptionIDs {
endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)
q := endpoint.Query()
q.Set("show_time_lag", "true")
endpoint.RawQuery = q.Encode()
q := endpoint.Query()
q.Set("show_time_lag", "true")
endpoint.RawQuery = q.Encode()
resp, err := c.http.Get(endpoint.String())
if err != nil {
return nil, err
}
defer resp.Body.Close()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
d, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to make request: %w", err)
}
defer resp.Body.Close()
d, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
}
var result statsResp
err = json.Unmarshal(d, &result)
if err != nil {
return nil, err
}
if len(result.Items) == 0 {
return nil, errors.New("expected at least 1 event-type, 0 returned")
}
stats = append(stats, result.Items...)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
}
var result statsResp
err = json.Unmarshal(d, &result)
if err != nil {
return nil, err
}
if len(result.Items) == 0 {
return nil, errors.New("expected at least 1 event-type, 0 returned")
}
return result.Items, nil
return stats, nil
}

View File

@ -12,18 +12,42 @@ import (
func TestQuery(tt *testing.T) {
client := &http.Client{}
subscriptionsResponseBody := `{
"items": [
{
"id": "id_1"
},
{
"id": "id_2"
}
],
"_links": {
"next": {
"href": "/subscriptions?event_type=example-event&owning_application=example-app&offset=20&limit=20"
}
}
}`
subscriptionsResponseBodyNoNext := `{
"items": [],
"_links": {}
}`
for _, ti := range []struct {
msg string
status int
responseBody string
err error
unconsumedEvents int64
consumerLagSeconds int64
msg string
status int
subscriptionIDResponseBody string
subscriptionFilter *SubscriptionFilter
err error
unconsumedEvents int64
consumerLagSeconds int64
}{
{
msg: "test getting a single event-type",
status: http.StatusOK,
responseBody: `{
msg: "test getting a single event-type",
status: http.StatusOK,
subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"},
subscriptionIDResponseBody: `{
"items": [
{
"event_type": "example-event",
@ -52,9 +76,10 @@ func TestQuery(tt *testing.T) {
consumerLagSeconds: 2,
},
{
msg: "test getting multiple event-types",
status: http.StatusOK,
responseBody: `{
msg: "test getting multiple event-types",
status: http.StatusOK,
subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"},
subscriptionIDResponseBody: `{
"items": [
{
"event_type": "example-event",
@ -104,38 +129,92 @@ func TestQuery(tt *testing.T) {
consumerLagSeconds: 6,
},
{
msg: "test call with invalid response",
status: http.StatusInternalServerError,
responseBody: `{"error": 500}`,
err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"),
msg: "test call with invalid response",
status: http.StatusInternalServerError,
subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"},
subscriptionIDResponseBody: `{"error": 500}`,
err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"),
},
{
msg: "test getting back a single data point",
status: http.StatusOK,
responseBody: `{
msg: "test getting back no data points",
status: http.StatusOK,
subscriptionFilter: &SubscriptionFilter{SubscriptionID: "id"},
subscriptionIDResponseBody: `{
"items": []
}`,
err: errors.New("expected at least 1 event-type, 0 returned"),
},
{
msg: "test filtering by owning_application and event_type",
status: http.StatusOK,
subscriptionFilter: &SubscriptionFilter{OwningApplication: "example-app", EventTypes: []string{"example-event"}, ConsumerGroup: "example-group"},
subscriptionIDResponseBody: `{
"items": [
{
"event_type": "example-event",
"partitions": [
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 4,
"consumer_lag_seconds": 2,
"stream_id": "example-id",
"assignment_type": "auto"
},
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 5,
"consumer_lag_seconds": 1,
"stream_id": "example-id",
"assignment_type": "auto"
}
]
}
]
}`,
unconsumedEvents: 18,
consumerLagSeconds: 2,
},
} {
tt.Run(ti.msg, func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(ti.status)
_, err := w.Write([]byte(ti.responseBody))
mux := http.NewServeMux()
mux.HandleFunc("/subscriptions", func(w http.ResponseWriter, r *http.Request) {
offset := r.URL.Query().Get("offset")
if offset != "" {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(subscriptionsResponseBodyNoNext))
assert.NoError(t, err)
}),
)
return
}
owningApplication := r.URL.Query().Get("owning_application")
eventTypes := r.URL.Query()["event_type"]
consumerGroup := r.URL.Query().Get("consumer_group")
assert.Equal(t, ti.subscriptionFilter.OwningApplication, owningApplication)
assert.Equal(t, ti.subscriptionFilter.EventTypes, eventTypes)
assert.Equal(t, ti.subscriptionFilter.ConsumerGroup, consumerGroup)
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(subscriptionsResponseBody))
assert.NoError(t, err)
})
mux.HandleFunc("/subscriptions/{id}/stats", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(ti.status)
_, err := w.Write([]byte(ti.subscriptionIDResponseBody))
assert.NoError(t, err)
})
ts := httptest.NewServer(mux)
defer ts.Close()
nakadiClient := NewNakadiClient(ts.URL, client)
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id")
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), ti.subscriptionFilter)
assert.Equal(t, ti.err, err)
assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds)
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id")
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), ti.subscriptionFilter)
assert.Equal(t, ti.err, err)
assert.Equal(t, ti.unconsumedEvents, unconsumedEvents)
})
}
}

View File

@ -140,7 +140,7 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
labelsKey = hashLabelMap(selector.MatchLabels)
}
metric := metricName(value.Metric.Name)
metric := metricName(strings.ToLower(value.Metric.Name))
namespace := objectNamespace(value.DescribedObject.Namespace)
object := objectName(value.DescribedObject.Name)
@ -203,7 +203,7 @@ func (s *MetricStore) insertExternalMetric(namespace objectNamespace, metric ext
labelsKey := hashLabelMap(metric.MetricLabels)
metricName := metricName(metric.MetricName)
metricName := metricName(strings.ToLower(metric.MetricName))
if metrics, ok := s.externalMetricsStore[namespace]; ok {
if labels, ok := metrics[metricName]; ok {
@ -259,7 +259,7 @@ func (s *MetricStore) GetMetricsBySelector(_ context.Context, namespace objectNa
s.RLock()
defer s.RUnlock()
group2namespace, ok := s.customMetricsStore[metricName(info.Metric)]
group2namespace, ok := s.customMetricsStore[metricName(strings.ToLower(info.Metric))]
if !ok {
return &custom_metrics.MetricValueList{}
}
@ -368,7 +368,7 @@ func (s *MetricStore) GetExternalMetric(_ context.Context, namespace objectNames
defer s.RUnlock()
if metrics, ok := s.externalMetricsStore[namespace]; ok {
if selectors, ok := metrics[metricName(info.Metric)]; ok {
if selectors, ok := metrics[metricName(strings.ToLower(info.Metric))]; ok {
for _, sel := range selectors {
if selector.Matches(labels.Set(sel.Value.MetricLabels)) {
matchedMetrics = append(matchedMetrics, sel.Value)