Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Add ingester handler for shutdown and forget tokens #6179

Merged
merged 3 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target
* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
Expand Down
26 changes: 24 additions & 2 deletions docs/sources/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ These endpoints are exposed by the distributor:
These endpoints are exposed by the ingester:

- [`POST /flush`](#post-flush)
- [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown)
- **Deprecated** [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown)
- [`POST /ingester/shutdown`](#post-ingestershutdown)

The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably.

Expand Down Expand Up @@ -807,13 +808,34 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester.

## `POST /ingester/flush_shutdown`

**Deprecated**: Please use `/ingester/shutdown?flush=true` instead.

`/ingester/flush_shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in memory chunks it holds.
This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned,
but instead flushed to our chunk backend.

In microservices mode, the `/ingester/flush_shutdown` endpoint is exposed by the ingester.

### `GET /distributor/ring`
## `POST /ingester/shutdown`

`/ingester/shutdown` is similar to the [`/ingester/flush_shutdown`](#post-ingesterflush_shutdown)
endpoint, but accepts three URL query parameters `flush`, `delete_ring_tokens`, and `terminate`.

**URL query parameters:**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is a POST, curious why we opt for url params intead of a request body?

Copy link
Contributor Author

@chaudum chaudum Jun 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Query params are I guess easier to write when you execute the request using curl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think we do not follow any strict, restful HTTP API design guides in our endpoint design.


* `flush=<bool>`:
Flag to control whether to flush any in-memory chunks the ingester holds. Defaults to `true`.
* `delete_ring_tokens=<bool>`:
Flag to control whether to delete the file that contains the ingester ring tokens of the instance if the `-ingester.token-file-path` is specified.
* `terminate=<bool>`:
Flag to control whether to terminate the Loki process after service shutdown. Defaults to `true`.

This handler, in contrast to the `/ingester/flush_shutdown` handler, terminates the Loki process by default.
This behaviour can be changed by setting the `terminate` query parameter to `false`.

In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester.

## `GET /distributor/ring`

Displays a web page with the distributor hash ring status, including the state, healthy and last heartbeat time of each distributor.

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca
github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1039,8 +1039,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk=
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0=
github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 h1:mZluMeUp1vLHKb1nSrMnA0mfupSpBeUkZqDDpfHabrQ=
github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
Expand Down
65 changes: 62 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
Expand Down Expand Up @@ -172,8 +173,10 @@ type Interface interface {
logproto.QuerierServer
CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
GetOrCreateInstance(instanceID string) (*instance, error)
// deprecated
LegacyShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
}

// Ingester builds chunks for incoming log streams.
Expand Down Expand Up @@ -209,6 +212,10 @@ type Ingester struct {
// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch
// Flag for whether stopping the ingester service should also terminate the
// loki process.
// This is set when calling the shutdown handler.
terminateOnShutdown bool

// Only used by WAL & flusher to coordinate backpressure during replay.
replayController *replayController
Expand Down Expand Up @@ -245,6 +252,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
}
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

Expand Down Expand Up @@ -506,6 +514,12 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()

// In case the flag to terminate on shutdown is set we need to mark the
// ingester service as "failed", so Loki will shut down entirely.
// The module manager logs the failure `modules.ErrStopProcess` in a special way.
if i.terminateOnShutdown && errs.Err() == nil {
return modules.ErrStopProcess
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just rely on nil error? We can return internal server error from ShutdownAndForgetHandler handler when error is non-nil or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because we need to be able to distinguish between stopping of the ingester service that happened through the signal handler, and stopping of the service that was triggered by the shutdown handler. In case of the former, the error will be nil as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In both cases, we are returning w.WriteHeader(http.StatusNoContent) so I don't see any special handling in ShutdownAndForgetHandler for either of the cases. Am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see what you mean, but the returned error modules.ErrStopProcess is not only used by the shutdown handler, but also by the service manager in the Run() function, which determines whether to stop Loki when one of its services failed.

level.Info(util_log.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase())

}
return errs.Err()
}

Expand All @@ -526,10 +540,16 @@ func (i *Ingester) loop() {
}
}

// ShutdownHandler triggers the following set of operations in order:
// LegacyShutdownHandler triggers the following set of operations in order:
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
// Note: This handler does not trigger a termination of the Loki process,
// despite its name. Instead, the ingester service is stopped, so an external
// source can trigger a safe termination through a signal to the process.
// The handler is deprecated and usage is discouraged. Use ShutdownHandler
// instead.
func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) {
level.Warn(util_log.Logger).Log("msg", "The handler /ingester/flush_shutdown is deprecated and usage is discouraged. Please use /ingester/shutdown?flush=true instead.")
originalState := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)
Expand All @@ -538,6 +558,45 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// ShutdownHandler handles a graceful shutdown of the ingester service and
// termination of the Loki process.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
// Don't allow calling the shutdown handler multiple times
if i.State() != services.Running {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte("Ingester is stopping or already stopped."))
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
return
}
params := r.URL.Query()
doFlush := util.FlagFromValues(params, "flush", true)
doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false)
doTerminate := util.FlagFromValues(params, "terminate", true)
err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens)

// Stopping the module will return the modules.ErrStopProcess error. This is
// needed so the Loki process is shut down completely.
if err == nil || err == modules.ErrStopProcess {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}
}

// handleShutdown triggers the following operations:
// * Change the state of ring to stop accepting writes.
// * optional: Flush all the chunks.
// * optional: Delete ring tokens file
// * Unregister from KV store
// * optional: Terminate process (handled by service manager in loki.go)
func (i *Ingester) handleShutdown(terminate, flush, del bool) error {
i.lifecycler.SetFlushOnShutdown(flush)
i.lifecycler.SetClearTokensOnShutdown(del)
i.lifecycler.SetUnregisterOnShutdown(true)
i.terminateOnShutdown = terminate
return services.StopAndAwaitTerminated(context.Background(), i)
}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := tenant.TenantID(ctx)
Expand Down
12 changes: 9 additions & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,15 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
)
t.Server.HTTP.Path("/flush").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)))

t.Server.HTTP.Methods("GET", "POST").Path("/flush").Handler(
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)),
)
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.LegacyShutdownHandler)),
)
t.Server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)),
)
return t.Ingester, nil
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"html/template"
"io"
"net/http"
"net/url"
"strings"

"github.com/go-kit/log"
Expand Down Expand Up @@ -285,3 +286,14 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi
}
return nil
}

func FlagFromValues(values url.Values, key string, d bool) bool {
switch strings.ToLower(values.Get(key)) {
case "t", "true", "1":
return true
case "f", "false", "0":
return false
default:
return d
}
}
Loading