Rust – Trao đổi dữ liệu luồng qua Channel


Được đăng vào ngày 30/01/2018 | 0 bình luận

Chúng ta có thể trao đổi dữ liệu giữa các luồng thông qua cơ chế channel của Rust.

Channel có thể hiểu như các ống truyền dữ liệu giữa 2 luồng bất kì, dữ liệu nào được gửi trước sẽ được xử lý trước, 2 luồng giữa 2 đầu channel sẽ đóng vai trò một bên gửi và một bên nhận, bên gửi sẽ mang kiểu Sender<T>, còn bên nhận sẽ là kiểu Receiver<T>, đây là các kiểu generic, do đó kiểu dữ liệu T của bên gửi và nhận phải giống nhau.

Khi truyền dữ liệu giữa các luồng thì dữ liệu sẽ được tạo thành các bản copy để luồng Receiver làm việc, do đó chúng ta chỉ nên truyền các dữ liệu nhỏ, nếu dữ liệu lớn sẽ ngốn rất nhiều bộ nhớ.

Tạo channel

Để tạo một channel thì chúng ta gọi hàm channel() trong module std::sync::mpsc, ví dụ:

use std::thread;
use std::sync::mpsc::channel;
use std::sync::mpsc::{ Sender, Receiver };
fn main() {
let (mySender, myReceiver): (Sender<&’static str>, Receiver<&’static str>) = channel();
}

Ngoài ra chúng ta import thêm lớp SenderReceiver từ module mpsc. Hàm channel() sẽ trả về một tuple gồm một đối tượng Sender và một đối tượng Receiver.

Gửi và nhận dữ liệu

Để gửi dữ liệu thì chúng ta gọi hàm send() từ lớp Sender, và nhận dữ liệu thông qua hàm recv(). Ví dụ:

use std::thread;
use std::sync::mpsc::channel;
use std::sync::mpsc::{ Sender, Receiver };
fn main() {
let (mySender, myReceiver): (Sender<&’static str>, Receiver<&’static str>) = channel();
thread::spawn(move || {
mySender.send("A message from phocode");
});
let res = myReceiver.recv().unwrap();
println!("{}", res);
}

Trong đoạn code trên, chúng ta gửi một chuỗi kí tự trong luồng con thông qua hàm send(), và nhận lại chuỗi kí tự đó trong luồng cha (là luồng main()). Hàm unwrap() sẽ chuyển dữ liệu lại thành đối tượng Receiver.

A message from phocode

Nếu chúng ta không gọi hàm send() mà chỉ gọi hàm recv(), tức là chỉ nhận dữ liệu từ channel mà không gửi bất kì thứ gì vào channel thì luồng luồng đó sẽ bị block, tức là bị tạm dừng, và luồng sẽ đợi cho đến khi có dữ liệu trong channel để đọc, nếu không muốn bị block thì chúng ta dùng hàm try_recv().
Cả 2 hàm send()recv() đều trả về kiểu Result, tức là một trong 2 kiểu Ok hoặc Err. Tuy nhiên khi trả về Err thì channel sẽ không làm việc nữa, nên cho dù chúng ta có bắt lỗi Err thì ứng dụng cũng ngừng hoạt động.

Trên thực tế thì chúng ta sẽ cho các luồng con làm các công việc mang nặng tính toán, rồi nhận kết quả trong luồng cha.

Trao đổi dữ liệu đồng bộ và bất đồng bộ

Cách thức gửi và nhận dữ liệu ở đoạn code trên là bất đồng bộ, có nghĩa là các luồng không bị tạm dừng khi chạy. Tuy nhiên Rust cũng có cơ chế đồng bộ các luồng với nhau. Cơ chế này hoạt động bằng cách yêu cầu luồng con gửi dữ liệu bị tạm dừng khi dữ liệu được gửi vô channel đã đầy, và luồng này sẽ đợi cho đến khi luồng cha lấy dữ liệu ra khỏi channel.

Để làm điều này thì thay vì dùng hàm channel(), chúng ta sẽ dùng hàm sync_channel(), chúng ta cũng sẽ đùng lớp SyncSender thay cho lớp Sender. Ví dụ:

use std::thread;
use std::sync::mpsc::{SyncSender, Receiver};
use std::sync::mpsc::sync_channel;
fn main() {
let (mySender, myReceiver): (SyncSender<&’static str>, Receiver<&’static str>) = sync_channel(1);
thread::spawn(move || {
mySender.send("Message 1").unwrap();
println!("Message 1 sent into the buffer");
mySender.send("Message 2").unwrap();
println!("Message 2 sent into the buffer");
});
thread::sleep_ms(3000);
let data = myReceiver.recv().unwrap();
println!("The data: {}", data);
}

Trong đoạn code trên, chúng ta tạo ra môt tuple (SyncSender, Receiver) bằng hàm sync_channel(), hàm này nhận vào tham số là 1, tức là chỉ nhận send() 1 lần là đầy bộ nhớ channel, sau đó chúng ta tạo một luồng và gọi hàm send() 2 lần, rồi ở bên ngoài luồng cha chúng ta cho đợi 3 giây, sau đó lấy dữ liệu từ ra. Chúng ta sẽ được output như sau:

Message 1 sent into the buffer
The data: Message 1
Message 2 sent into the buffer

Lý do chuỗi “Message 2 sent into the buffer” được hiện ra sau cùng là vì khi chúng ta gọi send() lần 1, thì channel đã đầy, nên luồng này bị đóng băng lại, đợi cho luồng cha lấy dữ liệu đó ra rồi lần send() thứ 2 mới có thể gọi được.

Được đăng vào ngày 30/01/2018