
feat: a new PartitionedTableService API in Python #353

Open deephaven-internal opened 2 weeks ago

deephaven-internal commented 2 weeks ago

This issue was auto-generated

PR: https://github.com/deephaven/deephaven-core/pull/6175
Author: jmao-denver

Original PR Body

Fixes #6171

Most recent nightlies run: https://github.com/nbauernfeind/deephaven-core/actions/runs/11502810543/

nbauernfeind commented 2 weeks ago

This is a working example that I believe is appropriate to use as the basis of the deephaven.io documentation:

### Below is a sample implementation showing:
### 1) how to implement a sample TableDataServiceBackend
### 2) how to manually manipulate that backend to demonstrate static and refreshing behavior
### 3) how to fetch a table from that backend in both static and refreshing contexts

from typing import Callable, Optional, Dict
import pyarrow as pa
from deephaven.experimental.table_data_service import TableDataServiceBackend, TableKey, TableLocationKey, TableDataService

class TableKeyImpl(TableKey):
    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableKeyImpl{{{self.key}}}"

class TableLocationKeyImpl(TableLocationKey):
    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableLocationKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableLocationKeyImpl{{{self.key}}}"

class TestTable:
    class TestTableLocation():
        def __init__(self, data_schema: pa.Schema, partitioning_values: Optional[pa.Table]):
            self.partitioning_values = partitioning_values
            self.size_cb: Callable[[int], None] = lambda *x:x
            self.size_failure_cb: Callable[[], None] = lambda *x:x
            self.data: pa.Table = data_schema.empty_table()

        def append_data(self, new_data: pa.Table):
            rbs = self.data.to_batches()
            for batch in new_data.to_batches():
                rbs.append(batch)
            self.data = pa.Table.from_batches(rbs)
            self.size_cb(self.data.num_rows)

    def __init__(self, data_schema: pa.Schema, partitioning_column_schema: Optional[pa.Schema]):
        self.data_schema = data_schema
        self.partitioning_column_schema = partitioning_column_schema
        self.locations: Dict[TableLocationKeyImpl, "TestTable.TestTableLocation"] = {}
        self.location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None] = lambda *x:x
        self.location_failure_cb: Callable[[str], None] = lambda *x:x

    def add_table_location(self, table_location_key: TableLocationKeyImpl,
                           partitioning_column_values: Optional[pa.Table],
                           data_values: pa.Table):
        if table_location_key in self.locations:
            raise ValueError(f"Cannot add table location {table_location_key}; it already exists")
        new_location = self.TestTableLocation(self.data_schema, partitioning_column_values)
        new_location.append_data(data_values)
        self.locations[table_location_key] = new_location
        # notify any active location subscription that a new location is available
        self.location_cb(table_location_key, partitioning_column_values)

    def append_table_location(self, table_location_key: TableLocationKeyImpl, data_values: pa.Table):
        if table_location_key not in self.locations:
            raise ValueError(f"Cannot append to non-existent table location {table_location_key}")
        self.locations[table_location_key].append_data(data_values)

class TestBackend(TableDataServiceBackend):
    def __init__(self):
        self.tables: Dict[TableKey, TestTable] = {}

    def add_table(self, table_key: TableKeyImpl, table: TestTable):
        if table_key in self.tables:
            raise ValueError(f"{table_key} already exists")
        self.tables[table_key] = table

    def table_schema(self, table_key: TableKeyImpl,
                     schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None],
                     failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        schema_cb(table.data_schema, table.partitioning_column_schema)

    def table_locations(self, table_key: TableKeyImpl,
                        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
                        success_cb: Callable[[], None],
                        failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        for key, location in self.tables[table_key].locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

    def table_location_size(self, table_key: TableKeyImpl, table_location_key: TableLocationKeyImpl,
                            size_cb: Callable[[int], None],
                            failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        size_cb(table.locations[table_location_key].data.num_rows)

    def column_values(self, table_key: TableKeyImpl, table_location_key: TableLocationKeyImpl,
                      col: str, offset: int, min_rows: int, max_rows: int,
                      values_cb: Callable[[pa.Table], None],
                      failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        location = table.locations[table_location_key]
        # respond with min_rows rows; any count in [min_rows, max_rows] satisfies the request
        values_cb(location.data.select([col]).slice(offset, min_rows))

    def subscribe_to_table_locations(self, table_key: TableKeyImpl,
                                    location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
                                    success_cb: Callable[[], None],
                                    failure_cb: Callable[[str], None]) -> Callable[[], None]:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x:x

        table = self.tables[table_key]
        table.location_cb = location_cb
        table.location_failure_cb = failure_cb

        # send all existing locations straight away
        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

        def unsubscribe():
            table.location_cb = lambda *x:x
            table.location_failure_cb = lambda *x:x

        return unsubscribe

    def subscribe_to_table_location_size(self, table_key: TableKeyImpl,
                                         table_location_key: TableLocationKeyImpl,
                                         size_cb: Callable[[int], None],
                                         success_cb: Callable[[], None],
                                         failure_cb: Callable[[str], None]) -> Callable[[], None]:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x:x

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return lambda *x:x

        location = table.locations[table_location_key]
        location.size_cb = size_cb
        location.size_failure_cb = failure_cb

        # send existing size
        size_cb(location.data.num_rows)
        success_cb()

        def unsubscribe():
            location.size_cb = lambda *x:x
            location.size_failure_cb = lambda *x:x

        return unsubscribe
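Before wiring the backend into Deephaven, the callback contract can be exercised by hand with plain pyarrow. The following is a minimal, hypothetical smoke test (the "smoke" key and sample data are made up for illustration):

# hypothetical smoke test: drive the backend callbacks directly
backend = TestBackend()
schema = pa.schema([("Val", pa.int64())])
table = TestTable(schema, partitioning_column_schema=None)
backend.add_table(TableKeyImpl("smoke"), table)
table.add_table_location(
    TableLocationKeyImpl("loc0"), None,
    pa.table({"Val": [1, 2, 3]}, schema=schema))

# each accessor reports through callbacks rather than return values
backend.table_schema(TableKeyImpl("smoke"),
                     lambda data, part: print(f"schema: {data}"),
                     lambda msg: print(f"failed: {msg}"))
backend.table_locations(TableKeyImpl("smoke"),
                        lambda key, part: print(f"location: {key}"),
                        lambda: print("all locations sent"),
                        lambda msg: print(f"failed: {msg}"))
backend.table_location_size(TableKeyImpl("smoke"), TableLocationKeyImpl("loc0"),
                            lambda n: print(f"rows: {n}"),
                            lambda msg: print(f"failed: {msg}"))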

The backend implementation would then be used as follows:

import numpy as np
import pyarrow as pa
import deephaven.arrow as dharrow
from deephaven.column import *
from deephaven import new_table
from deephaven.time import to_j_instant

# generate the same data for each location; note that we do not need to
# include the partitioning columns in the location data itself
location_cols = [
    bool_col(name="Boolean", data=[True, False]),
    byte_col(name="Byte", data=(1, -1)),
    char_col(name="Char", data='-1'),
    short_col(name="Short", data=[1, -1]),
    int_col(name="Int", data=[1, -1]),
    long_col(name="Long", data=[1, -1]),
    long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
    float_col(name="Float", data=[1.01, -1.01]),
    double_col(name="Double", data=[1.01, -1.01]),
    string_col(name="String", data=["foo", "bar"]),
    datetime_col(name="Datetime", data=[to_j_instant('2024-10-01T12:30:00 ET'), to_j_instant('2024-10-01T12:45:00 ET')]),
]
location_data = dharrow.to_arrow(new_table(cols=location_cols))

def generate_partitioning_values(ticker: str, exchange: str) -> pa.Table:
    partitioning_cols = [
        string_col(name="Ticker", data=[ticker]),
        string_col(name="Exchange", data=[exchange]),
    ]
    return dharrow.to_arrow(new_table(cols=partitioning_cols))

backend = TestBackend()
data_service = TableDataService(backend)

# generate a simple table
backend.add_table(
    TableKeyImpl("sample"), 
    TestTable(location_data.schema, generate_partitioning_values("DUMMY_VAL", "DUMMY_VAL").schema))

def add_ticker_data(ticker: str, exchange: str):
    table_key = TableKeyImpl("sample")
    table_location_key = TableLocationKeyImpl(ticker + ":" + exchange)
    if table_key not in backend.tables:
        raise ValueError(f'{table_key} does not exist')
    if table_location_key not in backend.tables[table_key].locations:
        backend.tables[table_key].add_table_location(
            table_location_key, generate_partitioning_values(ticker, exchange), location_data)
    else:
        backend.tables[table_key].append_table_location(table_location_key, location_data)

# add just a tiny amount of data
add_ticker_data("GOOG", "NYSE")
add_ticker_data("MSFT", "BZX")
add_ticker_data("MSFT", "BZY")

from deephaven.liveness_scope import LivenessScope
scope = LivenessScope()

with scope.open():
    t = data_service.make_table(TableKeyImpl("sample"), refreshing=True)
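For the static context from item 3 above, the same service can presumably serve a one-time snapshot by passing refreshing=False; no liveness scope is needed, since a static table never ticks:

# a static snapshot of the same backend; assumed counterpart of the
# refreshing=True call above
t_static = data_service.make_table(TableKeyImpl("sample"), refreshing=False)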

Once the refreshing table is open and visible in the REPL, it can then be appended to with more calls to add_ticker_data, like this:

# this adds a new table location to the already opened table
add_ticker_data("GOOG", "BZX") 

# these append to existing table locations of the already opened table
add_ticker_data("MSFT", "BZX") 
add_ticker_data("MSFT", "BZY")

There is some value in asserting that the unsubscribe callbacks returned from the subscribe_* methods only remove the callbacks that were assigned by the originating call, since the system is allowed to race an unsubscribe against new subscribe_* calls. I am not certain whether comparing the stored callback to the captured callback is enough, or whether Python needs the critical sections marked explicitly, so I'm leaving this to the documentation/Python experts.
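As one possible illustration (a sketch, not a verified fix), the unsubscribe closure could capture the callbacks it installed and clear them only if a newer subscription has not already replaced them, with a lock marking the critical section. The lock, self._lock, is a hypothetical addition to TestBackend.__init__, and the existence checks from the version above are omitted for brevity:

import threading

# hypothetical variant of subscribe_to_table_location_size; assumes
# TestBackend.__init__ also runs `self._lock = threading.Lock()`
def subscribe_to_table_location_size(self, table_key, table_location_key,
                                     size_cb, success_cb, failure_cb):
    location = self.tables[table_key].locations[table_location_key]
    with self._lock:
        location.size_cb = size_cb
        location.size_failure_cb = failure_cb
    size_cb(location.data.num_rows)
    success_cb()

    def unsubscribe():
        with self._lock:
            # clear each callback only if it is still the one this call
            # installed; a racing subscribe_* may have replaced it
            if location.size_cb is size_cb:
                location.size_cb = lambda *x: x
            if location.size_failure_cb is failure_cb:
                location.size_failure_cb = lambda *x: x

    return unsubscribe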