RustでTcpStreamを書き込みと読み込みに利用する

以前RustでTcpStreamを複数スレッドで共有し、それぞれ読み込み、書き込み用に利用するためにArc<Mutex>を使っていたのですが、改めて調べてみますとTcpStreamにはtry_clone()というメソッドがもともと用意されているようでした。 steavevaivai.hatenablog.com www.reddit.com

自前でArc<Mutex>でストリームをスレッド間で共有する場合ロックの制御が難しいと思われるのでtry_clone()を使ったほうが良さそうです。

試しにtry_cloneでストリームのクローンを生成し2つのスレッドでそれぞれ読み込み、書き込みを行ってみました。try_clone関数のコメントを見てみますと"Creates a new independently owned handle to the underlying socket"とあり、丁度やりたかったことのようです。

use std::str;
use std::thread;

use crate::quickfix::message::message_fix44::logon_message;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::thread::JoinHandle;
use std::time::Duration;

pub fn main() {
    let tcp_stream_write = TcpStream::connect_timeout(
        &format!("{}:{}", "127.0.0.1", "9880").parse().unwrap(),
        Duration::from_secs(1),
    )
    .expect("Could not connect.");

    let tcp_stream_read = tcp_stream_write.try_clone().unwrap();
    for handle in vec![send(tcp_stream_write), read(tcp_stream_read)] {
        handle.join().unwrap();
    }
}

fn read(tcp_stream: TcpStream) -> JoinHandle<()> {
    thread::spawn(move || {
        let mut tcp_reader = BufReader::new(tcp_stream);
        loop {
            let mut buffer = Vec::new();
            loop {
                tcp_reader
                    .read_until(b'', &mut buffer)
                    .expect("failed to read from the socket");

                let buf_read = str::from_utf8(&buffer).unwrap();
                println!("read: {}", buf_read);
                buffer.clear();
            }
        }
    })
}

fn send(mut tcp_stream: TcpStream) -> JoinHandle<()> {
    thread::spawn(move || loop {
        println!("input method: L(Logon)");
        let mut s = String::new();
        std::io::stdin().read_line(&mut s).ok();
        let input = s.trim();
        if input.eq("L") {
            let message = logon_message(5, 10);
            println!("send: {}", message.to_string());
            tcp_stream.write(message.to_request_string().as_bytes());
        }
    })
}

実際に動かしてみると、以下のように複数スレッドでストリームにアクセスできていることが確認できます。

input method: L(Logon)
L
send: 8=FIX.4.434=535=A49=BANZAI52=20200913-08:10:5056=EXEC98=0108=10
input method: L(Logon)
read: 8=FIX.4.4
read: 9=66
read: 35=A
read: 34=17
read: 49=EXEC
read: 52=20200913-08:10:50.848
read: 56=BANZAI
read: 98=0
read: 108=10
read: 10=020