Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RavenDB-21956 Add azure queue storage etl #18293

Open
wants to merge 7 commits into
base: v6.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
using System;
using System.Linq;
using Raven.Client.Documents.Operations.ETL.SQL;
using Sparrow.Json.Parsing;

namespace Raven.Client.Documents.Operations.ETL.Queue;

/// <summary>
/// Connection settings of an Azure Queue Storage ETL destination.
/// A configuration is considered valid only when exactly one of <see cref="EntraId"/>,
/// <see cref="ConnectionString"/> or <see cref="Passwordless"/> is provided.
/// </summary>
public sealed class AzureQueueStorageConnectionSettings
{
    /// <summary>Credentials-based authentication settings (storage account, tenant, client id and secret).</summary>
    public EntraId EntraId { get; set; }

    /// <summary>Raw connection string; parsed by <see cref="GetStorageUrl"/> to extract the queue URL.</summary>
    public string ConnectionString { get; set; }

    /// <summary>Authentication settings that carry only the storage account name.</summary>
    public Passwordless Passwordless { get; set; }

    /// <summary>
    /// Checks that exactly one authentication method is configured and, for
    /// <see cref="EntraId"/> / <see cref="Passwordless"/>, that all of its required fields are filled in.
    /// The content of <see cref="ConnectionString"/> is not parsed here.
    /// </summary>
    /// <returns><c>true</c> when the settings are usable, otherwise <c>false</c>.</returns>
    public bool IsValidConnection()
    {
        if (IsOnlyOneConnectionProvided() == false)
        {
            return false;
        }

        if (EntraId != null && EntraId.IsValid() == false)
        {
            return false;
        }

        if (Passwordless != null && Passwordless.IsValid() == false)
        {
            return false;
        }

        return true;
    }

    /// <summary>
    /// Counts the configured authentication methods. A null, empty or whitespace-only
    /// <see cref="ConnectionString"/> counts as "not provided".
    /// </summary>
    private bool IsOnlyOneConnectionProvided()
    {
        int provided = 0;

        if (EntraId != null)
        {
            provided++;
        }

        if (HasConnectionString())
        {
            provided++;
        }

        if (Passwordless != null)
        {
            provided++;
        }

        return provided == 1;
    }

    /// <summary>
    /// Returns the queue service URL for the configured authentication method.
    /// </summary>
    /// <returns>
    /// The URL extracted from <see cref="ConnectionString"/> when one is provided; otherwise the default
    /// <c>https://{account}.queue.core.windows.net/</c> URL built from the storage account name of
    /// <see cref="EntraId"/> or <see cref="Passwordless"/>.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The connection string is provided but lacks the protocol, queue endpoint or account name it needs.
    /// </exception>
    public string GetStorageUrl()
    {
        // Must use the same "is a connection string provided" rule as IsOnlyOneConnectionProvided():
        // a blank ConnectionString next to a valid EntraId/Passwordless passes validation, so it must
        // not be routed into the connection string parser (which would throw).
        if (HasConnectionString())
        {
            return GetUrlFromConnectionString(ConnectionString);
        }

        return BuildDefaultQueueUrl(GetStorageAccountName());
    }

    /// <summary>
    /// Extracts the queue URL from a connection string.
    /// When <c>DefaultEndpointsProtocol</c> is <c>http</c> the explicit <c>QueueEndpoint</c> is returned
    /// (presumably emulator / custom endpoint setups - confirm); otherwise the default HTTPS URL is built
    /// from <c>AccountName</c>.
    /// </summary>
    /// <remarks>
    /// Per the review discussion on this change, this parsing is meant to be covered by unit tests
    /// through the public <see cref="GetStorageUrl"/> method.
    /// </remarks>
    /// <param name="connectionString">The connection string to parse.</param>
    /// <exception cref="ArgumentException">A required key is missing or blank.</exception>
    private string GetUrlFromConnectionString(string connectionString)
    {
        var protocol = SqlConnectionStringParser.GetConnectionStringValue(connectionString, ["DefaultEndpointsProtocol"]);
        if (string.IsNullOrWhiteSpace(protocol))
        {
            ThrowConnectionStringError("Protocol not found in the connection string");
        }

        // Ordinal, case-insensitive: "HTTP" must not be silently treated as HTTPS, because callers use the
        // returned URL to decide whether the communication channel is encrypted.
        if (string.Equals(protocol, "http", StringComparison.OrdinalIgnoreCase))
        {
            var queueEndpoint = SqlConnectionStringParser.GetConnectionStringValue(connectionString, ["QueueEndpoint"]);
            if (string.IsNullOrWhiteSpace(queueEndpoint))
            {
                ThrowConnectionStringError("Queue endpoint not found in the connection string");
            }

            return queueEndpoint;
        }

        var accountName = SqlConnectionStringParser.GetConnectionStringValue(connectionString, ["AccountName"]);
        if (string.IsNullOrWhiteSpace(accountName))
        {
            ThrowConnectionStringError("Storage account name not found in the connection string");
        }

        return BuildDefaultQueueUrl(accountName);
    }

    /// <summary>
    /// Returns the storage account name of <see cref="EntraId"/> if set, else of <see cref="Passwordless"/>,
    /// else an empty string.
    /// </summary>
    private string GetStorageAccountName()
    {
        if (EntraId != null)
        {
            return EntraId.StorageAccountName;
        }

        if (Passwordless != null)
        {
            return Passwordless.StorageAccountName;
        }

        return "";
    }

    /// <summary>Whether <see cref="ConnectionString"/> holds anything other than whitespace.</summary>
    private bool HasConnectionString()
    {
        return string.IsNullOrWhiteSpace(ConnectionString) == false;
    }

    /// <summary>Builds the default HTTPS queue endpoint for the given storage account name.</summary>
    private static string BuildDefaultQueueUrl(string storageAccountName)
    {
        return $"https://{storageAccountName}.queue.core.windows.net/";
    }

    /// <summary>Always throws an <see cref="ArgumentException"/> attributed to <see cref="ConnectionString"/>.</summary>
    private void ThrowConnectionStringError(string message)
    {
        throw new ArgumentException(message, nameof(ConnectionString));
    }

    /// <summary>
    /// Serializes the settings. Unset authentication methods are written as <c>null</c>.
    /// Note that <c>ClientSecret</c> is written as-is.
    /// </summary>
    public DynamicJsonValue ToJson()
    {
        DynamicJsonValue entraIdJson = null;
        if (EntraId != null)
        {
            entraIdJson = new DynamicJsonValue
            {
                [nameof(EntraId.StorageAccountName)] = EntraId.StorageAccountName,
                [nameof(EntraId.TenantId)] = EntraId.TenantId,
                [nameof(EntraId.ClientId)] = EntraId.ClientId,
                [nameof(EntraId.ClientSecret)] = EntraId.ClientSecret
            };
        }

        DynamicJsonValue passwordlessJson = null;
        if (Passwordless != null)
        {
            passwordlessJson = new DynamicJsonValue
            {
                [nameof(Passwordless.StorageAccountName)] = Passwordless.StorageAccountName
            };
        }

        return new DynamicJsonValue
        {
            [nameof(ConnectionString)] = ConnectionString,
            [nameof(EntraId)] = entraIdJson,
            [nameof(Passwordless)] = passwordlessJson
        };
    }
}

/// <summary>
/// Authentication settings made of a storage account name plus tenant, client id and client secret.
/// </summary>
public sealed class EntraId
{
    public string StorageAccountName { get; set; }
    public string TenantId { get; set; }
    public string ClientId { get; set; }
    public string ClientSecret { get; set; }

    /// <summary>
    /// Tells whether every field is filled in.
    /// </summary>
    /// <returns>
    /// <c>false</c> as soon as any of the four values is null, empty or whitespace-only; <c>true</c> otherwise.
    /// </returns>
    public bool IsValid()
    {
        if (string.IsNullOrWhiteSpace(StorageAccountName))
        {
            return false;
        }

        if (string.IsNullOrWhiteSpace(TenantId))
        {
            return false;
        }

        if (string.IsNullOrWhiteSpace(ClientId))
        {
            return false;
        }

        return string.IsNullOrWhiteSpace(ClientSecret) == false;
    }
}

/// <summary>
/// Settings used for machine authentication: only the storage account name is supplied.
/// </summary>
public sealed class Passwordless
{
    public string StorageAccountName { get; set; }

    /// <summary>
    /// Tells whether <see cref="StorageAccountName"/> is filled in.
    /// </summary>
    /// <returns><c>false</c> when the name is null, empty or whitespace-only; <c>true</c> otherwise.</returns>
    public bool IsValid() => !string.IsNullOrWhiteSpace(StorageAccountName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ public enum QueueBrokerType
{
None,
Kafka,
RabbitMq
RabbitMq,
AzureQueueStorage
arekpalinski marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public sealed class QueueConnectionString : ConnectionString
public KafkaConnectionSettings KafkaConnectionSettings { get; set; }

public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }

public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }

public override ConnectionStringType Type => ConnectionStringType.Queue;

Expand All @@ -31,6 +33,12 @@ protected override void ValidateImpl(ref List<string> errors)
errors.Add($"{nameof(RabbitMqConnectionSettings)} has no valid setting.");
}
break;
case QueueBrokerType.AzureQueueStorage:
if (AzureQueueStorageConnectionSettings.IsValidConnection() == false)
{
errors.Add($"{nameof(AzureQueueStorageConnectionSettings)} has no valid setting.");
}
break;
default:
throw new NotSupportedException($"'{BrokerType}' broker is not supported");
}
Expand All @@ -52,6 +60,9 @@ internal string GetUrl()

url = indexOfStartServerUri != -1 ? connectionString.Substring(indexOfStartServerUri + 1) : null;
break;
case QueueBrokerType.AzureQueueStorage:
url = AzureQueueStorageConnectionSettings.GetStorageUrl();
break;
default:
throw new NotSupportedException($"'{BrokerType}' broker is not supported");
}
Expand All @@ -66,6 +77,7 @@ public override DynamicJsonValue ToJson()
json[nameof(BrokerType)] = BrokerType;
json[nameof(KafkaConnectionSettings)] = KafkaConnectionSettings?.ToJson();
json[nameof(RabbitMqConnectionSettings)] = RabbitMqConnectionSettings?.ToJson();
json[nameof(AzureQueueStorageConnectionSettings)] = AzureQueueStorageConnectionSettings?.ToJson();

return json;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ public override bool UsingEncryptedCommunicationChannel()
}
break;
case QueueBrokerType.RabbitMq:
return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps", StringComparison.OrdinalIgnoreCase);
return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps",
StringComparison.OrdinalIgnoreCase);
case QueueBrokerType.AzureQueueStorage:
return Connection.AzureQueueStorageConnectionSettings.GetStorageUrl()
.StartsWith("https", StringComparison.OrdinalIgnoreCase);
arekpalinski marked this conversation as resolved.
Show resolved Hide resolved
default:
throw new NotSupportedException($"Unknown broker type: {BrokerType}");
}
Expand Down
12 changes: 12 additions & 0 deletions src/Raven.Server/Config/Categories/EtlConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,17 @@ public sealed class EtlConfiguration : ConfigurationCategory
[TimeUnit(TimeUnit.Seconds)]
[ConfigurationEntry("ETL.Queue.Kafka.InitTransactionsTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)]
public TimeSetting KafkaInitTransactionsTimeout { get; set; }

[Description("Lifespan of a message in the queue")]
[DefaultValue(604800)] // 7 days (Azure default)
[TimeUnit(TimeUnit.Seconds)]
[ConfigurationEntry("ETL.Queue.AzureQueueStorage.TimeToLiveInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)]
public TimeSetting AzureQueueStorageTimeToLive{ get; set; }

[Description("How long a message is hidden after being retrieved but not deleted")]
[DefaultValue(0)] // azure default
[TimeUnit(TimeUnit.Seconds)]
[ConfigurationEntry("ETL.Queue.AzureQueueStorage.VisibilityTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)]
public TimeSetting AzureQueueStorageVisibilityTimeout{ get; set; }
}
}
9 changes: 7 additions & 2 deletions src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData
long rabbitMqEtlCountOnNode = GetTaskCountOnNode<QueueEtlConfiguration>(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations,
task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.RabbitMq);

var azureQueueStorageEtlCount = database.EtlLoader.GetQueueDestinationCountByBroker(QueueBrokerType.AzureQueueStorage);
long azureQueueStorageEtlCountOnNode = GetTaskCountOnNode<QueueEtlConfiguration>(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations,
task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.AzureQueueStorage);

var periodicBackupCount = database.PeriodicBackupRunner.PeriodicBackups.Count;
long periodicBackupCountOnNode = BackupUtils.GetTasksCountOnNode(serverStore, database.Name, context);

Expand All @@ -364,8 +368,8 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData
task => QueueSinkLoader.GetProcessState(task.Scripts, database, task.Name), task => task.BrokerType == QueueBrokerType.RabbitMq);

ongoingTasksCount = extRepCount + replicationHubCount + replicationSinkCount +
ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount + rabbitMqEtlCount +
periodicBackupCount + subscriptionCount +
ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount +
rabbitMqEtlCount + azureQueueStorageEtlCount + periodicBackupCount + subscriptionCount +
kafkaSinkCount + rabbitMqSinkCount;

return new DatabaseOngoingTasksInfoItem
Expand All @@ -380,6 +384,7 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData
OlapEtlCount = olapEtlCountOnNode,
KafkaEtlCount = kafkaEtlCountOnNode,
RabbitMqEtlCount = rabbitMqEtlCountOnNode,
AzureQueueStorageEtlCount = azureQueueStorageEtlCountOnNode,
PeriodicBackupCount = periodicBackupCountOnNode,
SubscriptionCount = subscriptionCountOnNode,
KafkaSinkCount = kafkaSinkCountOnNode,
Expand Down
3 changes: 3 additions & 0 deletions src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public sealed class DatabaseOngoingTasksInfoItem : IDynamicJson
public long KafkaEtlCount { get; set; }

public long RabbitMqEtlCount { get; set; }

public long AzureQueueStorageEtlCount { get; set; }

public long PeriodicBackupCount { get; set; }

Expand All @@ -57,6 +59,7 @@ public DynamicJsonValue ToJson()
[nameof(OlapEtlCount)] = OlapEtlCount,
[nameof(KafkaEtlCount)] = KafkaEtlCount,
[nameof(RabbitMqEtlCount)] = RabbitMqEtlCount,
[nameof(AzureQueueStorageEtlCount)] = AzureQueueStorageEtlCount,
arekpalinski marked this conversation as resolved.
Show resolved Hide resolved
[nameof(PeriodicBackupCount)] = PeriodicBackupCount,
[nameof(SubscriptionCount)] = SubscriptionCount,
[nameof(KafkaSinkCount)] = KafkaSinkCount,
Expand Down
31 changes: 31 additions & 0 deletions src/Raven.Server/Documents/ETL/EtlLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Raven.Server.Documents.ETL.Providers.ElasticSearch;
using Raven.Server.Documents.ETL.Providers.OLAP;
using Raven.Server.Documents.ETL.Providers.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
using Raven.Server.Documents.ETL.Providers.Raven;
Expand Down Expand Up @@ -646,6 +647,29 @@ public void HandleDatabaseRecordChange(DatabaseRecord record)

break;
}
case AzureQueueStorageEtl azureQueueStorageEtl:
{
QueueEtlConfiguration existing = null;

foreach (var config in myQueueEtl)
{
var diff = azureQueueStorageEtl.Configuration.Compare(config);

if (diff == EtlConfigurationCompareDifferences.None)
{
existing = config;
break;
}
}

if (existing != null)
{
toRemove.Remove(processesPerConfig.Key);
myQueueEtl.Remove(existing);
}

break;
}
case ElasticSearchEtl elasticSearchEtl:
{
ElasticSearchEtlConfiguration existing = null;
Expand Down Expand Up @@ -794,6 +818,13 @@ private void LogLongRunningDisposeIfNeeded(Stopwatch sp, string processName)
if (existing != null)
differences = rabbitMqEtl.Configuration.Compare(existing, transformationDiffs);
}
else if (process is AzureQueueStorageEtl azureQueueStorageEtl)
arekpalinski marked this conversation as resolved.
Show resolved Hide resolved
{
var existing = myQueueEtl.FirstOrDefault(x => x.Name.Equals(azureQueueStorageEtl.ConfigurationName, StringComparison.OrdinalIgnoreCase));

if (existing != null)
differences = azureQueueStorageEtl.Configuration.Compare(existing, transformationDiffs);
}
else
{
throw new InvalidOperationException($"Unknown ETL process type: " + process.GetType().FullName);
Expand Down
16 changes: 16 additions & 0 deletions src/Raven.Server/Documents/ETL/EtlProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Raven.Server.Documents.ETL.Providers.OLAP;
using Raven.Server.Documents.ETL.Providers.OLAP.Test;
using Raven.Server.Documents.ETL.Providers.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
using Raven.Server.Documents.ETL.Providers.Raven;
Expand Down Expand Up @@ -1316,6 +1317,21 @@ public override OngoingTaskConnectionStatus GetConnectionStatus()

var result = rabbitMqEtl.RunTest(results, context);
result.DebugOutput = debugOutput;
return result;
}
case AzureQueueStorageEtl azureQueueStorageEtl:
using (azureQueueStorageEtl.EnterTestMode(out debugOutput))
{
azureQueueStorageEtl.EnsureThreadAllocationStats();

var queueItem = new QueueItem(document, docCollection);

var results = azureQueueStorageEtl.Transform(new[] { queueItem }, context, new EtlStatsScope(new EtlRunStats()),
new EtlProcessState());

var result = azureQueueStorageEtl.RunTest(results, context);
result.DebugOutput = debugOutput;
arekpalinski marked this conversation as resolved.
Show resolved Hide resolved

return result;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using JetBrains.Annotations;
using Raven.Client.Documents.Operations.ETL.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
using Raven.Server.Documents.ETL.Stats;
Expand Down Expand Up @@ -37,6 +38,7 @@ await using (var writer = new AsyncBlittableJsonTextWriterForDebug(context, Serv
{
RabbitMqEtl => QueueBrokerType.RabbitMq,
KafkaEtl => QueueBrokerType.Kafka,
AzureQueueStorageEtl => QueueBrokerType.AzureQueueStorage,
_ => null
}
}).ToArray();
Expand Down