siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine
http://siddhi.io
Apache License 2.0
1.52k stars 527 forks source link

Removal of the last stored event in the InMemory table leads to the broken references chain #1822

Open ydidukh opened 9 months ago

ydidukh commented 9 months ago

Description: The removal of the last stored event in the InMemory table disrupts the references chain, subsequently causing improper behavior within the InMemory table. Any future inserts are erroneously appended to the removed event, leading to unexpected and undesired outcomes.

Affected Siddhi Version:

4.1.27 and higher. The problem has been introduced in the version 4.1.27 when support of incremental snapshots has been added (SnapshotableStreamEventQueue class).

OS, DB, other environment details and versions: N/A

Steps to reproduce:

    @Test
    public void deleteFromTableTest6() throws InterruptedException {
        log.info("deleteFromTableTest6");

        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "" +
                "define stream StockStream (symbol string, price float, vol long); " +
                "define stream DeleteStockStream (symbol string, price float, vol long); " +
                "define stream CountStockStream (symbol string); " +
                "define table StockTable (symbol string, price float, volume long); ";
        String query = "" +
                "@info(name = 'query1') " +
                "from StockStream " +
                "select symbol, price, vol as volume " +
                "insert into StockTable ;" +
                "" +
                "@info(name = 'query2') " +
                "from DeleteStockStream[vol>=100] " +
                "delete StockTable " +
                "   on StockTable.symbol==symbol ;" +
                "" +
                "@info(name = 'query3') " +
                "from CountStockStream#window.length(0) join StockTable" +
                " on CountStockStream.symbol==StockTable.symbol " +
                "select CountStockStream.symbol as symbol " +
                "insert into CountResultsStream ;";

        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);

        InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
        InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
        InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream");

        siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
                inEventCount += events.length;
            }
        });
        siddhiAppRuntime.start();

        stockStream.send(new Object[]{"WSO2", 55.6f, 100L});
        stockStream.send(new Object[]{"IBM", 75.6f, 100L});
        // Remove last event in the StockTable
        deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L});

        stockStream.send(new Object[]{"WSO2", 57.6f, 100L});
        countStockStream.send(new Object[]{"WSO2"});

        Thread.sleep(500);
        AssertJUnit.assertEquals(2, inEventCount);
        siddhiAppRuntime.shutdown();

    }

Related Issues: N/A

ydidukh commented 1 month ago

Guys, can someone look at this issue?