brimdata / super

A novel data lake based on super-structured data
https://zed.brimdata.io/

Processing values from adjacent pipeline inputs #5308

Open philrz opened 2 weeks ago

philrz commented 2 weeks ago

tl;dr

A community user recently asked how to compute the delta between values in back-to-back input records. The only solution we could come up with using existing building blocks relies on collect() to gather the whole input into an array and then process it with over, but this has performance and memory limitations. An approach that works in streaming fashion in a pipeline context would be preferred.

Details

Repro is with Zed commit d103420.

The user question as posed in a community Slack thread:

Is there a way to get the previous entry in an array? I want to analyze the gap between the timestamps in this input data.zson.gz:

{level:"info",message:"retrieving batch",ts:"2024-09-26T22:05:45.705018615Z",offset:167000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:06:55.352124847Z",offset:168000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:08:04.096384542Z",offset:169000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:09:13.185300393Z",offset:170000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:10:21.898506504Z",offset:171000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:11:31.302608426Z",offset:172000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:12:40.698136642Z",offset:173000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:13:49.398172997Z",offset:174000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}
{level:"info",message:"retrieving batch",ts:"2024-09-26T22:14:58.007201613Z",offset:175000,page_size:1000,table:"NextTransactionLineLink",total_results:892574}

The best we could come up with is the following program, which first reads the whole input into an array.

$ zq -version
Version: v1.18.0-4-gd1034203

$ zq -z '
put id:=count()
| collect(this)
| over this with obj=this => (
  put last_ts := (id<2) ? time(ts) : time(obj[id-2].ts), ts:=time(ts)
  | duration:=ts-last_ts
  | drop last_ts,id
)              
' data.zson.gz

{level:"info",message:"retrieving batch",ts:2024-09-26T22:05:45.705018615Z,offset:167000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:0s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:06:55.352124847Z,offset:168000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.647106232s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:08:04.096384542Z,offset:169000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.744259695s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:09:13.185300393Z,offset:170000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.088915851s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:10:21.898506504Z,offset:171000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.713206111s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:11:31.302608426Z,offset:172000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.404101922s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:12:40.698136642Z,offset:173000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m9.395528216s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:13:49.398172997Z,offset:174000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.700036355s}
{level:"info",message:"retrieving batch",ts:2024-09-26T22:14:58.007201613Z,offset:175000,page_size:1000,table:"NextTransactionLineLink",total_results:892574,duration:1m8.609028616s}

The user is quite happy with this solution since it works fine for the amount of data they've got. However, given Zed's per-value size limitations, this approach can only go so far. The fact that the entire array needs to be assembled first also incurs a performance hit, plus the disadvantage that outputs can't be streamed incrementally.

While the design of a precise solution is TBD, in preliminary chats the topic of "window functions" and LAG / LEAD as they appear in SQL has come up.
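To make the streaming requirement concrete, here's a minimal Python sketch (just an illustration, not Zed) of the single-pass semantics such an operator would need, assuming the data has first been converted to NDJSON with zq -j data.zson.gz > data.ndjson:

import json
from datetime import datetime

def with_deltas(lines):
    """Emit each record as soon as it's read, adding the delta to the prior ts."""
    prev = None
    for line in lines:
        rec = json.loads(line)
        # "2024-09-26T22:05:45.705018615Z" -> trim to microseconds for fromisoformat()
        ts = datetime.fromisoformat(rec["ts"][:26] + "+00:00")
        rec["duration"] = str(ts - prev) if prev is not None else "0:00:00"
        prev = ts
        yield rec  # constant memory: only the previous timestamp is retained

with open("data.ndjson") as f:
    for rec in with_deltas(f):
        print(json.dumps(rec))

Each output value depends only on the current record and the one immediately before it, which is exactly the kind of bounded state a LAG-style operator could maintain in a pipeline.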

philrz commented 1 week ago

For comparison, here's the equivalent in SQL using LAG.

$ zq -j data.zson.gz > data.ndjson 

$ duckdb
v1.1.1 af39bd0dcf
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D CREATE TABLE log (
    level         VARCHAR(4) NOT NULL,
    message       VARCHAR(25) NOT NULL,
    ts            TIMESTAMP NOT NULL,
    "offset"      INT NOT NULL,
    page_size     INT NOT NULL,
    "table"       VARCHAR(25) NOT NULL,
    total_results INT NOT NULL
  ); 

D COPY log from "data.ndjson";

D SELECT *,ts - LAG(ts, 1, ts) OVER (ORDER BY ts) FROM log;
┌─────────┬──────────────────┬────────────────────────────┬────────┬───────────┬─────────────────────────┬───────────────┬──────────────────────────────────────────┐
│  level  │     message      │             ts             │ offset │ page_size │          table          │ total_results │ (ts - lag(ts, 1, ts) OVER (ORDER BY ts)) │
│ varchar │     varchar      │         timestamp          │ int32  │   int32   │         varchar         │     int32     │                 interval                 │
├─────────┼──────────────────┼────────────────────────────┼────────┼───────────┼─────────────────────────┼───────────────┼──────────────────────────────────────────┤
│ info    │ retrieving batch │ 2024-09-26 22:05:45.705018 │ 167000 │      1000 │ NextTransactionLineLink │        892574 │ 00:00:00                                 │
│ info    │ retrieving batch │ 2024-09-26 22:06:55.352124 │ 168000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:09.647106                          │
│ info    │ retrieving batch │ 2024-09-26 22:08:04.096384 │ 169000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:08.74426                           │
│ info    │ retrieving batch │ 2024-09-26 22:09:13.1853   │ 170000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:09.088916                          │
│ info    │ retrieving batch │ 2024-09-26 22:10:21.898506 │ 171000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:08.713206                          │
│ info    │ retrieving batch │ 2024-09-26 22:11:31.302608 │ 172000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:09.404102                          │
│ info    │ retrieving batch │ 2024-09-26 22:12:40.698136 │ 173000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:09.395528                          │
│ info    │ retrieving batch │ 2024-09-26 22:13:49.398172 │ 174000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:08.700036                          │
│ info    │ retrieving batch │ 2024-09-26 22:14:58.007201 │ 175000 │      1000 │ NextTransactionLineLink │        892574 │ 00:01:08.609029                          │
└─────────┴──────────────────┴────────────────────────────┴────────┴───────────┴─────────────────────────┴───────────────┴──────────────────────────────────────────┘
philrz commented 1 day ago

There was another recent community Slack thread with a use case that seems to fit under this same umbrella.

Their example started with these two separate input records:

{id:1,data:{b:2}}
{c:3}

The user's goal was to replace the embedded record under data with the latter of the two input records, such that the final output is {id:1,data:{c:3}}.

To achieve this with what's in the language currently, since assembling the desired output requires combining fields from separate record values in the input stream, it seems inevitable that the program has to kick off with an aggregation to gather the multiple values into a single complex value. The complex value can then be manipulated to move the embedded record.

On their own, the user came up with this approach that uses array indexing.

$ zq -version
Version: v1.18.0-29-g4f551f2b

$ echo "{id:1,data:{b:2}} {c:3}" |   zq -z 'collect(this) | {o1:this[0],o2:this[1]} | o1.data:=o2 | yield o1' -
{id:1,data:{c:3}}

This has the advantage of not requiring any knowledge of the second input value, i.e., it works just as well even if it's not a record.

$ echo "{id:1,data:{b:2}} [1,2,3]" |   zq -z 'collect(this) | {o1:this[0],o2:this[1]} | o1.data:=o2 | yield o1' -
{id:1,data:[1,2,3]}

As a possible alternative that may better fit the user's original stated goal of moving a record, I proposed the idiom below, which turns the whole input into a single record and then performs the move via a single call to cut, though this requires knowing the field name of the record being moved.

$ echo "{id:1,data:{b:2}} {c:3}" | zq -z 'over this | collect(this) | unflatten(this) | cut id,data:={c}' -
{id:1,data:{c:3}}

In conclusion, however, we recognize the language would require some enhancement to achieve this more directly, similar to what was shown above with SQL's LAG / LEAD.
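For what it's worth, this second use case also needs only bounded state. As a rough Python sketch (again just an illustration of the streaming behavior, using JSON forms of the example values), pairing adjacent inputs and grafting the second under data could look like:

import json

def merge_pairs(lines):
    """Pair adjacent input values and graft the second under the first's 'data' field."""
    pending = None
    for line in lines:
        value = json.loads(line)
        if pending is None:
            pending = value
        else:
            pending["data"] = value  # works whether or not the second value is a record
            yield pending
            pending = None

for rec in merge_pairs(['{"id":1,"data":{"b":2}}', '{"c":3}']):
    print(json.dumps(rec))  # {"id": 1, "data": {"c": 3}}

Only a single value of look-behind is needed here, the same bounded state a LAG-style operator would have to carry.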