以终为始:手写开源消息队列最终成果展示

本课程为精品小课,不标配音频

你好,我是文强。

在课程的最开始,我想先带你了解一下我们的最终产出是什么,也就是这个消息队列它是什么样子的,还会分享一些我自己在学习 Rust 这门语言过程中的经验,帮助你大胆入门。

对比经典的消息队列

开篇词中我分享过,本系列课程的目标是完成一个分布式基础软件:消息队列。所以我们不妨先来看一张经典的消息队列系统架构图。

图片

如上图所示,消息队列在架构上分为 客户端服务端集群消费端 三大部分。我们这门课要实现的是服务端集群这部分。相当于现在社区主流的消息队列,比如 RocketMQ、Kafka、RabbitMQ 、Pulsar等。

从架构的角度,服务端集群都是由 Broker 集群和元数据集群两部分组成。Broker 集群负责消息数据的读写,元数据集群负责 Broker 集群元数据的管理和部分 Broker 集群的管控、调度操作。

从实现来看,Broker 集群可以分为计算层和存储层,计算层负责消息队列相关逻辑的处理,存储层负责消息数据的持久化存储。

在当前主流的消息队列中,Broker 集群和元数据集群的组成关系如下:

图片

我们在开篇词中讲了,我们希望做成一个很牛逼的消息队列。那你可能有疑问,既然有这么多成熟的消息队列了,为什么还要再写一个?会有什么优势?你可以去我的 项目官网 中找找答案,也欢迎在留言区与我交流!

不过这里我更想强调的是,我们不是在做一个玩具,这也不是一个单纯练手的项目。 项目的一切设计和实现,都是按照标准工业级别的开源基础软件要求来设计和落地的。

那我们最终会做成一个什么样子的消息队列呢?

最终作品:云原生 Serverless 消息队列

先来下个定义:

目标是基于 Rust 实现可以兼容多种主流消息队列协议、架构上具备完整 Serverless 能力的消息队列。

从定义来看,你要围绕着“ 兼容多种主流消息队列协议”和“ 架构上具备完整 Serverless 能力”这两个点来理解我们后续的架构设计。

先看一下我们最终要完成的消息队列系统架构图。

图片

如上图所示,从设计上来看,我们要完成的 MQ 也是由 Broker 集群(计算层 + 存储层)和元数据集群两部分组成。元数据集群对应图中的 Placement Center,Broker 集群也分为计算层和存储层,每个部分具备分布式集群化部署、快速水平扩容的能力。

和上面的表格关系对应如下:

图片

了解了大致的系统架构,接下来我们放一张 MQ 最终架构的详细设计图,针对以上三部分做了展开,你可以尝试理解一下。因为这门课并不能覆盖全部实现,所以这里就不详细展开讲了,有需要的话,我们可以在评论区讨论。

图片

值得一提的是,从实现来看,整个 MQ 的实现是 100% 用 Rust 开发完成的。 在系列课程的第一阶段,我将会带你完成元数据集群(Placement Center)主体功能的开发

第一阶段作品:元数据存储服务

在我们的设计中,元数据集群功能分为两部分: 元数据存储集群调度。

元数据存储部分,你可以把它理解为一个分布式的 KV 存储引擎(类似 ZooKeeper),集群调度可以理解为在存储引擎之上,实现了对 Broker 集群的一些管控、调度逻辑。

所以,你可以把第一阶段的课程,简单理解为我们在实现一个 分布式的 KV 存储引擎。接下来,我们来看下元数据服务的详细架构图,从而来拆解我们在这个阶段的课程要做哪些事情。

图片

如上图所示,我们把元数据集群命名为 Placement Center。它是一个由多台 Server 组成的、基于 Raft 协议构建的集群,每台 Server 的存储层是基于 RocksDB 构建的 KV 存储模型。所以我们在实现元数据集群的过程中,就需要实现网络 Server、单机存储、分布式集群三个大模块。在接下来的课程中,我们会详细讲解各个部分的设计。

我们都知道,Rust 的学习经验陡峭。那么如果用 Rust 开发完成消息队列这种复杂的基础软件,需要先具备哪些能力呢?具体的知识点储备有很多,下节课我会集中整理,这里我想先分享一下我个人在 Rust 上的学习实践经验,你也可以在心里建立一个预期。

Rust 学习实践经验分享

在我看来,说 Rust 这个语言学习曲线陡峭的原因在于: 它的语法和语言特性和其他主流语言是有很大区别的,并且语法和特性都比较复杂。比如它通过生命周期来代替垃圾回收(GC),就衍生了所有权、借用、各种智能指针、Sync/Send/.await 等多线程编程语法等等的概念和用法。

在我看来,Rust 是拔高了其语言学习本身的成本,换来了其高性能和高安全性。这也是它现在被很多人推崇的原因。也就是: 学会了,就很好用

在我自己学习实践 Rust 的过程中,我总结了四条经验。

经验一:学习 Rust 语言的基础知识点,主要选择《Rust语言圣经》这份资料就够了,然后需要具备检索和举一反三的能力。

在我开始学习 Rust 时,我买了很多 Rust 的书籍。但是我发现,这些书籍的学习效果都不是特别理想。整个过程下来,我个人对 Rust 语言本身基础知识点的学习,80% 都来源于 《Rust语言圣经》。这本书,你需要重点关注第 十一 等6个章节。这里包含了 Rust 编程会用到的几乎所有知识点。

从表达上来看,它也更适合中文的阅读习惯。它的问题是有的点讲得不够详细,这就需要通过其他资料来扩充,比如这三份:

  1. 基础知识点补充: 《Rust 程序设计语言-中文版》《Rust 程序设计语言-英文原版》 是官方的 Rust 书,对每个知识点讲得很细。只是这里面的内容大部分在《Rust语言圣经》中已经讲到了,所以不建议直接看这本书,而是把它作为补充,在看《Rust语言圣经》有不明白的地方,就按照目录索引来这里找找有没有想要的答案。

  2. Tokio:你只要用 Rust 编程,Tokio 就一定要掌握。而学习 Tokio,主要看这两个文档: 《Tokio 官网》《Rust异步编程和Tokio框架》。你一定要掌握 Tokio 的几个主要知识点:Runtime、Task、Network、Channel、Mutex、RwLock、Notify、Barrier、Semaphore。在实践中非常常用。

  3. 宏编程:宏编程作为 Rust 的一个主要特性,它主要是用来简化重复代码的。在很多开源项目中会大量使用宏,但是在自己的项目中,宏的使用不是必须的。不过从学习的角度来看,宏是必须掌握的,只是优先级比较低,因为不掌握它有时很难看懂一些成熟的 Rust 项目。宏的学习我推荐 《Rust 宏小册》

经验二:刚开始学习时,不要试图一遍记住某个知识点的所有内容,只要做到记住这个知识点大概是做什么的,有什么能力即可。

在学习 Rust 语法阶段,如果看每个知识点都试图完全理解的话,那是非常痛苦的,也是不现实的。因为Rust 的很多语法很复杂,只有在实践中才能真正领会。所以在这个阶段不要在一个知识点上死磕,学习时只要知道:大概有这么个东西,它大概是怎么用的,它的资料在哪里就够了。

不同的知识点联动起来才有意义,才能体现出作用,才能更好地被记住。而联动的最佳方式是带着目的和需求,不断地复习、实践,反复多次,才能彻底掌握。在我看来,这就是学习 Rust 最高效的方式。

经验三:在深入学习时,一定要学习和实践反复切换着来,在写的同时不断地反复去回顾前面的知识点。

相信大多数人会遇到类似的问题,在学完了基础语法后,好像懂了,又好像什么都不懂。然后可能有人就放弃了。

在完成基础学习的阶段后,最需要的是选择一个适合自己的项目来提升 Rust 编程的功力。现在业界有很多简单的项目可以练习,比如编写命令行工具、Web Server、简单的 KV 存储。

但这类项目很难让我们彻底熟练地掌握 Rust。 因为它不是一个真正意义上的业务需求,很难将 Rust 那些核心特性、语法用上,并且做到精益求精。

此时最好的方式是找一个成熟的开源项目,学习它的实现,参与它的工作,跟着它一起成长。但最大的问题是:开源项目往往比较复杂,参与起来需要投入大量精力,并且很多任务并不适合初学者。 这也是本系列课程希望解决的问题

经验四:不仅仅是 Rust。保持耐心,带着目的去学习。用好工具。

想学好 Rust,就不应该将它仅仅看作是学习 Rust。我们要知道,Rust 只是一门编程语言,说白了,它也只是一个工具,你可以把它理解为一个做木工的锤子。通常意义上说,学习 Rust,就是学习它的基础语法、特性、语法糖,也就是去学这个锤子本身怎么用。

而真正能做好家具,还得配合其他计算机领域的知识点。所以在学习 Rust 的过程中,我们会自然而然地接触到网络、存储、操作系统、分布式系统设计等等这些知识点。你要学会 Rust 本身,并且学会这些相关知识点,才算真正学会 Rust。

最重要的是,在学习 Rust 的时候,你一定一定要保持耐心。我个人从开始学习 Rust 到真正有入门的体验,大概花了四个月的时间。这四个月是在保持平均每天至少有两个小时的投入,带着问题和目的去学习的状态下。作为一个老研发,这是让我比较惊讶的一点,因为之前我对一门新语言的定义是,学个两三天就能产出了。

另外,就是善于利用现有的一些平台和工具。在今天,Rust 的资料和开源项目已经比较齐全了。我给你推荐两个常用的工具和相关论坛。

  1. Crates.io:这是各种 Rust 开源库的管理平台,类似 Java 的 Maven。这里每个仓库都有非常详细的说明和示例。另外有个小技巧是,如果在 Crates.io 对某个库没有详细说明,那么就可以直接跳转到库对应的 GitHub 仓库。一个成熟的开源库,在 GitHub 仓库都会有很详细的使用说明。

  2. awesome-rust:这是一个 Rust 成熟开源项目的集合索引项目,它整合了 Rust 领域有影响力的项目。你可以在这里找到自己感兴趣的项目,把源码下载下来,学习它们的语法和实现方式。看成熟项目的代码,是一个效率很高的学习方式,也是我常用的。因为在学习了很多知识点后,其实很难一下子去融会贯通,而看别人写的代码,再通过这些写法去理解这些语法,效率就很高。

比如看到这段代码,你会想到什么呢?

#![allow(unused)]
fn main() {
async fn report_broker_sysdescr<S>(
    client_poll: &Arc<ClientPool>,
    metadata_cache: &Arc<CacheManager>,
    message_storage_adapter: &Arc<S>,
) where
    S: StorageAdapter + Clone + Send + Sync + 'static,
{
    let topic_name = replace_topic_name(SYSTEM_TOPIC_BROKERS_SYSDESCR.to_string());
    let info = format!("{}", os_info::get());
    if let Some(record) = MQTTMessage::build_system_topic_message(topic_name.clone(), info) {
        write_topic_data(
            &message_storage_adapter,
            &metadata_cache,
            &client_poll,
            topic_name,
            record,
        )
        .await;
    }
}

}

你需要看懂这段代码使用了哪些Rust 语法,比如 Arc、where、S、&等等。

  1. Rust 语言中文社区Rust 官方论坛:这是我经常逛的两个 Rust 论坛。一个是国内中文的论坛,基本可以了解到国内 Rust 这个领域最新发生的一些事情,看看大家都在做什么。一个是 Rust 官方的英文论坛,里面会有很多 Rust 语言本身的比如特性、Bug 等相关的讨论。如果想学好 Rust,建议养成日常浏览这两个论坛的习惯,毕竟抬头看天,看一下业界都在做什么是很重要的。

希望这些经验之谈,能为你学好 Rust 这门语言扫清一些障碍。同时,这门课程其实也是在记录我通关的过程,基于我所走过的弯路,经过系统的梳理和总结,并结合真实的工程实践,去拉平 Rust 的学习曲线。

不妨先体验一下,我们用一个 Trait 的例子来入门 Rust 编程。

从一个 Trait 的例子开始

下面这个代码是一个实际的业务需求。

在持久化存储数据的时候,数据存储需要支持不同的存储引擎,比如 Redis、本地文件、MySQL 等等。此时如何用 Rust 来实现这个存储层,适配不同类型的存储,该怎么写?再加一个条件,这个存储层需要能在多线程环境下运行。

接下来,我们看一下这个适配多个存储引擎的存储层的主要代码。你要重点关注 build_driver 方法。

  1. 定义 AuthStorageAdapter Trait
#![allow(unused)]
fn main() {
pub trait AuthStorageAdapter {
    async fn read_all_user(&self) -> Result<DashMap<String, MQTTUser>, MQError>;

    async fn get_user(&self, username: String) -> Result<Option<MQTTUser>, MQError>;
}

}
  1. AuthStorageAdapter 的实现:PlacementAuthStorageAdapter
#![allow(unused)]
fn main() {
pub struct PlacementAuthStorageAdapter {
}

impl PlacementAuthStorageAdapter {
    pub fn new() -> Self {
        return PlacementAuthStorageAdapter {};
    }
}

#[async_trait]
impl AuthStorageAdapter for PlacementAuthStorageAdapter {
    async fn read_all_user(&self) -> Result<DashMap<String, MQTTUser>, RobustMQError> {
        return Ok(DashMap::with_capacity(2));
    }

    async fn get_user(&self, username: String) -> Result<Option<MQTTUser>, RobustMQError> {
        return Ok(None);
    }
}

}
  1. AuthStorageAdapter 的实现:MySQLAuthStorageAdapter
#![allow(unused)]
fn main() {
pub struct MySQLAuthStorageAdapter {
}

impl MySQLAuthStorageAdapter {
    pub fn new() -> Self {
        return PlacementAuthStorageAdapter {};
    }
}

#[async_trait]
impl AuthStorageAdapter for MySQLAuthStorageAdapter {
    async fn read_all_user(&self) -> Result<DashMap<String, MQTTUser>, RobustMQError> {
        return Ok(DashMap::with_capacity(2));
    }

    async fn get_user(&self, username: String) -> Result<Option<MQTTUser>, RobustMQError> {
        return Ok(None);
    }
}

}
  1. build_driver 方法:通过 Arc<dyn AuthStorageAdapter + Send + 'static + Sync> 返回一个可以在多线程间共享的 AuthStorageAdapter Trait 实现
#![allow(unused)]
fn main() {
pub fn build_driver() -> Result<Arc<dyn AuthStorageAdapter + Send + 'static + Sync>, RobustMQError> {
    if storage_is_placement(&auth.storage_type) {
        let driver = PlacementAuthStorageAdapter::new();
        return Ok(Arc::new(driver));
    }

    if storage_is_mysql(&auth.storage_type) {
        let driver = MySQLAuthStorageAdapter::new();
        return Ok(Arc::new(driver));
    }

    return Err(RobustMQError::UnavailableStorageType);
}

}

上面这段代码,我们先不展开细讲,后续课程都会涉及。但我可以先告诉你,我们的代码中都使用了哪些 Rust 知识点。

  • Rust 基础语法(变量、类型、函数、流程控制等)
  • 特征 Trait
  • 智能指针 Arc、Box
  • 特征对象 Dyn
  • 生命周期-静态变量 'static
  • 多线程编程 Send、Sync

短短的这么一段代码,几乎就囊括了 Rust 语法的核心部分。你看,这就是动手实践的意义!

总结

这节课我分享了最终作品的架构是什么样子的,明确了我们在系列课程的第一阶段要完成的元数据服务(也就是类 ZooKeeper 的分布式协调服务)的系统架构。同时也分享了很多我在学习实践 Rust 过程中的经验。

最后我想强调的是,学习 Rust 的最好方式是带着目的去学习,以终为始。通过需求来组合各个知识点,不要死记硬背,更不要抄代码,要先理解需求,理解 Rust 语法,然后再去写代码。

思考题

课程中的 build_driver 这段代码是什么意思?重点解释:Result<Arc<dyn AuthStorageAdapter + Send + 'static + Sync>, RobustMQError> 这句代码的作用。

#![allow(unused)]
fn main() {
pub fn build_driver(
    client_poll: Arc<ClientPool>,
    auth: Auth,
) -> Result<Arc<dyn AuthStorageAdapter + Send + 'static + Sync>, RobustMQError> {
    return Err(RobustMQError::UnavailableStorageType);
}

}

期待你的分享,如果今天的课程对你有所帮助,也欢迎你转发给有需要的同学,我们下节课再见!