Simple ZMON collector implementation (#2)

* Simple ZMON collector implementation

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Add tests for ZMON client

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Add tests for zmon collector

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Update ZMON collector docs

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Expose tags instead of entities for queries

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Remove unused function

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
This commit is contained in:
Mikkel Oscar Lyderik Larsen
2018-10-29 14:26:25 +01:00
committed by Arjun
parent b18acf3ed0
commit c86a82ca88
8 changed files with 964 additions and 31 deletions
+269
View File
@@ -0,0 +1,269 @@
package zmon
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
)
// validAggregators holds the aggregator names that may be used in queries.
// It is used purely as a set, so the values carry no data.
//
// https://kairosdb.github.io/docs/build/html/restapi/Aggregators.html
var validAggregators = map[string]struct{}{
	"avg":   {},
	"dev":   {},
	"count": {},
	"first": {},
	"last":  {},
	"max":   {},
	"min":   {},
	"sum":   {},
	"diff":  {},
}
// Entity defines a ZMON entity.
type Entity struct {
// ID identifies the entity; it is read from and written to the JSON field
// "id".
ID string `json:"id"`
}
// ZMON defines an interface for talking to the ZMON API.
type ZMON interface {
// Query returns the data points of the check identified by checkID. In the
// Client implementation, key (when non-empty) and tags are sent as tag
// filters, aggregators names the aggregators to apply, and duration sets
// how far back the query reaches.
Query(checkID int, key string, tags map[string]string, aggregators []string, duration time.Duration) ([]DataPoint, error)
}
// Client defines a client for interfacing with the ZMON API.
type Client struct {
// dataServiceEndpoint is the base URL of the data service; the query path
// is appended to it for every request.
dataServiceEndpoint string
// http is the HTTP client used to send the requests.
http *http.Client
}
// NewZMONClient returns a Client that sends its queries to
// dataServiceEndpoint using the supplied HTTP client.
func NewZMONClient(dataServiceEndpoint string, client *http.Client) *Client {
	zmonClient := new(Client)
	zmonClient.dataServiceEndpoint = dataServiceEndpoint
	zmonClient.http = client
	return zmonClient
}
// DataPoint defines a single datapoint returned from a query.
type DataPoint struct {
// Time is the timestamp of the data point.
Time time.Time
// Value is the value reported for Time.
Value float64
}
// metricQuery is the JSON request body sent to the KairosDB query endpoint.
//
// https://kairosdb.github.io/docs/build/html/restapi/QueryMetrics.html
type metricQuery struct {
	// StartRelative is the start of the query window, relative to now.
	StartRelative sampling `json:"start_relative"`
	// Metrics lists the metrics to query.
	Metrics []metric `json:"metrics"`
}

// sampling is a value/unit pair such as {Value: 5, Unit: "minutes"}. It is
// used both for relative times and for aggregator sampling periods.
type sampling struct {
	Value int64  `json:"value"`
	Unit  string `json:"unit"`
}

// metric describes a single metric within a metricQuery.
type metric struct {
	// Name is the name of the metric to query.
	Name string `json:"name"`
	// Limit is sent as the metric's "limit" property.
	Limit int `json:"limit"`
	// Tags maps a tag name to the list of values to filter on.
	Tags map[string][]string `json:"tags"`
	// GroupBy lists the groupings to apply to the results.
	GroupBy []tagGroup `json:"group_by"`
	// Aggregators lists the aggregators to apply. The KairosDB query API
	// names this property "aggregators" (plural); under any other key the
	// aggregators are not recognized.
	Aggregators []aggregator `json:"aggregators"`
}

// tagGroup is a group_by entry naming the tags to group results by.
type tagGroup struct {
	Name string   `json:"name"`
	Tags []string `json:"tags"`
}

// aggregator names an aggregator together with its sampling period.
type aggregator struct {
	Name     string   `json:"name"`
	Sampling sampling `json:"sampling"`
}
// queryResp mirrors the parts of the query response body that Query reads: a
// list of queries, each holding a list of results with their values.
type queryResp struct {
Queries []struct {
Results []struct {
// Values holds one entry per data point; Query expects every entry to
// be a [timestamp in milliseconds, value] pair.
Values [][]float64 `json:"values"`
} `json:"results"`
} `json:"queries"`
}
// Query posts a datapoints query for the given ZMON check to the KairosDB
// endpoint of the data service and returns the values of the first result of
// the first query in the response.
//
// Each entry of tags is sent as a single-valued tag filter, and a non-empty
// key is added as a filter on the "key" tag. Every name in aggregators must be
// a known aggregator, otherwise an error is returned before any request is
// made. duration is used both as the relative start of the query window and as
// the sampling period of each aggregator.
//
// (nil, nil) is returned when the response contains no queries or no results.
// A status code other than 200, or a value entry that is not a two-element
// pair, results in an error.
//
// https://kairosdb.github.io/docs/build/html/restapi/QueryMetrics.html
func (c *Client) Query(checkID int, key string, tags map[string]string, aggregators []string, duration time.Duration) ([]DataPoint, error) {
	endpoint, err := url.Parse(c.dataServiceEndpoint)
	if err != nil {
		return nil, err
	}

	window := durationToSampling(duration)

	// reject unknown aggregators while collecting the known ones
	aggs := make([]aggregator, 0, len(aggregators))
	for _, name := range aggregators {
		if _, known := validAggregators[name]; !known {
			return nil, fmt.Errorf("invalid aggregator '%s'", name)
		}
		aggs = append(aggs, aggregator{Name: name, Sampling: window})
	}

	// the query format takes a list of values per tag
	filters := make(map[string][]string, len(tags))
	for name, value := range tags {
		filters[name] = []string{value}
	}
	// add key to query if defined
	if key != "" {
		filters["key"] = []string{key}
	}

	payload, err := json.Marshal(&metricQuery{
		StartRelative: window,
		Metrics: []metric{{
			Name:        fmt.Sprintf("zmon.check.%d", checkID),
			Limit:       10000, // maximum limit of ZMON
			Tags:        filters,
			GroupBy:     []tagGroup{{Name: "tag", Tags: []string{"key"}}},
			Aggregators: aggs,
		}},
	})
	if err != nil {
		return nil, err
	}

	endpoint.Path += "/api/v1/datapoints/query"
	req, err := http.NewRequest(http.MethodPost, endpoint.String(), bytes.NewBuffer(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")

	resp, err := c.http.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// the body is read in full before the status code is inspected
	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("[kariosdb query] unexpected response code: %d", resp.StatusCode)
	}

	var parsed queryResp
	if err := json.Unmarshal(raw, &parsed); err != nil {
		return nil, err
	}
	if len(parsed.Queries) == 0 || len(parsed.Queries[0].Results) == 0 {
		return nil, nil
	}

	// only the first result of the first query is converted
	values := parsed.Queries[0].Results[0].Values
	dataPoints := make([]DataPoint, len(values))
	for i, pair := range values {
		if len(pair) != 2 {
			return nil, fmt.Errorf("[kariosdb query] unexpected response data")
		}
		dataPoints[i] = DataPoint{
			// timestamps arrive in milliseconds
			Time:  time.Unix(0, int64(pair[0])*int64(time.Millisecond)),
			Value: pair[1],
		}
	}
	return dataPoints, nil
}
// Fixed-length approximations of calendar units, used by durationToSampling
// when picking a unit: a month is taken as 30 days and a year as 365 days.
const (
day = 24 * time.Hour
week = day * 7
month = day * 30
year = day * 365
)
// durationToSampling converts a time.Duration to the value/unit sampling
// format expected by KairosDB. E.g. the duration `1 * time.Hour` is converted
// to:
//
//	sampling{
//		Unit:  "hours",
//		Value: 1,
//	}
//
// The largest unit that represents d exactly is chosen, so no part of the
// duration is lost: `90 * time.Minute` becomes 90 minutes rather than being
// rounded to 2 hours. Months and years are the fixed 30 and 365 day
// approximations. A duration that is not a whole number of milliseconds is
// rounded to the nearest millisecond, and anything shorter than one
// millisecond (including zero and negative durations) maps to 0 milliseconds.
func durationToSampling(d time.Duration) sampling {
	if d < time.Millisecond {
		return sampling{Unit: "milliseconds", Value: 0}
	}

	// ordered from largest to smallest so the coarsest exact unit wins
	for _, u := range []struct {
		name string
		size time.Duration
	}{
		{"years", year},
		{"months", month},
		{"weeks", week},
		{"days", day},
		{"hours", time.Hour},
		{"minutes", time.Minute},
		{"seconds", time.Second},
		{"milliseconds", time.Millisecond},
	} {
		if d%u.size == 0 {
			return sampling{Unit: u.name, Value: int64(d / u.size)}
		}
	}

	// d has a sub-millisecond remainder; milliseconds is the finest unit
	// available, so round to it.
	return sampling{
		Unit:  "milliseconds",
		Value: int64(d.Round(time.Millisecond) / time.Millisecond),
	}
}
+184
View File
@@ -0,0 +1,184 @@
package zmon
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestQuery runs Client.Query against a stub HTTP server that answers every
// request with the status code and body of the test case, and compares the
// returned data points and error with the expected ones.
func TestQuery(tt *testing.T) {
	// response carrying exactly one [timestamp, value] pair
	const singlePointBody = `{
"queries": [
{
"results": [
{
"values": [
[1539710395000,765952]
]
}
]
}
]
}`
	// same shape, but the value entry has three elements instead of two
	const malformedBody = `{
"queries": [
{
"results": [
{
"values": [
[1539710395000,765952,1]
]
}
]
}
]
}`

	// the data point encoded in singlePointBody
	onePoint := []DataPoint{
		{
			Time:  time.Unix(1539710395, 0),
			Value: 765952,
		},
	}

	cases := []struct {
		msg         string
		key         string
		aggregators []string
		duration    time.Duration
		status      int
		body        string
		dataPoints  []DataPoint
		err         error
	}{
		{
			msg:        "test getting back a single data point",
			duration:   1 * time.Hour,
			status:     http.StatusOK,
			body:       singlePointBody,
			dataPoints: onePoint,
		},
		{
			msg:        "test getting back a single datapoint with key",
			key:        "my-key",
			duration:   1 * time.Hour,
			status:     http.StatusOK,
			body:       singlePointBody,
			dataPoints: onePoint,
		},
		{
			msg:         "test getting back a single datapoint with aggregators",
			aggregators: []string{"max"},
			duration:    1 * time.Hour,
			status:      http.StatusOK,
			body:        singlePointBody,
			dataPoints:  onePoint,
		},
		{
			msg:         "test query with invalid aggregator",
			aggregators: []string{"invalid"},
			err:         fmt.Errorf("invalid aggregator 'invalid'"),
		},
		{
			msg:    "test query with invalid response",
			status: http.StatusInternalServerError,
			body:   `{"error": 500}`,
			err:    fmt.Errorf("[kariosdb query] unexpected response code: 500"),
		},
		{
			msg:      "test getting invalid values response",
			duration: 1 * time.Hour,
			status:   http.StatusOK,
			body:     malformedBody,
			err:      fmt.Errorf("[kariosdb query] unexpected response data"),
		},
	}

	httpClient := &http.Client{}
	for _, tc := range cases {
		tt.Run(tc.msg, func(t *testing.T) {
			// stub server replying with the canned status and body
			reply := func(w http.ResponseWriter, _ *http.Request) {
				w.WriteHeader(tc.status)
				w.Write([]byte(tc.body))
			}
			server := httptest.NewServer(http.HandlerFunc(reply))
			defer server.Close()

			got, err := NewZMONClient(server.URL, httpClient).Query(1, tc.key, nil, tc.aggregators, tc.duration)
			assert.Equal(t, tc.err, err)
			assert.Len(t, got, len(tc.dataPoints))
			assert.Equal(t, tc.dataPoints, got)
		})
	}
}
// TestDurationToSampling checks the unit and value that durationToSampling
// picks for a set of representative durations.
func TestDurationToSampling(tt *testing.T) {
	for _, ti := range []struct {
		msg      string
		duration time.Duration
		sampling sampling
	}{
		{
			msg:      "1 hour should map to hours sampling",
			duration: 1 * time.Hour,
			sampling: sampling{
				Unit:  "hours",
				Value: 1,
			},
		},
		{
			msg:      "2 years should map to years sampling",
			duration: 2 * day * 365,
			sampling: sampling{
				Unit:  "years",
				Value: 2,
			},
		},
		{
			msg:      "1 week should map to weeks sampling",
			duration: 7 * day,
			sampling: sampling{
				Unit:  "weeks",
				Value: 1,
			},
		},
		{
			msg:      "1 nanosecond should map to 0 milliseconds sampling",
			duration: 1,
			sampling: sampling{
				Unit:  "milliseconds",
				Value: 0,
			},
		},
		{
			msg:      "zero duration should map to 0 milliseconds sampling",
			duration: 0,
			sampling: sampling{
				Unit:  "milliseconds",
				Value: 0,
			},
		},
	} {
		tt.Run(ti.msg, func(t *testing.T) {
			// assert.Equal takes the expected value first; passing it second
			// swaps the expected/actual labels in the failure output.
			assert.Equal(t, ti.sampling, durationToSampling(ti.duration))
		})
	}
}