Skip to content

Commit

Permalink
Merge pull request #76 from hsenasilva/feature/scatter-gather-queries…
Browse files Browse the repository at this point in the history
…-extensions

[#16] Adding Scatter Gather extension queries
  • Loading branch information
smcvb committed Jan 22, 2021
2 parents 6f84e14 + 292b9db commit 41d9d82
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ import org.axonframework.messaging.responsetypes.ResponseTypes
import org.axonframework.queryhandling.QueryGateway
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.stream.Stream

/**
* Query Gateway extensions.
*
* @author Henrique Sena
*/

/**
* Reified version of [QueryGateway.query]
Expand Down Expand Up @@ -113,3 +121,114 @@ inline fun <reified R, reified Q> QueryGateway.queryOptional(query: Q): Completa
inline fun <reified R, reified Q> QueryGateway.queryOptional(queryName: String, query: Q): CompletableFuture<Optional<R>> {
return this.query(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java))
}

/**
* Reified version of [QueryGateway.scatterGather]
* which expects an Stream object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
* @param query Query to send
* @param timeout a timeout for the query
* @param timeUnit the selected TimeUnit for the given timeout
* @param [Q] the type of payload of the query
* @param [R] the response class contained in the given responseType
* @return [Stream] a stream of results
* @see QueryGateway.scatterGather
* @see ResponseTypes
* @since 0.2.0
*/
inline fun <reified R, reified Q> QueryGateway.scatterGather(query: Q, timeout: Long,
timeUnit: TimeUnit): Stream<R> {
return this.scatterGather(query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
}

/**
* Reified version of [QueryGateway.scatterGather] with explicit query name
* which expects an Stream object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
* @param query Query to send
* @param queryName Name of the query
* @param timeout a timeout for the query
* @param timeUnit the selected TimeUnit for the given timeout
* @param [Q] the type of payload of the query
* @param [R] the response class contained in the given responseType
* @return [Stream] a stream of results
* @see QueryGateway.scatterGather
* @see ResponseTypes
* @since 0.2.0
*/
inline fun <reified R, reified Q> QueryGateway.scatterGather(queryName: String, query: Q, timeout: Long,
timeUnit: TimeUnit): Stream<R> {
return this.scatterGather(queryName, query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
}

/**
* Reified version of [QueryGateway.scatterGather]
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.MultipleInstancesResponseType]
* @param query Query to send
* @param timeout a timeout for the query
* @param timeUnit the selected TimeUnit for the given timeout
* @param [Q] the type of payload of the query
* @param [R] the response class contained in the given responseType
* @return [Stream] a stream of results
* @see QueryGateway.scatterGather
* @see ResponseTypes
* @since 0.2.0
*/
inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(query: Q, timeout: Long,
timeUnit: TimeUnit): Stream<List<R>> {
return this.scatterGather(query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
}

/**
* Reified version of [QueryGateway.scatterGather] with explicit query name
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.MultipleInstancesResponseType]
* @param query Query to send
* @param queryName Name of the query
* @param timeout a timeout for the query
* @param timeUnit the selected TimeUnit for the given timeout
* @param [Q] the type of payload of the query
* @param [R] the response class contained in the given responseType
* @return [Stream] a stream of results
* @see QueryGateway.scatterGather
* @see ResponseTypes
* @since 0.2.0
*/
inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(queryName: String, query: Q, timeout: Long,
timeUnit: TimeUnit): Stream<List<R>> {
return this.scatterGather(queryName, query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
}

/**
* Reified version of [QueryGateway.scatterGather]
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.OptionalResponseType]
* @param query Query to send
* @param timeout a timeout for the query
* @param timeUnit the selected TimeUnit for the given timeout
* @param [Q] the type of payload of the query
* @param [R] the response class contained in the given responseType
* @return [Stream] a stream of results
* @see QueryGateway.scatterGather
* @see ResponseTypes
* @since 0.2.0
*/
inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(query: Q, timeout: Long,
timeUnit: TimeUnit): Stream<Optional<R>> {
return this.scatterGather(query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
}

/**
* Reified version of [QueryGateway.scatterGather] with explicit query name
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.OptionalResponseType]
* @param query Query to send
* @param queryName Name of the query
* @param timeout a timeout for the query
* @param timeUnit the selected TimeUnit for the given timeout
* @param [Q] the type of payload of the query
* @param [R] the response class contained in the given responseType
* @return [Stream] a stream of results
* @see QueryGateway.scatterGather
* @see ResponseTypes
* @since 0.2.0
*/
inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(queryName: String, query: Q, timeout: Long,
timeUnit: TimeUnit): Stream<Optional<R>> {
return this.scatterGather(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ import org.axonframework.messaging.responsetypes.InstanceResponseType
import org.axonframework.queryhandling.QueryGateway
import java.util.*
import java.util.concurrent.CompletableFuture
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertSame
import kotlin.test.assertTrue
import java.util.concurrent.TimeUnit
import java.util.stream.Stream
import kotlin.test.*

/**
* Tests Query Gateway extensions.
*
* @author Stefan Andjelkovic
* @author Henrique Sena
*/
internal class QueryGatewayExtensionsTest {

Expand All @@ -43,6 +42,11 @@ internal class QueryGatewayExtensionsTest {
private val optionalReturnValue: CompletableFuture<Optional<String>> = CompletableFuture.completedFuture(Optional.of("Value"))
private val listReturnValue: CompletableFuture<List<String>> = CompletableFuture.completedFuture(listOf("Value", "Second value"))
private val subjectGateway = mockk<QueryGateway>()
private val timeout: Long = 1000
private val timeUnit = TimeUnit.SECONDS
private val streamInstanceReturnValue = Stream.of("Value")
private val streamMultipleReturnValue = Stream.of(listOf("Value", "Second Value"))
private val streamOptionalReturnValue = Stream.of(Optional.of("Value"))

@BeforeTest
fun before() {
Expand All @@ -52,6 +56,12 @@ internal class QueryGatewayExtensionsTest {
every { subjectGateway.query(queryName, exampleQuery, matchInstanceResponseType<String>()) } returns instanceReturnValue
every { subjectGateway.query(queryName, exampleQuery, matchOptionalResponseType<String>()) } returns optionalReturnValue
every { subjectGateway.query(queryName, exampleQuery, matchMultipleInstancesResponseType<String>()) } returns listReturnValue
every { subjectGateway.scatterGather(exampleQuery, matchInstanceResponseType<String>(), timeout, timeUnit) } returns streamInstanceReturnValue
every { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) } returns streamMultipleReturnValue
every { subjectGateway.scatterGather(exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) } returns streamOptionalReturnValue
every { subjectGateway.scatterGather(queryName, exampleQuery, matchInstanceResponseType<String>(), timeout, timeUnit) } returns streamInstanceReturnValue
every { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) } returns streamMultipleReturnValue
every { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) } returns streamOptionalReturnValue
}

@AfterTest
Expand Down Expand Up @@ -117,9 +127,9 @@ internal class QueryGatewayExtensionsTest {
}

val queryResult = nullableQueryGateway.query<String?, ExampleQuery>(query = exampleQuery)

assertSame(queryResult, nullInstanceReturnValue)
assertTrue(nullInstanceReturnValue.get() == null)
assertEquals(nullInstanceReturnValue.get(), null)
verify(exactly = 1) { nullableQueryGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
}

Expand Down Expand Up @@ -181,8 +191,83 @@ internal class QueryGatewayExtensionsTest {
val queryResult = nullableQueryGateway.query<String?, ExampleQuery>(queryName = queryName, query = exampleQuery)

assertSame(queryResult, nullInstanceReturnValue)
assertTrue(nullInstanceReturnValue.get() == null)
assertEquals(nullInstanceReturnValue.get(), null)
verify(exactly = 1) { nullableQueryGateway.query(queryName, exampleQuery, matchExpectedResponseType(String::class.java)) }
}

@Test
fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
val result = subjectGateway.scatterGather<String, ExampleQuery>(
query = exampleQuery,
timeout = timeout,
timeUnit = timeUnit
)

assertSame(result, streamInstanceReturnValue)
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
}

@Test
fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
val result = subjectGateway.scatterGatherMany<String, ExampleQuery>(
query = exampleQuery,
timeout = timeout,
timeUnit = timeUnit
)

assertSame(result, streamMultipleReturnValue)
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) }
}

@Test
fun `ScatterGather for Optional should invoke scatterGather method with correct generic parameters`() {
val result = subjectGateway.scatterGatherOptional<String, ExampleQuery>(
query = exampleQuery,
timeout = timeout,
timeUnit = timeUnit
)

assertSame(result, streamOptionalReturnValue)
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) }
}

@Test
fun `ScatterGather for Single should invoke scatterGather method with explicit query name`() {
val result = subjectGateway.scatterGather<String, ExampleQuery>(
queryName = queryName,
query = exampleQuery,
timeout = timeout,
timeUnit = timeUnit
)

assertSame(result, streamInstanceReturnValue)
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
}

@Test
fun `ScatterGather for Multiple should invoke scatterGather method with explicit query name`() {
val result = subjectGateway.scatterGatherMany<String, ExampleQuery>(
queryName = queryName,
query = exampleQuery,
timeout = timeout,
timeUnit = timeUnit
)

assertSame(result, streamMultipleReturnValue)
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) }
}

@Test
fun `ScatterGather for Optional should invoke scatterGather method with explicit query name`() {
val result = subjectGateway.scatterGatherOptional<String, ExampleQuery>(
queryName = queryName,
query = exampleQuery,
timeout = timeout,
timeUnit = timeUnit
)

assertSame(result, streamOptionalReturnValue)
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) }
}

}

0 comments on commit 41d9d82

Please sign in to comment.