FederatedAI / FATE

An Industrial Grade Federated Learning Framework
Apache License 2.0
5.65k stars 1.55k forks source link

FATE2.0读取外部数据源 #5669

Open FancyXun opened 1 month ago

FancyXun commented 1 month ago

请教个问题,目前官方给的例子都是基于内置的数据进行训练预测的,在实际生产中,我们会从数据库,比如Mysql读取数据,请问fate2.0支持从外部读取数据吗?我看有个table bind,但不知道具体用法,这个怎么把外部的数据库信息给到fate呢

sagewe commented 1 month ago

目前FATE端不支持直接从mysql中读取数据,原因之一是不同的计算引擎支持的存储数据格式不一样,比如spark支持文件跟hdfs,eggroll支持的是自己的格式(底层是lmdb)。我们在之前的版本中有采取转换的思路,这个版本可能还没有适配? @zhihuiwan

sagewe commented 1 month ago

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

FancyXun commented 1 month ago

目前FATE端不支持直接从mysql中读取数据,原因之一是不同的计算引擎支持的存储数据格式不一样,比如spark支持文件跟hdfs,eggroll支持的是自己的格式(底层是lmdb)。我们在之前的版本中有采取转换的思路,这个版本可能还没有适配? @zhihuiwan

感谢回复,我看现在官方默认upload 数据的时候,里面配置对应的数据路径(csv文件),但是这个路径必须存在在fate flow里面,这样Job里面会出现tranfromer这些,我理解就是你说的转成imdb格式。

FancyXun commented 1 month ago

我理解原始数据最终都是要转成fate能读取的数据格式lmdb,现在都是直接从内置的fate flow 里面读取csv文件进行转换,我的述求就是如何读取外部的数据库,进行转换也可。我理解直接1.x版本是可以 table bind 一个外部数据源,比如mysql这样。@sagewe

sagewe commented 1 month ago

我理解原始数据最终都是要转成fate能读取的数据格式lmdb,现在都是直接从内置的fate flow 里面读取csv文件进行转换,我的述求就是如何读取外部的数据库,进行转换也可。我理解直接1.x版本是可以 table bind 一个外部数据源,比如mysql这样。@sagewe

是的,这个后续版本会有支持,是我们推进容器化支持的一部分

FancyXun commented 1 month ago

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

sagewe commented 1 month ago

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

从这里改可能更简单: https://github.com/FederatedAI/FATE/blob/0e36edc936394331dbefa868eec236808fadbd62/python/fate/components/core/component_desc/artifacts/data/_table.py#L46-L53

class TableReader(_ArtifactTypeReader):
    def read(self):
        self.artifact.consumed()
        if self.artifact.uri.scheme == "mysql":
            from sqlalchemy import create_engine
            import copy

            database, table = self.artifact.uri.path_splits()
            database_uri = copy.deepcopy(self.artifact.uri)
            database_uri.path.replace(f"/{table}", "")
            engine = create_engine(database_uri.to_string(), echo=True)
            with engine.connect() as con:

                rs = con.execute(f'SELECT * FROM {table}')
                def get_data():
                    for row in rs:
                        # TODO: process row
                        yield ...

                table = self.ctx.computing.parallelize(
                    data=get_data(),
                    partition=16,
                )
                table.schema = self.artifact.metadata.metadata.get("schema", {})
                return table

        return self.ctx.computing.load(
            uri=self.artifact.uri,
            schema=self.artifact.metadata.metadata.get("schema", {}),
            options=self.artifact.metadata.metadata.get("options", None),
        )

你可能会碰到的问题:

  1. flow是否能传递mysql uri进来?这个需要 @zhihuiwan 来给你相应的指导
  2. 读mysql是单线程的,数据量大了可能有点慢

在这个位置实现的利弊

FancyXun commented 1 month ago

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

从这里改可能更简单:

https://github.com/FederatedAI/FATE/blob/0e36edc936394331dbefa868eec236808fadbd62/python/fate/components/core/component_desc/artifacts/data/_table.py#L46-L53

class TableReader(_ArtifactTypeReader):
    def read(self):
        self.artifact.consumed()
        if self.artifact.uri.scheme == "mysql":
            from sqlalchemy import create_engine
            import copy

            database, table = self.artifact.uri.path_splits()
            database_uri = copy.deepcopy(self.artifact.uri)
            database_uri.path.replace(f"/{table}", "")
            engine = create_engine(database_uri.to_string(), echo=True)
            with engine.connect() as con:

                rs = con.execute(f'SELECT * FROM {table}')
                def get_data():
                    for row in rs:
                        # TODO: process row
                        yield ...

                table = self.ctx.computing.parallelize(
                    data=get_data(),
                    partition=16,
                )
                table.schema = self.artifact.metadata.metadata.get("schema", {})
                return table

        return self.ctx.computing.load(
            uri=self.artifact.uri,
            schema=self.artifact.metadata.metadata.get("schema", {}),
            options=self.artifact.metadata.metadata.get("options", None),
        )

你可能会碰到的问题:

  1. flow是否能传递mysql uri进来?这个需要 @zhihuiwan 来给你相应的指导
  2. 读mysql是单线程的,数据量大了可能有点慢

在这个位置实现的利弊

  • 好处是对引擎透明
  • 缺点是如果底层引擎有更好的实现无法发挥,但是可以根据未来需要通过简单的接口重构克服

非常感谢你提供的思路,这个我后续可以看看如何实现,目前使用了一个比较简单快捷的办法https://github.com/FederatedAI/FATE-Flow/pull/574