Convert Cortex components to services (grafana#2166)
* First step towards Cortex using the services model. Only the Server module converted.
* Converted runtimeconfig.Manager to a service.
* Added /services page.
* Converted memberlistKV to a service.
* Fixed runtimeconfig tests.
* Converted Ring to a service.
* Log waiting for another module to initialize.
* Converted overrides to a service.
* Converted client pool to a service.
* Converted ring lifecycler to a service.
* Converted HA Tracker to a service.
* Converted Distributor to a service.
* Handle nil from wrappedService call.
* Explain why Server uses a service and not a wrappedService.
* Made a service from Store.
* Converted ingester to a service.
* Converted querier initialization to a service. This isn't a full conversion; more work would be required to fully use services for the querier. Left a TODO comment instead.
* Listen for module failures and log them. Also log when all services start successfully.
* Converted blockQueryable and UserStore into services. UserStore now does its initial sync in the Starting state; blockQueryable doesn't return a querier until it has finished starting.
* Wait a little before shutdown.
* Converted queryFrontend to a service.
* Logging.
* Converted TableManager to a service.
* Converted Ruler to a service.
* Converted Configs to a service.
* Converted AlertManager to a service.
* Renamed init methods back.
* Fixed tests.
* Converted Compactor to a service.
* Polishing, comments.
* Comments.
* Lint comments.
* Stop server only after all other modules have finished.
* Don't send more jobs to the lifecycler loop if it's not running anymore.
* Don't stop Server until other modules have stopped.
* Removed Compactor from the All target. It was meant for testing only.
* Comment.
* More comments around startup logic.
* moduleServiceWrapper doesn't need the full Cortex, only the serviceMap.
* Messages.
* Fixed outdated comment.
* Start lifecycler in starting functions.
* Fixed comment. Return lifecycler's failure case, if any, as an error.
* Fixed lifecycler usage: pass an independent context to the lifecycler, so that it doesn't stop too early.
* Fixed test.
* Removed obsolete waiting code. Only log an error if it is a real error.
* Renamed servManager to subservices.
* Addressed review feedback.
* Fixed test.
* Fixed compilation errors after rebase.
* Extracted the code that creates the server service into a separate file.
* Added some helper methods.
* Used helper methods to simplify service creation.
* Comment.
* Helper functions for the manager.
* Used helper functions.
* Fixes; used helper functions.
* Fixes; used helper functions.
* Comment.
* Helper function.
* Used helper functions to reduce the amount of code.
* Added tests for helper functions.
* Fixed imports.
* Comment.
* Simplified code.
* Stop and wait until stopped.
* Imports.
* Manage compaction and shipper via the subservices manager.
* Improved error message.
* Comment.
* Comment.
* Added a unit test for Cortex initialization and module dependencies.
* Comments; return errors.
* Unified the /ready handlers into one that reflects the state of all services. It also uses the ingester's check (if configured) to limit the rate of starting ingesters.
* Added //nolint:errcheck to `defer services.StopAndAwaitTerminated(...)` calls.
* Fixed HTTP handler logic; also renamed it.
* Addressed review feedback.
* One more test: since Shutdown already stops Run, there is no need to call Stop(); we can use Stop in the test instead.
* CHANGELOG.md.
* Fixed integration test; old versions of Cortex didn't have a /ready probe.
* Made lint happy.
* Mention /ready for all services.
* Wrap the "not running" error in promql.ErrStorage, which makes the API handler return HTTP 500 (server error) instead of 422 (client error).
* Expose the HTTP port via a method on HTTPService.
* Print the number of services in each state.
* Fixed comment and removed an obsolete line.
* Fixed compile errors after rebase.
* Rebased and converted data purger to a service.
* Pass context to the bucket client.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pstibrany authored Mar 4, 2020
1 parent e4a94bc commit 31a7eb4
Showing 3 changed files with 63 additions and 82 deletions.
purger/purger.go (25 additions, 37 deletions)

```diff
@@ -19,6 +19,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/ingester/client"
 	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 const millisecondPerDay = int64(24 * time.Hour / time.Millisecond)
@@ -51,6 +52,8 @@ type workerJob struct {
 
 // DataPurger does the purging of data which is requested to be deleted
 type DataPurger struct {
+	services.Service
+
 	cfg         Config
 	deleteStore *chunk.DeleteStore
 	chunkStore  chunk.Store
@@ -67,8 +70,7 @@ type DataPurger struct {
 	pendingPlansCount    map[string]int // per request pending plan count
 	pendingPlansCountMtx sync.Mutex
 
-	quit chan struct{}
-	wg   sync.WaitGroup
+	wg sync.WaitGroup
 }
 
 // NewDataPurger creates a new DataPurger
@@ -82,45 +84,39 @@ func NewDataPurger(cfg Config, deleteStore *chunk.DeleteStore, chunkStore chunk.
 		workerJobChan:       make(chan workerJob, 50),
 		inProcessRequestIDs: map[string]string{},
 		pendingPlansCount:   map[string]int{},
-		quit:                make(chan struct{}),
 	}
 
+	dataPurger.Service = services.NewTimerService(time.Hour, dataPurger.init, dataPurger.runOneIteration, dataPurger.stop)
 	return &dataPurger, nil
 }
 
-// Run keeps pulling delete requests for planning after initializing necessary things
-func (dp *DataPurger) Run() {
-	dp.wg.Add(1)
-	defer dp.wg.Done()
-
-	pullDeleteRequestsToPlanDeletesTicker := time.NewTicker(time.Hour)
-	defer pullDeleteRequestsToPlanDeletesTicker.Stop()
-
-	for {
-		select {
-		case <-pullDeleteRequestsToPlanDeletesTicker.C:
-			err := dp.pullDeleteRequestsToPlanDeletes()
-			if err != nil {
-				level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err)
-			}
-		case <-dp.quit:
-			return
-		}
+func (dp *DataPurger) runOneIteration(ctx context.Context) error {
+	err := dp.pullDeleteRequestsToPlanDeletes()
+	if err != nil {
+		level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err)
 	}
+	// Don't return error here, or Timer service will stop.
+	return nil
 }
 
-// Init starts workers, scheduler and then loads in process delete requests
-func (dp *DataPurger) Init() error {
-	dp.runWorkers()
-	go dp.jobScheduler()
+// init starts workers, scheduler and then loads in process delete requests
+func (dp *DataPurger) init(ctx context.Context) error {
+	for i := 0; i < dp.cfg.NumWorkers; i++ {
+		dp.wg.Add(1)
+		go dp.worker()
+	}
+
+	dp.wg.Add(1)
+	go dp.jobScheduler(ctx)
+
 	return dp.loadInprocessDeleteRequests()
 }
 
-// Stop stops all background workers/loops
-func (dp *DataPurger) Stop() {
-	close(dp.quit)
+// stop waits until all background tasks stop.
+func (dp *DataPurger) stop() error {
 	dp.wg.Wait()
+	return nil
 }
 
@@ -154,8 +150,7 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) {
 }
 
 // we send all the delete plans to workerJobChan
-func (dp *DataPurger) jobScheduler() {
-	dp.wg.Add(1)
+func (dp *DataPurger) jobScheduler(ctx context.Context) {
 	defer dp.wg.Done()
 
 	for {
@@ -172,20 +167,13 @@ func (dp *DataPurger) jobScheduler() {
 				dp.workerJobChan <- workerJob{planNo: i, userID: req.UserID,
 					deleteRequestID: req.RequestID, logger: req.logger}
 			}
-		case <-dp.quit:
+		case <-ctx.Done():
 			close(dp.workerJobChan)
 			return
 		}
 	}
 }
 
-func (dp *DataPurger) runWorkers() {
-	for i := 0; i < dp.cfg.NumWorkers; i++ {
-		dp.wg.Add(1)
-		go dp.worker()
-	}
-}
-
 func (dp *DataPurger) worker() {
 	defer dp.wg.Done()
```
purger/purger_test.go (6 additions, 7 deletions)

```diff
@@ -15,6 +15,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/chunk/testutils"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 const (
@@ -140,7 +141,7 @@ func TestDataPurger_BuildPlan(t *testing.T) {
 		t.Run(fmt.Sprintf("%s/batch-size=%d", tc.name, batchSize), func(t *testing.T) {
 			deleteStore, chunkStore, storageClient, dataPurger := setupStoresAndPurger(t)
 			defer func() {
-				dataPurger.Stop()
+				dataPurger.StopAsync()
 				chunkStore.Stop()
 			}()
 
@@ -221,7 +222,7 @@ func TestDataPurger_ExecutePlan(t *testing.T) {
 		t.Run(fmt.Sprintf("%s/batch-size=%d", tc.name, batchSize), func(t *testing.T) {
 			deleteStore, chunkStore, _, dataPurger := setupStoresAndPurger(t)
 			defer func() {
-				dataPurger.Stop()
+				dataPurger.StopAsync()
 				chunkStore.Stop()
 			}()
 
@@ -305,7 +306,7 @@ func TestDataPurger_Restarts(t *testing.T) {
 	require.NoError(t, err)
 
 	// stop the existing purger
-	dataPurger.Stop()
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), dataPurger))
 
 	// create a new purger to check whether it picks up in process delete requests
 	var cfg Config
@@ -314,11 +315,9 @@ func TestDataPurger_Restarts(t *testing.T) {
 	require.NoError(t, err)
 
 	// load in process delete requests by calling Run
-	require.NoError(t, newPurger.Init())
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), newPurger))
 
-	defer func() {
-		newPurger.Stop()
-	}()
+	defer newPurger.StopAsync()
 
 	// lets wait till purger finishes execution of in process delete requests
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
```
table_manager.go (32 additions, 38 deletions)

```diff
@@ -8,7 +8,6 @@ import (
 	"math/rand"
 	"sort"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log/level"
@@ -18,6 +17,7 @@ import (
 	"github.com/weaveworks/common/mtime"
 
 	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 const (
@@ -116,13 +116,15 @@ func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) {
 
 // TableManager creates and manages the provisioned throughput on DynamoDB tables
 type TableManager struct {
+	services.Service
+
 	client       TableClient
 	cfg          TableManagerConfig
 	schemaCfg    SchemaConfig
 	maxChunkAge  time.Duration
-	done         chan struct{}
-	wait         sync.WaitGroup
 	bucketClient BucketClient
+
+	bucketRetentionLoop services.Service
 }
 
 // NewTableManager makes a new TableManager
@@ -137,36 +139,36 @@ func NewTableManager(cfg TableManagerConfig, schemaCfg SchemaConfig, maxChunkAge
 		}
 	}
 
-	return &TableManager{
+	tm := &TableManager{
 		cfg:          cfg,
 		schemaCfg:    schemaCfg,
 		maxChunkAge:  maxChunkAge,
 		client:       tableClient,
-		done:         make(chan struct{}),
 		bucketClient: objectClient,
-	}, nil
-}
+	}
 
-// Start the TableManager
-func (m *TableManager) Start() {
-	m.wait.Add(1)
-	go m.loop()
-
+	tm.Service = services.NewBasicService(tm.starting, tm.loop, tm.stopping)
+	return tm, nil
+}
+
+func (m *TableManager) starting(ctx context.Context) error {
 	if m.bucketClient != nil && m.cfg.RetentionPeriod != 0 && m.cfg.RetentionDeletesEnabled {
-		m.wait.Add(1)
-		go m.bucketRetentionLoop()
+		m.bucketRetentionLoop = services.NewTimerService(bucketRetentionEnforcementInterval, nil, m.bucketRetentionIteration, nil)
+		return services.StartAndAwaitRunning(ctx, m.bucketRetentionLoop)
 	}
+	return nil
 }
 
-// Stop the TableManager
-func (m *TableManager) Stop() {
-	close(m.done)
-	m.wait.Wait()
+func (m *TableManager) stopping() error {
+	if m.bucketRetentionLoop != nil {
+		return services.StopAndAwaitTerminated(context.Background(), m.bucketRetentionLoop)
+	}
+	return nil
 }
 
-func (m *TableManager) loop() {
-	defer m.wait.Done()
-
+func (m *TableManager) loop(ctx context.Context) error {
 	ticker := time.NewTicker(m.cfg.DynamoDBPollInterval)
 	defer ticker.Stop()
 
@@ -179,8 +181,8 @@ func (m *TableManager) loop() {
 	// Sleep for a bit to spread the sync load across different times if the tablemanagers are all started at once.
 	select {
 	case <-time.After(time.Duration(rand.Int63n(int64(m.cfg.DynamoDBPollInterval)))):
-	case <-m.done:
-		return
+	case <-ctx.Done():
+		return nil
 	}
 
 	for {
@@ -191,30 +193,22 @@ func (m *TableManager) loop() {
 			}); err != nil {
 				level.Error(util.Logger).Log("msg", "error syncing tables", "err", err)
 			}
-		case <-m.done:
-			return
+		case <-ctx.Done():
+			return nil
 		}
 	}
 }
 
-func (m *TableManager) bucketRetentionLoop() {
-	defer m.wait.Done()
-
-	ticker := time.NewTicker(bucketRetentionEnforcementInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ticker.C:
-			err := m.bucketClient.DeleteChunksBefore(context.Background(), mtime.Now().Add(-m.cfg.RetentionPeriod))
-
-			if err != nil {
-				level.Error(util.Logger).Log("msg", "error enforcing filesystem retention", "err", err)
-			}
-		case <-m.done:
-			return
-		}
-	}
+// single iteration of bucket retention loop
+func (m *TableManager) bucketRetentionIteration(ctx context.Context) error {
+	err := m.bucketClient.DeleteChunksBefore(ctx, mtime.Now().Add(-m.cfg.RetentionPeriod))
+
+	if err != nil {
+		level.Error(util.Logger).Log("msg", "error enforcing filesystem retention", "err", err)
+	}
+
+	// don't return error, otherwise timer service would stop.
+	return nil
+}
 
 // SyncTables will calculate the tables expected to exist, create those that do
```
