diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 40ea88211bec..cf9d67c1d1a2 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -39,13 +39,24 @@ func main() { tls := flag.Bool("tls", false, "Does the loki connection use TLS?") user := flag.String("user", "", "Loki username") pass := flag.String("pass", "", "Loki password") + queryTimeout := flag.Duration("query-timeout", 10*time.Second, "How long to wait for a query response from Loki") interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries") size := flag.Int("size", 100, "Size in bytes of each log line") wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost") - pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki") + pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, "+ + "also the frequency which queries for missing logs will be dispatched to loki, and the frequency spot check queries are run") buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram") + metricTestInterval := flag.Duration("metric-test-interval", 1*time.Hour, "The interval the metric test query should be run") + metricTestQueryRange := flag.Duration("metric-test-range", 24*time.Hour, "The range value [24h] used in the metric test instant-query."+ + " Note: this value is truncated to the running time of the canary until this value is reached") + + spotCheckInterval := flag.Duration("spot-check-interval", 15*time.Minute, "Interval that a single result will be kept from sent entries and spot-checked against Loki, "+ + "e.g. 
15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached") + spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it") + spotCheckQueryRate := flag.Duration("spot-check-query-rate", 1*time.Minute, "Interval that the canary will query Loki for the current list of all spot check entries") + printVersion := flag.Bool("version", false, "Print this builds version information") flag.Parse() @@ -71,8 +82,8 @@ func main() { defer c.lock.Unlock() c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size) - c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal, *sName, *sValue) - c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true) + c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue) + c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true) } startCanary() diff --git a/docs/operations/loki-canary.md b/docs/operations/loki-canary.md index 71b67ff7d8ab..1b9be3f1d52c 100644 --- a/docs/operations/loki-canary.md +++ b/docs/operations/loki-canary.md @@ -47,6 +47,52 @@ determine if they are truly missing or only missing from the WebSocket. If missing entries are not found in the direct query, the `missing_entries` counter is incremented. +### Additional Queries + +#### Spot Check + +Starting with version 1.6.0, the canary will spot check certain results over time +to make sure they are present in Loki, this is helpful for testing the transition +of inmemory logs in the ingester to the store to make sure nothing is lost. 
+ +`-spot-check-interval` and `-spot-check-max` are used to tune this feature, +`-spot-check-interval` will pull a log entry from the stream at this interval +and save it in a separate list up to `-spot-check-max`. + +Every `-spot-check-query-rate`, Loki will be queried for each entry in this list and +`loki_canary_spot_check_entries_total` will be incremented, if a result +is missing `loki_canary_spot_check_missing_entries_total` will be incremented. + +The defaults of `15m` for `spot-check-interval` and `4h` for `spot-check-max` +mean that after 4 hours of running the canary will have a list of 16 entries +it will query every minute (default `spot-check-query-rate` interval is 1m), +so be aware of the query load this can put on Loki if you have a lot of canaries. + +#### Metric Test + +Starting with version 1.6.0 the canary will run a metric query `count_over_time` to +verify the rate of logs being stored in Loki corresponds to the rate they are being +created by the canary. + +`-metric-test-interval` and `-metric-test-range` are used to tune this feature, but +by default every `1h` the canary will run a `count_over_time` instant-query to Loki +for a range of `24h`. + +If the canary has not run for `-metric-test-range` (`24h`) the query range is adjusted +to the amount of time the canary has been running such that the rate can be calculated +since the canary was started. + +The canary calculates what the expected count of logs would be for the range +(also adjusting this based on canary runtime) and compares the expected result with +the actual result returned from Loki. The _difference_ is stored as the value in +the gauge `loki_canary_metric_test_deviation`. + +It's expected that there will be some deviation, the method of creating an expected +calculation based on the query rate compared to actual query data is imperfect +and will lead to a deviation of a few log entries. + +It's not expected for there to be a deviation of more than 3-4 log entries. 
+ ### Control Loki Canary responds to two endpoints to allow dynamic suspending/resuming of the @@ -246,14 +292,26 @@ All options: The label name for this instance of loki-canary to use in the log selector (default "name") -labelvalue string The unique label value for this instance of loki-canary to use in the log selector (default "loki-canary") + -metric-test-interval duration + The interval the metric test query should be run (default 1h0m0s) + -metric-test-range duration + The range value [24h] used in the metric test instant-query. Note: this value is truncated to the running time of the canary until this value is reached (default 24h0m0s) -pass string Loki password -port int Port which loki-canary should expose metrics (default 3500) -pruneinterval duration - Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki (default 1m0s) + Frequency to check sent vs received logs, also the frequency which queries for missing logs will be dispatched to loki, and the frequency spot check queries are run (default 1m0s) + -query-timeout duration + How long to wait for a query response from Loki (default 10s) -size int Size in bytes of each log line (default 100) + -spot-check-interval duration + Interval that a single result will be kept from sent entries and spot-checked against Loki, e.g. 
15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached (default 15m0s) + -spot-check-max duration + How far back to check a spot check entry before dropping it (default 4h0m0s) + -spot-check-query-rate duration + Interval that the canary will query Loki for the current list of all spot check entries (default 1m0s) -streamname string The stream name for this instance of loki-canary to use in the log selector (default "stream") -streamvalue string diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index b687ce26cd60..6f6e93d5e2b0 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -13,77 +13,128 @@ import ( ) const ( - ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" - ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" - ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" - ErrDuplicateEntry = "received a duplicate entry for ts %v\n" - ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" - DebugWebsocketMissingEntry = "websocket missing entry: %v\n" - DebugQueryResult = "confirmation query result: %v\n" + ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" + ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" + ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" + ErrSpotCheckEntryNotReceived = "failed to find entry %v in Loki when spot check querying %v after it was written\n" + ErrDuplicateEntry = "received a duplicate entry for ts %v\n" + ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" + DebugWebsocketMissingEntry = "websocket missing entry: %v\n" + DebugQueryResult = "confirmation query result: %v\n" ) var ( totalEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", - Name: "total_entries", + Name: "entries_total", 
Help: "counts log entries written to the file", }) outOfOrderEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", - Name: "out_of_order_entries", + Name: "out_of_order_entries_total", Help: "counts log entries received with a timestamp more recent than the others in the queue", }) wsMissingEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", - Name: "websocket_missing_entries", + Name: "websocket_missing_entries_total", Help: "counts log entries not received within the maxWait duration via the websocket connection", }) missingEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", - Name: "missing_entries", + Name: "missing_entries_total", Help: "counts log entries not received within the maxWait duration via both websocket and direct query", }) + spotCheckMissing = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "spot_check_missing_entries_total", + Help: "counts log entries not received when directly queried as part of spot checking", + }) + spotCheckEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "spot_check_entries_total", + Help: "total count of entries spot checked", + }) unexpectedEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", - Name: "unexpected_entries", + Name: "unexpected_entries_total", Help: "counts a log entry received which was not expected (e.g. 
received after reported missing)", }) duplicateEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", - Name: "duplicate_entries", + Name: "duplicate_entries_total", Help: "counts a log entry received more than one time", }) + metricTestDeviation = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_canary", + Name: "metric_test_deviation", + Help: "How many counts was the actual query result from the expected based on the canary log write rate", + }) responseLatency prometheus.Histogram ) type Comparator struct { - entMtx sync.Mutex - w io.Writer - entries []*time.Time - ackdEntries []*time.Time - maxWait time.Duration - pruneInterval time.Duration - confirmAsync bool - sent chan time.Time - recv chan time.Time - rdr reader.LokiReader - quit chan struct{} - done chan struct{} + entMtx sync.Mutex + spotMtx sync.Mutex + metTestMtx sync.Mutex + pruneMtx sync.Mutex + w io.Writer + entries []*time.Time + spotCheck []*time.Time + ackdEntries []*time.Time + maxWait time.Duration + pruneInterval time.Duration + pruneEntriesRunning bool + spotCheckInterval time.Duration + spotCheckMax time.Duration + spotCheckQueryRate time.Duration + spotCheckRunning bool + metricTestInterval time.Duration + metricTestRange time.Duration + metricTestRunning bool + writeInterval time.Duration + confirmAsync bool + startTime time.Time + sent chan time.Time + recv chan time.Time + rdr reader.LokiReader + quit chan struct{} + done chan struct{} } -func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.Duration, - buckets int, sentChan chan time.Time, receivedChan chan time.Time, reader reader.LokiReader, confirmAsync bool) *Comparator { +func NewComparator(writer io.Writer, + maxWait time.Duration, + pruneInterval time.Duration, + spotCheckInterval, spotCheckMax, spotCheckQueryRate time.Duration, + metricTestInterval time.Duration, + metricTestRange time.Duration, + writeInterval time.Duration, + buckets int, + sentChan chan time.Time, + 
receivedChan chan time.Time, + reader reader.LokiReader, + confirmAsync bool) *Comparator { c := &Comparator{ - w: writer, - entries: []*time.Time{}, - maxWait: maxWait, - pruneInterval: pruneInterval, - confirmAsync: confirmAsync, - sent: sentChan, - recv: receivedChan, - rdr: reader, - quit: make(chan struct{}), - done: make(chan struct{}), + w: writer, + entries: []*time.Time{}, + spotCheck: []*time.Time{}, + maxWait: maxWait, + pruneInterval: pruneInterval, + pruneEntriesRunning: false, + spotCheckInterval: spotCheckInterval, + spotCheckMax: spotCheckMax, + spotCheckQueryRate: spotCheckQueryRate, + spotCheckRunning: false, + metricTestInterval: metricTestInterval, + metricTestRange: metricTestRange, + metricTestRunning: false, + writeInterval: writeInterval, + confirmAsync: confirmAsync, + startTime: time.Now(), + sent: sentChan, + recv: receivedChan, + rdr: reader, + quit: make(chan struct{}), + done: make(chan struct{}), } if responseLatency == nil { @@ -112,6 +163,12 @@ func (c *Comparator) entrySent(time time.Time) { c.entMtx.Lock() defer c.entMtx.Unlock() c.entries = append(c.entries, &time) + //If this entry equals or exceeds the spot check interval from the last entry in the spot check array, add it. + if len(c.spotCheck) == 0 || time.Sub(*c.spotCheck[len(c.spotCheck)-1]) >= c.spotCheckInterval { + c.spotMtx.Lock() + c.spotCheck = append(c.spotCheck, &time) + c.spotMtx.Unlock() + } totalEntries.Inc() } @@ -173,8 +230,12 @@ func (c *Comparator) Size() int { func (c *Comparator) run() { t := time.NewTicker(c.pruneInterval) + mt := time.NewTicker(c.metricTestInterval) + sc := time.NewTicker(c.spotCheckQueryRate) defer func() { t.Stop() + mt.Stop() + sc.Stop() close(c.done) }() @@ -185,14 +246,133 @@ func (c *Comparator) run() { case e := <-c.sent: c.entrySent(e) case <-t.C: - c.pruneEntries() + // Only run one instance of prune entries at a time. 
+ c.pruneMtx.Lock() + if !c.pruneEntriesRunning { + c.pruneEntriesRunning = true + go c.pruneEntries() + } + c.pruneMtx.Unlock() + case <-sc.C: + // Only run one instance of spot check at a time. + c.spotMtx.Lock() + if !c.spotCheckRunning { + c.spotCheckRunning = true + go c.spotCheckEntries(time.Now()) + } + c.spotMtx.Unlock() + case <-mt.C: + // Only run one instance of metric tests at a time. + c.metTestMtx.Lock() + if !c.metricTestRunning { + c.metricTestRunning = true + go c.metricTest(time.Now()) + } + c.metTestMtx.Unlock() case <-c.quit: return } } } +func (c *Comparator) metricTest(currTime time.Time) { + // Always make sure to set the running state back to false + defer func() { + c.metTestMtx.Lock() + c.metricTestRunning = false + c.metTestMtx.Unlock() + }() + adjustedRange := c.metricTestRange + + // Adjust the query range to not be longer than the canary has been running. + // We can't query for 24 hours of counts if it's only been running for 10m, + // so we adjust the range to the run time until we reach the desired lookback time. + if currTime.Add(-c.metricTestRange).Before(c.startTime) { + adjustedRange = currTime.Sub(c.startTime) + } + actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds())) + if err != nil { + fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error()) + return + } + expectedCount := float64(adjustedRange.Milliseconds()) / float64(c.writeInterval.Milliseconds()) + deviation := expectedCount - actualCount + // There is nothing special about the number 10 here, it's fairly common for the deviation to be 2-4 + // based on how expected is calculated vs the actual query data, more than 10 would be unlikely + // unless there is a problem. 
+ if deviation > 10 { + fmt.Fprintf(c.w, "large metric deviation: expected %v, actual %v\n", expectedCount, actualCount) + } + metricTestDeviation.Set(deviation) +} + +func (c *Comparator) spotCheckEntries(currTime time.Time) { + // Always make sure to set the running state back to false + defer func() { + c.spotMtx.Lock() + c.spotCheckRunning = false + c.spotMtx.Unlock() + }() + c.spotMtx.Lock() + k := 0 + for i, e := range c.spotCheck { + if e.Before(currTime.Add(-c.spotCheckMax)) { + // Do nothing, if we don't increment the output index k, this will be dropped + } else { + if i != k { + c.spotCheck[k] = c.spotCheck[i] + } + k++ + } + } + // Nil out the pointers to any trailing elements which were removed from the slice + for i := k; i < len(c.spotCheck); i++ { + c.spotCheck[i] = nil // or the zero value of T + } + c.spotCheck = c.spotCheck[:k] + cpy := make([]*time.Time, len(c.spotCheck)) + //Make a copy so we don't have to hold the lock to verify entries + copy(cpy, c.spotCheck) + c.spotMtx.Unlock() + + for _, sce := range cpy { + spotCheckEntries.Inc() + // Because we are querying loki timestamps vs the timestamp in the log, + // make the range +/- 10 seconds to allow for clock inaccuracies + start := *sce + adjustedStart := start.Add(-10 * time.Second) + adjustedEnd := start.Add(10 * time.Second) + recvd, err := c.rdr.Query(adjustedStart, adjustedEnd) + if err != nil { + fmt.Fprintf(c.w, "error querying loki: %s\n", err) + return + } + + found := false + for _, r := range recvd { + if (*sce).Equal(r) { + found = true + break + } + } + if !found { + fmt.Fprintf(c.w, ErrSpotCheckEntryNotReceived, sce.UnixNano(), currTime.Sub(*sce)) + for _, r := range recvd { + fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano()) + } + spotCheckMissing.Inc() + } + } + +} + func (c *Comparator) pruneEntries() { + // Always make sure to set the running state back to false + defer func() { + c.pruneMtx.Lock() + c.pruneEntriesRunning = false + c.pruneMtx.Unlock() + }() c.entMtx.Lock() 
defer c.entMtx.Unlock() diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 0af065664410..5ed9beb65423 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -155,10 +155,10 @@ func TestEntryNeverReceived(t *testing.T) { found := []time.Time{t1, t3, t4, t5} - mr := &mockReader{found} + mr := &mockReader{resp: found} maxWait := 50 * time.Millisecond //We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below 
- c := NewComparator(actual, maxWait, 50*time.Hour, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) c.entrySent(t1) c.entrySent(t2) @@ -216,7 +216,7 @@ func TestPruneAckdEntires(t *testing.T) { actual := &bytes.Buffer{} maxWait := 30 * time.Millisecond //We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below - c := NewComparator(actual, maxWait, 50*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Millisecond) @@ -251,6 +251,103 @@ func TestPruneAckdEntires(t *testing.T) { } +func TestSpotCheck(t *testing.T) { + spotCheckMissing = &mockCounter{} + spotCheckEntries = &mockCounter{} + + actual := &bytes.Buffer{} + + t1 := time.Unix(0, 0) + entries := []time.Time{} + found := []time.Time{} + entries = append(entries, t1) + for i := 1; i <= 20; i++ { + t := entries[i-1].Add(1 * time.Millisecond) + entries = append(entries, t) + // Don't add the last entry so we get one error in spot check + if i != 20 { + found = append(found, t) + } + } + + mr := &mockReader{resp: found} + maxWait := 50 * time.Millisecond + spotCheck := 10 * time.Millisecond + spotCheckMax := 10 * time.Millisecond + //We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below + c := NewComparator(actual, maxWait, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + + // Send all the entries + for i := range entries { + c.entrySent(entries[i]) + } + + assert.Equal(t, 3, len(c.spotCheck)) 
+ + // Run with "current time" 11ms after start which will prune the first entry which is no "before" the 10ms spot check max + c.spotCheckEntries(time.Unix(0, 11*time.Millisecond.Nanoseconds())) + + // First entry should have been pruned + assert.Equal(t, 2, len(c.spotCheck)) + + expected := fmt.Sprintf(ErrSpotCheckEntryNotReceived, // List entry not received from Loki + entries[20].UnixNano(), "-9ms") + + // We didn't send the last entry and our initial counter did not start at 0 so we should get back entries 1-19 + for i := 1; i < 20; i++ { + expected = expected + fmt.Sprintf(DebugQueryResult, entries[i].UnixNano()) + } + + assert.Equal(t, expected, actual.String()) + + assert.Equal(t, 2, spotCheckEntries.(*mockCounter).count) + assert.Equal(t, 1, spotCheckMissing.(*mockCounter).count) + + prometheus.Unregister(responseLatency) +} + +func TestMetricTest(t *testing.T) { + metricTestDeviation = &mockGauge{} + + actual := &bytes.Buffer{} + + writeInterval := 500 * time.Millisecond + + mr := &mockReader{} + maxWait := 50 * time.Millisecond + metricTestRange := 30 * time.Second + //We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below + c := NewComparator(actual, maxWait, 50*time.Hour, 0, 0, 4*time.Hour, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false) + // Force the start time to a known value + c.startTime = time.Unix(10, 0) + + // Run test at time 20s which is 10s after start + mr.countOverTime = float64((10 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds()) + c.metricTest(time.Unix(0, 20*time.Second.Nanoseconds())) + // We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s + assert.Equal(t, "10s", mr.queryRange) + // Should be no deviation we set countOverTime to the runtime/writeinterval which should be what metrictTest expected + assert.Equal(t, 
float64(0), metricTestDeviation.(*mockGauge).val) + + // Run test at time 30s which is 20s after start + mr.countOverTime = float64((20 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds()) + c.metricTest(time.Unix(0, 30*time.Second.Nanoseconds())) + // We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s + assert.Equal(t, "20s", mr.queryRange) + // Gauge should be equal to the countOverTime value + assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val) + + // Run test 60s after start, we should now be capping the query range to 30s and expecting only 30s of counts + mr.countOverTime = float64((30 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds()) + c.metricTest(time.Unix(0, 60*time.Second.Nanoseconds())) + // We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s + assert.Equal(t, "30s", mr.queryRange) + // Gauge should be equal to the countOverTime value + assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val) + + prometheus.Unregister(responseLatency) +} + type mockCounter struct { cLck sync.Mutex count int @@ -282,10 +379,64 @@ func (m *mockCounter) Inc() { m.count++ } +type mockGauge struct { + cLck sync.Mutex + val float64 +} + +func (m *mockGauge) Desc() *prometheus.Desc { + panic("implement me") +} + +func (m *mockGauge) Write(*io_prometheus_client.Metric) error { + panic("implement me") +} + +func (m *mockGauge) Describe(chan<- *prometheus.Desc) { + panic("implement me") +} + +func (m *mockGauge) Collect(chan<- prometheus.Metric) { + panic("implement me") +} + +func (m *mockGauge) Set(v float64) { + m.cLck.Lock() + m.val = v + m.cLck.Unlock() +} + +func (m *mockGauge) Inc() { + panic("implement me") +} + +func (m *mockGauge) Dec() { + panic("implement me") +} + +func (m *mockGauge) Add(float64) { + panic("implement me") +} + +func (m *mockGauge) Sub(float64) { + 
panic("implement me") +} + +func (m *mockGauge) SetToCurrentTime() { + panic("implement me") +} + type mockReader struct { - resp []time.Time + resp []time.Time + countOverTime float64 + queryRange string } func (r *mockReader) Query(start time.Time, end time.Time) ([]time.Time, error) { return r.resp, nil } + +func (r *mockReader) QueryCountOverTime(queryRange string) (float64, error) { + r.queryRange = queryRange + return r.countOverTime, nil +} diff --git a/pkg/canary/reader/reader.go b/pkg/canary/reader/reader.go index bc427e9f0edc..9ee6de6dd32f 100644 --- a/pkg/canary/reader/reader.go +++ b/pkg/canary/reader/reader.go @@ -1,6 +1,7 @@ package reader import ( + "context" "encoding/base64" "fmt" "io" @@ -17,10 +18,11 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/promql/parser" "github.com/grafana/loki/pkg/build" - loghttp "github.com/grafana/loki/pkg/loghttp/legacy" - "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logql" ) var ( @@ -34,6 +36,7 @@ var ( type LokiReader interface { Query(start time.Time, end time.Time) ([]time.Time, error) + QueryCountOverTime(queryRange string) (float64, error) } type Reader struct { @@ -42,6 +45,7 @@ type Reader struct { addr string user string pass string + queryTimeout time.Duration sName string sValue string lName string @@ -54,8 +58,17 @@ type Reader struct { done chan struct{} } -func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool, - address string, user string, pass string, labelName string, labelVal string, streamName string, streamValue string) *Reader { +func NewReader(writer io.Writer, + receivedChan chan time.Time, + tls bool, + address string, + user string, + pass string, + queryTimeout time.Duration, + labelName string, + labelVal string, + streamName string, + streamValue string) *Reader { h := 
http.Header{} if user != "" { h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}} @@ -67,6 +80,7 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool, addr: address, user: user, pass: pass, + queryTimeout: queryTimeout, sName: streamName, sValue: streamValue, lName: labelName, @@ -100,6 +114,70 @@ func (r *Reader) Stop() { } } +func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) { + scheme := "http" + if r.tls { + scheme = "https" + } + u := url.URL{ + Scheme: scheme, + Host: r.addr, + Path: "/loki/api/v1/query", + RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("count_over_time({%v=\"%v\",%v=\"%v\"}[%s])", r.sName, r.sValue, r.lName, r.lVal, queryRange)) + + "&limit=1000", + } + fmt.Fprintf(r.w, "Querying loki for metric count with query: %v\n", u.String()) + + ctx, cancel := context.WithTimeout(context.Background(), r.queryTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return 0, err + } + + req.SetBasicAuth(r.user, r.pass) + req.Header.Set("User-Agent", userAgent) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 0, err + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.Println("error closing body", err) + } + }() + + if resp.StatusCode/100 != 2 { + buf, _ := ioutil.ReadAll(resp.Body) + return 0, fmt.Errorf("error response from server: %s (%v)", string(buf), err) + } + var decoded loghttp.QueryResponse + err = json.NewDecoder(resp.Body).Decode(&decoded) + if err != nil { + return 0, err + } + + value := decoded.Data.Result + ret := 0.0 + switch value.Type() { + case parser.ValueTypeVector: + samples := value.(loghttp.Vector) + if len(samples) > 1 { + return 0, fmt.Errorf("expected only a single result in the metric test query vector, instead received %v", len(samples)) + } + if len(samples) == 0 { + return 0, fmt.Errorf("expected to receive one sample in the 
result vector, received 0") + } + ret = float64(samples[0].Value) + default: + return 0, fmt.Errorf("unexpected result type, expected a Vector result instead received %v", value.Type()) + } + + return ret, nil +} + func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { scheme := "http" if r.tls { @@ -108,14 +186,17 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { u := url.URL{ Scheme: scheme, Host: r.addr, - Path: "/api/prom/query", + Path: "/loki/api/v1/query_range", RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) + "&query=" + url.QueryEscape(fmt.Sprintf("{%v=\"%v\",%v=\"%v\"}", r.sName, r.sValue, r.lName, r.lVal)) + "&limit=1000", } - fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) + fmt.Fprintf(r.w, "Querying loki for logs with query: %v\n", u.String()) + + ctx, cancel := context.WithTimeout(context.Background(), r.queryTimeout) + defer cancel() - req, err := http.NewRequest("GET", u.String(), nil) + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) if err != nil { return nil, err } @@ -137,24 +218,29 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { buf, _ := ioutil.ReadAll(resp.Body) return nil, fmt.Errorf("error response from server: %s (%v)", string(buf), err) } - var decoded logproto.QueryResponse + var decoded loghttp.QueryResponse err = json.NewDecoder(resp.Body).Decode(&decoded) if err != nil { return nil, err } tss := []time.Time{} - - for _, stream := range decoded.Streams { - for _, entry := range stream.Entries { - ts, err := parseResponse(&entry) - if err != nil { - fmt.Fprint(r.w, err) - continue + value := decoded.Data.Result + switch value.Type() { + case logql.ValueTypeStreams: + for _, stream := range value.(loghttp.Streams) { + for _, entry := range stream.Entries { + ts, err := parseResponse(&entry) + if err != nil { + fmt.Fprint(r.w, err) + continue + } + tss = append(tss, *ts) 
} - tss = append(tss, *ts) - } + } + default: + return nil, fmt.Errorf("unexpected result type, expected a log stream result instead received %v", value.Type()) } return tss, nil @@ -209,7 +295,7 @@ func (r *Reader) closeAndReconnect() { u := url.URL{ Scheme: scheme, Host: r.addr, - Path: "/api/prom/tail", + Path: "/loki/api/v1/tail", RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{%v=\"%v\",%v=\"%v\"}", r.sName, r.sValue, r.lName, r.lVal)), } @@ -225,7 +311,7 @@ func (r *Reader) closeAndReconnect() { } } -func parseResponse(entry *logproto.Entry) (*time.Time, error) { +func parseResponse(entry *loghttp.Entry) (*time.Time, error) { sp := strings.Split(entry.Line, " ") if len(sp) != 2 { return nil, errors.Errorf("received invalid entry: %s\n", entry.Line)