New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow configuring custom executor with WebSocket message broker [SPR-12272] #16877
Comments
Rossen Stoyanchev commented If I understand correctly the request is allow plugging in a custom ThreadPoolTaskExecutor? If so I'd like to change the title to reflect that. |
Brian Gadwell commented In part, you are correct Rossen. But, additionally, the ExecutorSubscribableChannel.SendTask class to be public and that class provide a getter for the Message. |
Rossen Stoyanchev commented Okay thanks for clarifying. Just wondering if on the ExecutorSubscribableChannel side it would be sufficient to create the Runnable task in a protected method that also provides access to the Message? In other words rather than extending the currently private SendTask, could you wrap it instead? |
Rossen Stoyanchev commented Modified title (was "spring-websocket-sequential-delivery-of-messages-on-clientoutboundchannel and clientinboundchannel-per queue") |
Brian Gadwell commented I do not have the solution I coded up in front of me right now so going off memory.
Thanks for the quick response here! |
Rossen Stoyanchev commented I've added an option to configure a custom ThreadPoolTaskExecutor (commit 521bbf) and an interface exposing the Message and MessageHandler associated with the Runnable (commit 179b23). |
Brian Gadwell commented Wow...thanks for the amazingly fast response to this Rossen! |
Brian Gadwell opened SPR-12272 and commented
First a little background:
One way to achieve this is to configure the ThreadPoolTaskExecutor to only use 1 thread. This does indeed work, but at the expense of scalability and performance.
As suggested in the stackoverflow link, this may be able to be handled by using an interceptor to add some type of ordering data to the message header and then using that data to try an re-order the messages when they arrive at the destination. This seems difficult to do correctly.
Now for my solution:
I was able to solve the issue by using an extension of ThreadPoolTaskExecutor that I created that allows for affinity based processing of tasks submitted to the executor. What I mean by this is that handlers can be registered with the executor and the handlers have an opportunity to determine if there is an affinity for the task (runnable) that was submitted. All tasks with the same affinity (just represented as a string) will be guaranteed to be executed in the order they were submitted. In our case, certain destination queues have an affinity and others do not. How long a thread waits for another task to be submitted with a given affinity is configurable, but some short value works well. I feel this is still scalable etc...
Now for the problem and why I am entering this issue:
@EnableWebsocketMessageBroker
annotation and extending AbstractWebSocketMessageBrokerConfigurer. If a <? extends ThreadPoolTaskExecutor> could be specified then this would not be a problem.I realize my suggestions for changes may not be agreeable and perhaps my solution may not be advisable, but I think it works well. This is a showstopper issue for the project I am working on and I was able to work around the problems by going down a nasty hack route that I would rather avoid. I am hoping the 3 changes above could be accomplished with little controversy.
Affects: 4.1 GA
Reference URL: http://stackoverflow.com/questions/23158928/spring-websocket-sequential-delivery-of-messages-on-clientoutboundchannel-per
Referenced from: commits 44e2921, 521bbfc, 179b236
1 votes, 3 watchers
The text was updated successfully, but these errors were encountered: