apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0
1.87k stars 656 forks source link

Flink Sedona,geomTbl.execute().print() happen error: #1446

Open Leopold-Xu opened 3 months ago

Leopold-Xu commented 3 months ago

Expected behavior

i hope this proble solved,print result like this,Thanks!: +----+--------------------------------+--------------------------------+ | op | geom_polygon | name_polygon | +----+--------------------------------+--------------------------------+ | +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 | | +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 | | +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 | | +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 | | +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 | | +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 | | +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 | | +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 | | +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 | | +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 | +----+--------------------------------+--------------------------------+ 10 rows in set

Actual behavior

this is java code:

 public void test() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.0.102",8081,
                "C:/Users/15218/Desktop/quickstart/jars/flink-connector-jdbc-3.1.2-1.18.jar",
                "C:/Users/15218/Desktop/quickstart/jars/postgresql-42.7.3.jar"
                );
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        StreamTableEnvironment send = SedonaContext.create(env, tableEnv);

        String sourceSql = "SELECT id,age,wkt FROM test";

        String postgresSinkDDL = "CREATE TABLE source_table  (\n" +
                " id INT,\n" +
                " age INT,\n" +
                " wkt STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:postgresql://192.168.0.102:5432/jjlbsdb',\n"+
                " 'table-name' = 'test',\n" +
                " 'driver' = 'org.postgresql.Driver',\n" +
                " 'username' = 'postgres',\n" +
                " 'password' ='postgres'\n" +
                ")";
        tableEnv.executeSql(postgresSinkDDL);
        Table result = tableEnv.sqlQuery("select  id,age,wkt from source_table");
        result.execute().print();
        send.createTemporaryView("myTable", result);
        Table geomTbl = send.sqlQuery("SELECT id,age,ST_GeomFromWKT(wkt) as point FROM myTable");
        geomTbl.execute().print();
    }

Steps to reproduce the problem

when program execute last line "geomTbl.execute().print() ' happened ereor:` Caused by: org.apache.flink.util.FlinkRuntimeException: java.io.EOFException at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66) ~[flink-table-common-1.19.0.jar:1.19.0] at GeneratedCastExecutor$0.cast(Unknown Source) ~[na:na] at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:81) ~[flink-table-planner_2.12-1.19.0.jar:1.19.0] at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:94) ~[flink-table-planner_2.12-1.19.0.jar:1.19.0] at org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:174) ~[flink-table-common-1.19.0.jar:1.19.0] at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148) ~[flink-table-common-1.19.0.jar:1.19.0] at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:163) ~[flink-table-api-java-1.19.0.jar:1.19.0] at com.jjgis.sedonaquickstart.demos.web.BasicController.test(BasicController.java:68) ~[classes/:na] at com.jjgis.sedonaquickstart.demos.web.BasicController.run(BasicController.java:75) ~[classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:773) [spring-boot-2.6.13.jar:2.6.13] ... 5 common frames omitted

Settings

Sedona version = 1.6.0

Apache Spark version = ?

Apache Flink version = 1.19

API type = Java

Scala version = 2.12,

JRE version = 1.8

Python version = ?

Environment = Standalone, AWS EC2, EMR, Azure, Databricks?

jiayuasu commented 3 months ago

Hey @docete did you try the flink code we have before? Did you run into this issue?

Leopold-Xu commented 3 months ago

Hey @docete did you try the flink code we have before? Did you run into this issue?

Hey,how's it going?