package nakadi

import (
	"context"
	"errors"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestQuery(tt *testing.T) {
	client := &http.Client{}
	for _, ti := range []struct {
		msg                string
		status             int
		responseBody       string
		err                error
		unconsumedEvents   int64
		consumerLagSeconds int64
	}{
		{
			msg:    "test getting a single event-type",
			status: http.StatusOK,
			responseBody: `{
					  "items": [
					    {
					      "event_type": "example-event",
					      "partitions": [
						{
						  "partition": "0",
						  "state": "assigned",
						  "unconsumed_events": 4,
						  "consumer_lag_seconds": 2,
						  "stream_id": "example-id",
						  "assignment_type": "auto"
						},
						{
						  "partition": "0",
						  "state": "assigned",
						  "unconsumed_events": 5,
						  "consumer_lag_seconds": 1,
						  "stream_id": "example-id",
						  "assignment_type": "auto"
						}
					     ]
					   }
					 ]
				       }`,
			unconsumedEvents:   9,
			consumerLagSeconds: 2,
		},
		{
			msg:    "test getting multiple event-types",
			status: http.StatusOK,
			responseBody: `{
					  "items": [
					    {
					      "event_type": "example-event",
					      "partitions": [
						{
						  "partition": "0",
						  "state": "assigned",
						  "unconsumed_events": 4,
						  "consumer_lag_seconds": 2,
						  "stream_id": "example-id",
						  "assignment_type": "auto"
						},
						{
						  "partition": "0",
						  "state": "assigned",
						  "unconsumed_events": 5,
						  "consumer_lag_seconds": 1,
						  "stream_id": "example-id",
						  "assignment_type": "auto"
						}
					      ]
					     },
					     {
					      "event_type": "example-event-2",
					      "partitions": [
						{
						  "partition": "0",
						  "state": "assigned",
						  "unconsumed_events": 4,
						  "consumer_lag_seconds": 6,
						  "stream_id": "example-id",
						  "assignment_type": "auto"
						},
						{
						  "partition": "0",
						  "state": "assigned",
						  "unconsumed_events": 5,
						  "consumer_lag_seconds": 1,
						  "stream_id": "example-id",
						  "assignment_type": "auto"
						}
					       ]
					     }
					  ]
				       }`,
			unconsumedEvents:   18,
			consumerLagSeconds: 6,
		},
		{
			msg:          "test call with invalid response",
			status:       http.StatusInternalServerError,
			responseBody: `{"error": 500}`,
			err:          errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"),
		},
		{
			msg:    "test getting back a single data point",
			status: http.StatusOK,
			responseBody: `{
					  "items": []
				       }`,
			err: errors.New("expected at least 1 event-type, 0 returned"),
		},
	} {
		tt.Run(ti.msg, func(t *testing.T) {
			ts := httptest.NewServer(http.HandlerFunc(
				func(w http.ResponseWriter, r *http.Request) {
					w.WriteHeader(ti.status)
					_, err := w.Write([]byte(ti.responseBody))
					assert.NoError(t, err)
				}),
			)
			defer ts.Close()

			nakadiClient := NewNakadiClient(ts.URL, client)
			consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id")
			assert.Equal(t, ti.err, err)
			assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds)
			unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id")
			assert.Equal(t, ti.err, err)
			assert.Equal(t, ti.unconsumedEvents, unconsumedEvents)
		})
	}

}