Skip to content

Commit

Permalink
add store method for getting fetcher for a chunk (grafana#3164)
Browse files Browse the repository at this point in the history
* add store method for getting fetcher for a chunk

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* add validation in schema to have from time in increasing other, accept a single timestamp for getting chunk fetcher

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* suggested change from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* Update pkg/chunk/composite_store.go

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
sandeepsukhani and pracucci authored Sep 14, 2020
1 parent 04be3f9 commit 11f5407
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 26 deletions.
4 changes: 4 additions & 0 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,7 @@ func (c *store) DeleteSeriesIDs(ctx context.Context, from, through model.Time, u
// SeriesID is something which is only used in SeriesStore so we need not do anything here
return nil
}

func (c *baseStore) GetChunkFetcher(_ model.Time) *Fetcher {
return c.fetcher
}
23 changes: 17 additions & 6 deletions composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Store interface {
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
GetChunkFetcher(tm model.Time) *Fetcher

// DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage.
// It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
Expand Down Expand Up @@ -174,6 +175,22 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t
return chunkIDs, fetchers, err
}

func (c compositeStore) GetChunkFetcher(tm model.Time) *Fetcher {
// find the schema with the lowest start _after_ tm
j := sort.Search(len(c.stores), func(j int) bool {
return c.stores[j].start > tm
})

// reduce it by 1 because we want a schema with start <= tm
j--

if 0 <= j && j < len(c.stores) {
return c.stores[j].GetChunkFetcher(tm)
}

return nil
}

// DeleteSeriesIDs deletes series IDs from index in series store
func (c CompositeStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
return c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
Expand Down Expand Up @@ -233,12 +250,6 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro
nextSchemaStarts = c.stores[i+1].start
}

// If the next schema starts at the same time as this one,
// skip this one.
if nextSchemaStarts == c.stores[i].start {
continue
}

end := min(through, nextSchemaStarts-1)
err := callback(ctx, start, end, c.stores[i].Store)
if err != nil {
Expand Down
74 changes: 58 additions & 16 deletions composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (m mockStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time
return nil
}

func (m mockStore) GetChunkFetcher(tm model.Time) *Fetcher {
return nil
}

func (m mockStore) Stop() {}

func TestCompositeStore(t *testing.T) {
Expand Down Expand Up @@ -124,22 +128,6 @@ func TestCompositeStore(t *testing.T) {
},
},

// Test we get only one result when two schema start at same time
{
compositeStore{
stores: []compositeStoreEntry{
{model.TimeFromUnix(0), mockStore(1)},
{model.TimeFromUnix(10), mockStore(2)},
{model.TimeFromUnix(10), mockStore(3)},
},
},
0, 165,
[]result{
{model.TimeFromUnix(0), model.TimeFromUnix(10) - 1, mockStore(1)},
{model.TimeFromUnix(10), model.TimeFromUnix(165), mockStore(3)},
},
},

// Test all the various combination we can get when there are three schemas
{
cs, 34, 65,
Expand Down Expand Up @@ -247,3 +235,57 @@ func TestCompositeStoreLabels(t *testing.T) {
}

}

type mockStoreGetChunkFetcher struct {
mockStore
chunkFetcher *Fetcher
}

func (m mockStoreGetChunkFetcher) GetChunkFetcher(tm model.Time) *Fetcher {
return m.chunkFetcher
}

func TestCompositeStore_GetChunkFetcher(t *testing.T) {
cs := compositeStore{
stores: []compositeStoreEntry{
{model.TimeFromUnix(10), mockStoreGetChunkFetcher{mockStore(0), &Fetcher{}}},
{model.TimeFromUnix(20), mockStoreGetChunkFetcher{mockStore(1), &Fetcher{}}},
},
}

for _, tc := range []struct {
name string
tm model.Time
expectedFetcher *Fetcher
}{
{
name: "no matching store",
tm: model.TimeFromUnix(0),
},
{
name: "first store",
tm: model.TimeFromUnix(10),
expectedFetcher: cs.stores[0].Store.(mockStoreGetChunkFetcher).chunkFetcher,
},
{
name: "still first store",
tm: model.TimeFromUnix(11),
expectedFetcher: cs.stores[0].Store.(mockStoreGetChunkFetcher).chunkFetcher,
},
{
name: "second store",
tm: model.TimeFromUnix(20),
expectedFetcher: cs.stores[1].Store.(mockStoreGetChunkFetcher).chunkFetcher,
},
{
name: "still second store",
tm: model.TimeFromUnix(21),
expectedFetcher: cs.stores[1].Store.(mockStoreGetChunkFetcher).chunkFetcher,
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Same(t, tc.expectedFetcher, cs.GetChunkFetcher(tc.tm))
})
}

}
15 changes: 11 additions & 4 deletions schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ const (
)

var (
errInvalidSchemaVersion = errors.New("invalid schema version")
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
errConfigFileNotSet = errors.New("schema config file needs to be set")
errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting")
errInvalidSchemaVersion = errors.New("invalid schema version")
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
errConfigFileNotSet = errors.New("schema config file needs to be set")
errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting")
errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order")
)

// PeriodConfig defines the schema and tables to use for a period of time
Expand Down Expand Up @@ -120,6 +121,12 @@ func (cfg *SchemaConfig) Validate() error {
if err := periodCfg.validate(); err != nil {
return err
}

if i+1 < len(cfg.Configs) {
if cfg.Configs[i].From.Time.Unix() >= cfg.Configs[i+1].From.Time.Unix() {
return errSchemaIncreasingFromTime
}
}
}
return nil
}
Expand Down
44 changes: 44 additions & 0 deletions schema_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,50 @@ func TestSchemaConfig_Validate(t *testing.T) {
},
err: errConfigChunkPrefixNotSet,
},
"invalid schema with same from time configs": {
config: &SchemaConfig{
Configs: []PeriodConfig{
{
From: MustParseDayTime("1970-01-01"),
Schema: "v9",
},
{
From: MustParseDayTime("1970-01-01"),
Schema: "v10",
},
},
},
err: errSchemaIncreasingFromTime,
},
"invalid schema with from time not in increasing order": {
config: &SchemaConfig{
Configs: []PeriodConfig{
{
From: MustParseDayTime("1970-01-02"),
Schema: "v9",
},
{
From: MustParseDayTime("1970-01-01"),
Schema: "v10",
},
},
},
err: errSchemaIncreasingFromTime,
},
"valid schema with different from time configs": {
config: &SchemaConfig{
Configs: []PeriodConfig{
{
From: MustParseDayTime("1970-01-01"),
Schema: "v9",
},
{
From: MustParseDayTime("1970-01-02"),
Schema: "v10",
},
},
},
},
}

for testName, testData := range tests {
Expand Down

0 comments on commit 11f5407

Please sign in to comment.