adejanovski / cassandra-jdbc-wrapper

A JDBC wrapper for the Datastax Java Driver for Cassandra
Apache License 2.0

It does not work with SparkSession.read().jdbc() since CQL does not support "where 1=0" syntax #11

Closed paulzwu closed 7 years ago

paulzwu commented 8 years ago

Please see the real issue after the comment below.

adejanovski commented 8 years ago

Hi,

Actually, "Test Cluster" is the name of your cluster, while DCAwareRoundRobinPolicy expects a datacenter name. Replace DCAwareRoundRobinPolicy('Test Cluster') with DCAwareRoundRobinPolicy('datacenter1') and you'll be ready to go.
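For example, if the policy is passed through the wrapper's JDBC URL, the corrected connection string might look like this (host, port, and keyspace are placeholders; the `loadbalancing` URL parameter is the one this wrapper documents for setting a load balancing policy):

```
jdbc:cassandra://127.0.0.1:9042/mykeyspace?loadbalancing=DCAwareRoundRobinPolicy("datacenter1")
```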

paulzwu commented 8 years ago

Wow, that was a fast response. Thanks a lot.

I tried the cluster another way (Spark 2.0's SQLContext) and it works. But the JDBC way in Spark 2.0 has been unsuccessful:

The log showed the following, which seems to be incorrect somewhere.

16/09/08 15:17:46 INFO CassandraConnection: Connected to cluster: %s

16/09/08 15:17:46 INFO CassandraConnection: Datacenter: %s; Host: %s; Rack: %s

16/09/08 15:17:46 INFO CassandraConnection: Datacenter: %s; Host: %s; Rack: %s

16/09/08 15:17:46 INFO CassandraConnection: Datacenter: %s; Host: %s; Rack: %s

16/09/08 15:17:46 INFO CassandraConnection: Datacenter: %s; Host: %s; Rack: %s

16/09/08 15:17:46 INFO CassandraConnection: Datacenter: %s; Host: %s; Rack: %s

Exception in thread "main" java.sql.SQLTransientException: com.datastax.driver.core.exceptions.SyntaxError: line 1:29 no viable alternative at input '1' (SELECT * FROM sql_demo WHERE [1]...)

The program is just (Spark 2.0):

final Properties connectionProperties = new Properties();
//connectionProperties.put("user", MYSQL_USERNAME);
//connectionProperties.put("password", MYSQL_PWD);

final String dbTable = "sql_demo";

// Load the JDBC query result as a Dataset, partitioned on column "key"
Dataset<Row> jdbcDF = sparkSession.read()
        .jdbc(MYSQL_CONNECTION_URL, dbTable, "key", 0, 10, 10, connectionProperties);

List<Row> rows = jdbcDF.collectAsList();

System.out.println(rows.size());
paulzwu commented 8 years ago

Hello, could you please help with this issue? Why does code like:

final String dbTable = "sql_demo";

Dataset<Row> jdbcDF = sparkSession.read()
        .jdbc(SQL_CONNECTION_URL, dbTable, connectionProperties);

List<Row> rows = jdbcDF.collectAsList();

generate SQL like the following log shows:

16/09/20 13:16:35 INFO CassandraConnection 138: Datacenter: %s; Host: %s; Rack: %s

16/09/20 13:16:35 INFO CassandraConnection 138: Datacenter: %s; Host: %s; Rack: %s

16/09/20 13:16:35 TRACE CassandraPreparedStatement 98: CQL: SELECT * FROM sql_demo WHERE 1=0

16/09/20 13:16:35 TRACE RequestHandler 71: [19400322] com.datastax.driver.core.Statement$1@41ccb3b9

How can I fix the problem? Your help is much appreciated.

paulzwu commented 8 years ago

More thinking: I think Spark may be trying to get the table schema via the generated SQL, but Cassandra does not support the 'WHERE 1=0' syntax. Not sure how to get around this.

paulzwu commented 8 years ago

Workaround: add two lines after line 87 in CassandraPreparedStatement.java:

       this.cql = cql.replace("WHERE 1=0", "limit 1");
       cql = this.cql;
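The effect of those two lines can be sketched in isolation (the class and method names here are hypothetical, for illustration only): Spark's schema probe "WHERE 1=0" is rewritten into a CQL-legal "limit 1" query that returns at most one row, which is enough for schema discovery.

```java
// Standalone sketch of the workaround's string rewrite, not the wrapper's actual code.
public class RewriteSketch {
    // Performs the same replacement as the two lines added to CassandraPreparedStatement
    static String rewrite(String cql) {
        return cql.replace("WHERE 1=0", "limit 1");
    }

    public static void main(String[] args) {
        String probe = "SELECT * FROM sql_demo WHERE 1=0";
        // prints: SELECT * FROM sql_demo limit 1
        System.out.println(rewrite(probe));
    }
}
```

Note this is a blunt string match: it only fires on the exact text "WHERE 1=0", which is what Spark's JDBC schema probe emits.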
adejanovski commented 8 years ago

Hi,

Actually, I would not recommend using the JDBC driver with Spark. The Cassandra-Spark connector maintained by Datastax will be much more efficient at this job, as the queries can be executed on colocated Cassandra/Spark nodes. It now has a version that works with Spark 2.0: https://github.com/datastax/spark-cassandra-connector/tree/v2.0.0-M3

paulzwu commented 7 years ago

Just to let you know, the issue is due to a Spark bug. I filed it here: https://issues.apache.org/jira/browse/SPARK-17614?filter=-2 and it will be fixed in Spark 2.1.
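For later readers: the Spark 2.1 fix made the schema-probe query overridable per JDBC dialect via a getSchemaQuery hook on JdbcDialect. A rough, untested sketch of what such an override might look like (the CassandraDialect class is hypothetical; verify the exact method signatures against your Spark version):

```java
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

// Hypothetical dialect that swaps the "WHERE 1=0" schema probe
// for a CQL-legal "LIMIT 1" query.
public class CassandraDialect extends JdbcDialect {
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:cassandra:");
    }

    @Override
    public String getSchemaQuery(String table) {
        return "SELECT * FROM " + table + " LIMIT 1";
    }
}

// Registered once before reading:
//   JdbcDialects.registerDialect(new CassandraDialect());
```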