Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail: Fix retry/stop when erroring for out of cloudflare retention range (e.g. over 168 hours old) #5698

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Main
* [5696](https://github.com/grafana/loki/pull/5696) **paullryan** don't block scraping of new logs from cloudflare within promtail if an error is received from cloudflare about too early logs.
* [5685](https://github.com/grafana/loki/pull/5625) **chaudum** Fix bug in push request parser that allowed users to send arbitrary non-string data as "log line".
* [5707](https://github.com/grafana/loki/pull/5707) **franzwong** Promtail: Rename config name limit_config to limits_config.
* [5626](https://github.com/grafana/loki/pull/5626) **jeschkies** Support multi-tenant select logs and samples queries.
Expand Down
8 changes: 7 additions & 1 deletion clients/pkg/promtail/targets/cloudflare/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudflare
import (
"context"
"errors"
"regexp"
"strings"
"sync"
"time"
Expand All @@ -28,6 +29,8 @@ import (
// The minimun window size is 1 minute.
const minDelay = time.Minute

var cloudflareTooEarlyError = regexp.MustCompile(`too early: logs older than \S+ are not available`)

var defaultBackoff = backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
Expand Down Expand Up @@ -151,7 +154,10 @@ func (t *Target) pull(ctx context.Context, start, end time.Time) error {

for backoff.Ongoing() {
it, err = t.client.LogpullReceived(ctx, start, end)
if err != nil {
if err != nil && cloudflareTooEarlyError.MatchString(err.Error()) {
level.Warn(t.logger).Log("msg", "failed iterating over logs, out of cloudflare range, not retrying", "err", err, "start", start, "end", end, "retries", backoff.NumRetries())
return nil
} else if err != nil {
errs.Add(err)
backoff.Wait()
continue
Expand Down
52 changes: 52 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,58 @@ func Test_CloudflareTargetError(t *testing.T) {
require.Equal(t, newEnd, end.UnixNano())
}

func Test_CloudflareTargetError168h(t *testing.T) {
var (
w = log.NewSyncWriter(os.Stderr)
logger = log.NewLogfmtLogger(w)
cfg = &scrapeconfig.CloudflareConfig{
APIToken: "foo",
ZoneID: "bar",
Labels: model.LabelSet{"job": "cloudflare"},
PullRange: model.Duration(time.Minute),
}
end = time.Unix(0, time.Hour.Nanoseconds())
client = fake.New(func() {})
cfClient = newFakeCloudflareClient()
)
ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: t.TempDir() + "/positions.yml",
})
// retries as fast as possible.
defaultBackoff.MinBackoff = 0
defaultBackoff.MaxBackoff = 0

// set our end time to be the last time we have a position
ps.Put(positions.CursorKey(cfg.ZoneID), end.UnixNano())
require.NoError(t, err)

// setup errors for all retries
cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("HTTP status 400: bad query: error parsing time: invalid time range: too early: logs older than 168h0m0s are not available"))
// replace the client.
getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
return cfClient, nil
}

ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)
require.NoError(t, err)
require.True(t, ta.Ready())

// wait for the target to be stopped.
require.Eventually(t, func() bool {
return cfClient.CallCount() >= 5
}, 5*time.Second, 100*time.Millisecond)

require.Len(t, client.Received(), 0)
require.GreaterOrEqual(t, cfClient.CallCount(), 5)
ta.Stop()
ps.Stop()

// Make sure we move on from the save the last position.
newEnd, _ := ps.Get(positions.CursorKey(cfg.ZoneID))
require.Greater(t, newEnd, end.UnixNano())
}

func Test_validateConfig(t *testing.T) {
tests := []struct {
in *scrapeconfig.CloudflareConfig
Expand Down