apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0
1.39k stars 181 forks source link

wrong location in FlightEndpoint causing not support Flight SQL JDBC Driver #1012

Open Curricane opened 2 months ago

Curricane commented 2 months ago

Describe the bug: Using flight-sql-jdbc-driver, the query "SELECT 1" cannot be executed against Ballista.

To Reproduce os: wsl2 ArchLinux

start ballista

RUST_LOG=info ballista-scheduler
RUST_LOG=info ballista-executor -c 4

connect ballista with java code

os: windows11 ide: Intellij

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.compression.Lz4CompressionCodec;

/**
 * Minimal reproduction: runs {@code SELECT 1} against a Ballista scheduler through the Arrow
 * Flight SQL JDBC driver and prints the first column of each returned row.
 */
public class App {
    public static void main(String[] args) {
        String url = "jdbc:arrow-flight-sql://localhost:50050?useEncryption=0";
        String query = "SELECT 1";
        Properties p = new Properties();
        p.setProperty("user", "admin");
        p.setProperty("password", "password");
        try {
            // Load the JDBC driver up front so a missing driver jar fails before connecting.
            Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
            // try-with-resources closes the result set, statement, and connection (in that
            // order) even when connecting or executing the query throws.
            try (Connection connection = DriverManager.getConnection(url, p);
                 Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(query)) {
                // The query yields a single integer column, so read it by index.
                while (resultSet.next()) {
                    System.out.println(resultSet.getInt(1));
                }
            }
        } catch (Exception e) {
            // Print the full stack trace: it is the diagnostic output of this reproduction.
            e.printStackTrace();
        }
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Maven coordinates of this reproduction project. -->
    <groupId>com.fusiongrid</groupId>
    <artifactId>arrow-flight-sql-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Compile at Java 11 source/target level and treat sources as UTF-8. -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <!-- Launches com.test.App through exec-maven-plugin.
                 NOTE(review): mainClass is a parameter of the exec:java goal, while
                 executable/arguments are parameters of exec:exec. Confirm which goal is
                 intended; under exec:exec the arguments would presumably also need the
                 classpath and the main class name. -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.6.0</version>
                <configuration>
                    <mainClass>com.test.App</mainClass>
                    <executable>java</executable>
                    <!-- First argument opens java.base/java.nio to unnamed modules; second sets
                         the system property arrow.memory.debug.allocator to true. Presumably
                         both are needed by Arrow's memory layer: confirm. -->
                    <arguments>
                        <argument>--add-opens=java.base/java.nio=ALL-UNNAMED</argument>
                        <argument>-Darrow.memory.debug.allocator=true</argument>
                    </arguments>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Arrow Flight SQL JDBC driver used to connect to the server. -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>flight-sql-jdbc-driver</artifactId>
            <version>16.0.0</version>
        </dependency>
        <!-- Arrow compression module, kept at the same version as the JDBC driver. -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-compression</artifactId>
            <version>16.0.0</version>
        </dependency>

        <!-- SLF4J logging API. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.13</version>
        </dependency>

        <!-- Logback, presumably the SLF4J backend for this project: verify. -->
        <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.5.5</version>
    </dependency>

    </dependencies>
</project>

run

error message

java.sql.SQLException: Error while executing SQL "SELECT 1": Ballista Error: General("scheduler::from_proto(Action) invalid or missing action")
    at org.apache.arrow.driver.jdbc.shaded.org.apache.calcite.avatica.Helper.createException(Helper.java:56)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.calcite.avatica.Helper.createException(Helper.java:41)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:164)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.calcite.avatica.AvaticaStatement.executeQuery(AvaticaStatement.java:228)
    at com.test.App.main(App.java:29)
Caused by: org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightRuntimeException: INTERNAL: Ballista Error: General("scheduler::from_proto(Action) invalid or missing action")
    at org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:165)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:186)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:467)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:351)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.ClientInterceptorAdapter$FlightClientCallListener.onClose(ClientInterceptorAdapter.java:116)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.arrow.driver.jdbc.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)

INTERNAL: Ballista Error: General("scheduler::from_proto(Action) invalid or missing action")

Expected behavior: "SELECT 1" executes successfully through the Flight SQL JDBC driver and the client prints 1.

Additional context: The bug is that get_flight_info_prepared_statement returns a FlightInfo with a wrong FlightEndpoint that points to a Ballista executor; the endpoint contains the location returned from https://github.com/apache/datafusion-ballista/blob/04766d5fd7cbb15d583f736979c11de06c720928/ballista/scheduler/src/flight_sql.rs#L292-L301

Curricane commented 2 months ago

https://arrow.apache.org/docs/format/Flight.html#downloading-data

Consume each endpoint returned by the server.

To consume an endpoint, the client should connect to one of the locations in the endpoint, then call DoGet(Ticket) with the ticket in the endpoint. This will give the client a stream of Arrow record batches.

If the server wishes to indicate that the data is on the local server and not a different location, then it can return an empty list of locations