package main

import (
	"fmt"
	"time"

	"github.com/streadway/amqp"
)

// setup dials a new connection, opens one channel on it, and declares the
// queue. Every connection should declare the topology it expects.
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
	// Open a new TCP connection to the broker.
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, nil, err
	}

	// Open a channel on that connection.
	ch, err := conn.Channel()
	if err != nil {
		conn.Close() // do not leak the connection on failure
		return nil, nil, err
	}

	// Declare a non-durable, auto-delete queue.
	if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
		conn.Close()
		return nil, nil, err
	}

	return conn, ch, nil
}
func main() {
	// AMQP URL; the credentials and host shown here are placeholders for a local broker.
	url := "amqp://guest:guest@localhost:5672"

	for i := 1; i <= 2; i++ {
		fmt.Println("connect ", i)
		i := i // per-iteration copy; before Go 1.22 the closure would share the loop variable

		// Start one goroutine per queue.
		go func() {
			// Queue name.
			queue := fmt.Sprintf("example.reconnect.%d", i)

			// Each call to setup dials its own TCP connection and opens one channel on it.
			_, pub, err := setup(url, queue)
			if err != nil {
				fmt.Println("err publisher setup:", err)
				return
			}

			// Purge the queue from the publisher side to establish initial state.
			if _, err := pub.QueuePurge(queue, false); err != nil {
				fmt.Println("err purge:", err)
				return
			}

			// Publish one message.
			if err := pub.Publish("", queue, false, false, amqp.Publishing{
				Body: []byte(fmt.Sprintf("%d", i)),
			}); err != nil {
				fmt.Println("err publish:", err)
				return
			}

			// Keep the goroutine (and its connection) alive.
			for {
				time.Sleep(time.Second * 20)
			}
		}()
	}

	// Keep the program running.
	for {
		time.Sleep(time.Second * 20)
	}
}
I thought there would be only one connection between the program and the MQ server,
but there are two connections. Can one connection support only one channel? Why?
Can't the two goroutines share the same TCP connection?
In theory, a socket descriptor can be shared by all threads of a process.
Why don't the two goroutines share one socket, each with its own channel?
The model I drew by hand:
The real model in RabbitMQ:
A connection is a TCP connection between your application and the RabbitMQ broker. A channel is a virtual connection inside a connection. In other words, a channel multiplexes a TCP connection. Typically, each process only creates one TCP connection, and uses multiple channels in that connection for different threads.
To connect to the message broker, a client uses the Advanced Message Queuing Protocol, or AMQP for short. The standard port for RabbitMQ is 5672. A program typically starts by creating a new connection to RabbitMQ and then a channel on it to send data to a queue.
A channel acts as a virtual connection inside a TCP connection. A channel reuses a connection, forgoing the need to reauthorize and open a new TCP stream, so channels let you use resources more efficiently. Every AMQP protocol-related operation occurs over a channel.
In order for a client to interact with RabbitMQ, it must first open a connection. This process involves a number of steps. First, the application configures the client library it uses to use a certain connection endpoint (e.g. hostname and port). Then the library resolves the hostname to one or more IP addresses.
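Those first two steps can be sketched with the standard library alone. This is only an illustration of what happens before a client dials, not the code of any particular library. The helper name endpoint and the example URL are assumptions, and only the plain amqp scheme is handled.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// endpoint extracts host and port from an AMQP URL.
// It falls back to 5672, the standard RabbitMQ port, when no port is given.
func endpoint(rawURL string) (host, port string, err error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "amqp" {
		return "", "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	host, port = u.Hostname(), u.Port()
	if port == "" {
		port = "5672"
	}
	return host, port, nil
}

func main() {
	// Step 1: the configured endpoint (hypothetical example URL).
	host, port, err := endpoint("amqp://guest:guest@localhost")
	if err != nil {
		fmt.Println("bad endpoint:", err)
		return
	}

	// Step 2: resolve the hostname to one or more IP addresses.
	addrs, err := net.LookupHost(host)
	if err != nil {
		fmt.Println("resolve error:", err)
		return
	}
	for _, a := range addrs {
		fmt.Println("candidate address:", net.JoinHostPort(a, port))
	}
}
```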
Looking at the source for the library, it appears that you can call conn.Channel() as many times as you like, and each call creates a new stream of communication over the same connection.
OK, I tried it, and here's a working example: one goroutine, one connection, two channels. I set up the receiver, then send a message, then read from the receiver channel.
If you wanted multiple queues bound in one goroutine, you would call rec.Consume twice and then select across the queues.
package main

import (
	"fmt"
	"os"

	"github.com/streadway/amqp"
)

func main() {
	// One TCP connection for the whole program.
	conn, err := amqp.Dial("amqp://localhost")
	e(err)
	defer conn.Close()
	fmt.Println("Connected")

	// First channel on the connection: the receiver.
	rec, err := conn.Channel()
	e(err)
	fmt.Println("Setup receiver")
	rq, err := rec.QueueDeclare("go-test", false, false, false, false, nil)
	e(err)
	msgs, err := rec.Consume(rq.Name, "", true, false, false, false, nil)
	e(err)

	// Second channel on the same connection: the sender.
	fmt.Println("Setup sender")
	send, err := conn.Channel()
	e(err)
	sq, err := send.QueueDeclare("go-test", false, false, false, false, nil)
	e(err)

	fmt.Println("Send message")
	err = send.Publish("", sq.Name, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte("This is a test"),
	})
	e(err)

	// Read the message back on the receiver channel.
	msg := <-msgs
	fmt.Println("Received from:", rq, "msg:", string(msg.Body))
}

// e prints the error and exits if err is non-nil.
func e(err error) {
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
Output on my box:
$ go run rmq.go
Connected
Setup receiver
Setup sender
Send message
Received from: {go-test 0 0} msg: This is a test
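The multiple-queue case mentioned earlier (call Consume twice, then select across the queues) might look like the following sketch. To keep it runnable without a broker, plain chan string values stand in for the delivery channels that Consume returns. The function collect and the queue-a/queue-b labels are hypothetical names.

```go
package main

import "fmt"

// collect reads from two delivery streams with select until both are closed.
func collect(a, b <-chan string) []string {
	var got []string
	for a != nil || b != nil {
		select {
		case m, ok := <-a:
			if !ok {
				a = nil // a nil channel is never selected again
				continue
			}
			got = append(got, "queue-a: "+m)
		case m, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			got = append(got, "queue-b: "+m)
		}
	}
	return got
}

func main() {
	// Stand-ins for the two channels a pair of Consume calls would return.
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "hello"
	b <- "world"
	close(a)
	close(b)

	for _, line := range collect(a, b) {
		fmt.Println(line)
	}
}
```

The order in which the two messages are printed is not fixed, because select picks among ready cases at random.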