Skip to content

Commit

Permalink
[Internal] ClientTelemetry: Adds logic to limit payload size to 2 MB (#…
Browse files Browse the repository at this point in the history
…3717)

* first draft

wip

fix test and logic

* resolve conflicts

* limit 2 mb

* add callback

* fix tests

* code refactor

* cosmos json to Newtonsoft json

* clean up files

* fix logging to argument based

* code refactor

* add null check
  • Loading branch information
sourabh1007 authored Mar 9, 2023
1 parent dc3d037 commit 6ebb8d4
Show file tree
Hide file tree
Showing 10 changed files with 859 additions and 258 deletions.
184 changes: 31 additions & 153 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Handler;
Expand All @@ -20,9 +18,6 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
using Microsoft.Azure.Documents.Rntbd;
using Newtonsoft.Json;
using Util;
using static Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum;

Expand All @@ -36,12 +31,10 @@ internal class ClientTelemetry : IDisposable
{
private const int allowedNumberOfFailures = 3;

private static readonly Uri endpointUrl = ClientTelemetryOptions.GetClientTelemetryEndpoint();
private static readonly TimeSpan observingWindow = ClientTelemetryOptions.GetScheduledTimeSpan();

private readonly ClientTelemetryProperties clientTelemetryInfo;
private readonly CosmosHttpClient httpClient;
private readonly AuthorizationTokenProvider tokenProvider;
private readonly ClientTelemetryProcessor processor;
private readonly DiagnosticsHandlerHelper diagnosticsHelper;

private readonly CancellationTokenSource cancellationTokenSource;
Expand Down Expand Up @@ -108,7 +101,7 @@ public static ClientTelemetry CreateAndStartBackgroundTelemetry(
return clientTelemetry;
}

private ClientTelemetry(
internal ClientTelemetry(
string clientId,
CosmosHttpClient httpClient,
string userAgent,
Expand All @@ -118,9 +111,8 @@ private ClientTelemetry(
IReadOnlyList<string> preferredRegions,
GlobalEndpointManager globalEndpointManager)
{
this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
this.diagnosticsHelper = diagnosticsHelper ?? throw new ArgumentNullException(nameof(diagnosticsHelper));
this.tokenProvider = authorizationTokenProvider ?? throw new ArgumentNullException(nameof(authorizationTokenProvider));
this.processor = new ClientTelemetryProcessor(httpClient, authorizationTokenProvider);

this.clientTelemetryInfo = new ClientTelemetryProperties(
clientId: clientId,
Expand Down Expand Up @@ -170,39 +162,45 @@ private async Task EnrichAndSendAsync()

await Task.Delay(observingWindow, this.cancellationTokenSource.Token);

this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat);
this.clientTelemetryInfo.MachineId = VmMetadataApiHandler.GetMachineId();

// Load host information from cache
Compute vmInformation = VmMetadataApiHandler.GetMachineInfo();
this.clientTelemetryInfo.ApplicationRegion = vmInformation?.Location;
this.clientTelemetryInfo.HostEnvInfo = ClientTelemetryOptions.GetHostInformation(vmInformation);

// If cancellation is requested after the delay then return from here.
if (this.cancellationTokenSource.IsCancellationRequested)
{
DefaultTrace.TraceInformation("Observer Task Cancelled.");

break;
}

this.RecordSystemUtilization();
this.clientTelemetryInfo.SystemInfo = ClientTelemetryHelper.RecordSystemUtilization(this.diagnosticsHelper,
this.clientTelemetryInfo.IsDirectConnectionMode);

this.clientTelemetryInfo.DateTimeUtc = DateTime.UtcNow.ToString(ClientTelemetryOptions.DateFormat);

ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
// Take the copy for further processing i.e. serializing and dividing into chunks
ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot
= Interlocked.Exchange(ref this.operationInfoMap, new ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)>());

ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());

ConcurrentDictionary<RequestInfo, LongConcurrentHistogram> requestInfoSnapshot
= Interlocked.Exchange(ref this.requestInfoMap, new ConcurrentDictionary<RequestInfo, LongConcurrentHistogram>());

this.clientTelemetryInfo.OperationInfo = ClientTelemetryHelper.ToListWithMetricsInfo(operationInfoSnapshot);
this.clientTelemetryInfo.CacheRefreshInfo = ClientTelemetryHelper.ToListWithMetricsInfo(cacheRefreshInfoSnapshot);
this.clientTelemetryInfo.RequestInfo = ClientTelemetryHelper.ToListWithMetricsInfo(requestInfoSnapshot);

await this.SendAsync();
try
{
await this.processor
.ProcessAndSendAsync(
clientTelemetryInfo: this.clientTelemetryInfo,
operationInfoSnapshot: operationInfoSnapshot,
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
requestInfoSnapshot: requestInfoSnapshot,
cancellationToken: this.cancellationTokenSource.Token);

this.numberOfFailures = 0;
}
catch (Exception ex)
{
this.numberOfFailures++;

DefaultTrace.TraceError("Telemetry Job Processor failed with error : {0}", ex);
}
}
}
catch (Exception ex)
Expand Down Expand Up @@ -233,8 +231,8 @@ internal void CollectCacheInfo(string cacheRefreshSource,
throw new ArgumentNullException(nameof(cacheRefreshSource));
}

DefaultTrace.TraceVerbose($"Collecting cacheRefreshSource {cacheRefreshSource} data for Telemetry.");
// Plain composite-format string: with a '$' prefix, {0} would be interpolated as the constant 0 and cacheRefreshSource would never appear in the trace.
DefaultTrace.TraceVerbose("Collecting cacheRefreshSource {0} data for Telemetry.", cacheRefreshSource);

string regionsContacted = ClientTelemetryHelper.GetContactedRegions(regionsContactedList);

// Recording Request Latency
Expand Down Expand Up @@ -290,7 +288,7 @@ internal void CollectOperationInfo(CosmosDiagnostics cosmosDiagnostics,
ITrace trace)
{
DefaultTrace.TraceVerbose("Collecting Operation data for Telemetry.");

if (cosmosDiagnostics == null)
{
throw new ArgumentNullException(nameof(cosmosDiagnostics));
Expand All @@ -301,7 +299,7 @@ internal void CollectOperationInfo(CosmosDiagnostics cosmosDiagnostics,
this.RecordRntbdResponses(containerId, databaseId, summaryDiagnostics.StoreResponseStatistics.Value);

string regionsContacted = ClientTelemetryHelper.GetContactedRegions(cosmosDiagnostics.GetContactedRegions());

// Recording Request Latency and Request Charge
OperationInfo payloadKey = new OperationInfo(regionsContacted: regionsContacted?.ToString(),
responseSizeInBytes: responseSizeInBytes,
Expand Down Expand Up @@ -370,126 +368,6 @@ private void RecordRntbdResponses(string containerId, string databaseId, List<St
}
}
}

/// <summary>
/// Pulls the client telemetry system usage history from the diagnostics helper and, when one is
/// available, hands it to <c>ClientTelemetryHelper.RecordSystemUsage</c> together with the current
/// <c>SystemInfo</c> collection and connection mode. Any exception is traced and swallowed so a
/// recording failure never propagates to the caller.
/// </summary>
private void RecordSystemUtilization()
{
    try
    {
        DefaultTrace.TraceVerbose("Started Recording System Usage for telemetry.");

        SystemUsageHistory usageSnapshot = this.diagnosticsHelper.GetClientTelemetrySystemHistory();

        // Nothing to record yet: warn and leave the collection untouched.
        if (usageSnapshot == null)
        {
            DefaultTrace.TraceWarning("System Usage History not available");
            return;
        }

        ClientTelemetryHelper.RecordSystemUsage(
            systemUsageHistory: usageSnapshot,
            systemInfoCollection: this.clientTelemetryInfo.SystemInfo,
            isDirectConnectionMode: this.clientTelemetryInfo.IsDirectConnectionMode);
    }
    catch (Exception recordingFailure)
    {
        DefaultTrace.TraceError("System Usage Recording Error : {0} ", recordingFailure);
    }
}

/// <summary>
/// Serializes the current telemetry properties to JSON and POSTs them to the configured telemetry endpoint.
/// When no endpoint is configured, an error is traced and the method returns without sending (and without resetting).
/// Otherwise the failure counter is set to zero on a successful response and incremented on a non-success
/// response or an exception; the system information collection is reset afterwards in either case.
/// </summary>
/// <returns>A task that completes once the send attempt (and the subsequent reset) has finished.</returns>
private async Task SendAsync()
{
    if (endpointUrl == null)
    {
        DefaultTrace.TraceError("Telemetry is enabled but endpoint is not configured");
        return;
    }

    try
    {
        DefaultTrace.TraceInformation("Sending Telemetry Data to {0}", endpointUrl.AbsoluteUri);

        string payload = JsonConvert.SerializeObject(this.clientTelemetryInfo, ClientTelemetryOptions.JsonSerializerSettings);

        using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, endpointUrl)
        {
            Content = new StringContent(payload, Encoding.UTF8, "application/json")
        };

        // Factory handed to the HTTP client: decorates the request built above with
        // authorization, account-name and (optionally) environment-name headers.
        async ValueTask<HttpRequestMessage> BuildAuthorizedRequestAsync()
        {
            INameValueCollection authorizationHeaders = new StoreResponseNameValueCollection();
            await this.tokenProvider.AddAuthorizationHeaderAsync(
                authorizationHeaders,
                endpointUrl,
                "POST",
                AuthorizationTokenType.PrimaryMasterKey);

            foreach (string headerName in authorizationHeaders.AllKeys())
            {
                request.Headers.Add(headerName, authorizationHeaders[headerName]);
            }

            request.Headers.Add(HttpConstants.HttpHeaders.DatabaseAccountName, this.clientTelemetryInfo.GlobalDatabaseAccountName);

            string environmentName = ClientTelemetryOptions.GetEnvironmentName();
            if (!string.IsNullOrEmpty(environmentName))
            {
                request.Headers.Add(HttpConstants.HttpHeaders.EnvironmentName, environmentName);
            }

            return request;
        }

        using HttpResponseMessage response = await this.httpClient.SendHttpAsync(
            BuildAuthorizedRequestAsync,
            ResourceType.Telemetry,
            HttpTimeoutPolicyNoRetry.Instance,
            null,
            this.cancellationTokenSource.Token);

        if (response.IsSuccessStatusCode)
        {
            // A successful call clears the running failure count.
            this.numberOfFailures = 0;
            DefaultTrace.TraceInformation("Telemetry data sent successfully.");
        }
        else
        {
            this.numberOfFailures++;

            DefaultTrace.TraceError("Juno API response not successful. Status Code : {0}, Message : {1}", response.StatusCode, response.ReasonPhrase);
        }
    }
    catch (Exception sendFailure)
    {
        this.numberOfFailures++;

        DefaultTrace.TraceError("Exception while sending telemetry data : {0}", sendFailure);
    }
    finally
    {
        // Start the next observation window with an empty system information collection.
        this.Reset();
    }
}

/// <summary>
/// Clears the system information collection held by the telemetry properties so the next
/// reporting cycle starts from an empty collection.
/// </summary>
private void Reset() => this.clientTelemetryInfo.SystemInfo.Clear();

/// <summary>
/// Dispose of cosmos client.It will get disposed with client so not making it thread safe.
Expand Down
Loading

0 comments on commit 6ebb8d4

Please sign in to comment.