uhop / stream-json

A micro-library of Node.js stream components for creating custom JSON processing pipelines with a minimal memory footprint. It can parse JSON files far exceeding available memory by streaming individual primitives using a SAX-inspired API.

Conditionally dropping objects inside an array that is not the root object #77

Closed: raine closed this issue 4 years ago

raine commented 4 years ago

Hey. I've been looking at the various wiki pages of this project and playing around with Pick etc. for a good while now, but I can't figure out whether this library supports this use case or not:

Incoming JSON:

{
  "foo": "bar",
  "arr": [
    { "x": 1 },
    { "x": 0 },
    { "x": 1 },
    { "x": 0 }
  ]
}

I would like to parse this JSON in a streaming fashion, drop objects from arr where x === 0, keep everything else as is, and stringify the JSON for the next consumer, which is an HTTP response to a client.

The Pick filter looks promising, but I can't really figure out how to target nested objects in this way.

Thank you in advance.

uhop commented 4 years ago

It really depends on what you want to do. If every item of your array is small, we can simplify it considerably:

const {chain, none} = require('stream-chain');
const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {streamValues} = require('stream-json/streamers/StreamValues');
const {disassembler} = require('stream-json/Disassembler');
const {stringer} = require('stream-json/Stringer');

// assembling a processing pipe:
const pipe = chain([
  // we have to parse JSON
  parser(),
  // we'll pick values from the array
  pick({filter: /^arr\.\d+/}),
  // assemble them to JS values and stream them
  streamValues(),
  // now let's filter them according to our criteria
  item => item.value.x !== 0 ? item : none,
  // disassemble a stream of objects back to tokens
  disassembler(),
  // back to JSON as an array of objects
  stringer({makeArray: true})
], {writableObjectMode: false, readableObjectMode: false});

// pipe the result to stdout so we can see it
pipe.pipe(process.stdout);

// it is always good to keep an eye on errors
pipe.on('error', error => console.error('ERROR:', error));

// our "database"
const data = {
  foo: 'bar',
  arr: [
    {x: 1},
    {x: 0},
    {x: 1},
    {x: 0}
  ]
};

// let's feed it
pipe.end(JSON.stringify(data));

If your next consumer can handle JSONL, you can replace disassembler() + stringer() with jsonl/Stringer (see the wiki).
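
For illustration, here is a rough sketch of that JSONL variant. It assumes jsonl/Stringer exposes a stringer factory like the other modules (check the wiki for the exact API), and it unwraps the {key, value} envelopes produced by streamValues before stringifying:

const {chain, none} = require('stream-chain');
const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {streamValues} = require('stream-json/streamers/StreamValues');
// assumption: jsonl/Stringer follows the same factory export pattern as Stringer
const {stringer: jsonlStringer} = require('stream-json/jsonl/Stringer');

const pipe = chain([
  parser(),
  pick({filter: /^arr\.\d+/}),
  streamValues(),
  // keep only items with x !== 0 and unwrap the {key, value} envelope
  item => item.value.x !== 0 ? item.value : none,
  // each surviving object becomes one line of JSON
  jsonlStringer()
], {writableObjectMode: false, readableObjectMode: false});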

uhop commented 4 years ago

As an optimization, you can conditionally assemble objects with StreamValues, which will shave off some time. For that, instead of this pair:

streamValues(),
item => item.value.x !== 0 ? item : none,

you can use the more efficient:

streamValues({objectFilter: asm => {
  if (asm.current && asm.current.hasOwnProperty('x')) return asm.current.x !== 0;
}}),

How much you save depends on how many other properties your items have and how close to the beginning of an object your deciding property ("x" in this case) is. As soon as the object filter function returns false, the rest of that object is ignored.
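
For reference, a sketch of the earlier pipeline with the object filter folded in (same imports as above):

const pipe = chain([
  parser(),
  pick({filter: /^arr\.\d+/}),
  // drop items with x === 0 before they are fully assembled
  streamValues({objectFilter: asm => {
    if (asm.current && asm.current.hasOwnProperty('x')) return asm.current.x !== 0;
  }}),
  disassembler(),
  stringer({makeArray: true})
], {writableObjectMode: false, readableObjectMode: false});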

raine commented 4 years ago

Thank you for the comprehensive examples.

Looking at the first snippet, it does not seem to return the original root object (with the required modifications to arr), but rather focuses on arr and returns the picked objects.

What I'm having trouble with is how to retain the original structure and data of the input but modify some part of it that may not be on the top level.

uhop commented 4 years ago

When streaming big files we cannot go back and retract pieces that have already been streamed. For example, once we stream an object, we cannot "unstream" it if some subobject turns out not to meet our criteria. We would have to buffer it somehow and decide later whether to stream the buffer or discard it completely. In general, because memory is limited, we cannot do that; we either stream as we go or assemble an object fully before streaming it. There is no generic way to do this reasonably, hence the design above.

In practice, I suggest implementing two workflows:

  1. Stream everything but arr. Use Replace for that (a sketch follows below).
  2. Stream just arr as explained above.

Then you can either merge them back together or just use two inputs as it fits your algorithm.
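
A rough sketch of workflow 1, assuming Replace accepts a replacement option given as an array of tokens (check the Replace wiki page for the exact option names); everything except arr is streamed through unchanged, and arr is replaced with an empty array:

const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {replace} = require('stream-json/filters/Replace');
const {stringer} = require('stream-json/Stringer');

const everythingButArr = chain([
  parser(),
  // assumption: replace the whole "arr" subtree with an empty array
  replace({filter: 'arr', replacement: [{name: 'startArray'}, {name: 'endArray'}]}),
  stringer()
], {writableObjectMode: false, readableObjectMode: false});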

In one case, I actually merged three text (JSON) files to create one. It was done purely with Node's streams with the prep done with stream-json.

uhop commented 4 years ago

I didn't try it, but https://www.npmjs.com/package/stream-template looks promising to merge text streams.
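
For example (untested), assuming stream-template's tagged-template interface, where interpolated streams are concatenated lazily into the output, merging the two workflows might look roughly like this; everythingButArr comes from the sketch above, and filteredArr is a hypothetical name for the output of workflow 2:

const streamTemplate = require('stream-template');

// both interpolated values are text streams produced by the two workflows
const merged = streamTemplate`{"original": ${everythingButArr}, "filteredArr": ${filteredArr}}`;

merged.pipe(process.stdout);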

nazrhyn commented 4 years ago

It's kind of insane to me that @raine happens to have just asked about almost the exact case I'm trying to work through, myself, right now. The only difference is that, for me, the outer structure is an array of those base/outer objects which, themselves, have a property which is an array of objects.

[
  { other: 'prop', list: [ { item: 1 }, ... ] },
  ...
]

I've been thinking about how to know which outer object goes with which inner objects if I've got the two streams as you suggest, and the best I can think of is reading that from the path somehow. In both cases, I'll have the n index on the outer array.

But ... where do I get that from? Do I just add a pick with a filter function that always returns false but has the side effect of tracking what outer index I'm on? Is there some other construct that would be better to just observe the path but not do anything?

Can you think of some better way to associate the inner list items with the appropriate outer object?


As an aside, I have to say that your documentation is fantastic, and the fact that you are so active here is something quite singular in the open source world. Thanks for such a cool set of well-documented and well-thought-out tools. :heart:

uhop commented 4 years ago

What you are asking is the black magic of stream data processing. I wish it were a mechanical thing, but it is an art. Fortunately, like any art, it has some patterns. But first, we should make some reasonable assumptions regarding the shape of your data. Judging by your sample, the outer structure is an array of objects, each with scalar properties (like other) followed by a list array of reasonably small items; in particular, other comes before list.

Given all that, I created sample data:

[
  {"other": "a", "list": [{"item": 1}, {"item": 2}, {"item": 3}, {"item": 4}]},
  {"other": "b", "list": [{"item": 1}, {"item": 2}, {"item": 3}, {"item": 4}]},
  {"other": "c", "list": [{"item": 1}, {"item": 2}, {"item": 3}, {"item": 4}]}
]

My go-to tool in situations like that is Assembler. The trick is to manage memory properly. So here we go:

const fs = require('fs');

const {chain, none} = require('stream-chain');

const {parser} = require('stream-json');
const Assembler = require('stream-json/Assembler');

// custom assembler

const asm = new Assembler();

const customAsm = chunk => {
  // the standard way to consume a chunk
  asm.consume(chunk);
  // let's inspect "list"
  if (asm.depth !== 3) return none; // not there yet
  // optionally we can check the path and invariants, we don't
  const list = asm.current; // our list
  if (!list.length) return none; // it is empty
  // take an assembled item off the list
  // this way we avoid memory problems
  const item = list.pop();
  // the stack stores (object, key) pairs, so the parent object is two slots back
  const parent = asm.stack[asm.stack.length - 2];
  // let's borrow properties from the parent
  item.other = parent.other;
  // here we go: a shiny new item!
  return item;
};

// the pipeline

const pipeline = chain([
  fs.createReadStream('./data.json', {encoding: 'utf8'}),
  parser(),
  customAsm
]);

pipeline.on('data', data => console.log(data));
pipeline.on('error', error => console.error(error));
pipeline.on('finish', () => console.log('Done.'));

The output:

{ item: 1, other: 'a' }
{ item: 2, other: 'a' }
{ item: 3, other: 'a' }
{ item: 4, other: 'a' }
{ item: 1, other: 'b' }
{ item: 2, other: 'b' }
{ item: 3, other: 'b' }
{ item: 4, other: 'b' }
{ item: 1, other: 'c' }
{ item: 2, other: 'c' }
{ item: 3, other: 'c' }
{ item: 4, other: 'c' }
Done.

It is pretty much what I wanted to achieve in this example: stream items while "enriching" them with the properties of their parents.

Obviously, you may have multiple parents with many properties you want to import into your item. Or you may want to generate items with a different shape. Everything is up to you! But make sure that the assumptions hold. For example, we could not generate what we did if other went after list. It would probably still be possible, but it would require multi-pass algorithms, splitting streams, painstakingly joining them later while processing items, and so on.

I hope it was useful for you.

nazrhyn commented 4 years ago

Wow, I feel like I'm being inducted into a secret society. 👁‍🗨

This is pretty fantastic. I figured there was some lower-level thing that I was probably going to have to dig into (hence my reaching for, and kind of abusing, the function form of filter in a pick).

Luckily, in this case, I fully control the shape of the data, since I wrote it out myself in an earlier process, so I should be able to make sure that the list comes last. In fact, I think it already does 🤔. I think the only difference in my case is that I will be reconstructing the parent fully, including the child list, but filtering the items in the child list to exclude certain ones. I am pretty sure I can see how to do that.

Thanks for your time and that example. I'm excited to try this out tomorrow!

uhop commented 4 years ago

The example above takes care of list but allows the outer array to grow. In real code, it should be handled the same way, e.g., right after consuming a chunk:

if (asm.depth === 1) {
  asm.current.pop();
  return none;
}

The idea is to pop it periodically when it grows.
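
Folding that guard into the earlier customAsm would look roughly like this (just the two snippets above combined):

const customAsm = chunk => {
  asm.consume(chunk);
  // keep the root array from growing: discard finished outer objects
  if (asm.depth === 1) {
    asm.current.pop();
    return none;
  }
  if (asm.depth !== 3) return none; // not inside "list" yet
  const list = asm.current;
  if (!list.length) return none;
  // take an assembled item off the list and enrich it with the parent's property
  const item = list.pop();
  const parent = asm.stack[asm.stack.length - 2];
  item.other = parent.other;
  return item;
};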

nazrhyn commented 4 years ago

Here's what I ended up with, combining all of this:

const pipeline = chain([
    parser(),
    custom,
]);

function custom(chunk) {
    asm.consume(chunk);

    if (asm.depth === 1 && asm.current.length) {
        asm.current.pop();
        return none;
    }

    if (asm.depth === 2 && 'list' in asm.current) {
        return asm.current;
    }

    if (asm.depth !== 3) {
        return none;
    }

    const list = asm.current;

    if (!list.length) {
        return none;
    }

    const lastItem = list[list.length - 1];

    if (lastItem.item % 2 !== 0) {
        list.pop();
    }

    return none;
}

So, the output of the pipeline is in object mode: a sequence of the outer array's objects, each containing its filtered inner list items.

I've tested it (1) just collecting them up with the data and finish events and (2) streaming directly to a file by adding disassembler and stringer to the pipeline...

const pipeline = chain([
    parser(),
    custom,
    disassembler(),
    stringer({ makeArray: true }),
]);

...and then just piping it to a writable stream. Of course, in (1), the memory savings in the assembler are negated by still collecting everything in memory. Ultimately, whether I have to do that or not will depend on some quirks of the transport when I send this data out. I'm hoping I can figure out a nice way around that.

Thanks, again, and if you have any more thoughts, I'd love to hear them.

uhop commented 4 years ago

Looks fine. If you want to emit the outer objects as they are popped off the root array, the simplest way is to change your code like this:

function custom(chunk) {
  asm.consume(chunk);

  if (asm.depth === 1 && asm.current.length) {
    return asm.current.pop();
  }

  if (asm.depth !== 3) {
    return none;
  }

  const list = asm.current;

  // filter list items as you see fit...

  return none;
}
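
For completeness, a sketch of wiring that revised custom function into the earlier file-to-file pipeline; filtered.json is just a hypothetical output name:

const fs = require('fs');
const {chain, none} = require('stream-chain');
const {parser} = require('stream-json');
const Assembler = require('stream-json/Assembler');
const {disassembler} = require('stream-json/Disassembler');
const {stringer} = require('stream-json/Stringer');

const asm = new Assembler();

// `custom` is the revised function above

const pipeline = chain([
  fs.createReadStream('./data.json', {encoding: 'utf8'}),
  parser(),
  custom,
  disassembler(),
  stringer({makeArray: true}),
  fs.createWriteStream('./filtered.json')
]);

pipeline.on('error', error => console.error(error));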

But that's the idea in a nutshell.