Adding boilerplate files

Signed-off-by: Per Ploug <per.ploug@zalando.de>
This commit is contained in:
Per Ploug
2018-10-08 13:17:05 +02:00
parent e9b677605a
commit 3db0cc3135
40 changed files with 3247 additions and 2 deletions

View File

@@ -0,0 +1,128 @@
package collector
import (
"fmt"
"strconv"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
AWSSQSQueueLengthMetric = "sqs-queue-length"
sqsQueueNameLabelKey = "queue-name"
sqsQueueRegionLabelKey = "region"
)
type AWSCollectorPlugin struct {
sessions map[string]*session.Session
}
func NewAWSCollectorPlugin(sessions map[string]*session.Session) *AWSCollectorPlugin {
return &AWSCollectorPlugin{
sessions: sessions,
}
}
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
case AWSSQSQueueLengthMetric:
return NewAWSSQSCollector(c.sessions, config, interval)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
}
type AWSSQSCollector struct {
sqs sqsiface.SQSAPI
interval time.Duration
region string
queueURL string
queueName string
labels map[string]string
metricName string
metricType autoscalingv2beta1.MetricSourceType
}
func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
name, ok := config.Labels[sqsQueueNameLabelKey]
if !ok {
return nil, fmt.Errorf("sqs queue name not specified on metric")
}
region, ok := config.Labels[sqsQueueRegionLabelKey]
if !ok {
return nil, fmt.Errorf("sqs queue region is not specified on metric")
}
session, ok := sessions[region]
if !ok {
return nil, fmt.Errorf("the metric region: %s is not configured", region)
}
service := sqs.New(session)
params := &sqs.GetQueueUrlInput{
QueueName: aws.String(name),
}
resp, err := service.GetQueueUrl(params)
if err != nil {
return nil, fmt.Errorf("failed to get queue URL for queue '%s': %v", name, err)
}
return &AWSSQSCollector{
sqs: service,
interval: interval,
queueURL: aws.StringValue(resp.QueueUrl),
queueName: name,
metricName: config.Name,
metricType: config.Type,
labels: config.Labels,
}, nil
}
func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
params := &sqs.GetQueueAttributesInput{
QueueUrl: aws.String(c.queueURL),
AttributeNames: aws.StringSlice([]string{sqs.QueueAttributeNameApproximateNumberOfMessages}),
}
resp, err := c.sqs.GetQueueAttributes(params)
if err != nil {
return nil, err
}
if v, ok := resp.Attributes[sqs.QueueAttributeNameApproximateNumberOfMessages]; ok {
i, err := strconv.Atoi(aws.StringValue(v))
if err != nil {
return nil, err
}
metricValue := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metricName,
MetricLabels: c.labels,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewQuantity(int64(i), resource.DecimalSI),
},
}
return []CollectedMetric{metricValue}, nil
}
return nil, fmt.Errorf("failed to get queue length for '%s'", c.queueName)
}
// Interval returns the interval at which the collector should run.
func (c *AWSSQSCollector) Interval() time.Duration {
return c.interval
}

311
pkg/collector/collector.go Normal file
View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,133 @@
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/oliveagle/jsonpath"
"k8s.io/api/core/v1"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
scheme string
path string
port int
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
getter := &JSONPathMetricsGetter{}
if v, ok := config["json-key"]; ok {
pat, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = pat
}
if v, ok := config["scheme"]; ok {
getter.scheme = v
}
if v, ok := config["path"]; ok {
getter.path = v
}
if v, ok := config["port"]; ok {
n, err := strconv.Atoi(v)
if err != nil {
return nil, err
}
getter.port = n
}
return getter, nil
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
}
// parse data
var jsonData interface{}
err = json.Unmarshal(data, &jsonData)
if err != nil {
return 0, err
}
res, err := g.jsonPath.Lookup(jsonData)
if err != nil {
return 0, err
}
switch res := res.(type) {
case int:
return float64(res), nil
case float32:
return float64(res), nil
case float64:
return res, nil
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *v1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
httpClient := &http.Client{
Timeout: 15 * time.Second,
Transport: &http.Transport{},
}
if scheme == "" {
scheme = "http"
}
metricsURL := url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s:%d", pod.Status.PodIP, port),
Path: path,
}
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
if err != nil {
return nil, err
}
resp, err := httpClient.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
}
return data, nil
}

View File

@@ -0,0 +1,42 @@
package collector
import "time"
// MaxCollector is a simple aggregator collector that returns the maximum value
// of metrics from all collectors.
type MaxCollector struct {
collectors []Collector
interval time.Duration
}
// NewMaxCollector initializes a new MacCollector.
func NewMaxCollector(interval time.Duration, collectors ...Collector) *MaxCollector {
return &MaxCollector{
collectors: collectors,
interval: interval,
}
}
// GetMetrics gets metrics from all collectors and return the higest value.
func (c *MaxCollector) GetMetrics() ([]CollectedMetric, error) {
var max CollectedMetric
for _, collector := range c.collectors {
values, err := collector.GetMetrics()
if err != nil {
return nil, err
}
for _, value := range values {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
}
return []CollectedMetric{max}, nil
}
// Interval returns the interval at which the collector should run.
func (c *MaxCollector) Interval() time.Duration {
return c.interval
}

View File

@@ -0,0 +1,20 @@
package collector
import autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
type ObjectMetricsGetter interface {
GetObjectMetric(namespace string, reference *autoscalingv2beta1.CrossVersionObjectReference) (float64, error)
}
// type PodCollector struct {
// client kubernetes.Interface
// Getter PodMetricsGetter
// podLabelSelector string
// namespace string
// metricName string
// interval time.Duration
// }
// func NewObjectCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string, config *MetricConfig, interval time.Duration) (Collector, error) {
// switch
// }

View File

@@ -0,0 +1,141 @@
package collector
import (
"fmt"
"time"
"github.com/golang/glog"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
type PodCollectorPlugin struct {
client kubernetes.Interface
}
func NewPodCollectorPlugin(client kubernetes.Interface) *PodCollectorPlugin {
return &PodCollectorPlugin{
client: client,
}
}
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewPodCollector(p.client, hpa, config, interval)
}
type PodCollector struct {
client kubernetes.Interface
Getter PodMetricsGetter
podLabelSelector string
namespace string
metricName string
metricType autoscalingv2beta1.MetricSourceType
interval time.Duration
}
type PodMetricsGetter interface {
GetMetric(pod *v1.Pod) (float64, error)
}
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
// get pod selector based on HPA scale target ref
selector, err := getPodLabelSelector(client, hpa)
if err != nil {
return nil, fmt.Errorf("failed to get pod label selector: %v", err)
}
c := &PodCollector{
client: client,
namespace: hpa.Namespace,
metricName: config.Name,
metricType: config.Type,
interval: interval,
podLabelSelector: selector,
}
var getter PodMetricsGetter
switch config.CollectorName {
case "json-path":
var err error
getter, err = NewJSONPathMetricsGetter(config.Config)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("format '%s' not supported", config.CollectorName)
}
c.Getter = getter
return c, nil
}
func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
opts := metav1.ListOptions{
LabelSelector: c.podLabelSelector,
}
pods, err := c.client.CoreV1().Pods(c.namespace).List(opts)
if err != nil {
return nil, err
}
values := make([]CollectedMetric, 0, len(pods.Items))
// TODO: get metrics in parallel
for _, pod := range pods.Items {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
glog.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
continue
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
MetricName: c.metricName,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
Labels: pod.Labels,
}
values = append(values, metricValue)
}
return values, nil
}
func (c *PodCollector) Interval() time.Duration {
return c.interval
}
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (string, error) {
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return labels.Set(deployment.Spec.Selector.MatchLabels).String(), nil
case "StatefulSet":
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
}
return labels.Set(sts.Spec.Selector.MatchLabels).String(), nil
}
return "", fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
}

View File

@@ -0,0 +1,131 @@
package collector
import (
"context"
"fmt"
"net/http"
"time"
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
type PrometheusCollectorPlugin struct {
promAPI promv1.API
client kubernetes.Interface
}
func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer string) (*PrometheusCollectorPlugin, error) {
cfg := api.Config{
Address: prometheusServer,
RoundTripper: &http.Transport{},
}
promClient, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
return &PrometheusCollectorPlugin{
client: client,
promAPI: promv1.NewAPI(promClient),
}, nil
}
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewPrometheusCollector(p.client, p.promAPI, hpa, config, interval)
}
type PrometheusCollector struct {
client kubernetes.Interface
promAPI promv1.API
query string
metricName string
metricType autoscalingv2beta1.MetricSourceType
objectReference custom_metrics.ObjectReference
interval time.Duration
perReplica bool
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
}
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
c := &PrometheusCollector{
client: client,
objectReference: config.ObjectReference,
metricName: config.Name,
metricType: config.Type,
interval: interval,
promAPI: promAPI,
perReplica: config.PerReplica,
hpa: hpa,
}
if v, ok := config.Config["query"]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined")
}
return c, nil
}
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
// TODO: use real context
value, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
if err != nil {
return nil, err
}
var sampleValue model.SampleValue
switch value.Type() {
case model.ValVector:
samples := value.(model.Vector)
if len(samples) == 0 {
return nil, fmt.Errorf("query '%s' returned no samples", c.query)
}
sampleValue = samples[0].Value
case model.ValScalar:
scalar := value.(*model.Scalar)
sampleValue = scalar.Value
}
if sampleValue.String() == "NaN" {
return nil, fmt.Errorf("query '%s' returned no samples: %s", c.query, sampleValue.String())
}
if c.perReplica {
// get current replicas for the targeted scale object. This is used to
// calculate an average metric instead of total.
// targetAverageValue will be available in Kubernetes v1.12
// https://github.com/kubernetes/kubernetes/pull/64097
replicas, err := targetRefReplicas(c.client, c.hpa)
if err != nil {
return nil, err
}
sampleValue = model.SampleValue(float64(sampleValue) / float64(replicas))
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: c.objectReference,
MetricName: c.metricName,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
}
return []CollectedMetric{metricValue}, nil
}
func (c *PrometheusCollector) Interval() time.Duration {
return c.interval
}

View File

@@ -0,0 +1,165 @@
package collector
import (
"fmt"
"strings"
"time"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
const (
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
)
// SkipperCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting skipper ingress metrics.
type SkipperCollectorPlugin struct {
client kubernetes.Interface
plugin CollectorPlugin
}
// NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin.
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin) (*SkipperCollectorPlugin, error) {
return &SkipperCollectorPlugin{
client: client,
plugin: prometheusPlugin,
}, nil
}
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
case rpsMetricName:
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval)
default:
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
}
}
// SkipperCollector is a metrics collector for getting skipper ingress metrics.
// It depends on the prometheus collector for getting the metrics.
type SkipperCollector struct {
client kubernetes.Interface
metricName string
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
}
// NewSkipperCollector initializes a new SkipperCollector.
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*SkipperCollector, error) {
return &SkipperCollector{
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metricName: config.Name,
interval: interval,
plugin: plugin,
config: *config,
}, nil
}
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
config := c.config
var collector Collector
collectors := make([]Collector, 0, len(ingress.Spec.Rules))
for _, rule := range ingress.Spec.Rules {
host := strings.Replace(rule.Host, ".", "_", -1)
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, host),
}
config.PerReplica = false // per replica is handled outside of the prometheus collector
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
if err != nil {
return nil, err
}
collectors = append(collectors, collector)
}
if len(collectors) > 1 {
collector = NewMaxCollector(c.interval, collectors...)
} else if len(collectors) == 1 {
collector = collectors[0]
} else {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}
return collector, nil
}
// GetMetrics gets skipper metrics from prometheus.
func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) {
collector, err := c.getCollector()
if err != nil {
return nil, err
}
values, err := collector.GetMetrics()
if err != nil {
return nil, err
}
if len(values) != 1 {
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(values))
}
// get current replicas for the targeted scale object. This is used to
// calculate an average metric instead of total.
// targetAverageValue will be available in Kubernetes v1.12
// https://github.com/kubernetes/kubernetes/pull/64097
replicas, err := targetRefReplicas(c.client, c.hpa)
if err != nil {
return nil, err
}
if replicas < 1 {
return nil, fmt.Errorf("unable to get average value for %d replicas", replicas)
}
value := values[0]
avgValue := float64(value.Custom.Value.MilliValue()) / float64(replicas)
value.Custom.Value = *resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
return []CollectedMetric{value}, nil
}
// Interval returns the interval at which the collector should run.
func (c *SkipperCollector) Interval() time.Duration {
return c.interval
}
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (int32, error) {
var replicas int32
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}
replicas = deployment.Status.ReadyReplicas
case "StatefulSet":
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}
replicas = sts.Status.ReadyReplicas
}
return replicas, nil
}