分布式:实现集群化、多副本KV存储引擎

本课程为精品小课,不标配音频

你好,我是文强。

这节课我们将在前三节课基于 Raft 协议构建的集群的基础上,将 第 6 课 实现的单机 KV 存储扩展为集群化、多副本的 KV 存储引擎。

从技术上看,分布式系统都是基于多副本来实现数据的高可靠存储。因此接下来,我们先来看一下副本和我们前面构建的 Raft 集群的关系。

集群和副本

先来看下图:

图片

可以看到,这是一个由 3 个节点组成的、基于 Raft 协议构建的集群,它包含 1 个 Leader 节点和 2 个 Follower 节点。因此写入数据时,数据会先写入 Leader 节点,Leader 节点再将数据分发到两个 Follower 节点。

从技术上看这个集群只有一个 Raft 状态机,这个状态机由 3 个 Voter(Node)组成。所以整个集群只能有一个Leader,也就是在上图中 一个节点就是一个副本。如下图所示,如果需要需要新增一个副本,就需要再增加一个节点。

图片

那么就会有一个问题: 一个节点上能运行多个副本吗? 答案肯定是可以的。来看下图:

图片

为了在一个节点上运行多个副本,我们首先抽象了 Shard(分片)的概念。从技术上看,一个 Shard 对应一个 Raft 状态机,也就是说每个 Shard 都有自己的 Leader 和 Follower。上图中有 3 个 Shard,每个 Shard 有 1 个 Leader、2个 Follower。因此总共有 3 个Leader及其对应的 6 个 Follower。同一个 Shard 对应的 Leader 和 Follower 都分布在不同的节点上。这是 因为副本是为了容灾而存在的,因此同一个 Shard 的不同副本在同一个物理节点上是没有意义的

所以说,如果要在一个节点上运行多个副本,那么就需要集群中有多组副本,也就是说一个集群需要运行多个状态机,也就是我们前面提到的状态机组(比如Raft Group)。

tips: 在本次课程中,我们只会实现一个集群只有一个状态机的模式,如果你需要了解状态机组的实现,我们可以在交流群中讨论。

了解了副本和集群的关系后,接下来我们来看一下,如何将单机的 KV 存储扩展为集群化、多副本的 KV 存储,在开发上需要做哪些事情。

从技术上看,主要有下面三部分的工作:

  1. 网络层:即客户端通过该网络层往集群读写 KV 数据。

  2. 一致性协议:集群会通过一致性协议(Raft)在 Leader、Follower 节点之间同步数据。从实现上看,这部分是指网络层、状态机、存储层三部分的交互。

  3. 存储层:单机维度基于 RocksDB 实现数据的持久化存储。

在第 6 课我们其实已经完成了基于 RocksDB 的存储层的开发。接下来我们来实现 网络层一致性协议 这两个部分的开发。

先来看网络层的实现。

网络层的实现

我们依旧基于 gRPC 来构建我们的网络层,先来看 proto 的定义。

#![allow(unused)]
fn main() {
syntax = "proto3";
package kv;

service KvService {
  rpc set(SetRequest) returns(CommonReply){}

  rpc delete(DeleteRequest) returns(CommonReply){}

  rpc get(GetRequest) returns(GetReply){}

  rpc exists(ExistsRequest) returns(ExistsReply){}
}

message SetRequest{
    string key = 1;
    string value = 2;
}

message GetRequest{
    string key = 1;
}

message GetReply{
    string value = 1;
}

message DeleteRequest{
    string key = 1;
}

message ExistsRequest{
    string key = 1;
}

message ExistsReply{
    bool flag = 1;
}

message CommonReply{

}

}

在上面的 proto 中,可以看到,我们定义了 set/get/delete/exists 四个方法及其对应的请求和返回参数,分别对应存储层 KvStorage 里面的 set/get/delete/exists 四个操作。

接下来以 Set 方法举例,看一下 gRPC Service 部分代码的实现。

#![allow(unused)]
fn main() {
async fn set(&self, request: Request<SetRequest>) -> Result<Response<CommonReply>, Status> {
        let req = request.into_inner();

        // 参数校验
        if req.key.is_empty() || req.value.is_empty() {
            return Err(Status::cancelled(
                CommonError::ParameterCannotBeNull("key or value".to_string()).to_string(),
            ));
        }

        // 构建StorageData类型的数据
        let data = StorageData::new(StorageDataType::KvSet, SetRequest::encode_to_vec(&req));

        // 将数据交给 Raft 状态机处理
        match self
            .placement_center_storage
            .apply_propose_message(data, "set".to_string())
            .await
        {
            Ok(_) => return Ok(Response::new(CommonReply::default())),
            Err(e) => {
                return Err(Status::cancelled(e.to_string()));
            }
        }
    }

}

这段代码很简单,获取请求参数,判断参数是否为空,构建StorageData类型的数据。然后通过apply_propose_message 方法将 KV 数据传给 Raft 状态机处理,并等待 Raft 状态机的处理结果。

如下所示,StorageData 是一个自定义的数据结构,用来在网络层和状态机之间交互。data_type 表示这是一个什么类型的操作,比如 Kv 的 Set 或 Delete 操作,value 表示具体的数据内容。

#![allow(unused)]
fn main() {
pub struct StorageData {
    pub data_type: StorageDataType,
    pub value: Vec<u8>,
}

}

在上面的代码中,你可以看到,当网络层收到请求后,是通过调用 apply_propose_message 方法来将消息传递给 Raft 状态机,通过状态机内部的一致性协议来实现数据持久化存储的。

所以接下来,我们来看 apply_propose_message 方法内部的实现,也就是网络层和状态机的交互过程。

网络层和状态机的交互

先来看一张图,这张图是我们在上节课讲到的网络层通过 Tokio Channel 和状态机进行交互的示意图。

图片

整个交互过程可以分为 请求返回 两步,先来看请求。

请求 是指网络层将 KV 数据传递给 Raft 状态机处理的过程。从技术上看,它是通过 tokio 的 mpsc 类型的 channel 来完成的。

具体流程是:首先初始化一个 mpsc 类型的 channel。

#![allow(unused)]
fn main() {
let (raft_message_send, raft_message_recv) = mpsc::channel::<RaftMessage>(1000);

}

网络层使用 send(tokio::sync::mpsc::Sender)方法将数据传递给 channel。RawNode 使用 recv(tokio::sync::mpsc::Receiver)接收从网络层传递过来的数据(比如用户写入的数据),并驱动 Raft 运行。

再来看返回。

返回 是指 Raft 状态机处理数据成功或失败后,通知网络层处理结果的过程。从技术上看,这个过程是通过一个 oneshot 类型的 channel 来实现的。如下所示:

#![allow(unused)]
fn main() {
let (sx, rx) = oneshot::channel::<RaftResponseMesage>();

}

网络层会调用 oneshot 的 recv 方法接收状态机的运行结果,状态机会将处理结果写入到 sender 中,从而完成交互。

接下来来看整个过程的核心代码段的实现,完整代码请参考 《apply.rs》

#![allow(unused)]
fn main() {
    pub async fn apply_propose_message(
        &self,
        data: StorageData,
        action: String,
    ) -> Result<(), CommonError> {

        // 定义oneshot channel,用于接收 raft 状态机的返回
        let (sx, rx) = oneshot::channel::<RaftResponseMesage>();

        // 将数据传给 Raft 状态机处理,比等待 raft 状态机的处理结果
        return Ok(self
            .apply_raft_status_machine_message(
                RaftMessage::Propose {
                    data: serialize(&data).unwrap(),
                    chan: sx,
                },
                action,
                rx,
            )
            .await?);
    }

    async fn apply_raft_status_machine_message(
        &self,
        message: RaftMessage,
        action: String,
        rx: Receiver<RaftResponseMesage>,
    ) -> Result<(), RobustMQError> {

        // 将消息发送给状态机,状态机向前驱动时会处理该消息
        let _ = self.raft_status_machine_sender.send(message).await;

        // 等待状态机对该消息的处理结果,比如成功或者失败
        if !self.wait_recv_chan_resp(rx).await {
            return Err(RobustMQError::RaftLogCommitTimeout(action));
        }
        return Ok(());
    }

    async fn wait_recv_chan_resp(&self, rx: Receiver<RaftResponseMesage>) -> bool {
        // 定义状态机处理消息的最长时间,如果超过 30s 消息没处理成功,就表示写入失败
        let res = timeout(Duration::from_secs(30), async {
            match rx.await {
                Ok(val) => {
                    return val;
                }
                Err(_) => {
                    return RaftResponseMesage::Fail;
                }
            }
        });

        // 等待状态机的处理结果
        match res.await {
            Ok(_) => return true,
            Err(_) => {
                return false;
            }
        }
    }

}

可以看到,上面代码有apply_propose_message、apply_raft_status_machine_message、wait_recv_chan_resp 三个方法,它们的主要功能是:

  • apply_propose_message:在网络层存储数据时调用,用于保存用户从客户端写入的数据。

  • apply_raft_status_machine_message:给 apply_propose_message 方法调用,主要完成将数据发送给 Raft 状态机处理,并等待 Raft 状态机的处理结果,这两个事情。

  • wait_recv_chan_resp:给 apply_raft_status_machine_message 调用,通过 timeout 和 oneshot 等待 Raft 状态机的处理结果。

那状态机是如何处理数据的呢?这个答案你去回顾 第 9 课 就知道了,这里不再重复讲了。

不知道你注意到没有,我们在第 9 课中是没有讲状态机是如何处理业务数据的。比如我们写了一个 KV 数据到集群,此时状态机是如何持久化存储这条KV数据的呢?

tips:这里说的 KV 数据是指客户端写入的数据,不是 Raft 自身运行产生的 Raft Log。这两类数据的区别你可以去回顾一下第7、8、9 这三节课。

接下来,我们就来看一下状态机是如何存储这些业务数据的。

状态机和存储层的交互

直接来看代码,完整代码在 《machine.rs》 中,状态机和存储层的交互我们只要关注 handle_committed_entries 方法就可以了,来看下面这段代码:

#![allow(unused)]
fn main() {
    fn handle_committed_entries(
        &mut self,
        raft_node: &mut RawNode<RaftRocksDBStorage>,
        entrys: Vec<Entry>,
    ) {
        let data_route = self.data_route.write().unwrap();
        // 循环处理 Entry
        for entry in entrys {
            // 如果 Entry 的 data 为空,就不需要处理
            if !entry.data.is_empty() {
                info!("ready entrys entry type:{:?}", entry.get_entry_type());
                // 根据不同raft-rs 库定义的不同的消息类型,进行不同的逻辑处理
                // 在 raft-rs 库中,主要分为分为EntryNormal和EntryConfChange两种类型的消息。
                // EntryNormal 表示客户业务消息
                // EntryConfChange 表示 Raft 集群内的消息,比如节点的上线下线
                match entry.get_entry_type() {
                    EntryType::EntryNormal => {
                        // 用户的消息数据直接通过 DataRoute 方法进行分发处理
                        match data_route.route(entry.get_data().to_vec()) {
                            Ok(_) => {}
                            Err(err) => {
                                error!("{}", err);
                            }
                        }
                    }
                    EntryType::EntryConfChange => {
                        let change = ConfChange::decode(entry.get_data())
                            .map_err(|e| tonic::Status::invalid_argument(e.to_string()))
                            .unwrap();
                        let id = change.get_node_id();
                        let change_type = change.get_change_type();
                        match change_type {
                            // 处理增加节点的消息
                            ConfChangeType::AddNode => {
                                match deserialize::<BrokerNode>(change.get_context()) {
                                    Ok(node) => {
                                        let mut cls = self.placement_cluster.write().unwrap();
                                        cls.add_peer(id, node);
                                    }
                                    Err(e) => {
                                        error!("Failed to parse Node data from context with error message {:?}", e);
                                    }
                                }
                            }

                            // 处理节点移除的消息
                            ConfChangeType::RemoveNode => {
                                let mut cls = self.placement_cluster.write().unwrap();
                                cls.remove_peer(id);
                            }
                            _ => unimplemented!(),
                        }

                        if let Ok(cs) = raft_node.apply_conf_change(&change) {
                            let _ = raft_node.mut_store().set_conf_state(cs);
                        }
                    }
                    EntryType::EntryConfChangeV2 => {}
                }
            }

            // 消息处理成功后提交更新最新的 commit index
            let idx: u64 = entry.get_index();
            let _ = raft_node.mut_store().commmit_index(idx);

            // 通过上面提到的 oneshot channel 通知网络层消息处理成功
            match deserialize(entry.get_context()) {
                Ok(seq) => match self.resp_channel.remove(&seq) {
                    Some(chan) => match chan.send(RaftResponseMesage::Success) {
                        Ok(_) => {}
                        Err(_) => {
                            error!("commit entry Fails to return data to chan. chan may have been closed");
                        }
                    },
                    None => {}
                },
                Err(_) => {}
            }
        }
    }

}

参考代码的注释,我们可以知道,handle_committed_entries 中是状态机和存储层交互的入口方法,它的核心逻辑是: 处理状态机中已经可以被 commit 的 Entry

从 Raft 协议可以知道,当业务数据提交给 Raft Leader 节点处理时,此时这个消息还没分发给 Follower,因此这个消息是 uncommit 的消息,也就是 uncommit 的 Entry。只有当集群中超过半数的节点处理成功后,这个消息才会变为 commit 的 Entry。

handle_committed_entries 会拿到可以被 commit 的 Entry。拿到消息后,就需要遍历处理这些消息,这些消息分为 客户端写入的消息Raft 本身产生的消息 两种类型,这两种消息有不同的处理逻辑。

这里我们只讲如何处理客户端写入的消息,Raft 本身产生的消息的处理你可以直接看一下代码,比较简单,有需要的话我们交流群中讨论。

在上面代码中,可以看到客户端写入的消息是由 data_route.route(entry.get_data().to_vec()) 来处理的。DataRoute 的代码如下:

#![allow(unused)]
fn main() {
pub struct DataRoute {
    rocksdb_engine_handler: Arc<RocksDBEngine>,
}

impl DataRoute {
    pub fn new(rocksdb_engine_handler: Arc<RocksDBEngine>) -> DataRoute {
        return DataRoute {
            rocksdb_engine_handler,
        };
    }

    pub fn route(&self, data: Vec<u8>) -> Result<(), RobustMQError> {
        let storage_data: StorageData = deserialize(data.as_ref()).unwrap();
        match storage_data.data_type {
            StorageDataType::KvSet => {
               // 调用KvStorage的 set 方法持久化存储数据
                let kv_storage = KvStorage::new(self.rocksdb_engine_handler.clone());
                let req: SetRequest = SetRequest::decode(data.as_ref()).unwrap();
                return kv_storage.set(req.key, req.value);
            }

            StorageDataType::KvDelete => {
                // 调用KvStorage的 delete 方法删除持久化的数据
                let kv_storage = KvStorage::new(self.rocksdb_engine_handler.clone());
                let req: DeleteRequest = DeleteRequest::decode(data.as_ref()).unwrap();
                return kv_storage.delete(req.key);
            }
        }
    }
}

}

这段代码很简单,就是根据前面提到的结构体 StorageData 中的 data_type 来区分不同的数据类型,然后调用在第 6 课创建的 KvStorage 对象中的对应的方法来持久化存储数据。

讲到这里,我们就完成了集群化、多副本 KV 存储引擎的开发,接下来我们来总结并展示一下效果。

效果展示

先来看一张全流程示意图:

图片

如上图所示:

  1. 使用 gRPC 客户端调用名为 Set 的 RPC 方法,往集群的 Leader 节点写入 KV 数据。

  2. Leader 节点收到数据后,通过 apply_propose_message 方法将消息传递给 Raft 状态机进行处理。

  3. Raft 状态机根据 Raft 协议的规范将消息分发到多台 Follower 节点。

  4. 当超过半数的 Raft Node 收到消息后,就可以 commit 这个 Entry。

  5. 当有可以 commit 的 Entry 后,Raft 状态机会调用 handle_committed_entries 方法,handle_committed_entries 再调用 DataRoute 的 route 方法来完成数据的持久化存储。

最后来写个Set 方法的测试用例,演示一下如何将数据存储到集群中。

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use std::net::TcpStream;

    use axum::http::request;
    use protocol::kv::{
        kv_service_client::KvServiceClient, DeleteRequest, ExistsReply, ExistsRequest, GetRequest,
        SetRequest,
    };

    #[tokio::test]
    async fn kv_test() {
        let mut client = KvServiceClient::connect("http://127.0.0.1:8871")
            .await
            .unwrap();
        let key = "mq".to_string();
        let value = "robustmq".to_string();
        let request = tonic::Request::new(SetRequest {
            key: key.clone(),
            value: value.clone(),
        });

        let _ = client.set(request).await.unwrap();
    }
}

}

总结

tips:每节课的代码都能在项目 https://github.com/robustmq/robustmq-geek 中找到源码,有兴趣的同学可以下载源码来看。

这节课,我们把第 3~9 课的内容全部串起来了,主要把第 5 课基于 gRPC 的网络层、第 6 课基于 RocksDB 的存储层,第 7、8、9 课基于 Raft 协议的集群串联起来,完成了单机 KV 存储到分布式、集群化、多副本的KV 存储的演进。

从技术上来看,演进的核心就是我们在 7、8、9 课中基于 Raft 构建的分布式集群。从这一点也可以知道,为什么我们总会看到一个论点: 分布式的核心是一致性协议。这是因为一致性协议决定了数据在集群中的流动、同步的方式。

这节课我们没有把所有的代码都贴上来,所以建议你多去看看 《robustmq-geek》 中的代码,多揣摩一下每段代码的实现,只要你能完整理解这节课的内容,那你就完成了这个课程的学习任务了。

思考题

这里是本节课推荐的相关 issue 的任务列表,请点击查看 《Good First Issue》。另外欢迎给我的项目 https://github.com/robustmq/robustmq 点个 Star 啊!