Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AI integration: Refactor code how container and database name is flowing to opentelemetry module #3532

Merged
merged 11 commits into from
Nov 15, 2022
15 changes: 8 additions & 7 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default)
{
return this.container.ClientContext.OperationHelperAsync(
nameof(ExecuteAsync),
requestOptions,
(trace) =>
operationName: nameof(ExecuteAsync),
containerName: this.container.Id,
databaseName: this.container.Database.Id,
operationType: Documents.OperationType.Replace,
requestOptions: requestOptions,
task: (trace) =>
{
BatchExecutor executor = new BatchExecutor(
container: this.container,
Expand All @@ -232,10 +235,8 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(trace, cancellationToken);
},
(response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id));
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal
{
Expand Down Expand Up @@ -222,12 +222,12 @@ public ChangeFeedIteratorCore(
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id ?? this.databaseName,
operationType: OperationType.ReadFeed,
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
openTelemetry: (response) => new OpenTelemetryResponse(responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ public override CosmosElement GetCosmosElementContinuationToken()
/// <returns>A change feed response from cosmos service</returns>
public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return this.clientContext.OperationHelperAsync("Change Feed Processor Read Next Async",
return this.clientContext.OperationHelperAsync(
operationName: "Change Feed Processor Read Next Async",
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id ?? this.databaseName,
operationType: Documents.OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ private ChangeFeedEstimatorIterator(

public override Task<FeedResponse<ChangeFeedProcessorState>> ReadNextAsync(CancellationToken cancellationToken = default)
{
return this.monitoredContainer.ClientContext.OperationHelperAsync("Change Feed Estimator Read Next Async",
return this.monitoredContainer.ClientContext.OperationHelperAsync(
operationName: "Change Feed Estimator Read Next Async",
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id,
operationType: Documents.OperationType.ReadFeed,
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(
responseMessage: response,
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id ?? this.databaseName),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
164 changes: 86 additions & 78 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -629,9 +627,12 @@ internal CosmosClient(
public virtual Task<AccountProperties> ReadAccountAsync()
{
return this.ClientContext.OperationHelperAsync(
nameof(ReadAccountAsync),
null,
(trace) => ((IDocumentClientInternal)this.DocumentClient).GetDatabaseAccountInternalAsync(this.Endpoint));
operationName: nameof(ReadAccountAsync),
containerName: null,
databaseName: null,
operationType: OperationType.Read,
requestOptions: null,
task: (trace) => ((IDocumentClientInternal)this.DocumentClient).GetDatabaseAccountInternalAsync(this.Endpoint));
}

/// <summary>
Expand Down Expand Up @@ -715,9 +716,12 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);
Expand All @@ -729,10 +733,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -765,9 +766,12 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return this.CreateDatabaseInternalAsync(
Expand All @@ -777,10 +781,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -824,59 +825,60 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
return string.IsNullOrEmpty(id)
? throw new ArgumentNullException(nameof(id))
: this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
async (trace) =>
{
double totalRequestCharge = 0;
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
operationName: nameof(CreateDatabaseIfNotExistsAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge = readResponse.Headers.RequestCharge;
if (readResponse.StatusCode != HttpStatusCode.NotFound)
task: async (trace) =>
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
databaseProperties,
throughputProperties,
requestOptions,
trace,
cancellationToken))
{
totalRequestCharge += createResponse.Headers.RequestCharge;
createResponse.Headers.RequestCharge = totalRequestCharge;

if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
using (ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge += readResponseAfterConflict.Headers.RequestCharge;
readResponseAfterConflict.Headers.RequestCharge = totalRequestCharge;

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
double totalRequestCharge = 0;
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge = readResponse.Headers.RequestCharge;
if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
databaseProperties,
throughputProperties,
requestOptions,
trace,
cancellationToken))
{
totalRequestCharge += createResponse.Headers.RequestCharge;
createResponse.Headers.RequestCharge = totalRequestCharge;

if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
using (ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge += readResponseAfterConflict.Headers.RequestCharge;
readResponseAfterConflict.Headers.RequestCharge = totalRequestCharge;

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -1165,9 +1167,12 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseStreamAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseStreamAsync),
containerName: null,
databaseName: databaseProperties.Id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
Expand All @@ -1177,7 +1182,7 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
trace,
cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

/// <summary>
Expand Down Expand Up @@ -1260,9 +1265,12 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseIfNotExistsAsync),
containerName: null,
databaseName: databaseProperties.Id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
Expand All @@ -1272,7 +1280,7 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
trace,
cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

private async Task<DatabaseResponse> CreateDatabaseInternalAsync(
Expand Down
Loading