Support Flux<...> as QueryHandler return type #1002

Closed
raphael-jungers opened this issue Feb 22, 2019 · 8 comments
Labels: Priority 1: Must, Status: Duplicate, Status: Resolved, Type: Feature

Comments

@raphael-jungers

When using Spring Data with reactive repositories, the methods' return types are Mono<...> or Flux<...>. The QueryGateway doesn't seem to support these types as QueryHandler return types.

I'm mostly interested in using the subscriptionQuery method with a Flux as the initial response type.

@smcvb added the "Status: Under Discussion" label Feb 22, 2019
@smcvb
Member

smcvb commented Feb 22, 2019

Thanks for your feature request @raphael-jungers.
I've marked this ticket as under discussion for now.
Any suggestions from the team's side will be shared in this thread, so stay tuned. :-)

@raphael-jungers
Author

I finally managed to use spring-data-cassandra-reactive, with Cassandra as the query database, without any change to the framework.

Here's a code example for those interested:

The Controller:

    // Reuse one ObjectMapper; it is thread-safe once configured.
    private val objectMapper = ObjectMapper()

    fun getTransactions(payload: Mono<String>): Flux<Payload> {
        return payload
            .map { objectMapper.readValue(it, SearchTransactionsQuery::class.java) }
            .flatMapMany { query ->
                val result = queryGateway.subscriptionQuery(
                    query,
                    ResponseTypes.multipleInstancesOf(Transaction::class.java),
                    ResponseTypes.instanceOf(Transaction::class.java)
                )
                // Emit the initial list first, then each subsequent update.
                Flux.concat(result.initialResult(), result.updates())
                    .map { DefaultPayload.create(objectMapper.writeValueAsString(it)) }
            }
    }

The QueryHandler:

    @QueryHandler
    fun handle(query: SearchTransactionsQuery): Iterable<Transaction> {
        return transactionRepository.findAll().toIterable()
    }

The EventHandler (emitting updates):

    @EventHandler
    fun on(event: TransactionExecutedEvent) {
        val transaction = Transaction(event.id, event.exampleData)

        // Emit the update only once the save has produced the stored entity.
        transactionRepository.save(transaction)
            .doOnNext { saved ->
                queryUpdateEmitter.emit(
                    SearchTransactionsQuery::class.java,
                    { _ -> true }, // match every active subscription of this query type
                    saved
                )
            }
            .subscribe()
    }

@smcvb
Member

smcvb commented Feb 27, 2019

Great you got that working! :)
Iterable is indeed one of the types the ResponseType class, specifically the MultipleInstancesResponseType implementation, can cope with.
FYI: the multiple-instances ResponseType should also be able to cope with Streams and arrays; each of those three works with or without generics and wildcards.
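
As a rough mental model of what the comment above describes, the sketch below normalizes an Iterable, a Stream, or an array into a List, which is the shape a caller gets back when querying with ResponseTypes.multipleInstancesOf(...). It is not Axon's implementation: the class MultipleInstancesSketch and the helper toList are hypothetical names chosen for illustration, and only JDK types are used.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class MultipleInstancesSketch {

    // Hypothetical helper, NOT an Axon API: turns the container shapes
    // mentioned above (Iterable, Stream, array) into a plain List.
    public static List<Object> toList(Object response) {
        List<Object> out = new ArrayList<>();
        if (response instanceof Iterable<?>) {
            ((Iterable<?>) response).forEach(out::add);
        } else if (response instanceof Stream<?>) {
            ((Stream<?>) response).forEach(out::add);
        } else if (response != null && response.getClass().isArray()) {
            // Reflection handles both object arrays and primitive arrays.
            int length = Array.getLength(response);
            for (int i = 0; i < length; i++) {
                out.add(Array.get(response, i));
            }
        } else {
            throw new IllegalArgumentException("Unsupported response container: " + response);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toList(Arrays.asList("a", "b"))); // prints [a, b]
        System.out.println(toList(Stream.of(1, 2)));         // prints [1, 2]
        System.out.println(toList(new String[] {"x"}));      // prints [x]
    }
}
```

Under this model, a handler is free to return whichever container suits its data source, while the querying side always works with a List.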

@UkonnRa

UkonnRa commented Mar 6, 2020

+1, and maybe Kotlin coroutines should also be supported

@smcvb
Member

smcvb commented Mar 9, 2020

@UkonnRa please keep Kotlin-specific requests in the Kotlin Extension Repository. We know you've opened an issue there, but simply reiterating it here does not increase its priority. We first need to figure out an implementation approach (hence the "Under Discussion" tag) before starting work on it.

Of course, implementation suggestions or a pull request are very much valued from any Axon user. So if you are up for the task, I'd be looking forward to reviewing your PR!

@psoares

psoares commented Jul 13, 2020

Just stumbled upon this use case and ended up doing what looks like a dirty workaround:

Flux<Foo> orderedProducts =
        Mono.fromFuture(queryGateway.query(new FindFoos(), ResponseTypes.multipleInstancesOf(Foo.class)))
            .flatMapMany(Flux::fromIterable);

@smcvb
Member

smcvb commented Sep 27, 2022

I think we've missed adding this issue to milestone 4.6.0, when we started our efforts on the Streaming Query support we introduced in #2001.

With that in place you can use QueryGateway#streamingQuery(Q query, Class<R> responseType) to retrieve a Publisher as a response.
Furthermore, it allows your query handlers to be written in this format:

@QueryHandler
public Flux<Order> handle(FindAllOrdersQuery query) {
    return Flux.just(...);
}

Given the nature of this issue, we feel the aforementioned support is thus present as of 4.6.0 of Axon Framework.
Please give it a try and let us know what you think!

@smcvb closed this as completed Sep 27, 2022
@smcvb added the "Type: Feature", "Priority 1: Must", "Status: Duplicate", and "Status: Resolved" labels and removed the "Status: Under Discussion" label Sep 27, 2022
@smcvb added this to the Release 4.6.0 milestone Sep 27, 2022
@gklijs
Member

gklijs commented Sep 27, 2022

Please also note that, with the Reactor extension, we do offer a subscription query with a Flux return type.

7 participants