diff --git a/docs/sources/operations/storage/wal.md b/docs/sources/operations/storage/wal.md
index 700dc65eec27..0dd5ffa0a380 100644
--- a/docs/sources/operations/storage/wal.md
+++ b/docs/sources/operations/storage/wal.md
@@ -4,11 +4,29 @@ title: Write Ahead Log
# Write Ahead Log (WAL)
+Ingesters temporarily store data in memory. In the event of a crash, there could be data loss. The WAL helps fill this gap in reliability.
-Ingesters store all their data in memory. If there is a crash, there can be data loss. The WAL helps fill this gap in reliability.
-This section will use Kubernetes as a reference.
+The WAL in Loki records incoming data and stores it on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. Upon restart, Loki will "replay" all of the data in the log before registering itself as ready for subsequent writes. This allows Loki to retain the performance and cost benefits of buffering data in memory while also gaining durability: once a write has been acknowledged, it won't be lost.
-To use the WAL, there are some changes that needs to be made.
+This section will use Kubernetes as a reference deployment paradigm in the examples.
+
+## Disclaimer & WAL nuances
+
+The Write Ahead Log in Loki makes a few tradeoffs compared to other WALs you may be familiar with. The WAL aims to add durability guarantees, but _not at the expense of availability_. Specifically, there are two scenarios where the WAL sacrifices these guarantees.
+
+1) Corruption/Deletion of the WAL prior to replaying it
+
+In the event the WAL is corrupted or partially deleted, Loki will not be able to recover all of its data. In this case, Loki will attempt to recover any data it can, and this will not prevent Loki from starting.
+
+Note: the Prometheus metric `loki_ingester_wal_corruptions_total` can be used to track and alert when this happens.
+
+2) No space left on disk
+
+In the event the underlying WAL disk is full, Loki will not fail incoming writes, but neither will it log them to the WAL. In this case, the persistence guarantees across process restarts will not hold.
+
+Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens.
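For context on the "no space left on disk" nuance above: in Go, a full disk typically surfaces from a file write as an `*os.PathError` wrapping `syscall.ENOSPC`, and that is the condition counted by `loki_ingester_wal_disk_full_failures_total` (see the `instance.go` change later in this diff). The following is a minimal, self-contained sketch of that check only; the `isDiskFull` helper, the `main` function, and the file path are illustrative, not part of Loki.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// isDiskFull reports whether a WAL write failed because the underlying
// device is out of space (ENOSPC). This mirrors the check the ingester
// performs before incrementing loki_ingester_wal_disk_full_failures_total.
func isDiskFull(err error) bool {
	if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
		return true
	}
	return false
}

func main() {
	// Simulated failure: appending a WAL segment on a full disk.
	err := &os.PathError{Op: "write", Path: "/loki/wal/00000001", Err: syscall.ENOSPC}
	fmt.Println(isDiskFull(err)) // prints "true"
}
```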
+ +### Metrics ## Changes to deployment diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 24509c708825..2d2648682791 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -38,12 +38,7 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time, require.Len(t, result.resps[0].Streams[1].Entries, ln) } -func TestIngesterWAL(t *testing.T) { - - walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") - require.Nil(t, err) - defer os.RemoveAll(walDir) - +func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config { ingesterConfig := defaultIngesterTestConfig(t) ingesterConfig.MaxTransferRetries = 0 ingesterConfig.WAL = WALConfig{ @@ -52,6 +47,18 @@ func TestIngesterWAL(t *testing.T) { Recover: true, CheckpointDuration: time.Second, } + + return ingesterConfig +} + +func TestIngesterWAL(t *testing.T) { + + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) @@ -134,14 +141,8 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(walDir) - ingesterConfig := defaultIngesterTestConfig(t) - ingesterConfig.MaxTransferRetries = 0 - ingesterConfig.WAL = WALConfig{ - Enabled: true, - Dir: walDir, - Recover: true, - CheckpointDuration: time.Second, - } + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 9a92502a774c..3255f50f7953 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -2,8 +2,11 @@ package ingester import ( "fmt" + "io/ioutil" + "os" "sort" "sync" + "syscall" "testing" "time" @@ -44,7 +47,7 @@ func TestChunkFlushingIdle(t *testing.T) { cfg.MaxChunkIdle = 100 * time.Millisecond cfg.RetainPeriod = 500 * time.Millisecond - store, ing := newTestStore(t, cfg) + store, ing := newTestStore(t, cfg, nil) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck testData := pushTestSamples(t, ing) @@ -54,7 +57,25 @@ func TestChunkFlushingIdle(t *testing.T) { } func TestChunkFlushingShutdown(t *testing.T) { - store, ing := newTestStore(t, defaultIngesterTestConfig(t)) + store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil) + testData := pushTestSamples(t, ing) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing)) + store.checkData(t, testData) +} + +type fullWAL struct{} + +func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} } +func (fullWAL) Stop() error { return nil } + +func TestWALFullFlush(t *testing.T) { + // technically replaced with a fake wal, but the ingester New() function creates a regular wal first, + // so we enable creation/cleanup even though it remains unused. 
+ walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{}) testData := pushTestSamples(t, ing) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing)) store.checkData(t, testData) @@ -66,7 +87,7 @@ func TestFlushingCollidingLabels(t *testing.T) { cfg.MaxChunkIdle = 100 * time.Millisecond cfg.RetainPeriod = 500 * time.Millisecond - store, ing := newTestStore(t, cfg) + store, ing := newTestStore(t, cfg, nil) defer store.Stop() const userID = "testUser" @@ -112,7 +133,7 @@ func TestFlushMaxAge(t *testing.T) { cfg.MaxChunkAge = time.Minute cfg.MaxChunkIdle = time.Hour - store, ing := newTestStore(t, cfg) + store, ing := newTestStore(t, cfg, nil) defer store.Stop() now := time.Unix(0, 0) @@ -166,7 +187,10 @@ type testStore struct { chunks map[string][]chunk.Chunk } -func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) { +// Note: the ingester New() function creates its own WAL first, which we then override if specified. +// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the WAL. +// This is an ugly hook for testing :( +func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) { store := &testStore{ chunks: map[string][]chunk.Chunk{}, } @@ -178,6 +202,11 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) { require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + if walOverride != nil { + _ = ing.wal.Stop() + ing.wal = walOverride + } + return store, ing } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index fabf7d47decd..4d2fe2a6f932 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -146,6 +146,10 @@ type Ingester struct { limiter *Limiter + // Denotes whether the ingester should flush on shutdown. // Currently only used by the WAL to signal when the disk is full.
+ flushOnShutdownSwitch *OnceSwitch + metrics *ingesterMetrics wal WAL @@ -169,15 +173,16 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid metrics := newIngesterMetrics(registerer) i := &Ingester{ - cfg: cfg, - clientConfig: clientConfig, - instances: map[string]*instance{}, - store: store, - periodicConfigs: store.GetSchemaConfigs(), - loopQuit: make(chan struct{}), - flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), - tailersQuit: make(chan struct{}), - metrics: metrics, + cfg: cfg, + clientConfig: clientConfig, + instances: map[string]*instance{}, + store: store, + periodicConfigs: store.GetSchemaConfigs(), + loopQuit: make(chan struct{}), + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), + tailersQuit: make(chan struct{}), + metrics: metrics, + flushOnShutdownSwitch: &OnceSwitch{}, } if cfg.WAL.Enabled { @@ -319,6 +324,10 @@ func (i *Ingester) stopping(_ error) error { i.stopIncomingRequests() var errs errUtil.MultiError errs.Add(i.wal.Stop()) + + if i.flushOnShutdownSwitch.Get() { + i.lifecycler.SetFlushOnShutdown(true) + } errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler)) // Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails, @@ -384,7 +393,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics) + inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index dfe43aaccfc0..2cbdd1182805 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -3,8 +3,11 @@ package ingester import ( "context" "net/http" + "os" "sync" + "syscall" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -15,6 +18,7 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ingester/index" + "github.com/cortexproject/cortex/pkg/util" cutil "github.com/cortexproject/cortex/pkg/util" "github.com/grafana/loki/pkg/helpers" @@ -77,6 +81,10 @@ type instance struct { wal WAL + // Denotes whether the ingester should flush on shutdown. + // Currently only used by the WAL to signal when the disk is full. 
+ flushOnShutdownSwitch *OnceSwitch + metrics *ingesterMetrics } @@ -86,6 +94,7 @@ func newInstance( limiter *Limiter, wal WAL, metrics *ingesterMetrics, + flushOnShutdownSwitch *OnceSwitch, ) *instance { i := &instance{ cfg: cfg, @@ -101,8 +110,9 @@ func newInstance( tailers: map[uint32]*tailer{}, limiter: limiter, - wal: wal, - metrics: metrics, + wal: wal, + metrics: metrics, + flushOnShutdownSwitch: flushOnShutdownSwitch, } i.mapper = newFPMapper(i.getLabelsFromFingerprint) return i @@ -161,8 +171,19 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { if !record.IsEmpty() { if err := i.wal.Log(record); err != nil { - return err + if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC { + i.metrics.walDiskFullFailures.Inc() + i.flushOnShutdownSwitch.TriggerAnd(func() { + level.Error(util.Logger).Log( + "msg", + "Error writing to WAL, disk full, no further messages will be logged for this error", + ) + }) + } else { + return err + } } + } return appendErr @@ -578,3 +599,33 @@ func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool { } return false } + +// OnceSwitch is a write optimized switch that can only ever be switched "on". +// It uses a RWMutex underneath the hood to quickly and effectively (in a concurrent environment) +// check if the switch has already been triggered, only actually acquiring the mutex for writing if not. +type OnceSwitch struct { + sync.RWMutex + toggle bool +} + +func (o *OnceSwitch) Get() bool { + o.RLock() + defer o.RUnlock() + return o.toggle +} + +// TriggerAnd will ensure the switch is on and run the provided function if +// the switch was not already toggled on. +func (o *OnceSwitch) TriggerAnd(fn func()) { + o.RLock() + if o.toggle { + o.RUnlock() + return + } + + o.RUnlock() + o.Lock() + o.toggle = true + o.Unlock() + fn() +} diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 6613b6250ac0..f456a6285777 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -35,7 +35,7 @@ func TestLabelsCollisions(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil) + i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{}) // avoid entries from the future. 
tt := time.Now().Add(-5 * time.Minute) @@ -62,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics) + inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) const ( concurrent = 10 @@ -120,7 +120,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics) + inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) lbls := makeRandomLabels() tt := time.Now() @@ -160,7 +160,7 @@ func Test_SeriesQuery(t *testing.T) { cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 - instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics) + instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) currentTime := time.Now() @@ -271,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics) + i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) ctx := context.Background() for n := 0; n < b.N; n++ { @@ -313,7 +313,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() - inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics) + inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil) require.NoError(b, err) for i := 0; i < 10000; i++ { diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index e9bdefdb9c2c..b907612c6f0c 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -13,6 +13,7 @@ type ingesterMetrics struct { checkpointDuration prometheus.Summary checkpointLoggedBytesTotal prometheus.Counter + walDiskFullFailures prometheus.Counter walReplayDuration prometheus.Gauge walCorruptionsTotal *prometheus.CounterVec walLoggedBytesTotal prometheus.Counter @@ -30,6 +31,10 @@ const ( func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { return &ingesterMetrics{ + walDiskFullFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_disk_full_failures_total", + Help: "Total number of wal write failures due to full disk.", + }), walReplayDuration: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Name: "loki_ingester_wal_replay_duration_seconds", Help: "Time taken to replay the checkpoint and the WAL.", diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 7e0e1b7271d0..0d33bdef87a9 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -165,7 +165,7 @@ func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T) }, nil } - _, ing := newTestStore(f.t, cfg) + _, ing := newTestStore(f.t, cfg, nil) f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing // NB there's some kind of race condition with the in-memory KV client when
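Taken together, the pieces in this diff work as follows: when a WAL append fails with ENOSPC, the instance increments the disk-full metric and trips `flushOnShutdownSwitch`; during `stopping()`, a tripped switch makes the ingester call `SetFlushOnShutdown(true)` on the lifecycler so in-memory chunks are flushed to storage, since the WAL can no longer be relied on for replay. Below is a small, self-contained usage sketch of that switch. The `OnceSwitch` body is copied from the `instance.go` change above; the `main` function, the goroutine loop, and the printed messages are illustrative only and not Loki code.

```go
package main

import (
	"fmt"
	"sync"
)

// OnceSwitch (copied from the PR) is a write-optimized switch that can only
// ever be switched "on". Readers take the read lock; the write lock is only
// taken when the switch still appears to be off.
type OnceSwitch struct {
	sync.RWMutex
	toggle bool
}

func (o *OnceSwitch) Get() bool {
	o.RLock()
	defer o.RUnlock()
	return o.toggle
}

// TriggerAnd turns the switch on and runs fn if the switch was observed off.
// Note: two goroutines racing past the read-locked check may both run fn,
// so fn should be cheap and idempotent (in the PR it logs a single warning).
func (o *OnceSwitch) TriggerAnd(fn func()) {
	o.RLock()
	if o.toggle {
		o.RUnlock()
		return
	}
	o.RUnlock()

	o.Lock()
	o.toggle = true
	o.Unlock()
	fn()
}

func main() {
	flushOnShutdown := &OnceSwitch{}

	// Simulate many concurrent Push calls hitting a full WAL disk.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			flushOnShutdown.TriggerAnd(func() {
				fmt.Println("WAL disk full; chunks will be flushed on shutdown")
			})
		}()
	}
	wg.Wait()

	// At shutdown the ingester consults the switch to decide whether to
	// flush in-memory chunks instead of relying on WAL replay.
	fmt.Println("flush on shutdown:", flushOnShutdown.Get())
}
```

In the diff itself, the callback logs one error line ("no further messages will be logged for this error") and the switch is later read in `stopping()` via `Get()` to enable flush-on-shutdown on the lifecycler.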