ing-bank / cassandra-jdbc-wrapper

A JDBC wrapper of Java Driver for Apache Cassandra®, which offers a simple JDBC compliant API to work with CQL3.
Apache License 2.0
70 stars 25 forks

Observed data loss while fetching records from Cassandra through Cassandra JDBC #24

Closed. MohamedKamarudeen closed this issue 1 year ago.

MohamedKamarudeen commented 1 year ago

Table structure: testkeyspace.productInfo (productID, productName, timestamp, price)

  - The partition key column of the above table is "productID".
  - The table testkeyspace.productInfo consists of 10k records.

Test Case

The test consists of two Java applications:

  - Application 1 writes data to the Cassandra table (testkeyspace.productInfo).
  - Application 2 reads from the Cassandra table (testkeyspace.productInfo) continuously with a 5-second interval.

The partition key (productID) is unique for all records, hence the number of partitions equals the number of records in the table.

How data is read from Cassandra table? (Read Pattern)

  1. Initially, "SELECT *" is issued to the Cassandra table through the Cassandra JDBC wrapper.
  2. The latest timestamp (lastNotedTimestamp) of the fetched records is noted from the timestamp column of the table.
  3. The subsequent "SELECT *" queries are issued with a WHERE condition of timestamp > lastNotedTimestamp.
  4. Steps 2 and 3 are repeated until the application is killed (a minimal sketch of this polling pattern follows the list).
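
The sketch below is a hypothetical illustration of this polling pattern, assuming the schema described above and a JDBC connection obtained through the Cassandra JDBC wrapper; the ALLOW FILTERING clause is required because the timestamp column is not part of the primary key. The actual application code is attached later in this thread.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class PollingReaderSketch {

    public static void poll(Connection connection) throws Exception {
        Timestamp lastNotedTimestamp = null;
        while (true) {
            // Steps 1 and 3: full SELECT on the first round, filtered SELECT afterwards.
            String cql = (lastNotedTimestamp == null)
                ? "SELECT * FROM testkeyspace.productInfo"
                : "SELECT * FROM testkeyspace.productInfo WHERE timestamp > ? ALLOW FILTERING";
            try (PreparedStatement ps = connection.prepareStatement(cql)) {
                if (lastNotedTimestamp != null) {
                    ps.setTimestamp(1, lastNotedTimestamp);
                }
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        // Step 2: note the latest timestamp among the fetched records.
                        Timestamp ts = rs.getTimestamp("timestamp");
                        if (lastNotedTimestamp == null || ts.after(lastNotedTimestamp)) {
                            lastNotedTimestamp = ts;
                        }
                    }
                }
            }
            // Step 4: repeat with a 5-second interval.
            Thread.sleep(5000);
        }
    }
}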

Issue

  1. While Step 4 is executed in parallel with the write application, a few random records are missing from the ResultSet returned by the JDBC call.
  2. While Step 4 is executed after the write application has completed its task, no records are missed and the fetching is successful.

Note: The above-mentioned issue also occurs when using a numeric column in the WHERE clause of the SELECT query.

This was tested and observed on a single-node cluster.

maximevw commented 1 year ago

Hello @MohamedKamarudeen,

How does the write application work? Does it loop over a list (of 10k+ items) to execute INSERT statements one by one then the application stops once all the items are inserted? Do you use batch statements for insertions (https://github.com/ing-bank/cassandra-jdbc-wrapper#insertupdate)?

When you say "few random records are missing", do you mean entire rows are absent or only some columns are incompletely filled?

On your SELECT query in step 3, do you use ALLOW FILTERING or do you have a secondary index on the timestamp column?

By default, CQL only allows select queries that don’t involve a full scan of all partitions. If all partitions are scanned, then returning the results may experience a significant latency proportional to the amount of data in the table. The ALLOW FILTERING option explicitly executes a full scan. Thus, the performance of the query can be unpredictable. - Source: https://cassandra.apache.org/doc/latest/cassandra/cql/dml.html

MohamedKamarudeen commented 1 year ago

@maximevw

Let's assume we have two applications: one is the Writer application, and the other is the Reader application. Both applications are independent, which means the Reader does not know when the Writer application completes the write operation.

Scenario: Writer Application - Performs insert operations on the ProductInfo table. It periodically inserts 10K events, either by looping over a list to execute INSERT statements one by one or by using batch statements for insertions. The partition key column (productId) has a unique value for every row; no two rows share the same value.

Reader Application - Reads data from the ProductInfo table frequently. It periodically executes SELECT queries with a WHERE clause like timestampColumn > lastNotedTimestamp and an ORDER BY clause on the timestamp column to retrieve data.

In the Reader application, we observe random data loss even when using ALLOW FILTERING.

maximevw commented 1 year ago

Hello @MohamedKamarudeen,

I tried to reproduce your case, but I really don't understand how this could work. If I use the following table:

CREATE TABLE ProductInfo (
  productId int,
  name text,
  insertionTs timestamp,
  PRIMARY KEY (productId)
);

In the reader application, I can't execute such a CQL query: SELECT * FROM ProductInfo WHERE insertionTs > ? ORDER BY insertionTs ALLOW FILTERING because it fails with the error "ORDER BY is only supported when the partition key is restricted by an EQ or an IN.".

Obviously, if you remove the ORDER BY clause, you'll get some results but you'll not be able to determine the correct value for lastNotedTimestamp.

As you said in the original issue, no data loss is observed with the following scenario:

  1. Start the reader application, reading the table ProductInfo every 20 seconds.
  2. Start the writer application, writing 10k rows in ProductInfo.
  3. The next round of the reader application happens after all the rows are inserted (since it takes only a few seconds to insert 10k rows, the 20-second interval is enough to avoid reading rows at the same time as they are written), and all the rows are correctly read.

So, it could be helpful to provide:

MohamedKamarudeen commented 1 year ago

Hello @maximevw, Attaching the code for both the Writer and Reader applications. Please find below the steps to reproduce:

Steps to Reproduce:

  1. Add the Cassandra JAR to the dependencies.
  2. Start the Reader application first.
  3. Then start the Writer application.

Expected Result: The Reader application should retrieve all the records from the test.ProductInfo table, and the fetch count should be equivalent to the row count of the test.ProductInfo table in the database.

Reader code logic: In the readAndWriteToTable method, the SQL queries (NORMAL_QUERY and EXCLUDING_QUERY) are executed based on the positions tracked in positionMap, in order to efficiently retrieve data and prevent duplicates.

NORMAL_QUERY:

  1. This query is executed when there are no duplicates to handle, or when the code is initially started.
  2. It fetches data from the source table (test.ProductInfo) without any exclusions; essentially, it retrieves all available data.

EXCLUDING_QUERY:

  1. The purpose of this query is to retrieve only the new data, ensuring that records with timestamps not greater than the latest processed timestamp are not fetched, thus preventing the insertion of duplicates.
  2. The ALLOW FILTERING clause indicates that filtering on the timestamp is allowed, which is necessary for excluding records based on the timestamp condition.

Tracking the Latest Timestamp:

  1. The primary purpose of the positionMap is to keep track of the most recent timestamp (insertionTs) associated with each unique productId.
  2. The TreeMap data structure is chosen for positionMap because it automatically maintains the keys (in this case, productId) in sorted order. This sorting is crucial when it comes to efficiently identifying the latest timestamp.

Please find below the programs:

Writer.java:

package org.example.cassandraTest;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

/**
 * A class for inserting data into a database table.
 */
public class Writer {

    private static final String INSERT_QUERY = "INSERT INTO test.ProductInfo (productId, name, insertionTs) VALUES (?, ?, ?)";
    private static final int BATCH_SIZE = 1000;

    /**
     * Inserts data into the database.
     *
     * @param connection The database connection.
     * @param start      The first productId to insert (inclusive).
     * @param end        The last productId to insert (inclusive).
     * @throws SQLException If a database error occurs.
     */
    public void insert(Connection connection, int start, int end) throws SQLException {
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(INSERT_QUERY);

            int batchCount = 0;

            for (int iteration = start; iteration <= end; iteration++) {
                preparedStatement.setInt(1, iteration);
                preparedStatement.setString(2, "text value");
                preparedStatement.setTimestamp(3, new Timestamp(System.currentTimeMillis()));

                preparedStatement.addBatch();
                batchCount++;

                if (batchCount == BATCH_SIZE) {
                    preparedStatement.executeBatch();
                    batchCount = 0;
                }
            }
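            // Flush any remaining statements from the last, possibly partial, batch.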
            preparedStatement.executeBatch();

        } catch (SQLException exception) {
            throw exception;
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }

    public static void main(String[] args) {
        Connection connection = null;
        try {
            Writer writer = new Writer();
            connection = DBConnection.getConnection();

            for (int i=0; i<10; i++) {
                writer.insert(connection, i*10000+1, (i+1)*10000);
                Thread.sleep(2000);
            }

        } catch (SQLException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

Reader.java:

package org.example.cassandraTest;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.TreeMap;

/**
 * A class for reading data from a database table and writing it to another table.
 */
public class Reader {

    private static final String NORMAL_QUERY = "select productId, name, insertionTs from test.ProductInfo";
    private static final String EXCLUDING_QUERY = "select productId, name, insertionTs from test.ProductInfo where insertionTs > ? allow filtering";
    private static final String INSERT_QUERY = "INSERT INTO test.ProductInfo2 (productId, name, insertionTs) VALUES (?, ?, ?)";

    /**
     * Reads data from a source table and writes it to a target table.
     *
     * @param connection The database connection.
     */
    public void readAndWriteToTable(Connection connection) {
        PreparedStatement ps = null;
        PreparedStatement insertStatement = null;
        ResultSet rs = null;
        Timestamp timestampPos = null;
        int fetchCount = 0;
        int insertedCount = 0;

        TreeMap<Integer, Timestamp> positionMap = new TreeMap<>();
        try {
            ps = connection.prepareStatement(NORMAL_QUERY);
            ps.setFetchSize(1000);
            rs = ps.executeQuery();
            insertStatement = connection.prepareStatement(INSERT_QUERY);

            while (true) {
                while (rs.next()) {
                    int productId = rs.getInt(1);
                    String name = rs.getString(2);
                    Timestamp insertionTs = rs.getTimestamp(3);
                    fetchCount++;
                    if (positionMap.containsKey(productId)) {
                        System.out.println("Duplicate Record Fetched:" + productId);
                        continue;
                    }

                    insertStatement.setInt(1, productId);
                    insertStatement.setString(2, name);
                    insertStatement.setTimestamp(3, insertionTs);
                    insertStatement.executeUpdate();
                    insertedCount++;

                    positionMap.put(productId, insertionTs);
                }

                rs.close();
                ps.close();

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                if (!positionMap.isEmpty()) {
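                    // Use the timestamp recorded for the highest productId as the lower bound of the next query.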
                    timestampPos = positionMap.get(positionMap.lastKey());
                    ps = connection.prepareStatement(EXCLUDING_QUERY);
                    ps.setTimestamp(1, timestampPos);

                    positionMap.clear();
                    System.out.println("After clearing the map, Executing the Excluding Query with the pos: " + timestampPos);
                    System.out.println("Total Fetch Count: " + fetchCount);
                    System.out.println("Total Insert Count: " + insertedCount);
                } else if (timestampPos != null) {
                    ps = connection.prepareStatement(EXCLUDING_QUERY);
                    ps.setTimestamp(1, timestampPos);

                    System.out.println("Executing the Excluding Query with the pos: " + timestampPos);
                    System.out.println("Total Fetch Count: " + fetchCount);
                    System.out.println("Total Insert Count: " + insertedCount);
                } else {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    ps = connection.prepareStatement(NORMAL_QUERY);
                    System.out.println("Executing the normal Query");
                }
                ps.setFetchSize(1000);
                rs = ps.executeQuery();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } finally {
            System.out.println("Total Fetch Count: " + fetchCount);
            System.out.println("Total Insert Count: " + insertedCount);
            try {
                if (rs != null)
                    rs.close();
                if (ps != null)
                    ps.close();
                if (insertStatement != null)
                    insertStatement.close();
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        try {
            reader.readAndWriteToTable(DBConnection.getConnection());
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

DBConnection.java

package org.example.cassandraTest;

import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnection {
    private static final String URL = "jdbc:cassandra://localhost:9042/test";
    private static final String USERNAME = "cassandra";
    private static final String PASS = "cassandra";

    public static java.sql.Connection getConnection() throws SQLException {
        try {
            Class.forName("com.github.adejanovski.cassandra.jdbc.CassandraDriver");
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        return DriverManager.getConnection(URL, USERNAME, PASS);
    }
}

maximevw commented 1 year ago

Hello @MohamedKamarudeen,

First of all, you don't use the correct driver: it should be com.ing.data.cassandra.jdbc.CassandraDriver. Please also be sure you use the latest available version of cassandra-jdbc-wrapper (4.9.0). This project doesn't support versions prior to 4.4.0.
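
For reference, a corrected DBConnection could look like the following sketch (the localdatacenter URL parameter is an assumption for a typical single-node setup; adjust it to your cluster configuration):

package org.example.cassandraTest;

import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnection {

    // Hypothetical sketch: same connection details as the original code, but using the
    // driver class of this project. The 'localdatacenter' parameter is an assumption
    // for a single-node setup; adjust it to the actual data center name of the cluster.
    private static final String URL = "jdbc:cassandra://localhost:9042/test?localdatacenter=datacenter1";
    private static final String USERNAME = "cassandra";
    private static final String PASS = "cassandra";

    public static java.sql.Connection getConnection() throws SQLException {
        // With JDBC 4+, explicitly loading the driver class is optional, but it fails
        // fast if the wrapper is not on the classpath.
        try {
            Class.forName("com.ing.data.cassandra.jdbc.CassandraDriver");
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        return DriverManager.getConnection(URL, USERNAME, PASS);
    }
}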

I finally succeeded in running your test case using the provided code and I was able to reproduce the described behaviour. Based on this, I replaced the JDBC connection for SELECT queries in the reader with direct calls through the DataStax Java driver (I kept the insertion using JDBC since all the fetched rows are correctly inserted in all cases) and I got the following results:

Executing the normal Query
Executing the normal Query
After clearing the map, Executing the Excluding Query with the pos: 2023-08-21 20:06:04.757
Total Fetch Count: 16964
Total Insert Count: 16964
After clearing the map, Executing the Excluding Query with the pos: 2023-08-21 20:06:09.104
Total Fetch Count: 31902
Total Insert Count: 31902
After clearing the map, Executing the Excluding Query with the pos: 2023-08-21 20:06:19.77
Total Fetch Count: 62090
Total Insert Count: 62090
Executing the Excluding Query with the pos: 2023-08-21 20:06:19.77
Total Fetch Count: 62090
Total Insert Count: 62090
...

The file Reader.java using DataStax Java driver:

package com.github.maximevw;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Reader {
    public static void main(String[] args) throws SQLException {
        Reader reader = new Reader();
        try {
            reader.readAndWriteToTable(DBConnection.getConnection());
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

        private static final String NORMAL_QUERY = "select productId, name, insertionTs from ProductInfo";
        private static final String EXCLUDING_QUERY = "select productId, name, insertionTs from ProductInfo where insertionTs > ? allow filtering";
        private static final String INSERT_QUERY = "INSERT INTO ProductInfo2 (productId, name, insertionTs) VALUES (?, ?, ?)";

        /**
         * Reads data from a source table and writes it to a target table.
         *
         * @param connection The database connection.
         */
        public void readAndWriteToTable(Connection connection) {
            Timestamp timestampPos = null;
            int fetchCount = 0;
            AtomicInteger fetchCountAtomic = new AtomicInteger(0);
            int insertedCount = 0;
            AtomicInteger insertedCountAtomic = new AtomicInteger(0);

            TreeMap<Integer, Timestamp> positionMap = new TreeMap<>();
            try {
                com.datastax.oss.driver.api.core.cql.ResultSet rsds = null;
                CqlSession session = CqlSession.builder().build();
                SimpleStatement stmt = SimpleStatement.newInstance(NORMAL_QUERY).setPageSize(1000);
                rsds = session.execute(stmt);

                while (true) {
                    rsds.forEach(row -> {
                        int productId = row.getInt(0);
                        String name = row.getString(1);
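                        // The CQL timestamp column is read as a java.time.Instant and converted to java.sql.Timestamp for the JDBC insert.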
                        Timestamp insertionTs = Timestamp.from(row.getInstant(2));
                        fetchCountAtomic.addAndGet(1);
                        if (positionMap.containsKey(productId)) {
                            System.out.println("Duplicate Record Fetched:" + productId);
                        }
                        PreparedStatement insertStatement = null;
                        try {
                            insertStatement = connection.prepareStatement(INSERT_QUERY);
                            insertStatement.setInt(1, productId);
                            insertStatement.setString(2, name);
                            insertStatement.setTimestamp(3, insertionTs);
                            insertStatement.executeUpdate();
                            insertedCountAtomic.addAndGet(1);
                        } catch (SQLException e) {
                            System.err.println(e);
                        } finally {
                            try {
                                if (insertStatement != null)
                                    insertStatement.close();
                            } catch (SQLException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        positionMap.put(productId, insertionTs);
                    });

                    fetchCount = fetchCountAtomic.get();
                    insertedCount = insertedCountAtomic.get();

                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                    if (!positionMap.isEmpty()) {
                        timestampPos = positionMap.get(positionMap.lastKey());
                        com.datastax.oss.driver.api.core.cql.PreparedStatement prepared = session.prepare(EXCLUDING_QUERY);
                        BoundStatement bound = prepared.bind(timestampPos.toInstant());
                        rsds = session.execute(bound);

                        positionMap.clear();
                        System.out.println("After clearing the map, Executing the Excluding Query with the pos: " + timestampPos);
                        System.out.println("Total Fetch Count: " + fetchCount);
                        System.out.println("Total Insert Count: " + insertedCount);
                    } else if (timestampPos != null) {
                        com.datastax.oss.driver.api.core.cql.PreparedStatement prepared = session.prepare(EXCLUDING_QUERY);
                        BoundStatement bound = prepared.bind(timestampPos.toInstant());
                        rsds = session.execute(bound);

                        System.out.println("Executing the Excluding Query with the pos: " + timestampPos);
                        System.out.println("Total Fetch Count: " + fetchCount);
                        System.out.println("Total Insert Count: " + insertedCount);
                    } else {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        stmt = SimpleStatement.newInstance(NORMAL_QUERY).setPageSize(1000);
                        rsds = session.execute(stmt);
                        System.out.println("Executing the normal Query");
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                System.out.println("Total Fetch Count: " + fetchCount);
                System.out.println("Total Insert Count: " + insertedCount);
            }
        }
}

So, since cassandra-jdbc-wrapper is just a wrapper around the DataStax Java driver, it simply returns all the results obtained from the driver. As shown above, the behaviour is not specific to the JDBC wrapper but seems to be related to the way Cassandra and/or the DataStax Java driver handle simultaneous read/write operations on a table.

I advise you to discuss your case using the appropriate channel in the Cassandra project: https://cassandra.apache.org/_/community.html#discussions.

maximevw commented 1 year ago

@MohamedKamarudeen I found that you already posted this question on StackOverflow 3 months ago and got interesting answers confirming what I said in my last message: it's more of a design issue, since Cassandra is not optimized for your specific scenario.

Since the issue is not related to this project, I'm closing it.