apache / dubbo-rust

Apache Dubbo Rust implementation
https://dubbo.apache.org/
Apache License 2.0
273 stars 63 forks source link

Add extension module #180

Closed onewe closed 5 months ago

onewe commented 6 months ago

一、背景

Extension 模块在 Dubbo 中是用来做扩展,插件化的重要模块. 在 JavaExtension 使用 SPI 机制来实现. 受限于 Rust 语言层面的限制, 迫切需要一个全局使用、注册各种插件、根据 Url 为数据模型来加载Extension 的加载器。

二、设计方案

​ 从用户使用的角度出发, 设想一下以下的使用方式:

// 注册 registry 插件
let _ = dubbo::extension::EXTENSIONS.register<NacosRegistry>().await;

// 加载 registry 插件
let registry_url = Url;
let _ = dubbo::extension::EXTENSIONS.load_registry(registry_url).await;

// 注册 failover cluster 插件
let _ = dubbo::extension::EXTENSIONS.register<FailoverCluster>().await;

// 加载 failover cluster 插件
let failover_cluster_url = Url;
let _ = dubbo::extension::EXTENSIONS.load_cluster(failover_cluster_url).await;

​ 从上面的伪代码可以看出, 需要设计一个全局静态、不可变的 EXTENSIONS 变量, 方便用户或者框架内部使用. EXTENSIONS 至少需要 2 个方法, 一个方法用于注册插件,另一个则是用来加载插件. 根据上面的需求, 可以快速得到代码原型:

// 全局静态、不可变
pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectory> =
    once_cell::sync::Lazy::new(|| ExtensionDirectory);

struct ExtensionDirectory;

impl ExtensionDirectory {

  pub async fn register<T>(&self) -> Result<(), StdError> {}

  pub async fn load_registry(&self, url: Url) -> Result<?, StdError> {}
}

​ 这里ExtensionDirectory 有 2 个问题:

  1. 当用户使用 register 方法注册的时候, 为了方便用户使用, 这里直接采用了一个泛型 T , 意味需要抽象出 Extension, 这样才能通过泛型的约束, 在编译期获取 Extension 一些相关信息, 来实现注册。
  2. 当用户或者框架本身使用 load_registry (这里只是为了方便举例, 后续随着扩展点的增多, 可能还有 load_clusterload_route等方法)方法来加载指定插件时, 这个返回值(也就是 Registry 插件), 需要一个抽象, 这样才能统一返回一个通用的对象, 例如: Box<dyn Registry + Send + 'static'>

为了解决这 2 个问题, 需要抽象出 ExtensionRegistry(这里只是为了方便举例, 后续随着扩展点的增多, 可能还有 ClusterRoute 等)

2.1 Extension Trait

Extension 应该具有什么功能? 作为 dubbo 框架本身的扩展点, 最核心的 3 个功能应该是: 插件唯一标识符号、 插件的构造函数 和 插件的类型.

2.1.1 插件唯一标识符

​ 相当于是插件的名称, 需要在同类型插件中保证唯一即可. 例如 Registry 插件可以有很多实现, 可以是 NacosRegistryZookeeperRegistryStaticRegistry. 所以需要在 Registry 类型插件中名称需要保证唯一, 防止插件注册被覆盖.

2.1.2 插件的类型

​ 需要插件的类型信息来执行不同的插件的注册、加载逻辑.

简单的实现:

#[async_trait::async_trait]
pub trait Extension {

    type Target;

    fn name() -> String;

    // 给插件提供异步初始化的能力
    async fn create(url: &Url) -> Result<Self::Target, StdError>;

    fn extension_type() -> ExtensionType;
}

pub enum ExtensionType {
    Registry,
    Cluster,
    Route,
}

2.1.3 插件的构造函数

​ 为简化用户插件注册过程, 只需要用户提供一个类型信息即可完成插件的注册, 所以需要用户提供一个构造方法, 以便于在运行过程中根据不同的 Url 动态的构造插件对象.

​ 不同类型的插件, 构造函数的签名都差不多, 唯一的不同是在返回值上. 所以这里需要使用枚举来统一. 构造函数这里的作用与 Java 中的 Factory 的功能其实是差不多的.

// 枚举
pub enum ExtensionFactories {
    RegistryExtensionFactory(RegistryExtensionFactory),
}

// 以 Registry 来举例
type RegistryConstructor = for<'a> fn(
    &'a Url,
) -> Pin<
    Box<dyn Future<Output = Result<Box<dyn Registry + Send + 'static>, StdError>> + Send + 'a>,
>;

// RegistryProxy 在后续会进行说明
pub struct RegistryExtensionFactory {
    constructor: RegistryConstructor,
    instances: HashMap<String, RegistryProxy>,
}

impl RegistryExtensionFactory {
  pub async fn create(&mut self, url: &Url) -> Result<RegistryProxy, StdError> {
    todo!()
    }
}

RegistryExtensionFactory 的基本逻辑是通过一个 url 来构造插件, 如果之前已经创建过了, clone 一个实例返回, 不需要重复创建.

​ 为了后续方便使用 RegistryExtensionFactory, 需要引入另外的 trait 来进行转换

pub trait ConvertToExtensionFactories {
    fn convert_to_extension_factories() -> ExtensionFactories;
}

// 给所有实现 Registry 和 Extension 的实现 ConvertToExtensionFactories
impl<T> ConvertToExtensionFactories for T
where
    T: Registry + Send + 'static,
    T: Extension<Target = Box<dyn Registry + Send + 'static>>,
{
    fn convert_to_extension_factories() -> ExtensionFactories {
        fn constrain<F>(f: F) -> F
        where
            F: for<'a> Fn(
                &'a Url,
            ) -> Pin<
                Box<
                    dyn Future<Output = Result<Box<dyn Registry + Send + 'static>, StdError>>
                        + Send
                        + 'a,
                >,
            >,
        {
            f
        }

        let constructor = constrain(|url: &Url| {
            let f = <T as Extension>::create(url);
            Box::pin(f)
        });

        ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(constructor))
    }
}

2.2 Registry Trait

Registry 插件需要有什么功能呢? 首先必须明确的一点是, 要实现 Registry 必须得实现 Extension , 因为 RegistryExtension 的一种, 但这个约束太强了. Extension 可以变成可选项, 只有用户需要注册插件到 dubbo 框架中才需要实现 Extension. 但是 Extension 并不是任何对象都能实现的, 必须得是 dubbo 中某个插件的具体实现才能够实现 Extension.

​ 那 Registry 的核心功能是啥呢? 无外乎是注册服务、服务下线、订阅和取消订阅服务

pub type ServiceChange = Change<String, ()>;
pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
#[async_trait]
pub trait Registry {
    async fn register(&self, url: Url) -> Result<(), StdError>;

    async fn unregister(&self, url: Url) -> Result<(), StdError>;

    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;

    async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;

    fn url(&self) -> &Url;
}

pub(crate) trait Sealed {}

#[async_trait::async_trait]
pub trait Extension: Sealed {

    type Target;

    fn name() -> String;

    // 给插件提供异步初始化的能力
    async fn create(url: &Url) -> Result<Self::Target, StdError>;

    fn extension_type() -> ExtensionType;
}

pub enum ExtensionType {
    Registry,
    Cluster,
    Route,
}

​ 这样看起来比较好点, 只有实现了 Sealed 才能够实现 Extension, 因为 Sealed 的可见性是 crate, 所以需要一个小 trick 来让自定义插件的用户可以实现 Extension

pub type ServiceChange = Change<String, ()>;
pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
#[async_trait]
pub trait Registry {
    async fn register(&self, url: Url) -> Result<(), StdError>;

    async fn unregister(&self, url: Url) -> Result<(), StdError>;

    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;

    async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;

    fn url(&self) -> &Url;
}

pub(crate) trait Sealed {}

#[async_trait::async_trait]
pub trait Extension: Sealed {

    type Target;

    fn name() -> String;

    // 给插件提供异步初始化的能力
    async fn create(url: &Url) -> Result<Self::Target, StdError>;

    fn extension_type() -> ExtensionType;
}

pub enum ExtensionType {
    Registry,
    Cluster,
    Route,
}

// 只要实现了 Registry 都自动实现 Sealed
impl<T> Sealed for T where T: Registry + Send + 'static {}

​ 为所有实现了 Registry 的都自动实现 Sealed, 通过这样的方式用户就可以自由的实现 Extension. 上面的代码还可以改进一下,

extension_type 这个方法其实没必要写在 Extension 中, 如果放在 Extension 中就会导致相同类型的插件不同的实现重复编写 extension_type 方法的实现. 可以把 extension_type 单独拿出来

pub type ServiceChange = Change<String, ()>;
pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
#[async_trait]
pub trait Registry {
    async fn register(&self, url: Url) -> Result<(), StdError>;

    async fn unregister(&self, url: Url) -> Result<(), StdError>;

    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;

    async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;

    fn url(&self) -> &Url;
}

pub(crate) trait Sealed {}

#[async_trait::async_trait]
pub trait Extension: Sealed {

    type Target;

    fn name() -> String;

    // 给插件提供异步初始化的能力
    async fn create(url: &Url) -> Result<Self::Target, StdError>;
}

pub enum ExtensionType {
    Registry,
    Cluster,
    Route,
}

// 只要实现了 Registry 都自动实现 Sealed
impl<T> Sealed for T where T: Registry + Send + 'static {}

pub(crate) trait ExtensionMetaInfo {
    // 单独的 trait
    fn extension_type() -> ExtensionType;
}

// 给实现了 Registry 和 Extension 的类型自动实现 ExtensionMetaInfo
// 固定类型为 Registry
impl<T> ExtensionMetaInfo for T
where
    T: Registry + Send + 'static,
    T: Extension<Target = Box<dyn Registry + Send + 'static>>,
{
    fn extension_type() -> ExtensionType {
        ExtensionType::Registry
    }
}

2.3 RegistryProxy

​ 前面把 Registry 给抽象出来了, 可以使用 Box<dyn Proxy + Send + 'static'> 来表示任意的 Registry, 但这种方式呢, 不能clone, clone 在插件设计中也是一个比较重要的能力, 有了 clone 能力, 就可以在任意的场景中使用, 灵活度比较高.

​ 使用 channel 来给 Box<dyn Proxy + Send + 'static'> 增加 clone 能力

registry_proxy

pub mod proxy{

     pub(super) enum RegistryOpt {
        Register(Url, oneshot::Sender<Result<(), StdError>>),
        Unregister(Url, oneshot::Sender<Result<(), StdError>>),
        Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
        UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
    }

     #[derive(Clone)]
    pub struct RegistryProxy {
        sender: tokio::sync::mpsc::Sender<RegistryOpt>,
        url: Url,
    }

    impl RegistryProxy {

        async fn register(&self, url: Url) -> Result<(), StdError> {
            todo!()
        }

        async fn unregister(&self, url: Url) -> Result<(), StdError> {
            todo!()
        }

        async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
            todo!()
        }

        async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
            todo!()
        }

        fn url(&self) -> &Url {
           todo!()
        }
    }

}

​ 这样一来 RegistryProxy 就可以无所顾及的 clone了.

3.1 ExtensionDirectory

​ 前面把基础抽象给做了,回到 ExtensionDirectory 设计上来.

// 全局静态、不可变
pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectory> =
    once_cell::sync::Lazy::new(|| ExtensionDirectory);

struct ExtensionDirectory;

impl ExtensionDirectory {

  pub async fn register<T>(&self) -> Result<(), StdError>  
  where
      T: Extension,
      T: ExtensionMetaInfo,
      T: ConvertToExtensionFactories,
  {
    todo!()
  }

  pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
    todo!()
  }
}

​ 大体结构就差不多了, 但这里还存在一个问题. 注册插件总得要把注册的东西放在一个地方, ExtensionDirectory 结构体里什么都没有, 需要一个数据结构来保存注册的东西.

​ 注册的核心逻辑就是把插件的构造函数给保存起来, 可以根据这点得到


#[derive(Default)]
pub struct RegistryExtensionLoader {
    factories: HashMap<String, RegistryExtensionFactory>,
}

impl RegistryExtensionLoader {
    pub fn new() -> Self {
        Self {
            factories: HashMap::new(),
        }
    }

    pub async fn register(
        &mut self,
        extension_name: String,
        factory: RegistryExtensionFactory,
    ) {
        todo!()
    }

    pub async fn load(&mut self, url: &Url) -> Result<RegistryProxy, StdError> {
       todo!()
    }
}

RegistryExtensionLoader 里的 factories 存放 Registry 不同实现的 factory, 所以 Extension 的唯一标识符是比较重要的.

// 全局静态、不可变
pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectory> =
    once_cell::sync::Lazy::new(|| ExtensionDirectory);

struct ExtensionDirectory {
    registry_extension_loader: RegistryExtensionLoader,
}

impl ExtensionDirectory {

  pub async fn register<T>(&self) -> Result<(), StdError>  
  where
      T: Extension,
      T: ExtensionMetaInfo, // 根据 ExtensionMetaInfo 得到类型信息, 来执行不同的注册逻辑例如: registry 执行 RegistryExtensionLoader, cluster 执行 ClusterExtensionLoader ...etc
        T: ConvertToExtensionFactories,
  {
    todo!()
  }

  pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
    todo!()
  }
}

​ 注意 ExtensionDirectory 里使用的是不可变引用, 而 RegistryExtensionLoader 使用的是可变引用. 为了解决这个问题, 需要再次引入 channel

pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
    once_cell::sync::Lazy::new(|| ExtensionDirectory::init());

#[derive(Default)]
struct ExtensionDirectory {
    registry_extension_loader: registry_extension::RegistryExtensionLoader,
}

impl ExtensionDirectory {
    fn init() -> ExtensionDirectoryCommander {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExtensionOpt>(64);

        tokio::spawn(async move {
            let mut extension_directory = ExtensionDirectory::default();

            while let Some(extension_opt) = rx.recv().await {
                match extension_opt {
                    ExtensionOpt::Register(
                        extension_name,
                        extension_factories,
                        extension_type,
                        tx,
                    ) => {
                        let result = extension_directory
                            .register(extension_name, extension_factories, extension_type)
                            .await;
                        let _ = tx.send(result);
                    }
                    ExtensionOpt::Load(url, extension_type, tx) => {
                        let result = extension_directory.load(url, extension_type).await;
                        let _ = tx.send(result);
                    }
                }
            }
        });

        ExtensionDirectoryCommander { sender: tx }
    }

    async fn register(
        &mut self,
        extension_name: String,
        extension_factories: ExtensionFactories,
        extension_type: ExtensionType,
    ) -> Result<(), StdError> {
        match extension_type {
            ExtensionType::Registry => match extension_factories {
                ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => {
                    self.registry_extension_loader
                        .register(extension_name, registry_extension_factory)
                        .await;
                    Ok(())
                }
            },
        }
    }

    async fn load(
        &mut self,
        url: Url,
        extension_type: ExtensionType,
    ) -> Result<Extensions, StdError> {
        match extension_type {
            ExtensionType::Registry => {
                let extension = self.registry_extension_loader.load(&url).await;
                match extension {
                    Ok(extension) => Ok(Extensions::Registry(extension)),
                    Err(err) => {
                        error!("load extension failed: {}", err);
                        Err(err)
                    }
                }
            }
        }
    }
}

pub struct ExtensionDirectoryCommander {
    sender: tokio::sync::mpsc::Sender<ExtensionOpt>,
}

impl ExtensionDirectoryCommander {
    pub async fn register<T>(&self) -> Result<(), StdError>
    where
        T: Extension,
        T: ExtensionMetaInfo,
        T: ConvertToExtensionFactories,
    {
       todo!()
    }

    pub async fn remove<T>(&self) -> Result<(), StdError>
    where
        T: Extension,
        T: ExtensionMetaInfo,
    {
        todo!()
    }

    pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
        todo!()
    }
}

enum ExtensionOpt {
    Register(
        String,
        ExtensionFactories,
        ExtensionType,
        oneshot::Sender<Result<(), StdError>>,
    ),
    Load(
        Url,
        ExtensionType,
        oneshot::Sender<Result<Extensions, StdError>>,
    ),
}

整体结构如下

all