Promtail: Add handler timeout for GCP Logs Push target #7401

Merged 7 commits on Oct 14, 2022

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@
* [5977](https://github.com/grafana/loki/pull/5977) **juissi-t** lambda-promtail: Add support for Kinesis data stream events
* [6828](https://github.com/grafana/loki/pull/6828) **alexandre1984rj** Add the BotScore and BotScoreSrc fields once the Cloudflare API returns those two fields on the list of all available log fields.
* [6656](https://github.com/grafana/loki/pull/6656) **carlospeon**: Allow promtail to add matches to the journal reader
* [7401](https://github.com/grafana/loki/pull/7401) **thepalbi**: Add timeout to GCP Logs push target

##### Fixes
* [6766](https://github.com/grafana/loki/pull/6766) **kavirajk**: fix(logql): Make `LabelSampleExtractor` ignore processing the line if it doesn't contain that specific label. Fixes unwrap behavior explained in the issue https://github.com/grafana/loki/issues/6713
3 changes: 3 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
@@ -367,6 +367,9 @@ type GcplogTargetConfig struct {
// Defaults to `pull` for backwards compatibility reasons.
SubscriptionType string `yaml:"subscription_type"`

// PushTimeout is used to set a maximum processing time for each incoming GCP Logs entry. Used just for `push` subscription type.
PushTimeout time.Duration `yaml:"push_timeout"`

// Server is the weaveworks server config for listening connections. Used just for `push` subscription type.
Server server.Config `yaml:"server"`
}
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/gcplog/metrics.go
@@ -48,7 +48,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {

m.gcpPushErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "gcp_push_target_parsing_errors_total",
Name: "gcp_push_target_errors_total",
Help: "Number of errors while receiving GCP Push messages",
}, []string{"reason"})

37 changes: 35 additions & 2 deletions clients/pkg/promtail/targets/gcplog/push_target.go
@@ -1,6 +1,7 @@
package gcplog

import (
"context"
"encoding/json"
"fmt"
"io"
@@ -51,6 +52,8 @@ func newPushTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler
return nil, fmt.Errorf("failed to parse configs and override defaults when configuring gcp push target: %w", err)
}
config.Server = mergedServerConfigs
// Avoid logging entire received request on failures
config.Server.ExcludeRequestInLog = true

err = ht.run()
if err != nil {
@@ -83,8 +86,8 @@ func (h *pushTarget) run() error {
if err != nil {
return err
}

h.server = srv

h.server.HTTP.Path("/gcp/api/v1/push").Methods("POST").Handler(http.HandlerFunc(h.push))

go func() {
@@ -100,6 +103,14 @@ func (h *pushTarget) run() error {
func (h *pushTarget) push(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

// Default to a no-op cancel func, so the logic below is uniform whether or not a timeout is configured
ctx := r.Context()
cancel := context.CancelFunc(func() {})
if h.config.PushTimeout != 0 {
ctx, cancel = context.WithTimeout(r.Context(), h.config.PushTimeout)
}
defer cancel()

pushMessage := PushMessage{}
bs, err := io.ReadAll(r.Body)
if err != nil {
@@ -132,11 +143,33 @@ func (h *pushTarget) push(w http.ResponseWriter, r *http.Request) {

level.Debug(h.logger).Log("msg", fmt.Sprintf("Received line: %s", entry.Line))

h.entries <- entry
if err := h.doSendEntry(ctx, entry); err != nil {
h.metrics.gcpPushErrors.WithLabelValues("timeout").Inc()
level.Warn(h.logger).Log("msg", "error sending log entry", "err", err.Error())
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}

h.metrics.gcpPushEntries.WithLabelValues().Inc()
w.WriteHeader(http.StatusNoContent)
}

func (h *pushTarget) doSendEntry(ctx context.Context, entry api.Entry) error {
// Since this setting is configured at the target level, it never changes between requests, so branch prediction makes this check effectively free after the first request
if h.config.PushTimeout != 0 {
select {
// Timeout the api.Entry channel send operation, which is the only blocking operation in the handler
case <-ctx.Done():
return fmt.Errorf("timeout exceeded")
case h.entries <- entry:
}
} else {
// No timeout configured, no limit
h.entries <- entry
}
return nil
}

func (h *pushTarget) Type() target.TargetType {
return target.GcplogTargetType
}
64 changes: 64 additions & 0 deletions clients/pkg/promtail/targets/gcplog/push_target_test.go
@@ -7,9 +7,12 @@ import (
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/grafana/loki/clients/pkg/promtail/api"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -371,6 +374,67 @@ func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) {
}
}

// blockingEntryHandler implements an api.EntryHandler that has no space in its receive channel, blocking when an api.Entry
// is sent down the pipe.
type blockingEntryHandler struct {
ch chan api.Entry
once sync.Once
}

func newBlockingEntryHandler() *blockingEntryHandler {
filledChannel := make(chan api.Entry)
return &blockingEntryHandler{ch: filledChannel}
}

func (t *blockingEntryHandler) Chan() chan<- api.Entry {
return t.ch
}

func (t *blockingEntryHandler) Stop() {
t.once.Do(func() { close(t.ch) })
}

func TestPushTarget_UsePushTimeout(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

eh := newBlockingEntryHandler()
defer eh.Stop()

serverConfig, port, err := getServerConfigWithAvailablePort()
require.NoError(t, err, "error generating server config or finding open port")
config := &scrapeconfig.GcplogTargetConfig{
Server: serverConfig,
Labels: nil,
UseIncomingTimestamp: true,
SubscriptionType: "push",
PushTimeout: time.Second,
}

prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer)
tenantIDRelabelConfig := []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "$1",
TargetLabel: "tenant_id",
Action: relabel.Replace,
},
}
pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, tenantIDRelabelConfig, t.Name()+"_test_job", config)
require.NoError(t, err)
defer func() {
_ = pt.Stop()
}()

req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
require.NoError(t, err, "expected request to be created successfully")
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusServiceUnavailable, res.StatusCode, "expected timeout response")
}

func waitForMessages(eh *fake.Client) {
countdown := 1000
for len(eh.Received()) != 1 && countdown > 0 {
5 changes: 5 additions & 0 deletions docs/sources/clients/promtail/configuration.md
@@ -964,6 +964,11 @@ When using the `push` subscription type, keep in mind:
# timestamp to the log when it was processed.
[use_incoming_timestamp: <boolean> | default = false]

# If the subscription_type is push, configures an HTTP handler timeout. If processing the incoming GCP Logs request,
# that is, parsing it and then sending the resulting entry down the processing pipeline, takes longer than the
# configured duration, the server aborts the request and responds with a 503 HTTP status code.
[push_timeout: <duration> | default = 0 (no timeout)]

# Label map to add to every log message.
labels:
[ <labelname>: <labelvalue> ... ]
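For reference, a minimal `push`-type `gcplog` scrape config using the new option might look like the sketch below. The job name, static label, and listen port are illustrative assumptions, not values taken from this PR:

```yaml
scrape_configs:
  - job_name: gcp-push-example       # hypothetical job name
    gcplog:
      subscription_type: push        # push_timeout only applies to the push type
      use_incoming_timestamp: true
      push_timeout: 5s               # abort the handler and reply with a 503 after 5s
      labels:
        job: gcp-push-example        # hypothetical static label
      server:
        http_listen_port: 8080       # assumed weaveworks server.Config field
```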
4 changes: 4 additions & 0 deletions docs/sources/upgrading/_index.md
@@ -104,6 +104,10 @@ The name of this metric was changed to `loki_internal_log_messages_total` to red

### Promtail

#### `gcp_push_target_parsing_errors_total` has been renamed to `gcp_push_target_errors_total`

The `gcp_push_target_parsing_errors_total` GCP Push Target metric has been renamed to `gcp_push_target_errors_total`, so that it can track additional error causes, such as timeouts.

#### `gcp_push_target_parsing_errors_total` has a new `reason` label

A new label named `reason` has been added to the `gcp_push_target_parsing_errors_total` GCP Push Target metric. It details what might have caused the parsing to fail.
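As a usage sketch, the new `reason` label makes it possible to alert on timeouts specifically. The rule below is a hypothetical example and assumes the metric is exposed as `promtail_gcp_push_target_errors_total` (the `promtail` namespace from `metrics.go` plus the new metric name):

```yaml
groups:
  - name: promtail-gcp-push          # hypothetical rule group
    rules:
      - alert: PromtailGcpPushTimeouts
        # Fires when push handlers time out sending entries down the pipeline.
        expr: sum(rate(promtail_gcp_push_target_errors_total{reason="timeout"}[5m])) > 0
        for: 10m
```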
Expand Down