Files
kube-metrics-adapter/pkg/collector/collector.go
2019-01-09 14:38:56 +01:00

317 lines
8.4 KiB
Go

package collector
import (
"fmt"
"strings"
"time"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
)
type ObjectReference struct {
autoscalingv2beta2.CrossVersionObjectReference
Namespace string
}
type CollectorFactory struct {
podsPlugins pluginMap
objectPlugins objectPluginMap
externalPlugins map[string]CollectorPlugin
}
type objectPluginMap struct {
Any pluginMap
Named map[string]*pluginMap
}
type pluginMap struct {
Any CollectorPlugin
Named map[string]CollectorPlugin
}
func NewCollectorFactory() *CollectorFactory {
return &CollectorFactory{
podsPlugins: pluginMap{Named: map[string]CollectorPlugin{}},
objectPlugins: objectPluginMap{
Any: pluginMap{},
Named: map[string]*pluginMap{},
},
externalPlugins: map[string]CollectorPlugin{},
}
}
type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
if metricCollector == "" {
c.podsPlugins.Any = plugin
} else {
c.podsPlugins.Named[metricCollector] = plugin
}
return nil
}
func (c *CollectorFactory) RegisterObjectCollector(kind, metricCollector string, plugin CollectorPlugin) error {
if kind == "" {
if metricCollector == "" {
c.objectPlugins.Any.Any = plugin
} else {
if c.objectPlugins.Any.Named == nil {
c.objectPlugins.Any.Named = map[string]CollectorPlugin{
metricCollector: plugin,
}
} else {
c.objectPlugins.Any.Named[metricCollector] = plugin
}
}
} else {
if named, ok := c.objectPlugins.Named[kind]; ok {
if metricCollector == "" {
named.Any = plugin
} else {
named.Named[metricCollector] = plugin
}
} else {
if metricCollector == "" {
c.objectPlugins.Named[kind] = &pluginMap{
Any: plugin,
}
} else {
c.objectPlugins.Named[kind] = &pluginMap{
Named: map[string]CollectorPlugin{
metricCollector: plugin,
},
}
}
}
}
return nil
}
func (c *CollectorFactory) RegisterExternalCollector(metrics []string, plugin CollectorPlugin) {
for _, metric := range metrics {
c.externalPlugins[metric] = plugin
}
}
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Type {
case autoscalingv2beta2.PodsMetricSourceType:
// first try to find a plugin by format
if plugin, ok := c.podsPlugins.Named[config.CollectorName]; ok {
return plugin.NewCollector(hpa, config, interval)
}
// else try to use the default plugin if set
if c.podsPlugins.Any != nil {
return c.podsPlugins.Any.NewCollector(hpa, config, interval)
}
case autoscalingv2beta2.ObjectMetricSourceType:
// first try to find a plugin by kind
if kinds, ok := c.objectPlugins.Named[config.ObjectReference.Kind]; ok {
if plugin, ok := kinds.Named[config.CollectorName]; ok {
return plugin.NewCollector(hpa, config, interval)
}
if kinds.Any != nil {
return kinds.Any.NewCollector(hpa, config, interval)
}
break
}
// else try to find a default plugin for this kind
if plugin, ok := c.objectPlugins.Any.Named[config.CollectorName]; ok {
return plugin.NewCollector(hpa, config, interval)
}
if c.objectPlugins.Any.Any != nil {
return c.objectPlugins.Any.Any.NewCollector(hpa, config, interval)
}
case autoscalingv2beta2.ExternalMetricSourceType:
if plugin, ok := c.externalPlugins[config.Name]; ok {
return plugin.NewCollector(hpa, config, interval)
}
}
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
}
func getObjectReference(hpa *autoscalingv2beta2.HorizontalPodAutoscaler, metricName string) (custom_metrics.ObjectReference, error) {
for _, metric := range hpa.Spec.Metrics {
if metric.Type == autoscalingv2beta2.ObjectMetricSourceType && metric.Object.Metric.Name == metricName {
return custom_metrics.ObjectReference{
APIVersion: metric.Object.DescribedObject.APIVersion,
Kind: metric.Object.DescribedObject.Kind,
Name: metric.Object.DescribedObject.Name,
Namespace: hpa.Namespace,
}, nil
}
}
return custom_metrics.ObjectReference{}, fmt.Errorf("failed to find object reference")
}
type MetricTypeName struct {
Type autoscalingv2beta2.MetricSourceType
Name string
MetricLabels *metav1.LabelSelector
}
type CollectedMetric struct {
Type autoscalingv2beta2.MetricSourceType
Custom custom_metrics.MetricValue
External external_metrics.ExternalMetricValue
Labels map[string]string
}
type Collector interface {
GetMetrics() ([]CollectedMetric, error)
Interval() time.Duration
}
type MetricConfig struct {
MetricTypeName
CollectorName string
Config map[string]string
ObjectReference custom_metrics.ObjectReference
PerReplica bool
Interval time.Duration
Labels map[string]string
}
func parseCustomMetricsAnnotations(annotations map[string]string) (map[MetricTypeName]*MetricConfig, error) {
metrics := make(map[MetricTypeName]*MetricConfig)
for key, val := range annotations {
if !strings.HasPrefix(key, customMetricsPrefix) {
continue
}
parts := strings.Split(key, "/")
if len(parts) != 2 {
// TODO: error?
continue
}
configs := strings.Split(parts[0], ".")
if len(configs) != 4 {
// TODO: error?
continue
}
metricTypeName := MetricTypeName{
Name: configs[2],
}
switch configs[1] {
case "pods":
metricTypeName.Type = autoscalingv2beta2.PodsMetricSourceType
case "object":
metricTypeName.Type = autoscalingv2beta2.ObjectMetricSourceType
}
metricCollector := configs[3]
config, ok := metrics[metricTypeName]
if !ok {
config = &MetricConfig{
MetricTypeName: metricTypeName,
CollectorName: metricCollector,
Config: map[string]string{},
}
metrics[metricTypeName] = config
}
// TODO: fail if collector name doesn't match
if config.CollectorName != metricCollector {
continue
}
if parts[1] == perReplicaMetricsConfKey {
config.PerReplica = true
continue
}
if parts[1] == intervalMetricsConfKey {
interval, err := time.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
}
config.Interval = interval
continue
}
config.Config[parts[1]] = val
}
return metrics, nil
}
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
func ParseHPAMetrics(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
metricConfigs := make([]*MetricConfig, 0, len(hpa.Spec.Metrics))
// TODO: validate that the specified metric names are defined
// in the HPA
configs, err := parseCustomMetricsAnnotations(hpa.Annotations)
if err != nil {
return nil, err
}
for _, metric := range hpa.Spec.Metrics {
typeName := MetricTypeName{
Type: metric.Type,
}
var ref custom_metrics.ObjectReference
switch metric.Type {
case autoscalingv2beta2.PodsMetricSourceType:
typeName.Name = metric.Pods.Metric.Name
case autoscalingv2beta2.ObjectMetricSourceType:
typeName.Name = metric.Object.Metric.Name
typeName.MetricLabels = metric.Object.Metric.Selector
ref = custom_metrics.ObjectReference{
APIVersion: metric.Object.DescribedObject.APIVersion,
Kind: metric.Object.DescribedObject.Kind,
Name: metric.Object.DescribedObject.Name,
Namespace: hpa.Namespace,
}
case autoscalingv2beta2.ExternalMetricSourceType:
typeName.Name = metric.External.Metric.Name
case autoscalingv2beta2.ResourceMetricSourceType:
continue // kube-metrics-adapter does not collect resource metrics
}
if config, ok := configs[typeName]; ok {
config.ObjectReference = ref
metricConfigs = append(metricConfigs, config)
continue
}
config := &MetricConfig{
MetricTypeName: typeName,
ObjectReference: ref,
Config: map[string]string{},
}
if metric.Type == autoscalingv2beta2.ExternalMetricSourceType {
config.Labels = metric.External.Metric.Selector.MatchLabels
}
metricConfigs = append(metricConfigs, config)
}
return metricConfigs, nil
}