kaskada-ai / kaskada

Modern, open-source event-processing
https://kaskada.io/
Apache License 2.0

bug: slicing returns no results #520

Open kevinjnguyen opened 1 year ago

kevinjnguyen commented 1 year ago

Description Slicing (documented here) does not produce the expected results for either EntityPercentFilter or EntityFilter. Both appear to return no results.

To Reproduce Steps to reproduce the behavior:

  1. Create a table called transactions
    import kaskada.table
    kaskada.table.create_table('transactions', 'time', 'name')
  2. Load the provided CSV file: dataset1.csv
    kaskada.table.load('transactions', 'dataset1.csv')
  3. Create a slice and set the default slice
    from kaskada.slice_filters import EntityFilter
    import kaskada.client

    entity_keys = ["m0", "m10"]
    entity_filter = EntityFilter(entity_keys)
    kaskada.client.set_default_slice(entity_filter)

    or

    from kaskada.slice_filters import EntityPercentFilter
    import kaskada.client

    filter_percentage = 50
    entity_percent_filter = EntityPercentFilter(filter_percentage)
    kaskada.client.set_default_slice(entity_percent_filter)
  4. Perform a query
    %%fenl
    transactions

Actual Behavior The result of the query is an empty dataframe.

Expected Behavior A filtered dataset containing only the records for m0 and m10 (about 100 records).
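To make the expected result concrete, here is a minimal sketch of what entity-key slicing should amount to: keep only the rows whose group column (`name` for this table) is one of the requested keys. It uses only the standard library. The inline CSV is a hypothetical stand-in for dataset1.csv, and `slice_by_entity` is an illustrative helper, not part of the Kaskada client.

```python
import csv
import io

# Hypothetical sample standing in for dataset1.csv (the real file is not reproduced here).
SAMPLE_CSV = """id,time,name,amount,random_col
a1,2023-01-01T00:00:00Z,m0,10,x
a2,2023-01-01T00:01:00Z,m1,20,y
a3,2023-01-01T00:02:00Z,m10,30,z
a4,2023-01-01T00:03:00Z,m0,40,w
"""


def slice_by_entity(rows, entity_column, entity_keys):
    """Illustrative helper: keep rows whose entity column is in entity_keys."""
    keys = set(entity_keys)
    return [row for row in rows if row[entity_column] in keys]


rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
expected = slice_by_entity(rows, "name", ["m0", "m10"])

for row in expected:
    print(row["id"], row["name"], row["amount"])
```

Comparing a reference filter like this against the query output for the real dataset would show whether the slice is dropping every row or only some of them.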

Additional context

Relevant Logs / Links

Manager Logs

2023-07-17T13:19:53.258539Z  INFO started call grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.start_time=2023-07-17T08:19:53-05:00 grpc.time_ms=0.075 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.259147Z  INFO started call grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.request.content="{\"query\":{\"expression\":\"transactions\",\"destination\":{\"objectStore\":{\"fileType\":\"FILE_TYPE_PARQUET\"}},\"resultBehavior\":\"RESULT_BEHAVIOR_ALL_RESULTS\",\"limits\":{},\"slice\":{\"entityKeys\":{\"entityKeys\":[\"m0\",\"m10\"]}}},\"queryOptions\":{\"streamMetrics\":true,\"presignResults\":true}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.512 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.262279Z  INFO sending compile request client_id=default-client-id method=compileManager.compileQuery owner_id=1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784 request={"expression_kind":1,"feature_set":{"query":"transactions"},"per_entity_behavior":1,"slice_request":{"Slice":{"EntityKeys":{"entity_keys":["m0","m10"]}}},"tables":[{"config":{"group_column_name":"name","name":"transactions","source":{"Source":{"Kaskada":{}}},"time_column_name":"time","uuid":"ebb2145b-2be6-4ff1-ad61-43161fc844eb"},"metadata":{"schema":{"fields":[{"data_type":{"Kind":{"Primitive":6}}},{"data_type":{"Kind":{"Primitive":14}},"name":"id"},{"data_type":{"Kind":{"Primitive":24}},"name":"time"},{"data_type":{"Kind":{"Primitive":14}},"name":"name"},{"data_type":{"Kind":{"Primitive":6}},"name":"amount"},{"data_type":{"Kind":{"Primitive":14}},"name":"random_col"}]}}}]} request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.282319Z  INFO received compile response client_id=default-client-id fenl_diagnostics={} free_names=["transactions"] incremental_enabled=false method=compileManager.compileQuery missing_names=[] owner_id=1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784 plan_hash={"hash":"DgoOtKzUm/wM6O3syzz8dRd3eYhZnui9ayyEOg=="} request_id=2d8c95eb4fd7b1f738855fb3d17361f4 result_type={"Kind":{"Struct":{"fields":[{"data_type":{"Kind":{"Primitive":6}}},{"data_type":{"Kind":{"Primitive":14}},"name":"id"},{"data_type":{"Kind":{"Primitive":24}},"name":"time"},{"data_type":{"Kind":{"Primitive":14}},"name":"name"},{"data_type":{"Kind":{"Primitive":6}},"name":"amount"},{"data_type":{"Kind":{"Primitive":14}},"name":"random_col"}]}}} slices=[{"Slice":{"EntityKeys":{"entity_keys":["m0","m10"]}},"table_name":"transactions"}]
2023-07-17T13:19:53.283332Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_ANALYSIS\",\"config\":{\"dataTokenId\":\"2fc08f1a-5290-46ad-8221-3fc03a22e1f2\",\"sliceRequest\":{\"entityKeys\":{\"entityKeys\":[\"m0\",\"m10\"]}}},\"analysis\":{\"canExecute\":true,\"schema\":{\"fields\":[{\"dataType\":{\"primitive\":\"PRIMITIVE_TYPE_I64\"}},{\"name\":\"id\",\"dataType\":{\"primitive\":\"PRIMITIVE_TYPE_STRING\"}},{\"name\":\"time\",\"dataType\":{\"primitive\":\"PRIMITIVE_TYPE_TIMESTAMP_NANOSECOND\"}},{\"name\":\"name\",\"dataType\":{\"primitive\":\"PRIMITIVE_TYPE_STRING\"}},{\"name\":\"amount\",\"dataType\":{\"primitive\":\"PRIMITIVE_TYPE_I64\"}},{\"name\":\"random_col\",\"dataType\":{\"primitive\":\"PRIMITIVE_TYPE_STRING\"}}]}},\"fenlDiagnostics\":{},\"metrics\":{},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.055 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.283665Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"},\"queryId\":\"4ea548e1-3f61-4a3d-99bd-33f476b51335\"}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.013 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.283724Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_PREPARING\",\"metrics\":{},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.004 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.283971Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_PREPARING\",\"metrics\":{\"timePreparing\":\"0.000119208s\"},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.048 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.406298Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_COMPUTING\",\"metrics\":{\"timePreparing\":\"0.122545708s\"},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.013 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.406535Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_COMPUTING\",\"metrics\":{\"timePreparing\":\"0.122545708s\",\"timeComputing\":\"0.000209542s\"},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.020 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.409088Z  INFO Populating snapshot config if needed client_id=default-client-id incremental_enabled=false is_current_data_token=true method=manager.InitiateQuery owner_id=1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784 request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.409136Z  INFO sending streaming query request to compute backend changed_since_time=null client_id=default-client-id compute_snapshot_config=null destination={"Destination":{"ObjectStore":{"file_type":1,"output_prefix_uri":"file:///Users/kevin.nguyen/.cache/kaskada/data/results/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/DgoOtKzUm_wM6O3syzz8dRd3eYhZnui9ayyEOg/"}}} final_result_time=null limits={} method=manager.InitiateQuery owner_id=1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784 request_id=2d8c95eb4fd7b1f738855fb3d17361f4 tables=[{"config":{"group_column_name":"name","name":"transactions","source":{"Source":{"Kaskada":{}}},"time_column_name":"time","uuid":"ebb2145b-2be6-4ff1-ad61-43161fc844eb"},"file_sets":[{"prepared_files":[{"max_event_time":{"nanos":145224192,"seconds":-9223372037},"metadata_path":"file:///Users/kevin.nguyen/.cache/kaskada/data/tables/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/ebb2145b-2be6-4ff1-ad61-43161fc844eb/prepared/prep_6/94010854a6320f34fcf4fe18b934591f/79dcf2c2-161f-47f0-a11d-66f8b057e5e7/part-0-metadata.parquet","min_event_time":{"nanos":145224192,"seconds":-9223372037},"path":"file:///Users/kevin.nguyen/.cache/kaskada/data/tables/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/ebb2145b-2be6-4ff1-ad61-43161fc844eb/prepared/prep_6/94010854a6320f34fcf4fe18b934591f/79dcf2c2-161f-47f0-a11d-66f8b057e5e7/part-0.parquet"}],"slice_plan":{"Slice":{"EntityKeys":{"entity_keys":["m0","m10"]}},"table_name":"transactions"}}],"metadata":{"schema":{"fields":[{"data_type":{"Kind":{"Primitive":6}}},{"data_type":{"Kind":{"Primitive":14}},"name":"id"},{"data_type":{"Kind":{"Primitive":24}},"name":"time"},{"data_type":{"Kind":{"Primitive":14}},"name":"name"},{"data_type":{"Kind":{"Primitive":6}},"name":"amount"},{"data_type":{"Kind":{"Primitive":14}},"name":"random_col"}]}}}]
2023-07-17T13:19:53.414472Z  INFO received progress from execute request client_id=default-client-id method=queryservice.CreateQuery owner_id=1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784 progress={} request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.414765Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_COMPUTING\",\"metrics\":{\"timePreparing\":\"0.122545708s\",\"timeComputing\":\"0.008334125s\",\"outputFiles\":\"1\"},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"},\"destination\":{\"objectStore\":{\"fileType\":\"FILE_TYPE_PARQUET\",\"outputPrefixUri\":\"file:///Users/kevin.nguyen/.cache/kaskada/data/results/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/DgoOtKzUm_wM6O3syzz8dRd3eYhZnui9ayyEOg/\",\"outputPaths\":{\"paths\":[\"/Users/kevin.nguyen/.cache/kaskada/data/results/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/DgoOtKzUm_wM6O3syzz8dRd3eYhZnui9ayyEOg/2b3fe9b0-6c58-41ef-a6b5-e4eb1731f545-part-0.parquet\"]}}}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.020 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.415002Z  INFO received final message from execute request client_id=default-client-id method=queryservice.CreateQuery owner_id=1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784 query_done=true request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.415058Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_COMPUTING\",\"metrics\":{\"timePreparing\":\"0.122545708s\",\"timeComputing\":\"0.008777625s\",\"outputFiles\":\"1\"},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.008 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.415124Z  INFO stream response grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.response.content="{\"state\":\"STATE_SUCCESS\",\"metrics\":{\"timePreparing\":\"0.122545708s\",\"timeComputing\":\"0.008777625s\",\"outputFiles\":\"1\"},\"requestDetails\":{\"requestId\":\"2d8c95eb4fd7b1f738855fb3d17361f4\"}}" grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.time_ms=0.003 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
2023-07-17T13:19:53.415179Z  INFO finished call grpc.code=OK grpc.component=server grpc.method=CreateQuery grpc.method_type=server_stream grpc.service=kaskada.kaskada.v1alpha.QueryService grpc.start_time=2023-07-17T08:19:53-05:00 grpc.time_ms=157.286 peer.address=[::1]:62853 protocol=grpc request_id=2d8c95eb4fd7b1f738855fb3d17361f4
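
One detail in the manager log may be worth a closer look: the prepared file's min_event_time and max_event_time are both reported as seconds=-9223372037, nanos=145224192. Assuming these fields follow the usual seconds-plus-non-negative-nanos timestamp convention, the arithmetic below shows that this is exactly the minimum signed 64-bit value in nanoseconds. That looks more like a placeholder than a real event time. Whether it means the prepared slice file contains no rows is an assumption, though it would be consistent with the "Skipping empty file" line in the engine logs.

```python
# Values copied from the manager log entry for the prepared file.
seconds = -9223372037
nanos = 145224192

# Assumption: the timestamp is seconds + nanos / 1e9 with nanos >= 0.
total_nanos = seconds * 1_000_000_000 + nanos

INT64_MIN = -(2**63)

print(total_nanos)
print(total_nanos == INT64_MIN)
```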

Engine Logs

2023-07-17T13:19:53.410114Z DEBUG Start of request:Execute:Start of request:Execute: Starting query execution
2023-07-17T13:19:53.410185Z  INFO Start of request:Execute:Start of request:Execute: Skipping empty file 'file:///Users/kevin.nguyen/.cache/kaskada/data/tables/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/ebb2145b-2be6-4ff1-ad61-43161fc844eb/prepared/prep_6/94010854a6320f34fcf4fe18b934591f/79dcf2c2-161f-47f0-a11d-66f8b057e5e7/part-0.parquet' for table 'transactions'
2023-07-17T13:19:53.410214Z DEBUG Start of request:Execute:Start of request:Execute:Operation{index=1 operation_label="select"}: No state to restore
2023-07-17T13:19:53.410225Z DEBUG Start of request:Execute:Start of request:Execute:Operation{index=1 operation_label="select"}: Full operation is SelectOperation { condition_input_column: 4, input_stream: ReceiverStream { inner: Receiver { chan: Rx { inner: Chan { tx: Tx { block_tail: 0x13b013800, tail_position: 0 }, semaphore: Semaphore { semaphore: Semaphore { permits: 7 }, bound: 7 }, rx_waker: AtomicWaker, tx_count: 1, rx_fields: "..." } } } }, helper: SingleConsumerHelper { incoming_columns: [3], key_index_state: KeyHashIndex { key_hash_to_index: "0 entries" } } }
2023-07-17T13:19:53.410238Z DEBUG Start of request:Execute:Start of request:Execute:Operation{index=0 operation_label="scan"}: No state to restore
2023-07-17T13:19:53.410231Z  INFO Start of request:Execute: Waiting for 3 compute threads
2023-07-17T13:19:53.410291Z DEBUG Start of request:Execute:Start of request:Execute:Operation{index=0 operation_label="scan"}: Full operation is ScanOperation { projected_schema: Schema { fields: [Field { name: "", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "amount", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "random_col", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, key_hash_index: KeyHashIndex { key_hash_to_index: "0 entries" }, .. }
2023-07-17T13:19:53.410328Z DEBUG Start of request:Execute:Start of request:Execute:Operation{index=0 operation_label="scan"}: No state store; nothing to save
2023-07-17T13:19:53.410345Z DEBUG Start of request:Execute:Start of request:Execute:Operation{index=1 operation_label="select"}: No state store; nothing to save
2023-07-17T13:19:53.410388Z  INFO Task completed
2023-07-17T13:19:53.410392Z  INFO Task completed
2023-07-17T13:19:53.414258Z  INFO Start of request:Execute:Start of request:Execute:Output Writer{destination=Destination { destination: Some(ObjectStore(ObjectStoreDestination { file_type: Parquet, output_prefix_uri: "file:///Users/kevin.nguyen/.cache/kaskada/data/results/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/DgoOtKzUm_wM6O3syzz8dRd3eYhZnui9ayyEOg/", output_paths: None })) }}: Wrote 0 to file:///Users/kevin.nguyen/.cache/kaskada/data/results/1bbd83fc-a1ca-4ab9-b0b1-253cd42ba784/DgoOtKzUm_wM6O3syzz8dRd3eYhZnui9ayyEOg/2b3fe9b0-6c58-41ef-a6b5-e4eb1731f545-part-0.parquet
2023-07-17T13:19:53.414298Z  INFO Task completed
2023-07-17T13:19:53.414302Z  INFO Compute threads completed
2023-07-17T13:19:53.414318Z  INFO Uploading checkpoint files: No snapshot config; not uploading compute store.
2023-07-17T13:19:53.414342Z  INFO No diagnostic to upload for kind PlanYaml
2023-07-17T13:19:53.414345Z  INFO No diagnostic to upload for kind FlightRecord
epinzur commented 1 year ago

This is confusing to me because we have integration tests for percent slicing, which should be working per the last integration test run.

See: https://github.com/kaskada-ai/kaskada/blob/main/tests/integration/api/query_v1_slicing_test.go

Could the issue only be for entity filters?

kevinjnguyen commented 1 year ago

> Could the issue only be for entity filters?

This appears to be true. I ran it again with EntityPercentFilter and it appears to work as expected. My initial attempt may simply not have refreshed the filter.

bjchambers commented 1 year ago

I wonder if it could be related to the hash function? Perhaps the entity is being hashed differently in the two places (e.g., hashing a different value), so the hashes are never equal?
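
A small sketch of how that failure mode would look, purely as an illustration of the hypothesis: if the data path and the filter path hash different representations of the same key, a hash-based filter matches nothing and the result is empty. Everything here is hypothetical. `key_hash` uses SHA-256 as a stand-in and is not Kaskada's hash function, and the "quoted key" mismatch is just one assumed way the two inputs could differ.

```python
import hashlib


def key_hash(raw: bytes) -> int:
    """Hypothetical stand-in for an entity-key hash (not Kaskada's actual function)."""
    return int.from_bytes(hashlib.sha256(raw).digest()[:8], "big")


def hash_at_ingest(key: str) -> int:
    # Data path: hash the raw UTF-8 bytes of the key.
    return key_hash(key.encode("utf-8"))


def hash_in_filter_buggy(key: str) -> int:
    # Assumed bug: the filter hashes a different representation (the quoted string).
    return key_hash(repr(key).encode("utf-8"))


def apply_slice(rows, entity_keys, filter_hash):
    """Keep rows whose ingest-time hash is among the hashes computed by the filter."""
    wanted = {filter_hash(k) for k in entity_keys}
    return [r for r in rows if hash_at_ingest(r) in wanted]


rows = ["m0", "m1", "m10", "m0"]
keys = ["m0", "m10"]

mismatched = apply_slice(rows, keys, hash_in_filter_buggy)
consistent = apply_slice(rows, keys, hash_at_ingest)

print(mismatched)
print(consistent)
```

With mismatched inputs the slice is empty even though the keys exist in the data, while hashing the same representation on both sides returns the m0 and m10 rows. A percent-based filter would not show this symptom, since it does not compare against hashes derived from user-supplied keys, which would fit the observation that only EntityFilter is affected.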