Arc<Mutex<T>>でスレッド間でデータを共有する場合のDropの必要性について

前回はArc<Mutex>でスレッド間でデータを共有する動作を確認しました。
steavevaivai.hatenablog.com

その延長で今回はTcpStreamを複数スレッドで共有し片方のスレッドは読み込み続けて、もう片方のスレッドで書き込み続ける場合の動作を確認したのですが、際に自前でMutexGuardをDropしたほうが良さそうでした。

最初に確認した実装は以下のようになります。

pub fn main() {

    let mut tcp_stream = TcpStream::connect_timeout(
        &format!("{}:{}", "127.0.0.1", "9880").parse().unwrap(),
        Duration::from_secs(1),
    )
        .expect("Could not connect.");
    tcp_stream
        .set_read_timeout(Some(Duration::from_secs(10)))
        .unwrap();
    tcp_stream
        .set_write_timeout(Some(Duration::from_secs(20)))
        .unwrap();
    let tcp_stream_arc = Arc::new(Mutex::new(tcp_stream));

    for handle in vec![
        send(Arc::clone(&tcp_stream_arc)),
        read(Arc::clone(&tcp_stream_arc)),
    ] {
        handle.join().unwrap();
    }
}

fn read(tcp_stream_arc: Arc<Mutex<TcpStream>>) -> JoinHandle<()> {
    thread::spawn( move || {

        loop {
            let mut tcp_stream = tcp_stream_arc.lock().unwrap();
            let mut tcp_reader = BufReader::new(&*tcp_stream);

            println!("read start");
            let mut buffer = [0; 100];

            let result = tcp_reader.read(&mut buffer);
            println!("read result: {:?}", result);
            if result.is_ok() {
                let buf_read = std::str::from_utf8(&buffer).unwrap();
                println!("receved: {:?}", buf_read);
            }
            println!("read end");
            thread::sleep(Duration::from_secs(1));
        }
    })
}

fn send(tcp_stream_arc: Arc<Mutex<TcpStream>>) -> JoinHandle<()> {
    thread::spawn( move || {
        loop {
            println!("send start");
            let mut tcp_stream = tcp_stream_arc.lock().unwrap();
            let message = create_factory44().logon(1, 30).to_request_string();
            println!("message: {:?}", message);
            tcp_stream.write(message.as_bytes());
            println!("send end");
            thread::sleep(Duration::from_secs(1));
        }
    })
}

適度にロックを解除してreadとsendが呼び出されたら良かったのですが、実際の実行結果は以下のようにreadの方がロックをし続ける形になっていました。

send start
read start
read result: Err(Os { code: 10060, kind: TimedOut, message: "接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。" })
read end
read start
read result: Err(Os { code: 10060, kind: TimedOut, message: "接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。" })
read end
read start
read result: Err(Os { code: 10060, kind: TimedOut, message: "接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。" })
read end

この原因についてMutexのロックの解除については以下で説明がありまして、Mutexに対してlockした結果で得られるMutexGuardは自動的にロックを解除するDropが内部スコープの終わりで発生するようです。
状態共有 - The Rust Programming Language

今回の場合はloop がスコープになりますが、スコープから外れたあとすぐに次のループで再度ロックしていて片方のスレッドでロックを占有しているようでした。なので以下のように明示的にstd::mem::dropを呼び出すようにしてみました。

fn read(tcp_stream_arc: Arc<Mutex<TcpStream>>) -> JoinHandle<()> {
    thread::spawn( move || {

        loop {
            let mut tcp_stream = tcp_stream_arc.lock().unwrap();
            let mut tcp_reader = BufReader::new(&*tcp_stream);

            println!("read start");
            let mut buffer = [0; 100];

            let result = tcp_reader.read(&mut buffer);
            println!("read result: {:?}", result);
            if result.is_ok() {
                let buf_read = std::str::from_utf8(&buffer).unwrap();
                println!("receved: {:?}", buf_read);
            }
            println!("read end");

            std::mem::drop(tcp_reader);
            std::mem::drop(tcp_stream);
            thread::sleep(Duration::from_secs(1));
        }
    })
}

fn send(tcp_stream_arc: Arc<Mutex<TcpStream>>) -> JoinHandle<()> {
    thread::spawn( move || {
        loop {
            println!("send start");
            let mut tcp_stream = tcp_stream_arc.lock().unwrap();
            let message = create_factory44().logon(1, 30).to_request_string();
            println!("message: {:?}", message);
            tcp_stream.write(message.as_bytes());
            println!("send end");
            std::mem::drop(tcp_stream);
            thread::sleep(Duration::from_secs(1));
        }
    })
}

このようにしたところ、片方のスレッドでロックが占有されるということは避けられているようです。

send start
read start
read result: Err(Os { code: 10060, kind: TimedOut, message: "接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。" })
read end
message: "8=FIX.4.4\u{1}9=61\u{1}35=A\u{1}34=1\u{1}49=BANZAI\u{1}52=20200816-13:05:42\u{1}56=EXEC\u{1}98=0\u{1}108=30\u{1}10=011\u{1}"
send end
send start
read start
read result: Ok(100)
receved: "8=FIX.4.4\u{1}9=103\u{1}35=5\u{1}34=11\u{1}49=EXEC\u{1}52=20200816-13:05:42.506\u{1}56=BANZAI\u{1}58=MsgSeqNum too low, expectin"
read end
message: "8=FIX.4.4\u{1}9=61\u{1}35=A\u{1}34=1\u{1}49=BANZAI\u{1}52=20200816-13:05:43\u{1}56=EXEC\u{1}98=0\u{1}108=30\u{1}10=012\u{1}"
send end
read start
send start
read result: Err(Os { code: 10053, kind: ConnectionAborted, message: "確立された接続がホスト コンピューターのソウトウェアによって中止されました。" })
read end
message: "8=FIX.4.4\u{1}9=61\u{1}35=A\u{1}34=1\u{1}49=BANZAI\u{1}52=20200816-13:05:44\u{1}56=EXEC\u{1}98=0\u{1}108=30\u{1}10=013\u{1}"
send end

loop内でロックをかける場合とかは明示的にDropを呼び出すことを意識したほうが良いのかもしれないです。