mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2026-06-10 07:03:52 +00:00
Compare commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77733fb03d | ||
|
|
ebdf0e2d85 | ||
|
|
6d6c08e1fc | ||
|
|
b10c0e67d8 | ||
|
|
d0dbe73b55 | ||
|
|
caa6a11ee8 | ||
|
|
1009ae601e | ||
|
|
1d35f93162 | ||
|
|
1d135d4dda | ||
|
|
35b4365108 | ||
|
|
ad0b98a4a0 | ||
|
|
fbef659df1 | ||
|
|
f77472e8e7 | ||
|
|
5106e6cfdf | ||
|
|
e1e5a06c9b | ||
|
|
25bd53a987 | ||
|
|
58848f0985 | ||
|
|
d28a770825 | ||
|
|
55bb290076 | ||
|
|
70641827ce | ||
|
|
99ca95588e | ||
|
|
cc9952d66e | ||
|
|
58c6a56cbf | ||
|
|
9e3a1760f1 | ||
|
|
0e52e076c1 | ||
|
|
18faf9c076 | ||
|
|
170faf8809 | ||
|
|
419f75cf18 |
@@ -10,10 +10,10 @@ jobs:
|
||||
tests:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6.0.1
|
||||
- uses: actions/setup-go@v6.1.0
|
||||
- uses: actions/checkout@v6.0.2
|
||||
- uses: actions/setup-go@v6.4.0
|
||||
with:
|
||||
go-version: '^1.25'
|
||||
go-version: '^1.26'
|
||||
- run: go version
|
||||
- run: go install github.com/mattn/goveralls@latest
|
||||
- run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
|
||||
|
||||
@@ -38,12 +38,12 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v6.0.1
|
||||
uses: actions/checkout@v6.0.2
|
||||
|
||||
- name: setup go
|
||||
uses: actions/setup-go@v6.1.0
|
||||
uses: actions/setup-go@v6.4.0
|
||||
with:
|
||||
go-version: '1.25'
|
||||
go-version: '1.26'
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
|
||||
@@ -25,15 +25,15 @@ jobs:
|
||||
packages: write # to push packages
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8
|
||||
uses: actions/checkout@0c366fd6a839edf440554fa01a7085ccba70ac98
|
||||
|
||||
- uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c
|
||||
- uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c
|
||||
with:
|
||||
# https://www.npmjs.com/package/semver#caret-ranges-123-025-004
|
||||
go-version: '^1.25'
|
||||
go-version: '^1.26'
|
||||
|
||||
- name: Login to Github Container Registry
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef
|
||||
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.actor }}
|
||||
@@ -47,21 +47,21 @@ jobs:
|
||||
make build.linux.amd64 build.linux.arm64
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130
|
||||
uses: docker/setup-qemu-action@ce360397dd3f832beb865e1373c09c0e9f86d70a
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f
|
||||
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd
|
||||
|
||||
- name: Login to GitHub Container Registry
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef
|
||||
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Docker meta
|
||||
uses: docker/metadata-action@c299e40c65443455700f0fdfc63efafe5b349051
|
||||
uses: docker/metadata-action@030e881283bb7a6894de51c315a6bfe6a94e05cf
|
||||
id: meta
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
@@ -70,7 +70,7 @@ jobs:
|
||||
type=semver,pattern=v{{major}}.{{minor}}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83
|
||||
uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294
|
||||
with:
|
||||
context: .
|
||||
build-args: BASE_IMAGE=gcr.io/distroless/static-debian12
|
||||
@@ -81,7 +81,7 @@ jobs:
|
||||
|
||||
# Build and push latest tag
|
||||
- name: Build and push latest
|
||||
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83
|
||||
uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294
|
||||
with:
|
||||
context: .
|
||||
build-args: BASE_IMAGE=gcr.io/distroless/static-debian12
|
||||
|
||||
@@ -10,7 +10,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v6.0.1
|
||||
uses: actions/checkout@v6.0.2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
@@ -3,6 +3,13 @@ apiVersion: apiregistration.k8s.io/v1
|
||||
kind: APIService
|
||||
metadata:
|
||||
name: v1beta1.custom.metrics.k8s.io
|
||||
{{- with .Values.customMetricsApi.annotations }}
|
||||
annotations:
|
||||
{{- range $k, $v := . }}
|
||||
{{- $value := $v | quote }}
|
||||
{{- printf "%s: %s" (tpl $k $) (tpl $value $) | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
spec:
|
||||
service:
|
||||
name: kube-metrics-adapter
|
||||
|
||||
@@ -3,6 +3,13 @@ apiVersion: apiregistration.k8s.io/v1
|
||||
kind: APIService
|
||||
metadata:
|
||||
name: v1beta1.external.metrics.k8s.io
|
||||
{{- with .Values.externalMetricsApi.annotations }}
|
||||
annotations:
|
||||
{{- range $k, $v := . }}
|
||||
{{- $value := $v | quote }}
|
||||
{{- printf "%s: %s" (tpl $k $) (tpl $value $) | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
spec:
|
||||
service:
|
||||
name: kube-metrics-adapter
|
||||
|
||||
@@ -15,7 +15,19 @@ addDirectoryHeader:
|
||||
contentionProfiling:
|
||||
profiling:
|
||||
enableCustomMetricsApi: true
|
||||
|
||||
# -- add annotations to the custom metrics apiservice resource, e.g.
|
||||
# cert-manager.io/inject-ca-from: {{.Release.Namespace}}/kube-metrics-adapter to inject the CA certificate form a cert-manager-issued cert used for the deployment
|
||||
customMetricsApi:
|
||||
annotations: {}
|
||||
|
||||
enableExternalMetricsApi: true
|
||||
|
||||
# -- add annotations to the custom metrics apiservice resource, e.g.
|
||||
# cert-manager.io/inject-ca-from: {{.Release.Namespace}}/kube-metrics-adapter to inject the CA certificate form a cert-manager-issued cert used for the deployment
|
||||
externalMetricsApi:
|
||||
annotations: {}
|
||||
|
||||
credentialsDirectory:
|
||||
disregardIncompatibleHPAs:
|
||||
http2MaxStreamsPerConnection:
|
||||
|
||||
+1
-1
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
func metricsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
_, err := w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
|
||||
_, err := fmt.Fprintf(w, `{"queue": {"length": %d}}`, size)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to write: %v", err)
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -35,7 +35,7 @@ type ScalingSchedule struct {
|
||||
// Identifier returns the namespaced scalingScale Identifier in the format
|
||||
// `<namespace>/<name>`.
|
||||
func (s *ScalingSchedule) Identifier() string {
|
||||
return s.ObjectMeta.Namespace + "/" + s.ObjectMeta.Name
|
||||
return s.Namespace + "/" + s.Name
|
||||
}
|
||||
|
||||
// ResourceSpec returns the ScalingScheduleSpec of the ScalingSchedule.
|
||||
@@ -66,7 +66,7 @@ type ClusterScalingSchedule struct {
|
||||
// Identifier returns the cluster scalingScale Identifier in the format
|
||||
// `<name>`.
|
||||
func (s *ClusterScalingSchedule) Identifier() string {
|
||||
return s.ObjectMeta.Name
|
||||
return s.Name
|
||||
}
|
||||
|
||||
// ResourceSpec returns the ScalingScheduleSpec of the ClusterScalingSchedule.
|
||||
|
||||
@@ -34,10 +34,6 @@ import (
|
||||
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
|
||||
// without applying any field management, validations and/or defaults. It shouldn't be considered a replacement
|
||||
// for a real clientset and is mostly useful in simple unit tests.
|
||||
//
|
||||
// DEPRECATED: NewClientset replaces this with support for field management, which significantly improves
|
||||
// server side apply testing. NewClientset is only available when apply configurations are generated (e.g.
|
||||
// via --with-applyconfig).
|
||||
func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||
o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())
|
||||
for _, obj := range objects {
|
||||
@@ -51,8 +47,8 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||
cs.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||
cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||
var opts metav1.ListOptions
|
||||
if watchActcion, ok := action.(testing.WatchActionImpl); ok {
|
||||
opts = watchActcion.ListOptions
|
||||
if watchAction, ok := action.(testing.WatchActionImpl); ok {
|
||||
opts = watchAction.ListOptions
|
||||
}
|
||||
gvr := action.GetResource()
|
||||
ns := action.GetNamespace()
|
||||
@@ -83,6 +79,17 @@ func (c *Clientset) Tracker() testing.ObjectTracker {
|
||||
return c.tracker
|
||||
}
|
||||
|
||||
// IsWatchListSemanticsSupported informs the reflector that this client
|
||||
// doesn't support WatchList semantics.
|
||||
//
|
||||
// This is a synthetic method whose sole purpose is to satisfy the optional
|
||||
// interface check performed by the reflector.
|
||||
// Returning true signals that WatchList can NOT be used.
|
||||
// No additional logic is implemented here.
|
||||
func (c *Clientset) IsWatchListSemanticsUnSupported() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
var (
|
||||
_ clientset.Interface = &Clientset{}
|
||||
_ testing.FakeClient = &Clientset{}
|
||||
|
||||
@@ -97,6 +97,7 @@ func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Dur
|
||||
// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory.
|
||||
// Listers obtained via this SharedInformerFactory will be subject to the same filters
|
||||
// as specified here.
|
||||
//
|
||||
// Deprecated: Please use NewSharedInformerFactoryWithOptions instead
|
||||
func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
|
||||
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
|
||||
@@ -204,7 +205,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
|
||||
//
|
||||
// It is typically used like this:
|
||||
//
|
||||
// ctx, cancel := context.Background()
|
||||
// ctx, cancel := context.WithCancel(context.Background())
|
||||
// defer cancel()
|
||||
// factory := NewSharedInformerFactory(client, resyncPeriod)
|
||||
// defer factory.WaitForStop() // Returns immediately if nothing was started.
|
||||
|
||||
@@ -56,7 +56,7 @@ func NewClusterScalingScheduleInformer(client versioned.Interface, resyncPeriod
|
||||
// one. This reduces memory footprint and number of connections to the server.
|
||||
func NewFilteredClusterScalingScheduleInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
|
||||
return cache.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
cache.ToListWatcherWithWatchListSemantics(&cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
if tweakListOptions != nil {
|
||||
tweakListOptions(&options)
|
||||
@@ -81,7 +81,7 @@ func NewFilteredClusterScalingScheduleInformer(client versioned.Interface, resyn
|
||||
}
|
||||
return client.ZalandoV1().ClusterScalingSchedules().Watch(ctx, options)
|
||||
},
|
||||
},
|
||||
}, client),
|
||||
&apiszalandoorgv1.ClusterScalingSchedule{},
|
||||
resyncPeriod,
|
||||
indexers,
|
||||
|
||||
@@ -57,7 +57,7 @@ func NewScalingScheduleInformer(client versioned.Interface, namespace string, re
|
||||
// one. This reduces memory footprint and number of connections to the server.
|
||||
func NewFilteredScalingScheduleInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
|
||||
return cache.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
cache.ToListWatcherWithWatchListSemantics(&cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
if tweakListOptions != nil {
|
||||
tweakListOptions(&options)
|
||||
@@ -82,7 +82,7 @@ func NewFilteredScalingScheduleInformer(client versioned.Interface, namespace st
|
||||
}
|
||||
return client.ZalandoV1().ScalingSchedules(namespace).Watch(ctx, options)
|
||||
},
|
||||
},
|
||||
}, client),
|
||||
&apiszalandoorgv1.ScalingSchedule{},
|
||||
resyncPeriod,
|
||||
indexers,
|
||||
|
||||
@@ -185,7 +185,7 @@ type MetricTypeName struct {
|
||||
|
||||
func (m MetricTypeName) String() string {
|
||||
str := fmt.Sprintf("%s/%s", m.Type, m.Metric.Name)
|
||||
if len(m.Metric.Selector.MatchLabels) > 0 {
|
||||
if m.Metric.Selector != nil && len(m.Metric.Selector.MatchLabels) > 0 {
|
||||
str += " " + mapToString(m.Metric.Selector.MatchLabels)
|
||||
}
|
||||
return str
|
||||
|
||||
@@ -30,6 +30,88 @@ func (c *mockCollector) Interval() time.Duration {
|
||||
return 0
|
||||
}
|
||||
|
||||
func TestMetricTypeName_String(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
metricName string
|
||||
selector *metav1.LabelSelector
|
||||
expectedStr string
|
||||
shouldContain []string
|
||||
}{
|
||||
{
|
||||
name: "nil selector with PodsMetricSourceType",
|
||||
metricType: autoscalingv2.PodsMetricSourceType,
|
||||
metricName: "metric-name",
|
||||
selector: nil,
|
||||
expectedStr: "Pods/metric-name",
|
||||
},
|
||||
{
|
||||
name: "nil selector with ObjectMetricSourceType",
|
||||
metricType: autoscalingv2.ObjectMetricSourceType,
|
||||
metricName: "metric-name",
|
||||
selector: nil,
|
||||
expectedStr: "Object/metric-name",
|
||||
},
|
||||
{
|
||||
name: "nil selector with ExternalMetricSourceType",
|
||||
metricType: autoscalingv2.ExternalMetricSourceType,
|
||||
metricName: "metric-name",
|
||||
selector: nil,
|
||||
expectedStr: "External/metric-name",
|
||||
},
|
||||
{
|
||||
name: "empty selector (non-nil but empty MatchLabels)",
|
||||
metricType: autoscalingv2.ExternalMetricSourceType,
|
||||
metricName: "metric-name",
|
||||
selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{},
|
||||
},
|
||||
expectedStr: "External/metric-name",
|
||||
},
|
||||
{
|
||||
name: "selector with single MatchLabel",
|
||||
metricType: autoscalingv2.ExternalMetricSourceType,
|
||||
metricName: "metric-name",
|
||||
selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{"key": "value"},
|
||||
},
|
||||
shouldContain: []string{"External/metric-name", "key=value"},
|
||||
},
|
||||
{
|
||||
name: "selector with multiple MatchLabels",
|
||||
metricType: autoscalingv2.ExternalMetricSourceType,
|
||||
metricName: "metric-name",
|
||||
selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{"key1": "val1", "key2": "val2"},
|
||||
},
|
||||
shouldContain: []string{"External/metric-name", "key1=val1", "key2=val2"},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
mtn := MetricTypeName{
|
||||
Type: tc.metricType,
|
||||
Metric: autoscalingv2.MetricIdentifier{
|
||||
Name: tc.metricName,
|
||||
Selector: tc.selector,
|
||||
},
|
||||
}
|
||||
|
||||
result := mtn.String()
|
||||
|
||||
if tc.expectedStr != "" {
|
||||
require.Equal(t, tc.expectedStr, result)
|
||||
}
|
||||
|
||||
if len(tc.shouldContain) > 0 {
|
||||
for _, substring := range tc.shouldContain {
|
||||
require.Contains(t, result, substring, "result should contain %q", substring)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewCollector(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
msg string
|
||||
|
||||
@@ -63,7 +63,7 @@ func (p *ExternalRPSCollectorPlugin) NewCollector(
|
||||
confCopy := *config
|
||||
|
||||
if _, ok := config.Config["hostnames"]; !ok {
|
||||
return nil, fmt.Errorf("Hostname is not specified, unable to create collector")
|
||||
return nil, fmt.Errorf("hostname is not specified, unable to create collector")
|
||||
}
|
||||
|
||||
hostnames := strings.Split(config.Config["hostnames"], ",")
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
type testExternalMetricsHandler struct {
|
||||
@@ -84,7 +83,7 @@ func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig {
|
||||
Type: autoscalingv2.ExternalMetricSourceType,
|
||||
Metric: autoscalingv2.MetricIdentifier{
|
||||
Name: "test-metric",
|
||||
Selector: &v1.LabelSelector{
|
||||
Selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{"type": HTTPJSONPathType},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -2,6 +2,7 @@ package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
@@ -88,7 +89,7 @@ func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathG
|
||||
return nil, err
|
||||
}
|
||||
if d < 0 {
|
||||
return nil, fmt.Errorf("Invalid request-timeout config value: %s", v)
|
||||
return nil, fmt.Errorf("invalid request-timeout config value: %s", v)
|
||||
}
|
||||
requestTimeout = d
|
||||
}
|
||||
@@ -99,7 +100,7 @@ func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathG
|
||||
return nil, err
|
||||
}
|
||||
if d < 0 {
|
||||
return nil, fmt.Errorf("Invalid connect-timeout config value: %s", v)
|
||||
return nil, fmt.Errorf("invalid connect-timeout config value: %s", v)
|
||||
}
|
||||
connectTimeout = d
|
||||
}
|
||||
@@ -122,7 +123,7 @@ func (g *PodMetricsJSONPathGetter) buildMetricsURL(podIP string) url.URL {
|
||||
|
||||
return url.URL{
|
||||
Scheme: scheme,
|
||||
Host: fmt.Sprintf("%s:%d", podIP, g.port),
|
||||
Host: net.JoinHostPort(podIP, strconv.Itoa(g.port)),
|
||||
Path: g.path,
|
||||
RawQuery: g.rawQuery,
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestInfluxDBCollector_New(t *testing.T) {
|
||||
@@ -24,7 +23,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
||||
Metric: autoscalingv2.MetricIdentifier{
|
||||
Name: "flux-query",
|
||||
// This is actually useless, because the selector should be flattened in Config when parsing.
|
||||
Selector: &v1.LabelSelector{
|
||||
Selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{
|
||||
"query-name": "range2m",
|
||||
},
|
||||
@@ -62,7 +61,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
||||
Type: autoscalingv2.ExternalMetricSourceType,
|
||||
Metric: autoscalingv2.MetricIdentifier{
|
||||
Name: "flux-query",
|
||||
Selector: &v1.LabelSelector{
|
||||
Selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{
|
||||
"query-name": "range2m",
|
||||
},
|
||||
|
||||
@@ -135,7 +135,7 @@ func (c *PodCollector) Interval() time.Duration {
|
||||
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
|
||||
value, err := c.Getter.GetMetric(&pod)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
errCh <- fmt.Errorf("failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -289,9 +289,9 @@ func makeTestPods(t *testing.T, testServer string, metricName string, port strin
|
||||
}
|
||||
|
||||
if podDeletionTimestamp.IsZero() {
|
||||
testPod.ObjectMeta.DeletionTimestamp = nil
|
||||
testPod.DeletionTimestamp = nil
|
||||
} else {
|
||||
testPod.ObjectMeta.DeletionTimestamp = &v1.Time{Time: podDeletionTimestamp}
|
||||
testPod.DeletionTimestamp = &v1.Time{Time: podDeletionTimestamp}
|
||||
}
|
||||
_, err := client.CoreV1().Pods(testNamespace).Create(context.Background(), testPod, v1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -654,7 +654,7 @@ func TestScalingScheduleCollector(t *testing.T) {
|
||||
require.EqualValues(t, namespace, collected[0].Custom.DescribedObject.Namespace)
|
||||
require.EqualValues(t, "zalando.org/v1", collected[0].Custom.DescribedObject.APIVersion)
|
||||
require.EqualValues(t, resourceType, collected[0].Custom.DescribedObject.Kind)
|
||||
require.EqualValues(t, uTCNowRFC3339, collected[0].Custom.Timestamp.Time.Format(time.RFC3339))
|
||||
require.EqualValues(t, uTCNowRFC3339, collected[0].Custom.Timestamp.Format(time.RFC3339))
|
||||
require.EqualValues(t, collected[0].Custom.Metric.Name, scalingScheduleName)
|
||||
require.EqualValues(t, namespace, collected[0].Namespace)
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ func (c *SkipperCollector) getCollector(ctx context.Context) (Collector, error)
|
||||
}
|
||||
|
||||
for _, rule := range ingress.Spec.Rules {
|
||||
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
|
||||
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.ReplaceAll(rule.Host, ".", "_")))
|
||||
}
|
||||
case "RouteGroup":
|
||||
routegroup, err := c.rgClient.ZalandoV1().RouteGroups(c.objectReference.Namespace).Get(ctx, c.objectReference.Name, metav1.GetOptions{})
|
||||
@@ -187,7 +187,7 @@ func (c *SkipperCollector) getCollector(ctx context.Context) (Collector, error)
|
||||
}
|
||||
|
||||
for _, host := range routegroup.Spec.Hosts {
|
||||
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(host, ".", "_", -1)))
|
||||
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.ReplaceAll(host, ".", "_")))
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown skipper resource kind %s for resource %s/%s", c.objectReference.Kind, c.objectReference.Namespace, c.objectReference.Name)
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/restmapper"
|
||||
"k8s.io/client-go/scale"
|
||||
scaleclient "k8s.io/client-go/scale"
|
||||
)
|
||||
|
||||
@@ -41,8 +40,8 @@ func NewHPATargetScaler(ctx context.Context, kubeClient kubernetes.Interface, cf
|
||||
restMapper.Reset()
|
||||
}, 30*time.Second, ctx.Done())
|
||||
|
||||
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(kubeClient.Discovery())
|
||||
scaleClient, err := scale.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
|
||||
scaleKindResolver := scaleclient.NewDiscoveryScaleKindResolver(kubeClient.Discovery())
|
||||
scaleClient, err := scaleclient.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create scale client: %w", err)
|
||||
}
|
||||
|
||||
@@ -8,12 +8,10 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
v1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/apis/zalando.org/v1"
|
||||
scalingschedulefake "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake"
|
||||
zfake "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/fake"
|
||||
zalandov1 "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned/typed/zalando.org/v1"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
v2 "k8s.io/api/autoscaling/v2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -217,7 +215,7 @@ func TestRunOnce(t *testing.T) {
|
||||
} {
|
||||
t.Run(tc.msg, func(t *testing.T) {
|
||||
// setup fake client and cache
|
||||
client := scalingschedulefake.NewSimpleClientset()
|
||||
client := zfake.NewSimpleClientset()
|
||||
|
||||
clusterScalingSchedulesStore := fakeClusterScalingScheduleStore{
|
||||
client: client.ZalandoV1(),
|
||||
@@ -425,25 +423,25 @@ func TestAdjustScaling(t *testing.T) {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "hpa-1",
|
||||
},
|
||||
Spec: v2.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: v2.CrossVersionObjectReference{
|
||||
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
||||
APIVersion: "apps/v1",
|
||||
Kind: "Deployment",
|
||||
Name: "deployment-1",
|
||||
},
|
||||
MinReplicas: ptr.To(int32(1)),
|
||||
MaxReplicas: 1000,
|
||||
Metrics: []v2.MetricSpec{
|
||||
Metrics: []autoscalingv2.MetricSpec{
|
||||
{
|
||||
Type: v2.ObjectMetricSourceType,
|
||||
Object: &v2.ObjectMetricSource{
|
||||
DescribedObject: v2.CrossVersionObjectReference{
|
||||
Type: autoscalingv2.ObjectMetricSourceType,
|
||||
Object: &autoscalingv2.ObjectMetricSource{
|
||||
DescribedObject: autoscalingv2.CrossVersionObjectReference{
|
||||
APIVersion: "zalando.org/v1",
|
||||
Kind: "ClusterScalingSchedule",
|
||||
Name: "schedule-1",
|
||||
},
|
||||
Target: v2.MetricTarget{
|
||||
Type: v2.AverageValueMetricType,
|
||||
Target: autoscalingv2.MetricTarget{
|
||||
Type: autoscalingv2.AverageValueMetricType,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
+92
-28
@@ -8,6 +8,10 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v5"
|
||||
)
|
||||
|
||||
// Nakadi defines an interface for talking to the Nakadi API.
|
||||
@@ -95,26 +99,40 @@ func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter,
|
||||
endpoint.RawQuery = q.Encode()
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi subscriptions] failed to create request: %w", err)
|
||||
op := func() ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi subscriptions] failed to create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi subscriptions] failed to make request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := checkResponseStatus(resp, b); err != nil {
|
||||
return nil, fmt.Errorf("[nakadi subscriptions] %w", err)
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi subscriptions] failed to make request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
d, err := io.ReadAll(resp.Body)
|
||||
d, err := backoff.Retry(
|
||||
ctx,
|
||||
op,
|
||||
backoff.WithBackOff(exponentialBackoff()),
|
||||
backoff.WithMaxTries(3),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("[nakadi subscriptions] unexpected response code: %d (%s)", resp.StatusCode, string(d))
|
||||
}
|
||||
|
||||
var subscriptionsResp struct {
|
||||
Items []struct {
|
||||
ID string `json:"id"`
|
||||
@@ -194,26 +212,40 @@ func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]stats
|
||||
q.Set("show_time_lag", "true")
|
||||
endpoint.RawQuery = q.Encode()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi stats] failed to create request: %w", err)
|
||||
op := func() ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi stats] failed to create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi stats] failed to make request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := checkResponseStatus(resp, b); err != nil {
|
||||
return nil, fmt.Errorf("[nakadi stats] %w", err)
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[nakadi stats] failed to make request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
d, err := io.ReadAll(resp.Body)
|
||||
d, err := backoff.Retry(
|
||||
ctx,
|
||||
op,
|
||||
backoff.WithBackOff(exponentialBackoff()),
|
||||
backoff.WithMaxTries(3),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
|
||||
}
|
||||
|
||||
var result statsResp
|
||||
err = json.Unmarshal(d, &result)
|
||||
if err != nil {
|
||||
@@ -229,3 +261,35 @@ func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]stats
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func checkResponseStatus(resp *http.Response, b []byte) error {
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
h := resp.Header.Get("Retry-After")
|
||||
if h == "" {
|
||||
return fmt.Errorf("unexpected response code: %d (%s)", resp.StatusCode, string(b))
|
||||
|
||||
}
|
||||
sec, err := strconv.ParseInt(h, 10, 32)
|
||||
if err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return backoff.RetryAfter(int(sec))
|
||||
}
|
||||
if resp.StatusCode >= http.StatusBadRequest &&
|
||||
resp.StatusCode < http.StatusInternalServerError {
|
||||
return backoff.Permanent(
|
||||
fmt.Errorf("non-retryable response code: %d (%s)", resp.StatusCode, string(b)),
|
||||
)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("unexpected response code: %d (%s)", resp.StatusCode, string(b))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func exponentialBackoff() backoff.BackOff {
|
||||
b := backoff.NewExponentialBackOff()
|
||||
b.MaxInterval = time.Second * 30
|
||||
b.InitialInterval = time.Millisecond * 100
|
||||
return b
|
||||
}
|
||||
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/cenkalti/backoff/v5"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@@ -210,11 +212,70 @@ func TestQuery(tt *testing.T) {
|
||||
|
||||
nakadiClient := NewNakadiClient(ts.URL, client)
|
||||
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), ti.subscriptionFilter)
|
||||
assert.Equal(t, ti.err, err)
|
||||
assertErrorMessage(t, ti.err, err)
|
||||
assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds)
|
||||
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), ti.subscriptionFilter)
|
||||
assert.Equal(t, ti.err, err)
|
||||
assertErrorMessage(t, ti.err, err)
|
||||
assert.Equal(t, ti.unconsumedEvents, unconsumedEvents)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_checkResponseStatus(tt *testing.T) {
|
||||
for _, ti := range []struct {
|
||||
msg string
|
||||
resp *http.Response
|
||||
errCheck func(error) bool
|
||||
}{
|
||||
{
|
||||
msg: "nil when 200",
|
||||
resp: &http.Response{StatusCode: http.StatusOK},
|
||||
},
|
||||
{
|
||||
msg: "backoff.Permanent when 4xx",
|
||||
resp: &http.Response{StatusCode: http.StatusBadRequest},
|
||||
errCheck: func(err error) bool {
|
||||
var permanentErr *backoff.PermanentError
|
||||
return errors.As(err, &permanentErr)
|
||||
},
|
||||
},
|
||||
{
|
||||
msg: "unexpected error when 422 without Retry-After header",
|
||||
resp: &http.Response{StatusCode: http.StatusTooManyRequests},
|
||||
errCheck: func(err error) bool {
|
||||
return strings.Contains(err.Error(), "unexpected response code")
|
||||
},
|
||||
},
|
||||
{
|
||||
msg: "backoff.RetryAfter when 422 with Retry-After header",
|
||||
resp: &http.Response{
|
||||
StatusCode: http.StatusTooManyRequests,
|
||||
Header: http.Header{
|
||||
"Retry-After": []string{"120"},
|
||||
},
|
||||
},
|
||||
errCheck: func(err error) bool {
|
||||
var retryAfterErr *backoff.RetryAfterError
|
||||
return errors.As(err, &retryAfterErr)
|
||||
},
|
||||
},
|
||||
} {
|
||||
tt.Run(ti.msg, func(t *testing.T) {
|
||||
err := checkResponseStatus(ti.resp, nil)
|
||||
if ti.errCheck == nil {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.True(t, ti.errCheck(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func assertErrorMessage(t *testing.T, expected, actual error) {
|
||||
t.Helper()
|
||||
if expected != nil {
|
||||
assert.EqualError(t, actual, expected.Error())
|
||||
} else {
|
||||
assert.NoError(t, actual)
|
||||
}
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user