Skip to content

Commit

Permalink
Add AppConfig and Event Hubs samples for using Monitor exporter (#17565)
Browse files Browse the repository at this point in the history
* Add AppConfig and Event Hubs samples for using exporters

* Fix compiler warnings

* Update sdk/monitor/microsoft-opentelemetry-exporter-azuremonitor/pom.xml

* Update method names
  • Loading branch information
srnagar authored Nov 13, 2020
1 parent 5262e66 commit 88e5368
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 28 deletions.
21 changes: 21 additions & 0 deletions sdk/monitor/microsoft-opentelemetry-exporter-azuremonitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@
<version>1.5.1</version> <!-- {x-version-update;com.azure:azure-core-test;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-appconfiguration</artifactId>
<version>1.1.7</version> <!-- {x-version-update;com.azure:azure-data-appconfiguration;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-tracing-opentelemetry</artifactId>
<version>1.0.0-beta.6</version> <!-- {x-version-update;com.azure:azure-core-tracing-opentelemetry;dependency} -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.opentelemetry.exporter.azuremonitor;

import com.azure.data.appconfiguration.ConfigurationClient;
import com.azure.data.appconfiguration.ConfigurationClientBuilder;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.TracerSdkProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;

/**
* Sample to demonstrate using {@link AzureMonitorExporter} to export telemetry events when setting a configuration
* in App Configuration through the {@link ConfigurationClient}.
*/
public class AppConfigurationAzureMonitorExporterSample {

private static final Tracer TRACER = configureAzureMonitorExporter();
private static final String CONNECTION_STRING = "<YOUR_CONNECTION_STRING>";

/**
* The main method to run the application.
* @param args Ignored args.
*/
public static void main(String[] args) {
doClientWork();
}

/**
* Configure the OpenTelemetry {@link AzureMonitorExporter} to enable tracing.
* @return The OpenTelemetry {@link Tracer} instance.
*/
private static Tracer configureAzureMonitorExporter() {
AzureMonitorExporter exporter = new AzureMonitorExporterBuilder()
.connectionString("{connection-string}")
.buildExporter();

TracerSdkProvider tracerSdkProvider = OpenTelemetrySdk.getTracerProvider();
tracerSdkProvider.addSpanProcessor(SimpleSpanProcessor.newBuilder(exporter).build());
return tracerSdkProvider.get("Sample");
}

/**
* Creates the {@link ConfigurationClient} and sets a configuration in Azure App Configuration with distributed
* tracing enabled and using the Azure Monitor exporter to export telemetry events to Azure Monitor.
*/
private static void doClientWork() {
ConfigurationClient client = new ConfigurationClientBuilder()
.connectionString(CONNECTION_STRING)
.buildClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
final Scope scope = TRACER.withSpan(span);
try {
// Thread bound (sync) calls will automatically pick up the parent span and you don't need to pass it explicitly.
client.setConfigurationSetting("hello", "text", "World");
} finally {
span.end();
scope.close();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.opentelemetry.exporter.azuremonitor;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.TracerSdkProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.OPERATION_TIMEOUT;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample to demontrate using {@link AzureMonitorExporter} to export telemetry events when sending events to Event Hubs
* using {@link EventHubProducerAsyncClient}.
*/
public class EventHubsAzureMonitorExporterSample {
private static final Tracer TRACER = configureAzureMonitorExporter();
private static final String CONNECTION_STRING = "<YOUR_CONNECTION_STRING>";

/**
* The main method to run the application.
* @param args Ignored args.
*/
public static void main(String[] args) {
doClientWork();
}

/**
* Configure the OpenTelemetry {@link AzureMonitorExporter} to enable tracing.
* @return The OpenTelemetry {@link Tracer} instance.
*/
private static Tracer configureAzureMonitorExporter() {
AzureMonitorExporter exporter = new AzureMonitorExporterBuilder()
.connectionString("{connection-string}")
.buildExporter();

TracerSdkProvider tracerSdkProvider = OpenTelemetrySdk.getTracerProvider();
tracerSdkProvider.addSpanProcessor(SimpleSpanProcessor.newBuilder(exporter).build());
return tracerSdkProvider.get("Sample");
}

/**
* Method that creates {@link EventHubProducerAsyncClient} to send events to Event Hubs with distributed
* telemetry enabled and using Azure Monitor exporter to export telemetry events.
*/
private static void doClientWork() {
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(CONNECTION_STRING)
.buildAsyncProducerClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
final Scope scope = TRACER.withSpan(span);
try {
String firstPartition = producer.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

final byte[] body = "EventData Sample 1".getBytes(UTF_8);
final byte[] body2 = "EventData Sample 2".getBytes(UTF_8);

// We will publish three events based on simple sentences.
Flux<EventData> data = Flux.just(
new EventData(body).addContext(PARENT_SPAN_KEY, TRACER.getCurrentSpan()),
new EventData(body2).addContext(PARENT_SPAN_KEY, TRACER.getCurrentSpan()));

// Create a batch to send the events.
final CreateBatchOptions options = new CreateBatchOptions()
.setPartitionId(firstPartition)
.setMaximumSizeInBytes(256);

final AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
producer.createBatch(options).block(OPERATION_TIMEOUT));

data.flatMap(event -> {
final EventDataBatch batch = currentBatch.get();
if (batch.tryAdd(event)) {
return Mono.empty();
}

// The batch is full, so we create a new batch and send the batch. Mono.when completes when both
// operations
// have completed.
return Mono.when(
producer.send(batch),
producer.createBatch(options).map(newBatch -> {
currentBatch.set(newBatch);

// Add that event that we couldn't before.
if (!newBatch.tryAdd(event)) {
throw Exceptions.propagate(new IllegalArgumentException(String.format(
"Event is too large for an empty batch. Max size: %s. Event: %s",
newBatch.getMaxSizeInBytes(), event.getBodyAsString())));
}

return newBatch;
}));
}).then()
.doFinally(signal -> {
final EventDataBatch batch = currentBatch.getAndSet(null);
if (batch != null) {
producer.send(batch).block(OPERATION_TIMEOUT);
}
})
.subscribe(unused -> System.out.println("Complete"),
error -> System.out.println("Error sending events: " + error),
() -> {
System.out.println("Completed sending events.");
span.end();
});


// The .subscribe() creation and assignment is not a blocking call. For the purpose of this example, we sleep
// the thread so the program does not end before the send operation is complete. Using .block() instead of
// .subscribe() will turn this into a synchronous call.
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ignored) {
} finally {
// Disposing of our producer.
producer.close();
}
} finally {
scope.close();
}
}
}

0 comments on commit 88e5368

Please sign in to comment.