From 94113ee080a6397a7f6e35146e4a4e0b64879381 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 18 Nov 2021 14:53:07 +0100 Subject: [PATCH] Logql `stdin` support only log queries (#4606) * Hack stdin client into LogCLI Add flag to choose what kind of client to make request to. Signed-off-by: Kaviraj * Label filter working Signed-off-by: Kaviraj * Demo checkpoint1 Signed-off-by: Kaviraj * Metrics query working. Issue was with default value of `step` (1 nanosecond). StepEvaluator try to go through every nanosecond to apply aggregate on sample data during the metric queries. Signed-off-by: Kaviraj * Basic tests for fileClient * Tests for logqueries direction * Use HeapIterator for Entries. Signed-off-by: Kaviraj * Remove some debug statements Signed-off-by: Kaviraj * Remove support for metric queries. Stick with only log queries Signed-off-by: Kaviraj * Small rough usage doc * Remove filesampleiterator * Fix some typos and tests * Fix breaking test cases Signed-off-by: Kaviraj * Make linter happy Signed-off-by: Kaviraj * PR remarks. 1. Add `--stdin` in examples of the usage 2. 
Add `--stdin` in all the command help output * PR remarks - Use parsed labels correctly - Fix indendation with --stdin flag * Fix issue with direction * Fix linter * MaxInt64 -> MaxInt (to support even arm32 images) * Add note on calculating `step` value on the client side --- cmd/logcli/main.go | 39 ++++ docs/sources/getting-started/logcli.md | 44 +++- pkg/logcli/client/file.go | 284 +++++++++++++++++++++++++ pkg/logcli/client/file_test.go | 223 +++++++++++++++++++ pkg/querier/http.go | 1 - 5 files changed, 582 insertions(+), 9 deletions(-) create mode 100644 pkg/logcli/client/file.go create mode 100644 pkg/logcli/client/file_test.go diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index a60a36721c14..2441913a6ee8 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -2,9 +2,11 @@ package main import ( "log" + "math" "net/url" "os" "runtime/pprof" + "strings" "time" "github.com/prometheus/common/config" @@ -27,6 +29,7 @@ var ( timezone = app.Flag("timezone", "Specify the timezone to use when formatting output timestamps [Local, UTC]").Default("Local").Short('z').Enum("Local", "UTC") cpuProfile = app.Flag("cpuprofile", "Specify the location for writing a CPU profile.").Default("").String() memProfile = app.Flag("memprofile", "Specify the location for writing a memory profile.").Default("").String() + stdin = app.Flag("stdin", "Take input logs from stdin").Bool() queryClient = newQueryClient(app) @@ -138,6 +141,33 @@ func main() { }() } + if *stdin { + queryClient = client.NewFileClient(os.Stdin) + if rangeQuery.Step.Seconds() == 0 { + // Set default value for `step` based on `start` and `end`. + // In non-stdin case, this is set on Loki server side. + // If this is not set, then `step` will have default value of 1 nanosecond and `STepEvaluator` will go through every nanosecond when applying aggregation during metric queries. 
+ rangeQuery.Step = defaultQueryRangeStep(rangeQuery.Start, rangeQuery.End) + } + + // When `--stdin` flag is set, stream selector is optional in the query. + // But logQL package throw parser error if stream selector is not provided. + // So we inject "dummy" stream selector if not provided by user already. + // Which brings down to two ways of using LogQL query under `--stdin`. + // 1. Query with stream selector(e.g: `{foo="bar"}|="error"`) + // 2. Query without stream selector (e.g: `|="error"`) + + qs := rangeQuery.QueryString + if strings.HasPrefix(strings.TrimSpace(qs), "|") { + // inject the dummy stream selector + qs = `{source="logcli"}` + qs + rangeQuery.QueryString = qs + } + + // `--limit` doesn't make sense when using `--stdin` flag. + rangeQuery.Limit = math.MaxInt // TODO(kavi): is it a good idea? + } + switch cmd { case queryCmd.FullCommand(): location, err := time.LoadLocation(*timezone) @@ -307,6 +337,7 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query { cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step) cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval) cmd.Flag("batch", "Query batch size to use until 'limit' is reached").Default("1000").IntVar(&q.BatchSize) + } cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward) @@ -333,3 +364,11 @@ func mustParse(t string, defaultTime time.Time) time.Time { return ret } + +// This method is to duplicate the same logic of `step` value from `start` and `end` +// done on the loki server side. 
+// https://github.com/grafana/loki/blob/main/pkg/loghttp/params.go +func defaultQueryRangeStep(start, end time.Time) time.Duration { + step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1)) + return time.Duration(step) * time.Second +} diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md index 836f2caf0226..458d3daa8dfe 100644 --- a/docs/sources/getting-started/logcli.md +++ b/docs/sources/getting-started/logcli.md @@ -74,13 +74,13 @@ When not set, `--limit` defaults to 30. The limit protects the user from overwhelming the system for cases in which the specified query would have returned a large quantity of log lines. -The limit also protects the user from unexpectedly large responses. +The limit also protects the user from unexpectedly large responses. The quantity of log line results that arrive in each batch is set by the `--batch` option in a `logcli query` command. When not set, `--batch` defaults to 1000. -Setting a `--limit` value larger than the `--batch` value causes the +Setting a `--limit` value larger than the `--batch` value causes the requests from LogCLI to Loki to be batched. Loki has a server-side limit that defaults to 5000 for the maximum quantity of lines returned for a single query. @@ -120,7 +120,8 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --stdin Take input logs from stdin + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -275,7 +276,8 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --stdin Take input logs from stdin + --addr="http://localhost:3100" Server address. 
Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -308,9 +310,9 @@ Flags: --batch=1000 Query batch size to use until 'limit' is reached --forward Scan forwards through logs. --no-labels Do not print any labels - --exclude-label=EXCLUDE-LABEL ... + --exclude-label=EXCLUDE-LABEL ... Exclude labels given the provided key during output. - --include-label=INCLUDE-LABEL ... + --include-label=INCLUDE-LABEL ... Include labels given the provided key during output. --labels-length=0 Set a fixed padding to labels --store-config="" Execute the current query using a configured storage @@ -346,7 +348,8 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --stdin Take input logs from stdin + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -402,7 +405,8 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --stdin Take input logs from stdin + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -430,3 +434,27 @@ Flags: Args: eg '{foo="bar",baz=~".*blip"}' ``` + +### LogCLI `--stdin` usage + +You can consume log lines from your `stdin` instead of Loki servers. + +Say you have log files in your local, and just want to do run some LogQL queries for that, `--stdin` flag can help. + +**NOTE: Currently it doesn't support any type of metric queries** + +You may have to use `stdin` flag for several reasons +1. Quick way to check and validate a LogQL expressions. +2. 
Learn basics of LogQL with just Log files and `LogCLI` tool (without needing to set up Loki servers, Grafana, etc.) +3. Easy discussion on public forums, e.g. Q&A or sharing LogQL expressions. + +**NOTES on Usage** +1. The `--limit` flag has no effect when using `--stdin` (use a pager like `less` for that) +1. Be aware there are no **labels** when using `--stdin` +   - So the stream selector in the query is optional, e.g. just `|="timeout"|logfmt|level="error"` is the same as `{foo="bar"}|="timeout"|logfmt|level="error"` + +**Examples** +1. Line filter - `cat mylog.log | logcli --stdin query '|="too many open connections"'` +2. Label matcher - `echo 'msg="timeout happened" level="warning"' | logcli --stdin query '|logfmt|level="warning"'` +3. Different parsers (logfmt, json, pattern, regexp) - `cat mylog.log | logcli --stdin query '|pattern - - <_> " <_>" <_> "" <_>'` +4. Line formatters - `cat mylog.log | logcli --stdin query '|logfmt|line_format "{{.query}} {{.duration}}"'` diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go new file mode 100644 index 000000000000..059f4859c387 --- /dev/null +++ b/pkg/logcli/client/file.go @@ -0,0 +1,284 @@ +package client + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "sort" + "strings" + "time" + + "github.com/gorilla/websocket" + + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + logqllog "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/marshal" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/user" +) + +const ( + defaultLabelKey = "source" + defaultLabelValue = "logcli" + defaultOrgID = "logcli" + defaultMetricSeriesLimit = 1024 + defaultMaxFileSize = 20 * (1 << 20) // 20MB +) + +var ( + ErrNotSupported = errors.New("not supported") +) + +// FileClient is a type of LogCLI client that do LogQL on log lines from +// the given file directly, 
instead get log lines from Loki servers. +type FileClient struct { + r io.ReadCloser + labels []string + labelValues []string + orgID string + engine *logql.Engine +} + +// NewFileClient returns the new instance of FileClient for the given `io.ReadCloser` +func NewFileClient(r io.ReadCloser) *FileClient { + lbs := []labels.Label{ + { + Name: defaultLabelKey, + Value: defaultLabelValue, + }, + } + + eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r, labels: lbs}, &limiter{n: defaultMetricSeriesLimit}) + return &FileClient{ + r: r, + orgID: defaultOrgID, + engine: eng, + labels: []string{defaultLabelKey}, + labelValues: []string{defaultLabelValue}, + } + +} + +func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { + ctx := context.Background() + + ctx = user.InjectOrgID(ctx, f.orgID) + + params := logql.NewLiteralParams( + q, + t, t, + 0, + 0, + direction, + uint32(limit), + nil, + ) + + query := f.engine.Query(params) + + result, err := query.Exec(ctx) + if err != nil { + return nil, fmt.Errorf("failed to exec query: %w", err) + } + + value, err := marshal.NewResultValue(result.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal result data: %w", err) + } + + return &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: result.Statistics, + }, + }, nil +} + +func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { + ctx := context.Background() + + ctx = user.InjectOrgID(ctx, f.orgID) + + params := logql.NewLiteralParams( + queryStr, + start, + end, + step, + interval, + direction, + uint32(limit), + nil, + ) + + query := f.engine.Query(params) + + result, err := query.Exec(ctx) + if err != nil { + return nil, err + } + + value, err := 
marshal.NewResultValue(result.Data) + if err != nil { + return nil, err + } + + return &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: result.Statistics, + }, + }, nil +} + +func (f *FileClient) ListLabelNames(quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { + return &loghttp.LabelResponse{ + Status: loghttp.QueryStatusSuccess, + Data: f.labels, + }, nil +} + +func (f *FileClient) ListLabelValues(name string, quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { + i := sort.SearchStrings(f.labels, name) + if i < 0 { + return &loghttp.LabelResponse{}, nil + } + + return &loghttp.LabelResponse{ + Status: loghttp.QueryStatusSuccess, + Data: []string{f.labelValues[i]}, + }, nil +} + +func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) { + m := len(f.labels) + if m > len(f.labelValues) { + m = len(f.labelValues) + } + + lbs := make(loghttp.LabelSet) + for i := 0; i < m; i++ { + lbs[f.labels[i]] = f.labelValues[i] + } + + return &loghttp.SeriesResponse{ + Status: loghttp.QueryStatusSuccess, + Data: []loghttp.LabelSet{lbs}, + }, nil +} + +func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) { + return nil, fmt.Errorf("LiveTailQuery: %w", ErrNotSupported) +} + +func (f *FileClient) GetOrgID() string { + return f.orgID +} + +type limiter struct { + n int +} + +func (l *limiter) MaxQuerySeries(userID string) int { + return l.n +} + +type querier struct { + r io.Reader + labels labels.Labels +} + +func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { + expr, err := params.LogSelector() + if err != nil { + return nil, fmt.Errorf("failed to extract selector for logs: %w", err) + } + pipeline, err := expr.Pipeline() + if err != nil { + return nil, 
fmt.Errorf("failed to extract pipeline for logs: %w", err) + } + return newFileIterator(ctx, q.r, params, pipeline.ForStream(q.labels)) +} + +func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { + return nil, fmt.Errorf("Metrics Query: %w", ErrNotSupported) +} + +func newFileIterator( + ctx context.Context, + r io.Reader, + params logql.SelectLogParams, + pipeline logqllog.StreamPipeline, +) (iter.EntryIterator, error) { + + lr := io.LimitReader(r, defaultMaxFileSize) + b, err := ioutil.ReadAll(lr) + if err != nil { + return nil, err + } + lines := strings.FieldsFunc(string(b), func(r rune) bool { + return r == '\n' + }) + + if len(lines) == 0 { + return iter.NoopIterator, nil + } + + streams := map[uint64]*logproto.Stream{} + + processLine := func(line string) { + parsedLine, parsedLabels, ok := pipeline.ProcessString(line) + if !ok { + return + } + + var stream *logproto.Stream + lhash := parsedLabels.Hash() + if stream, ok = streams[lhash]; !ok { + stream = &logproto.Stream{ + Labels: parsedLabels.String(), + } + streams[lhash] = stream + } + + stream.Entries = append(stream.Entries, logproto.Entry{ + Timestamp: time.Now(), + Line: parsedLine, + }) + } + + if params.Direction == logproto.FORWARD { + for _, line := range lines { + processLine(line) + } + } else { + for i := len(lines) - 1; i >= 0; i-- { + processLine(lines[i]) + } + } + + if len(streams) == 0 { + return iter.NoopIterator, nil + } + + streamResult := make([]logproto.Stream, 0, len(streams)) + + for _, stream := range streams { + streamResult = append(streamResult, *stream) + } + + return iter.NewStreamsIterator( + ctx, + streamResult, + params.Direction, + ), nil +} diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go new file mode 100644 index 000000000000..1e5a2ab77c63 --- /dev/null +++ b/pkg/logcli/client/file_test.go @@ -0,0 +1,223 @@ +package client + +import ( + "bytes" + "errors" + "io" + "sort" + 
"strings" + "testing" + "time" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFileClient_QueryRangeLogQueries(t *testing.T) { + input := []string{ + `level=info event="loki started" caller=main.go ts=1625995076`, + `level=info event="runtime loader started" caller=main.go ts=1625995077`, + `level=error event="unable to read rules directory" file="/tmp/rules" caller=rules.go ts=1625995090`, + `level=error event="failed to apply wal" error="/tmp/wal/ corrupted" caller=wal.go ts=1625996090`, + `level=info event="loki ready" caller=main.go ts=1625996095`, + } + + reversed := make([]string, len(input)) + copy(reversed, input) + sort.Slice(reversed, func(i, j int) bool { + return i > j + }) + + now := time.Now() + + cases := []struct { + name string + limit int + start, end time.Time + direction logproto.Direction + step, interval time.Duration + expectedStatus loghttp.QueryStatus + expected []string + }{ + { + name: "return-all-logs-backward", + limit: 10, // more than input + start: now.Add(-1 * time.Hour), + end: now, + direction: logproto.BACKWARD, + step: 0, // let client decide based on start and end + interval: 0, + expectedStatus: loghttp.QueryStatusSuccess, + expected: reversed, + }, + { + name: "return-all-logs-forward", + limit: 10, // more than input + start: now.Add(-1 * time.Hour), + end: now, + direction: logproto.FORWARD, + step: 0, // let the client decide based on start and end + interval: 0, + expectedStatus: loghttp.QueryStatusSuccess, + expected: input, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + client := NewFileClient(io.NopCloser(strings.NewReader(strings.Join(input, "\n")))) + resp, err := client.QueryRange( + `{foo="bar"}`, // label matcher doesn't matter. 
+ c.limit, + c.start, + c.end, + c.direction, + c.step, + c.interval, + true, + ) + + require.NoError(t, err) + require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) + assert.Equal(t, string(resp.Data.ResultType), loghttp.ResultTypeStream) + assertStreams(t, resp.Data.Result, c.expected) + }) + } +} + +func TestFileClient_Query(t *testing.T) { + input := []string{ + `level=info event="loki started" caller=main.go ts=1625995076`, + `level=info event="runtime loader started" caller=main.go ts=1625995077`, + `level=error event="unable to read rules directory" file="/tmp/rules" caller=rules.go ts=1625995090`, + `level=error event="failed to apply wal" error="/tmp/wal/ corrupted" caller=wal.go ts=1625996090`, + `level=info event="loki ready" caller=main.go ts=1625996095`, + } + + reversed := make([]string, len(input)) + copy(reversed, input) + sort.Slice(reversed, func(i, j int) bool { + return i > j + }) + + now := time.Now() + + cases := []struct { + name string + limit int + ts time.Time + direction logproto.Direction + expectedStatus loghttp.QueryStatus + expected []string + }{ + { + name: "return-all-logs-backward", + limit: 10, // more than input + ts: now.Add(-1 * time.Hour), + direction: logproto.BACKWARD, + expectedStatus: loghttp.QueryStatusSuccess, + expected: reversed, + }, + { + name: "return-all-logs-forward", + limit: 10, // more than input + ts: now.Add(-1 * time.Hour), + direction: logproto.FORWARD, + expectedStatus: loghttp.QueryStatusSuccess, + expected: input, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + client := NewFileClient(io.NopCloser(strings.NewReader(strings.Join(input, "\n")))) + resp, err := client.Query( + `{foo="bar"}`, // label matcher doesn't matter. 
+ c.limit, + c.ts, + c.direction, + true, + ) + + require.NoError(t, err) + require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) + assert.Equal(t, string(resp.Data.ResultType), loghttp.ResultTypeStream) + assertStreams(t, resp.Data.Result, c.expected) + }) + } +} + +func TestFileClient_ListLabelNames(t *testing.T) { + c := newEmptyClient(t) + values, err := c.ListLabelNames(true, time.Now(), time.Now()) + require.NoError(t, err) + assert.Equal(t, &loghttp.LabelResponse{ + Data: []string{defaultLabelKey}, + Status: loghttp.QueryStatusSuccess, + }, values) +} + +func TestFileClient_ListLabelValues(t *testing.T) { + c := newEmptyClient(t) + values, err := c.ListLabelValues(defaultLabelKey, true, time.Now(), time.Now()) + require.NoError(t, err) + assert.Equal(t, &loghttp.LabelResponse{ + Data: []string{defaultLabelValue}, + Status: loghttp.QueryStatusSuccess, + }, values) + +} + +func TestFileClient_Series(t *testing.T) { + c := newEmptyClient(t) + got, err := c.Series(nil, time.Now(), time.Now(), true) + require.NoError(t, err) + + exp := &loghttp.SeriesResponse{ + Data: []loghttp.LabelSet{ + {defaultLabelKey: defaultLabelValue}, + }, + Status: loghttp.QueryStatusSuccess, + } + + assert.Equal(t, exp, got) +} + +func TestFileClient_LiveTail(t *testing.T) { + c := newEmptyClient(t) + x, err := c.LiveTailQueryConn("", time.Second, 0, time.Now(), true) + require.Error(t, err) + require.Nil(t, x) + assert.True(t, errors.Is(err, ErrNotSupported)) +} + +func TestFileClient_GetOrgID(t *testing.T) { + c := newEmptyClient(t) + assert.Equal(t, defaultOrgID, c.GetOrgID()) +} + +func newEmptyClient(t *testing.T) *FileClient { + t.Helper() + return NewFileClient(io.NopCloser(&bytes.Buffer{})) +} + +func assertStreams(t *testing.T, result loghttp.ResultValue, logLines []string) { + t.Helper() + + streams, ok := result.(loghttp.Streams) + require.True(t, ok, "response type should be `loghttp.Streams`") + + require.Len(t, streams, 1, "there should be only one stream for 
FileClient") + + got := streams[0] + sort.Slice(got.Entries, func(i, j int) bool { + return got.Entries[i].Timestamp.UnixNano() < got.Entries[j].Timestamp.UnixNano() + }) + require.Equal(t, len(got.Entries), len(logLines)) + for i, entry := range got.Entries { + assert.Equal(t, entry.Line, logLines[i]) + } +} diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 33317f3cc087..8910aad5b109 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -64,7 +64,6 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { serverutil.WriteError(err, w) return } - if err := marshal.WriteQueryResponseJSON(result, w); err != nil { serverutil.WriteError(err, w) return