
Direct reply-to still needs correlation ID to work!? #1312


Closed
igor-susic opened this issue Mar 19, 2021 · 8 comments · Fixed by #1334

Comments

@igor-susic

igor-susic commented Mar 19, 2021

When two different RabbitMQ clients communicate using the direct reply-to feature, spring-amqp requires the correlation ID to be set on the reply. If I understood the documentation correctly, this is not how the feature is supposed to behave.

According to the direct reply-to documentation, when you send a request (message) whose reply-to property is set to amq.rabbitmq.reply-to, RabbitMQ generates a unique string that looks like this:
amq.rabbitmq.reply-to.g1hkABlyYWJiaXRAcmFiYml0LXNlY29uZC1ub2RlAAAEBQAAAABgVO6L.dRFik1oCgCYm5rUQycVZ3Q==
The server then uses it to respond, by publishing its response to the "" (default) exchange with that unique string as the routing key.

Nowhere does it ask for a correlation ID, and one should not be required, since with this approach RabbitMQ itself pairs the request with the response.
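To make the routing concrete, here is a toy, in-memory model in plain Python (no broker or client library involved; `ToyBroker` and its methods are hypothetical names, and the generated name format is only illustrative). It shows a reply reaching the right consumer purely by routing key, with no correlation ID:

```python
import secrets


class ToyBroker:
    """In-memory stand-in for direct reply-to routing. Hypothetical; not a RabbitMQ API."""

    def __init__(self):
        # generated reply-to name -> inbox (list) of the consumer that owns it
        self._reply_consumers = {}

    def start_reply_consumer(self):
        # Models a client consuming from amq.rabbitmq.reply-to: the broker hands out an
        # opaque name bound to that consumer (the real format differs; illustrative only).
        name = "amq.rabbitmq.reply-to." + secrets.token_urlsafe(16)
        inbox = []
        self._reply_consumers[name] = inbox
        return name, inbox

    def publish_to_default_exchange(self, routing_key, body):
        # The responder publishes to the "" exchange with the generated name as routing key.
        inbox = self._reply_consumers.get(routing_key)
        if inbox is None:
            return False  # no such consumer (e.g. canceled): the reply is not delivered
        inbox.append(body)
        return True

    def cancel_reply_consumer(self, name):
        self._reply_consumers.pop(name, None)


broker = ToyBroker()
reply_to, inbox = broker.start_reply_consumer()

# Responder side: no correlation ID, only routing_key=reply_to.
broker.publish_to_default_exchange(reply_to, {"predictionScore": 0.7})
print(inbox)  # [{'predictionScore': 0.7}]
```

Note the assumption built into this model: the generated name identifies a consumer, not an individual request. That is sufficient only as long as each reply consumer has a single request outstanding.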

Also, the spring-amqp documentation for direct reply-to states:

Starting with version 3.4.0, the RabbitMQ server supports direct reply-to. This eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1 direct reply-to is used by default (if supported by the server) instead of creating temporary reply queues.

and also

When using a fixed reply queue (other than amq.rabbitmq.reply-to), you must provide correlation data so that replies can be correlated to requests.

This leads me to believe that the correlation ID should not come into play in this use case. Spring, on the other hand, behaves differently: when sending the message it sets the correlation ID to the value of the message tag (line 2033 in RabbitTemplate), and when receiving the reply it fails at line 615 in RabbitTemplate, where it needs the correlation ID to proceed.

Some code to reproduce this would be:

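Java client service as a Spring Boot @Service bean:

```java
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
@AllArgsConstructor
@Slf4j
public class PredictionService {

    private final RabbitTemplate rabbitTemplate;

    @SuppressWarnings("unchecked")
    public String getReply() {
        log.info("Sending RPC request");
        // Blocks until a reply arrives; returns null if the template's reply timeout elapses.
        Map<String, Object> result = (Map<String, Object>) rabbitTemplate
                .convertSendAndReceive("amq.direct", "key", "Hi, give me prediction");
        log.info("Received RPC reply: {}", result);
        if (result == null) {
            throw new IllegalStateException("No reply received before the reply timeout");
        }

        Double predictionScore = (Double) result.get("predictionScore");
        String metaData = (String) result.get("someMetaData");
        log.info("Meta data received: {}", metaData);

        if (predictionScore > 0.5) {
            return "Great";
        }

        return "Not great";
    }
}
```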

Python server service:

```python
from fastapi import FastAPI
import pika
import random
import json

app = FastAPI()
credentials = pika.PlainCredentials(username='test', password='test')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host="localhost", port=5672, virtual_host="/", credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue="rpc_queue")
channel.queue_bind(exchange="amq.direct", queue="rpc_queue", routing_key="key")

def on_request(ch, method_frame, props, body):
    # Reply through the default exchange, routed by the broker-generated reply-to name.
    # No correlation_id is set on the reply; this is what triggers the failure in Spring.
    ch.basic_publish(exchange='', routing_key=props.reply_to, body=json.dumps(
        {"predictionScore": 0.7, "someMetaData": "bla bla"}))
    ch.basic_ack(delivery_tag=method_frame.delivery_tag)

channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print("Waiting for message to come :)")
channel.start_consuming()  # blocks and dispatches incoming requests to on_request
```

The trick to get this working is to set the reply's correlation ID to the same value that came with the request:

```python
ch.basic_publish(exchange='', routing_key=props.reply_to, body=json.dumps(
    {"predictionScore": 0.7, "someMetaData": "bla bla"}),
    properties=pika.BasicProperties(correlation_id=props.correlation_id))
```

The problem with this approach is that someone using Python has no way of knowing that spring-amqp needs this, because according to the documentation you don't have to do it: publishing with exchange='' and routing_key=props.reply_to is enough.

Versions used in this test were:

  • RabbitMQ: 3.8.9
  • Spring AMQP: 2.4.4
  • Python Pika: 1.2.0

And sorry for this block of text if I got something wrong 🙇

@igor-susic igor-susic changed the title Direct reply-to still needs coreelationId to work!? Direct reply-to still needs correlationId to work!? Mar 19, 2021
@igor-susic igor-susic changed the title Direct reply-to still needs correlationId to work!? Direct reply-to still needs correlation ID to work!? Mar 19, 2021
@garyrussell
Contributor

Well, the generated reply-to is tied to a consumer.

To avoid the overhead of creating and canceling a consumer for each request, we use a DirectReplyToMessageListenerContainer together with a pool of consumers. That way we can reuse the consumer(s) for multiple requests/replies.

We need the correlation ID to detect a late reply to a request that has already timed out.
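The hazard can be reproduced with a small, hypothetical model (plain Python; `PooledReplyConsumer` is an illustrative name, not a Spring AMQP class): one reply consumer, and therefore one reply address, is reused across requests; request 1 times out, and its reply shows up late while request 2 is waiting.

```python
import itertools


class PooledReplyConsumer:
    """Hypothetical model: one reply consumer reused across requests, so every
    request shares the same reply address."""

    def __init__(self, check_correlation):
        self.check_correlation = check_correlation
        self._ids = itertools.count(1)
        self.current = None    # correlation ID of the request currently waiting
        self.delivered = None  # reply handed to the current waiter

    def send(self):
        self.current = str(next(self._ids))
        self.delivered = None
        return self.current

    def time_out(self):
        # The caller gives up; the consumer goes back to the pool for reuse.
        self.current = None

    def on_reply(self, body, correlation_id=None):
        if self.current is None:
            return "dropped: nobody waiting"
        if self.check_correlation and correlation_id != self.current:
            return "dropped: stale or uncorrelated reply"
        self.delivered = body
        return "delivered"


def late_reply_scenario(check_correlation):
    consumer = PooledReplyConsumer(check_correlation)
    first = consumer.send()
    consumer.time_out()   # request 1 times out
    consumer.send()       # request 2 reuses the same consumer and reply address
    outcome = consumer.on_reply("reply to request 1", correlation_id=first)
    return outcome, consumer.delivered


print(late_reply_scenario(False))  # ('delivered', 'reply to request 1')  <- wrong request
print(late_reply_scenario(True))   # ('dropped: stale or uncorrelated reply', None)
```

Without the check, request 2 receives request 1's answer. With it, the stale reply is dropped, but so is any reply that simply omits the correlation ID, which matches the behavior reported above.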

I suppose we could consider canceling the consumer when we get a timeout instead (since this is an abnormal condition), but that's not the current architecture.

@garyrussell garyrussell added this to the Backlog milestone Mar 19, 2021
@garyrussell
Contributor

Actually, you can set useDirectReplyToContainer to false and in that case, no correlation id header is needed (similar to when a temporary reply queue is created) - we create and cancel the consumer in that case.

@igor-susic
Author

@garyrussell When I set useDirectReplyToContainer to false, it works without replying with a correlation-id, but now I'm not sure how much overhead creating and canceling a consumer for every request adds 😄 It also feels a little hacky; it's not clear to me why I would need this to get things working.

@garyrussell
Contributor

garyrussell commented Mar 19, 2021

@igor-susic As I said, we need to be able to detect a late reply for a previously timed-out request. We'd have to change the template to cause the consumer to be canceled instead of returning it to the pool for reuse (when a timeout occurs).

Not too difficult to do, but not currently supported; hence I marked this issue as a feature request.

@igor-susic
Author

I understand; I was just noting that for someone else it isn't clear why it's needed 🙈 I will probably use this feature under fairly heavy traffic, so, if you agree, I could submit a PR in coordination with you when I find time.

@garyrussell
Contributor

Sure; the logic in question is here:

```java
private Message doSendAndReceiveWithDirect(String exchange, String routingKey, Message message,
        @Nullable CorrelationData correlationData) {
    ConnectionFactory connectionFactory = obtainTargetConnectionFactory(
            this.sendConnectionFactorySelectorExpression, message);
    if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
        connectionFactory = connectionFactory.getPublisherConnectionFactory();
    }
    DirectReplyToMessageListenerContainer container = this.directReplyToContainers.get(connectionFactory);
    if (container == null) {
        synchronized (this.directReplyToContainers) {
            container = this.directReplyToContainers.get(connectionFactory);
            if (container == null) {
                container = new DirectReplyToMessageListenerContainer(connectionFactory);
                container.setMessageListener(this);
                container.setBeanName(this.beanName + "#" + this.containerInstance.getAndIncrement());
                if (this.taskExecutor != null) {
                    container.setTaskExecutor(this.taskExecutor);
                }
                if (this.afterReceivePostProcessors != null) {
                    container.setAfterReceivePostProcessors(this.afterReceivePostProcessors
                            .toArray(new MessagePostProcessor[this.afterReceivePostProcessors.size()]));
                }
                container.setNoLocal(this.noLocalReplyConsumer);
                if (this.replyErrorHandler != null) {
                    container.setErrorHandler(this.replyErrorHandler);
                }
                container.start();
                this.directReplyToContainers.put(connectionFactory, container);
                this.replyAddress = Address.AMQ_RABBITMQ_REPLY_TO;
            }
        }
    }
    ChannelHolder channelHolder = container.getChannelHolder();
    try {
        Channel channel = channelHolder.getChannel();
        if (this.confirmsOrReturnsCapable) {
            addListener(channel);
        }
        return doSendAndReceiveAsListener(exchange, routingKey, message, correlationData, channel);
    }
    catch (Exception e) {
        throw RabbitExceptionTranslator.convertRabbitAccessException(e);
    }
    finally {
        container.releaseConsumerFor(channelHolder, false, null);
    }
}
```

In the finally block, we have

container.releaseConsumerFor(channelHolder, false, null);

where the second parameter is a boolean cancelConsumer.

I think all that needs to be done is this: if we timed out (the reply is null), release the consumer with that flag set to true and an appropriate message in the third parameter to log the reason for the cancel.
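A rough model of the proposed change, under the assumption that canceling a direct reply-to consumer makes its generated address unroutable. `ReplyConsumerPool`, `acquire`, `release`, and `deliver` are hypothetical names; `cancel_consumer` and `reason` mirror the second and third parameters of `releaseConsumerFor`:

```python
import itertools


class ReplyConsumerPool:
    """Hypothetical model of a pool of direct reply-to consumers. Each consumer has
    its own reply address; canceling a consumer makes that address unroutable."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._idle = []              # addresses of consumers available for reuse
        self.live = {}               # reply address -> inbox of each live consumer
        self.last_cancel_reason = None

    def acquire(self):
        if self._idle:
            return self._idle.pop()
        address = f"reply-to.{next(self._ids)}"
        self.live[address] = []
        return address

    def release(self, address, cancel_consumer, reason=None):
        if cancel_consumer:
            del self.live[address]   # nothing can be delivered to this address anymore
            self.last_cancel_reason = reason
        else:
            self._idle.append(address)

    def deliver(self, address, body):
        inbox = self.live.get(address)
        if inbox is None:
            return False             # reply to a canceled consumer is discarded
        inbox.append(body)
        return True


pool = ReplyConsumerPool()
first = pool.acquire()
# No reply arrived in time: cancel instead of returning the consumer to the pool.
pool.release(first, cancel_consumer=True, reason="Reply timed out")
second = pool.acquire()              # a brand-new consumer with a new address
late = pool.deliver(first, "late reply to request 1")
print(first, second, late)           # reply-to.1 reply-to.2 False
print(pool.live[second])             # []
```

Because the timed-out consumer is never handed to another request, a late reply has nowhere to land, so a correlation ID is no longer needed to detect it. Releasing with `cancel_consumer=False` instead reproduces the current behavior: the next request gets the same address, and the late reply lands in its inbox.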

We should probably make the new capability optional under a new config flag, with the default being the current behavior.

The trickier part will be correlating the async reply in onMessage, but I think that can be done by changing it to a ChannelAwareMessageListener and using the channel (instead of the correlation ID) as the key to find the thread waiting for the reply, in a map similar to the current replyHolder.
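A sketch of the channel-keyed lookup, assuming (as with the pooled container) that a channel carries at most one outstanding request at a time. `ChannelKeyedReplies` is a hypothetical Python stand-in for a replyHolder-style map, and `on_message(body, channel)` plays the role of a ChannelAwareMessageListener callback:

```python
import queue


class ChannelKeyedReplies:
    """Hypothetical model: correlate replies by the channel they arrive on rather than
    by a correlation ID. Assumes one outstanding request per channel at a time."""

    def __init__(self):
        self._waiting = {}  # channel -> single-slot queue the requesting thread blocks on

    def register(self, channel):
        waiter = queue.Queue(maxsize=1)
        self._waiting[channel] = waiter
        return waiter

    def on_message(self, body, channel):
        # The channel the reply arrived on is the lookup key; no header is consulted.
        waiter = self._waiting.pop(channel, None)
        if waiter is None:
            return False  # no request outstanding on this channel
        waiter.put(body)
        return True

    def forget(self, channel):
        # On timeout: stop waiting (and, per the proposal, cancel the consumer too).
        self._waiting.pop(channel, None)


replies = ChannelKeyedReplies()
chan_a, chan_b = object(), object()  # stand-ins for two pooled channels
wait_a = replies.register(chan_a)
wait_b = replies.register(chan_b)

replies.on_message({"predictionScore": 0.7}, chan_b)  # reply carries no correlation ID
print(wait_b.get(timeout=1))  # {'predictionScore': 0.7}
print(wait_a.empty())         # True
```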

@igor-susic
Author

Sure, I will take a look, and get back to you :)

garyrussell added a commit to garyrussell/spring-amqp that referenced this issue May 5, 2021
Resolves spring-projects#1312

With Direct Reply-To we can correlate the reply using the channel
instead of requiring the server to echo back the correlation id.
@garyrussell
Contributor

I opened PR #1334 for this.

@garyrussell garyrussell modified the milestones: Backlog, 2.3.7 May 5, 2021
artembilan pushed a commit that referenced this issue May 5, 2021

Resolves #1312

With Direct Reply-To we can correlate the reply using the channel
instead of requiring the server to echo back the correlation id.

* Fix link in what's new.