package collector import ( "encoding/json" "errors" "fmt" "math" "regexp" "strings" "time" autoscalingv2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/metrics/pkg/apis/custom_metrics" ) const ( rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"%s"}[1m])) * %.4f)` rpsMetricName = "requests-per-second" rpsMetricBackendSeparator = "," ) var ( errBackendNameMissing = errors.New("backend name must be specified for requests-per-second when traffic switching is used") ) // SkipperCollectorPlugin is a collector plugin for initializing metrics // collectors for getting skipper ingress metrics. type SkipperCollectorPlugin struct { client kubernetes.Interface plugin CollectorPlugin backendAnnotations []string } // NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin. func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) { return &SkipperCollectorPlugin{ client: client, plugin: prometheusPlugin, backendAnnotations: backendAnnotations, }, nil } // NewCollector initializes a new skipper collector from the specified HPA. func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { if strings.HasPrefix(config.Metric.Name, rpsMetricName) { backend := "" if len(config.Metric.Name) > len(rpsMetricName) { metricNameParts := strings.Split(config.Metric.Name, rpsMetricBackendSeparator) if len(metricNameParts) == 2 { backend = metricNameParts[1] } } return NewSkipperCollector(c.client, c.plugin, hpa, config, interval, c.backendAnnotations, backend) } return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name) } // SkipperCollector is a metrics collector for getting skipper ingress metrics. // It depends on the prometheus collector for getting the metrics. type SkipperCollector struct { client kubernetes.Interface metric autoscalingv2.MetricIdentifier objectReference custom_metrics.ObjectReference hpa *autoscalingv2.HorizontalPodAutoscaler interval time.Duration plugin CollectorPlugin config MetricConfig backend string backendAnnotations []string } // NewSkipperCollector initializes a new SkipperCollector. func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) { return &SkipperCollector{ client: client, objectReference: config.ObjectReference, hpa: hpa, metric: config.Metric, interval: interval, plugin: plugin, config: *config, backend: backend, backendAnnotations: backendAnnotations, }, nil } func getAnnotationWeight(backendWeights string, backend string) float64 { var weightsMap map[string]int err := json.Unmarshal([]byte(backendWeights), &weightsMap) if err != nil { return 0 } if weight, ok := weightsMap[backend]; ok { return float64(weight) / 100 } return 0 } func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) { maxWeight := 0.0 annotationsPresent := false for _, anno := range backendAnnotations { if weightsMap, ok := ingressAnnotations[anno]; ok { annotationsPresent = true maxWeight = math.Max(maxWeight, getAnnotationWeight(weightsMap, backend)) } } // Fallback for ingresses that don't use traffic switching if !annotationsPresent { return 1.0, nil } // Require backend name here if backend != "" { return maxWeight, nil } return 0.0, errBackendNameMissing } // getCollector returns a collector for getting the metrics. func (c *SkipperCollector) getCollector() (Collector, error) { ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{}) if err != nil { return nil, err } backendWeight, err := getWeights(ingress.Annotations, c.backendAnnotations, c.backend) if err != nil { return nil, err } config := c.config var escapedHostnames []string for _, rule := range ingress.Spec.Rules { escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1))) } if len(escapedHostnames) == 0 { return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name) } config.Config = map[string]string{ "query": fmt.Sprintf(rpsQuery, strings.Join(escapedHostnames, "|"), backendWeight), } config.PerReplica = false // per replica is handled outside of the prometheus collector collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval) if err != nil { return nil, err } return collector, nil } // GetMetrics gets skipper metrics from prometheus. func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) { collector, err := c.getCollector() if err != nil { return nil, err } values, err := collector.GetMetrics() if err != nil { return nil, err } if len(values) != 1 { return nil, fmt.Errorf("expected to only get one metric value, got %d", len(values)) } value := values[0] // For Kubernetes