Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data race in the client example #72

Closed
yaa110 opened this issue Apr 16, 2022 · 9 comments · Fixed by #260
Closed

Data race in the client example #72

yaa110 opened this issue Apr 16, 2022 · 9 comments · Fixed by #260
Assignees
Labels
bug Something isn't working help wanted Extra attention is needed
Milestone

Comments

@yaa110
Copy link

yaa110 commented Apr 16, 2022

How to reproduce

  • Setup go mod:
go mod init test-amqp
Code Example
package amqp_client

import (
	"errors"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

type Client struct {
	queueName       string
	logger          *log.Logger
	connection      *amqp.Connection
	channel         *amqp.Channel
	done            chan bool
	notifyConnClose chan *amqp.Error
	notifyChanClose chan *amqp.Error
	notifyConfirm   chan amqp.Confirmation
	isReady         bool
}

const (
	// When reconnecting to the server after connection failure
	reconnectDelay = 5 * time.Second

	// When setting up the channel after a channel exception
	reInitDelay = 2 * time.Second

	// When resending messages the server didn't confirm
	resendDelay = 5 * time.Second
)

var (
	errNotConnected  = errors.New("not connected to a server")
	errAlreadyClosed = errors.New("already closed: not connected to the server")
	errShutdown      = errors.New("client is shutting down")
)

// New creates a new consumer state instance, and automatically
// attempts to connect to the server.
func New(queueName, addr string) *Client {
	client := Client{
		logger:    log.New(os.Stdout, "", log.LstdFlags),
		queueName: queueName,
		done:      make(chan bool),
	}
	go client.handleReconnect(addr)
	return &client
}

// handleReconnect will wait for a connection error on
// notifyConnClose, and then continuously attempt to reconnect.
func (client *Client) handleReconnect(addr string) {
	for {
		client.isReady = false
		client.logger.Println("Attempting to connect")

		conn, err := client.connect(addr)

		if err != nil {
			client.logger.Println("Failed to connect. Retrying...")

			select {
			case <-client.done:
				return
			case <-time.After(reconnectDelay):
			}
			continue
		}

		if done := client.handleReInit(conn); done {
			break
		}
	}
}

// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
	conn, err := amqp.Dial(addr)

	if err != nil {
		return nil, err
	}

	client.changeConnection(conn)
	client.logger.Println("Connected!")
	return conn, nil
}

// handleReconnect will wait for a channel error
// and then continuously attempt to re-initialize both channels
func (client *Client) handleReInit(conn *amqp.Connection) bool {
	for {
		client.isReady = false

		err := client.init(conn)

		if err != nil {
			client.logger.Println("Failed to initialize channel. Retrying...")

			select {
			case <-client.done:
				return true
			case <-time.After(reInitDelay):
			}
			continue
		}

		select {
		case <-client.done:
			return true
		case <-client.notifyConnClose:
			client.logger.Println("Connection closed. Reconnecting...")
			return false
		case <-client.notifyChanClose:
			client.logger.Println("Channel closed. Re-running init...")
		}
	}
}

// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()

	if err != nil {
		return err
	}

	err = ch.Confirm(false)

	if err != nil {
		return err
	}
	_, err = ch.QueueDeclare(
		client.queueName,
		false, // Durable
		false, // Delete when unused
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	)

	if err != nil {
		return err
	}

	client.changeChannel(ch)
	client.isReady = true
	client.logger.Println("Setup!")

	return nil
}

// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
	client.connection = connection
	client.notifyConnClose = make(chan *amqp.Error)
	client.connection.NotifyClose(client.notifyConnClose)
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
	client.channel = channel
	client.notifyChanClose = make(chan *amqp.Error)
	client.notifyConfirm = make(chan amqp.Confirmation, 1)
	client.channel.NotifyClose(client.notifyChanClose)
	client.channel.NotifyPublish(client.notifyConfirm)
}

// Push will push data onto the queue, and wait for a confirm.
// If no confirms are received until within the resendTimeout,
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (client *Client) Push(data []byte) error {
	if !client.isReady {
		return errors.New("failed to push: not connected")
	}
	for {
		err := client.UnsafePush(data)
		if err != nil {
			client.logger.Println("Push failed. Retrying...")
			select {
			case <-client.done:
				return errShutdown
			case <-time.After(resendDelay):
			}
			continue
		}
		select {
		case confirm := <-client.notifyConfirm:
			if confirm.Ack {
				client.logger.Println("Push confirmed!")
				return nil
			}
		case <-time.After(resendDelay):
		}
		client.logger.Println("Push didn't confirm. Retrying...")
	}
}

// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (client *Client) UnsafePush(data []byte) error {
	if !client.isReady {
		return errNotConnected
	}
	return client.channel.Publish(
		"",               // Exchange
		client.queueName, // Routing key
		false,            // Mandatory
		false,            // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        data,
		},
	)
}

// Consume will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (client *Client) Consume() (<-chan amqp.Delivery, error) {
	if !client.isReady {
		return nil, errNotConnected
	}
	return client.channel.Consume(
		client.queueName,
		"",    // Consumer
		false, // Auto-Ack
		false, // Exclusive
		false, // No-local
		false, // No-Wait
		nil,   // Args
	)
}

// Close will cleanly shutdown the channel and connection.
func (client *Client) Close() error {
	if !client.isReady {
		return errAlreadyClosed
	}
	close(client.done)
	err := client.channel.Close()
	if err != nil {
		return err
	}
	err = client.connection.Close()
	if err != nil {
		return err
	}

	client.isReady = false
	return nil
}
  • Write a test file amqp_test.go:
package amqp_client

import "testing"

func TestAmqp(t *testing.T) {
	client := New("test_queue", "amqp://guest:guest@localhost:5672/")
	defer client.Close()
	client.Push([]byte("test"))
}
  • Run rabbitmq:
docker run --rm -d --net host --name some-rabbit rabbitmq
  • Run test with -race:
go test -race ./...

Error Message

Test fails with the following message:

==================
WARNING: DATA RACE
Write at 0x00c000100228 by goroutine 8:
  test-amqp.(*Client).handleReconnect()
      /tmp/test-amqp/amqp.go:57 +0x5d
  test-amqp.New.func1()
      /tmp/test-amqp/amqp.go:49 +0x58

Previous read at 0x00c000100228 by goroutine 7:
  test-amqp.(*Client).Push()
      /tmp/test-amqp/amqp.go:180 +0x68
  test-amqp.TestAmqp()
      /tmp/test-amqp/amqp_test.go:8 +0xc4
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1486 +0x47

Goroutine 8 (running) created at:
  test-amqp.New()
      /tmp/test-amqp/amqp.go:49 +0x324
  test-amqp.TestAmqp()
      /tmp/test-amqp/amqp_test.go:6 +0x4f
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1486 +0x47

Goroutine 7 (finished) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1486 +0x724
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1839 +0x99
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1837 +0x7e4
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1719 +0xa71
  main.main()
      _testmain.go:47 +0x2e4
==================
2022/04/16 07:32:08 Attempting to connect
2022/04/16 07:32:08 Connected!
2022/04/16 07:32:08 Setup!
Found 1 data race(s)
FAIL	test-amqp	0.021s
FAIL

Environment

Go Version: 1.18.1-bullseye
Library version: v1.3.4

@yaa110 yaa110 changed the title Data race in the library Data race in the client example Apr 16, 2022
@yaa110
Copy link
Author

yaa110 commented Apr 16, 2022

The problem could be solved by guarding isReady using a Mutex or RWMutex.

@ykalayy
Copy link

ykalayy commented Apr 19, 2022

Hi @yaa110
I couldn't able to reproduce your problem.
You are concurrently calling this

go client.handleReconnect(addr)

and try to publish immediately. Which actually returns an error without doing nothing... Because you are not checking the connection is opened or not

if !client.isReady {
		return errors.New("failed to push: not connected")
}

@lukebakken
Copy link
Contributor

Thank you @ykalayy. I will assume that this is due to user error.

@nemith
Copy link

nemith commented Mar 21, 2024

Um this is still a race condition in the example. The example is straight lifted from the example and so any changed that needed to be made per @ykalayy suggestion should be updated in the example.

Now the suggestion to check if the client.isReady before calling Publish is still a race condition as both the go routine and and the lookup to isReady can be racey.

Copying the EXACT example program and running it with race on shows it is racy. You can confirm at startup without a server, a startup with a server and when a server reconnects.

Without Server

$ go run -race .
2024/03/20 19:24:41 Attempting to connect
2024/03/20 19:24:41 Failed to connect. Retrying...
==================
WARNING: DATA RACE
Read at 0x00c000034098 by main goroutine:
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:205 +0x44
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000034098 by goroutine 7:
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:81 +0x3c
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
Push failed: failed to push: not connected
Push failed: failed to push: not connected
2024/03/20 19:24:46 Attempting to connect
2024/03/20 19:24:46 Failed to connect. Retrying...
Push failed: failed to push: not connected
Push failed: failed to push: not connected

With Server.

$ go run -race .
2024/03/20 19:27:57 Attempting to connect
2024/03/20 19:27:57 Connected!
2024/03/20 19:27:57 Setup!
==================
WARNING: DATA RACE
Read at 0x00c000190098 by main goroutine:
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:205 +0x44
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000190098 by goroutine 7:
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:177 +0xc0
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000190070 by main goroutine:
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x9c
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000190070 by goroutine 7:
  main.(*Client).changeChannel()
      /Volumes/Code/tmp/rmqrace/main.go:194 +0x38
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:176 +0xb4
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Write at 0x00c00024448c by main goroutine:
  ??()
      -:0 +0x100953c48
  sync/atomic.CompareAndSwapInt32()
      <autogenerated>:1 +0x18
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1562 +0x80
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000244488 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:82 +0x368
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000244550 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1566 +0xd0
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000244550 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:82 +0x368
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000244548 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1567 +0xec
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000244548 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:82 +0x368
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Write at 0x00c000232268 by main goroutine:
  ??()
      -:0 +0x100953c48
  sync/atomic.CompareAndSwapInt32()
      <autogenerated>:1 +0x18
  github.com/rabbitmq/amqp091-go.(*confirms).publish()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:43 +0x3c
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1567 +0xf8
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000232268 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newConfirms()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:26 +0x248
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:87 +0x470
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000232260 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*confirms).publish()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:46 +0x8c
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1567 +0xf8
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000232260 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newConfirms()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:26 +0x248
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:87 +0x470
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000232258 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*confirms).publish()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:47 +0xb8
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1567 +0xf8
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000232258 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newConfirms()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:26 +0x248
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:87 +0x470
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Write at 0x00c0002261c0 by main goroutine:
  ??()
      -:0 +0x100953c48
  sync/atomic.CompareAndSwapInt32()
      <autogenerated>:1 +0x18
  github.com/rabbitmq/amqp091-go.(*deferredConfirmations).Add()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:138 +0x38
  github.com/rabbitmq/amqp091-go.(*confirms).publish()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:47 +0xdc
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1567 +0xf8
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c0002261c0 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newDeferredConfirmations()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:132 +0x1c0
  github.com/rabbitmq/amqp091-go.newConfirms()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:28 +0x2cc
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:87 +0x470
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c0002261c8 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*deferredConfirmations).Add()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:143 +0x12c
  github.com/rabbitmq/amqp091-go.(*confirms).publish()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:47 +0xdc
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1567 +0xf8
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c0002261c8 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newDeferredConfirmations()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:132 +0x1c0
  github.com/rabbitmq/amqp091-go.newConfirms()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/confirms.go:28 +0x2cc
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:87 +0x470
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c0002444d4 by main goroutine:
  ??()
      -:0 +0x10094d444
  sync/atomic.LoadInt32()
      <autogenerated>:1 +0x10
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1570 +0x614
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c0002444d0 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:82 +0x368
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c0002444b8 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*Channel).sendOpen()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:232 +0x110
  github.com/rabbitmq/amqp091-go.(*Channel).send()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:166 +0x80
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1570 +0x614
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c0002444b8 by goroutine 7:
  github.com/rabbitmq/amqp091-go.newChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:82 +0x368
  github.com/rabbitmq/amqp091-go.(*Connection).allocateChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:824 +0xbc
  github.com/rabbitmq/amqp091-go.(*Connection).openChannel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:847 +0x2c
  github.com/rabbitmq/amqp091-go.(*Connection).Channel()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:873 +0x34
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:152 +0x2c
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c0001100d8 by main goroutine:
  github.com/rabbitmq/amqp091-go.(*Channel).sendOpen()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:232 +0x128
  github.com/rabbitmq/amqp091-go.(*Channel).send()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:166 +0x80
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1570 +0x614
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c0001100d8 by goroutine 7:
  github.com/rabbitmq/amqp091-go.Open()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:261 +0x254
  github.com/rabbitmq/amqp091-go.DialConfig()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:252 +0x740
  github.com/rabbitmq/amqp091-go.Dial()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:159 +0xa0
  main.(*Client).connect()
      /Volumes/Code/tmp/rmqrace/main.go:105 +0x34
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:84 +0xdc
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000110140 by main goroutine:
  ??()
      -:0 +0x10094d444
  sync/atomic.LoadInt32()
      <autogenerated>:1 +0x10
  github.com/rabbitmq/amqp091-go.(*Channel).sendOpen()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:262 +0x4bc
  github.com/rabbitmq/amqp091-go.(*Channel).send()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:166 +0x80
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1570 +0x614
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000110140 by goroutine 7:
  github.com/rabbitmq/amqp091-go.Open()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:261 +0x254
  github.com/rabbitmq/amqp091-go.DialConfig()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:252 +0x740
  github.com/rabbitmq/amqp091-go.Dial()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:159 +0xa0
  main.(*Client).connect()
      /Volumes/Code/tmp/rmqrace/main.go:105 +0x34
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:84 +0xdc
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Write at 0x00c00011000c by main goroutine:
  ??()
      -:0 +0x100953c48
  sync/atomic.CompareAndSwapInt32()
      <autogenerated>:1 +0x18
  github.com/rabbitmq/amqp091-go.(*Connection).sendUnflushed()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:529 +0x50
  github.com/rabbitmq/amqp091-go.(*Channel).sendOpen()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:262 +0x4bc
  github.com/rabbitmq/amqp091-go.(*Channel).send()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:166 +0x80
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithDeferredConfirmWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1570 +0x614
  github.com/rabbitmq/amqp091-go.(*Channel).PublishWithContext()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1531 +0x138
  main.(*Client).UnsafePush()
      /Volumes/Code/tmp/rmqrace/main.go:239 +0x94
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:209 +0xc8
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000110008 by goroutine 7:
  github.com/rabbitmq/amqp091-go.Open()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:261 +0x254
  github.com/rabbitmq/amqp091-go.DialConfig()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:252 +0x740
  github.com/rabbitmq/amqp091-go.Dial()
      /Users/brandonbennett/.go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:159 +0xa0
  main.(*Client).connect()
      /Volumes/Code/tmp/rmqrace/main.go:105 +0x34
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:84 +0xdc
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
==================
WARNING: DATA RACE
Read at 0x00c000190090 by main goroutine:
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:219 +0x1fc
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000190090 by goroutine 7:
  main.(*Client).changeChannel()
      /Volumes/Code/tmp/rmqrace/main.go:196 +0xf0
  main.(*Client).init()
      /Volumes/Code/tmp/rmqrace/main.go:176 +0xb4
  main.(*Client).handleReInit()
      /Volumes/Code/tmp/rmqrace/main.go:122 +0x4c
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:97 +0x204
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
2024/03/20 19:27:59 Push confirmed [1]!
Push succeeded!
2024/03/20 19:28:01 Push confirmed [2]!
Push succeeded!
2024/03/20 19:28:03 Push confirmed [3]!
Push succeeded!
2024/03/20 19:28:05 Push confirmed [4]!
Push succeeded!
2024/03/20 19:28:07 Push confirmed [5]!
Push succeeded!
2024/03/20 19:28:09 Push confirmed [6]!
Push succeeded!
2024/03/20 19:28:11 Connection closed. Reconnecting...
2024/03/20 19:28:11 Attempting to connect
2024/03/20 19:28:11 Failed to connect. Retrying...
==================
WARNING: DATA RACE
Read at 0x00c000190098 by main goroutine:
  main.(*Client).Push()
      /Volumes/Code/tmp/rmqrace/main.go:205 +0x44
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:27 +0x190

Previous write at 0x00c000190098 by goroutine 7:
  main.(*Client).handleReconnect()
      /Volumes/Code/tmp/rmqrace/main.go:81 +0x3c
  main.New.gowrap1()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x4c

Goroutine 7 (running) created at:
  main.New()
      /Volumes/Code/tmp/rmqrace/main.go:73 +0x1a0
  main.main()
      /Volumes/Code/tmp/rmqrace/main.go:17 +0x40
==================
Push failed: failed to push: not connected
Push failed: failed to push: not connected
Push failed: failed to push: not connected
2024/03/20 19:28:16 Attempting to connect
2024/03/20 19:28:16 Failed to connect. Retrying...
Found 17 data race(s)
exit status 66

@nemith
Copy link

nemith commented Mar 21, 2024

I have also seen a lot of code in the wild based on the reconnect logic of this example (including inside my company) which has caused real issues so this example should be fixed.

@lukebakken
Copy link
Contributor

@nemith thanks for using this library and RabbitMQ.

A suggestion - when reporting issues on GitHub, please save large amounts of text into a file to attach, or use the <details> markdown feature. I have edited your comment so that this issue is navigable again.

I have also seen a lot of code in the wild based on the reconnect logic of this example (including inside my company) which has caused real issues so this example should be fixed.

By all means, contribute back to the RabbitMQ ecosystem and provide a pull request to fix the example code. It would be greatly appreciated. I will re-open this issue.

@lukebakken lukebakken reopened this Mar 21, 2024
@lukebakken lukebakken self-assigned this Mar 21, 2024
@lukebakken lukebakken added bug Something isn't working help wanted Extra attention is needed labels Mar 21, 2024
@lukebakken lukebakken added this to the 1.9.1 milestone Mar 21, 2024
@nemith
Copy link

nemith commented Mar 21, 2024

Sorry about the formatting. I just followed the initial report in the issue and really i didn't think there was that much output but I could be a problem for people with smaller monitors and phones? Anyway that isn't the point.

Back to the issue at hand I do think that part of the problem is that this library is extremely hard to use properly while still implementing autoreconnect that is actually race free and I still trying to figure out the best way to use it for my private project but if i find a good example I can submit a PR.

Another idea is, for now, just remove all the auto-reconnect logic from the example to at least prevent copy and paste errors for others consuming this library.

@lukebakken
Copy link
Contributor

@nemith you'll be interested in this issue, #253, as well as @jxsl13 's project - https://github.com/jxsl13/amqpx

I would like to have more time to dedicate to this project, but I'm currently working on supporting RabbitMQ customers with support contracts, as well as the next major release of the official .NET client.

@Zerpet Zerpet self-assigned this May 7, 2024
Zerpet added a commit that referenced this issue May 7, 2024
Some users rely on this example as a starting point to their
applications. This commit fixes a data race that could cause issues in
any code that relied on the example as base.

Related to #72

Signed-off-by: Aitor Perez Cedres <aitor.perez@broadcom.com>
@Zerpet
Copy link
Contributor

Zerpet commented May 7, 2024

I submitted a PR #260 to address this issue. Any feedback/comments are welcome.

Zerpet added a commit that referenced this issue May 7, 2024
Some users rely on this example as a starting point to their
applications. This commit fixes a data race that could cause issues in
any code that relied on the example as base.

Related to #72

Signed-off-by: Aitor Perez Cedres <aitor.perez@broadcom.com>
lukebakken pushed a commit that referenced this issue May 7, 2024
Some users rely on this example as a starting point to their
applications. This commit fixes a data race that could cause issues in
any code that relied on the example as base.

Related to #72

Signed-off-by: Aitor Perez Cedres <aitor.perez@broadcom.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working help wanted Extra attention is needed
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants