Add mutex guard to Channel methods #242

Open
gnuletik opened this issue Jan 30, 2024 · 6 comments

@gnuletik

Is your feature request related to a problem? Please describe.

I used Channel.QueueDeclare concurrently and got the following error:

Exception (503) Reason: "unexpected command received"

Describe the solution you'd like

I'd like to add the following guard:

ch.m.Lock()
defer ch.m.Unlock()

to some methods of the Channel struct:

  • Qos, Cancel
  • QueueDeclare, QueueDeclarePassive, QueueInspect, QueueBind, QueueUnbind, QueuePurge, QueueDelete
  • ExchangeDeclare, ExchangeDeclarePassive, ExchangeDelete, ExchangeBind, ExchangeUnbind

This is already implemented on the following methods (publish / ack related):

  • PublishWithDeferredConfirmWithContext, Ack, Nack, Reject

So I think that it would make sense to have it on other methods too.

Describe alternatives you've considered

Implementing the mutex in the business code is doable but it makes less sense considering that this is done by the library for some methods.
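For illustration, a minimal sketch of what that application-side guard could look like. The `declarer` interface and `fakeChannel` type are hypothetical stand-ins so the example runs without a broker; the real `QueueDeclare` on `*amqp.Channel` takes more parameters and returns a queue value as well as an error.

```go
package main

import (
	"fmt"
	"sync"
)

// declarer is a hypothetical, simplified stand-in for the topology methods
// of *amqp.Channel. The real QueueDeclare has a longer signature.
type declarer interface {
	QueueDeclare(name string) error
}

// guardedChannel serializes every call to the wrapped channel with a mutex,
// which is the same Lock / defer Unlock pattern proposed above.
type guardedChannel struct {
	mu sync.Mutex
	ch declarer
}

func (g *guardedChannel) QueueDeclare(name string) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.ch.QueueDeclare(name)
}

// fakeChannel records whether two calls were ever in flight at once.
// Its fields are only safe to touch because the guard serializes access.
type fakeChannel struct {
	inFlight   int
	calls      int
	overlapped bool
}

func (f *fakeChannel) QueueDeclare(name string) error {
	f.inFlight++
	if f.inFlight > 1 {
		f.overlapped = true
	}
	f.calls++
	f.inFlight--
	return nil
}

// declareConcurrently issues n declarations from n goroutines through the
// guard and reports how many calls ran and whether any of them overlapped.
func declareConcurrently(n int) (calls int, overlapped bool) {
	fake := &fakeChannel{}
	guarded := &guardedChannel{ch: fake}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = guarded.QueueDeclare(fmt.Sprintf("queue-%d", i))
		}(i)
	}
	wg.Wait()
	return fake.calls, fake.overlapped
}

func main() {
	calls, overlapped := declareConcurrently(100)
	fmt.Println(calls, overlapped) // prints "100 false"
}
```

Every topology method would need the same wrapper, which is the duplication this request is trying to avoid.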

Additional context

No response

@gnuletik gnuletik added the enhancement New feature or request label Jan 30, 2024
@lukebakken
Contributor

lukebakken commented Jan 31, 2024

All of the RabbitMQ client libraries specifically do not allow sharing channels across threads and assume that applications that use the libraries have a connection per-thread, with associated channels. I doubt we will add this feature here, but I'd like @Zerpet's feedback.

I'm not sure why certain methods do use a mutex. I'll investigate when I have time.

@gnuletik
Author

gnuletik commented Jan 31, 2024

Thanks for the fast feedback!

I can see the point of letting users manually handle the channels.

It seems that Ack() and Nack() have both been concurrency-safe since 2018.

Regarding Publish(), this seems to have been handled since 2012.

Docs

I think that it should be clarified in the docs that Publish*, Ack, Nack, Reject are thread-safe and others are not.

The docs could also suggest creating one channel per goroutine.

The only mention of thread-safe that I see is that SetLogger is not thread-safe.

Connection Pool

Would a concurrency-safe connection pool package (in the same fashion as pgxpool) fit into this repo?

However, it seems that having multiple connections may be less useful with RabbitMQ than with PostgreSQL because calls seem to be much faster. But I don't have much experience with RabbitMQ. If that's true, then having a mutex in the library would be more efficient.
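To make the pool idea concrete, here is a rough sketch of a fixed-size pool built on a buffered Go channel, so that each pooled resource is owned by one goroutine at a time. `Pool`, `NewPool`, `Acquire`, `Release` and `peakConcurrentHolders` are hypothetical names and not part of amqp091-go. Plain integers stand in for AMQP channels, and the sketch does not detect or replace failed resources.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Pool hands out exclusive ownership of a resource to one caller at a time.
// It is a hypothetical sketch, not an API of amqp091-go.
type Pool[T any] struct {
	items chan T
}

// NewPool opens size resources up front using the supplied open function.
func NewPool[T any](size int, open func() (T, error)) (*Pool[T], error) {
	if size <= 0 {
		return nil, errors.New("pool size must be positive")
	}
	p := &Pool[T]{items: make(chan T, size)}
	for i := 0; i < size; i++ {
		item, err := open()
		if err != nil {
			return nil, err
		}
		p.items <- item
	}
	return p, nil
}

// Acquire blocks until a resource is free and returns it.
func (p *Pool[T]) Acquire() T { return <-p.items }

// Release puts a resource back so another goroutine can acquire it.
func (p *Pool[T]) Release(item T) { p.items <- item }

// peakConcurrentHolders runs the given number of workers against a pool and
// returns the highest number of resources held at the same moment.
func peakConcurrentHolders(poolSize, workers int) int {
	next := 0
	pool, err := NewPool(poolSize, func() (int, error) {
		next++ // stand-in for opening a real channel
		return next, nil
	})
	if err != nil {
		panic(err)
	}

	var (
		mu      sync.Mutex
		holding int
		peak    int
		wg      sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res := pool.Acquire()
			mu.Lock()
			holding++
			if holding > peak {
				peak = holding
			}
			mu.Unlock()

			// A real worker would use the acquired channel here.

			mu.Lock()
			holding--
			mu.Unlock()
			pool.Release(res)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak holders:", peakConcurrentHolders(3, 20))
}
```

The peak never exceeds the pool size, because a worker can only proceed after taking a resource out of the buffered channel.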

@Zerpet
Contributor

Zerpet commented Jan 31, 2024

We could do a better job at documenting the thread-safety of channels. The document that Luke mentioned is this one: https://rabbitmq.com/channels.html

I'm not against this change. My concern is whether we may lose performance by making every operation on the channel synchronous. If we can prove with a benchmark that the performance penalty is reasonable, I'll be happy to see this change in the library.

Regarding a connection pool, it would definitely benefit this repo. However, a connection pool is a concept of "smart clients", and this library has intentionally been kept as simple bindings to the AMQP protocol + RabbitMQ extension. You may notice that this library does not have auto-reconnection, whilst the Java and .NET RabbitMQ libraries do. This is an inherited non-goal. What I'm trying to say is that we can consider a connection pool type for this library, as long as it's not too clever 🙂
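One possible way to look at the cost of the lock itself under contention is a parallel benchmark. The sketch below is an assumption about how such a measurement could be set up, not a benchmark from this repository: it uses `testing.Benchmark` with `b.RunParallel` so several goroutines fight over one `sync.Mutex`, and `benchmarkContendedMutex` is a hypothetical helper name. It measures only the mutex, not a round trip to the broker.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// benchmarkContendedMutex measures Lock/Unlock on a single mutex while
// GOMAXPROCS goroutines compete for it. No AMQP traffic is involved.
func benchmarkContendedMutex() testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		var mu sync.Mutex
		counter := 0

		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				mu.Lock()
				counter++ // tiny critical section guarded by the lock
				mu.Unlock()
			}
		})
		_ = counter
	})
}

func main() {
	res := benchmarkContendedMutex()
	fmt.Printf("%d iterations, %d ns/op\n", res.N, res.NsPerOp())
}
```

A benchmark that is meaningful for the library would call a real channel method inside the parallel loop, with and without the guard, against a running broker.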

Something you may also consider is whether the existing CloudAMQP AMQProxy already covers what you had in mind for the connection pool.

@gnuletik
Author

gnuletik commented Jan 31, 2024

Thanks for the fast feedback!

I'm glad the connection pool idea can be considered.
However, I'm afraid this may be too high-level to respect the non-goal of the project,
especially as a connection pool implementation would need to drop failed connections / channels.

Thanks also for suggesting AMQProxy! However, it feels a bit heavy to set up for a simple use case.

Regarding the benchmark, is there a way to run the test with a mocked RabbitMQ server in the CI? I don't see a channel_test.go file.

However, I've set up this simple benchmark against a real RabbitMQ instance:

docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

package main

import (
	"fmt"
	"testing"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/stretchr/testify/require"
)

func BenchmarkQueueDeclare(b *testing.B) {
	config := amqp.Config{
		Vhost:      "/",
		Properties: amqp.NewConnectionProperties(),
	}

	conn, err := amqp.DialConfig("amqp://user:password@192.168.215.5", config)
	require.NoError(b, err)
	defer conn.Close()

	channel, err := conn.Channel()
	require.NoError(b, err)
	defer channel.Close()

	for i := 0; i < b.N; i++ {
		name := fmt.Sprintf("queue-%d", i)
		_, err := channel.QueueDeclare(
			name,  // name of the queue
			false, // durable
			false, // delete when unused
			false, // exclusive
			false, // noWait
			nil,   // arguments
		)
		require.NoError(b, err)

		_, err = channel.QueueDelete(
			name,
			false, // ifUnused
			false, // ifEmpty
			false, // noWait
		)
		require.NoError(b, err)
	}
}

Here are the results:

Without mutex lock in QueueDeclare

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10    	     459	   2188841 ns/op
BenchmarkQueueDeclare-10    	     588	   1948132 ns/op
BenchmarkQueueDeclare-10    	     674	   1859471 ns/op
BenchmarkQueueDeclare-10    	     740	   1598775 ns/op
BenchmarkQueueDeclare-10    	     742	   1626909 ns/op
PASS
ok  	test/producer	7.859s

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10    	     758	   1502975 ns/op
BenchmarkQueueDeclare-10    	     730	   1562227 ns/op
BenchmarkQueueDeclare-10    	     762	   1502248 ns/op
BenchmarkQueueDeclare-10    	     703	   1577003 ns/op
BenchmarkQueueDeclare-10    	     724	   1687284 ns/op
PASS
ok  	test/producer	7.402s

With mutex lock in QueueDeclare

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10    	     739	   1571499 ns/op
BenchmarkQueueDeclare-10    	     777	   1627958 ns/op
BenchmarkQueueDeclare-10    	     801	   1543984 ns/op
BenchmarkQueueDeclare-10    	     738	   1519895 ns/op
BenchmarkQueueDeclare-10    	     699	   1888766 ns/op
PASS
ok  	test/producer	8.040s

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10    	     754	   1557935 ns/op
BenchmarkQueueDeclare-10    	     740	   1559442 ns/op
BenchmarkQueueDeclare-10    	     669	   1547426 ns/op
BenchmarkQueueDeclare-10    	     771	   1459199 ns/op
BenchmarkQueueDeclare-10    	     772	   1658854 ns/op
PASS
ok  	test/producer	7.552s

It can be reduced to measuring the performance of

// Requires the "sync" and "testing" imports.
func BenchmarkMutex(b *testing.B) {
	var m sync.Mutex

	for i := 0; i < b.N; i++ {
		m.Lock()
		m.Unlock()
	}
}

which is:

goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkMutex-10    	88810612	        13.50 ns/op
BenchmarkMutex-10    	87675774	        13.46 ns/op
BenchmarkMutex-10    	87437000	        13.39 ns/op
BenchmarkMutex-10    	89020771	        13.48 ns/op
BenchmarkMutex-10    	88700265	        13.58 ns/op
PASS
ok  	test/producer	6.960s

So an uncontended mutex lock / unlock pair (about 13.5 ns) represents roughly 0.00084% of the ~1.6 ms ns/op value measured above, which seems really light IMO, but it's up to you.
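As a quick check of that percentage, the arithmetic can be reproduced with a few lines of Go. The 1,600,000 ns figure is an assumed round value approximating the QueueDeclare runs above (which range from roughly 1.46 ms to 2.19 ms), and `overheadPercent` is a hypothetical helper.

```go
package main

import "fmt"

// overheadPercent returns the share of one operation's time, in percent,
// that is spent on the lock/unlock pair.
func overheadPercent(lockNs, opNs float64) float64 {
	return lockNs / opNs * 100
}

func main() {
	// 13.5 ns per lock/unlock, ~1.6 ms per declare + delete (assumed average).
	fmt.Printf("%.5f%%\n", overheadPercent(13.5, 1_600_000)) // prints "0.00084%"
}
```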

Thanks!

@gnuletik
Author

Any news on this?

@lukebakken
Contributor

@gnuletik there is no need to bump this issue. This issue is not urgent, and, as we've said, this library works as documented.

If you'd like to submit a PR with tests and benchmarks, it would be appreciated, but there is no guarantee of when we can review it.
