Revert 9217 chaudum/tsdb chunkrefs pool (#9685) (#9686)
Revert #9217 (potential bug in query result)

Cherry pick #9685 to `k153`

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
kavirajk authored Jun 12, 2023
1 parent a9fbb73 commit 8a2f897
Showing 8 changed files with 36 additions and 50 deletions.
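
For orientation, the revert changes the calling convention of GetChunkRefs: callers no longer check a result buffer out of ChunkRefsPool and pass it in; they pass nil and the index implementations manage buffering internally. The helpers below are a hypothetical sketch of that before/after difference (countRefsPooled and countRefs are illustration-only names, not code from the repository; only the GetChunkRefs signature visible in this diff is assumed):

package tsdb

import (
	"context"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
)

// Reverted pattern (#9217): the caller borrowed a buffer from ChunkRefsPool,
// passed it as the res argument, and returned it to the pool when done.
func countRefsPooled(ctx context.Context, idx Index, userID string, from, through model.Time) (int, error) {
	buf := ChunkRefsPool.Get()
	defer ChunkRefsPool.Put(buf)
	refs, err := idx.GetChunkRefs(ctx, userID, from, through, buf, nil,
		labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	if err != nil {
		return 0, err
	}
	return len(refs), nil
}

// Pattern after this revert: pass nil and let the index allocate (or pool) the
// result slice itself, as the updated tests in this commit do.
func countRefs(ctx context.Context, idx Index, userID string, from, through model.Time) (int, error) {
	refs, err := idx.GetChunkRefs(ctx, userID, from, through, nil, nil,
		labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	if err != nil {
		return 0, err
	}
	return len(refs), nil
}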
5 changes: 3 additions & 2 deletions operator/apis/loki/v1beta1/lokistack_types_test.go
@@ -3,11 +3,12 @@ package v1beta1_test
import (
"testing"

v1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/apis/loki/v1beta1"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/apis/loki/v1beta1"
)

func TestConvertToV1_LokiStack(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/head_manager.go
@@ -743,7 +743,7 @@ func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, thr
if !ok {
return nil, nil
}
return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)
return idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)

}

33 changes: 9 additions & 24 deletions pkg/storage/stores/tsdb/head_manager_test.go
@@ -104,19 +104,17 @@ func Test_TenantHeads_Append(t *testing.T) {
}
_ = h.Append("fake", ls, ls.Hash(), chks)

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

found, err := h.GetChunkRefs(
context.Background(),
"fake",
0,
100,
buf, nil,
nil, nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs("fake", ls.Hash(), chks), found)

}

// Test multitenant reads
@@ -159,23 +157,20 @@ func Test_TenantHeads_MultiRead(t *testing.T) {

}

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

// ensure we're only returned the data from the correct tenant
for _, tenant := range tenants {
buf = buf[:0]
found, err := h.GetChunkRefs(
context.Background(),
tenant.user,
0,
100,
buf, nil,
nil, nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(tenant.user, tenant.ls.Hash(), chks), found)
}

}

// test head recover from wal
@@ -252,21 +247,18 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
require.Equal(t, 1, len(grp.wals))
require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false))

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

for _, c := range cases {
buf = buf[:0]
refs, err := mgr.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
buf, nil,
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Fingerprint, c.Chunks), refs)
}

}

// test mgr recover from multiple wals across multiple periods
@@ -331,16 +323,12 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
// Ensure old WAL data is queryable
multiIndex := NewMultiIndex(IndexSlice{mgr, mgr.tsdbManager.(noopTSDBManager).tenantHeads})

buf := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(buf)

for _, c := range cases {
buf = buf[:0]
refs, err := multiIndex.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
buf, nil,
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
@@ -371,12 +359,11 @@ func Test_HeadManager_Lifecycle(t *testing.T) {

// Ensure old + new data is queryable
for _, c := range append(cases, newCase) {
buf = buf[:0]
refs, err := multiIndex.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
buf, nil,
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
@@ -549,9 +536,7 @@ func BenchmarkTenantHeads(b *testing.B) {
wg.Add(1)
go func(r int) {
defer wg.Done()
res := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(res)

var res []ChunkRef
tenant := r % nTenants

// nolint:ineffassign,staticcheck
6 changes: 2 additions & 4 deletions pkg/storage/stores/tsdb/index_client.go
@@ -115,10 +115,8 @@ func (c *IndexClient) GetChunkRefs(ctx context.Context, userID string, from, thr
return nil, err
}

chks := ChunkRefsPool.Get()
defer ChunkRefsPool.Put(chks)

chks, err = c.idx.GetChunkRefs(ctx, userID, from, through, chks, shard, matchers...)
// TODO(owen-d): use a pool to reduce allocs here
chks, err := c.idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
kvps = append(kvps,
"chunks", len(chks),
"indexErr", err,
15 changes: 10 additions & 5 deletions pkg/storage/stores/tsdb/multi_file_index.go
@@ -110,6 +110,11 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model

func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) {
if res == nil {
res = ChunkRefsPool.Get()
}
res = res[:0]

// keep track of duplicates
seen := make(map[ChunkRef]struct{})

@@ -126,7 +131,9 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
res = append(res, ref)
}
ChunkRefsPool.Put(g)

}

return res, nil
})

@@ -135,14 +142,11 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
from,
through,
func(ctx context.Context, idx Index) error {
var err error
buf := ChunkRefsPool.Get()
buf, err = idx.GetChunkRefs(ctx, userID, from, through, buf, shard, matchers...)
got, err := idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
if err != nil {
ChunkRefsPool.Put(buf)
return err
}
acc.Add(buf)
acc.Add(got)
return nil
},
); err != nil {
@@ -157,6 +161,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
return nil, err
}
return merged.([]ChunkRef), nil

}

func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
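
To follow the multi_file_index.go hunks above: each sub-index is now queried with a nil buffer, its pool-allocated result is handed to the accumulator, and the merge step de-duplicates refs before returning each sub-result to ChunkRefsPool. The function below is a simplified, hypothetical restatement of that merge step within the tsdb package (mergeRefs is not a name in the repository; the resultAccumulator plumbing and error handling are elided):

// mergeRefs sketches the merge inside MultiIndex.GetChunkRefs after this change.
// groups holds the per-sub-index results; res is the caller-supplied buffer, which may be nil.
func mergeRefs(groups [][]ChunkRef, res []ChunkRef) []ChunkRef {
	if res == nil {
		res = ChunkRefsPool.Get() // borrow a buffer only when the caller passed nil
	}
	res = res[:0]

	// keep track of duplicates, since the same ref can come back from several sub-indices
	seen := make(map[ChunkRef]struct{})

	for _, g := range groups {
		for _, ref := range g {
			if _, ok := seen[ref]; ok {
				continue
			}
			seen[ref] = struct{}{}
			res = append(res, ref)
		}
		ChunkRefsPool.Put(g) // each sub-index result was pool-allocated; return it once merged
	}
	return res
}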
4 changes: 1 addition & 3 deletions pkg/storage/stores/tsdb/multi_file_index_test.go
@@ -67,9 +67,7 @@ func TestMultiIndex(t *testing.T) {
idx := NewMultiIndex(IndexSlice(indices))

t.Run("GetChunkRefs", func(t *testing.T) {
var err error
refs := make([]ChunkRef, 0, 8)
refs, err = idx.GetChunkRefs(context.Background(), "fake", 2, 5, refs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
refs, err := idx.GetChunkRefs(context.Background(), "fake", 2, 5, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Nil(t, err)

expected := []ChunkRef{
6 changes: 6 additions & 0 deletions pkg/storage/stores/tsdb/single_file_index.go
@@ -202,8 +202,14 @@ func (i *TSDBIndex) forPostings(
}

func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
if res == nil {
res = ChunkRefsPool.Get()
}
res = res[:0]

if err := i.ForSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
for _, chk := range chks {

res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: fp,
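
The single_file_index.go hunk above makes TSDBIndex.GetChunkRefs borrow a buffer from ChunkRefsPool only when the caller passes nil. For context, here is a minimal sketch of the Get/Put surface such a pool exposes, assuming a sync.Pool-backed design; the names chunkRefsPool and chunkRefsSlicePool, the initial capacity, and the implementation details are assumptions, not Loki's actual ChunkRefsPool:

import "sync"

// chunkRefsPool is a hypothetical sync.Pool of []ChunkRef buffers.
var chunkRefsPool = sync.Pool{
	New: func() interface{} { return make([]ChunkRef, 0, 1024) }, // capacity is an assumption
}

// chunkRefsSlicePool exposes the Get/Put surface used in the hunks above.
type chunkRefsSlicePool struct{}

func (chunkRefsSlicePool) Get() []ChunkRef {
	// hand out a zero-length slice so callers never see stale entries
	return chunkRefsPool.Get().([]ChunkRef)[:0]
}

func (chunkRefsSlicePool) Put(xs []ChunkRef) {
	// reset length before returning the backing array to the pool
	chunkRefsPool.Put(xs[:0]) //nolint:staticcheck // sketch only: a slice header, not a pointer, goes into the pool
}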
15 changes: 4 additions & 11 deletions pkg/storage/stores/tsdb/single_file_index_test.go
@@ -85,9 +85,7 @@ func TestSingleIdx(t *testing.T) {
t.Run(variant.desc, func(t *testing.T) {
idx := variant.fn()
t.Run("GetChunkRefs", func(t *testing.T) {
var err error
refs := make([]ChunkRef, 0, 8)
refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
refs, err := idx.GetChunkRefs(context.Background(), "fake", 1, 5, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Nil(t, err)

expected := []ChunkRef{
@@ -128,9 +126,7 @@ func TestSingleIdx(t *testing.T) {
Shard: 1,
Of: 2,
}
var err error
refs := make([]ChunkRef, 0, 8)
refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
shardedRefs, err := idx.GetChunkRefs(context.Background(), "fake", 1, 5, nil, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))

require.Nil(t, err)

Expand All @@ -140,7 +136,7 @@ func TestSingleIdx(t *testing.T) {
Start: 1,
End: 10,
Checksum: 3,
}}, refs)
}}, shardedRefs)

})

@@ -253,13 +249,10 @@ func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {

b.ResetTimer()
b.ReportAllocs()
var err error
for i := 0; i < b.N; i++ {
chkRefs := ChunkRefsPool.Get()
chkRefs, err = tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, chkRefs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.NoError(b, err)
require.Len(b, chkRefs, numChunksToMatch*2)
ChunkRefsPool.Put(chkRefs)
}
}

