RustでのQuickFix実装日誌_2020/09/22

RustでQuickFixの実装を進めてまして以下のJavaでの実装のサンプルにあるBanzaiの動きをRustで書いています。 github.com

現在の進捗としてlogonを投げたあとHeartbeatを投げるところまで実装しています、 github.com

ログを見ると以下のように交互にHeartbeatを投げることが確認できました。

    Finished dev [unoptimized + debuginfo] target(s) in 1.38s
     Running `target\debug\quickfixr.exe`
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] start
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] init stream start
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] init stream end
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] init app start
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] init app end
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] "BANZAI=>EXEC: 8=FIX.4.4\u{1}9=67\u{1}35=A\u{1}34=1\u{1}49=BANZAI\u{1}52=20200922-04:56:34\u{1}56=EXEC\u{1}98=0\u{1}108=10\u{1}141=Y\u{1}10=065\u{1}"
[2020-09-22T04:56:34Z INFO  quickfixr::quickfix::application] "EXEC=>BANZAI: 8=FIX.4.4\u{1}9=76\u{1}35=A\u{1}9=71\u{1}34=1\u{1}49=EXEC\u{1}52=20200922-04:56:34.325\u{1}56=BANZAI\u{1}98=0\u{1}108=10\u{1}141=Y\u{1}10=232\u{1}"    
[2020-09-22T04:56:44Z INFO  quickfixr::quickfix::application] "BANZAI=>EXEC: 8=FIX.4.4\u{1}9=49\u{1}35=0\u{1}34=2\u{1}49=BANZAI\u{1}52=20200922-04:56:44\u{1}56=EXEC\u{1}10=238\u{1}"
[2020-09-22T04:56:44Z INFO  quickfixr::quickfix::application] "EXEC=>BANZAI: 8=FIX.4.4\u{1}9=58\u{1}35=0\u{1}9=53\u{1}34=2\u{1}49=EXEC\u{1}52=20200922-04:56:44.323\u{1}56=BANZAI\u{1}10=147\u{1}"
[2020-09-22T04:56:54Z INFO  quickfixr::quickfix::application] "BANZAI=>EXEC: 8=FIX.4.4\u{1}9=49\u{1}35=0\u{1}34=3\u{1}49=BANZAI\u{1}52=20200922-04:56:54\u{1}56=EXEC\u{1}10=240\u{1}"
[2020-09-22T04:56:54Z INFO  quickfixr::quickfix::application] "EXEC=>BANZAI: 8=FIX.4.4\u{1}9=58\u{1}35=0\u{1}9=53\u{1}34=3\u{1}49=EXEC\u{1}52=20200922-04:56:54.323\u{1}56=BANZAI\u{1}10=149\u{1}"
error: process didn't exit successfully: `target\debug\quickfixr.exe` (exit code: 0xc000013a, STATUS_CONTROL_C_EXIT)
^C

Rustでの実装で気を付けたことですが、以下のcontroller内で必要なスレッドを立ち上げてjoinしています。self_init_app()の部分でアプリケーションの初期化を行っているのですが、これは複数スレッドで共有する必要があるので関数の定義はfn init_app(&mut self) -> Arc<Mutex<Application>>となっており、Arc<Mutex<T>>を返すようにしています。

    pub fn start(&mut self, f: fn(Sender<Message>)) {
        info!("start");
        let mut app = self.init_app();
        let mut tcp_stream = app.lock().unwrap().stream_clone();
        let (tx, rx) = mpsc::channel::<Message>();
        let mut read_handler = self.read(tcp_stream, Arc::clone(&app));
        let mut send_handler_tx = self.send_tx(tx.clone(), f);
        let mut send_handler_rx = self.send_rx(Arc::clone(&app), rx);
        let mut heart_beat_handler =
            self.send_heart_beat(app.lock().unwrap().heart_beat as u64, tx.clone());

        app.lock().unwrap().start();
        for handle in vec![
            read_handler,
            send_handler_tx,
            send_handler_rx,
            heart_beat_handler,
        ] {
            handle.join().unwrap();
        }
    }

コントローラーが立ち上げている各スレッドについて

各スレッドで直接アプリケーションのメソッドを直接呼び出すようにしたくなかったので以下のようにチャンネル経由でメッセージを受け取ってアプリケーションを呼び出すためのスレッドを用意しています。

    fn send_rx(
        &mut self,
        application: Arc<Mutex<Application>>,
        rx: Receiver<Message>,
    ) -> JoinHandle<()> {
        thread::spawn(move || loop {
            let message = rx.recv().unwrap();
            application.lock().unwrap().send(message);
        })
    }

それからチャンネル経由でアプリケーションにメッセージを送るためのスレッドを用意しています。メッセージの呼び出し元は呼び出し元が自由に定義できるようにクロージャで用意しています。

    fn send_tx(&mut self, tx: Sender<Message>, f: fn(Sender<Message>)) -> JoinHandle<()> {
        thread::spawn(move || f(tx))
    }

通信先から受け取ったメッセージを受け取った都度表示をしたいのでstreamを読み込み続けるスレッドを作っています。

    fn read(
        &mut self,
        tcp_stream: TcpStream,
        application: Arc<Mutex<Application>>,
    ) -> JoinHandle<()> {
        thread::spawn(move || {
            let mut tcp_reader = BufReader::new(&tcp_stream);
            loop {
                let message = message_read::res_read(BufReader::new(&tcp_stream));
                application.lock().unwrap().receive(message);
            }
        })
    }

最後にハートビートを定期的に送るためのスレッドを作っています。

    fn send_heart_beat(&mut self, heart_beat_interval: u64, tx: Sender<Message>) -> JoinHandle<()> {
        thread::spawn(move || loop {
            thread::sleep(Duration::from_secs(heart_beat_interval));
            tx.send(heart_beat_message());
        })
    }