RustでCrossbeamのグリーンスレッドを試してみる
Rustでは所有権やライフタイムがあるおかげで安全な実装をすることが出来ますが、Rustを始めたばかりの人は同じインスタンスに対し複数スレッドで一つのスレッドで監視をして、もう一つのスレッドで操作するといったことを行いたくなった場合大分てこずると思います。自分も色々調べて大分てこずっているので、分かったことをまとめてみたいと思います。
Rustで扱えるスレッドについてはOSスレッドとグリーンスレッドがあり、RustBookの以下のドキュメントに説明があります。
https://doc.rust-jp.rs/book/second-edition/ch16-01-threads.html
OSスレッドはOSのAPI呼び出しによって得られるスレッドになっていまして、グリーンスレッドは仮想マシン上で管理されているスレッドになります。OSスレッドの方が性能が良いのですがRustの所有権のライフタイムによりてこずっていました。
例えば、定期的なランダム入力と標準入力を受けるためのスレッドを以下のように分けようとしたばあ、
#[derive(Debug)] struct Executor { count: i16 } impl Executor { fn new() -> Self { Self { count: 0 } } pub fn countup(&mut self) { self.count += 1; } pub fn start(&mut self) { let (tx, rx) = mpsc::channel(); let txc = mpsc::Sender::clone(&tx); thread::spawn(move || { self.start_random(tx); }); thread::spawn(move || { self.start_input(txc); }); for message in rx { self.receive_message(message); } } fn start_random(&mut self, tx: std::sync::mpsc::Sender<Message>) -> () { loop { ...省略 } } fn start_input(&mut self, tx: std::sync::mpsc::Sender<Message>) -> () { loop { ...省略 } } fn receive_message(&mut self, message: Message) { println!("receive: {:?}", message); if message.method.eq("up") { self.count += 1; } else if message.method.eq("down") { self.count -= 1; } println!("count: {:?}", self.count); } }
これをコンパイルすると以下のようにライフタイムのチェックでエラーが出ます。
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements --> src\main.rs:58:27 | 58 | thread::spawn(move || { | ___________________________^ 59 | | self.start_random(tx); 60 | | }); | |_____________^ |
Rust標準のスレッド処理であるthread::spawnで生成されるのはOSスレッドでありきちんと制御しないと複数スレッドで同じインスタンスにアクセスするのは難しいと思います。
それでRustでグリーンスレッドが扱えるcrossbeamを試してみます。crossbeamではスコープ付きスレッドを扱えるので、OSスレッドを扱うよりだいぶ楽になります。crossbeamを使うように修正すると以下のようになり、crossbeamのscopeに渡すクロージャのスコープを使いまわすことで複数スレッドを束ねています。
pub fn start(&mut self) { crossbeam::scope(|scope| { let (tx, rx) = mpsc::channel(); let txc = mpsc::Sender::clone(&tx); self.start_random(scope, tx); self.start_input(scope, txc); for message in rx { self.receive_message(message); } }); } fn start_random(&mut self, scope: &crossbeam::thread::Scope<'_>, tx: std::sync::mpsc::Sender<Message>) -> () { scope.spawn(move |s| { loop { let mut rng = rand::thread_rng(); let random_val: f64 = rng.gen(); let message = if random_val > 0.5 { Message::new(String::from("up"), String::from("random")) } else { Message::new(String::from("down"), String::from("random")) }; send!(tx, message); thread::sleep(Duration::from_secs(1)); } }); } fn start_input(&mut self, scope: &crossbeam::thread::Scope<'_>, tx: std::sync::mpsc::Sender<Message>) -> () { scope.spawn(move |s| { loop { let tx1 = tx.clone(); println!("input: U(Up) or D(Down)"); let mut s = String::new(); std::io::stdin().read_line(&mut s).ok(); let input = s.trim(); let messageOp = match input { "U" => Some(Message::new(String::from("up"), String::from("input"))), "D" => Some(Message::new(String::from("dodn"), String::from("input"))), _ => { println!("invalid input: {:?}", input); None } }; match messageOp { Some(message) => {send!(tx, message);} _ => {} } } }); }
プログラムの全体像は以下のようになりました。
extern crate crossbeam; use rand::prelude::*; use std::thread; use std::time::Duration; use std::sync::mpsc; use actix_rt::System; use std::sync::mpsc::SendError; #[derive(Debug, Clone)] struct Message { method: String, from: String } impl Message { pub fn new(method: String, from: String) -> Self { Message { method, from } } } #[derive(Debug)] struct Executor { count: i16 } macro_rules! send { ($tx:tt, $message:ident) => { let mut count = 0; loop { let result = $tx.send($message.clone()); match result { Err (_) => { println!("send error:"); count += 1; if count > 10 { println!("send error over 10 times"); break; } thread::sleep(Duration::from_millis(50)); } Ok(_) => { break; } } } } } impl Executor { fn new() -> Self { Self { count: 0 } } pub fn start(&mut self) { crossbeam::scope(|scope| { let (tx, rx) = mpsc::channel(); let txc = mpsc::Sender::clone(&tx); self.start_random(scope, tx); self.start_input(scope, txc); for message in rx { self.receive_message(message); } }); } fn start_random(&mut self, scope: &crossbeam::thread::Scope<'_>, tx: std::sync::mpsc::Sender<Message>) -> () { scope.spawn(move |s| { loop { let mut rng = rand::thread_rng(); let random_val: f64 = rng.gen(); let message = if random_val > 0.5 { Message::new(String::from("up"), String::from("random")) } else { Message::new(String::from("down"), String::from("random")) }; send!(tx, message); thread::sleep(Duration::from_secs(1)); } }); } fn start_input(&mut self, scope: &crossbeam::thread::Scope<'_>, tx: std::sync::mpsc::Sender<Message>) -> () { scope.spawn(move |s| { loop { let tx1 = tx.clone(); println!("input: U(Up) or D(Down)"); let mut s = String::new(); std::io::stdin().read_line(&mut s).ok(); let input = s.trim(); let messageOp = match input { "U" => Some(Message::new(String::from("up"), String::from("input"))), "D" => Some(Message::new(String::from("dodn"), String::from("input"))), _ => { println!("invalid input: {:?}", input); None } }; match messageOp { Some(message) => {send!(tx, message);} _ => {} } } }); } fn send_message(self, tx: std::sync::mpsc::Sender<Message>, message: Message) { tx.send(message); } fn receive_message(&mut self, message: Message) { println!("receive: {:?}", message); if message.method.eq("up") { self.count += 1; } else if message.method.eq("down") { self.count -= 1; } println!("count: {:?}", self.count); } } pub fn main() { let mut executor = Executor::new(); executor.start(); }
実行結果は以下のようになり、複数スレッドで同一のインスタンスでアクセスできていることが確認できます。
input: U(Up) or D(Down) receive: Message { method: "up", from: "random" } count: 1 receive: Message { method: "up", from: "random" } count: 2 receive: Message { method: "down", from: "random" } count: 1 U input: U(Up) or D(Down) receive: Message { method: "up", from: "input" } count: 2