Merge branch 'master' into users/sourabhjain/queryperffix1
j82w committed Nov 9, 2021
2 parents 899399b + 19fa7fb commit 79ffee3
Showing 20 changed files with 537 additions and 213 deletions.
3 changes: 3 additions & 0 deletions Microsoft.Azure.Cosmos.Samples/Tools/CTL/CTLConfig.cs
@@ -44,6 +44,9 @@ public class CTLConfig
[Option("ctl_throughput", Required = false, HelpText = "Provisioned throughput to use")]
public int Throughput { get; set; } = 100000;

[Option("ctl_db_throughput", Required = false, HelpText = "Provisioned throughput to use for databases")]
public int? DatabaseThroughput { get; set; }

[Option("ctl_read_write_query_pct", Required = false, HelpText = "Distribution of read, writes, and queries")]
public string ReadWriteQueryPercentage { get; set; } = "90,9,1";

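The new ctl_db_throughput option allows database-level provisioned throughput to be configured independently of the container-level ctl_throughput. Since CTLConfig binds its settings with [Option] attributes (CommandLineParser-style long names), the setting would presumably be supplied as a long-form flag; a hypothetical invocation, where the dotnet entry point and the concrete values are assumptions rather than part of this commit:

dotnet run -- --ctl_db_throughput 10000 --ctl_throughput 0 --ctl_read_write_query_pct "90,9,1"

With ctl_throughput at 0, the container-creation path added in Utils.cs below skips dedicated container throughput, so only the database-level value applies.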
@@ -6,15 +6,15 @@ namespace CosmosCTL
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Net;
using System.Diagnostics;
using App.Metrics;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using App.Metrics.Gauge;
using App.Metrics.Timer;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;

internal class ChangeFeedPullScenario : ICTLScenario
{
@@ -55,7 +55,6 @@ internal class ChangeFeedPullScenario : ICTLScenario
{
Stopwatch stopWatch = Stopwatch.StartNew();

long diagnosticsThresholdDuration = (long)config.DiagnosticsThresholdDurationAsTimespan.TotalMilliseconds;
GaugeOptions documentGauge= new GaugeOptions { Name = "#Documents received", Context = loggingContextIdentifier };

TimerOptions readLatencyTimer = new TimerOptions
@@ -87,11 +86,12 @@ internal class ChangeFeedPullScenario : ICTLScenario
using (TimerContext timerContext = metrics.Measure.Timer.Time(readLatencyTimer))
{
response = await changeFeedPull.ReadNextAsync();
long latency = (long)timerContext.Elapsed.TotalMilliseconds;
if (latency > diagnosticsThresholdDuration)
{
logger.LogInformation("Change Feed request took more than latency threshold {0}, diagnostics: {1}", config.DiagnosticsThresholdDuration, response.Diagnostics.ToString());
}
Utils.LogDiagnostics(
logger: logger,
operationName: nameof(ChangeFeedPullScenario),
timerContextLatency: timerContext.Elapsed,
config: config,
cosmosDiagnostics: response.Diagnostics);
}

documentTotal += response.Count;
@@ -190,6 +190,13 @@ internal class QueryScenario : ICTLScenario
query.Dispose();
query = container.GetItemQueryIterator<Dictionary<string, string>>(queryText, continuation);
}

Utils.LogDiagnostics(
logger: logger,
operationName: queryName,
timerContextLatency: response.Diagnostics.GetClientElapsedTime(),
config: config,
cosmosDiagnostics: response.Diagnostics);
}

query.Dispose();
@@ -268,6 +275,13 @@ internal class QueryScenario : ICTLScenario
FeedResponse<Dictionary<string, string>> response = await query.ReadNextAsync();
documentTotal += response.Count;
continuation = response.ContinuationToken;

Utils.LogDiagnostics(
logger: logger,
operationName: queryName,
timerContextLatency: response.Diagnostics.GetClientElapsedTime(),
config: config,
cosmosDiagnostics: response.Diagnostics);
}

metrics.Measure.Gauge.SetValue(documentGauge, documentTotal);
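Note that these query paths pass response.Diagnostics.GetClientElapsedTime() as the latency argument rather than an App.Metrics TimerContext, so the threshold comparison inside Utils.LogDiagnostics is presumably made against the client elapsed time reported by the SDK diagnostics for that page of results.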
@@ -79,7 +79,6 @@ internal class ReadManyScenario : ICTLScenario
Reservoir = () => new App.Metrics.ReservoirSampling.Uniform.DefaultAlgorithmRReservoir()
};

long diagnosticsThresholdDuration = (long)config.DiagnosticsThresholdDurationAsTimespan.TotalMilliseconds;
Container container = cosmosClient.GetContainer(config.Database, config.Collection);
while (stopWatch.Elapsed <= config.RunningTimeDurationAsTimespan
&& !cancellationToken.IsCancellationRequested)
@@ -90,11 +89,12 @@
using (TimerContext timerContext = metrics.Measure.Timer.Time(readLatencyTimer))
{
response = await container.ReadManyItemsAsync<Dictionary<string, string>>(this.idAndPkPairs);
long latency = (long)timerContext.Elapsed.TotalMilliseconds;
if (latency > diagnosticsThresholdDuration)
{
logger.LogInformation("ReadMany request took more than latency threshold {0}, diagnostics: {1}", config.DiagnosticsThresholdDuration, response.Diagnostics.ToString());
}
Utils.LogDiagnostics(
logger: logger,
operationName: nameof(ReadManyScenario),
timerContextLatency: timerContext.Elapsed,
config: config,
cosmosDiagnostics: response.Diagnostics);
}

metrics.Measure.Gauge.SetValue(documentGauge, response.Count);
@@ -13,9 +13,6 @@ namespace CosmosCTL
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Extensions.Logging;

internal class ReadWriteQueryScenario : ICTLScenario
@@ -141,7 +138,6 @@ internal class ReadWriteQueryScenario : ICTLScenario
SemaphoreSlim concurrencyControlSemaphore = new SemaphoreSlim(config.Concurrency);
Stopwatch stopwatch = Stopwatch.StartNew();
int writeRange = readWriteQueryPercentage.ReadPercentage + readWriteQueryPercentage.WritePercentage;
long diagnosticsThresholdDuration = (long)config.DiagnosticsThresholdDurationAsTimespan.TotalMilliseconds;
List<Task> operations = new List<Task>();
for (long i = 0; ShouldContinue(stopwatch, i, config); i++)
{
@@ -156,7 +152,8 @@
partitionKeyAttributeName: config.CollectionPartitionKey,
containers: initializationResult.Containers,
createdDocumentsPerContainer: this.createdDocuments)),
onSuccess: () => {
onSuccess: () =>
{
concurrencyControlSemaphore.Release();
metrics.Measure.Counter.Increment(readSuccessMeter);
},
@@ -166,11 +163,11 @@
metrics.Measure.Counter.Increment(readFailureMeter);
Utils.LogError(logger, loggingContextIdentifier, ex, "Failure during read operation");
},
logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => this.LogDiagnostics(
operationName: "Read",
logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
logger: logger,
diagnosticsThresholdDuration: diagnosticsThresholdDuration,
latency: latency,
operationName: "Read",
timerContextLatency: latency,
config: config,
cosmosDiagnostics: response.Diagnostics)));
}
else if (index < writeRange)
@@ -193,11 +190,11 @@
metrics.Measure.Counter.Increment(writeFailureMeter);
Utils.LogError(logger, loggingContextIdentifier, ex, "Failure during write operation");
},
logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => this.LogDiagnostics(
operationName: "Create",
logDiagnostics: (ItemResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
logger: logger,
diagnosticsThresholdDuration: diagnosticsThresholdDuration,
latency: latency,
operationName: "Write",
timerContextLatency: latency,
config: config,
cosmosDiagnostics: response.Diagnostics)));

}
@@ -219,11 +216,11 @@
metrics.Measure.Counter.Increment(queryFailureMeter);
Utils.LogError(logger, loggingContextIdentifier, ex, "Failure during query operation");
},
logDiagnostics: (FeedResponse<Dictionary<string, string>> response, TimeSpan latency) => this.LogDiagnostics(
operationName: "Query",
logDiagnostics: (FeedResponse<Dictionary<string, string>> response, TimeSpan latency) => Utils.LogDiagnostics(
logger: logger,
diagnosticsThresholdDuration: diagnosticsThresholdDuration,
latency: latency,
operationName: "Query",
timerContextLatency: latency,
config: config,
cosmosDiagnostics: response.Diagnostics)));
}
}
@@ -234,27 +231,6 @@
operations.Count, stopwatch.Elapsed.TotalSeconds);
}

private void LogDiagnostics(
string operationName,
ILogger logger,
long diagnosticsThresholdDuration,
TimeSpan latency,
CosmosDiagnostics cosmosDiagnostics)
{
if (latency.TotalMilliseconds > diagnosticsThresholdDuration)
{
logger.LogInformation(operationName + " request took more than latency threshold {0}, diagnostics: {1}", diagnosticsThresholdDuration, cosmosDiagnostics.ToString());
return;
}

CosmosTraceDiagnostics traceDiagnostics = (CosmosTraceDiagnostics)cosmosDiagnostics;
if (traceDiagnostics.IsGoneExceptionHit())
{
logger.LogInformation(operationName + " request contains 410(GoneExceptions), latencyInMS:{0}; diagnostics:{1}", latency.TotalMilliseconds, cosmosDiagnostics.ToString());
return;
}
}

private Task<ItemResponse<Dictionary<string, string>>> CreateReadOperation(
long operation,
string partitionKeyAttributeName,
41 changes: 36 additions & 5 deletions Microsoft.Azure.Cosmos.Samples/Tools/CTL/Utils.cs
@@ -12,6 +12,7 @@ namespace CosmosCTL
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Extensions.Logging;

internal static class Utils
@@ -134,27 +135,57 @@ await Utils.ForEachAsync(documentsToCreate, (Dictionary<string, string> doc)
{
database = await cosmosClient.GetDatabase(config.Database).ReadAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
catch (CosmosException exception) when (exception.StatusCode == HttpStatusCode.NotFound)
{
DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput);
await cosmosClient.CreateDatabaseAsync(config.Database, config.DatabaseThroughput);

result.CreatedDatabase = true;
database = databaseResponse.Database;
database = cosmosClient.GetDatabase(config.Database);
}

Container container;
try
{
container = await database.GetContainer(config.Collection).ReadContainerAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
catch (CosmosException exception) when (exception.StatusCode == HttpStatusCode.NotFound)
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}");
if (config.Throughput > 0)
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}", config.Throughput);
}
else
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}");
}

result.CreatedContainer = true;
}

return result;
}

public static void LogDiagnostics(
ILogger logger,
string operationName,
TimeSpan timerContextLatency,
CTLConfig config,
CosmosDiagnostics cosmosDiagnostics)
{

if (timerContextLatency > config.DiagnosticsThresholdDurationAsTimespan)
{
logger.LogInformation($"{operationName}; LatencyInMs:{timerContextLatency.TotalMilliseconds}; request took more than latency threshold {config.DiagnosticsThresholdDuration}, diagnostics: {cosmosDiagnostics}");
}

CosmosTraceDiagnostics traceDiagnostics = (CosmosTraceDiagnostics)cosmosDiagnostics;
if (traceDiagnostics.IsGoneExceptionHit())
{
logger.LogInformation($"{operationName}; LatencyInMs:{timerContextLatency.TotalMilliseconds}; request contains 410(GoneExceptions), diagnostics:{cosmosDiagnostics}");
return;
}
}

public static void LogError(
ILogger logger,
string context,
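Utils.LogDiagnostics above centralizes the per-operation threshold check and the 410/Gone inspection that the individual scenarios previously duplicated. A minimal caller sketch mirroring the pattern the scenarios now follow, in which the point read, identifiers, and variable names are illustrative assumptions rather than code from this commit:

using (TimerContext timerContext = metrics.Measure.Timer.Time(readLatencyTimer))
{
    ItemResponse<Dictionary<string, string>> response =
        await container.ReadItemAsync<Dictionary<string, string>>(id, new PartitionKey(partitionKey));

    // Logs diagnostics when the elapsed time exceeds the configured threshold
    // or when the diagnostics contain a 410 (Gone) exception.
    Utils.LogDiagnostics(
        logger: logger,
        operationName: "Read",
        timerContextLatency: timerContext.Elapsed,
        config: config,
        cosmosDiagnostics: response.Diagnostics);
}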
@@ -15,16 +15,16 @@ internal sealed class ClientTelemetryProperties
internal string DateTimeUtc { get; set; }

[JsonProperty(PropertyName = "clientId")]
private string ClientId { get; }
internal string ClientId { get; }

[JsonProperty(PropertyName = "processId")]
private string ProcessId { get; }
internal string ProcessId { get; }

[JsonProperty(PropertyName = "userAgent")]
private string UserAgent { get; }
internal string UserAgent { get; }

[JsonProperty(PropertyName = "connectionMode")]
private string ConnectionMode { get; }
internal string ConnectionMode { get; }

[JsonProperty(PropertyName = "globalDatabaseAccountName")]
internal string GlobalDatabaseAccountName { get; set; }
@@ -36,7 +36,7 @@ internal sealed class ClientTelemetryProperties
internal string HostEnvInfo { get; set; }

[JsonProperty(PropertyName = "acceleratedNetworking")]
private bool? AcceleratedNetworking { get; set; }
internal bool? AcceleratedNetworking { get; set; }

/// <summary>
/// Preferred Region set by the client
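For context, Newtonsoft.Json serializes any member decorated with [JsonProperty] regardless of its access modifier, so widening these getters from private to internal should not change the serialized telemetry payload; presumably the change is to let other types in the assembly (tests, for example) read the values. A minimal sketch of that behavior, using an illustrative type that is not part of the SDK:

using Newtonsoft.Json;

internal sealed class VisibilitySample
{
    [JsonProperty(PropertyName = "clientId")]
    private string ClientId { get; } = "client-1";

    [JsonProperty(PropertyName = "userAgent")]
    internal string UserAgent { get; } = "agent-1";
}

// Both properties are serialized even though neither getter is public:
// JsonConvert.SerializeObject(new VisibilitySample()) => {"clientId":"client-1","userAgent":"agent-1"}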
3 changes: 3 additions & 0 deletions Microsoft.Azure.Cosmos/src/Telemetry/SystemInfo.cs
@@ -11,6 +11,9 @@ namespace Microsoft.Azure.Cosmos.Telemetry
[Serializable]
internal sealed class SystemInfo
{
[JsonProperty(PropertyName = "resource")]
internal string Resource => "HostMachine";

[JsonProperty(PropertyName = "metricInfo")]
internal MetricInfo MetricInfo { get; set; }

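The added Resource property is a get-only computed member, which Newtonsoft.Json will serialize but never needs to deserialize, so every reported SystemInfo payload should carry a constant "resource":"HostMachine" field alongside "metricInfo" (whose contents are defined elsewhere in the telemetry code).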
