Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Watcher.onClose has dedicated WatcherException as parameter. #2616

Merged
merged 4 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs

#### Improvements
* Fix #2614: Watcher.onClose has dedicated WatcherException as parameter.

#### Dependency Upgrade

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,25 @@
*/
package io.fabric8.kubernetes.client;

import org.slf4j.LoggerFactory;

public interface Watcher<T> {

void eventReceived(Action action, T resource);

/**
* Run when the watcher finally closes.
* Invoked when the watcher is gracefully closed.
*/
default void onClose() {
LoggerFactory.getLogger(Watcher.class).debug("Watcher closed");
}

/**
* Invoked when the watcher closes due to an Exception.
*
* @param cause What caused the watcher to be closed. Null means normal close.
* @param cause What caused the watcher to be closed.
*/
void onClose(KubernetesClientException cause);
void onClose(WatcherException cause);

enum Action {
ADDED, MODIFIED, DELETED, ERROR
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client;

import java.net.HttpURLConnection;

public class WatcherException extends Exception {

public WatcherException(String message, Throwable cause) {
super(message, cause);
}

public WatcherException(String message) {
super(message);
}

public KubernetesClientException asClientException() {
final Throwable cause = getCause();
return cause instanceof KubernetesClientException ?
(KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause);
}

public boolean isHttpGone() {
final KubernetesClientException cause = asClientException();
return cause.getCode() == HttpURLConnection.HTTP_GONE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why it's Http but URL… 🤦 😀

|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
}

public boolean isShouldRetry() {
return getCause() == null || !isHttpGone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -45,7 +46,6 @@
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Replaceable;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher.WatchException;
import io.fabric8.kubernetes.client.dsl.internal.DefaultOperationInfo;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
Expand Down Expand Up @@ -1104,7 +1104,7 @@ private T waitUntilConditionWithRetries(Predicate<T> condition, long timeoutNano
return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WatchException && ((WatchException) cause).isShouldRetry()) {
if (cause instanceof WatcherException && ((WatcherException) cause).isShouldRetry()) {
LOG.debug("retryable watch exception encountered, retrying after {} millis", currentBackOff, cause);
Thread.sleep(currentBackOff);
currentBackOff *= watchRetryBackoffMultiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
*/
package io.fabric8.kubernetes.client.dsl.base;

import java.net.HttpURLConnection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public class WaitForConditionWatcher<T extends HasMetadata> implements Watcher<T> {

Expand Down Expand Up @@ -52,34 +51,18 @@ public void eventReceived(Action action, T resource) {
}
break;
case ERROR:
future.completeExceptionally(new WatchException("Action.ERROR received"));
future.completeExceptionally(new WatcherException("Action.ERROR received"));
break;
}
}

@Override
public void onClose(KubernetesClientException cause) {
future.completeExceptionally(new WatchException("Watcher closed", cause));
public void onClose(WatcherException cause) {
future.completeExceptionally(cause);
}

public static class WatchException extends Exception {

public WatchException(String message, KubernetesClientException cause) {
super(message, cause);
}

public WatchException(String message) {
super(message);
}

public boolean isShouldRetry() {
return getCause() == null || !isHttpGone();
}

private boolean isHttpGone() {
KubernetesClientException cause = ((KubernetesClientException) getCause());
return cause.getCode() == HttpURLConnection.HTTP_GONE
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
}
@Override
public void onClose() {
future.completeExceptionally(new WatcherException("Watcher closed"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import okhttp3.OkHttpClient;
import okhttp3.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public abstract class AbstractWatchManager<T> implements Watch {

private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);

final Watcher<T> watcher;
final ListOptions listOptions;
final AtomicReference<String> resourceVersion;
final OkHttpClient clonedClient;

final AtomicBoolean forceClosed;
private final int reconnectLimit;
private final int reconnectInterval;
private final int maxIntervalExponent;
final AtomicInteger currentReconnectAttempt;
private final ScheduledExecutorService executorService;


AbstractWatchManager(
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent,
OkHttpClient clonedClient
) {
this.watcher = watcher;
this.listOptions = listOptions;
this.reconnectLimit = reconnectLimit;
this.reconnectInterval = reconnectInterval;
this.maxIntervalExponent = maxIntervalExponent;
this.clonedClient = clonedClient;
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.executorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(AbstractWatchManager.this));
ret.setDaemon(true);
return ret;
});
}

final void closeEvent(WatcherException cause) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you make this method final?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a helper method only used by the *Manager implementations. I'm still working on that, but this method is not supposed to be called from the outside neither improved.
As I said, I'm trying to move as much as possible from the vaious *Managers to this class, but it's a WIP.

if (forceClosed.getAndSet(true)) {
logger.debug("Ignoring duplicate firing of onClose event");
return;
}
watcher.onClose(cause);
}

final void closeEvent() {
if (forceClosed.getAndSet(true)) {
logger.debug("Ignoring duplicate firing of onClose event");
return;
}
watcher.onClose();
}

final void closeExecutorService() {
if (executorService != null && !executorService.isShutdown()) {
logger.debug("Closing ExecutorService");
try {
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
logger.warn("Executor didn't terminate in time after shutdown in close(), killing it.");
executorService.shutdownNow();
}
} catch (Exception t) {
throw KubernetesClientException.launderThrowable(t);
}
}
}

void submit(Runnable task) {
if (!executorService.isShutdown()) {
executorService.submit(task);
}
}

void schedule(Runnable command, long delay, TimeUnit timeUnit) {
if (!executorService.isShutdown()) {
executorService.schedule(command, delay, timeUnit);
}
}

final boolean cannotReconnect() {
return currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0;
}

final long nextReconnectInterval() {
int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
if (exponentOfTwo > maxIntervalExponent)
exponentOfTwo = maxIntervalExponent;
long ret = (long)reconnectInterval * (1 << exponentOfTwo);
logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
return ret;
}

static void closeWebSocket(WebSocket webSocket) {
if (webSocket != null) {
logger.debug("Closing websocket {}", webSocket);
try {
if (!webSocket.close(1000, null)) {
logger.warn("Failed to close websocket");
}
} catch (IllegalStateException e) {
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.internal;

import java.util.Objects;

abstract class NamedRunnable implements Runnable {
private final String name;

NamedRunnable(String name) {
this.name = Objects.requireNonNull(name);
}

private void tryToSetName(String value) {
try {
Thread.currentThread().setName(value);
} catch (SecurityException ignored) {
// Ignored
}
}

public final void run() {
String oldName = Thread.currentThread().getName();
tryToSetName(this.name + "|" + oldName);
try {
execute();
} finally {
tryToSetName(oldName);
}
}

protected abstract void execute();
}
Loading