HyperionGray / trio-chrome-devtools-protocol

Trio driver for Chrome DevTools Protocol (CDP)
MIT License
60 stars 17 forks source link

Doing a heap snapshot #3

Closed jean closed 4 years ago

jean commented 4 years ago

Sorry to abuse the tracker for support. Is there a better forum?

I'm trying to port this code:

const chunks = [];
cdpSession.on('HeapProfiler.addHeapSnapshotChunk', ({chunk}) => {chunks.push(chunk);});
await cdpSession.send('HeapProfiler.takeHeapSnapshot', {reportProgress: false});
rawSnapshotData = chunks.join('');
fs.writeFile(path, Buffer.from(rawSnapshotData), (ok, err) => {}

This is what I have now. Obvious problem: it doesn't gather chunks:

            async with session.wait_for(heap_profiler.AddHeapSnapshotChunk):
                session.execute(heap_profiler.take_heap_snapshot())
            with open(outdir / '%s.heapsnapshot' % datetime.today().isoformat(), 'ab') as outfile:
                outfile.write(chunk)

How do I react to all AddHeapSnapshotChunk events until I have the whole snapshot?

mehaase commented 4 years ago

Hi @jean, this is a fine place to ask for help. There is a listen() API that gets a stream of events. It's not documented well (or at all, perhaps) but the basic idea is this (this is untested code—just a sketch of how it should work):

event_stream = await session.listen(heap_profiler.AddHeapSnapshotChunk)
session.execute(heap_profiler.take_heap_snapshot())
async with trio.Path(outdir / '%s.heapsnapshot' % datetime.today().isoformat()).open('ab') as outfile:
    async for event in event_stream:
        await outfile.write(event.chunk)

To explain how it works: listen() returns a trio.abc.ReceiveChannel object, which can be used as an async iterator. The channel doesn't automatically close, however, so that async for loop will run forever as I've written it. It's not clear to me how you would know that the heap snapshot is finished unless you enable report_progress... so this may not be a complete answer for you, but let me know if it helps.

Here's where the listen() API is implemented: https://github.com/HyperionGray/trio-chrome-devtools-protocol/blob/master/trio_cdp/__init__.py#L77

jean commented 4 years ago

Thanks!

Hmm event_stream = await session.listen(heap_profiler.AddHeapSnapshotChunk) says: TypeError: object MemoryReceiveChannel can't be used in 'await' expression. And it looks like the lack of an await before session.execute blows up with:

RuntimeWarning: coroutine 'CdpBase.execute' was never awaited session.execute(heap_profiler.take_heap_snapshot(report_progress=True))
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

So I fixed those .. see the code below.

It's not clear to me how you would know that the heap snapshot is finished

Yeah I was wondering the same thing with the javascript original. There they simply have a session.on and an await session.send with reportProgress: false. It seems to work, maybe accidentally.

How would it work? I guess you would enable report_progress and then listen for both types of events This is what I ended up with:

event_stream = session.listen(heap_profiler.AddHeapSnapshotChunk, heap_profiler.ReportHeapSnapshotProgress)
await session.execute(heap_profiler.take_heap_snapshot(report_progress=True))
async with trio.Path(outdir / ('%s.heapsnapshot' % datetime.today().isoformat())).open('ab') as outfile:
  async for event in event_stream:
    import pdb; pdb.set_trace()
    await outfile.write(event.chunk)

This actually seems to work, up to a point. I get an error longer than my terminal scrollback which ends "135",\n"8137",\n"8139",\n"8140",\n"8141",\n"8142",\n"8144",\n"8145",\n"8146",\n"8147",\n"8148",\n"8149",\n"8150",\n"8151",\n"8154",\n"8155",\n"8165",\n"8168",\n"8172",\n"8173",\n"8178",\n"8180",\n"8185",\n"8186",\n"8193",\n"descriptors",\n"prototype_info",\n"transitions",\n"constant_elements",\n"get arguments",\n"set arguments",\n"get caller",\n"set caller",\n"previous",\n"back_pointer",\n"transition",\n"transition_info",\n"bound_this",\n"bound_function"]}')" due to full channel <send channel at 0x770f289160, using buffer at 0x770f328e20>

chrome_monitor_tool/monitor.py:82: RuntimeWarning: coroutine 'Path.open' was never awaited
  async with trio.Path(outdir / ('%s.heapsnapshot' % datetime.today().isoformat())).open('ab') as outfile:
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "/data/data/com.termux/files/home/.local/bin/chrome_monitor_tool", line 11, in <module>
    load_entry_point('chrome-monitor-tool', 'console_scripts', 'chrome_monitor_tool')()
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/data/data/com.termux/files/home/sc/chrome-monitor-tool/chrome_monitor_tool/chrome_monitor_tool.py", line 25, in memory
    trio.run(monitor.memory, ctx, restrict_keyboard_interrupt_to_checkpoints=True)
  File "/data/data/com.termux/files/home/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1804, in run
    raise runner.main_task_outcome.error
  File "/data/data/com.termux/files/home/sc/chrome-monitor-tool/chrome_monitor_tool/monitor.py", line 82, in memory
    async with trio.Path(outdir / ('%s.heapsnapshot' % datetime.today().isoformat())).open('ab') as outfile:
AttributeError: __aexit__
jean commented 4 years ago

Straightened out the trio.Path bit ... though I confess I don't really understand why we need async with await instead of async with:

event_stream = session.listen(heap_profiler.AddHeapSnapshotChunk, heap_profiler.ReportHeapSnapshotProgress)
await session.execute(heap_profiler.take_heap_snapshot(report_progress=True))
async with await trio.Path(outdir / ('%s.heapsnapshot' % datetime.today().isoformat())).open('ab') as outfile:
    async for event in event_stream:
        if type(event) is heap_profiler.ReportHeapSnapshotProgress:
            if event.finished:
                break
            continue
        await outfile.write(event.chunk)

We do get 2019-12-07T22:08:11.315053.heapsnapshot files, but they're zero length. The run now ends with:

\n"8173",\n"8178",\n"8180",\n"8185",\n"8186",\n"8193",\n"descriptors",\n"prototype_info",\n"transitions",\n"constant_elements",\n"get arguments",\n"set arguments",\n"get caller",\n"set caller",\n"previous",\n"back_pointer",\n"transition",\n"transition_info",\n"bound_this",\n"bound_function"]}')" due to full channel <send channel at 0x71a8414880, using buffer at 0x71a83fddc0>

mehaase commented 4 years ago

So I fixed those .. see the code below.

Whoops... very untested code. I'm glad you got it working!

I confess I don't really understand why we need async with await instead of async with

I agree that its a confusing API. It's because trio.Path.open() is itself a coroutine, and it returns a file object that is also an async context manager. I make this mistake frequently.

And I just noticed another bug: trio.Path(outdir / (...filename...)) should be trio.Path(outdir) / (...filename...)–the right parenthesis is in the wrong place.

what's that about a full channel?

The error log ERROR:trio_cdp:Unable to send event AddHeapSnapshotChunk ... due to full channel is caused because the channel you opened with listen() has a fixed capacity to buffer unread events. For example, if Chrome is sending events faster than your code can handle them, we either have to allocate potentially unbounded memory to buffer all of the events, or else we have to signal some kind of error when the buffer is full.

You can increase the buffer size like this: await session.listen(EventType1, EventType2, ..., buffer_size=1000).

This was one area of the API where I was unsure what to do. It would be nice to tell the browser "hey slow down!" but the only realistic way to do that is to stop reading events off of the websocket (which creates backpressure at the TCP levle), which would block all other events as well, not to mention responses to commands you've sent. I'm open to ideas about how to improve this API.

We do get 2019-12-07T22:08:11.315053.heapsnapshot files, but they're zero length.

This is curious. I tried commenting out the block inside the for loop and just printing the event type and I noticed this:

<class 'cdp.heap_profiler.ReportHeapSnapshotProgress'>
<class 'cdp.heap_profiler.ReportHeapSnapshotProgress'>
...snip....
<class 'cdp.heap_profiler.ReportHeapSnapshotProgress'>
<class 'cdp.heap_profiler.ReportHeapSnapshotProgress'>
<class 'cdp.heap_profiler.AddHeapSnapshotChunk'>
<class 'cdp.heap_profiler.AddHeapSnapshotChunk'>
...snip...
<class 'cdp.heap_profiler.AddHeapSnapshotChunk'>
<class 'cdp.heap_profiler.AddHeapSnapshotChunk'>

It sends all of the ReportHeapSnapshotProgress events before it sends the first AddHeapSnapshotChunk event. I think "report progress" means to report on how long the browser is taking to build the snapshot in its own memory space, not how much of the snapshot has been transmitted to the client.

Then I noticed that all of the chunks are exactly 102,400 bytes, except the last chunk. So I modified the code to break when it receives a smaller chunk:

event_stream = session.listen(heap_profiler.AddHeapSnapshotChunk, buffer_size=1000)
await session.execute(heap_profiler.take_heap_snapshot())
file_path = trio.Path('.') / '{}.heapsnapshot'.format(datetime.today().isoformat())
async with await file_path.open('w') as outfile:
    async for event in event_stream:
        await outfile.write(event.chunk)
        if len(event.chunk) < 102400:
            break

I don't think this is a great way to implement it (what happens if the size of heap snapshot is an exact multiple of 102400?) but this does at least produce a snapshot file.

jean commented 4 years ago

First some superficialities ...

I confess I don't really understand why we need async with await instead of async with

I agree that its a confusing API. It's because trio.Path.open() is itself a coroutine, and it returns a file object that is also an async context manager.

Word for word that makes perfect sense, but I can't say I grok it yet ..

And I just noticed another bug: trio.Path(outdir / (...filename...)) should be trio.Path(outdir) / (...filename...)–the right parenthesis is in the wrong place.

Hmm it currently works because it's trio.Path(trio.Path('...filename...') / (...filename...)).open(...) which comes to the same thing as (trio.Path('...filename...') / (...filename...)).open(...), just ugly. Fixed thanks!


Now some more digging ...

The error log ERROR:trio_cdp:Unable to send event AddHeapSnapshotChunk ... due to full channel is caused because the channel you opened with listen() has a fixed capacity to buffer unread events. For example, if Chrome is sending events faster than your code can handle them, we either have to allocate potentially unbounded memory to buffer all of the events, or else we have to signal some kind of error when the buffer is full.

That makes sense, but I don't see why that would be the case. The point of async streaming is that events are handled as they come in, right? In this case, handling entails writing each chunk to a file. However it sounds like all events are being buffered before handling starts.

This was one area of the API where I was unsure what to do. It would be nice to tell the browser "hey slow down!" but the only realistic way to do that is to stop reading events off of the websocket (which creates backpressure at the TCP levle), which would block all other events as well, not to mention responses to commands you've sent. I'm open to ideas about how to improve this API.

Maybe add an option to start dropping events when the buffer is full, but keep a count (something like event_stream.ok = set(1,2,3,5,6,10,11); event_stream.dropped = set(4,7,8,9)). That way it's still possible to get a sample of events.

This is curious. I tried commenting out the block inside the for loop and just printing the event type and I noticed this: [...] It sends all of the ReportHeapSnapshotProgress events before it sends the first AddHeapSnapshotChunk event.

Wow!

I think "report progress" means to report on how long the browser is taking to build the snapshot in its own memory space, not how much of the snapshot has been transmitted to the client.

That could be. What would the point of that be though?

Then I noticed that all of the chunks are exactly 102,400 bytes, except the last chunk. So I modified the code to break when it receives a smaller chunk:

[...]

I don't think this is a great way to implement it (what happens if the size of heap snapshot is an exact multiple of 102400?) but this does at least produce a snapshot file.

Yeah this isn't the answer. Without the break, I do get a snapshot file, but of course the script doesn't terminate. I'm pretty sure the JS original just opens new files without closing the old ones.

Thanks a lot for the help! I feel a lot less stuck than on Friday.

jean commented 4 years ago

Ignore this comment, if I understand correctly then iterating over receiver does process events as they come in.


To explain how it works: listen() returns a trio.abc.ReceiveChannel object, which can be used as an async iterator.

Don't we need to start a nursery if we want to consume the channel as events come in? nursery.start_soon(consumer, receive_channel) (docs)

Something like

diff --git a/trio_cdp/__init__.py b/trio_cdp/__init__.py
index e7cd8c6..29b0658 100644
--- a/trio_cdp/__init__.py
+++ b/trio_cdp/__init__.py
@@ -74,12 +74,14 @@ class CdpBase:
             raise response
         return response

-    def listen(self, *event_types, buffer_size=10):
+    def listen(self, *event_types, listener=None, buffer_size=10):
         ''' Return an async iterator that iterates over events matching the
         indicated types. '''
         sender, receiver = trio.open_memory_channel(buffer_size)
         for event_type in event_types:
             self.channels[event_type].add(sender)
+        if listener:
+            nursery.start_soon(listener, receiver)
         return receiver
jean commented 4 years ago

Profiler starts taking snapshot immediately when the command arrives (as before) and if reportProgress param is true it will send HeapProfiler.reportHeapSnapshotProgress events. After that a series of snapshot chunks is sent to the frontend as HeapProfiler.addHeapSnapshotChunk events. When whole snapshot is sent the backend will sent response to HeapProfiler.takeHeapSnapshot command. (ref)

event_stream = session.listen(heap_profiler.AddHeapSnapshotChunk, buffer_size=1000)
done = await session.execute(heap_profiler.take_heap_snapshot())
# so when we get here, all the chunks have already been sent. Can we close the stream?
event_stream.close() # Now it will raise `EndOfChannel` :grimacing: 

Why does done turn out to be None though, if execute returns response? And what if we're waiting for something that takes a really long time to complete? If we await session.execute we won't be able to start iterating the event_stream until the end.

mehaase commented 4 years ago

Ignore this comment, if I understand correctly then iterating over receiver does process events as they come in.

Correct.

Profiler starts taking snapshot immediately when the command arrives...(snip) When whole snapshot is sent the backend will sent response to HeapProfiler.takeHeapSnapshot command.

Good find! Now I understand why its behaving this way. In the first version of the code, we didn't start getting events until after the command finished, but by that point its already tried to send all of its events. That caused the event buffer to fill up and error out.

Why does done turn out to be None though, if execute returns response?

Some CDP commands have no return value, so execute() just returns None. The browser does send a response, but it just contains success/error signals that are used to decide whether to raise an exception or continue. Those details are abstracted from the caller.

If we await session.execute we won't be able to start iterating the event_stream until the end.

Yep, and this is where we need a nursery to solve the problem: one task to run in the background and read events, and then the main task just waits for take_heap_snapshot() to finish. (Optionally, a second background can print out the progress reports.) I wrapped this all up in a function:

async def take_heap_snapshot(session, fp, report_progress=False):
    async def chunk_helper():
        async for event in session.listen(heap_profiler.AddHeapSnapshotChunk):
            await fp.write(event.chunk)

    async def progress_helper():
        async for event in session.listen(
                heap_profiler.ReportHeapSnapshotProgress):
            print('Heap snapshot: {:0.1f}%{}'.format(
                event.done * 100 / event.total,
                ' (finished)' if event.finished else ''))

    async with trio.open_nursery() as nursery:
        nursery.start_soon(chunk_helper)
        if report_progress:
            nursery.start_soon(progress_helper)
        await session.execute(heap_profiler.take_heap_snapshot(report_progress))
        nursery.cancel_scope.cancel()

And it's called like this:

        file_path = trio.Path('.') / '{}.heapsnapshot'.format(
            datetime.today().isoformat())
        async with await file_path.open('w') as outfile:
            await take_heap_snapshot(session, outfile, report_progress=True)
jean commented 4 years ago

Thanks a lot :smile_cat: ~Is it possible that the chunk_helper is getting shut down before all the chunks have been written to disk? I have 3 out of 8 snapshots that end prematurely.~

jean commented 4 years ago

Is that nursery.cancel_scope.cancel() really necessary? Doesn't the context manager handle the cleanup?

Qianming1363 commented 2 years ago

i has got chunk,but it is a string like this "set scrollTo", "get scrollBy", "set scrollBy", "get scrollToElement", "set scrollToElement", "get resetPosition", "set resetPosition", "1 / part of key (Object @221225) -> value (system / JSProxy @221227) pair in WeakMap (table @148973)", "1 / part of key (Object @221229) -> value (system / JSProxy @221231) pair in WeakMap (table @148973)", "1 / part of key (Object @221233) -> value (system / JSProxy @221235) pair in WeakMap (table @148973)", "1 / part of key (Object @221247) -> value (system / JSProxy @221249) pair in WeakMap (table @148973)", "1 / part of key (Object @221253) -> value (system / JSProxy @221255) pair in WeakMap (table @148973)", "1 / part of key (Object @221263) -> value (system / JSProxy @221265) pair in WeakMap (table @148973)", "1 / part of key (Object @221267) -> value (system / JSProxy @221269) pair in WeakMap (table @148973)", "1 / part of key (Object @221273) -> value (system / JSProxy @221275) pair in WeakMap (table @148973)", "1 / part of key (Object @221277) -> value (system / JSProxy @221279) pair in WeakMap (table @148973)",

how to use it