Skip to content

Commit

Permalink
Update /loki/api/v1/push to use the v1 json format (#1145)
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored and rfratto committed Oct 15, 2019
1 parent 81f9786 commit bb2b925
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 17 deletions.
65 changes: 53 additions & 12 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The HTTP API includes the following endpoints:
- [`POST /loki/api/v1/push`](#post-lokiapiv1push)
- [`GET /api/prom/tail`](#get-apipromtail)
- [`GET /api/prom/query`](#get-apipromquery)
- [`POST /api/prom/push`](#post-apiprompush)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
- [`GET /metrics`](#get-metrics)
Expand Down Expand Up @@ -445,8 +446,6 @@ Response (streamed):

## `POST /loki/api/v1/push`

Alias (DEPRECATED): `POST /api/prom/push`

`/loki/api/v1/push` is the endpoint used to send log entries to Loki. The default
behavior is for the POST body to be a snappy-compressed protobuf messsage:

Expand All @@ -460,12 +459,12 @@ JSON post body can be sent in the following format:
{
"streams": [
{
"labels": "<LogQL label key-value pairs>",
"entries": [
{
"ts": "<RFC3339Nano string>",
"line": "<log line>"
}
"stream": {
"label": "value"
},
"values": [
[ "<unix epoch in nanoseconds>", "<log line>" ],
[ "<unix epoch in nanoseconds>", "<log line>" ]
]
}
]
Expand All @@ -482,8 +481,8 @@ In microservices mode, `/loki/api/v1/push` is exposed by the distributor.
### Examples

```bash
$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/loki/api/v1/push" --data-raw \
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
$ curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:3100/loki/api/v1/push" --data-raw \
'{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}'
```

## `GET /api/prom/tail`
Expand Down Expand Up @@ -535,8 +534,6 @@ and `Labels` instead of `labels` and `ts` like in the entries for the stream.
As the response is streamed, the object defined by the response format above
will be sent over the WebSocket multiple times.



## `GET /api/prom/query`

> **WARNING**: `/api/prom/query` is DEPRECATED; use `/loki/api/v1/query_range`
Expand Down Expand Up @@ -607,6 +604,50 @@ $ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/lok
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
```

## `POST /api/prom/push`

> **WARNING**: `/api/prom/push` is DEPRECATED; use `/loki/api/v1/push`
> instead.
`/api/prom/push` is the endpoint used to send log entries to Loki. The default
behavior is for the POST body to be a snappy-compressed protobuf messsage:

- [Protobuf definition](/pkg/logproto/logproto.proto)
- [Go client library](/pkg/promtail/client/client.go)

Alternatively, if the `Content-Type` header is set to `application/json`, a
JSON post body can be sent in the following format:

```
{
"streams": [
{
"labels": "<LogQL label key-value pairs>",
"entries": [
{
"ts": "<RFC3339Nano string>",
"line": "<log line>"
}
]
}
]
}
```

> **NOTE**: logs sent to Loki for every stream must be in timestamp-ascending
> order, meaning each log line must be more recent than the one last received.
> If logs do not follow this order, Loki will reject the log with an out of
> order error.
In microservices mode, `/api/prom/push` is exposed by the distributor.

### Examples

```bash
$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/api/prom/push" --data-raw \
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
```

## `GET /ready`

`/ready` returns HTTP 200 when the Loki ingester is ready to accept traffic. If
Expand Down
2 changes: 1 addition & 1 deletion fluentd/fluent-plugin-grafana-loki/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ services:
## Configuration

### url
The url of the Loki server to send logs to. When sending data the publish path (`/loki/api/v1/push`) will automatically be appended.
The url of the Loki server to send logs to. When sending data the publish path (`/api/prom/push`) will automatically be appended.
By default the url is set to `https://logs-us-west1.grafana.net`, the url of the Grafana Labs preview (hosted Loki)[https://grafana.com/loki] service.

#### Proxy Support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.1.0'
spec.version = '1.0.2'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def write(chunk)
body = { 'streams': payload }

# add ingest path to loki url
uri = URI.parse(url + '/loki/api/v1/push')
uri = URI.parse(url + '/api/prom/push')

req = Net::HTTP::Post.new(
uri.request_uri
Expand Down
14 changes: 12 additions & 2 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package distributor

import (
"encoding/json"
"net/http"

"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
)

var contentType = http.CanonicalHeaderKey("Content-Type")
Expand All @@ -21,7 +23,15 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

switch r.Header.Get(contentType) {
case applicationJSON:
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
}

if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type QueryResponse struct {
Data QueryResponseData `json:"data"`
}

// PushRequest models a log stream push
type PushRequest struct {
Streams []*Stream `json:"streams"`
}

// ResultType holds the type of the result
type ResultType string

Expand Down
13 changes: 13 additions & 0 deletions pkg/logql/unmarshal/legacy/unmarshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package unmarshal

import (
"encoding/json"
"io"

"github.com/grafana/loki/pkg/logproto"
)

// DecodePushRequest directly decodes json to a logproto.PushRequest
func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
return json.NewDecoder(b).Decode(r)
}
67 changes: 67 additions & 0 deletions pkg/logql/unmarshal/legacy/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package unmarshal

import (
"io/ioutil"
"log"
"strings"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

// covers requests to /api/prom/push
var pushTests = []struct {
expected []*logproto.Stream
actual string
}{
{
[]*logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"streams":[
{
"labels":"{test=\"test\"}",
"entries":[
{
"ts": "2019-09-13T18:32:22.380001319Z",
"line": "super line"
}
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))

err := DecodePushRequest(closer, &actual)
require.NoError(t, err)

require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}

func mustParse(l string, t string) time.Time {
ret, err := time.Parse(l, t)
if err != nil {
log.Fatalf("Failed to parse %s", t)
}

return ret
}
60 changes: 60 additions & 0 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package unmarshal

import (
"encoding/json"
"io"

"github.com/grafana/loki/pkg/loghttp"

"github.com/grafana/loki/pkg/logproto"
)

// DecodePushRequest directly decodes json to a logproto.PushRequest
func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
var request loghttp.PushRequest

err := json.NewDecoder(b).Decode(&request)

if err != nil {
return err
}

*r = NewPushRequest(request)

return nil
}

// NewPushRequest constructs a logproto.PushRequest from a PushRequest
func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest {
ret := logproto.PushRequest{
Streams: make([]*logproto.Stream, len(r.Streams)),
}

for i, s := range r.Streams {
ret.Streams[i] = NewStream(s)
}

return ret
}

// NewStream constructs a logproto.Stream from a Stream
func NewStream(s *loghttp.Stream) *logproto.Stream {
ret := &logproto.Stream{
Entries: make([]logproto.Entry, len(s.Entries)),
Labels: s.Labels.String(),
}

for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
}

return ret
}

// NewEntry constructs a logproto.Entry from a Entry
func NewEntry(e loghttp.Entry) logproto.Entry {
return logproto.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
}
}
56 changes: 56 additions & 0 deletions pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package unmarshal

import (
"io/ioutil"
"strings"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

// covers requests to /loki/api/v1/push
var pushTests = []struct {
expected []*logproto.Stream
actual string
}{
{
[]*logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"streams": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))

err := DecodePushRequest(closer, &actual)
require.NoError(t, err)

require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}

0 comments on commit bb2b925

Please sign in to comment.