Force using a buffer from the ChunkRefsPool #9217

Merged · 3 commits · May 25, 2023
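This change removes the lazy `if res == nil { res = ChunkRefsPool.Get() }` fallback from the TSDB `GetChunkRefs` implementations and instead makes every caller supply its own buffer, typically taken from `ChunkRefsPool`, and return it when done; `tenantHeads.GetChunkRefs` also forwards the caller's `res` rather than discarding it by passing `nil`. Below is a minimal, self-contained sketch of the caller-side pattern the diff adopts; `ChunkRef` and `chunkRefsPool` here are simplified stand-ins, not the real types from `pkg/storage/stores/tsdb`.

```go
package main

import (
	"fmt"
	"sync"
)

// ChunkRef is a trimmed stand-in for the real tsdb.ChunkRef.
type ChunkRef struct {
	User        string
	Fingerprint uint64
	Checksum    uint32
}

// chunkRefsPool approximates what ChunkRefsPool provides: reusable []ChunkRef buffers.
var chunkRefsPool = sync.Pool{
	New: func() interface{} { return make([]ChunkRef, 0, 1024) },
}

// getChunkRefs appends matching refs into the caller-supplied buffer instead of
// allocating (or pooling) a slice of its own; this is the contract the PR enforces.
func getChunkRefs(res []ChunkRef, user string) []ChunkRef {
	return append(res, ChunkRef{User: user, Fingerprint: 1, Checksum: 3})
}

func main() {
	// The caller owns the buffer: take it from the pool, reset it, use it, put it back.
	buf := chunkRefsPool.Get().([]ChunkRef)
	defer chunkRefsPool.Put(buf) // hands the original buffer back, as in the diff

	buf = buf[:0]
	buf = getChunkRefs(buf, "fake")
	fmt.Println(len(buf), buf[0].User)
}
```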
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/head_manager.go
@@ -743,7 +743,7 @@ func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, thr
if !ok {
return nil, nil
}
return idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)

}

33 changes: 24 additions & 9 deletions pkg/storage/stores/tsdb/head_manager_test.go
@@ -104,17 +104,19 @@ func Test_TenantHeads_Append(t *testing.T) {
}
_ = h.Append("fake", ls, ls.Hash(), chks)

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

found, err := h.GetChunkRefs(
context.Background(),
"fake",
0,
100,
nil, nil,
buf, nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs("fake", ls.Hash(), chks), found)

}

// Test multitenant reads
@@ -157,20 +159,23 @@ func Test_TenantHeads_MultiRead(t *testing.T) {

}

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

// ensure we're only returned the data from the correct tenant
for _, tenant := range tenants {
buf = buf[:0]
found, err := h.GetChunkRefs(
context.Background(),
tenant.user,
0,
100,
nil, nil,
buf, nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(tenant.user, tenant.ls.Hash(), chks), found)
}

}

// test head recover from wal
@@ -247,18 +252,21 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
require.Equal(t, 1, len(grp.wals))
require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false))

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

for _, c := range cases {
buf = buf[:0]
refs, err := mgr.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
nil, nil,
buf, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Fingerprint, c.Chunks), refs)
}

}

// test mgr recover from multiple wals across multiple periods
@@ -323,12 +331,16 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
// Ensure old WAL data is queryable
multiIndex := NewMultiIndex(IndexSlice{mgr, mgr.tsdbManager.(noopTSDBManager).tenantHeads})

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

for _, c := range cases {
buf = buf[:0]
refs, err := multiIndex.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
nil, nil,
buf, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
@@ -359,11 +371,12 @@ func Test_HeadManager_Lifecycle(t *testing.T) {

// Ensure old + new data is queryable
for _, c := range append(cases, newCase) {
buf = buf[:0]
refs, err := multiIndex.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
nil, nil,
buf, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
@@ -536,7 +549,9 @@ func BenchmarkTenantHeads(b *testing.B) {
wg.Add(1)
go func(r int) {
defer wg.Done()
var res []ChunkRef
res := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(res)

tenant := r % nTenants

// nolint:ineffassign,staticcheck
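Note how the updated tests take one buffer from the pool and reuse it across loop iterations: the `buf = buf[:0]` reslice empties the buffer before each query so results from a previous iteration cannot leak into the next assertion, while keeping the backing array (and its capacity) for reuse. A tiny standalone illustration of that reslice, with throwaway names:

```go
package main

import "fmt"

func main() {
	// Reuse one buffer across iterations: truncate to length 0, keep capacity.
	buf := make([]int, 0, 8)
	for i := 0; i < 3; i++ {
		buf = buf[:0]             // drop previous contents without reallocating
		buf = append(buf, i, i+1) // refill for this iteration
		fmt.Println(buf, len(buf), cap(buf))
	}
}
```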
6 changes: 4 additions & 2 deletions pkg/storage/stores/tsdb/index_client.go
@@ -115,8 +115,10 @@ func (c *IndexClient) GetChunkRefs(ctx context.Context, userID string, from, thr
return nil, err
}

// TODO(owen-d): use a pool to reduce allocs here
chks, err := c.idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
chks := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(chks)

chks, err = c.idx.GetChunkRefs(ctx, userID, from, through, chks, shard, matchers...)
kvps = append(kvps,
"chunks", len(chks),
"indexErr", err,
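In `IndexClient.GetChunkRefs` the buffer is released with `defer ChunkRefsPool.Put(chks)` before `chks` is reassigned by the index call. Since Go evaluates a deferred call's arguments at the point of the `defer` statement, the slice handed back to the pool is the one originally obtained from `Get`, even if the index grows it onto a new backing array. A small standalone example of that evaluation rule (names are hypothetical):

```go
package main

import "fmt"

func main() {
	// defer evaluates its arguments immediately, so the release function sees
	// the original slice header even though buf is reassigned afterwards.
	buf := make([]int, 0, 2)
	defer func(b []int) { fmt.Println("released buffer cap:", cap(b)) }(buf) // prints 2

	buf = append(buf, 1, 2, 3) // grows past the original capacity onto a new backing array
	fmt.Println("grown buffer cap:", cap(buf))
}
```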
15 changes: 5 additions & 10 deletions pkg/storage/stores/tsdb/multi_file_index.go
@@ -110,11 +110,6 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model

func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) {
if res == nil {
res = ChunkRefsPool.Get()
}
res = res[:0]

// keep track of duplicates
seen := make(map[ChunkRef]struct{})

@@ -131,9 +126,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
res = append(res, ref)
}
ChunkRefsPool.Put(g)

}

return res, nil
})

@@ -142,11 +135,14 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
from,
through,
func(ctx context.Context, idx Index) error {
got, err := idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
var err error
buf := ChunkRefsPool.Get()
buf, err = idx.GetChunkRefs(ctx, userID, from, through, buf, shard, matchers...)
if err != nil {
ChunkRefsPool.Put(buf)
return err
}
acc.Add(got)
acc.Add(buf)
return nil
},
); err != nil {
@@ -161,7 +157,6 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
return nil, err
}
return merged.([]ChunkRef), nil

}

func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
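With this change `MultiIndex.GetChunkRefs` no longer pulls a result slice from the pool inside the accumulator. Each per-index query now fills its own scratch buffer from `ChunkRefsPool`, the accumulator merges those buffers into the caller's `res` while deduplicating with the `seen` map, and every scratch buffer is returned to the pool after merging (or immediately when a query errors). A condensed sketch of that merge step, using simplified stand-in types:

```go
package main

import "fmt"

// ChunkRef is a trimmed stand-in for the real tsdb.ChunkRef.
type ChunkRef struct {
	Fingerprint uint64
	Checksum    uint32
}

// mergeChunkRefs appends per-index result groups into res, skipping duplicates.
// In the real accumulator each group came from ChunkRefsPool and is Put back
// once its entries have been copied out.
func mergeChunkRefs(res []ChunkRef, groups ...[]ChunkRef) []ChunkRef {
	seen := make(map[ChunkRef]struct{})
	for _, g := range groups {
		for _, ref := range g {
			if _, ok := seen[ref]; ok {
				continue
			}
			seen[ref] = struct{}{}
			res = append(res, ref)
		}
	}
	return res
}

func main() {
	a := []ChunkRef{{Fingerprint: 1, Checksum: 3}, {Fingerprint: 2, Checksum: 4}}
	b := []ChunkRef{{Fingerprint: 2, Checksum: 4}, {Fingerprint: 3, Checksum: 5}} // one duplicate across indices
	fmt.Println(mergeChunkRefs(make([]ChunkRef, 0, 4), a, b))
}
```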
4 changes: 3 additions & 1 deletion pkg/storage/stores/tsdb/multi_file_index_test.go
@@ -67,7 +67,9 @@ func TestMultiIndex(t *testing.T) {
idx := NewMultiIndex(IndexSlice(indices))

t.Run("GetChunkRefs", func(t *testing.T) {
refs, err := idx.GetChunkRefs(context.Background(), "fake", 2, 5, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var err error
refs := make([]ChunkRef, 0, 8)
refs, err = idx.GetChunkRefs(context.Background(), "fake", 2, 5, refs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Nil(t, err)

expected := []ChunkRef{
6 changes: 0 additions & 6 deletions pkg/storage/stores/tsdb/single_file_index.go
@@ -202,14 +202,8 @@ func (i *TSDBIndex) forPostings(
}

func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
if res == nil {
res = ChunkRefsPool.Get()
}
res = res[:0]

if err := i.ForSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
for _, chk := range chks {

res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: fp,
15 changes: 11 additions & 4 deletions pkg/storage/stores/tsdb/single_file_index_test.go
@@ -85,7 +85,9 @@ func TestSingleIdx(t *testing.T) {
t.Run(variant.desc, func(t *testing.T) {
idx := variant.fn()
t.Run("GetChunkRefs", func(t *testing.T) {
refs, err := idx.GetChunkRefs(context.Background(), "fake", 1, 5, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var err error
refs := make([]ChunkRef, 0, 8)
refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Nil(t, err)

expected := []ChunkRef{
@@ -126,7 +128,9 @@ func TestSingleIdx(t *testing.T) {
Shard: 1,
Of: 2,
}
shardedRefs, err := idx.GetChunkRefs(context.Background(), "fake", 1, 5, nil, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var err error
refs := make([]ChunkRef, 0, 8)
refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))

require.Nil(t, err)

@@ -136,7 +140,7 @@ func TestSingleIdx(t *testing.T) {
Start: 1,
End: 10,
Checksum: 3,
}}, shardedRefs)
}}, refs)

})

@@ -249,10 +253,13 @@ func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {

b.ResetTimer()
b.ReportAllocs()
var err error
for i := 0; i < b.N; i++ {
chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
chkRefs := ChunkRefsPool.Get()
chkRefs, err = tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, chkRefs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.NoError(b, err)
require.Len(b, chkRefs, numChunksToMatch*2)
ChunkRefsPool.Put(chkRefs)
}
}
