// File: producer.go

// This example declares a durable exchange and publishes a message to it with
// a given routing key: once by default, or once per second when the
// -continuous flag is set.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// Command-line flags (parsed in init) and the two loggers used by this
// example.
var (
// uri is the AMQP URI that publish dials.
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
// exchangeName and exchangeType are passed to ExchangeDeclare.
exchangeName = flag.String("exchange", "test-exchange", "Durable AMQP exchange name")
exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
// routingKey and body describe the message that is published.
routingKey = flag.String("key", "test-key", "AMQP routing key")
body = flag.String("body", "foobar", "Body of message")
// reliable turns on publisher confirms for the channel.
reliable = flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting")
// continuous makes publish loop, sending one message per second, instead of
// returning after the first message.
continuous = flag.Bool("continuous", false, "Keep publishing messages at a 1msg/sec rate")
// ErrLog writes to standard error with an "[ERROR] " prefix placed directly
// before the message (log.Lmsgprefix).
ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix)
// Log writes to standard output with an "[INFO] " prefix placed directly
// before the message (log.Lmsgprefix).
Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix)
)
// init parses the command-line flags so that the flag variables declared in
// this file hold their final values by the time main runs.
//
// NOTE(review): parsing flags in init is generally discouraged; it presumably
// interferes with `go test`, whose own flags may not be registered yet at this
// point. Consider moving the call to the top of main.
func init() {
flag.Parse()
}
// main installs the signal handler and runs the publisher with the values
// taken from the command-line flags. Any error returned by publish is logged
// through ErrLog and terminates the process.
func main() {
	stop := make(chan bool)
	SetupCloseHandler(stop)

	err := publish(stop, *uri, *exchangeName, *exchangeType, *routingKey, *body, *reliable)
	if err == nil {
		return
	}
	ErrLog.Fatalf("%s", err)
}
// SetupCloseHandler starts a goroutine that waits for the first os.Interrupt
// or SIGTERM delivered to the process and then closes done.
//
// done is closed rather than sent to: several goroutines wait on it at the
// same time, and a single send would wake only one of them, leaving the others
// running. A closed channel is observed by every receiver. Callers must
// therefore not close done themselves and must not send on it.
func SetupCloseHandler(done chan bool) {
	c := make(chan os.Signal, 2)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-c
		Log.Printf("Ctrl+C pressed in Terminal")
		// Broadcast the shutdown request to every goroutine selecting on done.
		close(done)
	}()
}
// publish dials amqpURI, declares a durable exchange with the given name and
// type, and publishes body to it using routingKey.
//
// When reliable is true the channel is put into confirm mode and a
// confirmHandler goroutine is started to log the broker's confirmations.
// When the -continuous flag is set, one message is published per second until
// done becomes readable; otherwise the function returns after the first
// message.
//
// It returns nil on a clean stop, or an error wrapping the failure of the
// dial, channel, declare, confirm-mode or publish step.
func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error {
	// This function dials, connects, declares, publishes, and tears down,
	// all in one go. In a real service, you probably want to maintain a
	// long-lived connection as state, and publish against that.

	const (
		// maxOutstanding is how many publishes may be awaiting confirmation.
		// The confirmation listener gets the same capacity so that it can
		// hold one confirmation per outstanding publish.
		maxOutstanding = 8
		// publishTimeout bounds each individual publish call.
		publishTimeout = 5 * time.Second
	)

	config := amqp.Config{Properties: amqp.NewConnectionProperties()}
	config.Properties.SetClientConnectionName("sample-producer")

	Log.Printf("dialing %q", amqpURI)
	connection, err := amqp.DialConfig(amqpURI, config)
	if err != nil {
		return fmt.Errorf("Dial: %w", err)
	}
	defer connection.Close()

	Log.Printf("got Connection, getting Channel")
	channel, err := connection.Channel()
	if err != nil {
		return fmt.Errorf("Channel: %w", err)
	}

	Log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
	if err := channel.ExchangeDeclare(
		exchange,     // name
		exchangeType, // type
		true,         // durable
		false,        // auto-deleted
		false,        // internal
		false,        // noWait
		nil,          // arguments
	); err != nil {
		return fmt.Errorf("Exchange Declare: %w", err)
	}

	// publishes stays nil unless confirms are enabled; it is only used when
	// reliable is true.
	var publishes chan uint64

	// Reliable publisher confirms require confirm.select support from the
	// connection.
	if reliable {
		Log.Printf("enabling publisher confirms.")
		if err := channel.Confirm(false); err != nil {
			return fmt.Errorf("Channel could not be put into confirm mode: %w", err)
		}
		// We'll allow for a few outstanding publisher confirms
		publishes = make(chan uint64, maxOutstanding)
		confirms := channel.NotifyPublish(make(chan amqp.Confirmation, maxOutstanding))
		go confirmHandler(done, publishes, confirms)
	}

	Log.Println("declared Exchange, publishing messages")

	for {
		seqNo := channel.GetNextPublishSeqNo()
		Log.Printf("publishing %dB body (%q)", len(body), body)

		// A fresh context per publish: a single context created before the
		// loop would already be past its deadline for every message sent more
		// than publishTimeout after start-up.
		ctx, cancel := context.WithTimeout(context.Background(), publishTimeout)
		err := channel.PublishWithContext(ctx,
			exchange,   // publish to an exchange
			routingKey, // routing to 0 or more queues
			false,      // mandatory
			false,      // immediate
			amqp.Publishing{
				Headers:         amqp.Table{},
				ContentType:     "text/plain",
				ContentEncoding: "",
				Body:            []byte(body),
				DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
				Priority:        0,              // 0-9
				// a bunch of application/implementation-specific fields
			},
		)
		// Released explicitly instead of deferred: a defer inside this loop
		// would not run until the function returns.
		cancel()
		if err != nil {
			return fmt.Errorf("Exchange Publish: %w", err)
		}
		Log.Printf("published %dB OK", len(body))

		if reliable {
			// Also watch done so that a full publishes buffer cannot block
			// this loop forever once the handler has stopped draining it.
			select {
			case publishes <- seqNo:
			case <-done:
				Log.Println("producer is stopping")
				return nil
			}
		}

		if !*continuous {
			// NOTE(review): in reliable mode this returns (and the deferred
			// connection.Close runs) without waiting for confirmHandler to
			// report the confirmation of the message just published.
			return nil
		}

		select {
		case <-done:
			Log.Println("producer is stopping")
			return nil
		case <-time.After(time.Second):
		}
	}
}
// confirmHandler logs publisher confirmations until it is told to stop.
//
// Sequence numbers received on publishes are recorded as awaiting
// confirmation; each confirmation received on confirms is logged as confirmed
// or failed and its delivery tag is removed from the record. Whenever more
// than one publish is still awaiting confirmation the count is logged.
//
// The function returns when done becomes readable or when confirms is closed.
//
// NOTE(review): a confirmation may be received here before the matching
// sequence number has arrived on publishes; that entry would then never be
// removed and would inflate the outstanding count.
func confirmHandler(done chan bool, publishes chan uint64, confirms chan amqp.Confirmation) {
	// Set of sequence numbers that have been published but not yet confirmed.
	outstanding := make(map[uint64]struct{})
	for {
		select {
		case <-done:
			Log.Println("confirmHandler is stopping")
			return
		case publishSeqNo := <-publishes:
			Log.Printf("waiting for confirmation of %d", publishSeqNo)
			outstanding[publishSeqNo] = struct{}{}
		case confirmed, ok := <-confirms:
			if !ok {
				// A closed channel is always ready to receive and yields zero
				// values; without this check the loop would spin forever once
				// the confirmation listener has been closed.
				Log.Println("confirmHandler is stopping")
				return
			}
			if confirmed.DeliveryTag > 0 {
				if confirmed.Ack {
					Log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
				} else {
					ErrLog.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
				}
				delete(outstanding, confirmed.DeliveryTag)
			}
		}
		if len(outstanding) > 1 {
			Log.Printf("outstanding confirmations: %d", len(outstanding))
		}
	}
}