deephaven / deephaven-core

Deephaven Community Core
Other
255 stars 81 forks source link

flatten: clock step assertion errors #4568

Closed nbauernfeind closed 1 year ago

nbauernfeind commented 1 year ago
import io.deephaven.engine.table.impl.QueryTable

import io.deephaven.engine.util.TableTools
import io.deephaven.engine.table.impl.select.SimulationClock
import io.deephaven.engine.table.impl.select.SortedClockFilter

def localDate = DateTimeUtils.parseLocalDate("2023-09-28")
def zdt = localDate.atStartOfDay(ZoneId.of("America/Denver"))

def start = DateTimeUtils.toInstant(zdt)
def end = DateTimeUtils.toInstant(zdt.plus(1, java.time.temporal.ChronoUnit.DAYS))

start_x = DateTimeUtils.epochNanos(start)
end_x = DateTimeUtils.epochNanos(end)
step_x = 5000000L // 5ms

// create source table
def numSteps = (long)((end_x - start_x + step_x - 1) / step_x)
t0 = TableTools.emptyTable(numSteps).update("Time = DateTimeUtils.epochNanosToInstant(start_x + step_x * ii)")

clock = new SimulationClock(start, end, 5000000L)

t1 = t0.where(new SortedClockFilter("Time", clock, true))

clock.start()

import io.deephaven.engine.table.TableUpdate
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter

listener = new InstrumentedTableUpdateListenerAdapter(t1, false) {
  def next = 0
  def void onUpdate(TableUpdate upstream) {
    while (upstream.added().lastRowKey() >= next) {
        next += 10000
        println("" + upstream.added() + ", " + upstream.removed() + ", " + upstream.modified());
    }
  }
}

t1.addUpdateListener(listener)
t2 = t1.tail(1)

I then look at t1 in the web UI and scrolled to the bottom. The UI keeps making new flatten requests, which seems to be related.

I have seen both of these:

 Server shutdown: Exception while processing PeriodicUpdateGraph notification

io.deephaven.base.verify.AssertionFailure: Assertion failed: asserted oldRecordedStep < step, instead oldRecordedStep == 474915, step == 474915.
io.deephaven.base.verify.Assert.fail(Assert.java:100)
io.deephaven.base.verify.Assert.lt(Assert.java:674)
io.deephaven.engine.table.impl.util.StepUpdater.forceUpdateRecordedStep(StepUpdater.java:65)
io.deephaven.engine.table.impl.InstrumentedTableListenerBase.afterRunNotification(InstrumentedTableListenerBase.java:231)
io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRunInternal(InstrumentedTableListenerBase.java:366)
io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRun(InstrumentedTableListenerBase.java:316)
io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.run(InstrumentedTableUpdateListener.java:37)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph.runNotification(PeriodicUpdateGraph.java:1322)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$ConcurrentNotificationProcessor.processSatisfiedNotifications(PeriodicUpdateGraph.java:1373)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$NotificationProcessorThreadFactory.lambda$newThread$0(PeriodicUpdateGraph.java:1911)
java.base/java.lang.Thread.run(Thread.java:832)
Server shutdown: Exception while processing PeriodicUpdateGraph notification

io.deephaven.base.verify.AssertionFailure: Assertion failed: asserted Enqueued after lastCompletedStep already set to current step: ListenerImpl-flatten(), step=95283, lastCompletedStep=95283 is never executed.
io.deephaven.base.verify.Assert.fail(Assert.java:87)
io.deephaven.base.verify.Assert.statementNeverExecuted(Assert.java:222)
io.deephaven.engine.table.impl.InstrumentedTableListenerBase.onNotificationCreated(InstrumentedTableListenerBase.java:203)
io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.<init>(InstrumentedTableListenerBase.java:286)
io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.<init>(InstrumentedTableUpdateListener.java:32)
io.deephaven.engine.table.impl.InstrumentedTableUpdateListener.getNotification(InstrumentedTableUpdateListener.java:23)
io.deephaven.engine.table.impl.InstrumentedTableUpdateListener.getNotification(InstrumentedTableUpdateListener.java:10)
io.deephaven.engine.table.impl.BaseTable.lambda$notifyListeners$8(BaseTable.java:704)
io.deephaven.util.datastructures.SimpleReferenceManager.forEach(SimpleReferenceManager.java:136)
io.deephaven.engine.table.impl.BaseTable.notifyListeners(BaseTable.java:703)
io.deephaven.engine.table.impl.BaseTable.notifyListeners(BaseTable.java:625)
io.deephaven.engine.table.impl.select.ClockFilter.run(ClockFilter.java:115)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$UpdateSourceRefreshNotification.run(PeriodicUpdateGraph.java:1879)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph.runNotification(PeriodicUpdateGraph.java:1322)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$ConcurrentNotificationProcessor.processSatisfiedNotifications(PeriodicUpdateGraph.java:1373)
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$NotificationProcessorThreadFactory.lambda$newThread$0(PeriodicUpdateGraph.java:1911)
java.base/java.lang.Thread.run(Thread.java:832)
rcaudy commented 1 year ago

This is because we're not ensuring that the result FilteredTable checks the satisfaction of its filters before declaring itself satisfied.

rcaudy commented 1 year ago

We should make WhereFilter implement NotificationQueue.Dependency. In most cases they will not need anything special, but in the case of ClockFilter it is not satisfied until the update graph is.

rcaudy commented 1 year ago

FilteredTable should refer to its filters via addParentReference instead of an array for referential integrity + management.

rcaudy commented 1 year ago

Or we could leave stuff mostly alone and just make sure ClockFilter is a NotificationQueue.Dependency that delegates to the UpdateGraph for satisfaction.

Or we could change BaseTable.satisfied() to always check that the UpdateGraph is satisfied.