brimdata / zed

A novel data lake based on super-structured data
https://zed.brimdata.io/
BSD 3-Clause "New" or "Revised" License

Processing values from adjacent pipeline inputs #5308

Open philrz opened 6 days ago


tl;dr

A community user recently asked how to compute the delta between values in back-to-back input records. The only solution we could come up with using existing building blocks relies on collect() to turn the entire input into an array, which is then processed with over, but this has performance and memory limitations. An approach that works in streaming mode in a pipeline context would be preferred.

Details

Repro is with Zed commit d103420.

The user's question, as posed in a community Slack thread:

Is there a way to get the previous entry in an array? I want to analyze the gap between the timestamps in this input data.zson.gz:

{level:"info",message:"retrieving batch",ts:"2024-09-26T22:05:45.705018615Z",offset:167000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:06:55.352124847Z",offset:168000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:08:04.096384542Z",offset:169000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:09:13.185300393Z",offset:170000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:10:21.898506504Z",offset:171000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:11:31.302608426Z",offset:172000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:12:40.698136642Z",offset:173000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:13:49.398172997Z",offset:174000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:14:58.007201613Z",offset:175000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}

The best we could come up with is a program like this, which reads the entire input into an array first.

$ zq -version
Version: v1.18.0-4-gd1034203

$ zq -z '
put id:=count()
| collect(this)
| over this with obj=this => (
  put last_ts := (id<2) ? time(ts) : time(obj[id-2].ts), ts:=time(ts)
  | duration:=ts-last_ts
  | drop last_ts,id
)
' data.zson.gz

{level:"info",message:"retrieving batch",ts:2024-09-26T22:05:45.705018615Z,offset:167000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:0s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:06:55.352124847Z,offset:168000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.647106232s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:08:04.096384542Z,offset:169000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.744259695s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:09:13.185300393Z,offset:170000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.088915851s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:10:21.898506504Z,offset:171000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.713206111s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:11:31.302608426Z,offset:172000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.404101922s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:12:40.698136642Z,offset:173000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.395528216s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:13:49.398172997Z,offset:174000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.700036355s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:14:58.007201613Z,offset:175000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.609028616s}

The user is quite happy with this solution since it works fine for the amount of data they have. However, given Zed's per-value size limitations, this approach can only go so far, because the entire input must fit in a single array value. Assembling the whole array first also costs performance, and it means no output can be streamed incrementally.
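By contrast, a streaming approach only needs to remember the previous timestamp. The Python sketch below illustrates the desired behavior, not how Zed would implement it; `parse_ts_ns`, `durations_streaming`, `record_source`, and the `duration_ns` field are hypothetical names. The generator emits each record as soon as it arrives and keeps constant state, so it is not bounded by the size of any single value.

```python
import calendar
import time
from typing import Iterable, Iterator


def parse_ts_ns(ts: str) -> int:
    """Parse a UTC timestamp like '2024-09-26T22:05:45.705018615Z' into
    integer nanoseconds since the Unix epoch (all 9 fractional digits kept)."""
    whole, _, frac = ts.rstrip("Z").partition(".")
    seconds = calendar.timegm(time.strptime(whole, "%Y-%m-%dT%H:%M:%S"))
    return seconds * 1_000_000_000 + int(frac.ljust(9, "0")[:9])


def durations_streaming(records: Iterable[dict]) -> Iterator[dict]:
    """Yield each record plus a duration_ns field as soon as it arrives.
    Only the previous timestamp is retained, so memory use is constant."""
    prev_ns = None
    for rec in records:
        ts_ns = parse_ts_ns(rec["ts"])
        # No predecessor for the first record: report 0, like the collect() program.
        yield {**rec, "duration_ns": 0 if prev_ns is None else ts_ns - prev_ns}
        prev_ns = ts_ns


def record_source():
    """Hypothetical stand-in for an unbounded input stream."""
    yield {"ts": "2024-09-26T22:05:45.705018615Z", "offset": 167000}
    yield {"ts": "2024-09-26T22:06:55.352124847Z", "offset": 168000}
    yield {"ts": "2024-09-26T22:08:04.096384542Z", "offset": 169000}


for r in durations_streaming(record_source()):
    print(r["offset"], r["duration_ns"])
```

Because the first result is available after reading a single record, downstream consumers can start working before the input ends.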

While the design of a precise solution is TBD, preliminary discussions have brought to mind SQL's "window functions", in particular LAG / LEAD.
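As a reference point for those semantics: in SQL, `LAG(expr, offset, default)` returns the value `offset` rows before the current row and `LEAD` the value `offset` rows after, falling back to `default` (NULL if omitted) when no such row exists. The Python sketch below, with hypothetical `with_lag` / `with_lead` helpers that treat input order as the window ordering, shows that both can run over an unbounded stream while buffering only `offset` values. It is not a proposal for Zed syntax.

```python
from collections import deque
from typing import Any, Iterable, Iterator, Tuple


def with_lag(
    values: Iterable[Any], offset: int = 1, default: Any = None
) -> Iterator[Tuple[Any, Any]]:
    """Pair each value with the one `offset` positions earlier (like SQL LAG),
    or `default` if there is none. Buffers at most `offset` values."""
    if offset < 1:
        raise ValueError("offset must be >= 1")
    window = deque(maxlen=offset)  # the last `offset` values seen
    for v in values:
        lagged = window[0] if len(window) == offset else default
        yield v, lagged
        window.append(v)  # maxlen evicts the oldest value automatically


def with_lead(
    values: Iterable[Any], offset: int = 1, default: Any = None
) -> Iterator[Tuple[Any, Any]]:
    """Pair each value with the one `offset` positions later (like SQL LEAD),
    or `default` near the end. Output trails input by `offset` values."""
    if offset < 1:
        raise ValueError("offset must be >= 1")
    pending = deque()  # values still waiting for their successor
    for v in values:
        pending.append(v)
        if len(pending) > offset:
            yield pending.popleft(), v
    # Input exhausted: the remaining values have no successor that far ahead.
    for v in pending:
        yield v, default


# Gaps between consecutive timestamps (integer nanoseconds) using LAG.
ts_ns = [100, 250, 330]
print([0 if prev is None else cur - prev for cur, prev in with_lag(ts_ns)])  # [0, 150, 80]
print(list(with_lead(ts_ns)))  # [(100, 250), (250, 330), (330, None)]
```

One asymmetry worth noting: `with_lag` can emit each value immediately, while `with_lead` must hold back `offset` values until their successors arrive, so its output trails the input slightly.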