Skip to content

Commit

Permalink
Add tracing support for Service Bus processor (Azure#17684)
Browse files Browse the repository at this point in the history
* Add tracing support for SB processor

* Make addContext packag-private

* Resolve merge conflict
  • Loading branch information
srnagar authored Nov 20, 2020
1 parent 57c79dd commit 5885efd
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,10 @@ public final class ServiceBusSessionProcessorClientBuilder {

private ServiceBusSessionProcessorClientBuilder() {
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions();
processorClientOptions = new ServiceBusProcessorClientOptions()
.setMaxConcurrentCalls(1)
.setTracerProvider(tracerProvider);
sessionReceiverClientBuilder.maxConcurrentSessions(1);
processorClientOptions.setMaxConcurrentCalls(1);
}

/**
Expand Down Expand Up @@ -1101,7 +1102,9 @@ public final class ServiceBusProcessorClientBuilder {

private ServiceBusProcessorClientBuilder() {
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1);
processorClientOptions = new ServiceBusProcessorClientOptions()
.setMaxConcurrentCalls(1)
.setTracerProvider(tracerProvider);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,38 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;

import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;

/**
* The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient
* ServiceBusProcessorClients} provides a push-based mechanism that invokes the message processing callback when a
Expand Down Expand Up @@ -44,6 +62,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final AtomicReference<Subscription> receiverSubscription = new AtomicReference<>();
private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
private final AtomicBoolean isRunning = new AtomicBoolean();
private final TracerProvider tracerProvider;
private ScheduledExecutorService scheduledExecutor;

/**
Expand All @@ -65,6 +84,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
this.receiverBuilder = null;
this.tracerProvider = processorOptions.getTracerProvider();
}

/**
Expand All @@ -84,6 +104,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(receiverBuilder.buildAsyncClient());
this.sessionReceiverBuilder = null;
this.tracerProvider = processorOptions.getTracerProvider();
}

/**
Expand Down Expand Up @@ -164,12 +185,22 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
Context processSpanContext = null;
try {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

processSpanContext =
startProcessTracingSpan(serviceBusMessageContext.getMessage(),
receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace());
if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) {
serviceBusMessageContext.getMessage().addContext(SPAN_CONTEXT_KEY, processSpanContext);
}
processMessage.accept(serviceBusReceivedMessageContext);
endProcessTracingSpan(processSpanContext, Signal.complete());
} catch (Exception ex) {
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
endProcessTracingSpan(processSpanContext, Signal.error(ex));
if (!processorOptions.isDisableAutoComplete()) {
logger.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
Expand Down Expand Up @@ -201,6 +232,54 @@ public void onComplete() {
});
}

private void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal) {
if (processSpanContext == null) {
return;
}

Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
// Disposes of the scope when the trace span closes.
if (!spanScope.isPresent() || !tracerProvider.isEnabled()) {
return;
}
if (spanScope.get() instanceof Closeable) {
Closeable close = (Closeable) processSpanContext.getData(SCOPE_KEY).get();
try {
close.close();
tracerProvider.endSpan(processSpanContext, signal);
} catch (IOException ioException) {
logger.error("endTracingSpan().close() failed with an error %s", ioException);
}

} else {
logger.warning(String.format(Locale.US,
"Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span",
spanScope.get() != null ? spanScope.getClass() : "null"));
}
}

private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String entityPath,
String fullyQualifiedNamespace) {

Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);

spanContext = spanContext
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, fullyQualifiedNamespace)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
spanContext = receivedMessage.getEnqueuedTime() == null
? spanContext
: spanContext.addData(MESSAGE_ENQUEUED_TIME,
receivedMessage.getEnqueuedTime().toInstant().getEpochSecond());

return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, ProcessKind.PROCESS);
}

private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext,
ServiceBusReceiverAsyncClient receiverClient) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.amqp.models.AmqpMessageBodyType;
import com.azure.core.amqp.models.AmqpMessageId;
import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

Expand Down Expand Up @@ -40,10 +41,12 @@ public final class ServiceBusReceivedMessage {
private final AmqpAnnotatedMessage amqpAnnotatedMessage;
private UUID lockToken;
private boolean isSettled = false;
private Context context;

ServiceBusReceivedMessage(BinaryData body) {
Objects.requireNonNull(body, "'body' cannot be null.");
amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes()));
context = Context.NONE;
}

/**
Expand Down Expand Up @@ -438,6 +441,22 @@ public String getTo() {
return to;
}

/**
* Adds a new key value pair to the existing context on Message.
*
* @param key The key for this context object
* @param value The value for this context object.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
ServiceBusReceivedMessage addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
return this;
}

/**
* Gets whether the message has been settled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.servicebus.implementation.models;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.Fluent;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;

Expand All @@ -15,6 +16,8 @@ public final class ServiceBusProcessorClientOptions {
private int maxConcurrentCalls = 1;
private boolean disableAutoComplete;

private TracerProvider tracerProvider;

/**
* Returns true if the auto-complete and auto-abandon feature is disabled.
* @return true if the auto-complete and auto-abandon feature is disabled.
Expand Down Expand Up @@ -50,4 +53,24 @@ public ServiceBusProcessorClientOptions setMaxConcurrentCalls(int maxConcurrentC
this.maxConcurrentCalls = maxConcurrentCalls;
return this;
}

/**
* Returns the {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}.
*
* @return The {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}.
*/
public TracerProvider getTracerProvider() {
return tracerProvider;
}

/**
* Sets the {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}.
*
* @param tracerProvider The {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}.
* @return The updated instance of {@link ServiceBusProcessorClientOptions}.
*/
public ServiceBusProcessorClientOptions setTracerProvider(TracerProvider tracerProvider) {
this.tracerProvider = tracerProvider;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -319,6 +332,66 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru
verify(asyncClient, never()).abandon(any(ServiceBusReceivedMessage.class));
}

@Test
public void testProcessorWithTracingEnabled() throws InterruptedException {
final Tracer tracer = mock(Tracer.class);
final List<Tracer> tracers = Collections.singletonList(tracer);
TracerProvider tracerProvider = new TracerProvider(tracers);

String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01";

when(tracer.extractContext(eq(diagnosticId), any())).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
return passed.addData(SPAN_CONTEXT_KEY, "value");
}
);
when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent());
return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (Closeable) () -> {
return;
}).addData(PARENT_SPAN_KEY, "value2");
}
);
Flux<ServiceBusMessageContext> messageFlux =
Flux.create(emitter -> {
for (int i = 0; i < 5; i++) {
ServiceBusReceivedMessage serviceBusReceivedMessage =
new ServiceBusReceivedMessage(BinaryData.fromString("hello"));
serviceBusReceivedMessage.setMessageId(String.valueOf(i));
serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now());
serviceBusReceivedMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, diagnosticId);
ServiceBusMessageContext serviceBusMessageContext =
new ServiceBusMessageContext(serviceBusReceivedMessage);
emitter.next(serviceBusMessageContext);
}
});

ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux);

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(5);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
countDownLatch.countDown();
},
error -> Assertions.fail("Error occurred when receiving messages from the processor"),
new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider));

serviceBusProcessorClient.start();
boolean success = countDownLatch.await(5, TimeUnit.SECONDS);
serviceBusProcessorClient.close();

assertTrue(success, "Failed to receive all expected messages");
verify(tracer, times(5)).extractContext(eq(diagnosticId), any());
verify(tracer, times(5)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS));
verify(tracer, times(5)).end(eq("success"), isNull(), any());

}

private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
Flux<ServiceBusMessageContext> messageFlux) {

Expand Down

0 comments on commit 5885efd

Please sign in to comment.