mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2024-12-22 19:16:06 +00:00
Namespace external metrics (#259)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
This commit is contained in:
parent
942e753f87
commit
b7aa886546
@ -33,7 +33,7 @@ func NewAWSCollectorPlugin(sessions map[string]*session.Session) *AWSCollectorPl
|
||||
|
||||
// NewCollector initializes a new skipper collector from the specified HPA.
|
||||
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewAWSSQSCollector(c.sessions, config, interval)
|
||||
return NewAWSSQSCollector(c.sessions, hpa, config, interval)
|
||||
}
|
||||
|
||||
type AWSSQSCollector struct {
|
||||
@ -42,11 +42,12 @@ type AWSSQSCollector struct {
|
||||
region string
|
||||
queueURL string
|
||||
queueName string
|
||||
namespace string
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
}
|
||||
|
||||
func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
|
||||
func NewAWSSQSCollector(sessions map[string]*session.Session, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for queue is not specified")
|
||||
}
|
||||
@ -80,6 +81,7 @@ func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConf
|
||||
interval: interval,
|
||||
queueURL: aws.StringValue(resp.QueueUrl),
|
||||
queueName: name,
|
||||
namespace: hpa.Namespace,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
}, nil
|
||||
@ -103,7 +105,8 @@ func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
}
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
|
@ -177,9 +177,10 @@ type MetricTypeName struct {
|
||||
}
|
||||
|
||||
type CollectedMetric struct {
|
||||
Type autoscalingv2.MetricSourceType
|
||||
Custom custom_metrics.MetricValue
|
||||
External external_metrics.ExternalMetricValue
|
||||
Type autoscalingv2.MetricSourceType
|
||||
Namespace string
|
||||
Custom custom_metrics.MetricValue
|
||||
External external_metrics.ExternalMetricValue
|
||||
}
|
||||
|
||||
type Collector interface {
|
||||
|
@ -27,8 +27,10 @@ func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
|
||||
return &HTTPCollectorPlugin{}, nil
|
||||
}
|
||||
|
||||
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
collector := &HTTPCollector{}
|
||||
func (p *HTTPCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
collector := &HTTPCollector{
|
||||
namespace: hpa.Namespace,
|
||||
}
|
||||
var (
|
||||
value string
|
||||
ok bool
|
||||
@ -74,6 +76,7 @@ func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, c
|
||||
type HTTPCollector struct {
|
||||
endpoint *url.URL
|
||||
interval time.Duration
|
||||
namespace string
|
||||
metricType v2beta2.MetricSourceType
|
||||
metricsGetter *httpmetrics.JSONPathMetricsGetter
|
||||
metric v2beta2.MetricIdentifier
|
||||
@ -86,7 +89,8 @@ func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
}
|
||||
|
||||
value := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
|
@ -8,6 +8,8 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
@ -60,7 +62,12 @@ func TestHTTPCollector(t *testing.T) {
|
||||
plugin, err := NewHTTPCollectorPlugin()
|
||||
require.NoError(t, err)
|
||||
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator)
|
||||
collector, err := plugin.NewCollector(nil, testConfig, testInterval)
|
||||
hpa := &autoscalingv2.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
},
|
||||
}
|
||||
collector, err := plugin.NewCollector(hpa, testConfig, testInterval)
|
||||
require.NoError(t, err)
|
||||
metrics, err := collector.GetMetrics()
|
||||
require.NoError(t, err)
|
||||
|
@ -40,7 +40,7 @@ func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org
|
||||
}
|
||||
|
||||
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewInfluxDBCollector(p.address, p.token, p.org, config, interval)
|
||||
return NewInfluxDBCollector(hpa, p.address, p.token, p.org, config, interval)
|
||||
}
|
||||
|
||||
type InfluxDBCollector struct {
|
||||
@ -53,13 +53,15 @@ type InfluxDBCollector struct {
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
query string
|
||||
namespace string
|
||||
}
|
||||
|
||||
func NewInfluxDBCollector(address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
|
||||
func NewInfluxDBCollector(hpa *v2beta2.HorizontalPodAutoscaler, address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
|
||||
collector := &InfluxDBCollector{
|
||||
interval: interval,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
namespace: hpa.Namespace,
|
||||
}
|
||||
switch configType := config.Type; configType {
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
@ -135,7 +137,8 @@ func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
return nil, err
|
||||
}
|
||||
cm := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
|
@ -6,10 +6,17 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestInfluxDBCollector_New(t *testing.T) {
|
||||
hpa := &autoscalingv2.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
},
|
||||
}
|
||||
t.Run("simple", func(t *testing.T) {
|
||||
m := &MetricConfig{
|
||||
MetricTypeName: MetricTypeName{
|
||||
@ -32,7 +39,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
||||
"query-name": "range2m",
|
||||
},
|
||||
}
|
||||
c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
|
||||
c, err := NewInfluxDBCollector(hpa, "http://localhost:9999", "secret", "deadbeef", m, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -73,7 +80,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
||||
"query-name": "range3m",
|
||||
},
|
||||
}
|
||||
c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
|
||||
c, err := NewInfluxDBCollector(hpa, "http://localhost:8888", "secret", "deadbeef", m, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -143,7 +150,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
||||
CollectorType: "influxdb",
|
||||
Config: tc.config,
|
||||
}
|
||||
_, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
|
||||
_, err := NewInfluxDBCollector(hpa, "http://localhost:9999", "secret", "deadbeef", m, time.Second)
|
||||
if err == nil {
|
||||
t.Fatal("expected error got none")
|
||||
}
|
||||
|
@ -118,7 +118,8 @@ func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, err
|
||||
}
|
||||
|
||||
ch <- CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
APIVersion: "v1",
|
||||
|
@ -174,7 +174,8 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
switch c.metricType {
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
metricValue = CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.hpa.Namespace,
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: c.objectReference,
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.metric.Selector},
|
||||
@ -184,7 +185,8 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
}
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
metricValue = CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.hpa.Namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
|
@ -43,7 +43,7 @@ func NewZMONCollectorPlugin(zmon zmon.ZMON) (*ZMONCollectorPlugin, error) {
|
||||
|
||||
// NewCollector initializes a new ZMON collector from the specified HPA.
|
||||
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewZMONCollector(c.zmon, config, interval)
|
||||
return NewZMONCollector(c.zmon, hpa, config, interval)
|
||||
}
|
||||
|
||||
// ZMONCollector defines a collector that is able to collect metrics from ZMON.
|
||||
@ -57,10 +57,11 @@ type ZMONCollector struct {
|
||||
aggregators []string
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
namespace string
|
||||
}
|
||||
|
||||
// NewZMONCollector initializes a new ZMONCollector.
|
||||
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, interval time.Duration) (*ZMONCollector, error) {
|
||||
func NewZMONCollector(zmon zmon.ZMON, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ZMONCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for zmon-check is not specified")
|
||||
}
|
||||
@ -117,6 +118,7 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, interval time.Durati
|
||||
aggregators: aggregators,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
namespace: hpa.Namespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -136,7 +138,8 @@ func (c *ZMONCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
point := dataPoints[len(dataPoints)-1]
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
|
@ -96,7 +96,8 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
|
||||
},
|
||||
collectedMetrics: []CollectedMetric{
|
||||
{
|
||||
Type: config.Type,
|
||||
Namespace: "default",
|
||||
Type: config.Type,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: config.Metric.Name,
|
||||
MetricLabels: config.Metric.Selector.MatchLabels,
|
||||
@ -115,7 +116,13 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
|
||||
dataPoints: ti.dataPoints,
|
||||
}
|
||||
|
||||
zmonCollector, err := NewZMONCollector(z, config, 1*time.Second)
|
||||
hpa := &autoscalingv2.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
},
|
||||
}
|
||||
|
||||
zmonCollector, err := NewZMONCollector(z, hpa, config, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics, _ := zmonCollector.GetMetrics()
|
||||
|
@ -129,6 +129,7 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
newHPAs := 0
|
||||
|
||||
for _, hpa := range hpas.Items {
|
||||
hpa := *hpa.DeepCopy()
|
||||
resourceRef := resourceReference{
|
||||
Name: hpa.Name,
|
||||
Namespace: hpa.Namespace,
|
||||
@ -246,7 +247,8 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
value.Custom.DescribedObject.Name,
|
||||
)
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
p.logger.Infof("Collected new external metric '%s' (%s) [%s]",
|
||||
p.logger.Infof("Collected new external metric '%s/%s' (%s) [%s]",
|
||||
value.Namespace,
|
||||
value.External.MetricName,
|
||||
value.External.Value.String(),
|
||||
labels.Set(value.External.MetricLabels).String(),
|
||||
|
@ -31,8 +31,10 @@ type externalMetricsStoredMetric struct {
|
||||
|
||||
// MetricStore is a simple in-memory Metrics Store for HPA metrics.
|
||||
type MetricStore struct {
|
||||
customMetricsStore map[string]map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric
|
||||
externalMetricsStore map[string]map[string]externalMetricsStoredMetric
|
||||
// metricName -> referencedResource -> objectNamespace -> objectName -> metric
|
||||
customMetricsStore map[string]map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric
|
||||
// namespace -> metricName -> labels -> metric
|
||||
externalMetricsStore map[string]map[string]map[string]externalMetricsStoredMetric
|
||||
metricsTTLCalculator func() time.Time
|
||||
sync.RWMutex
|
||||
}
|
||||
@ -41,7 +43,7 @@ type MetricStore struct {
|
||||
func NewMetricStore(ttlCalculator func() time.Time) *MetricStore {
|
||||
return &MetricStore{
|
||||
customMetricsStore: make(map[string]map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric, 0),
|
||||
externalMetricsStore: make(map[string]map[string]externalMetricsStoredMetric, 0),
|
||||
externalMetricsStore: make(map[string]map[string]map[string]externalMetricsStoredMetric, 0),
|
||||
metricsTTLCalculator: ttlCalculator,
|
||||
}
|
||||
}
|
||||
@ -52,7 +54,7 @@ func (s *MetricStore) Insert(value collector.CollectedMetric) {
|
||||
case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
|
||||
s.insertCustomMetric(value.Custom)
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
s.insertExternalMetric(value.External)
|
||||
s.insertExternalMetric(value.Namespace, value.External)
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,7 +122,7 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
|
||||
}
|
||||
|
||||
// insertExternalMetric inserts an external metric into the store.
|
||||
func (s *MetricStore) insertExternalMetric(metric external_metrics.ExternalMetricValue) {
|
||||
func (s *MetricStore) insertExternalMetric(namespace string, metric external_metrics.ExternalMetricValue) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
@ -131,11 +133,19 @@ func (s *MetricStore) insertExternalMetric(metric external_metrics.ExternalMetri
|
||||
|
||||
labelsKey := hashLabelMap(metric.MetricLabels)
|
||||
|
||||
if metrics, ok := s.externalMetricsStore[metric.MetricName]; ok {
|
||||
metrics[labelsKey] = storedMetric
|
||||
if metrics, ok := s.externalMetricsStore[namespace]; ok {
|
||||
if labels, ok := metrics[metric.MetricName]; ok {
|
||||
labels[labelsKey] = storedMetric
|
||||
} else {
|
||||
metrics[metric.MetricName] = map[string]externalMetricsStoredMetric{
|
||||
labelsKey: storedMetric,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.externalMetricsStore[metric.MetricName] = map[string]externalMetricsStoredMetric{
|
||||
labelsKey: storedMetric,
|
||||
s.externalMetricsStore[namespace] = map[string]map[string]externalMetricsStoredMetric{
|
||||
metric.MetricName: {
|
||||
labelsKey: storedMetric,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -250,10 +260,12 @@ func (s *MetricStore) GetExternalMetric(namespace string, selector labels.Select
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
if metrics, ok := s.externalMetricsStore[info.Metric]; ok {
|
||||
for _, metric := range metrics {
|
||||
if selector.Matches(labels.Set(metric.Value.MetricLabels)) {
|
||||
matchedMetrics = append(matchedMetrics, metric.Value)
|
||||
if metrics, ok := s.externalMetricsStore[namespace]; ok {
|
||||
if selectors, ok := metrics[info.Metric]; ok {
|
||||
for _, sel := range selectors {
|
||||
if selector.Matches(labels.Set(sel.Value.MetricLabels)) {
|
||||
matchedMetrics = append(matchedMetrics, sel.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -268,11 +280,13 @@ func (s *MetricStore) ListAllExternalMetrics() []provider.ExternalMetricInfo {
|
||||
|
||||
metricsInfo := make([]provider.ExternalMetricInfo, 0, len(s.externalMetricsStore))
|
||||
|
||||
for metricName := range s.externalMetricsStore {
|
||||
info := provider.ExternalMetricInfo{
|
||||
Metric: metricName,
|
||||
for _, metrics := range s.externalMetricsStore {
|
||||
for metricName := range metrics {
|
||||
info := provider.ExternalMetricInfo{
|
||||
Metric: metricName,
|
||||
}
|
||||
metricsInfo = append(metricsInfo, info)
|
||||
}
|
||||
metricsInfo = append(metricsInfo, info)
|
||||
}
|
||||
return metricsInfo
|
||||
}
|
||||
@ -306,14 +320,19 @@ func (s *MetricStore) RemoveExpired() {
|
||||
}
|
||||
|
||||
// cleanup external metrics
|
||||
for metricName, metrics := range s.externalMetricsStore {
|
||||
for k, metric := range metrics {
|
||||
if metric.TTL.Before(time.Now().UTC()) {
|
||||
delete(metrics, k)
|
||||
for namespace, metrics := range s.externalMetricsStore {
|
||||
for metricName, selectors := range metrics {
|
||||
for k, metric := range selectors {
|
||||
if metric.TTL.Before(time.Now().UTC()) {
|
||||
delete(selectors, k)
|
||||
}
|
||||
}
|
||||
if len(selectors) == 0 {
|
||||
delete(metrics, metricName)
|
||||
}
|
||||
}
|
||||
if len(metrics) == 0 {
|
||||
delete(s.externalMetricsStore, metricName)
|
||||
delete(s.externalMetricsStore, namespace)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user