性能提升:优化客户端访问和服务端读写性能
本课程为精品小课,不标配音频
你好,我是文强。
到了第 10 课我们其实就完成了本次课程主体部分的开发。这节课我们主要来看一下如何提升集群的性能和可用性。
集群的性能提升可以分为客户端和服务端两部分,先来看下图:
这是 Placement Center 的读写示意图。从技术上分析,重点可以关注以下三个方面来提升性能和可用性。
-
客户端基于连接池复用连接,避免连接频繁地创建、删除,从而提升性能。
-
实现自动化重试机制,以解决当出现可恢复的异常时(比如网络抖动),可以自动进行重试,从而提升请求的成功率。
-
均摊服务端多个节点之间的压力,由 Leader 节点负责写请求,所有节点负责读请求,避免 Leader 节点的单点瓶颈。
在 第 5 课 我们基于 gRPC 框架实现的网络层的基础上,我们来看一下在 Rust 中如何实现 gPRC 的连接池。
基于 mobc 库实现连接池
如上图所示,连接池的原理本质上就是通过预先创建一批连接,并将可用连接保持在一定数量范围内。当客户端发起访问时,从连接池取出可用连接,从而避免每次创建/销毁连接产生的时间和资源开销。
从代码实现角度看,连接池的实现并不复杂,就是细节比较多,比如连接被动关闭时如何自动创建连接,如何保证连接不超过最大可用连接,空闲连接回收,连接心跳保持等等。
为了节省工作量,我们直接选择Rust 的连接池库 mobc 来实现我们 gRPC 的连接池。mobc 库本身就不展开细讲了,你可以直接参考 《官方文档》。建议你先看完官方文档,再来看接下来的实现,会更容易理解。
从代码实现角度来看,基于 mobc 库来实现连接池主要包含下面两步:
-
实现 mobc 中名为 Manager 的 trait。
-
创建连接池,并将连接池变为一个全局可访问的变量。
先来看第 1 点,我们通过 KvServiceManager 来实现 Manager trait。完整代码在 《kv/mod.rs》 文件中,有兴趣可以去查看。
#![allow(unused)] fn main() { #[derive(Clone)] pub struct KvServiceManager { pub addr: String, } impl KvServiceManager { pub fn new(addr: String) -> Self { Self { addr } } } #[tonic::async_trait] impl Manager for KvServiceManager { // 使用 Rust 类型别名的语法,将类型 KvServiceClient<Channel> 重命名为Connection // 使用别名的好处是,你可以自定义connect方法的返回值,因为connect的返回值是 Self::Connection // 在这里我们将KvServiceClient<Channel>重命名为Connection,因此connect方法的返回值就是KvServiceClient<Channel> type Connection = KvServiceClient<Channel>; // 如上 type Error = CommonError; // 实现 Manager trait 中的 connect 方法 // 该方法用来返回 GRPC KvService 可用连接 async fn connect(&self) -> Result<Self::Connection, Self::Error> { // 创建一个 GRPC KvService 的连接 match KvServiceClient::connect(format!("http://{}", self.addr.clone())).await { Ok(client) => { // 返回一个可用的 GRPC KvService 连接 return Ok(client); } Err(err) => return Err(CommonError::CommmonError(format!( "{},{}", err.to_string(), self.addr.clone() ))), }; } // 实现 Manager trait 中的 check 方法 // 该方法用来检查 GRPC KvService 连接是否可用 async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> { Ok(conn) } } }
在上面的代码中,我们创建了 KvServiceManager 来实现 mobc 库中的 Manager trait,实现了 trait 中的 connect 和 check 方法,功能分别是创建一个可用的连接、检查连接是否可用。
再来看第 2 点,如何在多个线程中都可以使用这个连接池。从 Rust 代码来看,核心思路就是有一个全局的变量,并且在不同的线程中使用该变量。
因为我们肯定会有多个服务端节点和多个 gRPC Service,因此我们 需要管理多个服务端节点及其对应的gRPC Service 的连接池。这里我们是通过一个名为 ClientPool 的结构体来管理这些连接池。
来看一下 ClientPool 的代码。
#![allow(unused)] fn main() { #[derive(Clone)] pub struct ClientPool { // 定义每个连接池的最大连接数 max_open_connection: u64, // 使用 DashMap 来存储每个服务端IP 和GRPC Service对应的连接池 placement_center_kv_service_pools: DashMap<String, Pool<KvServiceManager>>, } impl ClientPool { pub fn new(max_open_connection: u64) -> Self { Self { max_open_connection, placement_center_kv_service_pools: DashMap::with_capacity(2), } } pub async fn placement_center_kv_services_client( &self, addr: String, ) -> Result<Connection<KvServiceManager>, CommonError> { let module = "KvServices".to_string(); // 根据模块和地址构建一个 key,唯一标识服务器和GRPC Service let key = format!("{}_{}_{}", "PlacementCenter", module, addr); // 判断连接池是否存在 if !self.placement_center_kv_service_pools.contains_key(&key) { // 创建一个连接池 let manager = KvServiceManager::new(addr.clone()); let pool = Pool::builder() .max_open(self.max_open_connection) .build(manager); // 将连接池存储在 map 中 self.placement_center_kv_service_pools .insert(key.clone(), pool); } // 从 map 中取出目标 IP 对应的连接池 if let Some(poll) = self.placement_center_kv_service_pools.get(&key) { // 从连接池获取可用的连接 match poll.get().await { Ok(conn) => { return Ok(conn); } Err(e) => { return Err(CommonError::NoAvailableGrpcConnection( module, e.to_string(), )); } }; } return Err(CommonError::NoAvailableGrpcConnection( module, "connection pool is not initialized".to_string(), )); } } }
这段代码并不复杂,核心逻辑是通过 DashMap 缓存每个 IP 和 gRPC Service 组成的二元组对应的连接池。当需要访问某个服务器的 Service 时,就从 DashMap 中取出对应的连接池,获取可用连接,并访问服务。
接下来看一下连接池如何使用。
#![allow(unused)] fn main() { // 初始化ClientPoll let client_poll: Arc<ClientPool> = Arc::new(ClientPool::new(5)); // 从连接池获取可用连接 match client_poll.placement_center_kv_services_client(addr).await { Ok(client) => { let key = "mq".to_string(); let value = "robustmq".to_string(); let request = tonic::Request::new(SetRequest { key: key.clone(), value: value.clone(), }); let _ = client.set(request).await.unwrap(); } Err(e) => { } } }
上面这段代码比较简单,就不展开讲了。不过这里有个细节问题,就是它执行一次就会返回。如果出现网络抖动错误,就会直接失败。从使用者的角度,我们会希望它能够自动进行重试 。
所以接下来我们来实现统一的自动重试机制。
客户端实现统一的自动重试机制
从原理上看,自动重试的思路不复杂,核心逻辑就是将上面的代码装到一个 loop { } 中,当请求成功时退出循环,如果失败则根据重试策略进行重试。伪代码如下所示:
#![allow(unused)] fn main() { let mut times = 1; loop{ // 初始化ClientPoll let client_poll: Arc<ClientPool> = Arc::new(ClientPool::new(5)); // 从连接池获取可用连接 match client_poll.placement_center_kv_services_client(addr).await { Ok(client) => { let key = "mq".to_string(); let value = "robustmq".to_string(); let request = tonic::Request::new(SetRequest { key: key.clone(), value: value.clone(), }); match client.set(request).await{ Ok(_) => break; Err(e) => { // 达到最大次数时,退出循环 if times > retry_times() { break } // 重试次数 + 1 times = times + 1; } } } Err(e) => { } } } }
这段代码也很好理解,但是有一个问题,就是因为 上面只是client.set 的调用,我们还会有client.get/delete/exists的调用,那每一个都是像上面这样加一个 loop 循环和重试次数的判断吗? 有没有更简单的方法呢?
答案肯定是不能每个方法都有loop + 重试策略,这样代码就太不优雅了,并且后续如果要改重试策略,那么就特别繁琐。
接下来我们以 Set 方法举例,来介绍一下我们的实现。
tips:我们这种写法不是唯一的写法,可能也不是最好的写法。你可以按照需求写一个自己的实现,和我们的实现对比,看思路有哪些差异。
从实现来看,会涉及到下面四个方法:
-
placement_set:封装了连接池和重试机制的 Kv Service 的 Set 方法。它接受连接池 client_poll、服务端地址列表、请求参数来完成 Set 请求的调用。
-
retry_call:重试策略的核心代码,统一封装了重试策略。
-
kv_interface_call:因为 gRPC 的特性是,每一个 Service 都有一个独立的 Client,比如 KVService 就有一个 KVClient,因此就需要对每个 Service 的调用做一个分流。
-
inner_set:封装 KVService Set 调用的统一逻辑。
下面我们来看一下这四个方法的主要逻辑,完整代码你可以看 《placement/kv》。
- placement_set
#![allow(unused)] fn main() { pub async fn placement_set( client_poll: Arc<ClientPool>, addrs: Vec<String>, request: SetRequest, ) -> Result<CommonReply, CommonError> { // 将SetRequest 转化为 vec 类型,递交给 retry_call 处理 let request_data = SetRequest::encode_to_vec(&request); match retry_call( // 定义这次调用是 KvClient PlacementCenterService::Kv, // 定义这次调用的是KvClient 的 set 方法 PlacementCenterInterface::Set, // client poll、addrs、request_data 请求方法 client_poll, addrs, request_data, ) .await { // 将返回结果 decode 为CommonReply类型,并返回 Ok(data) => match CommonReply::decode(data.as_ref()) { Ok(da) => return Ok(da), Err(e) => return Err(CommonError::CommmonError(e.to_string())), }, Err(e) => { return Err(e); } } } }
- retry_call
#![allow(unused)] fn main() { async fn retry_call( service: PlacementCenterService, interface: PlacementCenterInterface, client_poll: Arc<ClientPool>, addrs: Vec<String>, request: Vec<u8>, ) -> Result<Vec<u8>, CommonError> { let mut times = 1; loop { let index = times % addrs.len(); let addr = addrs.get(index).unwrap().clone(); let result = match service { // 执行 Kv Service 的方法 PlacementCenterService::Kv => { kv_interface_call( interface.clone(), client_poll.clone(), addr.clone(), request.clone(), ) .await } }; match result { Ok(data) => { return Ok(data); } Err(e) => { error!( "{:?}@{:?}@{},{},", service.clone(), interface.clone(), addr.clone(), e ); // 定义最大重试次数 if times > retry_times() { return Err(e); } times = times + 1; } } // 定义重试的退避策略 sleep(Duration::from_secs(retry_sleep_time(times) as u64)).await; } } }
- kv_interface_call
#![allow(unused)] fn main() { pub(crate) async fn kv_interface_call( interface: PlacementCenterInterface, client_poll: Arc<ClientPool>, addr: String, request: Vec<u8>, ) -> Result<Vec<u8>, CommonError> { // 获取 Kv Client match kv_client(client_poll.clone(), addr.clone()).await { Ok(client) => { // 执行对应的 set、delete、get、exists 方法 let result = match interface { PlacementCenterInterface::Set => inner_set(client, request.clone()).await, PlacementCenterInterface::Delete => inner_delete(client, request.clone()).await, PlacementCenterInterface::Get => inner_get(client, request.clone()).await, PlacementCenterInterface::Exists => inner_exists(client, request.clone()).await, _ => return Err(CommonError::CommmonError(format!( "kv service does not support service interfaces [{:?}]", interface ))), }; // 返回结果 match result { Ok(data) => return Ok(data), Err(e) => { return Err(e); } } } Err(e) => { return Err(e); } } } }
- inner_set
#![allow(unused)] fn main() { pub(crate) async fn inner_set( mut client: Connection<KvServiceManager>, request: Vec<u8>, ) -> Result<Vec<u8>, CommonError> { // 将请求 decode 为SetRequest match SetRequest::decode(request.as_ref()) { // 调用 kvCleint 的 set 方法 Ok(request) => match client.set(request).await { Ok(result) => { // 将返回值encode 为 vec,返回 return Ok(CommonReply::encode_to_vec(&result.into_inner())); } Err(e) => return Err(CommonError::GrpcServerStatus(e)), }, Err(e) => { return Err(CommonError::CommmonError(e.to_string())); } } } }
可以看到,上面我们 为了统一封装多个gRPC Service 的重试策略,流程分为了四步。从代码上看,核心思路是通过 match 来区分不同的 Client 和不同的方法,并进行调用。
当前这个实现的好处就是流程清晰,代码可读性比较强,不过代码看起来是比较繁琐的。所以我们在主项目 RobustMQ 中有一个更优雅的实现,但是这个优雅实现的代码可读性较差,使用了大量的泛型和 trait,有兴趣的话你也可以去参考一下。
聊完了客户端,我们来聊聊服务端的性能优化。
Leader 写和所有节点可读
从原理上看,集群中 Leader 节点同时负责写入和读取是为了解决数据一致性的问题。这种方式的好处是不管如何读写,数据都是准确的、最新的。缺点是 Leader 节点会成为集群的性能瓶颈,无法横向扩容。
在服务端性能优化中,一个核心思考点是: 服务端的性能是可以随着节点的横向扩容而增强的。为了解决横向扩容的问题,就需要把 Leader 的压力分摊到所有节点上。
从技术上看,元数据服务的业务特点是读写比例较低,也就是 写少读多。因此我们可以先把读请求的压力从 Leader 分摊到所有节点上。
那代码上看要怎么实现呢? 先来看架构图。
上面的核心思路是:
-
客户端允许配置服务端地址列表,客户端会轮询挑选一台服务器进行访问。
-
服务端判断标记每一个接口是读请求还是写请求。比如 KV 存储模型中 Set/Delete 是写请求,Get/Exists 是读请求。
-
服务器会根据请求的类型进行处理。
-
如果是写请求,会先判断自己是否是 Leader,如果是则直接处理请求,如果自己是 Follwer,则将该请求转发到 Leader 进行处理;
-
如果是读请求,则可以直接处理请求。
-
第 1 点在前面的客户端代码已经实现了。在上面举例的 Set 操作的placement_set和retry_call中。placement_set 接受的服务器地址 addrs 是一个 Vec 列表,即允许配置多个服务器地址。在retry_call中,当请求处理失败后,会更换一个新的服务器进行访问,以避免某个节点无法提供服务。主要起作用的代码是:
#![allow(unused)] fn main() { loop{ let index = times % addrs.len(); let addr = addrs.get(index).unwrap().clone(); ...... times = times + 1; } }
第 2 点标记接口是读还是写请求需要人工判断。比如 KvService 中有 Set/Get/Delete/Exists 四个请求。从接口功能来看,Set 和 Delete 会改变数据的内容,所以它属于写请求。而 Get 和 Exists 只是会读取数据,因此它属于读请求。
第 3 点的实现主要是代码逻辑的处理,完整代码在 《services_kv.rs》 中,我们以 Set 和 Get 来分别讲一下写/读请求的处理。
先来看 Set 的代码:
#![allow(unused)] fn main() { async fn set(&self, request: Request<SetRequest>) -> Result<Response<CommonReply>, Status> { let req = request.into_inner(); // if req.key.is_empty() || req.value.is_empty() { return Err(Status::cancelled( RobustMQError::ParameterCannotBeNull("key or value".to_string()).to_string(), )); } if !self.is_leader() { let leader_addr = self.leader_addr(); match placement_set(self.client_poll.clone(), vec![leader_addr], req).await { Ok(reply) => { return Ok(Response::new(reply)); } Err(e) => { return Err(Status::cancelled(e.to_string())); } } } // Raft state machine is used to store Node data let data = StorageData::new(StorageDataType::KvSet, SetRequest::encode_to_vec(&req)); match self .placement_center_storage .apply_propose_message(data, "set".to_string()) .await { Ok(_) => return Ok(Response::new(CommonReply::default())), Err(e) => { return Err(Status::cancelled(e.to_string())); } } } }
上面这段代码在上节课已经讲过,只是添加了下面这段代码:
#![allow(unused)] fn main() { if !self.is_leader() { let leader_addr = self.leader_addr(); match placement_set(self.client_poll.clone(), vec![leader_addr], req).await { Ok(reply) => { return Ok(Response::new(reply)); } Err(e) => { return Err(Status::cancelled(e.to_string())); } } } }
即判断当前节点是否是 Leader,如果不是Leader,则获取 Leader 的地址,并将请求转发到 Leader 节点进行处理。is_leader 和 leader_addr 的逻辑如下:
#![allow(unused)] fn main() { pub fn is_leader(&self) -> bool { return self.placement_cluster.read().unwrap().is_leader(); } pub fn leader_addr(&self) -> String { return self.placement_cluster.read().unwrap().leader_addr(); } }
结合 第 9 课 可以知道,Leader 信息是由 Raft 状态机来维护的。当 Leader 发生切换时,Raft 状态机就会触发 Leader 信息的变更。
也就是说如果是写请求,就通过is_leader判断本节点是否是 Leader,是的话就正常处理,否的话就通过leader_addr方法获取 Leader 地址,并通过我们在这节课前半部分开发的客户端将请求转发给 Leader 进行处理。
再来看一下 Get 的代码:
#![allow(unused)] fn main() { async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetReply>, Status> { let req = request.into_inner(); if req.key.is_empty() { return Err(Status::cancelled( RobustMQError::ParameterCannotBeNull("key".to_string()).to_string(), )); } // 实例化KvStorage let kv_storage = KvStorage::new(self.rocksdb_engine_handler.clone()); let mut reply = GetReply::default(); // 从 RocksDB 中获取对应 Key 的数据 match kv_storage.get(req.key) { Ok(Some(data)) => { reply.value = data; return Ok(Response::new(reply)); } Ok(None) => {} Err(e) => return Err(Status::cancelled(e.to_string())), } return Ok(Response::new(reply)); } }
上面的代码比较简单,就是直接通过 KvStorage 从 RocksDB 中获取对应的 Key 数据。因为 Get 是读请求,则不需要进行判断转发的逻辑,直接进行逻辑处理即可。
到了这里,如下图所示,我们就完成了 Leader 负责写入、所有节点可读的特性了。
总结
tips:每节课的代码都能在项目 https://github.com/robustmq/robustmq-geek 中找到源码,有兴趣的同学可以下载源码来看。
这节课我们讲了通过连接池来提高客户端访问服务端的性能,并通过合适的代码实现统一封装多个 RPC Service 的重试机制,从而实现所有客户端调用都有统一的重试策略。然后通过在区分读写请求,并针对写请求做判断,从而实现了 Leader 写和所有节点可读的特性。
我们在 《深入拆解消息队列47 讲》 中讲到过,集群性能的提升包括单节点性能的提升和集群能力的提升。单节点性能的提升主要是网络层、存储层、计算层的性能的提升,在元数据服务中就是我们第 5 课(网络层)和第 6 课(存储层)的部分。
而集群能力的提升核心就是允许水平扩展,Leader 写和所有节点可读只是第一步,因为集群的写入压力都集中在 Leader 上,如果写入请求太大,那么依旧会出问题。所以下一步就是实现 Raft Group 的能力,即第 9 课说的允许集群中有多个 Leader 的存在。Raft Group 的特性,课程中我们没有展开,欢迎在交流群中讨论。
思考题
这里是本节课推荐的相关 issue 的任务列表,请点击查看 《Good First Issue》,任务列表会不间断地更新。另外欢迎给我的项目 https://github.com/robustmq/robustmq 点个 Star 啊!