Skip to content

Commit

Permalink
Promtail: Fix retry/stop when erroring for out of cloudflare retentio…
Browse files Browse the repository at this point in the history
…n range (e.g. over 168 hours old) (#5698)

* fix: 5696 don't retry when out of range for cloudflare
  • Loading branch information
Paul Ryan authored Mar 29, 2022
1 parent 3bf2efa commit 4e3f3b7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Main
* [5696](https://github.com/grafana/loki/pull/5696) **paullryan** don't block scraping of new logs from cloudflare within promtail if an error is received from cloudflare about too early logs.
* [5685](https://github.com/grafana/loki/pull/5625) **chaudum** Fix bug in push request parser that allowed users to send arbitrary non-string data as "log line".
* [5707](https://github.com/grafana/loki/pull/5707) **franzwong** Promtail: Rename config name limit_config to limits_config.
* [5626](https://github.com/grafana/loki/pull/5626) **jeschkies** Support multi-tenant select logs and samples queries.
Expand Down
8 changes: 7 additions & 1 deletion clients/pkg/promtail/targets/cloudflare/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudflare
import (
"context"
"errors"
"regexp"
"strings"
"sync"
"time"
Expand All @@ -28,6 +29,8 @@ import (
// The minimun window size is 1 minute.
const minDelay = time.Minute

var cloudflareTooEarlyError = regexp.MustCompile(`too early: logs older than \S+ are not available`)

var defaultBackoff = backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
Expand Down Expand Up @@ -151,7 +154,10 @@ func (t *Target) pull(ctx context.Context, start, end time.Time) error {

for backoff.Ongoing() {
it, err = t.client.LogpullReceived(ctx, start, end)
if err != nil {
if err != nil && cloudflareTooEarlyError.MatchString(err.Error()) {
level.Warn(t.logger).Log("msg", "failed iterating over logs, out of cloudflare range, not retrying", "err", err, "start", start, "end", end, "retries", backoff.NumRetries())
return nil
} else if err != nil {
errs.Add(err)
backoff.Wait()
continue
Expand Down
52 changes: 52 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,58 @@ func Test_CloudflareTargetError(t *testing.T) {
require.Equal(t, newEnd, end.UnixNano())
}

func Test_CloudflareTargetError168h(t *testing.T) {
var (
w = log.NewSyncWriter(os.Stderr)
logger = log.NewLogfmtLogger(w)
cfg = &scrapeconfig.CloudflareConfig{
APIToken: "foo",
ZoneID: "bar",
Labels: model.LabelSet{"job": "cloudflare"},
PullRange: model.Duration(time.Minute),
}
end = time.Unix(0, time.Hour.Nanoseconds())
client = fake.New(func() {})
cfClient = newFakeCloudflareClient()
)
ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: t.TempDir() + "/positions.yml",
})
// retries as fast as possible.
defaultBackoff.MinBackoff = 0
defaultBackoff.MaxBackoff = 0

// set our end time to be the last time we have a position
ps.Put(positions.CursorKey(cfg.ZoneID), end.UnixNano())
require.NoError(t, err)

// setup errors for all retries
cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("HTTP status 400: bad query: error parsing time: invalid time range: too early: logs older than 168h0m0s are not available"))
// replace the client.
getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
return cfClient, nil
}

ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)
require.NoError(t, err)
require.True(t, ta.Ready())

// wait for the target to be stopped.
require.Eventually(t, func() bool {
return cfClient.CallCount() >= 5
}, 5*time.Second, 100*time.Millisecond)

require.Len(t, client.Received(), 0)
require.GreaterOrEqual(t, cfClient.CallCount(), 5)
ta.Stop()
ps.Stop()

// Make sure we move on from the save the last position.
newEnd, _ := ps.Get(positions.CursorKey(cfg.ZoneID))
require.Greater(t, newEnd, end.UnixNano())
}

func Test_validateConfig(t *testing.T) {
tests := []struct {
in *scrapeconfig.CloudflareConfig
Expand Down

0 comments on commit 4e3f3b7

Please sign in to comment.