Skip to content

Commit

Permalink
Merge pull request #135 from nats-io/pid-signal-fixes
Browse files Browse the repository at this point in the history
Fixes to config reloader for when server pid changes
  • Loading branch information
wallyqs authored Jul 18, 2023
2 parents 24881bc + 1c04564 commit 0cea926
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
4 changes: 2 additions & 2 deletions cmd/nats-server-config-reloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func main() {
fs.StringVar(&nconfig.PidFile, "pid", "/var/run/nats/gnatsd.pid", "NATS Server Pid File")
fs.Var(&fileSet, "c", "NATS Server Config File (may be repeated to specify more than one)")
fs.Var(&fileSet, "config", "NATS Server Config File (may be repeated to specify more than one)")
fs.IntVar(&nconfig.MaxRetries, "max-retries", 5, "Max attempts to trigger reload")
fs.IntVar(&nconfig.RetryWaitSecs, "retry-wait-secs", 2, "Time to back off when reloading fails before retrying")
fs.IntVar(&nconfig.MaxRetries, "max-retries", 30, "Max attempts to trigger reload")
fs.IntVar(&nconfig.RetryWaitSecs, "retry-wait-secs", 4, "Time to back off when reloading fails before retrying")
fs.IntVar(&customSignal, "signal", 1, "Signal to send to the NATS Server process (default SIGHUP 1)")

fs.Parse(os.Args[1:])
Expand Down
71 changes: 48 additions & 23 deletions pkg/natsreloader/natsreloader.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package natsreloader

import (
Expand All @@ -12,6 +25,7 @@ import (
"path/filepath"
"sort"
"strconv"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -61,14 +75,21 @@ func (r *Reloader) waitForProcess() error {
goto WaitAndRetry
}

// This always succeeds regardless of the process existing or not.
proc, err = os.FindProcess(pid)
if err != nil {
goto WaitAndRetry
}

// Check if the process is still alive.
err = proc.Signal(syscall.Signal(0))
if err != nil {
goto WaitAndRetry
}
break

WaitAndRetry:
log.Printf(errorFmt, err)
log.Printf("Error while monitoring pid %v: %v", pid, err)
attempts++
if attempts > r.MaxRetries {
return fmt.Errorf("too many errors attempting to find server process")
Expand All @@ -77,11 +98,9 @@ func (r *Reloader) waitForProcess() error {
}

if attempts > 0 {
log.Printf("found pid from pidfile %q after %v failed attempts (%v time after start)",
r.PidFile, attempts, time.Since(startTime))
log.Printf("Found pid from pidfile %q after %v failed attempts (took %.3fs)",
r.PidFile, attempts, time.Since(startTime).Seconds())
}

r.pid = pid
r.proc = proc
return nil
}
Expand Down Expand Up @@ -167,8 +186,10 @@ func handleEvents(configWatcher *fsnotify.Watcher, event fsnotify.Event, lastCon
}
}

func handleDeletedFiles(deletedFiles []string, configWatcher *fsnotify.Watcher, lastConfigAppliedCache map[string][]byte) ([]string, []string) {
log.Printf("Ticker is running with deletedFiles %v", deletedFiles)
func handleDeletedFiles(deletedFiles []string, configWatcher *fsnotify.Watcher, lastConfigAppliedCache map[string][]byte) ([]string, []string) {
if len(deletedFiles) > 0 {
log.Printf("Tracking files %v", deletedFiles)
}
newDeletedFiles := make([]string, 0, len(deletedFiles))
updated := make([]string, 0, len(deletedFiles))
for _, f := range deletedFiles {
Expand Down Expand Up @@ -212,7 +233,7 @@ func (r *Reloader) init() (*fsnotify.Watcher, map[string][]byte, error) {
}

// lastConfigAppliedCache is the last config update
// applied by us
// applied by us.
lastConfigAppliedCache := make(map[string][]byte)

// Preload config hashes, so we know their digests
Expand All @@ -225,13 +246,8 @@ func (r *Reloader) init() (*fsnotify.Watcher, map[string][]byte, error) {
}
lastConfigAppliedCache[configFile] = digest
}

// If the two pids don't match then os.FindProcess() has done something
// rather hinkier than we expect, but log them both just in case on some
// future platform there's a weird namespace issue, as a difference will
// help with debugging.
log.Printf("Live, ready to kick pid %v (live, from %v spec) based on any of %v files",
r.proc.Pid, r.pid, len(lastConfigAppliedCache))
log.Printf("Live, ready to kick pid %v on config changes (files=%d)",
r.proc.Pid, len(lastConfigAppliedCache))

if len(lastConfigAppliedCache) == 0 {
log.Printf("Error: no watched config files cached; input spec was: %#v",
Expand All @@ -243,17 +259,26 @@ func (r *Reloader) init() (*fsnotify.Watcher, map[string][]byte, error) {
func (r *Reloader) reload(updatedFiles []string) error {
attempts := 0
for {
log.Printf("Sending signal '%s' to server to reload configuration due to: %s", r.Signal.String(), updatedFiles)
err := r.proc.Signal(r.Signal)
err := r.waitForProcess()
if err != nil {
goto Retry
}

log.Printf("Sending pid %v '%s' signal to reload changes from: %s", r.proc.Pid, r.Signal.String(), updatedFiles)
err = r.proc.Signal(r.Signal)
if err == nil {
return nil
}
log.Printf("Error during reload: %s\n", err)

Retry:
if err != nil {
log.Printf("Error during reload: %s", err)
}
if attempts > r.MaxRetries {
return fmt.Errorf("too many errors (%v) attempting to signal server to reload: %w", attempts, err)
}
delay := retryJitter(time.Duration(r.RetryWaitSecs) * time.Second)
log.Printf("Wait and retrying after some time [%v] ...", delay)
log.Printf("Wait and retrying in %.3fs ...", delay.Seconds())
time.Sleep(delay)
attempts++
}
Expand Down Expand Up @@ -287,23 +312,23 @@ func (r *Reloader) Run(ctx context.Context) error {
case <-t.C:
updatedFiles, deletedFiles = handleDeletedFiles(deletedFiles, configWatcher, lastConfigAppliedCache)
if len(deletedFiles) == 0 {
// No more deleted files, stop the ticker
log.Printf("All monitored files detected.")
t.Stop()
tickerRunning = false
}
if len(updatedFiles) > 0 {
// Send signal to reload the config
// Send signal to reload the config.
log.Printf("Updated files: %v", updatedFiles)
break
}
continue
// Check if the process is still alive
case event := <-configWatcher.Events:
updated, deleted := handleEvents(configWatcher, event, lastConfigAppliedCache)
updatedFiles = removeDuplicateStrings(updated)
deletedFiles = removeDuplicateStrings(append(deletedFiles, deleted...))
if !tickerRunning {
// Start the ticker to re-add deleted files
// Start the ticker to re-add deleted files.
log.Printf("Starting ticker to re-add all tracked files.")
t.Reset(time.Second)
tickerRunning = true
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/natsreloader/natsreloadertest/natsreloader_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package natsreloadertest

import (
Expand Down

0 comments on commit 0cea926

Please sign in to comment.