Add support for scaling based on Nakadi

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
This commit is contained in:
Mikkel Oscar Lyderik Larsen
2023-11-15 17:23:07 +01:00
parent 45a09c12a0
commit 4aeebc853c
5 changed files with 503 additions and 1 deletions
+120
View File
@@ -0,0 +1,120 @@
package collector
import (
"context"
"fmt"
"time"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
// NakadiMetricType defines the metric type for metrics based on Nakadi
// subscriptions.
NakadiMetricType = "nakadi"
// nakadiSubscriptionIDKey is the metric config key that holds the ID of the
// Nakadi subscription to collect stats for.
nakadiSubscriptionIDKey = "subscription-id"
// nakadiMetricTypeKey is the metric config key that selects which Nakadi
// stat is collected; its value must be one of the two constants below.
nakadiMetricTypeKey = "metric-type"
// nakadiMetricTypeConsumerLagSeconds selects the consumer lag (in seconds)
// of the subscription as the metric value.
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
// nakadiMetricTypeUnconsumedEvents selects the number of unconsumed events
// of the subscription as the metric value.
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
)
// NakadiCollectorPlugin defines a plugin for creating collectors that can get
// consumer lag or unconsumed event counts for Nakadi subscriptions.
type NakadiCollectorPlugin struct {
// nakadi is the Nakadi API client handed to every collector created by
// this plugin.
nakadi nakadi.Nakadi
}
// NewNakadiCollectorPlugin returns a NakadiCollectorPlugin whose collectors
// query Nakadi through the given client. The returned error is always nil.
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
	plugin := &NakadiCollectorPlugin{nakadi: nakadi}
	return plugin, nil
}
// NewCollector initializes a new Nakadi collector from the specified HPA.
//
// The metric config and interval are passed through to NewNakadiCollector
// together with the plugin's Nakadi client. On failure the returned Collector
// is a true nil interface value, so callers may safely compare it against nil.
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
	collector, err := NewNakadiCollector(c.nakadi, hpa, config, interval)
	if err != nil {
		// Return an untyped nil: forwarding the nil *NakadiCollector directly
		// would produce a non-nil Collector interface wrapping a nil pointer.
		return nil, err
	}
	return collector, nil
}
// NakadiCollector defines a collector that is able to collect metrics from
// Nakadi.
type NakadiCollector struct {
// nakadi is the client used to query subscription stats.
nakadi nakadi.Nakadi
// interval is the value reported by Interval.
interval time.Duration
// subscriptionID is the Nakadi subscription passed to the client calls.
subscriptionID string
// nakadiMetricType selects which client call GetMetrics makes: consumer
// lag seconds or unconsumed events.
nakadiMetricType string
// metric supplies the metric name and selector labels attached to every
// collected value.
metric autoscalingv2.MetricIdentifier
// metricType is copied into the Type field of every collected metric.
metricType autoscalingv2.MetricSourceType
// namespace is the namespace of the HPA the collector was created for.
namespace string
}
// NewNakadiCollector builds a NakadiCollector for the given HPA and metric
// config.
//
// It returns an error when the metric has no selector, when the
// "subscription-id" or "metric-type" config keys are missing, or when the
// metric type is not one of the supported values.
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
	if config.Metric.Selector == nil {
		return nil, fmt.Errorf("selector for nakadi is not specified")
	}

	id, found := config.Config[nakadiSubscriptionIDKey]
	if !found {
		return nil, fmt.Errorf("subscription-id not specified on metric")
	}

	kind, found := config.Config[nakadiMetricTypeKey]
	if !found {
		return nil, fmt.Errorf("metric-type not specified on metric")
	}

	switch kind {
	case nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents:
		// Supported metric type, nothing to reject.
	default:
		return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, kind)
	}

	collector := &NakadiCollector{
		nakadi:           nakadi,
		interval:         interval,
		subscriptionID:   id,
		nakadiMetricType: kind,
		metric:           config.Metric,
		metricType:       config.Type,
		namespace:        hpa.Namespace,
	}
	return collector, nil
}
// GetMetrics queries Nakadi for the configured stat of the collector's
// subscription and returns it as a single external metric value stamped with
// the current time. Errors from the Nakadi client are returned unchanged.
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) {
	var (
		value int64
		err   error
	)

	ctx := context.TODO()
	switch c.nakadiMetricType {
	case nakadiMetricTypeConsumerLagSeconds:
		value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionID)
	case nakadiMetricTypeUnconsumedEvents:
		value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionID)
	}
	if err != nil {
		return nil, err
	}

	external := external_metrics.ExternalMetricValue{
		MetricName:   c.metric.Name,
		MetricLabels: c.metric.Selector.MatchLabels,
		Timestamp:    metav1.Now(),
		Value:        *resource.NewQuantity(value, resource.DecimalSI),
	}

	return []CollectedMetric{
		{
			Namespace: c.namespace,
			Type:      c.metricType,
			External:  external,
		},
	}, nil
}
// Interval reports how often the collector should be run, as configured when
// the collector was created.
func (c *NakadiCollector) Interval() time.Duration {
	every := c.interval
	return every
}
+124
View File
@@ -0,0 +1,124 @@
package nakadi
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
)
// Nakadi defines an interface for talking to the Nakadi API.
type Nakadi interface {
// ConsumerLagSeconds returns the consumer lag, in seconds, of the
// subscription with the given ID. Client reports the largest lag found
// across all partitions of the subscription.
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
// UnconsumedEvents returns the number of unconsumed events of the
// subscription with the given ID. Client reports the sum over all
// partitions of the subscription.
UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
}
// Client defines client for interfacing with the Nakadi API.
type Client struct {
// nakadiEndpoint is the URL of the Nakadi API; it is parsed on every stats
// request.
nakadiEndpoint string
// http is the HTTP client used to send requests to Nakadi.
http *http.Client
}
// NewNakadiClient returns a Client that sends its requests to nakadiEndpoint
// using the supplied HTTP client.
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
	c := new(Client)
	c.nakadiEndpoint = nakadiEndpoint
	c.http = client
	return c
}
// ConsumerLagSeconds returns the largest consumer_lag_seconds value reported
// for any partition of any event type in the subscription's stats. The result
// is 0 when no partition reports a positive lag. Errors from fetching the
// stats are returned unchanged.
func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
	eventTypes, err := c.stats(ctx, subscriptionID)
	if err != nil {
		return 0, err
	}

	var highest int64
	for i := range eventTypes {
		for _, p := range eventTypes[i].Partitions {
			if p.ConsumerLagSeconds > highest {
				highest = p.ConsumerLagSeconds
			}
		}
	}

	return highest, nil
}
// UnconsumedEvents returns the sum of unconsumed_events over every partition
// of every event type in the subscription's stats. Errors from fetching the
// stats are returned unchanged.
func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
	eventTypes, err := c.stats(ctx, subscriptionID)
	if err != nil {
		return 0, err
	}

	total := int64(0)
	for i := range eventTypes {
		partitions := eventTypes[i].Partitions
		for j := range partitions {
			total += partitions[j].UnconsumedEvents
		}
	}

	return total, nil
}
// statsResp is the top-level JSON document of a subscription stats response.
type statsResp struct {
// Items holds one entry per event type, decoded from the "items" array.
Items []statsEventType `json:"items"`
}
// statsEventType holds the stats of a single event type within a subscription
// stats response.
type statsEventType struct {
// EventType is the value of the "event_type" field.
EventType string `json:"event_type"`
// Partitions holds the per-partition stats of the event type.
Partitions []statsPartition `json:"partitions"`
}
// statsPartition holds the stats of a single partition as decoded from a
// subscription stats response. Each field mirrors the JSON field named in its
// tag; UnconsumedEvents and ConsumerLagSeconds are the values aggregated by
// the Client methods.
type statsPartition struct {
	Partition          string `json:"partition"`
	State              string `json:"state"`
	UnconsumedEvents   int64  `json:"unconsumed_events"`
	ConsumerLagSeconds int64  `json:"consumer_lag_seconds"`
	StreamID           string `json:"stream_id"`
	AssignmentType     string `json:"assignment_type"`
}
// stats returns the Nakadi stats for a given subscription ID.
//
// It issues GET /subscriptions/<subscriptionID>/stats?show_time_lag=true
// against the configured endpoint and returns the decoded "items" list, one
// entry per event type. The request is bound to ctx, so cancelling ctx or
// exceeding its deadline aborts the call.
//
// An error is returned when the endpoint URL cannot be parsed, the request
// fails, the response status is not 200, the body is not valid JSON, or the
// response contains no event types.
//
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
	endpoint, err := url.Parse(c.nakadiEndpoint)
	if err != nil {
		return nil, err
	}

	// The path of the configured endpoint is replaced, not extended.
	endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)

	// show_time_lag asks Nakadi to include consumer_lag_seconds per partition.
	q := endpoint.Query()
	q.Set("show_time_lag", "true")
	endpoint.RawQuery = q.Encode()

	// Build the request with ctx so cancellation and deadlines propagate to
	// the HTTP call instead of being silently ignored.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.http.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Read the body before checking the status so it can be included in the
	// error message for non-200 responses.
	d, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
	}

	var result statsResp
	err = json.Unmarshal(d, &result)
	if err != nil {
		return nil, err
	}

	if len(result.Items) == 0 {
		return nil, errors.New("expected at least 1 event-type, 0 returned")
	}

	return result.Items, nil
}
+141
View File
@@ -0,0 +1,141 @@
package nakadi
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
)
// TestQuery runs the Nakadi client against an httptest server that replies
// with a canned status code and body, and checks the values and errors
// returned by ConsumerLagSeconds and UnconsumedEvents.
func TestQuery(tt *testing.T) {
client := &http.Client{}
for _, ti := range []struct {
// msg is used as the subtest name.
msg string
// status and responseBody are what the fake server replies with.
status int
responseBody string
// err is the error expected from both client calls (nil on success); it
// is compared with assert.Equal.
err error
// unconsumedEvents and consumerLagSeconds are the expected results.
unconsumedEvents int64
consumerLagSeconds int64
}{
{
msg: "test getting a single event-type",
status: http.StatusOK,
responseBody: `{
"items": [
{
"event_type": "example-event",
"partitions": [
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 4,
"consumer_lag_seconds": 2,
"stream_id": "example-id",
"assignment_type": "auto"
},
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 5,
"consumer_lag_seconds": 1,
"stream_id": "example-id",
"assignment_type": "auto"
}
]
}
]
}`,
// 4 + 5 unconsumed events; the larger of the lags 2 and 1 is 2.
unconsumedEvents: 9,
consumerLagSeconds: 2,
},
{
msg: "test getting multiple event-types",
status: http.StatusOK,
responseBody: `{
"items": [
{
"event_type": "example-event",
"partitions": [
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 4,
"consumer_lag_seconds": 2,
"stream_id": "example-id",
"assignment_type": "auto"
},
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 5,
"consumer_lag_seconds": 1,
"stream_id": "example-id",
"assignment_type": "auto"
}
]
},
{
"event_type": "example-event-2",
"partitions": [
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 4,
"consumer_lag_seconds": 6,
"stream_id": "example-id",
"assignment_type": "auto"
},
{
"partition": "0",
"state": "assigned",
"unconsumed_events": 5,
"consumer_lag_seconds": 1,
"stream_id": "example-id",
"assignment_type": "auto"
}
]
}
]
}`,
// 4 + 5 + 4 + 5 unconsumed events; the largest lag across both event
// types is 6.
unconsumedEvents: 18,
consumerLagSeconds: 6,
},
{
// A non-200 status must surface as an error that includes the status
// code and the response body.
msg: "test call with invalid response",
status: http.StatusInternalServerError,
responseBody: `{"error": 500}`,
err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"),
},
{
// NOTE(review): the case name mentions a single data point, but the
// body has an empty items list and the expected outcome is the
// "0 returned" error.
msg: "test getting back a single data point",
status: http.StatusOK,
responseBody: `{
"items": []
}`,
err: errors.New("expected at least 1 event-type, 0 returned"),
},
} {
tt.Run(ti.msg, func(t *testing.T) {
// Fake Nakadi server replying with the case's status and body to every
// request, regardless of path or query.
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(ti.status)
_, err := w.Write([]byte(ti.responseBody))
assert.NoError(t, err)
}),
)
defer ts.Close()
nakadiClient := NewNakadiClient(ts.URL, client)
// Both calls hit the same fake server, so they share the expected error.
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id")
assert.Equal(t, ti.err, err)
assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds)
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id")
assert.Equal(t, ti.err, err)
assert.Equal(t, ti.unconsumedEvents, unconsumedEvents)
})
}
}
+32 -1
View File
@@ -35,6 +35,7 @@ import (
"github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/provider"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
"golang.org/x/oauth2"
@@ -64,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
EnableExternalMetricsAPI: true,
MetricsAddress: ":7979",
ZMONTokenName: "zmon",
NakadiTokenName: "nakadi",
CredentialsDir: "/meta/credentials",
ExternalRPSMetricName: "skipper_serve_host_duration_seconds_count",
}
@@ -110,8 +112,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"url of ZMON KariosDB endpoint to query for ZMON checks")
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
"name of the token used to query ZMON")
flags.StringVar(&o.NakadiEndpoint, "nakadi-endpoint", o.NakadiEndpoint, ""+
"url of Nakadi endpoint to for nakadi subscription stats")
flags.StringVar(&o.NakadiTokenName, "nakadi-token-name", o.NakadiTokenName, ""+
"name of the token used to call nakadi subscription API")
flags.StringVar(&o.Token, "token", o.Token, ""+
"static oauth2 token to use when calling external services like ZMON")
"static oauth2 token to use when calling external services like ZMON and Nakadi")
flags.StringVar(&o.CredentialsDir, "credentials-dir", o.CredentialsDir, ""+
"path to the credentials dir where tokens are stored")
flags.BoolVar(&o.SkipperIngressMetrics, "skipper-ingress-metrics", o.SkipperIngressMetrics, ""+
@@ -274,6 +280,27 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.ZMONMetricType, collector.ZMONCheckMetricLegacy}, zmonPlugin)
}
// enable Nakadi based metrics
if o.NakadiEndpoint != "" {
var tokenSource oauth2.TokenSource
if o.Token != "" {
tokenSource = oauth2.StaticTokenSource(&oauth2.Token{AccessToken: o.Token})
} else {
tokenSource = platformiam.NewTokenSource(o.NakadiTokenName, o.CredentialsDir)
}
httpClient := newOauth2HTTPClient(ctx, tokenSource)
nakadiClient := nakadi.NewNakadiClient(o.NakadiEndpoint, httpClient)
nakadiPlugin, err := collector.NewNakadiCollectorPlugin(nakadiClient)
if err != nil {
return fmt.Errorf("failed to initialize Nakadi collector plugin: %v", err)
}
collectorFactory.RegisterExternalCollector([]string{collector.NakadiMetricType}, nakadiPlugin)
}
awsSessions := make(map[string]*session.Session, len(o.AWSRegions))
for _, region := range o.AWSRegions {
awsSessions[region], err = session.NewSessionWithOptions(session.Options{
@@ -427,6 +454,10 @@ type AdapterServerOptions struct {
ZMONKariosDBEndpoint string
// ZMONTokenName is the name of the token used to query ZMON
ZMONTokenName string
// NakadiEndpoint enables Nakadi metrics from the specified endpoint
NakadiEndpoint string
// NakadiTokenName is the name of the token used to call Nakadi
NakadiTokenName string
// Token is an oauth2 token used to authenticate with services like
// ZMON and Nakadi.
Token string