From fee076fc9f97a89e6ebba0beb0efc0b76de698ee Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 17 May 2022 16:04:45 +0200 Subject: [PATCH 1/3] Update grafana/dskit to 07166f9 Signed-off-by: Christian Haudum --- go.mod | 2 +- go.sum | 4 +- .../kv/memberlist/http_status_handler.go | 219 ++++++++++++++++++ .../dskit/kv/memberlist/kv_init_service.go | 197 +--------------- .../dskit/kv/memberlist/memberlist_client.go | 67 +++--- .../grafana/dskit/kv/memberlist/status.gohtml | 143 ++++++------ .../grafana/dskit/ring/basic_lifecycler.go | 9 +- vendor/github.com/grafana/dskit/ring/http.go | 2 +- .../grafana/dskit/ring/lifecycler.go | 78 ++++--- .../grafana/dskit/ring/lifecycler_metrics.go | 12 - vendor/github.com/grafana/dskit/ring/ring.go | 40 +--- .../grafana/dskit/services/basic_service.go | 2 +- .../grafana/dskit/spanlogger/spanlogger.go | 4 +- vendor/modules.txt | 2 +- 14 files changed, 397 insertions(+), 384 deletions(-) create mode 100644 vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go diff --git a/go.mod b/go.mod index 025cd165a392..0e9d70364eef 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 - github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca + github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 diff --git a/go.sum b/go.sum index d0a1d0bbdd46..76d6d459b1d0 100644 --- a/go.sum +++ b/go.sum @@ -1039,8 +1039,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0= +github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 h1:mZluMeUp1vLHKb1nSrMnA0mfupSpBeUkZqDDpfHabrQ= +github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go new file mode 100644 index 000000000000..3ecd83e387b6 --- /dev/null +++ b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go @@ -0,0 +1,219 @@ +package memberlist + +import ( + _ "embed" + "encoding/json" + "fmt" + "html/template" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/hashicorp/memberlist" +) + +// HTTPStatusHandler is a http.Handler with status information about memberlist. +type HTTPStatusHandler struct { + kvs *KVInitService + tpl *template.Template +} + +// StatusPageData represents the data passed to the template rendered by HTTPStatusHandler +type StatusPageData struct { + Now time.Time + Memberlist *memberlist.Memberlist + SortedMembers []*memberlist.Node + Store map[string]ValueDesc + MessageHistoryBufferBytes int + SentMessages []Message + ReceivedMessages []Message +} + +// NewHTTPStatusHandler creates a new HTTPStatusHandler that will render the provided template using the data from StatusPageData. +func NewHTTPStatusHandler(kvs *KVInitService, tpl *template.Template) HTTPStatusHandler { + return HTTPStatusHandler{kvs, tpl} +} + +func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + kv := h.kvs.getKV() + if kv == nil { + w.Header().Set("Content-Type", "text/plain") + // Ignore inactionable errors. + _, _ = w.Write([]byte("This instance doesn't use memberlist.")) + return + } + + const ( + downloadKeyParam = "downloadKey" + viewKeyParam = "viewKey" + viewMsgParam = "viewMsg" + deleteMessagesParam = "deleteMessages" + ) + + if err := req.ParseForm(); err == nil { + if req.Form[downloadKeyParam] != nil { + downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. + return + } + + if req.Form[viewKeyParam] != nil { + viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) + return + } + + if req.Form[viewMsgParam] != nil { + msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + sent, received := kv.getSentAndReceivedMessages() + + for _, m := range append(sent, received...) { + if m.ID == msgID { + viewMessage(w, kv, m, getFormat(req)) + return + } + } + + http.Error(w, "message not found", http.StatusNotFound) + return + } + + if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { + kv.deleteSentReceivedMessages() + + // Redirect back. + w.Header().Set("Location", "?"+deleteMessagesParam+"=false") + w.WriteHeader(http.StatusFound) + return + } + } + + members := kv.memberlist.Members() + sort.Slice(members, func(i, j int) bool { + return members[i].Name < members[j].Name + }) + + sent, received := kv.getSentAndReceivedMessages() + + v := StatusPageData{ + Now: time.Now(), + Memberlist: kv.memberlist, + SortedMembers: members, + Store: kv.storeCopy(), + MessageHistoryBufferBytes: kv.cfg.MessageHistoryBufferBytes, + SentMessages: sent, + ReceivedMessages: received, + } + + accept := req.Header.Get("Accept") + if strings.Contains(accept, "application/json") { + w.Header().Set("Content-Type", "application/json") + + if err := json.NewEncoder(w).Encode(v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } + + w.Header().Set("Content-Type", "text/html") + if err := h.tpl.Execute(w, v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func getFormat(req *http.Request) string { + const viewFormat = "format" + + format := "" + if len(req.Form[viewFormat]) > 0 { + format = req.Form[viewFormat][0] + } + return format +} + +func viewMessage(w http.ResponseWriter, kv *KV, msg Message, format string) { + c := kv.GetCodec(msg.Pair.Codec) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + val, err := c.Decode(msg.Pair.Value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) + return + } + + formatValue(w, val, format) +} + +func viewKey(w http.ResponseWriter, store map[string]ValueDesc, key string, format string) { + if store[key].value == nil { + http.Error(w, "value not found", http.StatusNotFound) + return + } + + formatValue(w, store[key].value, format) +} + +func formatValue(w http.ResponseWriter, val interface{}, format string) { + w.WriteHeader(200) + w.Header().Add("content-type", "text/plain") + + switch format { + case "json", "json-pretty": + enc := json.NewEncoder(w) + if format == "json-pretty" { + enc.SetIndent("", " ") + } + + err := enc.Encode(val) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + default: + _, _ = fmt.Fprintf(w, "%#v", val) + } +} + +func downloadKey(w http.ResponseWriter, kv *KV, store map[string]ValueDesc, key string) { + if store[key].value == nil { + http.Error(w, "value not found", http.StatusNotFound) + return + } + + val := store[key] + + c := kv.GetCodec(store[key].CodecID) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + encoded, err := c.Encode(val.value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Add("content-type", "application/octet-stream") + // Set content-length so that client knows whether it has received full response or not. + w.Header().Add("content-length", strconv.Itoa(len(encoded))) + w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.Version, key)) + w.WriteHeader(200) + + // Ignore errors, we cannot do anything about them. + _, _ = w.Write(encoded) +} + +//go:embed status.gohtml +var defaultPageContent string +var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ + "StringsJoin": strings.Join, +}).Parse(defaultPageContent)) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go index 1a8313cded15..5b505a54882d 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go @@ -2,19 +2,10 @@ package memberlist import ( "context" - _ "embed" - "encoding/json" - "fmt" - "html/template" "net/http" - "sort" - "strconv" - "strings" "sync" - "time" "github.com/go-kit/log" - "github.com/hashicorp/memberlist" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -95,191 +86,5 @@ func (kvs *KVInitService) stopping(_ error) error { } func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) { - kv := kvs.getKV() - if kv == nil { - w.Header().Set("Content-Type", "text/plain") - // Ignore inactionable errors. - _, _ = w.Write([]byte("This instance doesn't use memberlist.")) - return - } - - const ( - downloadKeyParam = "downloadKey" - viewKeyParam = "viewKey" - viewMsgParam = "viewMsg" - deleteMessagesParam = "deleteMessages" - ) - - if err := req.ParseForm(); err == nil { - if req.Form[downloadKeyParam] != nil { - downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. - return - } - - if req.Form[viewKeyParam] != nil { - viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) - return - } - - if req.Form[viewMsgParam] != nil { - msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - sent, received := kv.getSentAndReceivedMessages() - - for _, m := range append(sent, received...) { - if m.ID == msgID { - viewMessage(w, kv, m, getFormat(req)) - return - } - } - - http.Error(w, "message not found", http.StatusNotFound) - return - } - - if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { - kv.deleteSentReceivedMessages() - - // Redirect back. - w.Header().Set("Location", "?"+deleteMessagesParam+"=false") - w.WriteHeader(http.StatusFound) - return - } - } - - members := kv.memberlist.Members() - sort.Slice(members, func(i, j int) bool { - return members[i].Name < members[j].Name - }) - - sent, received := kv.getSentAndReceivedMessages() - - v := pageData{ - Now: time.Now(), - Memberlist: kv.memberlist, - SortedMembers: members, - Store: kv.storeCopy(), - SentMessages: sent, - ReceivedMessages: received, - } - - accept := req.Header.Get("Accept") - if strings.Contains(accept, "application/json") { - w.Header().Set("Content-Type", "application/json") - - if err := json.NewEncoder(w).Encode(v); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - w.Header().Set("Content-Type", "text/html") - if err := defaultPageTemplate.Execute(w, v); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func getFormat(req *http.Request) string { - const viewFormat = "format" - - format := "" - if len(req.Form[viewFormat]) > 0 { - format = req.Form[viewFormat][0] - } - return format -} - -func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) { - c := kv.GetCodec(msg.Pair.Codec) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - val, err := c.Decode(msg.Pair.Value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) - return - } - - formatValue(w, val, format) -} - -func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - formatValue(w, store[key].value, format) -} - -func formatValue(w http.ResponseWriter, val interface{}, format string) { - w.WriteHeader(200) - w.Header().Add("content-type", "text/plain") - - switch format { - case "json", "json-pretty": - enc := json.NewEncoder(w) - if format == "json-pretty" { - enc.SetIndent("", " ") - } - - err := enc.Encode(val) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - default: - _, _ = fmt.Fprintf(w, "%#v", val) - } -} - -func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - val := store[key] - - c := kv.GetCodec(store[key].codecID) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - encoded, err := c.Encode(val.value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Add("content-type", "application/octet-stream") - // Set content-length so that client knows whether it has received full response or not. - w.Header().Add("content-length", strconv.Itoa(len(encoded))) - w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key)) - w.WriteHeader(200) - - // Ignore errors, we cannot do anything about them. - _, _ = w.Write(encoded) + NewHTTPStatusHandler(kvs, defaultPageTemplate).ServeHTTP(w, req) } - -type pageData struct { - Now time.Time - Memberlist *memberlist.Memberlist - SortedMembers []*memberlist.Node - Store map[string]valueDesc - SentMessages []message - ReceivedMessages []message -} - -//go:embed status.gohtml -var defaultPageContent string -var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ - "StringsJoin": strings.Join, -}).Parse(defaultPageContent)) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 30f0992d3521..23c40ac764f4 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -232,7 +232,7 @@ type KV struct { // KV Store. storeMu sync.Mutex - store map[string]valueDesc + store map[string]ValueDesc // Codec registry codecs map[string]codec.Codec @@ -245,9 +245,9 @@ type KV struct { // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex - sentMessages []message + sentMessages []Message sentMessagesSize int - receivedMessages []message + receivedMessages []Message receivedMessagesSize int messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI. @@ -285,7 +285,7 @@ type KV struct { // Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message. // Fields are exported for templating to work. -type message struct { +type Message struct { ID int // Unique local ID of the message. Time time.Time // Time when message was sent or received. Size int // Message size @@ -296,21 +296,22 @@ type message struct { Changes []string // List of changes in this message (as computed by *this* node). } -type valueDesc struct { +// ValueDesc stores the value along with it's codec and local version. +type ValueDesc struct { // We store the decoded value here to prevent decoding the entire state for every // update we receive. Whilst the updates are small and fast to decode, // the total state can be quite large. // The CAS function is passed a deep copy because it modifies in-place. value Mergeable - // version (local only) is used to keep track of what we're gossiping about, and invalidate old messages - version uint + // Version (local only) is used to keep track of what we're gossiping about, and invalidate old messages. + Version uint // ID of codec used to write this value. Only used when sending full state. - codecID string + CodecID string } -func (v valueDesc) Clone() (result valueDesc) { +func (v ValueDesc) Clone() (result ValueDesc) { result = v if v.value != nil { result.value = v.value.Clone() @@ -318,8 +319,8 @@ func (v valueDesc) Clone() (result valueDesc) { return } -func (v valueDesc) String() string { - return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID) +func (v ValueDesc) String() string { + return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID) } var ( @@ -343,7 +344,7 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer registerer: registerer, provider: dnsProvider, - store: make(map[string]valueDesc), + store: make(map[string]ValueDesc), codecs: make(map[string]codec.Codec), watchers: make(map[string][]chan string), prefixWatchers: make(map[string][]chan string), @@ -642,7 +643,7 @@ func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, _, _ = v.value.RemoveTombstones(time.Time{}) } - return v.value, v.version, nil + return v.value, v.Version, nil } // WatchKey watches for value changes for given key. When value changes, 'f' function is called with the @@ -909,7 +910,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - m.addSentMessage(message{ + m.addSentMessage(Message{ Time: time.Now(), Size: len(pairData), Pair: kvPair, @@ -964,7 +965,7 @@ func (m *KV) NotifyMsg(msg []byte) { changes = mod.MergeContent() } - m.addReceivedMessage(message{ + m.addReceivedMessage(Message{ Time: time.Now(), Size: len(msg), Pair: kvPair, @@ -1033,9 +1034,9 @@ func (m *KV) LocalState(join bool) []byte { continue } - codec := m.GetCodec(val.codecID) + codec := m.GetCodec(val.CodecID) if codec == nil { - level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key) + level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.CodecID, "key", key) continue } @@ -1048,7 +1049,7 @@ func (m *KV) LocalState(join bool) []byte { kvPair.Reset() kvPair.Key = key kvPair.Value = encoded - kvPair.Codec = val.codecID + kvPair.Codec = val.CodecID ser, err := kvPair.Marshal() if err != nil { @@ -1068,11 +1069,11 @@ func (m *KV) LocalState(join bool) []byte { } buf.Write(ser) - m.addSentMessage(message{ + m.addSentMessage(Message{ Time: sent, Size: len(ser), Pair: kvPair, // Makes a copy of kvPair. - Version: val.version, + Version: val.Version, }) } @@ -1136,7 +1137,7 @@ func (m *KV) MergeRemoteState(data []byte, join bool) { changes = change.MergeContent() } - m.addReceivedMessage(message{ + m.addReceivedMessage(Message{ Time: received, Size: int(kvPairLength), Pair: kvPair, // Makes a copy of kvPair. @@ -1184,7 +1185,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS). curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. - if casVersion > 0 && curr.version != casVersion { + if casVersion > 0 && curr.Version != casVersion { return nil, 0, errVersionMismatch } result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) @@ -1215,11 +1216,11 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui } } - newVersion := curr.version + 1 - m.store[key] = valueDesc{ + newVersion := curr.Version + 1 + m.store[key] = ValueDesc{ value: result, - version: newVersion, - codecID: codec.CodecID(), + Version: newVersion, + CodecID: codec.CodecID(), } // The "changes" returned by Merge() can contain references to the "result" @@ -1240,17 +1241,17 @@ func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, return oldVal, change, err } -func (m *KV) storeCopy() map[string]valueDesc { +func (m *KV) storeCopy() map[string]ValueDesc { m.storeMu.Lock() defer m.storeMu.Unlock() - result := make(map[string]valueDesc, len(m.store)) + result := make(map[string]ValueDesc, len(m.store)) for k, v := range m.store { result[k] = v.Clone() } return result } -func (m *KV) addReceivedMessage(msg message) { +func (m *KV) addReceivedMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return } @@ -1264,7 +1265,7 @@ func (m *KV) addReceivedMessage(msg message) { m.receivedMessages, m.receivedMessagesSize = addMessageToBuffer(m.receivedMessages, m.receivedMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) } -func (m *KV) addSentMessage(msg message) { +func (m *KV) addSentMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return } @@ -1278,12 +1279,12 @@ func (m *KV) addSentMessage(msg message) { m.sentMessages, m.sentMessagesSize = addMessageToBuffer(m.sentMessages, m.sentMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) } -func (m *KV) getSentAndReceivedMessages() (sent, received []message) { +func (m *KV) getSentAndReceivedMessages() (sent, received []Message) { m.messagesMu.Lock() defer m.messagesMu.Unlock() // Make copy of both slices. - return append([]message(nil), m.sentMessages...), append([]message(nil), m.receivedMessages...) + return append([]Message(nil), m.sentMessages...), append([]Message(nil), m.receivedMessages...) } func (m *KV) deleteSentReceivedMessages() { @@ -1296,7 +1297,7 @@ func (m *KV) deleteSentReceivedMessages() { m.receivedMessagesSize = 0 } -func addMessageToBuffer(msgs []message, size int, limit int, msg message) ([]message, int) { +func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) { msgs = append(msgs, msg) size += msg.Size diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml index 3ab6d0936374..6f845b6e0603 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml +++ b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml @@ -1,4 +1,4 @@ -{{- /*gotype: github.com/grafana/dskit/kv/memberlist.statusPageData */ -}} +{{- /*gotype: github.com/grafana/dskit/kv/memberlist.StatusPageData */ -}} @@ -20,7 +20,8 @@ Key - Value Details + Codec + Version Actions @@ -29,7 +30,8 @@ {{ range $k, $v := .Store }} {{ $k }} - {{ $v }} + {{ $v.CodecID }} + {{ $v.Version }} json | json-pretty @@ -68,76 +70,83 @@

State: 0 = Alive, 1 = Suspect, 2 = Dead, 3 = Left

-

Received Messages

+

Message History

-Delete All Messages (received and sent) +{{ if .MessageHistoryBufferBytes }} - - - - - - - - - - - - +

Received Messages

- - {{ range .ReceivedMessages }} + Delete All Messages (received and sent) + +
IDTimeKeyValue in the MessageVersion After Update (0 = no change)ChangesActions
+ - - - - - - - + + + + + + + - {{ end }} - -
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} - json - | json-pretty - | struct - IDTimeKeyValue in the MessageVersion After Update (0 = no change)ChangesActions
- -

Sent Messages

- -Delete All Messages (received and sent) - - - - - - - - - - - - - - - - {{ range .SentMessages }} + + + + {{ range .ReceivedMessages }} + + + + + + + + + + {{ end }} + +
IDTimeKeyValueVersionChangesActions
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} + json + | json-pretty + | struct +
+ +

Sent Messages

+ + Delete All Messages (received and sent) + + + - - - - - - - + + + + + + + - {{ end }} - -
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} - json - | json-pretty - | struct - IDTimeKeyValueVersionChangesActions
+ + + + {{ range .SentMessages }} + + {{ .ID }} + {{ .Time.Format "15:04:05.000" }} + {{ .Pair.Key }} + size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }} + {{ .Version }} + {{ StringsJoin .Changes ", " }} + + json + | json-pretty + | struct + + + {{ end }} + + +{{ else }} +

Message history buffer is disabled, refer to the configuration to enable it in order to troubleshoot the message history.

+{{ end }} \ No newline at end of file diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 32775c98291c..1bb95c08370f 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -405,8 +405,13 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc, // This could happen if the backend store restarted (and content deleted) // or the instance has been forgotten. In this case, we do re-insert it. if !ok { - level.Warn(l.logger).Log("msg", "instance missing in the ring, adding it back", "ring", l.ringName) - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), l.GetRegisteredAt()) + level.Warn(l.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", l.ringName) + + // Due to how shuffle sharding work, the missing instance for some period of time could have cause + // a resharding of tenants among instances: to guarantee query correctness we need to update the + // registration timestamp to current time. + registeredAt := time.Now() + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt) } prevTimestamp := instanceDesc.Timestamp diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index 18a56177cbf2..bcf3d1cc89ab 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -93,7 +93,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - _, ownedTokens := ringDesc.countTokens() + ownedTokens := ringDesc.countTokens() var ingesterIDs []string for id := range ringDesc.Ingesters { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 602e1fdb58af..2479ad03c8e4 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" - perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -110,8 +109,9 @@ type Lifecycler struct { Zone string // Whether to flush if transfer fails on shutdown. - flushOnShutdown *atomic.Bool - unregisterOnShutdown *atomic.Bool + flushOnShutdown *atomic.Bool + unregisterOnShutdown *atomic.Bool + clearTokensOnShutdown *atomic.Bool // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. @@ -160,23 +160,22 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa } l := &Lifecycler{ - cfg: cfg, - flushTransferer: flushTransferer, - KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), - ID: cfg.ID, - RingName: ringName, - RingKey: ringKey, - flushOnShutdown: atomic.NewBool(flushOnShutdown), - unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), - Zone: cfg.Zone, - actorChan: make(chan func()), - state: PENDING, - lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), - logger: logger, - } - - l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens)) + cfg: cfg, + flushTransferer: flushTransferer, + KVStore: store, + Addr: fmt.Sprintf("%s:%d", addr, port), + ID: cfg.ID, + RingName: ringName, + RingKey: ringKey, + flushOnShutdown: atomic.NewBool(flushOnShutdown), + unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), + clearTokensOnShutdown: atomic.NewBool(false), + Zone: cfg.Zone, + actorChan: make(chan func()), + state: PENDING, + lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), + logger: logger, + } l.BasicService = services. NewBasicService(nil, l.loop, l.stopping). @@ -304,8 +303,6 @@ func (i *Lifecycler) getTokens() Tokens { } func (i *Lifecycler) setTokens(tokens Tokens) { - i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens))) - i.stateMtx.Lock() defer i.stateMtx.Unlock() @@ -397,7 +394,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { // First, see if we exist in the cluster, update our state to match if we do, // and add ourselves (without tokens) if we don't. if err := i.initRing(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to join the ring %s", i.RingName) + return errors.Wrapf(err, "failed to join the ring %s", i.RingName) } // We do various period tasks @@ -420,14 +417,14 @@ func (i *Lifecycler) loop(ctx context.Context) error { // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING // ingesters, but we also signal that it is not fully functional yet. if err := i.autoJoin(context.Background(), JOINING); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName) observeChan = time.After(i.cfg.ObservePeriod) } else { if err := i.autoJoin(context.Background(), ACTIVE); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } } } @@ -514,11 +511,18 @@ heartbeatLoop: if i.ShouldUnregisterOnShutdown() { if err := i.unregister(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName) } + if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() { + if err := os.Remove(i.cfg.TokensFilePath); err != nil { + return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath) + } + level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath) + } + return nil } @@ -738,9 +742,13 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { } instanceDesc, ok := ringDesc.Ingesters[i.ID] + if !ok { - // consul must have restarted - level.Info(i.logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName) + // If the instance is missing in the ring, we need to add it back. However, due to how shuffle sharding work, + // the missing instance for some period of time could have cause a resharding of tenants among instances: + // to guarantee query correctness we need to update the registration timestamp to current time. + level.Info(i.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", i.RingName) + i.setRegisteredAt(time.Now()) ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt()) } else { instanceDesc.Timestamp = time.Now().Unix() @@ -825,8 +833,20 @@ func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool) { i.unregisterOnShutdown.Store(enabled) } +// ClearTokensOnShutdown returns if persisted tokens should be cleared on shutdown. +func (i *Lifecycler) ClearTokensOnShutdown() bool { + return i.clearTokensOnShutdown.Load() +} + +// SetClearTokensOnShutdown enables/disables deletions of tokens on shutdown. +// Set to `true` in case one wants to clear tokens on shutdown which are +// otherwise persisted, e.g. useful in custom shutdown handlers. +func (i *Lifecycler) SetClearTokensOnShutdown(enabled bool) { + i.clearTokensOnShutdown.Store(enabled) +} + func (i *Lifecycler) processShutdown(ctx context.Context) { - flushRequired := i.flushOnShutdown.Load() + flushRequired := i.FlushOnShutdown() transferStart := time.Now() if err := i.flushTransferer.TransferOut(ctx); err != nil { if err == ErrTransferDisabled { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go index 422a564c18b7..fe29cdfd5fc8 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go @@ -7,8 +7,6 @@ import ( type LifecyclerMetrics struct { consulHeartbeats prometheus.Counter - tokensOwned prometheus.Gauge - tokensToOwn prometheus.Gauge shutdownDuration *prometheus.HistogramVec } @@ -19,16 +17,6 @@ func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *Lifecycle Help: "The total number of heartbeats sent to consul.", ConstLabels: prometheus.Labels{"name": ringName}, }), - tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "member_ring_tokens_owned", - Help: "The number of tokens owned in the ring.", - ConstLabels: prometheus.Labels{"name": ringName}, - }), - tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "member_ring_tokens_to_own", - Help: "The number of tokens to own in the ring.", - ConstLabels: prometheus.Labels{"name": ringName}, - }), shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "shutdown_duration_seconds", Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).", diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6c7e4a49fc0d..be78fee59804 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -183,12 +183,9 @@ type Ring struct { // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring - memberOwnershipGaugeVec *prometheus.GaugeVec numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge - numTokensGaugeVec *prometheus.GaugeVec oldestTimestampGaugeVec *prometheus.GaugeVec - reportedOwners map[string]struct{} logger log.Logger } @@ -227,11 +224,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client strategy: strategy, ringDesc: &Desc{}, shuffledSubringCache: map[subringCacheKey]*Ring{}, - memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "ring_member_ownership_percent", - Help: "The percent ownership of the ring by member", - ConstLabels: map[string]string{"name": name}}, - []string{"member"}), numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_members", Help: "Number of members in the ring", @@ -241,11 +233,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client Name: "ring_tokens_total", Help: "Number of tokens in the ring", ConstLabels: map[string]string{"name": name}}), - numTokensGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "ring_tokens_owned", - Help: "The number of tokens in the ring owned by the member", - ConstLabels: map[string]string{"name": name}}, - []string{"member"}), oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_oldest_member_timestamp", Help: "Timestamp of the oldest member in the ring.", @@ -514,12 +501,10 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro }, nil } -// countTokens returns the number of tokens and tokens within the range for each instance. -func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { +// countTokens returns the number tokens within the range for each instance. +func (r *Desc) countTokens() map[string]uint32 { var ( - owned = map[string]uint32{} - numTokens = map[string]uint32{} - + owned = map[string]uint32{} ringTokens = r.GetTokens() ringInstanceByToken = r.getTokensInfo() ) @@ -535,7 +520,6 @@ func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { } info := ringInstanceByToken[token] - numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } @@ -543,11 +527,10 @@ func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 - numTokens[id] = 0 } } - return numTokens, owned + return owned } // updateRingMetrics updates ring metrics. Caller must be holding the Write lock! @@ -587,21 +570,6 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { return } - prevOwners := r.reportedOwners - r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.ringDesc.countTokens() - for id, totalOwned := range ownedRange { - r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) - r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) - delete(prevOwners, id) - r.reportedOwners[id] = struct{}{} - } - - for k := range prevOwners { - r.memberOwnershipGaugeVec.DeleteLabelValues(k) - r.numTokensGaugeVec.DeleteLabelValues(k) - } - r.totalTokensGauge.Set(float64(len(r.ringTokens))) } diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index ead611a3f97c..6ced33aabf92 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -15,7 +15,7 @@ import ( type StartingFn func(serviceContext context.Context) error // RunningFn function is called when service enters Running state. When it returns, service will move to Stopping state. -// If RunningFn or Stopping return error, Service will end in Failed state, otherwise if both functions return without +// If RunningFn or StoppingFn return error, Service will end in Failed state, otherwise if both functions return without // error, service will end in Terminated state. type RunningFn func(serviceContext context.Context) error diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 91876a728157..a639460bbe6f 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -97,11 +97,9 @@ func (s *SpanLogger) Error(err error) error { } func withContext(ctx context.Context, logger log.Logger, resolver TenantResolver) log.Logger { - // Weaveworks uses "orgs" and "orgID" to represent Cortex users, - // even though the code-base generally uses `userID` to refer to the same thing. userID, err := resolver.TenantID(ctx) if err == nil && userID != "" { - logger = log.With(logger, "org_id", userID) + logger = log.With(logger, "user", userID) } traceID, ok := tracing.ExtractSampledTraceID(ctx) diff --git a/vendor/modules.txt b/vendor/modules.txt index a0b0f90ebfa0..bae41a2d13c6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -528,7 +528,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.4.2 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca +# github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 ## explicit; go 1.17 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency From 16648d9812737463e58a5b2f38fd4d2c46d9f65e Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 17 May 2022 16:06:26 +0200 Subject: [PATCH 2/3] Add /ingester/shutdown handler This handler replaces the deprecated /ingester/flush_shutdown handler and can be used to gracefully shut down a Loki instance and delete the file that persists the tokens of the ingester ring. In production environments you usually want to persist ring tokens so that during a restart of an ingester instance, or during rollout, the tokens from that instance are not re-distributed to other instances, but instead kept so that the same streams end up on the same instance once it is up and running again. For that, the tokens are written to a file that can be specified via the `-ingester.tokens-file-path` argument. In certain cases, however, you want to forget the tokens and re-distribute them when shutting down an ingester instance. This was already possible by calling `/ingester/flush_shutdown`, deleting the tokens file and terminating the process. The new handler `/ingester/shutdown` combines these manual steps into a single handler. Signed-off-by: Christian Haudum --- docs/sources/api/_index.md | 26 +++++++++++++-- pkg/ingester/ingester.go | 65 ++++++++++++++++++++++++++++++++++++-- pkg/loki/modules.go | 12 +++++-- pkg/util/http.go | 12 +++++++ 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index 2b113b96d6f8..55ac2e123ef5 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -48,7 +48,8 @@ These endpoints are exposed by the distributor: These endpoints are exposed by the ingester: - [`POST /flush`](#post-flush) -- [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown) +- **Deprecated** [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown) +- [`POST /ingester/shutdown`](#post-ingestershutdown) The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably. @@ -807,13 +808,34 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester. ## `POST /ingester/flush_shutdown` +**Deprecated**: Please use `/ingester/shutdown?flush=true` instead. + `/ingester/flush_shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in memory chunks it holds. This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned, but instead flushed to our chunk backend. In microservices mode, the `/ingester/flush_shutdown` endpoint is exposed by the ingester. -### `GET /distributor/ring` +## `POST /ingester/shutdown` + +`/ingester/shutdown` is similar to the [`/ingester/flush_shutdown`](#post-ingesterflush_shutdown) +endpoint, but accepts three URL query parameters `flush`, `delete_ring_tokens`, and `terminate`. + +**URL query parameters:** + +* `flush=`: + Flag to control whether to flush any in-memory chunks the ingester holds. Defaults to `true`. +* `delete_ring_tokens=`: + Flag to control whether to delete the file that contains the ingester ring tokens of the instance if the `-ingester.token-file-path` is specified. +* `terminate=`: + Flag to control whether to terminate the Loki process after service shutdown. Defaults to `true`. + +This handler, in contrast to the `/ingester/flush_shutdown` handler, terminates the Loki process by default. +This behaviour can be changed by setting the `terminate` query parameter to `false`. + +In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester. + +## `GET /distributor/ring` Displays a web page with the distributor hash ring status, including the state, healthy and last heartbeat time of each distributor. diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f01461e8ab88..894354b208f0 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/grafana/dskit/modules" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" @@ -172,8 +173,10 @@ type Interface interface { logproto.QuerierServer CheckReady(ctx context.Context) error FlushHandler(w http.ResponseWriter, _ *http.Request) - ShutdownHandler(w http.ResponseWriter, r *http.Request) GetOrCreateInstance(instanceID string) (*instance, error) + // deprecated + LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) + ShutdownHandler(w http.ResponseWriter, r *http.Request) } // Ingester builds chunks for incoming log streams. @@ -209,6 +212,10 @@ type Ingester struct { // Denotes whether the ingester should flush on shutdown. // Currently only used by the WAL to signal when the disk is full. flushOnShutdownSwitch *OnceSwitch + // Flag for whether stopping the ingester service should also terminate the + // loki process. + // This is set when calling the shutdown handler. + terminateOnShutdown bool // Only used by WAL & flusher to coordinate backpressure during replay. replayController *replayController @@ -245,6 +252,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid tailersQuit: make(chan struct{}), metrics: metrics, flushOnShutdownSwitch: &OnceSwitch{}, + terminateOnShutdown: false, } i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) @@ -506,6 +514,12 @@ func (i *Ingester) stopping(_ error) error { } i.flushQueuesDone.Wait() + // In case the flag to terminate on shutdown is set we need to mark the + // ingester service as "failed", so Loki will shut down entirely. + // The module manager logs the failure `modules.ErrStopProcess` in a special way. + if i.terminateOnShutdown && errs.Err() == nil { + return modules.ErrStopProcess + } return errs.Err() } @@ -526,10 +540,16 @@ func (i *Ingester) loop() { } } -// ShutdownHandler triggers the following set of operations in order: +// LegacyShutdownHandler triggers the following set of operations in order: // * Change the state of ring to stop accepting writes. // * Flush all the chunks. -func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { +// Note: This handler does not trigger a termination of the Loki process, +// despite its name. Instead, the ingester service is stopped, so an external +// source can trigger a safe termination through a signal to the process. +// The handler is deprecated and usage is discouraged. Use ShutdownHandler +// instead. +func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) { + level.Warn(util_log.Logger).Log("msg", "The handler /ingester/flush_shutdown is deprecated and usage is discouraged. Please use /ingester/shutdown?flush=true instead.") originalState := i.lifecycler.FlushOnShutdown() // We want to flush the chunks if transfer fails irrespective of original flag. i.lifecycler.SetFlushOnShutdown(true) @@ -538,6 +558,45 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +// ShutdownHandler handles a graceful shutdown of the ingester service and +// termination of the Loki process. +func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { + // Don't allow calling the shutdown handler multiple times + if i.State() != services.Running { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("Ingester is stopping or already stopped.")) + return + } + params := r.URL.Query() + doFlush := util.FlagFromValues(params, "flush", true) + doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false) + doTerminate := util.FlagFromValues(params, "terminate", true) + err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens) + + // Stopping the module will return the modules.ErrStopProcess error. This is + // needed so the Loki process is shut down completely. + if err == nil || err == modules.ErrStopProcess { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + } +} + +// handleShutdown triggers the following operations: +// * Change the state of ring to stop accepting writes. +// * optional: Flush all the chunks. +// * optional: Delete ring tokens file +// * Unregister from KV store +// * optional: Terminate process (handled by service manager in loki.go) +func (i *Ingester) handleShutdown(terminate, flush, del bool) error { + i.lifecycler.SetFlushOnShutdown(flush) + i.lifecycler.SetClearTokensOnShutdown(del) + i.lifecycler.SetUnregisterOnShutdown(true) + i.terminateOnShutdown = terminate + return services.StopAndAwaitTerminated(context.Background(), i) +} + // Push implements logproto.Pusher. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { instanceID, err := tenant.TenantID(ctx) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index d58c0a3d9788..4485256d8a99 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -342,9 +342,15 @@ func (t *Loki) initIngester() (_ services.Service, err error) { httpMiddleware := middleware.Merge( serverutil.RecoveryHTTPMiddleware, ) - t.Server.HTTP.Path("/flush").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler))) - t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler))) - + t.Server.HTTP.Methods("GET", "POST").Path("/flush").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)), + ) + t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.LegacyShutdownHandler)), + ) + t.Server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)), + ) return t.Ingester, nil } diff --git a/pkg/util/http.go b/pkg/util/http.go index 3305a9410a49..ed1c5a4220ca 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -9,6 +9,7 @@ import ( "html/template" "io" "net/http" + "net/url" "strings" "github.com/go-kit/log" @@ -285,3 +286,14 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi } return nil } + +func FlagFromValues(values url.Values, key string, d bool) bool { + switch strings.ToLower(values.Get(key)) { + case "t", "true", "1": + return true + case "f", "false", "0": + return false + default: + return d + } +} From 095653c077271bfadeb8df4b4b0ece4243e0eb5c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 17 May 2022 17:35:32 +0200 Subject: [PATCH 3/3] Add changelog entry Signed-off-by: Christian Haudum --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93c7978512a8..9d2cc6eaa6df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields * [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target +* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully * [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage * [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization * [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail