Rust聊天室
这篇文章,我将带大家使用Rust来搭建一个简单的聊天室。这个聊天室我们分两部分来实现:服务端和客户端;
一、服务端
在服务端,我们要实现监听端口、接收消息和转发消息的功能。
1.监听端口
在该聊天室项目中,我们采用TCP来完成通信功能。在Rust中我们可以使用标准库std下面的net模块中的TcpListener结构来监听指定端口。
TcpListener的解释如下:
一个Tcp Socket服务器,监听到该端口的任何连接。
TcpListener的使用示例如下:
use std::net::TcpListener;//导入TcpListener结构
const LOCAL_HOST : &str = "127.0.0.1:8080";//设置监听的地址和端口
//创建TcpListener监听器
let listener = TcpListener.bind(LOCAL_HOST).expect("Failed to create TcpListener");
//将监听器设置为非阻塞模式
listener.set_nonblocking(true).expect("Cannot set non-blocking")
在上面代码中,我们使用到了TcpListener的bind()方法和set_nonblocking方法,这两个方法的解释如下:
pub fn bind<A:ToSocketAddrs>(addr:A) -> Result<TcpListener>: 创建一个连接到传入的具体地址的TCP监听器,该方法将返回一个可以接收连接的TCP监听器。
pub fn set_nonblocking(&self,nonblocking:bool) -> Result<()>: 将TCP流设置为阻塞或者非阻塞模式。
关于阻塞和非阻塞的概念可以参考这篇博客
在实现端口监听对象的创建后,我们需要在服务端使用一个数组来保存所有连接到服务端的客户端,方便后续的信息转发。并且在listener监听到连接后,将客户端的socket保存到该数组。
let mut clients = vec![];//创建数组
//接受连接
if let Ok((mut socket,address)) = listener.accept(){
clients.push(socket.try_clone().expect("Failed to clone client"));//向数组插入客户端对象
}
在这里我们用到了TcpListener的accept方法,这个方法的解释如下:
pub fn accept(&self) -> Result<(TcpStream,SocketAddr)> : 该方法会接收一个新的到服务端TCP监听器的连接,并返回一个TCP流对象和对应的Socket地址。
因此,在上述的接收到的socket,即客户端到服务端的TCP流,插入到数组中,方便后续向多个客户端转发消息。
在完成端口监听和连接接收后,接下来我们要完成的是服务端的消息接收和消息转发功能。由于我们的服务端一般都是需要实现同时收发消息的,所以消息的接收和消息的转发是需要分别在不同的线程内去完成的。例如,我们可以在线程A监听客户端发送的消息C,在线程B向其他客户端转发这条消息C,那我们在Rust中如何保证消息C能在线程A和B之间传递呢?
Rust的标准库std中,在同步sync模块中的高级对象mpsc中一个叫做channel的mpsc队列。所谓的mpsc就是指multi-producer&single consumer。所以,我们可以在服务端使用channel来作为消息队列,对应的负责接收客户端消息的多个子线程就相当于消息的producer,而负责转发消息的子线程就相当于consumer。
use std::sync::mpsc;
//创建消息队列
let (sender,receiver) = mpsc::<String>();
2.接收消息
在接收到客户端的连接后,我们除了要将客户端的流插入到数组外,还需要单独创建一个子线程,用来监听这个客户端发送的消息。
use std::thread;
use std::io::{ErrorKind,Read,Write};
let sd = sender.clone();//复制一个消息队列的生产者
//创建子线程
thread::spawn(move || loop{
//创建一个指定大小的信息缓存区
let mut buffer = vec![0;32];
//socket是指TCPListener的accept获取到的连接的客户端TCP流
match socket.read_exact(&mut buffer){//读取TCP流中的消息
Ok(_) =>{//获取成功
let message = buffer.into_iter().take_while(|&x| x!=0).collect::<Vec<_>>();//从缓冲区中读取信息
let message = String::from_utf8(message).expect("Invalid utf8 message");//将信息转换为utf8格式
sd.send(message).expect("Failed to send message to receiver");;//将消息发送到消息队列
},
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),//阻塞错误
Err(_) => {//发生错误
//处理错误
break;//结束线程
}
}
//线程休眠
thread::sleep(::std::time::Duration::from_millis(100));
});
首先,我们创建了一个大小为32的数组来作为信息的缓冲区,然后使用accept得到的socket来读取流中的信息。这个socket的类型是TcpStream。
TcpStream : 标准库std中的net模块的一种结构;
通过查看标准库文档,可以看到TcpStream这个结构实现了Read这个Trait。在这里,我们定义了缓冲区的大小为32,所以我们希望每次都读满整个缓冲区,因为使用了Read这个Trait中的read_exact方法。
fn read_exact(&mut self,buf : &mut [u8]) -> Result<()>:
该方法从流中读取了特定的字节数的数据来填满传入的参数buf
在这里创建了一个大小为32的缓冲区,且由于read_exact这个方法的特性,造成本文实现的程序有一个缺陷:不能传输超过32字节数的信息(这将在后续的文章进行改进)
从上面接收信息的具体程序可以看出,当获取信息成功时,先对信息进行转码,然后使用信道生产者sender将该信息传送到消息队列中。
3.转发消息
转发消息功能主要做的工作就是:从消息队列获取消息,然后转发给每一个客户端。
if let Ok(message) = receiver.try_recv(){//从队列获取信息
let msg = message.clone();
println!("Message [{}] is received.",msg);
//转发给每一个客户端
clients = clients.into_iter().filter_map(|mut client|{
let mut buffer = message.clone().into_bytes();//将消息放入缓冲区
buffer.resize(MESSAGE_SIZE,0);
client.write_all(&buffer).map(|_| client).ok()
}).collect::<Vec<_>>();
}
在这里使用filter_map和collect这两个方法,来辅助消息的转发过程,在这个过程中,将转发失败(即下线的客户端)删除,避免下次继续转发这个客户端。
同样地,在这里还有个小Bug,就是在转发的过程中,没有仔细地去判断信息的来源,从而导致发送该信息的客户端,同样地也会接收到这条信息,这将会在后续的文章进行改进
二、客户端
在该聊天室的客户端实现中,客户端需要完成以下三个功能:连接服务端、发送消息和接收消息。
1.连接服务端
在服务端建立监听指定地址的对象时,使用的结构TcpListener,而我们在客户端中去连接该客户端时,要使用TcpStream结构来帮助实现。
TcpStream : 本地与远程服务端之间的socket对象,可通过connect连接方法或者accept方法生成。
连接的代码如下所示:
use std::net::TcpStream;
use std::sync::mpsc::{self};
const LOCAL_HOST : &str = "127.0.0.1:8080";//服务端地址
const MESSAGE_SIZE : usize = 32;//缓冲区大小
let mut client = TcpStream::connect(LOCAL_HOST).expect("Failed to connect");//连接服务端
client.set_nonblocking(true).expect("Failed to intiate non-blocking");//设置为非阻塞模式
let (sender,receiver) = mpsc::channel::<String>();
这里创建了一个消息队列,该消息队列主要用于发送信息的功能。
2.发送消息
发送消息的完成步骤可以分为两步:(1)读取用户在命令行发送的消息;(2)将该消息发送到服务端
为了不影响主线程从命令终端读取用户的输入,消息的发送时放在子线程中,因此前面创建的消息队列就起到了连接消息发送功能的步骤(1)和步骤(2)的作用。
读取用户输入的代码如下:
use std::io::{self};
loop{//不断等待从终端读取信息
let mut buffer = String::new();
io::stdin().read_line(&mut buf).expect("Failed to read from stdin");
let message = buffer.trim().to_string();
if message == "exit" || sender.send(message).is_err{
break;
}
}
该代码还添加了判断,如果用户输入的是“exit”,则表示退出客户端,或者信息往消息队列发送消息失败时退出程序。
在读取到了用户输入的内容后,并将其输入到消息队列,我们就将在子线程中向服务端发送用户输入的信息:
use std::sync::mpsc::{self,TryRecvError};
//从消息队列接收消息
match receiver.try_recv(){
Ok(message) => {//接收成功
let mut buffer = message.clone().into_bytes();
buffer.resize(MESSAGE,0);
client.write_all(&buffer).expect("Failed to write to socket");
},
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => {
break;
}
}
3.接收消息
客户端接收服务端转发的消息的代码也是运行在子线程,该代码比较简单。
match client.read_exact(&mut buffer){
Ok(_) =>{
let message = buffer.into_iter().take_while(|&x| x!=0).collect::<Vec<_>>();
let message = str::from_utf8(&message).unwrap();
println!("Message: {:?}",message);
},
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
Err(_) =>{
println!("Connection with server was served");
break;
}
}
三、总结
本文的实现工程可以访问该仓库。接下来,针对上述提出的bug,我将通过一系列文章来解决并完整这个Rust聊天室项目。