Function Composition

Spring Cloud Stream integrates with Spring Cloud Function's function-based programming model, which lets the business logic of an application be modeled as a java.util.function.Supplier, a java.util.function.Function, or a java.util.function.Consumer, representing the roles of a Source, a Processor, and a Sink, respectively.
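As a minimal sketch of these three roles, the plain-Java example below wires a Supplier, a Function, and a Consumer together by hand. It uses only the JDK; the class and field names (FunctionRoles, source, processor) are hypothetical and chosen for illustration, and no Spring binding is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionRoles {

    // Source role: produces data without taking any input.
    static final Supplier<String> source = () -> "hello";

    // Processor role: transforms an input into an output.
    static final Function<String, String> processor = String::toUpperCase;

    // Hand-wired pipeline: source -> processor -> sink.
    static List<String> runOnce() {
        List<String> received = new ArrayList<>();
        // Sink role: consumes data without returning anything.
        Consumer<String> sink = received::add;
        sink.accept(processor.apply(source.get()));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints [HELLO]
    }
}
```

In a Spring Cloud Stream application the framework, not hand-written code, connects such functions to the messaging middleware.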

Building on this foundation, we can extend existing Source and Sink applications by importing the configuration of an existing Source or Sink and adding code that defines a java.util.function.Function. This opens up many powerful composition possibilities.

For instance, the Source applications are auto-configured with functions that may optionally be included in a composite function definition.

With this, the same Source application can do any or all of the following without that logic having to be built out as a standalone processor.

  • execute SpEL transformations

  • enrich message headers

  • filter events

  • produce task launch requests on upstream events

For example, when the time source is run as shown below, it performs a series of internal transformations and publishes a task launch request every second to the RabbitMQ exchange named time-test.

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar \
     --spring.cloud.stream.bindings.output.destination=time-test \
     --spring.cloud.stream.function.definition="timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction" \
     --spel.function.expression="payload.length()" \
     --header.enricher.headers=task-id=payload*2 \
     --task.launch.request.task-name-expression="'task-'+headers['task-id']"

Now, the transformed message would look like:

headers:
task-id:   34
content_type:  application/json
Payload
49 bytes
Encoding: string
{"args":[],"deploymentProps":{},"name":"task-34"}

Let us unpack what is happening behind the scenes. We will start with the following function definition.

timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction

  • Here, the function definition creates a composite Supplier beginning with the default timeSupplier Java function included in this repository, which is the foundation for time-source.

  • The spelFunction applies to a Message from which we can extract and transform the payload or headers, by following standard Spring Integration conventions.

  • The output of spelFunction is the length of the date-time String, 17.

  • From here, we apply the header-enricher Java function to add a Message header, task-id, with the value of payload*2, which is 34.

  • We use the task-id in the header to generate the "task name", and to programmatically derive the task launch request using the SpEL expression "'task-'+headers['task-id']", or task-34.
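The steps above can be approximated in plain Java to check the arithmetic. This is a simplified sketch, not the code behind the actual functions: each stand-in is hypothetical, the date format "MM/dd/yy HH:mm:ss" is an assumption chosen because it yields the 17-character string described above, and the header is modeled as a value passed along the chain rather than as a real Message header.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.function.Function;
import java.util.function.Supplier;

public class CompositionWalkthrough {

    // Stand-in for timeSupplier. ASSUMPTION: the format below, which always
    // produces 17 characters (8 for the date, 1 space, 8 for the time).
    static final Supplier<String> timeSupplier = () ->
            new SimpleDateFormat("MM/dd/yy HH:mm:ss", Locale.ROOT).format(new Date());

    // Stand-in for spelFunction with the expression payload.length().
    static final Function<String, Integer> lengthFunction = String::length;

    // Stand-in for headerEnricherFunction: computes task-id = payload*2.
    static final Function<Integer, Integer> taskIdFunction = payload -> payload * 2;

    // Stand-in for taskLaunchRequestFunction: 'task-' + headers['task-id'].
    static final Function<Integer, String> taskNameFunction = taskId -> "task-" + taskId;

    // Composite supplier, analogous to
    // timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction.
    static final Supplier<String> composite = () ->
            lengthFunction.andThen(taskIdFunction).andThen(taskNameFunction)
                    .apply(timeSupplier.get());

    public static void main(String[] args) {
        System.out.println(composite.get()); // prints task-34
    }
}
```

Composing the functions with andThen and wrapping the result in a Supplier mirrors how the pipe-separated definition turns a source plus three functions into a single composite Supplier.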

This is a somewhat contrived example, but the goal here is to highlight the power of function composition.

If you had a task definition in Spring Cloud Data Flow with the name task-34, you could build a time | tasklauncher streaming data pipeline to launch that task every second.

Before the 3.0 release of Stream Applications, this composition required extensive customization, along with many more manual configuration changes, extensions, and custom builds of the applications.

Note
Support for composite functions includes auto-configuration for conventional binding name mappings (input and output) derived from the function definition and the presence of spring.cloud.stream.bindings.output…​.

In this example, --spring.cloud.stream.bindings.output.destination=time-test is enabled behind the scenes by the auto-configured property --spring.cloud.stream.function.bindings.timeSupplierspelFunctionheaderEnricherFunctiontaskLaunchRequestFunction-out-0=output.
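The binding name in that property appears to be the function definition with the pipe separators removed, followed by -out-0. The sketch below reproduces that naming in plain Java; the helper outputBindingName is hypothetical and only mirrors the property shown above, it is not the framework's own implementation.

```java
public class BindingNames {

    // Removes the '|' separators from the function definition and appends
    // "-out-0", matching the binding name in the property shown above.
    static String outputBindingName(String functionDefinition) {
        return functionDefinition.replace("|", "") + "-out-0";
    }

    public static void main(String[] args) {
        String definition =
                "timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction";
        // Prints the auto-configured property from the example.
        System.out.println("spring.cloud.stream.function.bindings."
                + outputBindingName(definition) + "=output");
    }
}
```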