From 3bb3cf5a57b90328c6b7e0edefcaf649917cd798 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 2 Jul 2021 00:58:42 +0200 Subject: [PATCH 01/20] Hack stdin client into LogCLI Add flag to choose what kind of client to make request to. Signed-off-by: Kaviraj --- .../cmd/promtail/promtail-local-config.yaml | 2 +- cmd/logcli/main.go | 5 + pkg/logcli/client/client.go | 146 ++++++++++++++++++ pkg/querier/http.go | 11 ++ pkg/querier/querier.go | 2 + 5 files changed, 165 insertions(+), 1 deletion(-) diff --git a/clients/cmd/promtail/promtail-local-config.yaml b/clients/cmd/promtail/promtail-local-config.yaml index 3b9256537ec8..401195b956d2 100644 --- a/clients/cmd/promtail/promtail-local-config.yaml +++ b/clients/cmd/promtail/promtail-local-config.yaml @@ -15,4 +15,4 @@ scrape_configs: - localhost labels: job: varlogs - __path__: /var/log/*log + __path__: /Users/kaviraj/src/loki/testlog.txt diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index a60a36721c14..1abc9eed0d94 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -27,6 +27,7 @@ var ( timezone = app.Flag("timezone", "Specify the timezone to use when formatting output timestamps [Local, UTC]").Default("Local").Short('z').Enum("Local", "UTC") cpuProfile = app.Flag("cpuprofile", "Specify the location for writing a CPU profile.").Default("").String() memProfile = app.Flag("memprofile", "Specify the location for writing a memory profile.").Default("").String() + stdin = app.Flag("stdin", "Take input logs from stdin").Bool() queryClient = newQueryClient(app) @@ -138,6 +139,10 @@ func main() { }() } + if *stdin { + queryClient = client.NewFileClient(os.Stdin) + } + switch cmd { case queryCmd.FullCommand(): location, err := time.LoadLocation(*timezone) diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index d7d2be1c57c0..990a1511fd41 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -1,8 +1,11 @@ package client import ( + "bufio" + "context" "encoding/base64" "fmt" + "io" "io/ioutil" "log" "net/http" @@ -15,10 +18,13 @@ import ( json "github.com/json-iterator/go" "github.com/prometheus/common/config" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/build" + "github.com/grafana/loki/pkg/util/marshal" ) const ( @@ -43,6 +49,146 @@ type Client interface { GetOrgID() string } +type limiter struct { + n int +} + +func (l *limiter) MaxQuerySeries(userID string) int { + return l.n +} + +type querier struct { + r io.Reader +} + +func (q *querier) SelectLogs(context.Context, logql.SelectLogParams) (iter.EntryIterator, error) { + it := NewFileIterator(q.r, "source:logcli") + + return it, nil +} + +func (q *querier) SelectSamples(context.Context, logql.SelectSampleParams) (iter.SampleIterator, error) { + return nil, nil +} + +type FileIterator struct { + s *bufio.Scanner + labels string + err error +} + +func NewFileIterator(r io.Reader, labels string) *FileIterator { + s := bufio.NewScanner(r) + s.Split(bufio.ScanLines) + return &FileIterator{ + s: s, + labels: labels, + } +} + +func (f *FileIterator) Next() bool { + return f.s.Scan() +} + +func (f *FileIterator) Entry() logproto.Entry { + return logproto.Entry{ + Timestamp: time.Now(), + Line: f.s.Text(), + } +} + +func (f *FileIterator) Labels() string { + return f.labels +} + +func (f *FileIterator) Error() error { + return f.err +} + +func (f *FileIterator) Close() error { 
+ return nil +} + +type FileClient struct { + r io.Reader + labels []string + labelValues []string + orgID string + engine *logql.Engine +} + +func NewFileClient(r io.Reader) *FileClient { + eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r}, &limiter{}) + return &FileClient{ + r: r, + orgID: "fake", + engine: eng, + labels: []string{"foo"}, + labelValues: []string{"bar"}, + } + +} + +func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { + return nil, nil +} + +func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { + + ctx := context.Background() + + params := logql.NewLiteralParams( + queryStr, + start, + end, + step, + interval, + direction, + uint32(limit), + nil, + ) + + query := f.engine.Query(params) + + result, err := query.Exec(ctx) + if err != nil { + return nil, err + } + + value, err := marshal.NewResultValue(result.Data) + if err != nil { + return nil, err + } + + return &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: result.Statistics, + }, + }, nil +} + +func (f *FileClient) ListLabelNames(quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { + return &loghttp.LabelResponse{}, nil +} + +func (f *FileClient) ListLabelValues(name string, quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { + return &loghttp.LabelResponse{}, nil +} + +func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) { + return &loghttp.SeriesResponse{}, nil +} +func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) { + return nil, nil +} + +func (f *FileClient) GetOrgID() string { + return f.orgID +} + // Tripperware can wrap a roundtripper. type Tripperware func(http.RoundTripper) http.RoundTripper diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 33317f3cc087..052136412d4d 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -1,7 +1,9 @@ package querier import ( + "bytes" "context" + "fmt" "net/http" "time" @@ -33,6 +35,7 @@ type QueryResponse struct { // RangeQueryHandler is a http.HandlerFunc for range queries. func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { + fmt.Println("Am i in range query?") // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() @@ -58,13 +61,19 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { request.Limit, request.Shards, ) + fmt.Printf("requrest: %+v\n", request) query := q.engine.Query(params) result, err := query.Exec(ctx) if err != nil { serverutil.WriteError(err, w) return } + buf := bytes.Buffer{} + if err := marshal.WriteQueryResponseJSON(result, &buf); err != nil { + panic(err) + } + fmt.Printf("response: %+v\n", buf.String()) if err := marshal.WriteQueryResponseJSON(result, w); err != nil { serverutil.WriteError(err, w) return @@ -73,6 +82,8 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { // InstantQueryHandler is a http.HandlerFunc for instant queries. 
func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) { + fmt.Println("Am i in instant query?") + // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 20578021b9fd..82ab3829511f 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -3,6 +3,7 @@ package querier import ( "context" "flag" + "fmt" "net/http" "time" @@ -89,6 +90,7 @@ func (q *Querier) SetQueryable(queryable logql.Querier) { // Select Implements logql.Querier which select logs via matchers and regex filters. func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { + fmt.Println("I'm here", "params", params) var err error params.Start, params.End, err = q.validateQueryRequest(ctx, params) if err != nil { From 8dd927fac5291dd6e749a78b54649b3d9f6ffb7e Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 2 Jul 2021 17:38:26 +0200 Subject: [PATCH 02/20] Label filter working Signed-off-by: Kaviraj --- pkg/logcli/client/client.go | 39 +++++++++++++++++++++++++++++-------- pkg/logql/engine.go | 2 ++ pkg/querier/http.go | 13 ++++++------- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index 990a1511fd41..74acb82bdc84 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -17,11 +17,13 @@ import ( "github.com/gorilla/websocket" json "github.com/json-iterator/go" "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + logqllog "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/build" "github.com/grafana/loki/pkg/util/marshal" @@ -61,8 +63,19 @@ type querier struct { r io.Reader } -func (q *querier) SelectLogs(context.Context, logql.SelectLogParams) (iter.EntryIterator, error) { - it := NewFileIterator(q.r, "source:logcli") +func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { + expr, err := params.LogSelector() + if err != nil { + panic(err) + } + pipeline, err := expr.Pipeline() + if err != nil { + panic(err) + } + streampipe := pipeline.ForStream(labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + }) + it := NewFileIterator(q.r, "source:logcli", streampipe) return it, nil } @@ -75,26 +88,36 @@ type FileIterator struct { s *bufio.Scanner labels string err error + sp logqllog.StreamPipeline + curr logproto.Entry } -func NewFileIterator(r io.Reader, labels string) *FileIterator { +func NewFileIterator(r io.Reader, labels string, sp logqllog.StreamPipeline) *FileIterator { s := bufio.NewScanner(r) s.Split(bufio.ScanLines) return &FileIterator{ s: s, labels: labels, + sp: sp, } } func (f *FileIterator) Next() bool { - return f.s.Scan() + for f.s.Scan() { + _, _, skip := f.sp.Process([]byte(f.s.Text())) + if skip { + f.curr = logproto.Entry{ + Timestamp: time.Now(), + Line: f.s.Text(), + } + return true + } + } + return false } func (f *FileIterator) Entry() logproto.Entry { - return logproto.Entry{ - Timestamp: time.Now(), - Line: f.s.Text(), - } + return f.curr } func (f *FileIterator) Labels() string { diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 87273e7b1337..b24df7951680 100644 
--- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -4,6 +4,7 @@ import ( "context" "errors" "flag" + "fmt" "math" "sort" "time" @@ -164,6 +165,7 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { defer util.LogErrorWithContext(ctx, "closing iterator", iter.Close) streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval()) + fmt.Println("streams", streams, "end") return streams, err default: return nil, errors.New("Unexpected type (%T): cannot evaluate") diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 052136412d4d..a6a35a80f923 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -1,7 +1,6 @@ package querier import ( - "bytes" "context" "fmt" "net/http" @@ -61,19 +60,19 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { request.Limit, request.Shards, ) - fmt.Printf("requrest: %+v\n", request) + // fmt.Printf("requrest: %+v\n", request) query := q.engine.Query(params) result, err := query.Exec(ctx) if err != nil { serverutil.WriteError(err, w) return } - buf := bytes.Buffer{} - if err := marshal.WriteQueryResponseJSON(result, &buf); err != nil { - panic(err) - } + // buf := bytes.Buffer{} + // if err := marshal.WriteQueryResponseJSON(result, &buf); err != nil { + // panic(err) + // } - fmt.Printf("response: %+v\n", buf.String()) + // fmt.Printf("response: %+v\n", buf.String()) if err := marshal.WriteQueryResponseJSON(result, w); err != nil { serverutil.WriteError(err, w) return From e4fc46c453f6b0e014d45aec901ca669697980e9 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Tue, 6 Jul 2021 12:13:40 +0200 Subject: [PATCH 03/20] Demo checkpoint1 Signed-off-by: Kaviraj --- pkg/logcli/client/client.go | 119 +++++++++++++++++++++++++++++++++--- pkg/logql/engine.go | 2 - 2 files changed, 112 insertions(+), 9 deletions(-) diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index 74acb82bdc84..d28540978379 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -18,6 +18,7 @@ import ( json "github.com/json-iterator/go" "github.com/prometheus/common/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" @@ -80,8 +81,78 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) return it, nil } -func (q *querier) SelectSamples(context.Context, logql.SelectSampleParams) (iter.SampleIterator, error) { - return nil, nil +func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { + expr, err := params.Expr() + if err != nil { + panic(err) + } + + fmt.Println("expr", expr.String()) + sampleExtractor, err := expr.Extractor() + if err != nil { + panic(err) + } + + streamSample := sampleExtractor.ForStream(labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + }) + + it := NewFileSampleIterator(q.r, "source:logcli", streamSample) + + return it, nil +} + +type FileSampleIterator struct { + s *bufio.Scanner + labels string + err error + sp logqllog.StreamSampleExtractor + curr logproto.Sample +} + +func NewFileSampleIterator(r io.Reader, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { + s := bufio.NewScanner(r) + s.Split(bufio.ScanLines) + return &FileSampleIterator{ + s: s, + labels: labels, + sp: sp, + } +} + +func (f *FileSampleIterator) Next() bool { + for f.s.Scan() { + fmt.Println("input", f.s.Text()) + value, labels, ok := 
f.sp.Process([]byte(f.s.Text())) + fmt.Println("output", "value", value, "labels", labels, "skip", ok) + + if ok { + f.curr = logproto.Sample{ + Timestamp: time.Now().Unix(), + Value: value, + Hash: labels.Hash(), + } + return true + } + } + return false +} + +func (f *FileSampleIterator) Sample() logproto.Sample { + return logproto.Sample{} +} + +func (f *FileSampleIterator) Labels() string { + return f.labels +} + +func (f *FileSampleIterator) Error() error { + return f.err +} + +func (f *FileSampleIterator) Close() error { + // TODO: accept io.ReadCloser()? + return nil } type FileIterator struct { @@ -104,8 +175,8 @@ func NewFileIterator(r io.Reader, labels string, sp logqllog.StreamPipeline) *Fi func (f *FileIterator) Next() bool { for f.s.Scan() { - _, _, skip := f.sp.Process([]byte(f.s.Text())) - if skip { + _, _, ok := f.sp.Process([]byte(f.s.Text())) + if ok { f.curr = logproto.Entry{ Timestamp: time.Now(), Line: f.s.Text(), @@ -153,13 +224,47 @@ func NewFileClient(r io.Reader) *FileClient { } func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { - return nil, nil + ctx := context.Background() + + ctx = user.InjectOrgID(ctx, "fake") + + params := logql.NewLiteralParams( + q, + t, t, + 0, + 0, + direction, + uint32(limit), + nil, + ) + + query := f.engine.Query(params) + + result, err := query.Exec(ctx) + if err != nil { + return nil, fmt.Errorf("failed to exec query: %w", err) + } + + value, err := marshal.NewResultValue(result.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal result data: %w", err) + } + + return &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: result.Statistics, + }, + }, nil } func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { - ctx := context.Background() + ctx = user.InjectOrgID(ctx, "fake") + params := logql.NewLiteralParams( queryStr, start, @@ -205,7 +310,7 @@ func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) return &loghttp.SeriesResponse{}, nil } func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) { - return nil, nil + panic("Not supported") } func (f *FileClient) GetOrgID() string { diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index b24df7951680..87273e7b1337 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -4,7 +4,6 @@ import ( "context" "errors" "flag" - "fmt" "math" "sort" "time" @@ -165,7 +164,6 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { defer util.LogErrorWithContext(ctx, "closing iterator", iter.Close) streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval()) - fmt.Println("streams", streams, "end") return streams, err default: return nil, errors.New("Unexpected type (%T): cannot evaluate") From 87684f8f2bb7aafceef7d9d1242e2e4f19f7ebd8 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 8 Jul 2021 11:41:10 +0200 Subject: [PATCH 04/20] Metrics query working. Issue was with default value of `step` (1 nanosecond). StepEvaluator try to go through every nanosecond to apply aggregate on sample data during the metric queries. 
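For reference, a minimal standalone sketch of the heuristic adopted here (it mirrors the `defaultQueryRangeStep` helper added to `cmd/logcli/main.go` in this patch; the example ranges are only illustrative): derive the step from the query range so that roughly 250 points get evaluated, with a floor of 1s.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// defaultQueryRangeStep mirrors the helper added in this patch: aim for
// ~250 evaluation points across the range, but never go below 1s.
func defaultQueryRangeStep(start, end time.Time) time.Duration {
	step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1))
	return time.Duration(step) * time.Second
}

func main() {
	end := time.Now()
	// 1h range: floor(3600/250) = 14, so the step becomes 14s instead of 1ns.
	fmt.Println(defaultQueryRangeStep(end.Add(-time.Hour), end))
	// 30s range: falls back to the 1s minimum.
	fmt.Println(defaultQueryRangeStep(end.Add(-30*time.Second), end))
}
```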
Signed-off-by: Kaviraj --- cmd/logcli/main.go | 14 ++++++++++++++ pkg/logcli/client/client.go | 37 +++++++++++++++++++------------------ 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 1abc9eed0d94..820260a5cebd 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "math" "net/url" "os" "runtime/pprof" @@ -141,6 +142,13 @@ func main() { if *stdin { queryClient = client.NewFileClient(os.Stdin) + if rangeQuery.Step.Seconds() == 0 { + // Set default value for `step` based on `start` and `end`. + // In non-stdin case, this is set on Loki server side. + // If this is not set, then `step` will have default value of 1 nanosecond and `STepEvaluator` will go through every nanosecond when applying aggregation during metric queries. + rangeQuery.Step = defaultQueryRangeStep(rangeQuery.Start, rangeQuery.End) + } + } switch cmd { @@ -312,6 +320,7 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query { cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step) cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval) cmd.Flag("batch", "Query batch size to use until 'limit' is reached").Default("1000").IntVar(&q.BatchSize) + } cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward) @@ -338,3 +347,8 @@ func mustParse(t string, defaultTime time.Time) time.Time { return ret } + +func defaultQueryRangeStep(start, end time.Time) time.Duration { + step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1)) + return time.Duration(step) * time.Second +} diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index d28540978379..4346d661fb4d 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -14,6 +14,7 @@ import ( "strings" "time" + "github.com/cespare/xxhash" "github.com/gorilla/websocket" json "github.com/json-iterator/go" "github.com/prometheus/common/config" @@ -87,7 +88,6 @@ func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa panic(err) } - fmt.Println("expr", expr.String()) sampleExtractor, err := expr.Extractor() if err != nil { panic(err) @@ -97,40 +97,41 @@ func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa labels.Label{Name: "foo", Value: "bar"}, }) - it := NewFileSampleIterator(q.r, "source:logcli", streamSample) + it := NewFileSampleIterator(q.r, params.Start, params.End, "source:logcli", streamSample) return it, nil } type FileSampleIterator struct { - s *bufio.Scanner - labels string - err error - sp logqllog.StreamSampleExtractor - curr logproto.Sample + s *bufio.Scanner + labels string + err error + sp logqllog.StreamSampleExtractor + curr logproto.Sample + start, end time.Time } -func NewFileSampleIterator(r io.Reader, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { +func NewFileSampleIterator(r io.Reader, start, end time.Time, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { s := bufio.NewScanner(r) s.Split(bufio.ScanLines) return &FileSampleIterator{ s: s, labels: labels, sp: sp, + start: start, + end: end, } } func (f *FileSampleIterator) Next() bool { for f.s.Scan() { - fmt.Println("input", f.s.Text()) - value, labels, ok := 
f.sp.Process([]byte(f.s.Text())) - fmt.Println("output", "value", value, "labels", labels, "skip", ok) - + value, _, ok := f.sp.Process([]byte(f.s.Text())) + ts := f.start.Add(2 * time.Minute) if ok { f.curr = logproto.Sample{ - Timestamp: time.Now().Unix(), + Timestamp: ts.UnixNano(), Value: value, - Hash: labels.Hash(), + Hash: xxhash.Sum64(f.s.Bytes()), } return true } @@ -139,7 +140,7 @@ func (f *FileSampleIterator) Next() bool { } func (f *FileSampleIterator) Sample() logproto.Sample { - return logproto.Sample{} + return f.curr } func (f *FileSampleIterator) Labels() string { @@ -175,11 +176,11 @@ func NewFileIterator(r io.Reader, labels string, sp logqllog.StreamPipeline) *Fi func (f *FileIterator) Next() bool { for f.s.Scan() { - _, _, ok := f.sp.Process([]byte(f.s.Text())) + line, _, ok := f.sp.Process([]byte(f.s.Text())) if ok { f.curr = logproto.Entry{ Timestamp: time.Now(), - Line: f.s.Text(), + Line: string(line), } return true } @@ -212,7 +213,7 @@ type FileClient struct { } func NewFileClient(r io.Reader) *FileClient { - eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r}, &limiter{}) + eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r}, &limiter{n: 1024}) return &FileClient{ r: r, orgID: "fake", From 2e67f0575c6fdda894dc45d2e035b6d4094bc331 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Sun, 11 Jul 2021 03:05:16 +0200 Subject: [PATCH 05/20] Basic tests for fileClient --- pkg/logcli/client/client.go | 275 --------------------------- pkg/logcli/client/file.go | 326 +++++++++++++++++++++++++++++++++ pkg/logcli/client/file_test.go | 75 ++++++++ 3 files changed, 401 insertions(+), 275 deletions(-) create mode 100644 pkg/logcli/client/file.go create mode 100644 pkg/logcli/client/file_test.go diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index 4346d661fb4d..d7d2be1c57c0 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -1,11 +1,8 @@ package client import ( - "bufio" - "context" "encoding/base64" "fmt" - "io" "io/ioutil" "log" "net/http" @@ -14,21 +11,14 @@ import ( "strings" "time" - "github.com/cespare/xxhash" "github.com/gorilla/websocket" json "github.com/json-iterator/go" "github.com/prometheus/common/config" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/common/user" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" - logqllog "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/build" - "github.com/grafana/loki/pkg/util/marshal" ) const ( @@ -53,271 +43,6 @@ type Client interface { GetOrgID() string } -type limiter struct { - n int -} - -func (l *limiter) MaxQuerySeries(userID string) int { - return l.n -} - -type querier struct { - r io.Reader -} - -func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { - expr, err := params.LogSelector() - if err != nil { - panic(err) - } - pipeline, err := expr.Pipeline() - if err != nil { - panic(err) - } - streampipe := pipeline.ForStream(labels.Labels{ - labels.Label{Name: "foo", Value: "bar"}, - }) - it := NewFileIterator(q.r, "source:logcli", streampipe) - - return it, nil -} - -func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { - expr, err := params.Expr() - if err != nil { - panic(err) - } - - sampleExtractor, err := expr.Extractor() - if err != nil { - panic(err) - } - 
- streamSample := sampleExtractor.ForStream(labels.Labels{ - labels.Label{Name: "foo", Value: "bar"}, - }) - - it := NewFileSampleIterator(q.r, params.Start, params.End, "source:logcli", streamSample) - - return it, nil -} - -type FileSampleIterator struct { - s *bufio.Scanner - labels string - err error - sp logqllog.StreamSampleExtractor - curr logproto.Sample - start, end time.Time -} - -func NewFileSampleIterator(r io.Reader, start, end time.Time, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { - s := bufio.NewScanner(r) - s.Split(bufio.ScanLines) - return &FileSampleIterator{ - s: s, - labels: labels, - sp: sp, - start: start, - end: end, - } -} - -func (f *FileSampleIterator) Next() bool { - for f.s.Scan() { - value, _, ok := f.sp.Process([]byte(f.s.Text())) - ts := f.start.Add(2 * time.Minute) - if ok { - f.curr = logproto.Sample{ - Timestamp: ts.UnixNano(), - Value: value, - Hash: xxhash.Sum64(f.s.Bytes()), - } - return true - } - } - return false -} - -func (f *FileSampleIterator) Sample() logproto.Sample { - return f.curr -} - -func (f *FileSampleIterator) Labels() string { - return f.labels -} - -func (f *FileSampleIterator) Error() error { - return f.err -} - -func (f *FileSampleIterator) Close() error { - // TODO: accept io.ReadCloser()? - return nil -} - -type FileIterator struct { - s *bufio.Scanner - labels string - err error - sp logqllog.StreamPipeline - curr logproto.Entry -} - -func NewFileIterator(r io.Reader, labels string, sp logqllog.StreamPipeline) *FileIterator { - s := bufio.NewScanner(r) - s.Split(bufio.ScanLines) - return &FileIterator{ - s: s, - labels: labels, - sp: sp, - } -} - -func (f *FileIterator) Next() bool { - for f.s.Scan() { - line, _, ok := f.sp.Process([]byte(f.s.Text())) - if ok { - f.curr = logproto.Entry{ - Timestamp: time.Now(), - Line: string(line), - } - return true - } - } - return false -} - -func (f *FileIterator) Entry() logproto.Entry { - return f.curr -} - -func (f *FileIterator) Labels() string { - return f.labels -} - -func (f *FileIterator) Error() error { - return f.err -} - -func (f *FileIterator) Close() error { - return nil -} - -type FileClient struct { - r io.Reader - labels []string - labelValues []string - orgID string - engine *logql.Engine -} - -func NewFileClient(r io.Reader) *FileClient { - eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r}, &limiter{n: 1024}) - return &FileClient{ - r: r, - orgID: "fake", - engine: eng, - labels: []string{"foo"}, - labelValues: []string{"bar"}, - } - -} - -func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { - ctx := context.Background() - - ctx = user.InjectOrgID(ctx, "fake") - - params := logql.NewLiteralParams( - q, - t, t, - 0, - 0, - direction, - uint32(limit), - nil, - ) - - query := f.engine.Query(params) - - result, err := query.Exec(ctx) - if err != nil { - return nil, fmt.Errorf("failed to exec query: %w", err) - } - - value, err := marshal.NewResultValue(result.Data) - if err != nil { - return nil, fmt.Errorf("failed to marshal result data: %w", err) - } - - return &loghttp.QueryResponse{ - Status: "success", - Data: loghttp.QueryResponseData{ - ResultType: value.Type(), - Result: value, - Statistics: result.Statistics, - }, - }, nil -} - -func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { - ctx := context.Background() - - ctx = 
user.InjectOrgID(ctx, "fake") - - params := logql.NewLiteralParams( - queryStr, - start, - end, - step, - interval, - direction, - uint32(limit), - nil, - ) - - query := f.engine.Query(params) - - result, err := query.Exec(ctx) - if err != nil { - return nil, err - } - - value, err := marshal.NewResultValue(result.Data) - if err != nil { - return nil, err - } - - return &loghttp.QueryResponse{ - Status: "success", - Data: loghttp.QueryResponseData{ - ResultType: value.Type(), - Result: value, - Statistics: result.Statistics, - }, - }, nil -} - -func (f *FileClient) ListLabelNames(quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { - return &loghttp.LabelResponse{}, nil -} - -func (f *FileClient) ListLabelValues(name string, quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { - return &loghttp.LabelResponse{}, nil -} - -func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) { - return &loghttp.SeriesResponse{}, nil -} -func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) { - panic("Not supported") -} - -func (f *FileClient) GetOrgID() string { - return f.orgID -} - // Tripperware can wrap a roundtripper. type Tripperware func(http.RoundTripper) http.RoundTripper diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go new file mode 100644 index 000000000000..97ee558c2111 --- /dev/null +++ b/pkg/logcli/client/file.go @@ -0,0 +1,326 @@ +package client + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "sort" + "time" + + "github.com/cespare/xxhash" + "github.com/gorilla/websocket" + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + logqllog "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/marshal" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/user" +) + +const ( + defaultLabelKey = "source" + defaultLabelValue = "logcli" + defaultOrgID = "logcli" + defaultMetricSeriesLimit = 1024 +) + +var ( + ErrNotSupported = errors.New("not supported") +) + +// FileClient is a type of LogCLI client that do LogQL on log lines from +// the given file directly, instead sending it to the Loki servers. 
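+// It implements the same Client interface as the HTTP client, so the
+// existing query commands can use it transparently when `--stdin` is set.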
+type FileClient struct { + r io.ReadCloser + labels []string + labelValues []string + orgID string + engine *logql.Engine +} + +// NewFileClient returns the new instance of FileClient for the given `io.ReadCloser` +func NewFileClient(r io.ReadCloser) *FileClient { + eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r}, &limiter{n: defaultMetricSeriesLimit}) + return &FileClient{ + r: r, + orgID: defaultOrgID, + engine: eng, + labels: []string{defaultLabelKey}, + labelValues: []string{defaultLabelValue}, + } + +} + +func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { + ctx := context.Background() + + ctx = user.InjectOrgID(ctx, f.orgID) + + params := logql.NewLiteralParams( + q, + t, t, + 0, + 0, + direction, + uint32(limit), + nil, + ) + + query := f.engine.Query(params) + + result, err := query.Exec(ctx) + if err != nil { + return nil, fmt.Errorf("failed to exec query: %w", err) + } + + value, err := marshal.NewResultValue(result.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal result data: %w", err) + } + + return &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: result.Statistics, + }, + }, nil +} + +func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { + ctx := context.Background() + + ctx = user.InjectOrgID(ctx, f.orgID) + + params := logql.NewLiteralParams( + queryStr, + start, + end, + step, + interval, + direction, + uint32(limit), + nil, + ) + + query := f.engine.Query(params) + + result, err := query.Exec(ctx) + if err != nil { + return nil, err + } + + value, err := marshal.NewResultValue(result.Data) + if err != nil { + return nil, err + } + + return &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: result.Statistics, + }, + }, nil +} + +func (f *FileClient) ListLabelNames(quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { + return &loghttp.LabelResponse{ + Status: loghttp.QueryStatusSuccess, + Data: f.labels, + }, nil +} + +func (f *FileClient) ListLabelValues(name string, quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { + i := sort.SearchStrings(f.labels, name) + if i < 0 { + return &loghttp.LabelResponse{}, nil + } + + return &loghttp.LabelResponse{ + Status: loghttp.QueryStatusSuccess, + Data: []string{f.labelValues[i]}, + }, nil +} + +func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) { + m := len(f.labels) + if m > len(f.labelValues) { + m = len(f.labelValues) + } + + lbs := make(loghttp.LabelSet) + for i := 0; i < m; i++ { + lbs[f.labels[i]] = f.labelValues[i] + } + + return &loghttp.SeriesResponse{ + Status: loghttp.QueryStatusSuccess, + Data: []loghttp.LabelSet{lbs}, + }, nil +} + +func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) { + return nil, fmt.Errorf("LiveTailQuery: %w", ErrNotSupported) +} + +func (f *FileClient) GetOrgID() string { + return f.orgID +} + +type limiter struct { + n int +} + +func (l *limiter) MaxQuerySeries(userID string) int { + return l.n +} + +type querier struct { + r io.Reader +} + +func (q *querier) SelectLogs(ctx context.Context, 
params logql.SelectLogParams) (iter.EntryIterator, error) { + expr, err := params.LogSelector() + if err != nil { + panic(err) + } + pipeline, err := expr.Pipeline() + if err != nil { + panic(err) + } + streampipe := pipeline.ForStream(labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + }) + it := NewFileIterator(q.r, "source:logcli", streampipe) + + return it, nil +} + +func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { + expr, err := params.Expr() + if err != nil { + panic(err) + } + + sampleExtractor, err := expr.Extractor() + if err != nil { + panic(err) + } + + streamSample := sampleExtractor.ForStream(labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + }) + + it := NewFileSampleIterator(q.r, params.Start, params.End, "source:logcli", streamSample) + + return it, nil +} + +type FileSampleIterator struct { + s *bufio.Scanner + labels string + err error + sp logqllog.StreamSampleExtractor + curr logproto.Sample + start, end time.Time +} + +func NewFileSampleIterator(r io.Reader, start, end time.Time, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { + s := bufio.NewScanner(r) + s.Split(bufio.ScanLines) + return &FileSampleIterator{ + s: s, + labels: labels, + sp: sp, + start: start, + end: end, + } +} + +func (f *FileSampleIterator) Next() bool { + for f.s.Scan() { + value, _, ok := f.sp.Process([]byte(f.s.Text())) + ts := f.start.Add(2 * time.Minute) + if ok { + f.curr = logproto.Sample{ + Timestamp: ts.UnixNano(), + Value: value, + Hash: xxhash.Sum64(f.s.Bytes()), + } + return true + } + } + return false +} + +func (f *FileSampleIterator) Sample() logproto.Sample { + return f.curr +} + +func (f *FileSampleIterator) Labels() string { + return f.labels +} + +func (f *FileSampleIterator) Error() error { + return f.err +} + +func (f *FileSampleIterator) Close() error { + // TODO: accept io.ReadCloser()? 
+ return nil +} + +type FileIterator struct { + s *bufio.Scanner + labels string + err error + sp logqllog.StreamPipeline + curr logproto.Entry +} + +func NewFileIterator(r io.Reader, labels string, sp logqllog.StreamPipeline) *FileIterator { + s := bufio.NewScanner(r) + s.Split(bufio.ScanLines) + return &FileIterator{ + s: s, + labels: labels, + sp: sp, + } +} + +func (f *FileIterator) Next() bool { + for f.s.Scan() { + line, _, ok := f.sp.Process([]byte(f.s.Text())) + if ok { + f.curr = logproto.Entry{ + Timestamp: time.Now(), + Line: string(line), + } + return true + } + } + return false +} + +func (f *FileIterator) Entry() logproto.Entry { + return f.curr +} + +func (f *FileIterator) Labels() string { + return f.labels +} + +func (f *FileIterator) Error() error { + return f.err +} + +func (f *FileIterator) Close() error { + return nil +} diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go new file mode 100644 index 000000000000..67739879142d --- /dev/null +++ b/pkg/logcli/client/file_test.go @@ -0,0 +1,75 @@ +package client + +import ( + "bytes" + "errors" + "io" + "testing" + "time" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFileClient_QueryRange(t *testing.T) { + +} + +func TestFileClient_Query(t *testing.T) { + +} + +func TestFileClient_ListLabelNames(t *testing.T) { + c := newEmptyClient(t) + values, err := c.ListLabelNames(true, time.Now(), time.Now()) + require.NoError(t, err) + assert.Equal(t, &loghttp.LabelResponse{ + Data: []string{defaultLabelKey}, + Status: loghttp.QueryStatusSuccess, + }, values) +} + +func TestFileClient_ListLabelValues(t *testing.T) { + c := newEmptyClient(t) + values, err := c.ListLabelValues(defaultLabelKey, true, time.Now(), time.Now()) + require.NoError(t, err) + assert.Equal(t, &loghttp.LabelResponse{ + Data: []string{defaultLabelValue}, + Status: loghttp.QueryStatusSuccess, + }, values) + +} + +func TestFileClient_Series(t *testing.T) { + c := newEmptyClient(t) + got, err := c.Series(nil, time.Now(), time.Now(), true) + require.NoError(t, err) + + exp := &loghttp.SeriesResponse{ + Data: []loghttp.LabelSet{ + {defaultLabelKey: defaultLabelValue}, + }, + Status: loghttp.QueryStatusSuccess, + } + + assert.Equal(t, exp, got) +} + +func TestFileClient_LiveTail(t *testing.T) { + c := newEmptyClient(t) + x, err := c.LiveTailQueryConn("", time.Second, 0, time.Now(), true) + require.Error(t, err) + require.Nil(t, x) + assert.True(t, errors.Is(err, ErrNotSupported)) +} + +func TestFileClient_GetOrgID(t *testing.T) { + c := newEmptyClient(t) + assert.Equal(t, defaultOrgID, c.GetOrgID()) +} + +func newEmptyClient(t *testing.T) *FileClient { + t.Helper() + return NewFileClient(io.NopCloser(&bytes.Buffer{})) +} From f05ca66ddc2fafc97467b108f21ee4a2da1f99f7 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 12 Jul 2021 00:18:05 +0200 Subject: [PATCH 06/20] Tests for logqueries direction --- .../cmd/promtail/promtail-local-config.yaml | 2 +- pkg/logcli/client/file.go | 40 ++++++-- pkg/logcli/client/file_test.go | 91 ++++++++++++++++++- 3 files changed, 125 insertions(+), 8 deletions(-) diff --git a/clients/cmd/promtail/promtail-local-config.yaml b/clients/cmd/promtail/promtail-local-config.yaml index 401195b956d2..0f4699642e19 100644 --- a/clients/cmd/promtail/promtail-local-config.yaml +++ b/clients/cmd/promtail/promtail-local-config.yaml @@ -15,4 +15,4 @@ scrape_configs: - localhost labels: job: varlogs - __path__: 
/Users/kaviraj/src/loki/testlog.txt + __path__: /home/kaviraj/src/loki/testlogs.txt diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 97ee558c2111..f488f1645eaf 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" "io" + "io/ioutil" "sort" + "strings" "time" "github.com/cespare/xxhash" @@ -26,6 +28,7 @@ const ( defaultLabelValue = "logcli" defaultOrgID = "logcli" defaultMetricSeriesLimit = 1024 + defaultMaxFileSize = 20 * (1 << 20) // 20MB ) var ( @@ -198,7 +201,11 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) streampipe := pipeline.ForStream(labels.Labels{ labels.Label{Name: "foo", Value: "bar"}, }) - it := NewFileIterator(q.r, "source:logcli", streampipe) + + it, err := NewFileIterator(q.r, params, streampipe) + if err != nil { + return nil, err + } return it, nil } @@ -273,26 +280,47 @@ func (f *FileSampleIterator) Error() error { } func (f *FileSampleIterator) Close() error { - // TODO: accept io.ReadCloser()? return nil } type FileIterator struct { s *bufio.Scanner - labels string err error sp logqllog.StreamPipeline curr logproto.Entry + params logql.SelectLogParams + labels string } -func NewFileIterator(r io.Reader, labels string, sp logqllog.StreamPipeline) *FileIterator { +func NewFileIterator(r io.Reader, params logql.SelectLogParams, sp logqllog.StreamPipeline) (*FileIterator, error) { s := bufio.NewScanner(r) + if params.Direction == logproto.FORWARD { + lr := io.LimitReader(r, defaultMaxFileSize) + b, err := ioutil.ReadAll(lr) + if err != nil { + return nil, err + } + lines := strings.FieldsFunc(string(b), func(r rune) bool { + if r == '\n' { + return true + } + return false + }) + // reverse + sort.Slice(lines, func(i, j int) bool { + return i > j + }) + + s = bufio.NewScanner(strings.NewReader(strings.Join(lines, "\n"))) + } + s.Split(bufio.ScanLines) + return &FileIterator{ s: s, - labels: labels, + params: params, sp: sp, - } + }, nil } func (f *FileIterator) Next() bool { diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go index 67739879142d..0fb34088e458 100644 --- a/pkg/logcli/client/file_test.go +++ b/pkg/logcli/client/file_test.go @@ -4,16 +4,87 @@ import ( "bytes" "errors" "io" + "sort" + "strings" "testing" "time" "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestFileClient_QueryRange(t *testing.T) { +func TestFileClient_QueryRangeLogQueries(t *testing.T) { + input := []string{ + `level=info event="loki started" caller=main.go ts=1625995076`, + `level=info event="runtime loader started" caller=main.go ts=1625995077`, + `level=error event="unable to read rules directory" file="/tmp/rules" caller=rules.go ts=1625995090`, + `level=error event="failed to apply wal" error="/tmp/wal/ corrupted" caller=wal.go ts=1625996090`, + `level=info event="loki ready" caller=main.go ts=1625996095`, + } + + reversed := make([]string, len(input)) + copy(reversed, input) + sort.Slice(reversed, func(i, j int) bool { + return i > j + }) + + now := time.Now() + + cases := []struct { + name string + limit int + start, end time.Time + direction logproto.Direction + step, interval time.Duration + expectedStatus loghttp.QueryStatus + expected []string + }{ + { + name: "return-all-logs-backward", + limit: 10, // more than input + start: now.Add(-1 * time.Hour), + end: now, + direction: logproto.BACKWARD, + step: 0, // let client decide 
based on start and end + interval: 0, + expectedStatus: loghttp.QueryStatusSuccess, + expected: input, + }, + { + name: "return-all-logs-forward", + limit: 10, // more than input + start: now.Add(-1 * time.Hour), + end: now, + direction: logproto.FORWARD, + step: 0, // let the client decide based on start and end + interval: 0, + expectedStatus: loghttp.QueryStatusSuccess, + expected: reversed, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + client := NewFileClient(io.NopCloser(strings.NewReader(strings.Join(input, "\n")))) + resp, err := client.QueryRange( + `{foo="bar"}`, // label matcher doesn't matter. + c.limit, + c.start, + c.end, + c.direction, + c.step, + c.interval, + true, + ) + + require.NoError(t, err) + require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) + assertStreams(t, resp.Data.Result, c.expected) + }) + } } func TestFileClient_Query(t *testing.T) { @@ -73,3 +144,21 @@ func newEmptyClient(t *testing.T) *FileClient { t.Helper() return NewFileClient(io.NopCloser(&bytes.Buffer{})) } + +func assertStreams(t *testing.T, result loghttp.ResultValue, logLines []string) { + t.Helper() + + streams, ok := result.(loghttp.Streams) + require.True(t, ok, "response type should be `loghttp.Streams`") + + require.Len(t, streams, 1, "there should be only one stream for FileClient") + + got := streams[0] + sort.Slice(got.Entries, func(i, j int) bool { + return got.Entries[i].Timestamp.UnixNano() < got.Entries[j].Timestamp.UnixNano() + }) + require.Equal(t, len(got.Entries), len(logLines)) + for i, entry := range got.Entries { + assert.Equal(t, entry.Line, logLines[i]) + } +} From dde5a75f853b7f4805d9ae1978ee529af72ac391 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 12 Jul 2021 11:57:52 +0200 Subject: [PATCH 07/20] Use HeapIterator for Entries. 
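A rough sketch of the idea behind this change (the stream contents here are made up for illustration; the iter and logproto calls are the ones used in the diff below): materialize the matching lines into a single `logproto.Stream` and let the heap iterator serve the entries, instead of hand-rolling a scanner-based iterator.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	// One stream holding the already-filtered lines, in query order.
	stream := logproto.Stream{
		Labels: `{source="logcli"}`,
		Entries: []logproto.Entry{
			{Timestamp: time.Unix(1, 0), Line: "first line"},
			{Timestamp: time.Unix(2, 0), Line: "second line"},
		},
	}

	// The heap iterator merges one or more per-stream iterators according
	// to the requested direction; here there is only a single stream.
	it := iter.NewHeapIterator(
		context.Background(),
		[]iter.EntryIterator{iter.NewStreamIterator(stream)},
		logproto.FORWARD,
	)
	defer it.Close()

	for it.Next() {
		fmt.Println(it.Entry().Timestamp.Unix(), it.Entry().Line)
	}
}
```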
Signed-off-by: Kaviraj --- pkg/logcli/client/file.go | 145 ++++++++++++++++++++------------------ 1 file changed, 77 insertions(+), 68 deletions(-) diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index f488f1645eaf..2fc87a2a9610 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -47,7 +47,14 @@ type FileClient struct { // NewFileClient returns the new instance of FileClient for the given `io.ReadCloser` func NewFileClient(r io.ReadCloser) *FileClient { - eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r}, &limiter{n: defaultMetricSeriesLimit}) + lbs := []labels.Label{ + { + Name: defaultLabelKey, + Value: defaultLabelValue, + }, + } + + eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r, labels: lbs}, &limiter{n: defaultMetricSeriesLimit}) return &FileClient{ r: r, orgID: defaultOrgID, @@ -186,28 +193,20 @@ func (l *limiter) MaxQuerySeries(userID string) int { } type querier struct { - r io.Reader + r io.Reader + labels labels.Labels } func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { expr, err := params.LogSelector() if err != nil { - panic(err) + return nil, fmt.Errorf("failed to extract selector for logs: %w", err) } pipeline, err := expr.Pipeline() if err != nil { - panic(err) + return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err) } - streampipe := pipeline.ForStream(labels.Labels{ - labels.Label{Name: "foo", Value: "bar"}, - }) - - it, err := NewFileIterator(q.r, params, streampipe) - if err != nil { - return nil, err - } - - return it, nil + return newFileIterator(ctx, q.r, q.labels, params, pipeline.ForStream(q.labels)) } func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { @@ -283,72 +282,82 @@ func (f *FileSampleIterator) Close() error { return nil } -type FileIterator struct { - s *bufio.Scanner - err error - sp logqllog.StreamPipeline - curr logproto.Entry - params logql.SelectLogParams - labels string -} +// this is the generated timestamp for each input log line based on start and end +// of the query. +// start and end are unix timestamps in secs. 
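+// For example, three input lines with start=0 and end=30 are assigned the
+// timestamps 10, 20 and 30 (evenly spaced across the range).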
+func assignTimestamps(lines []string, start, end int64) map[string]int64 { + res := make(map[string]int64) -func NewFileIterator(r io.Reader, params logql.SelectLogParams, sp logqllog.StreamPipeline) (*FileIterator, error) { - s := bufio.NewScanner(r) - if params.Direction == logproto.FORWARD { - lr := io.LimitReader(r, defaultMaxFileSize) - b, err := ioutil.ReadAll(lr) - if err != nil { - return nil, err - } - lines := strings.FieldsFunc(string(b), func(r rune) bool { - if r == '\n' { - return true - } - return false - }) - // reverse - sort.Slice(lines, func(i, j int) bool { - return i > j - }) + n := int64(len(lines)) - s = bufio.NewScanner(strings.NewReader(strings.Join(lines, "\n"))) + if end < start { + panic("`start` cannot be after `end`") } - s.Split(bufio.ScanLines) + step := (end - start) / n - return &FileIterator{ - s: s, - params: params, - sp: sp, - }, nil + for i, line := range lines { + fmt.Println("line", line, "ts", (step*int64(i+1) + start)) + res[line] = (step * int64(i+1)) + start + } + + return res } -func (f *FileIterator) Next() bool { - for f.s.Scan() { - line, _, ok := f.sp.Process([]byte(f.s.Text())) - if ok { - f.curr = logproto.Entry{ - Timestamp: time.Now(), - Line: string(line), - } +func newFileIterator( + ctx context.Context, + r io.Reader, + labels labels.Labels, + params logql.SelectLogParams, + pipeline logqllog.StreamPipeline, +) (iter.EntryIterator, error) { + + lr := io.LimitReader(r, defaultMaxFileSize) + b, err := ioutil.ReadAll(lr) + if err != nil { + return nil, err + } + lines := strings.FieldsFunc(string(b), func(r rune) bool { + if r == '\n' { return true } + return false + }) + + if len(lines) == 0 { + return iter.NoopIterator, nil } - return false -} -func (f *FileIterator) Entry() logproto.Entry { - return f.curr -} + stream := logproto.Stream{ + Labels: labels.String(), + } -func (f *FileIterator) Labels() string { - return f.labels -} + fmt.Println("directions", params.Direction) -func (f *FileIterator) Error() error { - return f.err -} + // reverse all the input lines if direction == FORWARD + if params.Direction == logproto.FORWARD { + sort.Slice(lines, func(i, j int) bool { + return i > j + }) + } -func (f *FileIterator) Close() error { - return nil + // timestamps := assignTimestamps(lines, params.Start.Unix(), params.End.Unix()) + + for _, line := range lines { + parsedLine, _, ok := pipeline.ProcessString(line) + if !ok { + continue + } + stream.Entries = append(stream.Entries, logproto.Entry{ + Timestamp: time.Now(), + // Timestamp: time.Unix(timestamps[line], 0), + Line: parsedLine, + }) + } + + return iter.NewHeapIterator( + ctx, + []iter.EntryIterator{iter.NewStreamIterator(stream)}, + params.Direction, + ), nil } From 3e8c36edb7fc9e2851c8b3674fb22b3f381f20b5 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 1 Nov 2021 16:39:20 +0100 Subject: [PATCH 08/20] Remove some debug statements Signed-off-by: Kaviraj --- pkg/querier/http.go | 8 -------- pkg/querier/querier.go | 2 -- 2 files changed, 10 deletions(-) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index a6a35a80f923..6d9647f37c40 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -34,7 +34,6 @@ type QueryResponse struct { // RangeQueryHandler is a http.HandlerFunc for range queries. 
func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { - fmt.Println("Am i in range query?") // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() @@ -60,19 +59,12 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { request.Limit, request.Shards, ) - // fmt.Printf("requrest: %+v\n", request) query := q.engine.Query(params) result, err := query.Exec(ctx) if err != nil { serverutil.WriteError(err, w) return } - // buf := bytes.Buffer{} - // if err := marshal.WriteQueryResponseJSON(result, &buf); err != nil { - // panic(err) - // } - - // fmt.Printf("response: %+v\n", buf.String()) if err := marshal.WriteQueryResponseJSON(result, w); err != nil { serverutil.WriteError(err, w) return diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 82ab3829511f..20578021b9fd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -3,7 +3,6 @@ package querier import ( "context" "flag" - "fmt" "net/http" "time" @@ -90,7 +89,6 @@ func (q *Querier) SetQueryable(queryable logql.Querier) { // Select Implements logql.Querier which select logs via matchers and regex filters. func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { - fmt.Println("I'm here", "params", params) var err error params.Start, params.End, err = q.validateQueryRequest(ctx, params) if err != nil { From 8bc703a5d51f605c9eca0bc71d2fe697b104b493 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Nov 2021 11:26:30 +0100 Subject: [PATCH 09/20] Remove support for metric queries. Stick with only log queries Signed-off-by: Kaviraj --- .../cmd/promtail/promtail-local-config.yaml | 2 +- pkg/logcli/client/file.go | 137 +++++++++--------- pkg/querier/http.go | 3 - 3 files changed, 69 insertions(+), 73 deletions(-) diff --git a/clients/cmd/promtail/promtail-local-config.yaml b/clients/cmd/promtail/promtail-local-config.yaml index 0f4699642e19..3b9256537ec8 100644 --- a/clients/cmd/promtail/promtail-local-config.yaml +++ b/clients/cmd/promtail/promtail-local-config.yaml @@ -15,4 +15,4 @@ scrape_configs: - localhost labels: job: varlogs - __path__: /home/kaviraj/src/loki/testlogs.txt + __path__: /var/log/*log diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 2fc87a2a9610..0f6308234bb6 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -1,7 +1,6 @@ package client import ( - "bufio" "context" "errors" "fmt" @@ -11,7 +10,6 @@ import ( "strings" "time" - "github.com/cespare/xxhash" "github.com/gorilla/websocket" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" @@ -36,7 +34,7 @@ var ( ) // FileClient is a type of LogCLI client that do LogQL on log lines from -// the given file directly, instead sending it to the Loki servers. +// the given file directly, instead get log lines from Loki servers. 
type FileClient struct { r io.ReadCloser labels []string @@ -210,77 +208,78 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) } func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { - expr, err := params.Expr() - if err != nil { - panic(err) - } - - sampleExtractor, err := expr.Extractor() - if err != nil { - panic(err) - } - - streamSample := sampleExtractor.ForStream(labels.Labels{ - labels.Label{Name: "foo", Value: "bar"}, - }) - - it := NewFileSampleIterator(q.r, params.Start, params.End, "source:logcli", streamSample) - - return it, nil -} - -type FileSampleIterator struct { - s *bufio.Scanner - labels string - err error - sp logqllog.StreamSampleExtractor - curr logproto.Sample - start, end time.Time -} + return nil, fmt.Errorf("Metrics Query: %w", ErrNotSupported) + // expr, err := params.Expr() + // if err != nil { + // panic(err) + // } -func NewFileSampleIterator(r io.Reader, start, end time.Time, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { - s := bufio.NewScanner(r) - s.Split(bufio.ScanLines) - return &FileSampleIterator{ - s: s, - labels: labels, - sp: sp, - start: start, - end: end, - } -} + // sampleExtractor, err := expr.Extractor() + // if err != nil { + // panic(err) + // } -func (f *FileSampleIterator) Next() bool { - for f.s.Scan() { - value, _, ok := f.sp.Process([]byte(f.s.Text())) - ts := f.start.Add(2 * time.Minute) - if ok { - f.curr = logproto.Sample{ - Timestamp: ts.UnixNano(), - Value: value, - Hash: xxhash.Sum64(f.s.Bytes()), - } - return true - } - } - return false -} + // streamSample := sampleExtractor.ForStream(labels.Labels{ + // labels.Label{Name: "foo", Value: "bar"}, + // }) -func (f *FileSampleIterator) Sample() logproto.Sample { - return f.curr -} + // it := NewFileSampleIterator(q.r, params.Start, params.End, "source:logcli", streamSample) -func (f *FileSampleIterator) Labels() string { - return f.labels + // return it, nil } -func (f *FileSampleIterator) Error() error { - return f.err -} - -func (f *FileSampleIterator) Close() error { - return nil -} +// type FileSampleIterator struct { +// s *bufio.Scanner +// labels string +// err error +// sp logqllog.StreamSampleExtractor +// curr logproto.Sample +// start, end time.Time +// } + +// func NewFileSampleIterator(r io.Reader, start, end time.Time, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { +// s := bufio.NewScanner(r) +// s.Split(bufio.ScanLines) +// return &FileSampleIterator{ +// s: s, +// labels: labels, +// sp: sp, +// start: start, +// end: end, +// } +// } + +// func (f *FileSampleIterator) Next() bool { +// for f.s.Scan() { +// value, _, ok := f.sp.Process([]byte(f.s.Text())) +// ts := f.start.Add(2 * time.Minute) +// if ok { +// f.curr = logproto.Sample{ +// Timestamp: ts.UnixNano(), +// Value: value, +// Hash: xxhash.Sum64(f.s.Bytes()), +// } +// return true +// } +// } +// return false +// } + +// func (f *FileSampleIterator) Sample() logproto.Sample { +// return f.curr +// } + +// func (f *FileSampleIterator) Labels() string { +// return f.labels +// } + +// func (f *FileSampleIterator) Error() error { +// return f.err +// } + +// func (f *FileSampleIterator) Close() error { +// return nil +// } // this is the generated timestamp for each input log line based on start and end // of the query. 
@@ -332,7 +331,7 @@ func newFileIterator( Labels: labels.String(), } - fmt.Println("directions", params.Direction) + // fmt.Println("directions", params.Direction) // reverse all the input lines if direction == FORWARD if params.Direction == logproto.FORWARD { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 6d9647f37c40..8910aad5b109 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -2,7 +2,6 @@ package querier import ( "context" - "fmt" "net/http" "time" @@ -73,8 +72,6 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { // InstantQueryHandler is a http.HandlerFunc for instant queries. func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) { - fmt.Println("Am i in instant query?") - // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() From a4b454fbbf22615b212943dc3faf410b55c4f048 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Nov 2021 15:56:37 +0100 Subject: [PATCH 10/20] Small rough usage doc --- cmd/logcli/main.go | 19 ++++++++++++ docs/sources/getting-started/logcli.md | 40 ++++++++++++++++++++------ 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 820260a5cebd..af8069c7704f 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -1,11 +1,13 @@ package main import ( + "fmt" "log" "math" "net/url" "os" "runtime/pprof" + "strings" "time" "github.com/prometheus/common/config" @@ -149,6 +151,22 @@ func main() { rangeQuery.Step = defaultQueryRangeStep(rangeQuery.Start, rangeQuery.End) } + // When `--stdin` flag is set, stream selector is optional in the query. + // But logQL package through parser error if stream selector is not provided. + // So we inject "dummy" stream selector if not provided by user already. + // Which brings down to two way of using LogQL query. + // 1. Query with stream selector(e.g: `{foo="bar"}|="error"`) + // 2. Query without stream selector (e.g: `|="error"`) + + qs := rangeQuery.QueryString + if strings.HasPrefix(strings.TrimSpace(qs), "|") { + // inject the dummy stream selector + qs = `{source="logcli"}` + qs + rangeQuery.QueryString = qs + } + + // `--limit` doesn't make sense when using `--stdin` flag. + rangeQuery.Limit = math.MaxInt // TODO(kavi): is it a good idea? } switch cmd { @@ -172,6 +190,7 @@ func main() { if *tail || *follow { rangeQuery.TailQuery(time.Duration(*delayFor)*time.Second, queryClient, out) } else { + fmt.Println("Debug: Query string", rangeQuery.QueryString) rangeQuery.DoQuery(queryClient, out, *statistics) } case instantQueryCmd.FullCommand(): diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md index 836f2caf0226..74293e0f14f7 100644 --- a/docs/sources/getting-started/logcli.md +++ b/docs/sources/getting-started/logcli.md @@ -74,13 +74,13 @@ When not set, `--limit` defaults to 30. The limit protects the user from overwhelming the system for cases in which the specified query would have returned a large quantity of log lines. -The limit also protects the user from unexpectedly large responses. +The limit also protects the user from unexpectedly large responses. The quantity of log line results that arrive in each batch is set by the `--batch` option in a `logcli query` command. When not set, `--batch` defaults to 1000. 
-Setting a `--limit` value larger than the `--batch` value causes the +Setting a `--limit` value larger than the `--batch` value causes the requests from LogCLI to Loki to be batched. Loki has a server-side limit that defaults to 5000 for the maximum quantity of lines returned for a single query. @@ -120,7 +120,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -275,7 +275,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -308,9 +308,9 @@ Flags: --batch=1000 Query batch size to use until 'limit' is reached --forward Scan forwards through logs. --no-labels Do not print any labels - --exclude-label=EXCLUDE-LABEL ... + --exclude-label=EXCLUDE-LABEL ... Exclude labels given the provided key during output. - --include-label=INCLUDE-LABEL ... + --include-label=INCLUDE-LABEL ... Include labels given the provided key during output. --labels-length=0 Set a fixed padding to labels --store-config="" Execute the current query using a configured storage @@ -346,7 +346,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -402,7 +402,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --addr="http://localhost:3100" + --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. --username="" Username for HTTP basic auth. Can also be set using @@ -430,3 +430,27 @@ Flags: Args: eg '{foo="bar",baz=~".*blip"}' ``` + +### LogCLI `--stdin` usage + +You can consume log lines from your `stdin` instead of Loki servers. + +Say you have log files in your local, and just want to do run some LogQL queries for that, `--stdin` flag can help. + +***NOTE: Currently it doesn't support any type of metric queries** + +You may have to use `stdin` flag for several reasons +1. Quick way to check and validate a LogQL expressions. +2. Learn basics of LogQL with just Log files and `LogCLI`tool ( without needing set up Loki servers, Grafana etc.) +3. Easy discussion on public forums. Like Q&A, Share the LogQL expressions. + +**NOTES on Usage** +1. `--limits` flag doesn't have any meaning when using `--stdin` (use pager like `less` for that) +1. Be aware there are no **labels** when using `--stdin` + - So stream selector in the query is optional e.g just `|="timeout"|logfmt|level="error"` is same as `{foo="bar"}|="timeout|logfmt|level="error"` + +**Examples** +1. Line filter - `cat mylog.log | logcli query '|="too many open connections"'` +2. Label matcher - `echo 'msg="timeout happened" level="warning"' | logcli query '|logfmt|level="warning"'` +3. 
Different parsers (logfmt, json, pattern, regexp) - `cat mylog.log | logcli query '|pattern - - <_> " <_>" <_> "" <_>'` +4. Line formatters - `cat mylog.log | logcli query '|logfmt|line_format "{{.query}} {{.duration}}"'` From 17fd2ef9deefbd9eb0a996ba0fe5d7210150ff29 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Nov 2021 16:11:46 +0100 Subject: [PATCH 11/20] Remove filesampleiterator --- docs/sources/getting-started/logcli.md | 2 +- pkg/logcli/client/file.go | 72 +------------------------- 2 files changed, 2 insertions(+), 72 deletions(-) diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md index 74293e0f14f7..fbecce9a5c89 100644 --- a/docs/sources/getting-started/logcli.md +++ b/docs/sources/getting-started/logcli.md @@ -437,7 +437,7 @@ You can consume log lines from your `stdin` instead of Loki servers. Say you have log files in your local, and just want to do run some LogQL queries for that, `--stdin` flag can help. -***NOTE: Currently it doesn't support any type of metric queries** +**NOTE: Currently it doesn't support any type of metric queries** You may have to use `stdin` flag for several reasons 1. Quick way to check and validate a LogQL expressions. diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 0f6308234bb6..b250de59b4d5 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -209,79 +209,9 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { return nil, fmt.Errorf("Metrics Query: %w", ErrNotSupported) - // expr, err := params.Expr() - // if err != nil { - // panic(err) - // } - - // sampleExtractor, err := expr.Extractor() - // if err != nil { - // panic(err) - // } - - // streamSample := sampleExtractor.ForStream(labels.Labels{ - // labels.Label{Name: "foo", Value: "bar"}, - // }) - - // it := NewFileSampleIterator(q.r, params.Start, params.End, "source:logcli", streamSample) - - // return it, nil } -// type FileSampleIterator struct { -// s *bufio.Scanner -// labels string -// err error -// sp logqllog.StreamSampleExtractor -// curr logproto.Sample -// start, end time.Time -// } - -// func NewFileSampleIterator(r io.Reader, start, end time.Time, labels string, sp logqllog.StreamSampleExtractor) *FileSampleIterator { -// s := bufio.NewScanner(r) -// s.Split(bufio.ScanLines) -// return &FileSampleIterator{ -// s: s, -// labels: labels, -// sp: sp, -// start: start, -// end: end, -// } -// } - -// func (f *FileSampleIterator) Next() bool { -// for f.s.Scan() { -// value, _, ok := f.sp.Process([]byte(f.s.Text())) -// ts := f.start.Add(2 * time.Minute) -// if ok { -// f.curr = logproto.Sample{ -// Timestamp: ts.UnixNano(), -// Value: value, -// Hash: xxhash.Sum64(f.s.Bytes()), -// } -// return true -// } -// } -// return false -// } - -// func (f *FileSampleIterator) Sample() logproto.Sample { -// return f.curr -// } - -// func (f *FileSampleIterator) Labels() string { -// return f.labels -// } - -// func (f *FileSampleIterator) Error() error { -// return f.err -// } - -// func (f *FileSampleIterator) Close() error { -// return nil -// } - -// this is the generated timestamp for each input log line based on start and end +// assignTimestamps assigns the generated timestamp for each input log line based on start and end // of the query. // start and end are unix timestamps in secs. 
func assignTimestamps(lines []string, start, end int64) map[string]int64 { From c7e2fff3cfc87d921a56717bd02fa55066e82788 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Nov 2021 16:38:05 +0100 Subject: [PATCH 12/20] Fix some typos and tests --- cmd/logcli/main.go | 6 ++-- pkg/logcli/client/file.go | 7 +--- pkg/logcli/client/file_test.go | 60 +++++++++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index af8069c7704f..86783e256871 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "log" "math" "net/url" @@ -152,9 +151,9 @@ func main() { } // When `--stdin` flag is set, stream selector is optional in the query. - // But logQL package through parser error if stream selector is not provided. + // But logQL package throw parser error if stream selector is not provided. // So we inject "dummy" stream selector if not provided by user already. - // Which brings down to two way of using LogQL query. + // Which brings down to two ways of using LogQL query under `--stdin`. // 1. Query with stream selector(e.g: `{foo="bar"}|="error"`) // 2. Query without stream selector (e.g: `|="error"`) @@ -190,7 +189,6 @@ func main() { if *tail || *follow { rangeQuery.TailQuery(time.Duration(*delayFor)*time.Second, queryClient, out) } else { - fmt.Println("Debug: Query string", rangeQuery.QueryString) rangeQuery.DoQuery(queryClient, out, *statistics) } case instantQueryCmd.FullCommand(): diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index b250de59b4d5..6afae8559374 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -261,8 +261,6 @@ func newFileIterator( Labels: labels.String(), } - // fmt.Println("directions", params.Direction) - // reverse all the input lines if direction == FORWARD if params.Direction == logproto.FORWARD { sort.Slice(lines, func(i, j int) bool { @@ -270,8 +268,6 @@ func newFileIterator( }) } - // timestamps := assignTimestamps(lines, params.Start.Unix(), params.End.Unix()) - for _, line := range lines { parsedLine, _, ok := pipeline.ProcessString(line) if !ok { @@ -279,8 +275,7 @@ func newFileIterator( } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Now(), - // Timestamp: time.Unix(timestamps[line], 0), - Line: parsedLine, + Line: parsedLine, }) } diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go index 0fb34088e458..54fd5cc31b40 100644 --- a/pkg/logcli/client/file_test.go +++ b/pkg/logcli/client/file_test.go @@ -16,7 +16,6 @@ import ( ) func TestFileClient_QueryRangeLogQueries(t *testing.T) { - input := []string{ `level=info event="loki started" caller=main.go ts=1625995076`, `level=info event="runtime loader started" caller=main.go ts=1625995077`, @@ -82,13 +81,72 @@ func TestFileClient_QueryRangeLogQueries(t *testing.T) { require.NoError(t, err) require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) + assert.Equal(t, resp.Data.ResultType, loghttp.ResultTypeStream) assertStreams(t, resp.Data.Result, c.expected) }) } } func TestFileClient_Query(t *testing.T) { + input := []string{ + `level=info event="loki started" caller=main.go ts=1625995076`, + `level=info event="runtime loader started" caller=main.go ts=1625995077`, + `level=error event="unable to read rules directory" file="/tmp/rules" caller=rules.go ts=1625995090`, + `level=error event="failed to apply wal" error="/tmp/wal/ corrupted" caller=wal.go ts=1625996090`, + `level=info event="loki ready" 
caller=main.go ts=1625996095`, + } + + reversed := make([]string, len(input)) + copy(reversed, input) + sort.Slice(reversed, func(i, j int) bool { + return i > j + }) + now := time.Now() + + cases := []struct { + name string + limit int + ts time.Time + direction logproto.Direction + expectedStatus loghttp.QueryStatus + expected []string + }{ + { + name: "return-all-logs-backward", + limit: 10, // more than input + ts: now.Add(-1 * time.Hour), + direction: logproto.BACKWARD, + expectedStatus: loghttp.QueryStatusSuccess, + expected: input, + }, + { + name: "return-all-logs-forward", + limit: 10, // more than input + ts: now.Add(-1 * time.Hour), + direction: logproto.FORWARD, + expectedStatus: loghttp.QueryStatusSuccess, + expected: reversed, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + client := NewFileClient(io.NopCloser(strings.NewReader(strings.Join(input, "\n")))) + resp, err := client.Query( + `{foo="bar"}`, // label matcher doesn't matter. + c.limit, + c.ts, + c.direction, + true, + ) + + require.NoError(t, err) + require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) + assert.Equal(t, resp.Data.ResultType, loghttp.ResultTypeStream) + assertStreams(t, resp.Data.Result, c.expected) + }) + } } func TestFileClient_ListLabelNames(t *testing.T) { From d6118f8800ee7860523fe3c7660c683d5aadb85c Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Nov 2021 17:19:11 +0100 Subject: [PATCH 13/20] Fix breaking test cases Signed-off-by: Kaviraj --- pkg/logcli/client/file_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go index 54fd5cc31b40..ed15bad151ce 100644 --- a/pkg/logcli/client/file_test.go +++ b/pkg/logcli/client/file_test.go @@ -81,7 +81,7 @@ func TestFileClient_QueryRangeLogQueries(t *testing.T) { require.NoError(t, err) require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) - assert.Equal(t, resp.Data.ResultType, loghttp.ResultTypeStream) + assert.Equal(t, string(resp.Data.ResultType), loghttp.ResultTypeStream) assertStreams(t, resp.Data.Result, c.expected) }) } @@ -143,7 +143,7 @@ func TestFileClient_Query(t *testing.T) { require.NoError(t, err) require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) - assert.Equal(t, resp.Data.ResultType, loghttp.ResultTypeStream) + assert.Equal(t, string(resp.Data.ResultType), loghttp.ResultTypeStream) assertStreams(t, resp.Data.Result, c.expected) }) } From 15e41f877ba57be06010756d2730c121d7d74c32 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Nov 2021 17:43:16 +0100 Subject: [PATCH 14/20] Make linter happy Signed-off-by: Kaviraj --- pkg/logcli/client/file.go | 29 +++-------------------------- pkg/logcli/client/file_test.go | 1 + 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 6afae8559374..467f15417600 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -11,12 +11,14 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" logqllog "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util/marshal" + "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" ) @@ -211,28 +213,6 @@ func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa return nil, fmt.Errorf("Metrics Query: %w", ErrNotSupported) } -// assignTimestamps 
assigns the generated timestamp for each input log line based on start and end -// of the query. -// start and end are unix timestamps in secs. -func assignTimestamps(lines []string, start, end int64) map[string]int64 { - res := make(map[string]int64) - - n := int64(len(lines)) - - if end < start { - panic("`start` cannot be after `end`") - } - - step := (end - start) / n - - for i, line := range lines { - fmt.Println("line", line, "ts", (step*int64(i+1) + start)) - res[line] = (step * int64(i+1)) + start - } - - return res -} - func newFileIterator( ctx context.Context, r io.Reader, @@ -247,10 +227,7 @@ func newFileIterator( return nil, err } lines := strings.FieldsFunc(string(b), func(r rune) bool { - if r == '\n' { - return true - } - return false + return r == '\n' }) if len(lines) == 0 { diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go index ed15bad151ce..61e006ac45cd 100644 --- a/pkg/logcli/client/file_test.go +++ b/pkg/logcli/client/file_test.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) From e8f1145dacf44e278ace1001e16408ca05155ef3 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 5 Nov 2021 12:09:15 +0100 Subject: [PATCH 15/20] PR remarks. 1. Add `--stdin` in examples of the usage 2. Add `--stdin` in all the command help output --- docs/sources/getting-started/logcli.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md index fbecce9a5c89..40b433e67a42 100644 --- a/docs/sources/getting-started/logcli.md +++ b/docs/sources/getting-started/logcli.md @@ -120,6 +120,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -275,6 +276,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -346,6 +348,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -402,6 +405,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -450,7 +454,7 @@ You may have to use `stdin` flag for several reasons - So stream selector in the query is optional e.g just `|="timeout"|logfmt|level="error"` is same as `{foo="bar"}|="timeout|logfmt|level="error"` **Examples** -1. Line filter - `cat mylog.log | logcli query '|="too many open connections"'` -2. Label matcher - `echo 'msg="timeout happened" level="warning"' | logcli query '|logfmt|level="warning"'` -3. Different parsers (logfmt, json, pattern, regexp) - `cat mylog.log | logcli query '|pattern - - <_> " <_>" <_> "" <_>'` -4. 
Line formatters - `cat mylog.log | logcli query '|logfmt|line_format "{{.query}} {{.duration}}"'` +1. Line filter - `cat mylog.log | logcli --stdin query '|="too many open connections"'` +2. Label matcher - `echo 'msg="timeout happened" level="warning"' | logcli --stdin query '|logfmt|level="warning"'` +3. Different parsers (logfmt, json, pattern, regexp) - `cat mylog.log | logcli --stdin query '|pattern - - <_> " <_>" <_> "" <_>'` +4. Line formatters - `cat mylog.log | logcli --stdin query '|logfmt|line_format "{{.query}} {{.duration}}"'` From 327ece934bfb74d522faa5cc4878c27b2db68f42 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 18 Nov 2021 11:18:45 +0100 Subject: [PATCH 16/20] PR remarks - Use parsed labels correctly - Fix indendation with --stdin flag --- docs/sources/getting-started/logcli.md | 8 +++---- pkg/logcli/client/file.go | 30 ++++++++++++++++++++------ 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md index 40b433e67a42..458d3daa8dfe 100644 --- a/docs/sources/getting-started/logcli.md +++ b/docs/sources/getting-started/logcli.md @@ -120,7 +120,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --stdin Take input logs from stdin + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -276,7 +276,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --stdin Take input logs from stdin + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -348,7 +348,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --stdin Take input logs from stdin + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. @@ -405,7 +405,7 @@ Flags: timestamps [Local, UTC] --cpuprofile="" Specify the location for writing a CPU profile. --memprofile="" Specify the location for writing a memory profile. - --stdin Take input logs from stdin + --stdin Take input logs from stdin --addr="http://localhost:3100" Server address. Can also be set using LOKI_ADDR env var. 
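Before the `pkg/logcli/client/file.go` hunk below, a rough standalone sketch of the "use parsed labels correctly" change it makes: processed entries are grouped into one stream per parsed label set, keyed by a label hash, instead of all landing in a single hard-coded stream. The types and the `hashLabels` helper are simplified stand-ins, not the `logproto`/`iter` APIs the real code uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// parsedLine is a stand-in for what the LogQL pipeline hands back: the
// processed line plus the label set it ended up with after parser stages.
type parsedLine struct {
	labels string
	line   string
}

// entry and stream are simplified stand-ins for logproto.Entry / logproto.Stream.
type entry struct {
	ts   time.Time
	line string
}

type stream struct {
	labels  string
	entries []entry
}

// hashLabels stands in for parsedLabels.Hash(); any stable hash of the label
// string is enough to key the grouping map.
func hashLabels(labels string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(labels))
	return h.Sum64()
}

// groupByLabels groups entries into one stream per label set, which is the
// shape of the newFileIterator change in the hunk that follows.
func groupByLabels(lines []parsedLine) []stream {
	streams := map[uint64]*stream{}
	for _, l := range lines {
		key := hashLabels(l.labels)
		s, ok := streams[key]
		if !ok {
			s = &stream{labels: l.labels}
			streams[key] = s
		}
		s.entries = append(s.entries, entry{ts: time.Now(), line: l.line})
	}
	out := make([]stream, 0, len(streams))
	for _, s := range streams {
		out = append(out, *s)
	}
	return out
}

func main() {
	got := groupByLabels([]parsedLine{
		{labels: `{level="info"}`, line: `event="loki started"`},
		{labels: `{level="error"}`, line: `event="failed to apply wal"`},
		{labels: `{level="info"}`, line: `event="loki ready"`},
	})
	for _, s := range got {
		fmt.Println(s.labels, len(s.entries))
	}
}
```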
diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 467f15417600..3a1ad943798d 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -234,9 +234,7 @@ func newFileIterator( return iter.NoopIterator, nil } - stream := logproto.Stream{ - Labels: labels.String(), - } + streams := map[uint64]*logproto.Stream{} // reverse all the input lines if direction == FORWARD if params.Direction == logproto.FORWARD { @@ -246,19 +244,39 @@ func newFileIterator( } for _, line := range lines { - parsedLine, _, ok := pipeline.ProcessString(line) + parsedLine, parsedLabels, ok := pipeline.ProcessString(line) if !ok { continue } + + var stream *logproto.Stream + lhash := parsedLabels.Hash() + if stream, ok = streams[lhash]; !ok { + stream = &logproto.Stream{ + Labels: parsedLabels.String(), + } + streams[lhash] = stream + } + stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Now(), Line: parsedLine, }) } - return iter.NewHeapIterator( + if len(streams) == 0 { + return iter.NoopIterator, nil + } + + streamResult := make([]logproto.Stream, 0, len(streams)) + + for _, stream := range streams { + streamResult = append(streamResult, *stream) + } + + return iter.NewStreamsIterator( ctx, - []iter.EntryIterator{iter.NewStreamIterator(stream)}, + streamResult, params.Direction, ), nil } From 50721d2f2088835641335388c18dbf55d42e88f2 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 18 Nov 2021 11:35:34 +0100 Subject: [PATCH 17/20] Fix issue with direction --- cmd/logcli/main.go | 2 +- pkg/logcli/client/file.go | 21 ++++++++++++--------- pkg/logcli/client/file_test.go | 8 ++++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 86783e256871..6674bdaebb24 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -165,7 +165,7 @@ func main() { } // `--limit` doesn't make sense when using `--stdin` flag. - rangeQuery.Limit = math.MaxInt // TODO(kavi): is it a good idea? + rangeQuery.Limit = math.MaxInt64 // TODO(kavi): is it a good idea? 
} switch cmd { diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 3a1ad943798d..24305dcc1202 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -236,17 +236,10 @@ func newFileIterator( streams := map[uint64]*logproto.Stream{} - // reverse all the input lines if direction == FORWARD - if params.Direction == logproto.FORWARD { - sort.Slice(lines, func(i, j int) bool { - return i > j - }) - } - - for _, line := range lines { + processLine := func(line string) { parsedLine, parsedLabels, ok := pipeline.ProcessString(line) if !ok { - continue + return } var stream *logproto.Stream @@ -264,6 +257,16 @@ func newFileIterator( }) } + if params.Direction == logproto.FORWARD { + for _, line := range lines { + processLine(line) + } + } else { + for i := len(lines) - 1; i >= 0; i-- { + processLine(lines[i]) + } + } + if len(streams) == 0 { return iter.NoopIterator, nil } diff --git a/pkg/logcli/client/file_test.go b/pkg/logcli/client/file_test.go index 61e006ac45cd..1e5a2ab77c63 100644 --- a/pkg/logcli/client/file_test.go +++ b/pkg/logcli/client/file_test.go @@ -51,7 +51,7 @@ func TestFileClient_QueryRangeLogQueries(t *testing.T) { step: 0, // let client decide based on start and end interval: 0, expectedStatus: loghttp.QueryStatusSuccess, - expected: input, + expected: reversed, }, { name: "return-all-logs-forward", @@ -62,7 +62,7 @@ func TestFileClient_QueryRangeLogQueries(t *testing.T) { step: 0, // let the client decide based on start and end interval: 0, expectedStatus: loghttp.QueryStatusSuccess, - expected: reversed, + expected: input, }, } @@ -119,7 +119,7 @@ func TestFileClient_Query(t *testing.T) { ts: now.Add(-1 * time.Hour), direction: logproto.BACKWARD, expectedStatus: loghttp.QueryStatusSuccess, - expected: input, + expected: reversed, }, { name: "return-all-logs-forward", @@ -127,7 +127,7 @@ func TestFileClient_Query(t *testing.T) { ts: now.Add(-1 * time.Hour), direction: logproto.FORWARD, expectedStatus: loghttp.QueryStatusSuccess, - expected: reversed, + expected: input, }, } From afbdaf62f7e781f8154885d69e9f8e468f3bba89 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 18 Nov 2021 11:49:17 +0100 Subject: [PATCH 18/20] Fix linter --- pkg/logcli/client/file.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 24305dcc1202..059f4859c387 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -206,7 +206,7 @@ func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) if err != nil { return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err) } - return newFileIterator(ctx, q.r, q.labels, params, pipeline.ForStream(q.labels)) + return newFileIterator(ctx, q.r, params, pipeline.ForStream(q.labels)) } func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) { @@ -216,7 +216,6 @@ func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa func newFileIterator( ctx context.Context, r io.Reader, - labels labels.Labels, params logql.SelectLogParams, pipeline logqllog.StreamPipeline, ) (iter.EntryIterator, error) { From 6394e9d730df15952a8b5a33a9f0eb9398cad185 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 18 Nov 2021 13:08:11 +0100 Subject: [PATCH 19/20] MaxInt64 -> MaxInt (to support even arm32 images) --- cmd/logcli/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 
6674bdaebb24..86783e256871 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -165,7 +165,7 @@ func main() { } // `--limit` doesn't make sense when using `--stdin` flag. - rangeQuery.Limit = math.MaxInt64 // TODO(kavi): is it a good idea? + rangeQuery.Limit = math.MaxInt // TODO(kavi): is it a good idea? } switch cmd { From f1caf37738957057a3648aa835f71333fbbadd20 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 18 Nov 2021 13:48:09 +0100 Subject: [PATCH 20/20] Add note on calculating `step` value on the client side --- cmd/logcli/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 86783e256871..2441913a6ee8 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -365,6 +365,9 @@ func mustParse(t string, defaultTime time.Time) time.Time { return ret } +// This method is to duplicate the same logic of `step` value from `start` and `end` +// done on the loki server side. +// https://github.com/grafana/loki/blob/main/pkg/loghttp/params.go func defaultQueryRangeStep(start, end time.Time) time.Duration { step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1)) return time.Duration(step) * time.Second
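For reference, the step calculation the final hunk documents can be exercised on its own: step is roughly `(end - start) / 250`, floored to whole seconds and never below one second, matching the server-side default in `pkg/loghttp/params.go`. In the sketch below the function body is copied from the patch; the sample time windows are illustrative only.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// defaultQueryRangeStep is copied from the patch above: divide the query range
// into ~250 steps, floor to whole seconds, and clamp to a 1s minimum.
func defaultQueryRangeStep(start, end time.Time) time.Duration {
	step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1))
	return time.Duration(step) * time.Second
}

func main() {
	end := time.Now()
	// A 1h range yields 14s (3600/250 floored); anything under ~250s bottoms
	// out at the 1s minimum.
	fmt.Println(defaultQueryRangeStep(end.Add(-1*time.Hour), end))   // 14s
	fmt.Println(defaultQueryRangeStep(end.Add(-2*time.Minute), end)) // 1s
}
```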