Manual refreshing - the user specifies which snapshot to load; the engine parses that snapshot, adds or removes Iceberg data files as needed, and notifies downstream tables of the changes
Auto refreshing - at a regular, user-configurable interval the engine queries Iceberg for the latest snapshot, then parses and loads it
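To make the "add/remove data files" step of manual refreshing concrete, here is a minimal sketch in plain Python. It is a toy model only: `diff_snapshots` and the file paths are hypothetical names chosen for illustration and are not part of the Deephaven or Iceberg APIs. It assumes each snapshot can be summarized as a list of data file paths.

```python
# Toy model only: hypothetical names, not Deephaven or Iceberg APIs.
def diff_snapshots(current_files, target_files):
    """Return (added, removed) data files when moving from current to target."""
    current, target = set(current_files), set(target_files)
    added = sorted(target - current)    # files present only in the target snapshot
    removed = sorted(current - target)  # files present only in the current snapshot
    return added, removed


# Two made-up snapshots that share one data file
snapshot_a = ["data/f1.parquet", "data/f2.parquet"]
snapshot_b = ["data/f2.parquet", "data/f3.parquet"]

added, removed = diff_snapshots(snapshot_a, snapshot_b)
print("add:", added)
print("remove:", removed)
```

In this model, only the difference between the two snapshots needs to be propagated, which is the idea behind notifying downstream tables of changes rather than reloading everything.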
Example code:
Java: automatically and manually refreshing tables
import io.deephaven.iceberg.util.*;
import org.apache.iceberg.catalog.*;
// Create a map to hold the Iceberg Catalog properties
def properties = [
"type": "rest",
"uri": "http://rest:8181",
"client.region": "us-east-1",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.endpoint": "http://minio:9000",
]
adapter = IcebergTools.createAdapter("generic-adapter", properties);
//////////////////////////////////////////////////////////////////////
tableAdapter = adapter.loadTable("sales.sales_multi")
snapshots = tableAdapter.snapshots()
definition = tableAdapter.definitionTable()
//////////////////////////////////////////////////////////////////////
// Load the latest snapshot as a static table
sales_multi_static = tableAdapter.table()
// Load a specific snapshot as a static table
sales_multi_static = tableAdapter.table(6119130922262455673L)
//////////////////////////////////////////////////////////////////////
// Manual refreshing
refreshing_instructions = IcebergInstructions.builder()
.updateMode(IcebergUpdateMode.manualRefreshingMode())
.build()
// Load a manually refreshing table at a specific snapshot
sales_multi = adapter.readTable(
"sales.sales_multi",
5120804857276751995L,
refreshing_instructions)
// Update the table to a specific snapshot
sales_multi.update(848129305390678414L)
// Update to the latest snapshot
sales_multi.update()
//////////////////////////////////////////////////////////////////////
import io.deephaven.iceberg.util.IcebergUpdateMode;
// Automatic refreshing every 1 second
iceberg_instructions = IcebergInstructions.builder()
.updateMode(IcebergUpdateMode.autoRefreshingMode(1_000L))
.build()
sales_multi = tableAdapter.table(iceberg_instructions)
// Load the table using the default refresh of 60 seconds
iceberg_instructions = IcebergInstructions.builder()
.updateMode(IcebergUpdateMode.autoRefreshingMode())
.build()
sales_multi = tableAdapter.table(iceberg_instructions)
Python: automatically and manually refreshing tables
from deephaven.experimental import s3, iceberg
local_adapter = iceberg.adapter(name="generic-adapter", properties={
"type" : "rest",
"uri" : "http://rest:8181",
"client.region" : "us-east-1",
"s3.access-key-id" : "admin",
"s3.secret-access-key" : "password",
"s3.endpoint" : "http://minio:9000"
})
t_namespaces = local_adapter.namespaces()
t_tables = local_adapter.tables("sales")
# Create a table adapter for the sales.sales_multi table
table_adapter = local_adapter.load_table("sales.sales_multi")
t_snapshots = table_adapter.snapshots()
t_definition = table_adapter.definition()
#################################################
# Get the latest snapshot as a static table
sales_multi_static_latest = table_adapter.table()
# Get a specific snapshot as a static table
sales_multi_static_snap = table_adapter.table(snapshot_id=6119130922262455673)
#################################################
iceberg_instructions = iceberg.IcebergInstructions(
update_mode=iceberg.IcebergUpdateMode.manual_refresh())
# Get the latest snapshot as a manual refreshing table
sales_multi_refreshing = table_adapter.table(instructions=iceberg_instructions)
# Get a specific snapshot as a manual refreshing table
sales_multi_refreshing = table_adapter.table(
snapshot_id=6119130922262455673,
instructions=iceberg_instructions)
# Update to a specific snapshot
sales_multi_refreshing.update(861950607215619880)
# Update to another specific snapshot
sales_multi_refreshing.update(4720492918960789101)
# Update to the latest snapshot
sales_multi_refreshing.update()
#################################################
# Get an auto refreshing table that updates each second (1000 ms)
iceberg_instructions = iceberg.IcebergInstructions(
update_mode=iceberg.IcebergUpdateMode.auto_refresh(1000))
sales_multi = table_adapter.table(instructions=iceberg_instructions)
# Get an auto refreshing table that updates at the default interval of 60 seconds
iceberg_instructions = iceberg.IcebergInstructions(
update_mode=iceberg.IcebergUpdateMode.auto_refresh())
sales_multi = table_adapter.table(instructions=iceberg_instructions)
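The auto-refreshing behavior described at the top (query for the latest snapshot at a fixed interval, then load it if it changed) can be sketched without any Deephaven dependency. This is an illustrative model under stated assumptions, not the engine's implementation: `poll_for_snapshots`, its parameters, and the fake snapshot IDs are all hypothetical. The 60,000 ms default mirrors the 60-second default mentioned in the examples above.

```python
import time


# Illustrative model only: hypothetical names, not the Deephaven implementation.
def poll_for_snapshots(get_latest_snapshot_id, on_new_snapshot,
                       interval_ms=60_000, max_polls=3, sleep=time.sleep):
    """Poll for the latest snapshot ID and report each change exactly once."""
    last_seen = None
    for i in range(max_polls):
        latest = get_latest_snapshot_id()
        if latest != last_seen:
            on_new_snapshot(latest)  # e.g. parse and load the new snapshot
            last_seen = latest
        if i < max_polls - 1:
            sleep(interval_ms / 1000.0)  # wait before the next poll
    return last_seen


# Fake catalog: the third poll observes a newly committed snapshot.
ids = iter([101, 101, 102])
seen = []
final_id = poll_for_snapshots(
    lambda: next(ids),      # stand-in for "query Iceberg for the latest snapshot"
    seen.append,            # stand-in for "parse and load"
    interval_ms=1000,
    sleep=lambda s: None,   # skip real waiting in this demonstration
)
print(seen, final_id)
```

Passing `sleep` as a parameter keeps the sketch easy to exercise without waiting; a shorter interval trades more catalog queries for lower latency.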
This issue was auto-generated
PR: https://github.com/deephaven/deephaven-core/pull/5707 Author: lbooker42
Original PR Body
Add two methods of refreshing tables: manual refreshing and auto refreshing.