JavaとgoのRabbitMQでAMQPを使ってメッセージのやり取りをしてみる
JavaとgoのRabbitMQのサンプルコードを動かして通信を確認してみます。RabbitMQはMacであればbrew install rabbitmq
でインストールしたものを使うこともできますが、今回は以下のdocker-compose.ymlを起動させて使います。
version: '3' services: rabbitmq: image: rabbitmq:management ports: - "5672:5672" - "15672:15672"
起動後は http://localhost:15672/ にアクセスするとログインページが表示さえるのでusername, passwordはguest, guestでログインできます。
Java側ではエンキューを行うのですが、以下を参考にしますが一度エンキューを行った後にJava側ですぐにでキューしていたので
https://github.com/spring-guides/gs-messaging-rabbitmq/blob/main/complete/src/main/java/com/example/messagingrabbitmq/Runner.java
以下のように10秒間隔でエンキューを繰り返すようにします。
package com.example.messagingrabbitmq; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class Runner implements CommandLineRunner { private final RabbitTemplate rabbitTemplate; private final Receiver receiver; public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) { this.receiver = receiver; this.rabbitTemplate = rabbitTemplate; } @Override public void run(String... args) throws Exception { DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS"); while (true) { System.out.println("Sending message..."); rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ! :" + LocalDateTime.now().format(df)); Thread.sleep(10000); } } }
それからreceiver側のbeanを無効にします。
https://github.com/spring-guides/gs-messaging-rabbitmq/blob/main/complete/src/main/java/com/example/messagingrabbitmq/MessagingRabbitmqApplication.java
次にgo側ですがライブラリは以下を使います。
https://github.com/streadway/amqp
サンプルコードは以下を使います。
https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go/receive.go
サンプルからの修正点としてはキューの名前を変えるだけで問題なく動きます。今回はjava側がspring-bootのキューにエンキューしていたのでそれに合わせます。
package main import ( "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "spring-boot", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
go側のchannnelの関数の引数についてはソース側のコメントを見れば良いかと思います。
https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/channel.go
実行結果は以下のようにメッセージ受信できていることが確認できます。
022/12/04 20:34:52 [*] Waiting for messages. To exit press CTRL+C 2022/12/04 20:34:52 Received a message: Hello from RabbitMQ! :2022/12/04 20:34:48.098 2022/12/04 20:34:58 Received a message: Hello from RabbitMQ! :2022/12/04 20:34:58.103 2022/12/04 20:35:08 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:08.104 2022/12/04 20:35:18 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:18.105 2022/12/04 20:35:28 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:28.111 2022/12/04 20:35:38 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:38.112