Skip to content

Commit

Permalink
add IT for connection breaks
Browse files Browse the repository at this point in the history
fix missing drain trigger
  • Loading branch information
DC2-DanielKrueger committed Nov 28, 2023
1 parent 27dcfe8 commit 36eefb6
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand All @@ -83,7 +87,7 @@ public class BridgeMqttClient {
private final @NotNull EventService eventService;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private List<MqttForwarder> forwarders = Collections.synchronizedList(new ArrayList<>());
private final @NotNull List<MqttForwarder> forwarders = Collections.synchronizedList(new ArrayList<>());

public BridgeMqttClient(
final @NotNull SystemInformation systemInformation,
Expand Down Expand Up @@ -115,6 +119,7 @@ private Mqtt5AsyncClient createClient() {
builder.addConnectedListener(context -> {
log.debug("Bridge {} connected", bridge.getId());
connected.set(true);
forwarders.forEach(MqttForwarder::drainQueue);
});

builder.addConnectedListener(context -> eventService.fireEvent(eventBuilder(Event.SEVERITY.INFO).withMessage(
Expand Down Expand Up @@ -236,10 +241,9 @@ private Mqtt5AsyncClient createClient() {
forwarder.drainQueue();
}


ImmutableList.Builder<CompletableFuture<Mqtt5SubAck>> subscribeFutures = new ImmutableList.Builder<>();
for (RemoteSubscription remoteSubscription : bridge.getRemoteSubscriptions()) {
final List<Mqtt5Subscription> subscriptions = convertSubscriptions(remoteSubscription, bridge);
final List<Mqtt5Subscription> subscriptions = convertSubscriptions(remoteSubscription);
final Consumer<Mqtt5Publish> mqtt5PublishConsumer = new RemotePublishConsumer(remoteSubscription,
bridgeInterceptorHandler,
bridge,
Expand Down Expand Up @@ -268,7 +272,7 @@ private Mqtt5AsyncClient createClient() {

@NotNull
private static List<Mqtt5Subscription> convertSubscriptions(
final @NotNull RemoteSubscription remoteSubscription, final @NotNull MqttBridge bridge) {
final @NotNull RemoteSubscription remoteSubscription) {
return remoteSubscription.getFilters().stream().map(originalFilter -> {
MqttTopicFilter topicFilter = MqttTopicFilter.of(originalFilter);
return Mqtt5Subscription.builder()
Expand Down Expand Up @@ -313,7 +317,7 @@ public boolean isConnected() {
return connected.get();
}

protected Event.Builder eventBuilder(final @NotNull Event.SEVERITY severity) {
protected @NotNull Event.Builder eventBuilder(final @NotNull Event.SEVERITY severity) {
Event.Builder builder = new Event.Builder();
builder.withTimestamp(System.currentTimeMillis());
builder.withSource(TypeIdentifier.create(TypeIdentifier.TYPE.BRIDGE, bridge.getId()));
Expand Down

0 comments on commit 36eefb6

Please sign in to comment.