性能提升:优化客户端访问和服务端读写性能

本课程为精品小课,不标配音频

你好,我是文强。

到了第 10 课我们其实就完成了本次课程主体部分的开发。这节课我们主要来看一下如何提升集群的性能和可用性。

集群的性能提升可以分为客户端和服务端两部分,先来看下图:

图片

这是 Placement Center 的读写示意图。从技术上分析,重点可以关注以下三个方面来提升性能和可用性。

  1. 客户端基于连接池复用连接,避免连接频繁地创建、删除,从而提升性能。

  2. 实现自动化重试机制,以解决当出现可恢复的异常时(比如网络抖动),可以自动进行重试,从而提升请求的成功率。

  3. 均摊服务端多个节点之间的压力,由 Leader 节点负责写请求,所有节点负责读请求,避免 Leader 节点的单点瓶颈。

第 5 课 我们基于 gRPC 框架实现的网络层的基础上,我们来看一下在 Rust 中如何实现 gPRC 的连接池。

基于 mobc 库实现连接池

图片

如上图所示,连接池的原理本质上就是通过预先创建一批连接,并将可用连接保持在一定数量范围内。当客户端发起访问时,从连接池取出可用连接,从而避免每次创建/销毁连接产生的时间和资源开销。

从代码实现角度看,连接池的实现并不复杂,就是细节比较多,比如连接被动关闭时如何自动创建连接,如何保证连接不超过最大可用连接,空闲连接回收,连接心跳保持等等。

为了节省工作量,我们直接选择Rust 的连接池库 mobc 来实现我们 gRPC 的连接池。mobc 库本身就不展开细讲了,你可以直接参考 《官方文档》。建议你先看完官方文档,再来看接下来的实现,会更容易理解。

从代码实现角度来看,基于 mobc 库来实现连接池主要包含下面两步:

  1. 实现 mobc 中名为 Manager 的 trait。

  2. 创建连接池,并将连接池变为一个全局可访问的变量。

先来看第 1 点,我们通过 KvServiceManager 来实现 Manager trait。完整代码在 《kv/mod.rs》 文件中,有兴趣可以去查看。

#![allow(unused)]
fn main() {
#[derive(Clone)]
pub struct KvServiceManager {
    pub addr: String,
}

impl KvServiceManager {
    pub fn new(addr: String) -> Self {
        Self { addr }
    }
}

#[tonic::async_trait]
impl Manager for KvServiceManager {
    // 使用 Rust 类型别名的语法,将类型 KvServiceClient<Channel> 重命名为Connection
    // 使用别名的好处是,你可以自定义connect方法的返回值,因为connect的返回值是 Self::Connection
    // 在这里我们将KvServiceClient<Channel>重命名为Connection,因此connect方法的返回值就是KvServiceClient<Channel>

    type Connection = KvServiceClient<Channel>;

    // 如上
    type Error = CommonError;

    // 实现 Manager trait 中的 connect 方法
    // 该方法用来返回 GRPC KvService 可用连接
    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
         // 创建一个 GRPC KvService 的连接
         match KvServiceClient::connect(format!("http://{}", self.addr.clone())).await {
            Ok(client) => {
                // 返回一个可用的 GRPC KvService 连接
                return Ok(client);
            }
            Err(err) => return Err(CommonError::CommmonError(format!(
                "{},{}",
                err.to_string(),
                self.addr.clone()
            ))),
        };
    }

    // 实现 Manager trait 中的 check 方法
    // 该方法用来检查 GRPC KvService 连接是否可用
    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        Ok(conn)
    }
}

}

在上面的代码中,我们创建了 KvServiceManager 来实现 mobc 库中的 Manager trait,实现了 trait 中的 connect 和 check 方法,功能分别是创建一个可用的连接、检查连接是否可用。

再来看第 2 点,如何在多个线程中都可以使用这个连接池。从 Rust 代码来看,核心思路就是有一个全局的变量,并且在不同的线程中使用该变量。

因为我们肯定会有多个服务端节点和多个 gRPC Service,因此我们 需要管理多个服务端节点及其对应的gRPC Service 的连接池。这里我们是通过一个名为 ClientPool 的结构体来管理这些连接池。

来看一下 ClientPool 的代码。

#![allow(unused)]
fn main() {
#[derive(Clone)]
pub struct ClientPool {

    // 定义每个连接池的最大连接数
    max_open_connection: u64,

    // 使用 DashMap 来存储每个服务端IP 和GRPC Service对应的连接池
    placement_center_kv_service_pools: DashMap<String, Pool<KvServiceManager>>,
}

impl ClientPool {

    pub fn new(max_open_connection: u64) -> Self {
        Self {
            max_open_connection,
            placement_center_kv_service_pools: DashMap::with_capacity(2),
        }
    }

    pub async fn placement_center_kv_services_client(
        &self,
        addr: String,
    ) -> Result<Connection<KvServiceManager>, CommonError> {
        let module = "KvServices".to_string();

        // 根据模块和地址构建一个 key,唯一标识服务器和GRPC Service
        let key = format!("{}_{}_{}", "PlacementCenter", module, addr);
        // 判断连接池是否存在
        if !self.placement_center_kv_service_pools.contains_key(&key) {
            // 创建一个连接池
            let manager = KvServiceManager::new(addr.clone());
            let pool = Pool::builder()
                .max_open(self.max_open_connection)
                .build(manager);

            // 将连接池存储在 map 中
            self.placement_center_kv_service_pools
                .insert(key.clone(), pool);
        }

        // 从 map 中取出目标 IP 对应的连接池
        if let Some(poll) = self.placement_center_kv_service_pools.get(&key) {
            // 从连接池获取可用的连接
            match poll.get().await {
                Ok(conn) => {
                    return Ok(conn);
                }
                Err(e) => {
                    return Err(CommonError::NoAvailableGrpcConnection(
                        module,
                        e.to_string(),
                    ));
                }
            };
        }

        return Err(CommonError::NoAvailableGrpcConnection(
            module,
            "connection pool is not initialized".to_string(),
        ));
    }
}

}

这段代码并不复杂,核心逻辑是通过 DashMap 缓存每个 IP 和 gRPC Service 组成的二元组对应的连接池。当需要访问某个服务器的 Service 时,就从 DashMap 中取出对应的连接池,获取可用连接,并访问服务。

接下来看一下连接池如何使用。

#![allow(unused)]
fn main() {
 // 初始化ClientPoll
 let client_poll: Arc<ClientPool> = Arc::new(ClientPool::new(5));

 // 从连接池获取可用连接
 match client_poll.placement_center_kv_services_client(addr).await {
     Ok(client) => {
        let key = "mq".to_string();
        let value = "robustmq".to_string();
        let request = tonic::Request::new(SetRequest {
            key: key.clone(),
            value: value.clone(),
        });

        let _ = client.set(request).await.unwrap();
     }
     Err(e) => {

     }
}

}

上面这段代码比较简单,就不展开讲了。不过这里有个细节问题,就是它执行一次就会返回。如果出现网络抖动错误,就会直接失败。从使用者的角度,我们会希望它能够自动进行重试

所以接下来我们来实现统一的自动重试机制。

客户端实现统一的自动重试机制

从原理上看,自动重试的思路不复杂,核心逻辑就是将上面的代码装到一个 loop { } 中,当请求成功时退出循环,如果失败则根据重试策略进行重试。伪代码如下所示:

#![allow(unused)]
fn main() {
  let mut times = 1;
  loop{
   // 初始化ClientPoll
   let client_poll: Arc<ClientPool> = Arc::new(ClientPool::new(5));

   // 从连接池获取可用连接
   match client_poll.placement_center_kv_services_client(addr).await {
       Ok(client) => {
          let key = "mq".to_string();
          let value = "robustmq".to_string();
          let request = tonic::Request::new(SetRequest {
              key: key.clone(),
              value: value.clone(),
          });

          match client.set(request).await{
            Ok(_) => break;
            Err(e) => {
                // 达到最大次数时,退出循环
               if times > retry_times() {
                    break
               }
               // 重试次数 + 1
               times = times + 1;
            }
          }
       }
       Err(e) => {
       }
  }
 }

}

这段代码也很好理解,但是有一个问题,就是因为 上面只是client.set 的调用,我们还会有client.get/delete/exists的调用,那每一个都是像上面这样加一个 loop 循环和重试次数的判断吗? 有没有更简单的方法呢?

答案肯定是不能每个方法都有loop + 重试策略,这样代码就太不优雅了,并且后续如果要改重试策略,那么就特别繁琐。

接下来我们以 Set 方法举例,来介绍一下我们的实现。

tips:我们这种写法不是唯一的写法,可能也不是最好的写法。你可以按照需求写一个自己的实现,和我们的实现对比,看思路有哪些差异。

从实现来看,会涉及到下面四个方法:

  1. placement_set:封装了连接池和重试机制的 Kv Service 的 Set 方法。它接受连接池 client_poll、服务端地址列表、请求参数来完成 Set 请求的调用。

  2. retry_call:重试策略的核心代码,统一封装了重试策略。

  3. kv_interface_call:因为 gRPC 的特性是,每一个 Service 都有一个独立的 Client,比如 KVService 就有一个 KVClient,因此就需要对每个 Service 的调用做一个分流。

  4. inner_set:封装 KVService Set 调用的统一逻辑。

下面我们来看一下这四个方法的主要逻辑,完整代码你可以看 《placement/kv》

  • placement_set
#![allow(unused)]
fn main() {
pub async fn placement_set(
    client_poll: Arc<ClientPool>,
    addrs: Vec<String>,
    request: SetRequest,
) -> Result<CommonReply, CommonError> {
    // 将SetRequest 转化为 vec 类型,递交给 retry_call 处理
    let request_data = SetRequest::encode_to_vec(&request);
    match retry_call(
        // 定义这次调用是 KvClient
        PlacementCenterService::Kv,
        // 定义这次调用的是KvClient 的 set 方法
        PlacementCenterInterface::Set,
        // client poll、addrs、request_data 请求方法
        client_poll,
        addrs,
        request_data,
    )
    .await
    {
        // 将返回结果 decode 为CommonReply类型,并返回
        Ok(data) => match CommonReply::decode(data.as_ref()) {
            Ok(da) => return Ok(da),
            Err(e) => return Err(CommonError::CommmonError(e.to_string())),
        },
        Err(e) => {
            return Err(e);
        }
    }
}

}
  • retry_call
#![allow(unused)]
fn main() {
async fn retry_call(
    service: PlacementCenterService,
    interface: PlacementCenterInterface,
    client_poll: Arc<ClientPool>,
    addrs: Vec<String>,
    request: Vec<u8>,
) -> Result<Vec<u8>, CommonError> {
    let mut times = 1;

    loop {
        let index = times % addrs.len();
        let addr = addrs.get(index).unwrap().clone();
        let result = match service {
            // 执行 Kv Service 的方法
            PlacementCenterService::Kv => {
                kv_interface_call(
                    interface.clone(),
                    client_poll.clone(),
                    addr.clone(),
                    request.clone(),
                )
                .await
            }
        };

        match result {
            Ok(data) => {
                return Ok(data);
            }
            Err(e) => {
                error!(
                    "{:?}@{:?}@{},{},",
                    service.clone(),
                    interface.clone(),
                    addr.clone(),
                    e
                );
                // 定义最大重试次数
                if times > retry_times() {
                    return Err(e);
                }
                times = times + 1;
            }
        }
        // 定义重试的退避策略
        sleep(Duration::from_secs(retry_sleep_time(times) as u64)).await;
    }
}

}
  • kv_interface_call
#![allow(unused)]
fn main() {
pub(crate) async fn kv_interface_call(
    interface: PlacementCenterInterface,
    client_poll: Arc<ClientPool>,
    addr: String,
    request: Vec<u8>,
) -> Result<Vec<u8>, CommonError> {
    // 获取 Kv Client
    match kv_client(client_poll.clone(), addr.clone()).await {
        Ok(client) => {
            // 执行对应的 set、delete、get、exists 方法
            let result = match interface {
                PlacementCenterInterface::Set => inner_set(client, request.clone()).await,
                PlacementCenterInterface::Delete => inner_delete(client, request.clone()).await,
                PlacementCenterInterface::Get => inner_get(client, request.clone()).await,
                PlacementCenterInterface::Exists => inner_exists(client, request.clone()).await,
                _ => return Err(CommonError::CommmonError(format!(
                    "kv service does not support service interfaces [{:?}]",
                    interface
                ))),
            };
            // 返回结果
            match result {
                Ok(data) => return Ok(data),
                Err(e) => {
                    return Err(e);
                }
            }
        }
        Err(e) => {
            return Err(e);
        }
    }
}

}
  • inner_set
#![allow(unused)]
fn main() {
pub(crate) async fn inner_set(
    mut client: Connection<KvServiceManager>,
    request: Vec<u8>,
) -> Result<Vec<u8>, CommonError> {
    // 将请求 decode 为SetRequest
    match SetRequest::decode(request.as_ref()) {
        // 调用 kvCleint 的 set 方法
        Ok(request) => match client.set(request).await {
            Ok(result) => {
                // 将返回值encode 为 vec,返回
                return Ok(CommonReply::encode_to_vec(&result.into_inner()));
            }
            Err(e) => return Err(CommonError::GrpcServerStatus(e)),
        },
        Err(e) => {
            return Err(CommonError::CommmonError(e.to_string()));
        }
    }
}

}

可以看到,上面我们 为了统一封装多个gRPC Service 的重试策略,流程分为了四步。从代码上看,核心思路是通过 match 来区分不同的 Client 和不同的方法,并进行调用。

当前这个实现的好处就是流程清晰,代码可读性比较强,不过代码看起来是比较繁琐的。所以我们在主项目 RobustMQ 中有一个更优雅的实现,但是这个优雅实现的代码可读性较差,使用了大量的泛型和 trait,有兴趣的话你也可以去参考一下。

聊完了客户端,我们来聊聊服务端的性能优化。

Leader 写和所有节点可读

从原理上看,集群中 Leader 节点同时负责写入和读取是为了解决数据一致性的问题。这种方式的好处是不管如何读写,数据都是准确的、最新的。缺点是 Leader 节点会成为集群的性能瓶颈,无法横向扩容。

在服务端性能优化中,一个核心思考点是: 服务端的性能是可以随着节点的横向扩容而增强的。为了解决横向扩容的问题,就需要把 Leader 的压力分摊到所有节点上。

从技术上看,元数据服务的业务特点是读写比例较低,也就是 写少读多。因此我们可以先把读请求的压力从 Leader 分摊到所有节点上。

那代码上看要怎么实现呢? 先来看架构图。

图片

上面的核心思路是:

  1. 客户端允许配置服务端地址列表,客户端会轮询挑选一台服务器进行访问。

  2. 服务端判断标记每一个接口是读请求还是写请求。比如 KV 存储模型中 Set/Delete 是写请求,Get/Exists 是读请求。

  3. 服务器会根据请求的类型进行处理。

    1. 如果是写请求,会先判断自己是否是 Leader,如果是则直接处理请求,如果自己是 Follwer,则将该请求转发到 Leader 进行处理;

    2. 如果是读请求,则可以直接处理请求。

第 1 点在前面的客户端代码已经实现了。在上面举例的 Set 操作的placement_set和retry_call中。placement_set 接受的服务器地址 addrs 是一个 Vec 列表,即允许配置多个服务器地址。在retry_call中,当请求处理失败后,会更换一个新的服务器进行访问,以避免某个节点无法提供服务。主要起作用的代码是:

#![allow(unused)]
fn main() {
loop{
  let index = times % addrs.len();
  let addr = addrs.get(index).unwrap().clone();
  ......
  times = times + 1;
}

}

第 2 点标记接口是读还是写请求需要人工判断。比如 KvService 中有 Set/Get/Delete/Exists 四个请求。从接口功能来看,Set 和 Delete 会改变数据的内容,所以它属于写请求。而 Get 和 Exists 只是会读取数据,因此它属于读请求。

第 3 点的实现主要是代码逻辑的处理,完整代码在 《services_kv.rs》 中,我们以 Set 和 Get 来分别讲一下写/读请求的处理。

先来看 Set 的代码:

#![allow(unused)]
fn main() {
 async fn set(&self, request: Request<SetRequest>) -> Result<Response<CommonReply>, Status> {
        let req = request.into_inner();
        //
        if req.key.is_empty() || req.value.is_empty() {
            return Err(Status::cancelled(
                RobustMQError::ParameterCannotBeNull("key or value".to_string()).to_string(),
            ));
        }

        if !self.is_leader() {
            let leader_addr = self.leader_addr();
            match placement_set(self.client_poll.clone(), vec![leader_addr], req).await {
                Ok(reply) => {
                    return Ok(Response::new(reply));
                }
                Err(e) => {
                    return Err(Status::cancelled(e.to_string()));
                }
            }
        }

        // Raft state machine is used to store Node data
        let data = StorageData::new(StorageDataType::KvSet, SetRequest::encode_to_vec(&req));
        match self
            .placement_center_storage
            .apply_propose_message(data, "set".to_string())
            .await
        {
            Ok(_) => return Ok(Response::new(CommonReply::default())),
            Err(e) => {
                return Err(Status::cancelled(e.to_string()));
            }
        }
    }

}

上面这段代码在上节课已经讲过,只是添加了下面这段代码:

#![allow(unused)]
fn main() {
        if !self.is_leader() {
            let leader_addr = self.leader_addr();
            match placement_set(self.client_poll.clone(), vec![leader_addr], req).await {
                Ok(reply) => {
                    return Ok(Response::new(reply));
                }
                Err(e) => {
                    return Err(Status::cancelled(e.to_string()));
                }
            }
        }


}

即判断当前节点是否是 Leader,如果不是Leader,则获取 Leader 的地址,并将请求转发到 Leader 节点进行处理。is_leader 和 leader_addr 的逻辑如下:

#![allow(unused)]

fn main() {
    pub fn is_leader(&self) -> bool {
        return self.placement_cluster.read().unwrap().is_leader();
    }

    pub fn leader_addr(&self) -> String {
        return self.placement_cluster.read().unwrap().leader_addr();
    }

}

结合 第 9 课 可以知道,Leader 信息是由 Raft 状态机来维护的。当 Leader 发生切换时,Raft 状态机就会触发 Leader 信息的变更。

也就是说如果是写请求,就通过is_leader判断本节点是否是 Leader,是的话就正常处理,否的话就通过leader_addr方法获取 Leader 地址,并通过我们在这节课前半部分开发的客户端将请求转发给 Leader 进行处理。

再来看一下 Get 的代码:

#![allow(unused)]

fn main() {
    async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetReply>, Status> {
        let req = request.into_inner();
        if req.key.is_empty() {
            return Err(Status::cancelled(
                RobustMQError::ParameterCannotBeNull("key".to_string()).to_string(),
            ));
        }

        // 实例化KvStorage
        let kv_storage = KvStorage::new(self.rocksdb_engine_handler.clone());
        let mut reply = GetReply::default();
        // 从 RocksDB 中获取对应 Key 的数据
        match kv_storage.get(req.key) {
            Ok(Some(data)) => {
                reply.value = data;
                return Ok(Response::new(reply));
            }
            Ok(None) => {}
            Err(e) => return Err(Status::cancelled(e.to_string())),
        }

        return Ok(Response::new(reply));
    }

}

上面的代码比较简单,就是直接通过 KvStorage 从 RocksDB 中获取对应的 Key 数据。因为 Get 是读请求,则不需要进行判断转发的逻辑,直接进行逻辑处理即可。

到了这里,如下图所示,我们就完成了 Leader 负责写入、所有节点可读的特性了。

总结

tips:每节课的代码都能在项目 https://github.com/robustmq/robustmq-geek 中找到源码,有兴趣的同学可以下载源码来看。

这节课我们讲了通过连接池来提高客户端访问服务端的性能,并通过合适的代码实现统一封装多个 RPC Service 的重试机制,从而实现所有客户端调用都有统一的重试策略。然后通过在区分读写请求,并针对写请求做判断,从而实现了 Leader 写和所有节点可读的特性。

我们在 《深入拆解消息队列47 讲》 中讲到过,集群性能的提升包括单节点性能的提升和集群能力的提升。单节点性能的提升主要是网络层、存储层、计算层的性能的提升,在元数据服务中就是我们第 5 课(网络层)和第 6 课(存储层)的部分。

而集群能力的提升核心就是允许水平扩展,Leader 写和所有节点可读只是第一步,因为集群的写入压力都集中在 Leader 上,如果写入请求太大,那么依旧会出问题。所以下一步就是实现 Raft Group 的能力,即第 9 课说的允许集群中有多个 Leader 的存在。Raft Group 的特性,课程中我们没有展开,欢迎在交流群中讨论。

思考题

这里是本节课推荐的相关 issue 的任务列表,请点击查看 《Good First Issue》,任务列表会不间断地更新。另外欢迎给我的项目 https://github.com/robustmq/robustmq 点个 Star 啊!