Skip to content

Commit

Permalink
Add some additional logging to the canary on queries (#1137)
Browse files Browse the repository at this point in the history
* log what was missing from the websocket connection and what we received from a query to Loki to troubleshoot missing logs

* remove the unnecessary `_, _ =` discards of unused return values from print statements

* update test to handle the new debug output
  • Loading branch information
slim-bean authored Oct 9, 2019
1 parent 4f780d3 commit 8a95972
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
33 changes: 22 additions & 11 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
)

const (
ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n"
ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n"
ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n"
ErrDuplicateEntry = "received a duplicate entry for ts %v\n"
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n"
ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n"
ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n"
ErrDuplicateEntry = "received a duplicate entry for ts %v\n"
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
)

var (
Expand Down Expand Up @@ -125,7 +127,7 @@ func (c *Comparator) entryReceived(ts time.Time) {
// If this isn't the first item in the list we received it out of order
if i != 0 {
outOfOrderEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i])
fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i])
}
responseLatency.Observe(time.Since(ts).Seconds())
// Put this element in the acknowledged entries list so we can use it to check for duplicates
Expand All @@ -145,12 +147,12 @@ func (c *Comparator) entryReceived(ts time.Time) {
if ts.Equal(*e) {
duplicate = true
duplicateEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano())
fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano())
break
}
}
if !duplicate {
_, _ = fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano())
fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano())
unexpectedEntries.Inc()
}
}
Expand Down Expand Up @@ -199,7 +201,7 @@ func (c *Comparator) pruneEntries() {
if e.Before(time.Now().Add(-c.maxWait)) {
missing = append(missing, e)
wsMissingEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds())
fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds())
} else {
if i != k {
c.entries[k] = c.entries[i]
Expand Down Expand Up @@ -249,9 +251,18 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
end = end.Add(10 * time.Second)
recvd, err := c.rdr.Query(start, end)
if err != nil {
_, _ = fmt.Fprintf(c.w, "error querying loki: %s", err)
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
}
// To help debug log entries that are missing when queried,
// print exactly what we are missing and what Loki sent back.
for _, r := range missing {
fmt.Fprintf(c.w, DebugWebsocketMissingEntry, r.UnixNano())
}
for _, r := range recvd {
fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano())
}

k := 0
for i, m := range missing {
found := false
Expand All @@ -277,6 +288,6 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
missing = missing[:k]
for _, e := range missing {
missingEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds())
fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds())
}
}
12 changes: 11 additions & 1 deletion pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,21 @@ func TestEntryNeverReceived(t *testing.T) {

c.pruneEntries()

expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ErrEntryNotReceived,
expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ // Out of order because we missed entries
ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ // Complain about missed entries
DebugWebsocketMissingEntry+DebugWebsocketMissingEntry+ // List entries we are missing
DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // List entries we got back from Loki
ErrEntryNotReceived, // List entry not received from Loki
t3, []time.Time{t2},
t5, []time.Time{t2, t4},
t2.UnixNano(), maxWait.Seconds(),
t4.UnixNano(), maxWait.Seconds(),
t2.UnixNano(),
t4.UnixNano(),
t1.UnixNano(),
t3.UnixNano(),
t4.UnixNano(),
t5.UnixNano(),
t2.UnixNano(), maxWait.Seconds())

assert.Equal(t, expected, actual.String())
Expand Down
14 changes: 7 additions & 7 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
go func() {
<-rd.quit
if rd.conn != nil {
_, _ = fmt.Fprintf(rd.w, "shutting down reader\n")
fmt.Fprintf(rd.w, "shutting down reader\n")
rd.shuttingDown = true
_ = rd.conn.Close()
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
"&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) +
"&limit=1000",
}
_, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())
fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
Expand Down Expand Up @@ -142,7 +142,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
for _, entry := range stream.Entries {
ts, err := parseResponse(&entry)
if err != nil {
_, _ = fmt.Fprint(r.w, err)
fmt.Fprint(r.w, err)
continue
}
tss = append(tss, *ts)
Expand All @@ -166,15 +166,15 @@ func (r *Reader) run() {
close(r.done)
return
}
_, _ = fmt.Fprintf(r.w, "error reading websocket: %s\n", err)
fmt.Fprintf(r.w, "error reading websocket: %s\n", err)
r.closeAndReconnect()
continue
}
for _, stream := range tailResponse.Streams {
for _, entry := range stream.Entries {
ts, err := parseResponse(&entry)
if err != nil {
_, _ = fmt.Fprint(r.w, err)
fmt.Fprint(r.w, err)
continue
}
r.recv <- *ts
Expand Down Expand Up @@ -203,11 +203,11 @@ func (r *Reader) closeAndReconnect() {
RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)),
}

_, _ = fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)
fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)

c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header)
if err != nil {
_, _ = fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
<-time.After(5 * time.Second)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/canary/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (w *Writer) run() {
w.prevTsLen = tsLen
}

_, _ = fmt.Fprintf(w.w, LogEntry, ts, w.pad)
fmt.Fprintf(w.w, LogEntry, ts, w.pad)
w.sent <- t
case <-w.quit:
return
Expand Down

0 comments on commit 8a95972

Please sign in to comment.