mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2024-12-22 11:06:04 +00:00
5a543781d7
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
121 lines
3.8 KiB
Go
121 lines
3.8 KiB
Go
package collector
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
|
|
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/metrics/pkg/apis/external_metrics"
|
|
)
|
|
|
|
const (
|
|
// NakadiMetricType defines the metric type for metrics based on Nakadi
|
|
// subscriptions.
|
|
NakadiMetricType = "nakadi"
|
|
nakadiSubscriptionIDKey = "subscription-id"
|
|
nakadiMetricTypeKey = "metric-type"
|
|
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
|
|
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
|
|
)
|
|
|
|
// NakadiCollectorPlugin defines a plugin for creating collectors that can get
|
|
// unconsumed events from Nakadi.
|
|
type NakadiCollectorPlugin struct {
|
|
nakadi nakadi.Nakadi
|
|
}
|
|
|
|
// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin.
|
|
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
|
|
return &NakadiCollectorPlugin{
|
|
nakadi: nakadi,
|
|
}, nil
|
|
}
|
|
|
|
// NewCollector initializes a new Nakadi collector from the specified HPA.
|
|
func (c *NakadiCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
|
return NewNakadiCollector(ctx, c.nakadi, hpa, config, interval)
|
|
}
|
|
|
|
// NakadiCollector defines a collector that is able to collect metrics from
|
|
// Nakadi.
|
|
type NakadiCollector struct {
|
|
nakadi nakadi.Nakadi
|
|
interval time.Duration
|
|
subscriptionID string
|
|
nakadiMetricType string
|
|
metric autoscalingv2.MetricIdentifier
|
|
metricType autoscalingv2.MetricSourceType
|
|
namespace string
|
|
}
|
|
|
|
// NewNakadiCollector initializes a new NakadiCollector.
|
|
func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
|
|
if config.Metric.Selector == nil {
|
|
return nil, fmt.Errorf("selector for nakadi is not specified")
|
|
}
|
|
|
|
subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
|
|
if !ok {
|
|
return nil, fmt.Errorf("subscription-id not specified on metric")
|
|
}
|
|
|
|
metricType, ok := config.Config[nakadiMetricTypeKey]
|
|
if !ok {
|
|
return nil, fmt.Errorf("metric-type not specified on metric")
|
|
}
|
|
|
|
if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents {
|
|
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
|
|
}
|
|
|
|
return &NakadiCollector{
|
|
nakadi: nakadi,
|
|
interval: interval,
|
|
subscriptionID: subscriptionID,
|
|
nakadiMetricType: metricType,
|
|
metric: config.Metric,
|
|
metricType: config.Type,
|
|
namespace: hpa.Namespace,
|
|
}, nil
|
|
}
|
|
|
|
// GetMetrics returns a list of collected metrics for the Nakadi subscription ID.
|
|
func (c *NakadiCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
|
|
var value int64
|
|
var err error
|
|
switch c.nakadiMetricType {
|
|
case nakadiMetricTypeConsumerLagSeconds:
|
|
value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
case nakadiMetricTypeUnconsumedEvents:
|
|
value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
metricValue := CollectedMetric{
|
|
Namespace: c.namespace,
|
|
Type: c.metricType,
|
|
External: external_metrics.ExternalMetricValue{
|
|
MetricName: c.metric.Name,
|
|
MetricLabels: c.metric.Selector.MatchLabels,
|
|
Timestamp: metav1.Now(),
|
|
Value: *resource.NewQuantity(value, resource.DecimalSI),
|
|
},
|
|
}
|
|
|
|
return []CollectedMetric{metricValue}, nil
|
|
}
|
|
|
|
// Interval returns the interval at which the collector should run.
|
|
func (c *NakadiCollector) Interval() time.Duration {
|
|
return c.interval
|
|
}
|