From b18acf3ed0822137f0cc51f65de0cc30c9beb744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20J=C3=BCttner?= Date: Wed, 24 Oct 2018 17:27:30 +0200 Subject: [PATCH] Adding event recorder and replace glog (#12) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Jüttner --- go.mod | 7 +++--- go.sum | 4 ++++ pkg/collector/pod_collector.go | 6 +++-- pkg/provider/hpa.go | 41 ++++++++++++++++++++-------------- pkg/recorder/recorder.go | 21 +++++++++++++++++ 5 files changed, 57 insertions(+), 22 deletions(-) create mode 100644 pkg/recorder/recorder.go diff --git a/go.mod b/go.mod index 90bc0e0..96218eb 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/NYTimes/gziphandler v1.0.1 // indirect github.com/PuerkitoBio/purell v1.1.0 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect - github.com/aws/aws-sdk-go v1.15.21 + github.com/aws/aws-sdk-go v1.15.61 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/boltdb/bolt v1.3.1 // indirect github.com/coreos/bbolt v1.3.0 // indirect @@ -22,6 +22,7 @@ require ( github.com/evanphx/json-patch v3.0.0+incompatible // indirect github.com/fsnotify/fsnotify v1.4.7 // indirect github.com/ghodss/yaml v1.0.0 // indirect + github.com/go-ini/ini v1.25.4 // indirect github.com/go-openapi/jsonpointer v0.0.0-20180322222829-3a0015ad55fa // indirect github.com/go-openapi/jsonreference v0.0.0-20180322222742-3fb327e6747d // indirect github.com/go-openapi/spec v0.0.0-20180801175345-384415f06ee2 // indirect @@ -60,7 +61,7 @@ require ( github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect - github.com/sirupsen/logrus v1.0.6 // indirect + github.com/sirupsen/logrus v1.0.6 github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf // indirect github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a // indirect github.com/soheilhy/cmux v0.1.4 // indirect @@ -70,7 +71,7 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect github.com/ugorji/go v1.1.1 // indirect github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect - golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac // indirect + golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 // indirect golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 // indirect diff --git a/go.sum b/go.sum index 8cb1e5e..663e91f 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/aws/aws-sdk-go v1.15.21 h1:STLvc6RrpycslC1NRtTvt/YSgDkIGCTrB9K9vE5R2oQ= github.com/aws/aws-sdk-go v1.15.21/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= +github.com/aws/aws-sdk-go v1.15.61 h1:M1mnQshHau/YfY2hV45rsaAevdMgLp7zh0oHRCgX100= +github.com/aws/aws-sdk-go v1.15.61/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= @@ -142,6 +144,8 @@ github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4M github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac h1:7d7lG9fHOLdL6jZPtnV4LpI41SbohIJ1Atq7U991dMg= golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e h1:IzypfodbhbnViNUO/MEh0FzCUooG97cIGfdggUrUSyU= +golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 h1:4S2XUgvg3hUNTvxI307qkFPb9zKHG3Nf9TXFzX/DZZI= golang.org/x/net v0.0.0-20180824152047-4bcd98cce591/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= diff --git a/pkg/collector/pod_collector.go b/pkg/collector/pod_collector.go index 8d2a759..f7dfed9 100644 --- a/pkg/collector/pod_collector.go +++ b/pkg/collector/pod_collector.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/golang/glog" + log "github.com/sirupsen/logrus" autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -36,6 +36,7 @@ type PodCollector struct { metricName string metricType autoscalingv2beta1.MetricSourceType interval time.Duration + logger *log.Entry } type PodMetricsGetter interface { @@ -56,6 +57,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo metricType: config.Type, interval: interval, podLabelSelector: selector, + logger: log.WithFields(log.Fields{"Collector": "Pod"}), } var getter PodMetricsGetter @@ -91,7 +93,7 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) { for _, pod := range pods.Items { value, err := c.Getter.GetMetric(&pod) if err != nil { - glog.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err) + c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err) continue } diff --git a/pkg/provider/hpa.go b/pkg/provider/hpa.go index 5e6ac45..618485a 100644 --- a/pkg/provider/hpa.go +++ b/pkg/provider/hpa.go @@ -6,16 +6,19 @@ import ( "sync" "time" - "github.com/golang/glog" "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + log "github.com/sirupsen/logrus" "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector" + "github.com/zalando-incubator/kube-metrics-adapter/pkg/recorder" autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1" + apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + kube_record "k8s.io/client-go/tools/record" "k8s.io/metrics/pkg/apis/custom_metrics" "k8s.io/metrics/pkg/apis/external_metrics" ) @@ -58,6 +61,8 @@ type HPAProvider struct { hpaCache map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler metricStore *MetricStore collectorFactory *collector.CollectorFactory + recorder kube_record.EventRecorder + logger *log.Entry } // metricCollection is a container for sending collected metrics across a @@ -70,6 +75,7 @@ type metricCollection struct { // NewHPAProvider initializes a new HPAProvider. func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider { metricsc := make(chan metricCollection) + return &HPAProvider{ client: client, interval: interval, @@ -77,6 +83,8 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim metricSink: metricsc, metricStore: NewMetricStore(), collectorFactory: collectorFactory, + recorder: recorder.CreateEventRecorder(client), + logger: log.WithFields(log.Fields{"provider": "hpa"}), } } @@ -90,7 +98,7 @@ func (p *HPAProvider) Run(ctx context.Context) { for { err := p.updateHPAs() if err != nil { - glog.Error(err) + p.logger.Error(err) UpdateErrors.Inc() } else { UpdateSuccesses.Inc() @@ -99,7 +107,7 @@ func (p *HPAProvider) Run(ctx context.Context) { select { case <-time.After(p.interval): case <-ctx.Done(): - glog.Info("Stopped HPA provider.") + p.logger.Info("Stopped HPA provider.") return } } @@ -108,7 +116,7 @@ func (p *HPAProvider) Run(ctx context.Context) { // updateHPAs discovers all HPA resources and sets up metric collectors for new // HPAs. func (p *HPAProvider) updateHPAs() error { - glog.Info("Looking for HPAs") + p.logger.Info("Looking for HPAs") hpas, err := p.client.AutoscalingV2beta1().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { @@ -129,7 +137,7 @@ func (p *HPAProvider) updateHPAs() error { if cachedHPA, ok := p.hpaCache[resourceRef]; !ok || !equalHPA(cachedHPA, hpa) { metricConfigs, err := collector.ParseHPAMetrics(&hpa) if err != nil { - glog.Errorf("Failed to parse HPA metrics: %v", err) + p.logger.Errorf("Failed to parse HPA metrics: %v", err) continue } @@ -142,13 +150,12 @@ func (p *HPAProvider) updateHPAs() error { collector, err := p.collectorFactory.NewCollector(&hpa, config, interval) if err != nil { - // TODO: log and send event - glog.Errorf("Failed to create new metrics collector: %v", err) + p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err) cache = false continue } - glog.Infof("Adding new metrics collector: %T", collector) + p.logger.Infof("Adding new metrics collector: %T", collector) p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector) } newHPAs++ @@ -168,11 +175,11 @@ func (p *HPAProvider) updateHPAs() error { continue } - glog.V(2).Infof("Removing previously scheduled metrics collector: %s", ref) + p.logger.Infof("Removing previously scheduled metrics collector: %s", ref) p.collectorScheduler.Remove(ref) } - glog.Infof("Found %d new/updated HPA(s)", newHPAs) + p.logger.Infof("Found %d new/updated HPA(s)", newHPAs) p.hpaCache = newHPACache return nil } @@ -197,7 +204,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) { case <-time.After(10 * time.Minute): p.metricStore.RemoveExpired() case <-ctx.Done(): - glog.Info("Stopped metrics store garbage collection.") + p.logger.Info("Stopped metrics store garbage collection.") return } } @@ -207,17 +214,17 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) { select { case collection := <-p.metricSink: if collection.Error != nil { - glog.Errorf("Failed to collect metrics: %v", collection.Error) + p.logger.Errorf("Failed to collect metrics: %v", collection.Error) CollectionErrors.Inc() } else { CollectionSuccesses.Inc() } - glog.Infof("Collected %d new metric(s)", len(collection.Values)) + p.logger.Infof("Collected %d new metric(s)", len(collection.Values)) for _, value := range collection.Values { switch value.Type { case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType: - glog.Infof("Collected new custom metric '%s' (%s) for %s %s/%s", + p.logger.Infof("Collected new custom metric '%s' (%s) for %s %s/%s", value.Custom.MetricName, value.Custom.Value.String(), value.Custom.DescribedObject.Kind, @@ -225,7 +232,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) { value.Custom.DescribedObject.Name, ) case autoscalingv2beta1.ExternalMetricSourceType: - glog.Infof("Collected new external metric '%s' (%s) [%s]", + p.logger.Infof("Collected new external metric '%s' (%s) [%s]", value.External.MetricName, value.External.Value.String(), labels.Set(value.External.MetricLabels).String(), @@ -234,7 +241,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) { p.metricStore.Insert(value) } case <-ctx.Done(): - glog.Info("Stopped metrics collection.") + p.logger.Info("Stopped metrics collection.") return } } @@ -330,7 +337,7 @@ func collectorRunner(ctx context.Context, collector collector.Collector, metrics select { case <-time.After(collector.Interval()): case <-ctx.Done(): - glog.V(2).Infof("stopping collector runner...") + log.Info("stopping collector runner...") return } } diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go new file mode 100644 index 0000000..279bc09 --- /dev/null +++ b/pkg/recorder/recorder.go @@ -0,0 +1,21 @@ +package recorder + +import ( + "github.com/sirupsen/logrus" + clientv1 "k8s.io/api/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + kube_record "k8s.io/client-go/tools/record" +) + +// CreateEventRecorder creates an event recorder to send custom events to Kubernetes to be recorded for targeted Kubernetes objects +func CreateEventRecorder(kubeClient clientset.Interface) kube_record.EventRecorder { + eventBroadcaster := kube_record.NewBroadcaster() + eventBroadcaster.StartLogging(logrus.Infof) + if _, isfake := kubeClient.(*fake.Clientset); !isfake { + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) + } + return eventBroadcaster.NewRecorder(scheme.Scheme, clientv1.EventSource{Component: "kube-metrics-adapter"}) +}