mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-08-14 10:39:26 +00:00
Adding event recorder and replace glog (#12)
Signed-off-by: Nick Jüttner <nick@zalando.de>
This commit is contained in:
@@ -4,7 +4,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
log "github.com/sirupsen/logrus"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
@@ -36,6 +36,7 @@ type PodCollector struct {
|
||||
metricName string
|
||||
metricType autoscalingv2beta1.MetricSourceType
|
||||
interval time.Duration
|
||||
logger *log.Entry
|
||||
}
|
||||
|
||||
type PodMetricsGetter interface {
|
||||
@@ -56,6 +57,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
|
||||
metricType: config.Type,
|
||||
interval: interval,
|
||||
podLabelSelector: selector,
|
||||
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
|
||||
}
|
||||
|
||||
var getter PodMetricsGetter
|
||||
@@ -91,7 +93,7 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
for _, pod := range pods.Items {
|
||||
value, err := c.Getter.GetMetric(&pod)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@@ -6,16 +6,19 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/recorder"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
kube_record "k8s.io/client-go/tools/record"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
@@ -58,6 +61,8 @@ type HPAProvider struct {
|
||||
hpaCache map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler
|
||||
metricStore *MetricStore
|
||||
collectorFactory *collector.CollectorFactory
|
||||
recorder kube_record.EventRecorder
|
||||
logger *log.Entry
|
||||
}
|
||||
|
||||
// metricCollection is a container for sending collected metrics across a
|
||||
@@ -70,6 +75,7 @@ type metricCollection struct {
|
||||
// NewHPAProvider initializes a new HPAProvider.
|
||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
|
||||
metricsc := make(chan metricCollection)
|
||||
|
||||
return &HPAProvider{
|
||||
client: client,
|
||||
interval: interval,
|
||||
@@ -77,6 +83,8 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
|
||||
metricSink: metricsc,
|
||||
metricStore: NewMetricStore(),
|
||||
collectorFactory: collectorFactory,
|
||||
recorder: recorder.CreateEventRecorder(client),
|
||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,7 +98,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
|
||||
for {
|
||||
err := p.updateHPAs()
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
p.logger.Error(err)
|
||||
UpdateErrors.Inc()
|
||||
} else {
|
||||
UpdateSuccesses.Inc()
|
||||
@@ -99,7 +107,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
|
||||
select {
|
||||
case <-time.After(p.interval):
|
||||
case <-ctx.Done():
|
||||
glog.Info("Stopped HPA provider.")
|
||||
p.logger.Info("Stopped HPA provider.")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -108,7 +116,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
|
||||
// updateHPAs discovers all HPA resources and sets up metric collectors for new
|
||||
// HPAs.
|
||||
func (p *HPAProvider) updateHPAs() error {
|
||||
glog.Info("Looking for HPAs")
|
||||
p.logger.Info("Looking for HPAs")
|
||||
|
||||
hpas, err := p.client.AutoscalingV2beta1().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
@@ -129,7 +137,7 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
if cachedHPA, ok := p.hpaCache[resourceRef]; !ok || !equalHPA(cachedHPA, hpa) {
|
||||
metricConfigs, err := collector.ParseHPAMetrics(&hpa)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to parse HPA metrics: %v", err)
|
||||
p.logger.Errorf("Failed to parse HPA metrics: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -142,13 +150,12 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
|
||||
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
||||
if err != nil {
|
||||
// TODO: log and send event
|
||||
glog.Errorf("Failed to create new metrics collector: %v", err)
|
||||
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
||||
cache = false
|
||||
continue
|
||||
}
|
||||
|
||||
glog.Infof("Adding new metrics collector: %T", collector)
|
||||
p.logger.Infof("Adding new metrics collector: %T", collector)
|
||||
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
|
||||
}
|
||||
newHPAs++
|
||||
@@ -168,11 +175,11 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Removing previously scheduled metrics collector: %s", ref)
|
||||
p.logger.Infof("Removing previously scheduled metrics collector: %s", ref)
|
||||
p.collectorScheduler.Remove(ref)
|
||||
}
|
||||
|
||||
glog.Infof("Found %d new/updated HPA(s)", newHPAs)
|
||||
p.logger.Infof("Found %d new/updated HPA(s)", newHPAs)
|
||||
p.hpaCache = newHPACache
|
||||
return nil
|
||||
}
|
||||
@@ -197,7 +204,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
case <-time.After(10 * time.Minute):
|
||||
p.metricStore.RemoveExpired()
|
||||
case <-ctx.Done():
|
||||
glog.Info("Stopped metrics store garbage collection.")
|
||||
p.logger.Info("Stopped metrics store garbage collection.")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -207,17 +214,17 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
select {
|
||||
case collection := <-p.metricSink:
|
||||
if collection.Error != nil {
|
||||
glog.Errorf("Failed to collect metrics: %v", collection.Error)
|
||||
p.logger.Errorf("Failed to collect metrics: %v", collection.Error)
|
||||
CollectionErrors.Inc()
|
||||
} else {
|
||||
CollectionSuccesses.Inc()
|
||||
}
|
||||
|
||||
glog.Infof("Collected %d new metric(s)", len(collection.Values))
|
||||
p.logger.Infof("Collected %d new metric(s)", len(collection.Values))
|
||||
for _, value := range collection.Values {
|
||||
switch value.Type {
|
||||
case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType:
|
||||
glog.Infof("Collected new custom metric '%s' (%s) for %s %s/%s",
|
||||
p.logger.Infof("Collected new custom metric '%s' (%s) for %s %s/%s",
|
||||
value.Custom.MetricName,
|
||||
value.Custom.Value.String(),
|
||||
value.Custom.DescribedObject.Kind,
|
||||
@@ -225,7 +232,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
value.Custom.DescribedObject.Name,
|
||||
)
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
glog.Infof("Collected new external metric '%s' (%s) [%s]",
|
||||
p.logger.Infof("Collected new external metric '%s' (%s) [%s]",
|
||||
value.External.MetricName,
|
||||
value.External.Value.String(),
|
||||
labels.Set(value.External.MetricLabels).String(),
|
||||
@@ -234,7 +241,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
p.metricStore.Insert(value)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
glog.Info("Stopped metrics collection.")
|
||||
p.logger.Info("Stopped metrics collection.")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -330,7 +337,7 @@ func collectorRunner(ctx context.Context, collector collector.Collector, metrics
|
||||
select {
|
||||
case <-time.After(collector.Interval()):
|
||||
case <-ctx.Done():
|
||||
glog.V(2).Infof("stopping collector runner...")
|
||||
log.Info("stopping collector runner...")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
21
pkg/recorder/recorder.go
Normal file
21
pkg/recorder/recorder.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package recorder
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
clientv1 "k8s.io/api/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
kube_record "k8s.io/client-go/tools/record"
|
||||
)
|
||||
|
||||
// CreateEventRecorder creates an event recorder to send custom events to Kubernetes to be recorded for targeted Kubernetes objects
|
||||
func CreateEventRecorder(kubeClient clientset.Interface) kube_record.EventRecorder {
|
||||
eventBroadcaster := kube_record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(logrus.Infof)
|
||||
if _, isfake := kubeClient.(*fake.Clientset); !isfake {
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
|
||||
}
|
||||
return eventBroadcaster.NewRecorder(scheme.Scheme, clientv1.EventSource{Component: "kube-metrics-adapter"})
|
||||
}
|
Reference in New Issue
Block a user