ScatterGather - .NET library to implement the scatter-gather pattern using a database to store distributed progress state

NuGet packages:

  • ScatterGather
  • ScatterGather.DynamoDB
  • ScatterGather.MongoDB

Scatter-gather is an enterprise integration pattern where a single big operation is split into a number of sub-operations, usually performed by separate workers, and some other operation must be carried out after all sub-operations have finished.

This library provides a means to keep track of how many scattered sub-operations have been completed (that is, gathered), using a database (currently Amazon DynamoDB and MongoDB are supported) to store that state and a gateway to manage it.

Use case

A typical scenario involves a component that is asked to process a large operation, and a set of workers to which this component wants to delegate parts of that large operation. For example, the first component may send messages to the workers through a message queue, keeping the processing of parts asynchronous.

Once a worker completes processing of a part, it may send a message back to the first component, or even to a third one, which acts as an aggregator for the processed parts. When all parts have been processed, the large operation may move forward.

The first component performs a scatter of the large operation, while each worker performs a gather.

When a worker performs a gather, and the scatter-gather gateway provided by this library notices it was the last part left, it calls a callback function in the context of that worker, and only that worker.

Comparison with AWS Step Functions

The scatter-gather pattern may be implemented on AWS using the Map state of Step Functions. Using Map, you can split an operation across multiple workers. If you need to process a large number of sub-operations, you can run a so-called Distributed Map state, which takes its input from a file saved in an S3 bucket, containing the list of parts to scatter.

The Map state of Step Functions will wait for all workers to complete before moving forward. In case of errors, the Map state will produce reports containing the failed or pending parts (saving them into an S3 bucket, in case of a Distributed Map state), which you can use to build a new input to resume the failed state machine.

As an alternative, you might want to use the scatter-gather gateway provided by this library in the following cases:

  • you want to decouple the scattering component and workers using a message queue, so that processing is asynchronous
  • in case of errors, you want to take advantage of dead-letter queues so that you can restart a failing scatter-gather operation for the failed parts, using the same mechanics of non-scatter-gather operations
  • you want more control over the progress state of the scatter-gather operation
  • you don't want to create a file on S3 to list the scattered parts for a Distributed Map state
  • you don't want to parse the result files saved by the Distributed Map state to S3 to know which parts failed and which parts did not even start

Usage

The IScatterGatherGateway interface in the ScatterGather namespace is the entry point for the functionality of the scatter-gather gateway provided by this library. Two implementations are provided, one using Amazon DynamoDB and one using MongoDB.

Each scatter-gather request must be identified by a unique ScatterRequestId generated by the application. Each scattered part must be identified by a ScatterPartId that must be unique in that scatter-gather operation. Both types are just simple wrappers for a string, to make these values type safe.

Both the scattering component and gathering workers must create an IScatterGatherGateway using one of the provided constructors.

Typically, the scattering component creates a unique ScatterRequestId and executes a BeginScatter, Scatter, EndScatter sequence, maybe calling Scatter multiple times to add more parts to the scatter-gather operation (that is, the scatter-gather gateway is "stream friendly"). Each worker calls Gather to mark each part as completed.

Performance-wise, the run time of all methods of IScatterGatherGateway is proportional to the number of elements passed to that method, but is irrespective of the number of elements in the whole scatter-gather operation.

Scattering methods

Task    BeginScatter(ScatterRequestId requestId, string context);
Task    Scatter     (ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task> callback);
Task<T> Scatter<T>  (ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task<T>> callback);
Task    EndScatter  (ScatterRequestId requestId, Func<string, Task> handleCompletion);

BeginScatter initializes a new scatter-gather request identified by requestId and accepts an arbitrary context string that is associated with that request. This context string can contain any text meaningful to the application, perhaps even some JSON-encoded data, and it will be passed to the completion handler function. The size of the context string is limited by the maximum item size of the underlying storage, that is, less than 400 KB for DynamoDB and less than 16 MB for MongoDB.
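As an illustration of carrying structured data in the context string, here is a small sketch using System.Text.Json. The ImportContext record and its values are hypothetical and not part of the library; only the idea of passing a JSON string to BeginScatter and decoding it in the completion handler comes from the description above.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical application data to carry through the scatter-gather operation.
var context = new ImportContext("orders-2023-06.csv", 3);

// Serialize it; the resulting string is what would be passed as the
// `context` argument of BeginScatter.
string contextString = JsonSerializer.Serialize(context);

// The context is stored with the request, so it is worth keeping an eye on its size.
Console.WriteLine($"Context is {Encoding.UTF8.GetByteCount(contextString)} bytes");

// In the completion handler the same string is received back and can be decoded.
ImportContext decoded = JsonSerializer.Deserialize<ImportContext>(contextString)!;
Console.WriteLine($"{decoded.FileName}: {decoded.BatchCount} batches");

// Hypothetical record type used only for this example.
record ImportContext(string FileName, int BatchCount);
```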

Scatter tells the scatter-gather gateway that there are one or more new parts that are about to be scattered for the request identified by requestId. Part identifiers are listed in partIds. The callback function must be specified to perform the application-specific action for the scattered parts, for example to send a message to a worker. Scatter has two overloads: one accepting a callback function returning a Task<T>, where the resulting T is returned by Scatter itself, and one accepting a callback function returning a Task, where Scatter itself returns nothing.

EndScatter signals that all parts have been scattered, thus it is now possible to expect completion of the whole scatter-gather operation. EndScatter itself calls the completion handler function if all parts, if any, have been gathered so fast that the scatter-gather operation is already complete.

Gathering methods

Task Gather(ScatterRequestId requestId, IReadOnlyCollection<ScatterPartId> partIds, Func<string, Task> handleCompletion)

A worker calls Gather after processing its parts (after is important for idempotency), to mark those parts as complete. The gathered parts (maybe just one) are identified by partIds within requestId. The handleCompletion function will be executed if the scatter-gather gateway notices that that was the last part to be gathered.

Note that only one worker will be able to call the completion handler function, because the scatter-gather gateway treats it as a critical section. Also note that, in case of errors during the completion handler function, restarting the worker that was processing the last Gather will restart the completion handler function (that is, the critical section is re-entrant).
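To make the call sequence concrete, the following is a minimal, self-contained sketch that models the behaviour described above with an in-memory stand-in. InMemoryGateway and the two record types are assumptions written for this illustration (their method signatures mirror the ones listed above). They are not the library's implementation, which persists state in a database and copes with concurrent workers and restarts. In a real application you would use the types from the ScatterGather package instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var gateway = new InMemoryGateway();
var requestId = new ScatterRequestId(Guid.NewGuid().ToString());
var sent = new List<string>();        // stands in for a message queue
var completions = new List<string>(); // contexts seen by the completion handler

Task HandleCompletion(string context)
{
    completions.Add(context);
    return Task.CompletedTask;
}

// Scattering component: begin, scatter parts in two batches, end.
await gateway.BeginScatter(requestId, "{\"orderId\":42}");
foreach (var batch in new[] { new[] { "a", "b" }, new[] { "c" } })
{
    var partIds = batch.Select(id => new ScatterPartId(id)).ToList();
    await gateway.Scatter(requestId, partIds, () =>
    {
        sent.AddRange(batch); // e.g. enqueue one message per part
        return Task.CompletedTask;
    });
}
await gateway.EndScatter(requestId, HandleCompletion);

// Workers: each one gathers its part after having processed it.
foreach (var id in sent)
    await gateway.Gather(requestId, new[] { new ScatterPartId(id) }, HandleCompletion);

Console.WriteLine($"Completion handler ran {completions.Count} time(s)");

// Stand-ins for the library types, assumed to be simple string wrappers.
public record ScatterRequestId(string Value);
public record ScatterPartId(string Value);

// In-memory model of the gateway semantics (single process, no concurrency).
public class InMemoryGateway
{
    private sealed class RequestState
    {
        public string Context = "";
        public HashSet<ScatterPartId> Pending = new();
        public bool ScatterEnded;
    }

    private readonly Dictionary<ScatterRequestId, RequestState> requests = new();

    public Task BeginScatter(ScatterRequestId requestId, string context)
    {
        requests[requestId] = new RequestState { Context = context };
        return Task.CompletedTask;
    }

    public async Task Scatter(ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task> callback)
    {
        // Assumption of this sketch: parts are recorded before they are dispatched.
        requests[requestId].Pending.UnionWith(partIds);
        await callback();
    }

    public Task EndScatter(ScatterRequestId requestId, Func<string, Task> handleCompletion)
    {
        requests[requestId].ScatterEnded = true;
        return CompleteIfDone(requestId, handleCompletion);
    }

    public Task Gather(ScatterRequestId requestId, IReadOnlyCollection<ScatterPartId> partIds, Func<string, Task> handleCompletion)
    {
        if (requests.TryGetValue(requestId, out var state))
            state.Pending.ExceptWith(partIds);
        return CompleteIfDone(requestId, handleCompletion);
    }

    // Completion requires both: scattering has ended and no part is pending.
    private async Task CompleteIfDone(ScatterRequestId requestId, Func<string, Task> handleCompletion)
    {
        if (requests.TryGetValue(requestId, out var state) && state.ScatterEnded && state.Pending.Count == 0)
        {
            await handleCompletion(state.Context);
            requests.Remove(requestId);
        }
    }
}
```

In this sketch the completion handler fires exactly once, on the Gather of the last pending part, and receives the context string given to BeginScatter. Had all parts been gathered before EndScatter was called, EndScatter would have invoked the handler instead.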

Database-specific implementations

This library provides two implementations of the IScatterGatherGateway interface, allowing you to choose between Amazon DynamoDB and MongoDB as storage to persist the progress state of a scatter-gather operation among distributed workers.

DynamoDB

The scatter-gather gateway for DynamoDB is provided by the ScatterGather.DynamoDB library.

The scatter-gather gateway needs a pair of master-detail DynamoDB tables to keep its state: one to store current scatter requests and one to list scattered parts.

The scatter-gather gateway will try to create those tables if they don't already exist, otherwise it will use the ones already present. Considering the typical usage pattern of the scatter-gather gateway, the tables are created with a "pay per request" billing mode.

If you want more control over the billing mode and provisioned capacity, you can create the pair of tables manually, as follows:

  • a table for scatter requests, that will contain an item for each scatter request that has been created, having a simple primary key composed of a RequestId string field as the partition key
  • a table for scattered parts, that will contain an item for each scattered sub-operation of a scatter request, having a composite primary key composed of a RequestId string field as the partition key and a PartId string field as the sort key
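The two table definitions above could be created, for instance, with the AWS SDK for .NET (AWSSDK.DynamoDBv2 package). This is only a sketch: the table names and the capacity figures are hypothetical, and the client is assumed to pick up credentials and region from the default configuration. The key schema follows the two bullets above.

```csharp
using System.Collections.Generic;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

// Request table: simple primary key on RequestId.
var requestTable = new CreateTableRequest
{
    TableName = "ScatterRequests", // hypothetical name
    AttributeDefinitions = new List<AttributeDefinition>
    {
        new AttributeDefinition("RequestId", ScalarAttributeType.S),
    },
    KeySchema = new List<KeySchemaElement>
    {
        new KeySchemaElement("RequestId", KeyType.HASH), // partition key
    },
    BillingMode = BillingMode.PROVISIONED,
    ProvisionedThroughput = new ProvisionedThroughput(5, 5), // hypothetical capacity
};

// Part table: composite primary key on RequestId and PartId.
var partTable = new CreateTableRequest
{
    TableName = "ScatterParts", // hypothetical name
    AttributeDefinitions = new List<AttributeDefinition>
    {
        new AttributeDefinition("RequestId", ScalarAttributeType.S),
        new AttributeDefinition("PartId", ScalarAttributeType.S),
    },
    KeySchema = new List<KeySchemaElement>
    {
        new KeySchemaElement("RequestId", KeyType.HASH), // partition key
        new KeySchemaElement("PartId", KeyType.RANGE),   // sort key
    },
    BillingMode = BillingMode.PROVISIONED,
    ProvisionedThroughput = new ProvisionedThroughput(5, 5), // hypothetical capacity
};

// Uses the default credentials and region of the environment.
using var client = new AmazonDynamoDBClient();
await client.CreateTableAsync(requestTable);
await client.CreateTableAsync(partTable);
```

The same table names would then be passed to the gateway constructor, which uses the tables already present.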

Note that your application will need the following permissions on those two tables: dynamodb:CreateTable, dynamodb:DescribeTable, dynamodb:Query, dynamodb:PutItem, dynamodb:DeleteItem, dynamodb:UpdateItem and dynamodb:BatchWriteItem.

For billing, given a scatter-gather operation consisting of N scattered parts, account for about 4 write request units for the request table, and about 2N write request units and N read request units for the part table. This is for normal operation with no errors and restarts involved.
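As a worked example of the estimate above, the following sketch turns the approximate figures into a small calculation. EstimateRequestUnits is a hypothetical helper written for this illustration, and the result is only as precise as the "about" in the figures it is based on.

```csharp
using System;

// Rough request-unit estimate for one error-free scatter-gather operation
// of partCount parts, based on the approximate figures quoted above.
static (long WriteUnits, long ReadUnits) EstimateRequestUnits(long partCount)
{
    long requestTableWrites = 4;          // about 4 writes on the request table
    long partTableWrites = 2 * partCount; // about 2 writes per part
    long partTableReads = partCount;      // about 1 read per part
    return (requestTableWrites + partTableWrites, partTableReads);
}

var (writes, reads) = EstimateRequestUnits(10_000);
Console.WriteLine($"~{writes} write request units, ~{reads} read request units");
// prints "~20004 write request units, ~10000 read request units"
```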

To construct the scatter-gather gateway, use one of the constructors of the ScatterGatherGateway class in the ScatterGather.DynamoDB namespace:

ScatterGatherGateway(string requestTableName, string partTableName);
ScatterGatherGateway(string dynamoDbServiceUrlOption, string requestTableName, string partTableName);

Pass either constructor the names for the request table and the part table, which may or may not already exist. The second constructor allows you to pass a custom URL for the DynamoDB service, useful when you want to use, for example, a local DynamoDB for development and testing.
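For example, wiring the gateway to a local DynamoDB might look like the sketch below. The table names are hypothetical, and the URL assumes a DynamoDB-local instance listening on port 8998, as in the docker-compose file of this repository.

```csharp
using ScatterGather;
using ScatterGather.DynamoDB;

// Local DynamoDB for development and testing (hypothetical table names).
IScatterGatherGateway gateway = new ScatterGatherGateway(
    "http://localhost:8998", "ScatterRequests", "ScatterParts");

// Against the AWS service, the two-argument constructor would be used instead:
// IScatterGatherGateway gateway = new ScatterGatherGateway("ScatterRequests", "ScatterParts");
```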

MongoDB

The scatter-gather gateway for MongoDB is provided by the ScatterGather.MongoDB library.

To construct the scatter-gather gateway, use one of the constructors of the ScatterGatherGateway class in the ScatterGather.MongoDB namespace:

ScatterGatherGateway(IMongoDatabase mongoDatabase, string collectionNamePrefix);

Pass the constructor the instance of the Mongo database to store the scatter-gather state and a string that will be used as a name prefix for the collections used by the scatter-gather gateway. Two collections are used, named <collectionNamePrefix>.Requests and <collectionNamePrefix>.Parts, storing scatter-gather operations and sub-operations respectively.
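A possible way to wire this up with the official MongoDB .NET driver is sketched below. The connection string, database name and collection prefix are hypothetical; the connection string assumes an unauthenticated MongoDB listening on the default port 27017.

```csharp
using MongoDB.Driver;
using ScatterGather;
using ScatterGather.MongoDB;

// Hypothetical connection string and database name.
var client = new MongoClient("mongodb://localhost:27017");
IMongoDatabase database = client.GetDatabase("myapp");

// With this prefix the gateway would use the collections
// "ScatterGather.Requests" and "ScatterGather.Parts".
IScatterGatherGateway gateway = new ScatterGatherGateway(database, "ScatterGather");
```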

Example code

A full example using either database-specific implementation is provided in the Example directory of the repository.

Testing locally

Automated tests are run against containers created from the DynamoDB-local and MongoDB Docker images. A docker-compose file is provided so that you can just run docker-compose up to run the DynamoDB and MongoDB containers before running tests. DynamoDB and MongoDB are mapped to TCP ports 8998 and 27017 on the host, respectively, and use no authentication.

Special thanks

Thanks to Matteo Pierangeli for his initial review and comments!

License

Permissive, 2-clause BSD style

ScatterGather - .NET library to implement the scatter-gather pattern using a database to store distributed progress state

Copyright 2023 Salvatore ISAJA. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
