Skip to content

Commit

Permalink
Fix nats-server-config-reloader for k8s secrets
Browse files Browse the repository at this point in the history
Fixes #130

To fix this issue, the commit ensures that when a file is deleted,
the inotify watcher immediately unregisters it and then attempts to
periodically re-register the file every second until the operation
succeeds. This guarantees that the file will continue to be
monitored reliably even after deletion.
  • Loading branch information
jkralik committed Jun 9, 2023
1 parent be342e4 commit 1f27bf7
Showing 1 changed file with 133 additions and 50 deletions.
183 changes: 133 additions & 50 deletions pkg/natsreloader/natsreloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"time"

Expand Down Expand Up @@ -84,6 +85,89 @@ func (r *Reloader) waitForProcess() error {
return nil
}

// removeDuplicateStrings sorts s in place and returns a prefix of it with
// adjacent duplicates removed. The input slice's backing array is reused;
// callers must treat the argument as consumed.
func removeDuplicateStrings(s []string) []string {
	if len(s) < 1 {
		return s
	}

	sort.Strings(s)
	// Compact in place: out shares s's backing array, so each append
	// overwrites an already-consumed slot and never reallocates.
	out := s[:1]
	for _, v := range s[1:] {
		if v != out[len(out)-1] {
			out = append(out, v)
		}
	}
	return out
}

// handleEvent folds a single fsnotify event into the updated/deleted
// accumulators and returns them. A Remove event queues the path for
// re-registration (k8s swaps secret/configmap mounts by deleting files, so
// the watch on that path is lost and must be re-added by the caller).
// For any other event, the config files are re-hashed against
// lastConfigAppliedCache to decide whether a real content change happened.
func (r *Reloader) handleEvent(event fsnotify.Event, lastConfigAppliedCache map[string][]byte, updatedFiles, deletedFiles []string) ([]string, []string) {
	if event.Op&fsnotify.Remove == fsnotify.Remove {
		// The file watch is gone once the file is removed; report it so the
		// caller can periodically try to re-add it to the watcher.
		return updatedFiles, append(deletedFiles, event.Name)
	}
	if _, err := os.Stat(event.Name); err != nil {
		// The file disappeared without a Remove event (transient state
		// during an atomic replace). Wait for the new content to show up
		// rather than reloading against a missing file.
		return updatedFiles, deletedFiles
	}

	if len(updatedFiles) > 0 {
		// A change was already detected in this batch; one reload covers
		// every config file, so skip the hashing work.
		return updatedFiles, deletedFiles
	}

	for _, configFile := range r.ConfigFiles {
		h := sha256.New()
		f, err := os.Open(configFile)
		if err != nil {
			log.Printf("Error: %s\n", err)
			continue
		}
		_, err = io.Copy(h, f)
		// Close promptly inside the loop; a deferred close would hold every
		// opened file until the method returns.
		f.Close()
		if err != nil {
			log.Printf("Error: %s\n", err)
			continue
		}
		digest := h.Sum(nil)
		lastConfigHash, ok := lastConfigAppliedCache[configFile]
		if ok && bytes.Equal(lastConfigHash, digest) {
			// Content is unchanged since the last applied config.
			continue
		}
		lastConfigAppliedCache[configFile] = digest

		log.Printf("changed config; file=%q existing=%v total-files=%d",
			configFile, ok, len(lastConfigAppliedCache))

		updatedFiles = append(updatedFiles, configFile)

		// We only get an event for one file at a time, we can stop checking
		// config files here and continue with our business.
		break
	}
	return updatedFiles, deletedFiles
}

// handleEvents processes the given event plus everything currently queued on
// the watcher, then returns the accumulated updated and deleted file lists.
// Both returned slices may contain duplicates; callers deduplicate.
func (r *Reloader) handleEvents(configWatcher *fsnotify.Watcher, event fsnotify.Event, lastConfigAppliedCache map[string][]byte) ([]string, []string) {
	updated := make([]string, 0, 16)
	deleted := make([]string, 0, 16)
	updated, deleted = r.handleEvent(event, lastConfigAppliedCache, updated, deleted)
	// Drain the queue without blocking: the default case fires as soon as no
	// further event is immediately available.
	for {
		select {
		case next := <-configWatcher.Events:
			updated, deleted = r.handleEvent(next, lastConfigAppliedCache, updated, deleted)
		default:
			return updated, deleted
		}
	}
}

// Run starts the main loop.
func (r *Reloader) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -108,9 +192,13 @@ func (r *Reloader) Run(ctx context.Context) error {
for i := range r.ConfigFiles {
// Ensure our paths are canonical
r.ConfigFiles[i], _ = filepath.Abs(r.ConfigFiles[i])
// Use directory here because k8s remounts the entire folder
// the config file lives in. So, watch the folder so we properly receive events.
if err := configWatcher.Add(filepath.Dir(r.ConfigFiles[i])); err != nil {
}
r.ConfigFiles = removeDuplicateStrings(r.ConfigFiles)
// Follow configuration file updates and trigger reload when
// it is either recreated or written into.
for i := range r.ConfigFiles {
// Watch files individually for https://github.com/kubernetes/kubernetes/issues/112677
if err := configWatcher.Add(r.ConfigFiles[i]); err != nil {
return err
}
}
Expand Down Expand Up @@ -148,70 +236,65 @@ func (r *Reloader) Run(ctx context.Context) error {
r.ConfigFiles)
}

var triggerName string
var updatedFiles bool
// We use a ticker to re-add deleted files to the watcher
t := time.NewTicker(time.Second)
t.Stop()
var tickerRunning bool
var deletedFiles []string
var updatedFiles []string

for {
select {
case <-ctx.Done():
return nil
case event := <-configWatcher.Events:
triggerName = event.Name

_, err := os.Stat(event.Name)
if err != nil {
// Beware that this means that we won't reconfigure if a file
// is permanently removed. We want to support transient
// disappearance, waiting for the new content, and have not set
// up any sort of longer-term timers to detect permanent
// deletion.
// If you really need this, then switch a file to be empty
// before removing if afterwards.
continue
}

updatedFiles = false
for _, configFile := range r.ConfigFiles {
h := sha256.New()
f, err := os.Open(configFile)
if err != nil {
log.Printf("Error: %s\n", err)
continue
}
if _, err := io.Copy(h, f); err != nil {
log.Printf("Error: %s\n", err)
continue
}
digest := h.Sum(nil)
lastConfigHash, ok := lastConfigAppliedCache[configFile]
if ok && bytes.Equal(lastConfigHash, digest) {
// No meaningful change or this is the first time we've checked
continue
case <-t.C:
newDeletedFiles := make([]string, 0, len(deletedFiles))
updated := make([]string, 0, len(deletedFiles))
for _, f := range deletedFiles {
if err := configWatcher.Add(f); err != nil {
newDeletedFiles = append(newDeletedFiles, f)
} else {
updated, _ = r.handleEvent(fsnotify.Event{Name: f, Op: fsnotify.Create}, lastConfigAppliedCache, updated, nil)
}
lastConfigAppliedCache[configFile] = digest

log.Printf("changed config; file=%q existing=%v total-files=%d",
configFile, ok, len(lastConfigAppliedCache))

updatedFiles = true

// We only get an event for one file at a time, we can stop checking
// config files here and continue with our business.
}
updatedFiles = removeDuplicateStrings(updated)
deletedFiles = newDeletedFiles
if len(deletedFiles) == 0 {
// No more deleted files, stop the ticker
t.Stop()
tickerRunning = false
}
if len(updatedFiles) > 0 {
// Send signal to reload the config
log.Printf("updated files: %v", updatedFiles)
break
}
if !updatedFiles {
continue
continue
// Check if the process is still alive
case event := <-configWatcher.Events:
updated, deleted := r.handleEvents(configWatcher, event, lastConfigAppliedCache)
updatedFiles = removeDuplicateStrings(updated)
deletedFiles = removeDuplicateStrings(append(deletedFiles, deleted...))
if !tickerRunning {
// Start the ticker to re-add deleted files
t.Reset(time.Second)
tickerRunning = true
}
if len(updatedFiles) > 0 {
// Send signal to reload the config
log.Printf("updated files: %v", updatedFiles)
break
}
continue
case err := <-configWatcher.Errors:
log.Printf("Error: %s\n", err)
continue
}

// Configuration was updated, try to do reload for a few times
// otherwise give up and wait for next event.
TryReload:
for {
log.Printf("Sending signal '%s' to server to reload configuration due to: %s", r.Signal.String(), triggerName)
log.Printf("Sending signal '%s' to server to reload configuration due to: %s", r.Signal.String(), updatedFiles)
err := r.proc.Signal(r.Signal)
if err != nil {
log.Printf("Error during reload: %s\n", err)
Expand Down

0 comments on commit 1f27bf7

Please sign in to comment.