I can't find anything about a callback after the ack. Why are there no before and after methods in MessageListener? #1338

Closed
bthulu opened this issue May 12, 2021 · 11 comments


bthulu commented May 12, 2021

Affects Version(s): all

Enhancement

Sometimes we keep some state in an in-memory map. We need to read it in listener#onMessage and remove it after the message is acked.
But right now I can only remove it inside onMessage; if the method exits and the ack then fails, the state is lost.
The next time the same message arrives in onMessage, the state is gone.
So I was wondering why there is no interceptor, like before and after methods in MessageListener, so that we can do some initialization before onMessage is invoked and release resources after the ack, whether it succeeded or not.

MessagePostProcessor seems to act as a before interceptor; what about creating a new MessageAckProcessor?

@artembilan
Member

Let's see if adviceChain option on the Listener Container may help you!

https://docs.spring.io/spring-amqp/docs/current/reference/html/#containerAttributes

An array of AOP Advice to apply to the listener execution. This can be used to apply additional cross-cutting concerns, such as automatic retry in the event of broker death. Note that simple re-connection after an AMQP error is handled by the CachingConnectionFactory, as long as the broker is still alive.

So you can write your own MethodInterceptor that works with these arguments (Channel channel, Object data) before and after calling proceed().
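To illustrate the before/after idea, here is a minimal sketch that uses only the JDK. It uses a dynamic proxy as a stand-in for the container's advice chain; `SimpleListener` and `AroundAdviceSketch` are hypothetical names, not Spring AMQP types. A real advice would implement MethodInterceptor and call proceed() where this sketch calls method.invoke(...).

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a listener interface; not the Spring AMQP type.
interface SimpleListener {
    void onMessage(String body);
}

final class AroundAdviceSketch {

    // Records what happened and in which order.
    static final List<String> EVENTS = new ArrayList<>();

    // Wraps the target so that code runs before and after each call,
    // which is the role proceed() plays inside a MethodInterceptor.
    static SimpleListener advise(SimpleListener target) {
        InvocationHandler handler = (proxy, method, args) -> {
            EVENTS.add("before " + method.getName());
            try {
                return method.invoke(target, args); // comparable to proceed()
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the listener's own exception
            } finally {
                EVENTS.add("after " + method.getName());
            }
        };
        return (SimpleListener) Proxy.newProxyInstance(
                SimpleListener.class.getClassLoader(),
                new Class<?>[] {SimpleListener.class},
                handler);
    }
}
```

Note that the "after" step in this pattern runs when the listener method returns, so it says nothing about whether the broker ack has been sent.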

@bthulu
Author

bthulu commented May 17, 2021

it works, thanks

@bthulu bthulu closed this as completed May 17, 2021
@bthulu
Author

bthulu commented May 24, 2021

The AOP advice wraps the target method void onMessage(Message message); it does not run after the ack.
In some conditions I need to cancel consuming from a queue after the consumed message has been acked.
The AOP advice cannot do this.

@bthulu bthulu reopened this May 24, 2021
@garyrussell
Contributor

You would need to add a reference to the listener container to the advice and stop the container (use the async stop(() -> {}) to avoid a deadlock pause).

@bthulu
Author

bthulu commented May 25, 2021

> You would need to add a reference to the listener container to the advice and stop the container (use the async stop(() -> {}) to avoid a deadlock pause).

Why stop the container? I only need to cancel some consumers, not all.
I use one container to consume from thousands of queues, but not all queues behave the same way: some need to be cancelled under certain conditions, the rest don't.
If every cancel requires a stop, the container would do nothing but stop, start, stop, start.

@bthulu
Author

bthulu commented May 25, 2021

Right now I use manual ack and then remove the queue names from the container; maybe another thread should do the removal, I'm not sure.
I wonder why an ack callback is not supported. In a production environment, many things need to happen after the ack.
The advice chain wraps the method void onMessage(Message message), but that method is written by the developer, and there is only one MessageListener per container, so we can just put the logic in the method itself. The advice chain seems to add nothing here.

            if (queueNameMeta.mode == TaskModeEnum.FULL && packet.isFullEnd()) {
                channel.basicAck(deliveryTag, false);
                container.removeQueueNames(props.getConsumerQueue());
                return;
            }

@garyrussell
Contributor

Ok; now I see what your requirements are.

You are correct, there is currently no callback after the ack.

Contributions are welcome.

@zysaaa
Contributor

zysaaa commented Dec 14, 2021

This requirement seems simple to achieve. Define a method in AbstractMessageListenerContainer and call it where SMLC (SimpleMessageListenerContainer) and DMLC (DirectMessageListenerContainer) call channel#ack, for example:

	interface AckListener {
		void onComplete(boolean success, long deliveryTag, @Nullable Throwable cause) throws Exception;
	}

	AckListener ackListener;

	protected void notifyAckListener(boolean success, long deliverTag, @Nullable Throwable cause) {
		if (ackListener == null) {
			return;
		}
		try {
			ackListener.onComplete(success, deliverTag, cause);
		} catch (Exception e) {
			logger.error("xxx.", e);
		}
	}

Is it necessary to implement this feature? If necessary, does this approach make sense? @garyrussell
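As a rough, JDK-only sketch of where such a hook could fire relative to the ack: the `AckListener` shape follows the proposal above, while `Acker` and `AckNotifierSketch` are hypothetical stand-ins for the channel and the container, not Spring AMQP code.

```java
// Callback shape from the proposal above (without the @Nullable annotation).
interface AckListener {
    void onComplete(boolean success, long deliveryTag, Throwable cause) throws Exception;
}

// Hypothetical stand-in for the channel's ack operation.
interface Acker {
    void ack(long deliveryTag) throws Exception;
}

final class AckNotifierSketch {

    private final Acker acker;
    private final AckListener ackListener; // may be null when no callback is configured

    AckNotifierSketch(Acker acker, AckListener ackListener) {
        this.acker = acker;
        this.ackListener = ackListener;
    }

    // Acks the delivery and reports the outcome to the listener exactly once.
    void ackAndNotify(long deliveryTag) {
        try {
            acker.ack(deliveryTag);
        } catch (Exception e) {
            notifyAckListener(false, deliveryTag, e);
            return;
        }
        notifyAckListener(true, deliveryTag, null);
    }

    // A failing listener is logged and must not affect message processing.
    private void notifyAckListener(boolean success, long deliveryTag, Throwable cause) {
        if (ackListener == null) {
            return;
        }
        try {
            ackListener.onComplete(success, deliveryTag, cause);
        } catch (Exception e) {
            System.err.println("Ack listener failed: " + e);
        }
    }
}
```

Keeping the success notification outside the try block around the ack means an exception from the ack itself is the only thing reported as a failure.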

@garyrussell
Contributor

Looks reasonable to me.

@zysaaa
Contributor

zysaaa commented Dec 15, 2021

Hi Gary! I am trying to implement this feature today. During the implementation, I found it is easy to pass deliverTag to the callback method. But does getting deliverTag in the callback method actually help?

The Message itself may be more desirable. If we want to pass the message to the callback method, the implementation may need more changes, because both SMLC and DMLC support batch ack, which means we would need to pass a list of messages. (We may need additional data structures to store messages that have not been acked yet.)

Any ideas? @garyrussell

@garyrussell
Contributor

@zysaaa I don't think we want to over-complicate this; the user can capture the delivery tag in the listener and then correlate it in the callback if (s)he needs to.
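One way a user might do that correlation, sketched with plain JDK collections. `AckCorrelationSketch`, its method names, and the `multiple` flag are hypothetical and not part of the proposed callback; the flag only models the AMQP rule that a "multiple" ack covers every delivery tag up to and including the given one. Delivery tags are scoped to a channel, so this assumes one tracker per channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper: maps delivery tags to a user-chosen key (for example a queue name).
final class AckCorrelationSketch {

    // Sorted by tag so a batch ack can select every tag up to a bound.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Called from the listener when a message is received.
    void register(long deliveryTag, String key) {
        pending.put(deliveryTag, key);
    }

    // Called from the ack callback. Returns the keys whose state can now be
    // released; on a failed ack the list is empty so the state survives redelivery.
    List<String> onAck(boolean success, long deliveryTag, boolean multiple) {
        List<String> covered = new ArrayList<>();
        if (multiple) {
            for (Long tag : pending.headMap(deliveryTag, true).keySet()) {
                String key = pending.remove(tag);
                if (key != null) {
                    covered.add(key);
                }
            }
        } else {
            String key = pending.remove(deliveryTag);
            if (key != null) {
                covered.add(key);
            }
        }
        // The tags are forgotten either way: a redelivered message gets a new tag.
        return success ? covered : List.of();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The listener would call register(...) with the tag it receives, and the callback would use the returned keys to remove in-memory state or cancel the matching queues.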

zysaaa added a commit to zysaaa/spring-amqp that referenced this issue Dec 16, 2021
garyrussell added a commit that referenced this issue Jun 15, 2022
- Add a default ack listener
- Use lambdas in tests
- NOSONAR tags
garyrussell pushed a commit that referenced this issue Jun 15, 2022
Resolves #1338

GH-1338: Test case for MessageAckListener
@garyrussell garyrussell modified the milestones: Backlog, 3.0.0-M4 Oct 20, 2022