JavaとgoのRabbitMQでAMQPを使ってメッセージのやり取りをしてみる

JavaとgoのRabbitMQのサンプルコードを動かして通信を確認してみます。RabbitMQはMacであればbrew install rabbitmqでインストールしたものを使うこともできますが、今回は以下のdocker-compose.ymlを起動させて使います。

version: '3'
services:
  rabbitmq:
    image: rabbitmq:management
    ports:
      - "5672:5672"
      - "15672:15672"

起動後は http://localhost:15672/ にアクセスするとログインページが表示さえるのでusername, passwordはguest, guestでログインできます。

Java側ではエンキューを行うのですが、以下を参考にしますが一度エンキューを行った後にJava側ですぐにでキューしていたので
https://github.com/spring-guides/gs-messaging-rabbitmq/blob/main/complete/src/main/java/com/example/messagingrabbitmq/Runner.java
以下のように10秒間隔でエンキューを繰り返すようにします。

package com.example.messagingrabbitmq;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws Exception {
        DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
        while (true) {
            System.out.println("Sending message...");
            rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName,
                    "foo.bar.baz", "Hello from RabbitMQ! :" + LocalDateTime.now().format(df));
            Thread.sleep(10000);
        }
    }

}

それからreceiver側のbeanを無効にします。
https://github.com/spring-guides/gs-messaging-rabbitmq/blob/main/complete/src/main/java/com/example/messagingrabbitmq/MessagingRabbitmqApplication.java

次にgo側ですがライブラリは以下を使います。
https://github.com/streadway/amqp

サンプルコードは以下を使います。
https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/go/receive.go

サンプルからの修正点としてはキューの名前を変えるだけで問題なく動きます。今回はjava側がspring-bootのキューにエンキューしていたのでそれに合わせます。

package main

import (
    "log"
    "github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "spring-boot", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

go側のchannnelの関数の引数についてはソース側のコメントを見れば良いかと思います。
https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/channel.go

実行結果は以下のようにメッセージ受信できていることが確認できます。

022/12/04 20:34:52  [*] Waiting for messages. To exit press CTRL+C
2022/12/04 20:34:52 Received a message: Hello from RabbitMQ! :2022/12/04 20:34:48.098
2022/12/04 20:34:58 Received a message: Hello from RabbitMQ! :2022/12/04 20:34:58.103
2022/12/04 20:35:08 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:08.104
2022/12/04 20:35:18 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:18.105
2022/12/04 20:35:28 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:28.111
2022/12/04 20:35:38 Received a message: Hello from RabbitMQ! :2022/12/04 20:35:38.112