From 36eefb6eca460acfe8e1d57d0854639cb85421df Mon Sep 17 00:00:00 2001 From: Daniel Krueger Date: Tue, 28 Nov 2023 13:10:26 +0100 Subject: [PATCH] add IT for connection breaks fix missing drain trigger --- .../com/hivemq/bridge/mqtt/BridgeMqttClient.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java b/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java index 7ff549373..83cbb4fbe 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java +++ b/hivemq-edge/src/main/java/com/hivemq/bridge/mqtt/BridgeMqttClient.java @@ -59,7 +59,11 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -83,7 +87,7 @@ public class BridgeMqttClient { private final @NotNull EventService eventService; private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false); - private List forwarders = Collections.synchronizedList(new ArrayList<>()); + private final @NotNull List forwarders = Collections.synchronizedList(new ArrayList<>()); public BridgeMqttClient( final @NotNull SystemInformation systemInformation, @@ -115,6 +119,7 @@ private Mqtt5AsyncClient createClient() { builder.addConnectedListener(context -> { log.debug("Bridge {} connected", bridge.getId()); connected.set(true); + forwarders.forEach(MqttForwarder::drainQueue); }); builder.addConnectedListener(context -> eventService.fireEvent(eventBuilder(Event.SEVERITY.INFO).withMessage( @@ -236,10 +241,9 @@ private Mqtt5AsyncClient createClient() { forwarder.drainQueue(); } - ImmutableList.Builder> subscribeFutures = new ImmutableList.Builder<>(); for (RemoteSubscription remoteSubscription : bridge.getRemoteSubscriptions()) { - final List subscriptions = convertSubscriptions(remoteSubscription, bridge); + final List subscriptions = convertSubscriptions(remoteSubscription); final Consumer mqtt5PublishConsumer = new RemotePublishConsumer(remoteSubscription, bridgeInterceptorHandler, bridge, @@ -268,7 +272,7 @@ private Mqtt5AsyncClient createClient() { @NotNull private static List convertSubscriptions( - final @NotNull RemoteSubscription remoteSubscription, final @NotNull MqttBridge bridge) { + final @NotNull RemoteSubscription remoteSubscription) { return remoteSubscription.getFilters().stream().map(originalFilter -> { MqttTopicFilter topicFilter = MqttTopicFilter.of(originalFilter); return Mqtt5Subscription.builder() @@ -313,7 +317,7 @@ public boolean isConnected() { return connected.get(); } - protected Event.Builder eventBuilder(final @NotNull Event.SEVERITY severity) { + protected @NotNull Event.Builder eventBuilder(final @NotNull Event.SEVERITY severity) { Event.Builder builder = new Event.Builder(); builder.withTimestamp(System.currentTimeMillis()); builder.withSource(TypeIdentifier.create(TypeIdentifier.TYPE.BRIDGE, bridge.getId()));