Open xinyiZzz opened 11 months ago
Arrow Flight SQL is much faster than Pymysql and other MySQL-protocol-based libraries across all column types, with a more than 20x performance improvement in most large-volume read scenarios and up to 100x in some.
Compared with the traditional jdbc:mysql connection method, we tested the performance of reading data from Doris using three Arrow Flight SQL connection methods. The test code can be found by searching for "Arrow Flight" in https://github.com/apache/doris-websit. jdbc:mysql and jdbc:arrow-flight-sql both return data in JDBC ResultSet format, while Flight AdbcDriver and Flight JdbcDriver both return data in Arrow format.
| # | Column type / cost (unit: s) | JDK version | Java jdbc:mysql DriverManager | Java jdbc:arrow-flight-sql DriverManager | Java Flight AdbcDriver | Java Flight JdbcDriver | SQL |
|---|---|---|---|---|---|---|---|
| 1 | Int column (10M rows) | JDK 1.8 | 3.772 | 0.425 | 0.568 | 0.829 | select ClientIP from clickbench.hits limit 10000000; |
| | | JDK 17 | 3.826 | 0.451 | 0.510 | 0.756 | |
| 2 | Bool column (10M rows) | JDK 1.8 | 4.353 | 0.430 | 0.547 | 0.815 | select CounterClass from clickbench.hits where CounterClass!=0 limit 10000000; |
| | | JDK 17 | 3.755 | 0.421 | 0.491 | 0.751 | |
| 3 | String column (10M rows) | JDK 1.8 | 9.800 | 1.103 | 1.218 | 4.519 | select URL from clickbench.hits where URL!='' limit 10000000 |
| | | JDK 17 | 5.454 | 0.973 | 1.062 | 3.102 | |
| 4 | Mixed columns (1M rows) | JDK 1.8 | 8.478 | 1.799 | 2.123 | 13.431 | select * from clickbench.hits limit 1000000 |
| | | JDK 17 | 4.355 | 1.794 | 1.919 | 10.544 | |
| 5 | Decimal column (600M rows) | JDK 1.8 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 23.354 | 24.288 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem; |
| | | JDK 17 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 23.357 | 23.701 | Cannot get simple type for type DECIMAL | |
| 6 | Decimal column (10M rows) | JDK 1.8 | 4.499 | 0.654 | 0.873 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem limit 10000000; |
| | | JDK 17 | 3.456 | 0.660 | 0.789 | Cannot get simple type for type DECIMAL | |
| 7 | DATE column (600M rows) | JDK 1.8 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 24.323 | 24.559 | 122.514 | select l_commitdate from tpch.lineitem; |
| | | JDK 17 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 23.932 | 23.919 | 124.309 | |
| 8 | DATE column (10M rows) | JDK 1.8 | 4.554 | 0.636 | 0.864 | 2.784 | select l_commitdate from tpch.lineitem limit 10000000; |
| | | JDK 17 | 3.226 | 0.689 | 0.838 | 2.712 | |
| 9 | Mixed columns (1M rows) | JDK 1.8 | 1.690 | 0.892 | 1.070 | Cannot get simple type for type DECIMAL | select * from tpch.lineitem limit 1000000; |
| | | JDK 17 | 1.061 | 0.756 | 0.919 | Cannot get simple type for type DECIMAL | |
Conclusions that can be drawn from the test above:
Next, compare the performance of reading data from Doris and converting it to String with each connection method; in the loop body of each getResultSet or loadNextBatch, resultSet.toString() or VectorSchemaRoot.contentToTSVString() is called.
| # | Column type / cost (unit: s) | JDK version | Java jdbc:mysql DriverManager | Java jdbc:arrow-flight-sql DriverManager | Java Flight AdbcDriver | Java Flight JdbcDriver | SQL |
|---|---|---|---|---|---|---|---|
| 1 | Int column (10M rows) | JDK 1.8 | 11.972 | 2.239 | 1.797 | 2.078 | select ClientIP from clickbench.hits limit 10000000; |
| | | JDK 17 | 10.793 | 1.580 | 1.466 | 1.686 | |
| 2 | Bool column (10M rows) | JDK 1.8 | 6.514 | 2.337 | 1.332 | 1.581 | select CounterClass from clickbench.hits where CounterClass!=0 limit 10000000; |
| | | JDK 17 | 5.436 | 1.601 | 1.074 | 1.343 | |
| 3 | String column (10M rows) | JDK 1.8 | 12.376 | 2.378 | 4.536 | 7.855 | select URL from clickbench.hits where URL!='' limit 10000000 |
| | | JDK 17 | 7.460 | 1.766 | 5.674 | 6.709 | |
| 4 | Mixed columns (1M rows) | JDK 1.8 | 14.957 | 1.856 | 13.97 | 26.437 | select * from clickbench.hits limit 1000000 |
| | | JDK 17 | 7.840 | 1.818 | 13.24 | 25.421 | |
| 5 | Decimal column (600M rows) | JDK 1.8 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 86.219 | 72.473 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem; |
| | | JDK 17 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 49.458 | 70.62 | Cannot get simple type for type DECIMAL | |
| 6 | Decimal column (10M rows) | JDK 1.8 | 6.556 | 2.256 | 2.131 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem limit 10000000; |
| | | JDK 17 | 5.298 | 1.726 | 2.168 | Cannot get simple type for type DECIMAL | |
| 7 | DATE column (600M rows) | JDK 1.8 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 86.006 | 40.807 | 170.358 | select l_commitdate from tpch.lineitem; |
| | | JDK 17 | OutOfMemoryError: Java heap space (-Xms20G -Xmx40g) | 49.792 | 37.610 | 169.474 | |
| 8 | DATE column (10M rows) | JDK 1.8 | 7.117 | 2.253 | 1.532 | 3.484 | select l_commitdate from tpch.lineitem limit 10000000; |
| | | JDK 17 | 4.753 | 1.697 | 1.365 | 3.307 | |
| 9 | Mixed columns (1M rows) | JDK 1.8 | 2.126 | 0.992 | 3.309 | Cannot get simple type for type DECIMAL | select * from tpch.lineitem limit 1000000; |
| | | JDK 17 | 1.264 | 0.793 | 3.109 | Cannot get simple type for type DECIMAL | |
Conclusions that can be drawn from the test above:
Very cool feature! Can it be maintained on the confluence?
Yes, I will maintain it later, thanks~
I'd love to get involved in this issue, is there anything I can do to help?
@xinyiZzz Thank you for creating this feature request and for implementing Arrow Flight in Doris. Currently (Doris 2.1.0) we have only data retrieval. Are you also planning to implement data ingestion via Arrow Flight?
Hi @jpohanka, there are currently no plans to implement data ingestion, but we expect to support it in the future, especially for Spark and Flink. In the past we tested loading data into Doris from Spark via Arrow Flight, which reduced data serialization time by 10x.
Thanks for adding this! Wondering if there are plans for doing something similar on the federated query side. I think it would slot in nicely between data lake support and JDBC, allowing for the best of both worlds.
A simple use case would be to run queries against another Doris instance via ADBC instead of mysql protocol.
If someone wanted to try to implement this, what would be a good template for a partitioned / parallelisable data source? (assuming we'd want to distribute the query across multiple BEs).
Hi @aditanase , good suggestion!
We have thought about using Arrow Flight SQL to implement federated queries between multiple Doris clusters, replacing the current jdbc:mysql approach.
For partitioned data sources, users often split data sources by business. With storage-compute separation, resource isolation is easy to achieve; for Doris deployments that use local storage, different businesses often create separate Doris clusters. Arrow Flight SQL is helpful for federated queries and data migration between clusters.
Search before asking
Description
Doris implements a high-speed data link based on the Arrow Flight SQL protocol, allowing multiple languages to read large batches of data from Doris at high speed using SQL.
1. Motivation
In data science scenarios, it is often necessary to load large amounts of data from Doris to Python/Java/Spark. Loading data using Pymysql/Pandas or JDBC is very slow.
Nowadays, many big data systems use columnar in-memory data formats, while Mysql/JDBC/ODBC remain the mainstream protocols and standards for interacting with database systems. Their performance shortcomings have become increasingly obvious in today's big data world: data must be serialized from the system's own columnar format into the row-oriented format of Mysql/JDBC/ODBC, and then deserialized back into the client's columnar format, which significantly slows down data movement.
If both the source database and the target client support Arrow as a columnar in-memory format, transferring using the Arrow Flight SQL protocol eliminates the need to serialize and deserialize the data, thereby eliminating the overhead in this portion of the data transfer. Additionally, Arrow Flight can leverage multi-node and multi-core architectures to optimize throughput through full parallelization.
2. Introduction to Arrow Flight SQL
Apache Arrow Flight SQL is a protocol developed by the Apache Arrow community for interacting with database systems. ADBC clients use it to exchange data in the Arrow format with databases that implement the Arrow Flight SQL protocol, combining the speed of Arrow Flight with the ease of use of JDBC/ODBC. Some basic concepts are as follows:
- **Apache Arrow**: an efficient columnar in-memory format widely used for large-scale data processing, supported by many libraries in all major programming languages.
- **Apache Arrow Flight**: an RPC framework for transporting data in the Arrow format, allowing high-speed data exchange between different systems using Arrow.
- **Arrow Flight SQL**: although the Mysql/JDBC/ODBC protocols and standards are slower, they offer simple APIs for developers; Arrow Flight SQL was therefore introduced on top of Arrow Flight to provide a friendlier interface for interacting with database systems.
- **ADBC**: a driver that lets different languages access databases that implement the Arrow Flight SQL protocol, similar to JDBC/ODBC.
3. Implementation method
3.1 Principle
In Apache Doris, query results are organized in columnar format blocks. In previous versions, if you need to transfer this data to the target client through MySQL Client or JDBC/ODBC driver, you need to first serialize the blocks into row-format bytes. If the target client is a column-formatted data science component or column-formatted database like Pandas, you also need to deserialize the row-format bytes into column-formatted ones, and the serialization/deserialization operation is a very time-consuming process.
Using Arrow Flight SQL, Doris first converts the columnar Block into an equally columnar Arrow RecordBatch; this conversion is very fast, and no further serialization or deserialization is needed during transmission. The Python client then converts the Arrow RecordBatch into an equally columnar Pandas DataFrame, which is also very fast.
In addition, Arrow Flight SQL provides a universal JDBC driver, allowing applications that are fully compatible with the JDBC standard to interact with database systems over Arrow Flight SQL.
This accelerates reading from Doris in Python, as shown in the figure:
3.2 Outline design
1. The ADBC Client sends a query request to Doris FE and completes authentication on the first request.
2. FE parses the query plan and sends the Fragments to be executed to the BEs.
3. After a BE completes Prepare and Open for the Fragment, it returns the Schema of the query result in Arrow format to FE, starts executing the query, and puts the query results into a queue.
4. FE sends the QueryID, the Schema of the query result, and the BE addresses (Endpoints) where the query result is located back to the ADBC Client.
5. The ADBC Client requests the BE to pull the query results for the specified QueryID.
6. The BE returns the queued Arrow-format query results to the ADBC Client, which finishes after verifying the Schema of the results.
3.3 Detailed design
Arrow version: 13.0.0
Take the ADBC Low-Level API execution process as an example:
3.3.1 ADBC Client
1.1 db = adbc_driver_flightsql.connect(uri="grpc://ip:port?user=&password=")
Creates a Database handle, which can maintain multiple shared Connections at the same time. Parameters: Arrow Flight Server IP, port, username, password.
1.2 conn = adbc_driver_manager.AdbcConnection(db)
Creates a connection to the Database; this triggers authentication and fetches FlightSqlInfo.
Auth The first request to the Arrow Flight Server triggers authentication. The return value is a Bearer Token; every subsequent request to the Arrow Flight Server carries this Token.
getFlightInfoSqlInfo Requests the Arrow Flight Server to return SQL Info, including the SQL syntax supported by the database. The return value is the schema and endpoint of the SQL Info; the SQL Info itself is Arrow-format data, and the endpoint is still the current Doris FE flight server. All data exchanged over Arrow Flight is in Arrow format: typically, before fetching a piece of Arrow data, a first request obtains its endpoint and schema, wrapped in a FlightInfo, and a second request to the endpoint fetches the Arrow data and verifies the schema.
getStreamSqlInfo Request the endpoint to obtain SQL Info. The result is wrapped in ArrowArrayStream and associated with a ServerStreamListener.
1.3 stmt = adbc_driver_manager.AdbcStatement(conn)
Maintains the state of a query. It can be a one-shot query or a prepared statement that can be reused, though previous query results become invalid on reuse.
1.4 stmt.set_sql_query("select * from tpch.hdf5 limit 10;")
1.5 stream, _ = stmt.execute_query()
Executing Query returns a RecordBatchReader, wrapped in a RecordBatchStream.
getFlightInfoStatement Returns the Endpoints and Schema where the query results are located, which is the Metadata of the Stream.
getStreamStatement Returns a RecordBatchReader for reading query results.
1.6 reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
Creates a Reader from the Stream.
1.7 arrow_data = reader.read_all()
read_all() repeatedly calls RecordBatchReader.ReadNext() to fetch the RecordBatches of the query result.
Corresponding code example:
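The code example itself did not survive extraction; steps 1.1 to 1.7 above can be assembled into a single sketch (package names per the ADBC Python bindings; a reachable Arrow Flight SQL server is assumed):

```python
# A sketch assembling steps 1.1-1.7 above into one function. Assumes the
# adbc-driver-manager, adbc-driver-flightsql, and pyarrow packages are
# installed and that an Arrow Flight SQL server is reachable at `uri`.
def run_low_level_query(uri: str, sql: str):
    import adbc_driver_flightsql
    import adbc_driver_manager
    import pyarrow

    db = adbc_driver_flightsql.connect(uri=uri)      # 1.1 Database handle
    conn = adbc_driver_manager.AdbcConnection(db)    # 1.2 auth + FlightSqlInfo
    stmt = adbc_driver_manager.AdbcStatement(conn)   # 1.3 holds query state
    stmt.set_sql_query(sql)                          # 1.4 bind the SQL text
    stream, _ = stmt.execute_query()                 # 1.5 ArrowArrayStream
    reader = pyarrow.RecordBatchReader._import_from_c(stream.address)  # 1.6
    return reader.read_all()                         # 1.7 drain into a Table
```

For example: `run_low_level_query("grpc://ip:port?user=&password=", "select * from tpch.hdf5 limit 10;")`.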
3.3.2 Doris FE
2.1 Authentication
Implements the arrow.flight.auth2-related interfaces to handle authentication when the ADBC client connects for the first time: extract the username and password from the request header and authenticate; generate a 130-bit Token, associate it with the user's permission information, and save it in a cache, whose size and Token expiration time can be adjusted in Config; finally return the Token to the ADBC client.
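The token cache described above can be sketched as follows. This is an illustrative Python sketch, not the actual Doris FE Java implementation; the size and TTL defaults are placeholders standing in for the values configurable in Config.

```python
# Illustrative sketch of a bearer-token cache: token -> user identity,
# with a size bound and a TTL, mirroring the behavior described above.
import secrets
import time
from collections import OrderedDict

class TokenCache:
    def __init__(self, max_size=512, ttl_seconds=3 * 24 * 3600):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._cache = OrderedDict()  # token -> (username, issued_at)

    def issue(self, username):
        # A long random token (130 hex characters here), echoing the post.
        token = secrets.token_hex(65)
        if len(self._cache) >= self.max_size:
            self._cache.popitem(last=False)  # evict the oldest entry
        self._cache[token] = (username, time.time())
        return token

    def check(self, token):
        """Return the username for a valid token, or None if unknown/expired."""
        entry = self._cache.get(token)
        if entry is None:
            return None
        username, issued_at = entry
        if time.time() - issued_at > self.ttl:
            del self._cache[token]
            return None
        return username
```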
2.2 getFlightInfoSqlInfo
Returns SQL Info in response to the Arrow Flight SQL request, implementing the two methods FlightSqlProducer.getFlightInfoSqlInfo() and FlightSqlProducer.getStreamSqlInfo().
When the Arrow Flight Server is initialized, it creates the FlightSqlProducer that serves ADBC requests. On initialization, the FlightSqlProducer binds the SQL Info, including the Arrow version, whether reads and writes are supported, whether DDL statements such as creating tables and modifying schemas are supported, the list of supported functions, and other SQL syntax.
2.3 getFlightInfoStatement
Executes the Query in response to the Arrow Flight SQL request and returns the Endpoints and Schema of the query results, implementing the FlightSqlProducer.getFlightInfoStatement() method.
Initialize ConnectContext. The first time ADBC Client makes an Execute Query request, it will initialize ConnectContext, which is a Session that stores information related to query execution, including user permissions, Session variables, etc.
Initialize the executor FlightStatementExecutor. Saves Query, QueryID, connectContext, and resultServerInfo.
Execute Query. Initialize QueryID and StmtExecutor, then executeArrowFlightQuery to generate the query plan, initialize and execute the Coordinator, and send the Fragment to the specified BE.
Get the Arrow Result Set Schema. Request the Arrow Flight Server of the BE where the Result Sink Node in the query plan is located. The latter will generate the Schema of the query result after the Fragment completes Prepare and Open.
Use the Query and QueryID to initialize the Ticket; use the Arrow Flight Server address of the BE where the Result Sink Node in the query plan is located (that is, the server address where the Arrow Result Set lives) together with the Ticket to initialize the FlightEndpoint; finally use the Arrow Result Set Schema and Endpoints to initialize the FlightInfo and send it back to the ADBC Client.
3.3.3 Doris BE
3.1 Execute Fragment
Executes the Fragment and returns the Arrow Result Set Schema. The overall execution process is the same as before; the difference is that the ResultSinkNode type in the Fragment is no longer MYSQL_PROTOCAL but ARROW_FLIGHT_PROTOCAL. After Prepare and Open complete, the Arrow Schema of the query result is put into a Map for FE to fetch, and an ArrowFlightResultWriter is initialized.
After the subsequent query results arrive at the ResultSink, use ArrowFlightResultWriter::append_block to convert the data block into a RecordBatch in Arrow format, and then put it into a separate queue BufferControlBlock, waiting for the ADBC Client to pull it.
3.2 GetStatement
After receiving the Endpoints sent back by Doris FE, the ADBC Client requests the Arrow Flight Server on the Doris BE that the Endpoints point to. On receiving the request, Doris BE first decodes the Ticket to obtain the SQL and QueryID, then uses the QueryID to find the previously saved Arrow Schema of the query result and initializes a RecordBatchReader to return, which the ADBC Client uses to pull data subsequently; this implements the FlightSqlServerBase::DoGetStatement() method.
In addition, when the ADBC Client first requests the BE's Arrow Flight Server, the header also contains the Bearer Token, but the HeaderAuthServerMiddleware and BearerAuthServerMiddleware used when the BE Arrow Flight Server is initialized are both no-ops, i.e. no verification is performed. So the BE Arrow Flight Server currently authorizes requests based on the QueryID: as long as the QueryID is correct, the ADBC Client is allowed to read the data.
3.3 ArrowFlightBatchReader::ReadNext
The ADBC Client repeatedly calls the ReadNext method of the previously returned RecordBatchReader to pull data, and the BE Arrow Flight Server uses the QueryID in the request to pull the Arrow-format RecordBatch from the BufferControlBlock and return it.
4. How to use
Using the Python-based ADBC Driver (requires Python >= 3.9) as an example, connect to Doris, which implements Arrow Flight SQL and supports common syntax such as DDL, DML, Session Variables, Show statements, etc.
Modify the configuration of Doris FE and BE:
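The configuration snippet itself was not preserved in this extraction; a minimal example, assuming the `arrow_flight_sql_port` option present in recent Doris releases (port numbers are placeholders, and the option is disabled when set to -1):

```
# fe.conf: enable the FE Arrow Flight SQL server
arrow_flight_sql_port = 9090

# be.conf: enable the BE Arrow Flight SQL server
arrow_flight_sql_port = 9091
```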
After Python connects via the ADBC Driver to Doris, which implements Arrow Flight SQL, the following uses various ADBC APIs to load the Clickbench data set from Doris into Python.
The execution results are as follows (repeated output omitted). Loading the 1-million-row, 105-column, 780 MB Clickbench data set from Doris takes 3 seconds.
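A sketch of such a load using the ADBC DB-API bindings (host, port, and credentials are placeholders; assumes the `adbc-driver-flightsql` and `pandas` packages are installed and the FE Arrow Flight SQL port is enabled):

```python
# Read a Doris query result into pandas over Arrow Flight SQL and time it.
import time

def load_table(sql: str, uri: str = "grpc://127.0.0.1:9090",
               user: str = "root", password: str = ""):
    import adbc_driver_manager
    import adbc_driver_flightsql.dbapi as flight_sql

    conn = flight_sql.connect(
        uri=uri,
        db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: user,
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: password,
        },
    )
    try:
        cursor = conn.cursor()
        start = time.time()
        cursor.execute(sql)
        table = cursor.fetchallarrow()   # pyarrow.Table, no row-format detour
        df = table.to_pandas()
        print(f"loaded {len(df)} rows in {time.time() - start:.2f}s")
        return df
    finally:
        conn.close()

# Example: load_table("select * from clickbench.hits limit 1000000;")
```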
5. Progress and TODO
upgrade thirdparty libs - again https://github.com/apache/doris/pull/23414
(step1) BE support Arrow Flight server, read data only https://github.com/apache/doris/pull/23765
(step2) FE support Arrow Flight server https://github.com/apache/doris/pull/24314
(step3) Support authentication and user session https://github.com/apache/doris/pull/24772
(step4) Support other DML and DDL statements, besides Select https://github.com/apache/doris/pull/25919
(step5) Support JDBC and PreparedStatement and Fix Bug https://github.com/apache/doris/pull/27661
(step6) Support regression test https://github.com/apache/doris/pull/27847
6. Test
```sql
CREATE TABLE `hdf5` (
  `k0` varchar(65532) NULL,
  `k1` float NULL,
  `k2` float NULL,
  ……
  `k95` float NULL
) ENGINE=OLAP
DUPLICATE KEY(`k0`)
DISTRIBUTED BY HASH(`k0`) BUCKETS 64;
```
6.1 Python
Compare the performance of Python using Pymysql, Pandas and Arrow Flight SQL to read Doris:
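The benchmark script itself is not included in this thread; a sketch of the kind of timing harness involved (hosts, ports, and credentials are placeholders; assumes the `pymysql` and `adbc-driver-flightsql` packages are installed and both endpoints are reachable):

```python
# Time the same query over the MySQL protocol vs. Arrow Flight SQL.
import time

def time_pymysql(sql: str, host: str = "127.0.0.1", port: int = 9030) -> float:
    import pymysql
    conn = pymysql.connect(host=host, port=port, user="root", password="")
    start = time.time()
    with conn.cursor() as cur:
        cur.execute(sql)
        cur.fetchall()        # rows arrive in MySQL's row-oriented format
    conn.close()
    return time.time() - start

def time_flight_sql(sql: str, uri: str = "grpc://127.0.0.1:9090") -> float:
    import adbc_driver_manager
    import adbc_driver_flightsql.dbapi as flight_sql
    conn = flight_sql.connect(
        uri=uri,
        db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
        },
    )
    start = time.time()
    cur = conn.cursor()
    cur.execute(sql)
    cur.fetchallarrow()       # columnar Arrow batches, no row round trip
    conn.close()
    return time.time() - start
```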