asodeur closed this issue 2 years ago.
Hey Andreas,
I understand it is a reasonable/practical request to identify caller/callee relationships between coroutines. However, the issue is that asyncio.gather schedules the coroutines as tasks on the loop, adds a done_callback to each of them, and returns a Future. From the profiler's perspective, there is nothing that links doit() and aio_worker, as they are not seen in the call stack as a parent/child relation. Unfortunately, asyncio has more functions like this that schedule work to run later, and currently we have no way of knowing who scheduled that work (e.g. loop.call_soon or loop.call_later).
To make this work, one needs to hook into library functions to make sense of what is happening. Currently, I cannot see an easy way to work this out without going too deep into the library internals.
Needless to say, I am open to suggestions if you have any.
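A small, self-contained illustration of the point above, using plain asyncio without yappi (the functions are hypothetical stand-ins for the ones in this thread). When a coroutine is passed to gather(), it is wrapped in a Task that the event loop steps later, so the calling coroutine's frame is not on the worker's call stack. Awaiting the coroutine directly keeps the caller on the stack.

```python
import asyncio
import traceback


async def aio_worker():
    # Names of the Python frames on the current call stack, outermost first.
    summary = traceback.extract_stack()
    return [frame.name for frame in summary]


async def doit():
    # gather() wraps aio_worker() in a Task that the event loop runs later.
    (stack,) = await asyncio.gather(aio_worker())
    return stack


async def direct():
    # Awaiting the coroutine directly runs it on top of this frame.
    return await aio_worker()


stack_gather = asyncio.run(doit())
stack_direct = asyncio.run(direct())
print('doit' in stack_gather)    # False
print('direct' in stack_direct)  # True
```

This is the parent/child information a frame-based profiler works with, which is why the gathered worker appears to be called by the loop rather than by doit.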
I was hoping there might be some easy trick using set_*_callback.
Adding a callback to change the recorded call stack is probably a no-go (it is not obvious how this could work, and there are performance reasons). Do you think there is a chance that a monkey-patched gather putting hints in a contextvar, in combination with set_tag_callback, might leave enough information to clean up the callgraph in a post-processing step? If there are no obvious reasons this won't work, I could give it a try.
I am not sure I understand your suggested solution. You can use set_tag_callback to tag any profiled function, but I am still not sure how you would link the futures with the gather call. If you can demonstrate a simple PoC, I would be more than happy to help.
I think I have a prototype doing the tagging, but I am struggling with rewriting the statistics. I am already failing to filter the stats for tag == 1. YFuncStat.tag always seems to return 0, and YFuncStats.get seems to mutate the statistics in place.
stats = yappi.get_func_stats()
orig_len = len(stats)
assert [fs for fs in stats if fs.tag == 1] == []  # gives [], fs.tag always seems to be zero (but they are not)
assert 0 < len(stats.get({'tag': 1})) < orig_len  # there seem to be functions with tag == 1 as expected
assert len(stats) == len(stats.get({'tag': 1}))  # .get seems to have mutated stats
How do I filter functions by tag?
If you are using the latest master branch, please get them like this: yappi.get_func_stats(tag=xxxx).
But the problem seems to be different in your case: are you sure you set the correct tag callback? This should work in any case: [fs for fs in stats if fs.tag == 1]. Maybe you could post the example code?
My code is attached below. The code accessing the tags is inside the __main__ guard at the bottom. It looks to me as if there is an issue with the Python wrapper: YFuncStat.tag is zero until it is being used by YFuncStats.get, YFuncStats.get mutates the object in place, and yappi.get_func_stats gives a new, clean YFuncStats.
For now, a feasible workaround should be to chain the subsets for the different tags (which works in my case because the set of all tags is known up front).
from asyncio import create_task, run, sleep
from contextvars import ContextVar
import inspect

import yappi

_marker = ContextVar('yappi_task_marker')
_task_counter = 0
_task_map = {}
CREATE_TASK_ID = (
    create_task.__code__.co_filename, create_task.__code__.co_firstlineno, create_task.__code__.co_name
)


def _task_tag_cbk():
    return _marker.get(0)


async def aio_worker():
    await sleep(1.)
    return _marker.get(0)


async def doit():
    # except for the lines marked with '# <- keep' everything should go into the task factory (or monkey-patch)
    global _task_counter
    coro = aio_worker()  # <- keep
    f = inspect.currentframe()
    caller_id = (f.f_code.co_filename, f.f_code.co_firstlineno, f.f_code.co_name)
    callee_id = (coro.cr_code.co_filename, coro.cr_code.co_firstlineno, coro.cr_code.co_name)
    fid = (*caller_id, *callee_id)
    tag = _task_map.get(fid, 0)
    if not tag:
        _task_counter += 1
        _task_map[fid] = tag = _task_counter
    token = _marker.set(tag)
    task = create_task(coro)  # <- keep this
    _marker.reset(token)
    return await task  # <- keep this


if __name__ == '__main__':
    yappi.set_tag_callback(_task_tag_cbk)
    yappi.set_clock_type('wall')
    with yappi.run(builtins=True):
        print('Task tag: ', run(doit(), debug=False))
    stats = yappi.get_func_stats()
    ## various attempts at retrieving functions with tag == 1
    assert len(stats) == 274
    assert [fs for fs in stats if fs.tag == 1] == []  # gives [], fs.tag always seems to be zero (but they are not)
    assert len(stats.get({'tag': 1})) == 61  # using .get with a filter finds 61 functions, which looks reasonable
    assert len([fs for fs in stats if fs.tag == 1]) == 61  # .get with a filter seems to have populated the tags
    assert len(stats) == 61  # but it also mutated stats in place
    ##
    stats = yappi.get_func_stats()  # this seems to get the original stats back
    assert len(stats) == 274
The correct and fastest way to retrieve per-tag func stats is yappi.get_func_stats({'tag': xxx}), as it happens entirely in C. The other version, [fs for fs in stats if fs.tag == 1], will not perform well on big datasets.
yappi.get_func_stats() traverses the profiling data held in memory by Yappi's C extension. That is why, when you call it multiple times, it traverses that memory again each time and builds a new stats object from it. Any kind of filtering you do on the stats object is done entirely on your snapshot. The reason behind this is that profiling might continue on the C side between different calls to get_func_stats(). All other YFuncStats APIs are just a frontend for this snapshot.
Finally: the reason [fs for fs in stats if fs.tag == 1] does not match what yappi.get_func_stats() returns for that tag is actually a bug. The tag is always zero when get_func_stats() is called without a tag value. I will hopefully fix this ASAP, but fortunately, this should not block you in any way.
Please use:
stats = yappi.get_func_stats({'tag': 1})
Thx, this was helpful. Below is a rough sketch of the idea. The resulting callgraph for doit looks OK.
There are still some issues to work out:
- doit's ttot needs to be propagated up the call tree (and maybe should be removed from the loop's .select).
- The tagging code in doit should move into a task factory.
- Only one create_task per function is handled, as the tag is lost during conversion to YChildFuncStat.
...but none of this looks impossible.
from asyncio import create_task, run, sleep
from contextvars import ContextVar
import inspect

import yappi

_marker = ContextVar('yappi_task_marker')
_task_counter = 0
_task_map = {}
CREATE_TASK_ID = (
    create_task.__code__.co_filename, create_task.__code__.co_firstlineno, create_task.__code__.co_name
)


def _task_tag_cbk():
    return _marker.get(0)


async def aio_worker():
    await sleep(1.)
    return _marker.get(0)


async def doit():
    # except for the lines marked with '# <- keep' everything should go into a task factory (or monkey-patch)
    global _task_counter
    coro = aio_worker()  # <- keep
    f = inspect.currentframe()
    caller_id = (f.f_code.co_filename, f.f_code.co_firstlineno, f.f_code.co_name)
    callee_id = (coro.cr_code.co_filename, coro.cr_code.co_firstlineno, coro.cr_code.co_name)
    fid = (*caller_id, *callee_id)
    tag = _task_map.get(fid, 0)
    if not tag:
        _task_counter += 1
        _task_map[fid] = tag = _task_counter
    token = _marker.set(tag)
    task = create_task(coro)  # <- keep this
    _marker.reset(token)
    return await task  # <- keep this


def get_func_stats_with_tags(tags):
    """yappi 1.2.4 does not populate YFuncStat.tag unless queried"""
    result = yappi.YFuncStats()
    for tag in tags:
        stats = yappi.get_func_stats()
        for fs in stats.get({'tag': tag}):
            result.append(fs)
    if 0 not in tags:
        stats = yappi.get_func_stats()
        for fs in stats.get({'tag': 0}):
            result.append(fs)
    return result


def to_child_func_stat(y_func_stat):
    return yappi.YChildFuncStat([
        y_func_stat.index,
        y_func_stat.ncall,
        y_func_stat.nactualcall,
        y_func_stat.ttot,
        y_func_stat.tsub,
        y_func_stat.tavg,
        y_func_stat.builtin,
        y_func_stat.full_name,
        y_func_stat.module,
        y_func_stat.lineno,
        y_func_stat.name
    ])


def fix_calltree(stats, task_map):
    # TODO: not working yet
    fixed_stats = yappi.YFuncStats()
    callee_map = {}
    caller_map = {}
    for (rmodule, rline, rname, emodule, eline, ename), tag in task_map.items():
        callee_map[emodule, eline, ename] = tag
        caller_map[rmodule, rline, rname] = tag
    callees = {}
    for fs in stats:
        tag = callee_map.get((fs.module, fs.lineno, fs.name))
        if tag and fs.tag == tag:
            callees[tag] = to_child_func_stat(fs)
    create_tasks = {}
    for fs in stats:
        if (fs.module, fs.lineno, fs.name) == CREATE_TASK_ID:
            callee = callees.get(fs.tag)
            if callee:
                new_fs = yappi.YFuncStat(fs)
                new_fs.ttot += callee.ttot
                new_fs.tavg += callee.tavg
                new_fs.children.append(callee)
                fixed_stats.append(new_fs)
                create_tasks[fs.tag] = to_child_func_stat(new_fs)
    for fs in stats:
        new_fs = yappi.YFuncStat(fs)
        tag = caller_map.get((fs.module, fs.lineno, fs.name))
        if tag:
            ct = create_tasks.get(tag)
            if ct:
                new_fs.tsub -= ct.ttot
                new_fs.children.append(ct)
        fixed_stats.append(new_fs)
    return fixed_stats


if __name__ == '__main__':
    yappi.set_tag_callback(_task_tag_cbk)
    yappi.set_clock_type('wall')
    with yappi.run(builtins=True):
        print('Task tag: ', run(doit(), debug=False))
    stats = get_func_stats_with_tags(set(_task_map.values()))
    fixed_stats = fix_calltree(stats, _task_map)
    # save to view with snakerun or similar
    fixed_stats.save('the_profile.pstat', type='pstat')
Sorry to bother you again, but I bumped into this:
class YFuncStat(YStat):
    ...
    def __eq__(self, other):
        if other is None:
            return False
        return self.full_name == other.full_name
Shouldn't the comparison take tag into account as well? As a knock-on effect, you cannot YFuncStats.append the same function with different tags.
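To make the knock-on effect concrete, here is a toy model with hypothetical classes (not yappi's own). It mirrors only the __eq__ quoted above and pairs it with an append that merges a stat into an existing equal entry. The hash-keyed, merging append is an assumption for illustration; yappi's actual YFuncStats.append may differ in detail.

```python
class ToyFuncStat:
    """Hypothetical per-function stat record (not yappi's YFuncStat)."""

    def __init__(self, full_name, tag, ttot):
        self.full_name = full_name
        self.tag = tag
        self.ttot = ttot

    def __eq__(self, other):
        # Same rule as the quoted __eq__: only the function name counts.
        if other is None:
            return False
        return self.full_name == other.full_name

    def __hash__(self):
        return hash(self.full_name)


class TaggedToyFuncStat(ToyFuncStat):
    """Variant whose identity is (full_name, tag)."""

    def __eq__(self, other):
        if other is None:
            return False
        return (self.full_name, self.tag) == (other.full_name, other.tag)

    def __hash__(self):
        return hash((self.full_name, self.tag))


def append_merging(store, stat):
    """Hypothetical append: merge into an equal existing entry, else add."""
    existing = store.get(stat)
    if existing is not None:
        existing.ttot += stat.ttot
    else:
        store[stat] = stat


plain, tagged = {}, {}
for tag, ttot in [(1, 0.5), (2, 0.25)]:
    append_merging(plain, ToyFuncStat('worker.py:3 aio_worker', tag, ttot))
    append_merging(tagged, TaggedToyFuncStat('worker.py:3 aio_worker', tag, ttot))

print(len(plain), len(tagged))  # 1 2
```

With the name-only comparison, the tag 2 entry is folded into the tag 1 entry; including tag in both __eq__ and __hash__ keeps the per-tag entries apart.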
Not at all :)
I forgot to mention that it is not possible to traverse the stats like [fs for fs in stats if fs.tag == 1]. I had completely forgotten the implementation details of get_func_stats, so my previous comment about this being a bug was wrong. The point is this: the snapshot generated by get_func_stats() is aggregated over the func_name or index fields of the underlying data. So, if you have multiple tags for the same function, they will be aggregated into a single entry and you cannot traverse it that way. This has historic reasons which I don't remember, and it is very hard to do it another way without breaking backward compatibility.
Anyway, the only correct way to traverse per-tag or per-context-id stats is get_func_stats(tag=..., ctx_id=...). I will update the docs for this (if there are any mentions of it).
If you really, really want to enumerate the stats yourself, you can always call _yappi.enum_func_stats(), which provides all the raw data, but I feel get_func_stats() should be enough.
I have a first prototype by now.
It turns out you have to set a context var on the caller side of loop.create_task to correlate caller and callee (like in doit above). This requires you to (i) change the profiled code, (ii) monkey-patch asyncio, (iii) use a custom event loop implementation that overrides create_task, or (iv) customize yappi's profile function.
The current prototype tries to customize the yappi profile function, but that does not seem to be supported currently (it does nothing with profile_threads==True and crashes for multi-threaded programs). Are there plans to support customizing the profile function?
(Without support for setting the profile function, I'd likely go with (iii), the only downside being that you need to run the whole program on a custom event loop even if you are just profiling small pieces.)
First: Wow!
Congrats on this!
I would like to help you with this, but I think this is not something I can include in Yappi for the time being. Let me share my reasoning before moving further:
- asyncio: this depends on asyncio internals, as it requires monkey patching. I remember I mentioned it here: https://github.com/sumerc/yappi/issues/54#issuecomment-614717794 . As a maintainer, I can hardly find time to work on the project, and I really would not want to depend on the internals of other libraries.
- gevent: this means I think we should not be too specific to one library (even if it is stdlib). I am hoping Yappi can profile greenlets as well, without too much effort.
- gather, create_task: OK, but what about call_soon, call_soon_threadsafe, call_later, etc., and all the other functions that can schedule a coroutine?
However, if you really would like to pursue this, I would suggest implementing another library on top of Yappi, or simply forking it. I will try my best to help with your issues.
Let's clarify this first, and then we can talk about the hooks you requested.
Closing this issue as there has been no progress.
Coroutines run via asyncio.gather do not show up in the callgraph of the calling function. asyncio.gather returns a Future gathering the results from the provided coroutines. Timings are correct (up to the caveats in #21), but the callgraph only shows the creation of the gathering future; the caller of the coroutines run via gather is the event loop.
Is there any way to provide hints to yappi to change the caller of the coroutines?
Example:
For me it would be OK if gather did not show up in the callgraph at all and aio_worker looked like a direct callee of doit.