Ruler: Recording Rules #3766

Merged (41 commits) on Jun 2, 2021
The file diff further below shows changes from 5 of the 41 commits.
Commits (41)
7d5942c
WIP: hack to get recording rules working and pushing to Cortex/Promet…
Mar 7, 2021
2215f7b
Refactoring
Mar 7, 2021
1e2d782
Merge remote-tracking branch 'upstream/master' into dannykopping/reco…
Mar 12, 2021
fc48da7
Minor refactorings
Mar 12, 2021
870aa51
Moving manager subpackage into ruler package to avoid dependency cycles
Mar 12, 2021
5565a78
Minor refactorings
Mar 12, 2021
23356a3
Skipping commit if remote-write client is not defined
Mar 13, 2021
d857417
Merge remote-tracking branch 'upstream/master' into dannykopping/reco…
Apr 9, 2021
a202c1a
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
Apr 27, 2021
8f07114
Updating use of cortex client
Apr 27, 2021
56ab4eb
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
May 10, 2021
f816193
Memoizing appenders, using queue for samples & labels
May 16, 2021
d0be7fa
Adding buffer size configurability
May 17, 2021
524bbf7
Adding metric to show current buffer size
May 18, 2021
0339fbf
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
May 22, 2021
df1f8d2
Refactoring for better responsibility separation & testability
May 24, 2021
df72b2f
Adding per-tenant overrides of remote-write queue capacity
May 24, 2021
2eb9042
Adding tests for evicting queue
May 24, 2021
19874d9
Adding more tests and refactoring
May 25, 2021
0200090
Adding queue benchmark
May 25, 2021
1760bd5
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
May 25, 2021
5800010
Reducing redundancy in metric names
May 25, 2021
02a0943
Testing that only metric queries can be run
May 25, 2021
0d188f7
Minor fixes pre-review
May 25, 2021
858934a
Appeasing the linter
May 25, 2021
67c78fa
Guarding against unprotected nil pointer dereference in Prometheus re…
May 27, 2021
469ce9b
Appeasing the linter
May 27, 2021
52489cd
Setting tenant ID header on remote-write client
May 28, 2021
4f218cf
Updating benchmark to use complex struct rather than int to be more r…
May 31, 2021
4d3bebd
Registering flags
May 31, 2021
cc736d7
Adding metric to track remote-write commit errors
May 31, 2021
7e95a8c
Refactoring based on review
May 31, 2021
a7a4186
Performance improvements based on review
May 31, 2021
1e8b8af
Return error on invalid queue capacity
May 31, 2021
9dee0f4
Removing global queue capacity config - using limits
Jun 1, 2021
bf473c8
Reusing memory in request preparation
Jun 1, 2021
082f54e
Moving remote-write metrics into struct
Jun 2, 2021
14a2d74
Applying review suggestions
Jun 2, 2021
8424008
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
Jun 2, 2021
22f628c
Allowing for runtime changing of per-tenant remote-write queue capacity
Jun 2, 2021
a320a46
Appeasing the linter
Jun 2, 2021
46 changes: 32 additions & 14 deletions pkg/ruler/appender.go
@@ -23,15 +23,18 @@ type RemoteWriteAppendable struct {
cfg Config
overrides RulesLimits
logger log.Logger

metrics *remoteWriteMetrics
}

func newRemoteWriteAppendable(cfg Config, overrides RulesLimits, logger log.Logger, userID string) *RemoteWriteAppendable {
func newRemoteWriteAppendable(cfg Config, overrides RulesLimits, logger log.Logger, userID string, metrics *remoteWriteMetrics) *RemoteWriteAppendable {
return &RemoteWriteAppendable{
logger: logger,
userID: userID,
cfg: cfg,
overrides: overrides,
groupAppender: make(map[string]*RemoteWriteAppender),
metrics: metrics,
}
}

@@ -42,15 +45,23 @@ type RemoteWriteAppender struct {
userID string
groupKey string

queue *util.EvictingQueue
queue *util.EvictingQueue
metrics *remoteWriteMetrics
}

func (a *RemoteWriteAppendable) Appender(ctx context.Context) storage.Appender {
groupKey := retrieveGroupKeyFromContext(ctx)

capacity := a.overrides.RulerRemoteWriteQueueCapacity(a.userID)

// create or retrieve an appender associated with this groupKey (unique ID for rule group)
appender, found := a.groupAppender[groupKey]
if found {
err := appender.WithQueueCapacity(capacity)
if err != nil {
level.Warn(a.logger).Log("msg", "attempting to set capacity failed", "err", err)
}

return appender
Review comment (Member):
Since this is ultimately called on every evaluation cycle in the Ruler, we can probably add a method WithCapacity which can mutate an appender's capacity and thus will be kept up to date with the overrides (would need a test).

Suggested change:
- return appender
+ return appender.WithCapacity(capacity)

}

@@ -60,8 +71,7 @@ func (a *RemoteWriteAppendable) Appender(ctx context.Context) storage.Appender {
return &NoopAppender{}
}

capacity := a.overrides.RulerRemoteWriteQueueCapacity(a.userID)
queue, err := util.NewEvictingQueue(capacity, onEvict(a.userID, groupKey))
queue, err := util.NewEvictingQueue(capacity, a.onEvict(a.userID, groupKey))
if err != nil {
level.Error(a.logger).Log("msg", "queue creation error; setting appender as noop", "err", err, "tenant", a.userID)
return &NoopAppender{}
@@ -74,11 +84,10 @@ func (a *RemoteWriteAppendable) Appender(ctx context.Context) storage.Appender {
groupKey: groupKey,
userID: a.userID,

queue: queue,
queue: queue,
metrics: a.metrics,
}

samplesQueueCapacity.WithLabelValues(a.userID, groupKey).Set(float64(capacity))

// only track reference if groupKey was retrieved
if groupKey == "" {
level.Warn(a.logger).Log("msg", "blank group key passed via context; creating new appender")
@@ -89,9 +98,9 @@ func (a *RemoteWriteAppendable) Appender(ctx context.Context) storage.Appender {
return appender
}

func onEvict(userID, groupKey string) func() {
func (a *RemoteWriteAppendable) onEvict(userID, groupKey string) func() {
return func() {
samplesEvicted.WithLabelValues(userID, groupKey).Inc()
a.metrics.samplesEvicted.WithLabelValues(userID, groupKey).Inc()
}
}

@@ -104,8 +113,8 @@ func (a *RemoteWriteAppender) Append(_ uint64, l labels.Labels, t int64, v float
},
})

samplesQueued.WithLabelValues(a.userID, a.groupKey).Set(float64(a.queue.Length()))
samplesQueuedTotal.WithLabelValues(a.userID, a.groupKey).Inc()
a.metrics.samplesQueued.WithLabelValues(a.userID, a.groupKey).Set(float64(a.queue.Length()))
a.metrics.samplesQueuedTotal.WithLabelValues(a.userID, a.groupKey).Inc()

return 0, nil
}
@@ -129,21 +138,21 @@ func (a *RemoteWriteAppender) Commit() error {
req, err := a.remoteWriter.PrepareRequest(a.queue)
if err != nil {
level.Error(a.logger).Log("msg", "could not prepare remote-write request", "err", err)
remoteWriteErrors.WithLabelValues(a.userID, a.groupKey).Inc()
a.metrics.remoteWriteErrors.WithLabelValues(a.userID, a.groupKey).Inc()
return err
}

err = a.remoteWriter.Store(a.ctx, req)
if err != nil {
level.Error(a.logger).Log("msg", "could not store recording rule samples", "err", err)
remoteWriteErrors.WithLabelValues(a.userID, a.groupKey).Inc()
a.metrics.remoteWriteErrors.WithLabelValues(a.userID, a.groupKey).Inc()
return err
}

// Clear the queue on a successful response
a.queue.Clear()

samplesQueued.WithLabelValues(a.userID, a.groupKey).Set(0)
a.metrics.samplesQueued.WithLabelValues(a.userID, a.groupKey).Set(0)

return nil
}
@@ -154,6 +163,15 @@ func (a *RemoteWriteAppender) Rollback() error {
return nil
}

func (a *RemoteWriteAppender) WithQueueCapacity(capacity int) error {
if err := a.queue.SetCapacity(capacity); err != nil {
return err
}

a.metrics.samplesQueueCapacity.WithLabelValues(a.userID).Set(float64(capacity))
return nil
}

func retrieveGroupKeyFromContext(ctx context.Context) string {
data, found := ctx.Value(promql.QueryOrigin{}).(map[string]interface{})
if !found {
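Note: the appender above depends on util.EvictingQueue (Append, Length, Clear, Capacity, and the SetCapacity call made by WithQueueCapacity), but that type lives outside the files included in these five commits. A minimal sketch of the behaviour the appender and the reviewer's WithCapacity suggestion assume is shown below; the field layout, eviction policy, and error text are assumptions for illustration, not the actual pkg/util code.

```go
// Hypothetical sketch of an evicting queue with runtime-adjustable capacity.
// Not the real pkg/util implementation; shown only to make the appender's
// expectations concrete.
package util

import "fmt"

type EvictingQueue struct {
	capacity int
	entries  []interface{}
	onEvict  func() // invoked once per evicted entry
}

func NewEvictingQueue(capacity int, onEvict func()) (*EvictingQueue, error) {
	q := &EvictingQueue{onEvict: onEvict}
	if err := q.SetCapacity(capacity); err != nil {
		return nil, err
	}
	return q, nil
}

// SetCapacity resizes the queue and evicts the oldest entries if it shrank,
// which is what WithQueueCapacity relies on when per-tenant overrides change.
func (q *EvictingQueue) SetCapacity(capacity int) error {
	if capacity <= 0 {
		return fmt.Errorf("invalid capacity %d: must be greater than zero", capacity)
	}
	q.capacity = capacity
	for len(q.entries) > q.capacity {
		q.evictOldest()
	}
	return nil
}

// Append adds an entry, evicting the oldest entry first if the queue is full.
func (q *EvictingQueue) Append(entry interface{}) {
	if len(q.entries) >= q.capacity {
		q.evictOldest()
	}
	q.entries = append(q.entries, entry)
}

func (q *EvictingQueue) evictOldest() {
	q.entries = q.entries[1:]
	if q.onEvict != nil {
		q.onEvict()
	}
}

func (q *EvictingQueue) Entries() []interface{} { return q.entries }
func (q *EvictingQueue) Length() int            { return len(q.entries) }
func (q *EvictingQueue) Capacity() int          { return q.capacity }
func (q *EvictingQueue) Clear()                 { q.entries = nil }
```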
25 changes: 25 additions & 0 deletions pkg/ruler/appender_test.go
@@ -9,6 +9,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
promConfig "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
@@ -26,6 +27,7 @@ var (
fakeUserID = "fake"
emptyWriteRequest = []byte{}
queueCapacity = 10
metrics = newRemoteWriteMetrics(prometheus.DefaultRegisterer)
)

func TestGroupKeyRetrieval(t *testing.T) {
@@ -55,6 +57,28 @@ func TestMemoizedAppenders(t *testing.T) {
require.NotSame(t, appender, appendable.Appender(ctx))
}

// TestMemoizedAppendersWithRuntimeCapacityChange tests that memoized appenders can reconfigure their capacity
func TestMemoizedAppendersWithRuntimeCapacityChange(t *testing.T) {
ctx := createOriginContext("/rule/file", "rule-group")
appendable := createBasicAppendable(queueCapacity)

appender := appendable.Appender(ctx)

// appender is configured with default queue capacity initially
capacity := appender.(*RemoteWriteAppender).queue.Capacity()
require.Equal(t, queueCapacity, capacity)

newCapacity := 123

// reconfigure the overrides to simulate a runtime config change
appendable.overrides = fakeLimits(newCapacity)

// appender is reconfigured with new queue capacity when retrieved again
appender = appendable.Appender(ctx)
capacity = appender.(*RemoteWriteAppender).queue.Capacity()
require.Equal(t, newCapacity, capacity)
}

func TestAppenderSeparationByRuleGroup(t *testing.T) {
ctxA := createOriginContext("/rule/fileA", "rule-groupA")
ctxB := createOriginContext("/rule/fileB", "rule-groupB")
@@ -267,6 +291,7 @@ func createBasicAppendable(queueCapacity int) *RemoteWriteAppendable {
fakeLimits(queueCapacity),
logger,
fakeUserID,
metrics,
)
}

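Note: the tests above call createOriginContext, which is defined elsewhere in appender_test.go and not shown in these hunks. It presumably mirrors what the Prometheus rules manager does before evaluating a group: attaching the rule group's file and name to the query context, which retrieveGroupKeyFromContext in appender.go reads back via promql.QueryOrigin. A sketch under that assumption:

```go
// Hypothetical test helper: attaches rule-group origin details to a context
// the same way the Prometheus rules manager does, so that
// retrieveGroupKeyFromContext can recover a per-group key from it.
package ruler

import (
	"context"

	"github.com/prometheus/prometheus/promql"
)

func createOriginContext(ruleFile, groupName string) context.Context {
	return promql.NewOriginContext(context.Background(), map[string]interface{}{
		"ruleGroup": map[string]string{
			"file": ruleFile,
			"name": groupName,
		},
	})
}
```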
22 changes: 15 additions & 7 deletions pkg/ruler/compat.go
@@ -93,7 +93,8 @@ func MemstoreTenantManager(
engine *logql.Engine,
overrides RulesLimits,
) ruler.ManagerFactory {
var metrics *Metrics
var msMetrics *memstoreMetrics
var rwMetrics *remoteWriteMetrics

return ruler.ManagerFactory(func(
ctx context.Context,
@@ -104,15 +105,22 @@ ) ruler.RulesManager {
) ruler.RulesManager {
// We'll ignore the passed registerer and use the default registerer to avoid prefix issues and other weirdness.
// This closure prevents re-registering.
if metrics == nil {
metrics = NewMetrics(prometheus.DefaultRegisterer)
registerer := prometheus.DefaultRegisterer

if msMetrics == nil {
msMetrics = newMemstoreMetrics(registerer)
}

if rwMetrics == nil {
rwMetrics = newRemoteWriteMetrics(registerer)
}

logger = log.With(logger, "user", userID)
queryFunc := engineQueryFunc(engine, overrides, userID)
memStore := NewMemStore(userID, queryFunc, metrics, 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))
memStore := NewMemStore(userID, queryFunc, msMetrics, 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))

mgr := rules.NewManager(&rules.ManagerOptions{
Appendable: newAppendable(cfg, overrides, logger, userID),
Appendable: newAppendable(cfg, overrides, logger, userID, rwMetrics),
Queryable: memStore,
QueryFunc: queryFunc,
Context: user.InjectOrgID(ctx, userID),
@@ -133,13 +141,13 @@ })
})
}

func newAppendable(cfg Config, overrides RulesLimits, logger log.Logger, userID string) storage.Appendable {
func newAppendable(cfg Config, overrides RulesLimits, logger log.Logger, userID string, metrics *remoteWriteMetrics) storage.Appendable {
if !cfg.RemoteWrite.Enabled {
level.Info(logger).Log("msg", "remote-write is disabled")
return &NoopAppender{}
}

return newRemoteWriteAppendable(cfg, overrides, logger, userID)
return newRemoteWriteAppendable(cfg, overrides, logger, userID, metrics)
}

type GroupLoader struct{}
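Note: newRemoteWriteMetrics and the remoteWriteMetrics struct it returns are used throughout appender.go and compat.go, but their definition is not part of the five commits shown. The sketch below is inferred purely from how the fields are used above; the metric names and label sets are assumptions, not the PR's actual definitions.

```go
// Hypothetical sketch of remoteWriteMetrics, inferred from field usage above.
// Metric names are placeholders and may differ from what the PR registers.
package ruler

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type remoteWriteMetrics struct {
	samplesQueued        *prometheus.GaugeVec   // current queue length per tenant/group
	samplesQueuedTotal   *prometheus.CounterVec // samples ever queued per tenant/group
	samplesQueueCapacity *prometheus.GaugeVec   // configured capacity per tenant
	samplesEvicted       *prometheus.CounterVec // samples dropped from a full queue
	remoteWriteErrors    *prometheus.CounterVec // failed prepare/store attempts
}

func newRemoteWriteMetrics(r prometheus.Registerer) *remoteWriteMetrics {
	return &remoteWriteMetrics{
		samplesQueued: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "loki", Name: "ruler_remote_write_samples_queued", // assumed name
		}, []string{"tenant", "group_key"}),
		samplesQueuedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Namespace: "loki", Name: "ruler_remote_write_samples_queued_total", // assumed name
		}, []string{"tenant", "group_key"}),
		samplesQueueCapacity: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "loki", Name: "ruler_remote_write_queue_capacity", // assumed name
		}, []string{"tenant"}),
		samplesEvicted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Namespace: "loki", Name: "ruler_remote_write_samples_evicted_total", // assumed name
		}, []string{"tenant", "group_key"}),
		remoteWriteErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Namespace: "loki", Name: "ruler_remote_write_errors_total", // assumed name
		}, []string{"tenant", "group_key"}),
	}
}
```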
2 changes: 1 addition & 1 deletion pkg/ruler/compat_test.go
@@ -309,7 +309,7 @@ func TestNoopAppender(t *testing.T) {
}
require.False(t, cfg.RemoteWrite.Enabled)

appendable := newAppendable(cfg, &validation.Overrides{}, log.NewNopLogger(), "fake")
appendable := newAppendable(cfg, &validation.Overrides{}, log.NewNopLogger(), "fake", metrics)
appender := appendable.Appender(context.TODO())
require.IsType(t, NoopAppender{}, appender)
}
4 changes: 0 additions & 4 deletions pkg/ruler/config.go
@@ -9,10 +9,6 @@ import (
"github.com/prometheus/prometheus/config"
)

// DefaultQueueCapacity defines the default size of the samples queue which will hold samples
// while the remote-write endpoint is unavailable
const DefaultQueueCapacity = 10000

type Config struct {
ruler.Config `yaml:",inline"`

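Note: the removed DefaultQueueCapacity constant is superseded by a per-tenant limit. The appender calls overrides.RulerRemoteWriteQueueCapacity(userID) on every Appender() retrieval and feeds the result to WithQueueCapacity, which is what lets runtime-config changes take effect (see the test above). The RulesLimits interface itself is not in this diff; a minimal sketch of the shape the appender relies on, with a test-style fake, might be:

```go
// Hypothetical sketch: the per-tenant limit lookup the appender relies on.
// The real RulesLimits interface and its validation/runtime-config wiring
// are defined elsewhere and are not part of the hunks shown.
package ruler

// RulesLimits is assumed to expose the per-tenant remote-write queue capacity.
type RulesLimits interface {
	RulerRemoteWriteQueueCapacity(userID string) int
}

// A fake along the lines of fakeLimits(...) used in appender_test.go.
type fakeRulesLimits struct {
	queueCapacity int
}

func (f fakeRulesLimits) RulerRemoteWriteQueueCapacity(_ string) int {
	return f.queueCapacity
}
```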
36 changes: 18 additions & 18 deletions pkg/ruler/memstore.go
@@ -46,23 +46,23 @@ func ForStateMetric(base labels.Labels, alertName string) labels.Labels {
return b.Labels()
}

type Metrics struct {
Evaluations *prometheus.CounterVec
Samples prometheus.Gauge // in memory samples
CacheHits *prometheus.CounterVec // cache hits on in memory samples
type memstoreMetrics struct {
Review comment (Member):
I don't think we need to make these private, but I did link you to ingesterMetrics as an example to follow, which are private though 😆. In the long term, I think we should expose them consistently and I'd favor public Metrics structs where possible.

evaluations *prometheus.CounterVec
samples prometheus.Gauge // in memory samples
cacheHits *prometheus.CounterVec // cache hits on in memory samples
}

func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
Evaluations: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
func newMemstoreMetrics(r prometheus.Registerer) *memstoreMetrics {
return &memstoreMetrics{
evaluations: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ruler_memory_for_state_evaluations_total",
}, []string{"status", "tenant"}),
Samples: promauto.With(r).NewGauge(prometheus.GaugeOpts{
samples: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "ruler_memory_samples",
}),
CacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
cacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ruler_memory_for_state_cache_hits_total",
}, []string{"tenant"}),
@@ -77,7 +77,7 @@ type MemStore struct {
mtx sync.Mutex
userID string
queryFunc rules.QueryFunc
metrics *Metrics
metrics *memstoreMetrics
mgr RuleIter
logger log.Logger
rules map[string]*RuleCache
@@ -87,7 +87,7 @@ type MemStore struct {
cleanupInterval time.Duration
}

func NewMemStore(userID string, queryFunc rules.QueryFunc, metrics *Metrics, cleanupInterval time.Duration, logger log.Logger) *MemStore {
func NewMemStore(userID string, queryFunc rules.QueryFunc, metrics *memstoreMetrics, cleanupInterval time.Duration, logger log.Logger) *MemStore {
s := &MemStore{
userID: userID,
metrics: metrics,
@@ -243,7 +243,7 @@ func (m *memStoreQuerier) Select(sortSeries bool, params *storage.SelectHints, m

smpl, cached := cache.Get(m.ts, ls)
if cached {
m.metrics.CacheHits.WithLabelValues(m.userID).Inc()
m.metrics.cacheHits.WithLabelValues(m.userID).Inc()
level.Debug(m.logger).Log("msg", "result cached", "rule", ruleKey)
// Assuming the result is cached but the desired series is not in the result, it wouldn't be considered active.
if smpl == nil {
@@ -265,10 +265,10 @@ func (m *memStoreQuerier) Select(sortSeries bool, params *storage.SelectHints, m
vec, err := m.queryFunc(m.ctx, rule.Query().String(), m.ts.Add(-rule.HoldDuration()))
if err != nil {
level.Info(m.logger).Log("msg", "error querying for rule", "rule", ruleKey, "err", err.Error())
m.metrics.Evaluations.WithLabelValues(statusFailure, m.userID).Inc()
m.metrics.evaluations.WithLabelValues(statusFailure, m.userID).Inc()
return storage.NoopSeriesSet()
}
m.metrics.Evaluations.WithLabelValues(statusSuccess, m.userID).Inc()
m.metrics.evaluations.WithLabelValues(statusSuccess, m.userID).Inc()
level.Debug(m.logger).Log("msg", "rule state successfully restored", "rule", ruleKey, "len", len(vec))

// translate the result into the ALERTS_FOR_STATE series for caching,
Expand Down Expand Up @@ -322,11 +322,11 @@ func (*memStoreQuerier) Close() error { return nil }

type RuleCache struct {
mtx sync.Mutex
metrics *Metrics
metrics *memstoreMetrics
data map[int64]map[uint64]promql.Sample
}

func NewRuleCache(metrics *Metrics) *RuleCache {
func NewRuleCache(metrics *memstoreMetrics) *RuleCache {
return &RuleCache{
data: make(map[int64]map[uint64]promql.Sample),
metrics: metrics,
@@ -345,7 +345,7 @@ func (c *RuleCache) Set(ts time.Time, vec promql.Vector) {
for _, sample := range vec {
tsMap[sample.Metric.Hash()] = sample
}
c.metrics.Samples.Add(float64(len(vec)))
c.metrics.samples.Add(float64(len(vec)))
}

// Get returns ok if that timestamp's result is cached.
@@ -377,7 +377,7 @@ func (c *RuleCache) CleanupOldSamples(olderThan time.Time) (empty bool) {
for ts, tsMap := range c.data {
if ts < ns {
delete(c.data, ts)
c.metrics.Samples.Add(-float64(len(tsMap)))
c.metrics.samples.Add(-float64(len(tsMap)))
}

}
2 changes: 1 addition & 1 deletion pkg/ruler/memstore_test.go
@@ -15,7 +15,7 @@ import (
)

var (
NilMetrics = NewMetrics(nil)
NilMetrics = newMemstoreMetrics(nil)
NilLogger = log.NewNopLogger()
)
