deephaven / deephaven-core

Deephaven Community Core
Other
255 stars 81 forks source link

Calling a Python UDF in agg.Formula() with agg_by on PartitionedTableProxy failed with function not being found. #4847

Open jmao-denver opened 11 months ago

jmao-denver commented 11 months ago

Test code:

    def test_agg_formula_scope(self):
        """Reproduce issue #4847: agg_by with a formula aggregation on a PartitionedTableProxy.

        The formula string references ``my_fn``, a Python function defined only in the
        local scope of the nested ``agg_by_formula`` helper. The test expects the
        ``agg_by`` call on the proxy to succeed and return a non-None result.
        """
        with self.subTest("agg_by_formula"):
            def agg_by_formula():
                # my_fn is local to agg_by_formula; it is not defined at module level,
                # so the formula below can only find it through this function's scope.
                def my_fn(vals):
                    import deephaven.dtypes as dht
                    # Add 2 to every incoming value and wrap the results via dht.array
                    # with dht.double as the element type.
                    return dht.array(dht.double, [i + 2 for i in vals])

                # 1000-row table; column A is i%2 and column B is A+3.
                t = empty_table(1000).update_view(["A=i%2", "B=A+3"])
                # Partition on A and operate through the partitioned-table proxy.
                pt_proxy = t.partition_by("A").proxy()
                # Two aggregations grouped by A: a formula that calls my_fn on column B
                # (output column C) and the median of B. Per the issue report, this is
                # the call that raises "Cannot find method my_fn(...)".
                rlt_pt_proxy = pt_proxy.agg_by([formula("(double[])my_fn(each)", formula_param='each', cols=['C=B']),
                                      median("B")],
                             by='A')
                return rlt_pt_proxy

            ptp = agg_by_formula()
            self.assertIsNotNone(ptp)

Part of the traceback:

  File "/tmp/py/server/tests/test_pt_proxy.py", line 362, in test_agg_formula_scope
    t = agg_by_formula()
  File "/tmp/py/server/tests/test_pt_proxy.py", line 357, in agg_by_formula
    rlt_pt_proxy = pt_proxy.agg_by([formula("(double[])my_fn(each)", formula_param='each', cols=['C=B']),
  File "/tmp/py/server/deephaven/table.py", line 3328, in agg_by
    raise DHError(e, "agg_by operation on the PartitionedTableProxy failed.") from e
deephaven.dherror.DHError: agg_by operation on the PartitionedTableProxy failed. : Cannot find method my_fn(io.deephaven.vector.IntVector)
Traceback (most recent call last):
  File "/tmp/py/server/deephaven/table.py", line 3326, in agg_by
    return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.aggBy(j_agg_list, *by))
RuntimeError: io.deephaven.engine.table.impl.select.FormulaCompilationException: Formula compilation error for: (double[])my_fn(B)
    at io.deephaven.engine.table.impl.select.DhFormulaColumn.initDef(DhFormulaColumn.java:214)
    at io.deephaven.engine.table.impl.by.FormulaChunkedOperator.<init>(FormulaChunkedOperator.java:98)
    at io.deephaven.engine.table.impl.by.AggregationProcessor$NormalConverter.visit(AggregationProcessor.java:752)
    at io.deephaven.api.agg.spec.AggSpecFormula.walk(AggSpecFormula.java:69)
    at io.deephaven.engine.table.impl.by.AggregationProcessor$Converter.visit(AggregationProcessor.java:403)
    at io.deephaven.api.agg.ColumnAggregation.walk(ColumnAggregation.java:33)
    at io.deephaven.engine.table.impl.by.AggregationProcessor$Converter.walkAllAggregations(AggregationProcessor.java:357)
    at io.deephaven.engine.table.impl.by.AggregationProcessor$Converter.build(AggregationProcessor.java:350)
    at io.deephaven.engine.table.impl.by.AggregationProcessor.makeAggregationContext(AggregationProcessor.java:293)
    at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.aggregation(ChunkedOperatorAggregationHelper.java:155)
    at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.lambda$aggregation$1(ChunkedOperatorAggregationHelper.java:127)
    at io.deephaven.engine.table.impl.BaseTable.initializeWithSnapshot(BaseTable.java:1293)
    at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.aggregation(ChunkedOperatorAggregationHelper.java:124)
    at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.aggregation(ChunkedOperatorAggregationHelper.java:77)
    at io.deephaven.engine.table.impl.QueryTable.lambda$aggNoMemo$15(QueryTable.java:814)
    at io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder.withNugget(QueryPerformanceRecorder.java:519)
    at io.deephaven.engine.table.impl.QueryTable.aggNoMemo(QueryTable.java:813)
    at io.deephaven.engine.table.impl.QueryTable.lambda$aggBy$14(QueryTable.java:788)
    at io.deephaven.engine.table.impl.QueryTable$MemoizedResult.getOrCompute(QueryTable.java:3483)
    at io.deephaven.engine.table.impl.QueryTable.memoizeResult(QueryTable.java:3452)
    at io.deephaven.engine.table.impl.QueryTable.aggBy(QueryTable.java:788)
    at io.deephaven.engine.table.impl.QueryTable.aggBy(QueryTable.java:106)
    at io.deephaven.engine.table.impl.partitioned.PartitionedTableProxyImpl.lambda$aggBy$34(PartitionedTableProxyImpl.java:556)
    at io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl.lambda$assertResultsStatic$4(PartitionedTableImpl.java:375)
    at io.deephaven.engine.table.impl.partitioned.TableTransformationColumn$OutputFormula.transformAndFill(TableTransformationColumn.java:154)
    at io.deephaven.engine.table.impl.partitioned.TableTransformationColumn$OutputFormula.fillChunk(TableTransformationColumn.java:133)
    at io.deephaven.engine.table.impl.sources.ViewColumnSource.fillChunk(ViewColumnSource.java:219)
    at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.doApplyUpdate(SelectColumnLayer.java:412)
    at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.lambda$doParallelApplyUpdate$3(SelectColumnLayer.java:280)
    at io.deephaven.engine.util.systemicmarking.SystemicObjectTracker.executeSystemically(SystemicObjectTracker.java:56)
    at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.doParallelApplyUpdate(SelectColumnLayer.java:279)
    at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.lambda$prepareParallelUpdate$0(SelectColumnLayer.java:246)
    at io.deephaven.engine.table.impl.util.JobScheduler.lambda$iterateParallel$1(JobScheduler.java:353)
    at io.deephaven.engine.table.impl.util.JobScheduler$IterationManager$TaskInvoker.execute(JobScheduler.java:255)
    at io.deephaven.engine.table.impl.util.JobScheduler$IterationManager.lambda$startTasks$0(JobScheduler.java:161)
    at io.deephaven.engine.table.impl.util.OperationInitializationPoolJobScheduler.lambda$submit$0(OperationInitializationPoolJobScheduler.java:26)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at io.deephaven.engine.table.impl.OperationInitializationThreadPool$1.lambda$newThread$0(OperationInitializationThreadPool.java:63)
    at org.jpy.PyLib.callAndReturnObject(Native Method)
    at org.jpy.PyObject.call(PyObject.java:449)
    at io.deephaven.server.console.python.DebuggingInitializer.lambda$createInitializer$0(DebuggingInitializer.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583) 
jmao-denver commented 11 months ago

From @rcaudy

PartitionedTableProxyImpl avoids needing to pin the QueryScope by pre-initializing SelectColumn and WhereFilter inputs. This is highly desirable. aggBy with AggFormula does not do this pre-initialization. In the meantime, users could work around this by using PartitionedTable.transform and controlling the ExecutionContext that will be used, in order to ensure that it preserves the QueryScope.