JavaとgoのRabbitMQでAMQPを使ってメッセージのやり取りをしてみる
JavaとgoのRabbitMQのサンプルコードを動かして通信を確認してみます。RabbitMQはMacであればbrew install rabbitmqでインストールしたものを使うこともできますが、今回は以下のdocker-compose.ymlを起動させて使います。
version: '3'
services:
rabbitmq:
image: rabbitmq:management
ports:
- "5672:5672"
- "15672:15672"
起動後は http://localhost:15672/ にアクセスするとログインページが表示さえるのでusername, passwordはguest, guestでログインできます。
Java側ではエンキューを行うのですが、以下を参考にしますが一度エンキューを行った後にJava側ですぐにでキューしていたので
https://github.com/spring-guides/gs-messaging-rabbitmq/blob/main/complete/src/main/java/com/example/messagingrabbitmq/Runner.java
以下のように10秒間隔でエンキューを繰り返すようにします。
package com.example.messagingrabbitmq;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
while (true) {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName,
"foo.bar.baz", "Hello from RabbitMQ! :" + LocalDateTime.now().format(df));
Thread.sleep(10000);
}
}
}
それからreceiver側のbeanを無効にします。
https://github.com/spring-guides/gs-messaging-rabbitmq/blob/main/complete/src/main/java/com/example/messagingrabbitmq/MessagingRabbitmqApplication.java
次にgo側ですがライブラリは以下を使います。
https://github.com/streadway/amqp
サンプルコードは以下を使います。
https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go/receive.go
サンプルからの修正点としてはキューの名前を変えるだけで問題なく動きます。今回はjava側がspring-bootのキューにエンキューしていたのでそれに合わせます。
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"spring-boot", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
go側のchannnelの関数の引数についてはソース側のコメントを見れば良いかと思います。
https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/channel.go
実行結果は以下のようにメッセージ受信できていることが確認できます。
022/12/04 20:34:52 [*] Waiting for messages. To exit press CTRL+C 2022/12/04 20:34:52 Received a message: Hello from RabbitMQ! :2022/12/04 20:34:48.098 2022/12/04 20:34:58 Received a message: Hello from RabbitMQ! :2022/12/04 20:34:58.103 2022/12/04 20:35:08 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:08.104 2022/12/04 20:35:18 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:18.105 2022/12/04 20:35:28 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:28.111 2022/12/04 20:35:38 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:38.112