What is my asyncio app doing?
It turns out that asyncio tasks don’t really have stack traces‽ The documentation for `Task.get_stack` says:

> Only one stack frame is returned for a suspended coroutine.
This is kinda annoying when you want to know what your asyncio event loop is doing (or not doing, as the case may be). Moreover, the stack frame that does get returned is the outermost one — `func_c` in the example below.
# Task.get_stack() for the task running func_c() returns only func_c's frame,
# even though execution is actually suspended in asyncio.sleep(), reached
# via func_b() and func_a().
async def func_a():
    await asyncio.sleep(1)
async def func_b():
    await func_a()
async def func_c():
    await func_b()
asyncio.run(func_c())
When the sleep finishes, the event loop resumes the task at its outermost coroutine. In theory, the newly-resumed code in `func_c` can decide to do whatever it wants, like dancing the Macarena or somesuch. In practice, the newly-resumed coroutine is just going to pass control to the unfinished coroutine it was most recently `await`ing, which is `func_b` in this case. The Python maintainers are aware of this fact.
But it turns out that if you want to know what your event loop is up to, the coroutines themselves are pretty useless. The thing you really want to know is: which coroutine is my event loop going to resume in response to any given event? And that requires interrogating the event loop itself.
In the below code snippet, I do that in a bottom-up fashion. Start with the events that can trigger a coroutine to run:
- a task can be runnable right now, meaning that the event loop is planning to switch to it within the current iteration, or the next one.
- a task can be runnable at a time in the future, and tracked by the event loop.
- a task can become runnable by an outside stimulus from a file descriptor.
Then (and this is the tricky part), figure out which chain¹ of callbacks the event loop is planning to call as a result of any given event. In the happy case you can just see “X file descriptor is being watched by Y asyncio Task”. But it’s also typical for the event loop itself to execute some Python code (e.g. read from the socket and decode some TLS records) before dispatching the resulting data to somewhere else. Asking where Turing-complete code is planning to jump is literally undecidable, so we’re stuck doing our best with heuristics.
The good news is that the heuristics mostly work! I have hooked this up to make a basic debugger/profiler for a FastAPI webapp. I can hit the `/tasks` endpoint and see what all of the other requests are doing.
#!/usr/bin/env python3.12
# Copyright 2024 Josh Snyder
# Licensed under https://www.apache.org/licenses/LICENSE-2.0.txt
import _asyncio
import asyncio
import asyncio.futures
import builtins
import inspect
import os
import time
from io import StringIO
from itertools import chain
from threading import Thread
from typing import Callable
from traceback import FrameSummary, StackSummary
import uvicorn # 0.34.0
from fastapi import FastAPI # 0.115.6
from fastapi.responses import PlainTextResponse
def _get_frame(coro) -> FrameSummary | None:
try:
return coro.frame_summary
except AttributeError:
pass
try:
frame = coro.cr_frame
except AttributeError:
return None
return StackSummary.extract(
[
(frame, frame.f_lineno),
]
)[0]
# Other discussions/implementations at:
# https://github.com/python/cpython/issues/91048
# https://stackoverflow.com/questions/41748443
def _get_frames(coro) -> StackSummary:
    """Produces a "stack trace" of a running coroutine based on what it is
    awaiting.

    Starting at `coro`, record the frame it is suspended in, then step to the
    awaitable it is suspended on: ``cr_await`` for native coroutines, or
    ``gi_yieldfrom`` for generator-based ones. Frames are listed outermost
    first. The walk stops at the first object for which `_get_frame` returns
    None, which includes the None that ends an await chain.
    """
    frames = []
    while (frame := _get_frame(coro)) is not None:
        frames.append(frame)
        next_coro = getattr(coro, "cr_await", None)
        if next_coro is None:
            next_coro = getattr(coro, "gi_yieldfrom", None)
        coro = next_coro
    return StackSummary.from_list(frames)
def _process_selector_handle(loop, handle):
"""Process a handle object from within the Selector, attempting to
determine, which Future (or other callback) it targets."""
callback = handle._callback
if inspect.isbuiltin(callback):
target = callback.__self__
if hasattr(target, "set_result"):
# It targets a builtin Future.
return target
return None
if inspect.ismethod(callback):
target = callback.__self__
# It targets a future of some other kind
if hasattr(target, "set_result"):
return target
name = callback.__name__
if target is not loop:
return callback
if name.startswith("_sock_") and isinstance(
handle._args[0], _asyncio.Future
):
# This is a socket-handling method with an associated Future.
# Just return the Future.
return handle._args[0]
return callback
return callback
def _process_single_selector(loop, key):
# A selector contains two callbacks: one for reading and another for
# writing. Process them in-turn.
_, (reader, writer) = key.events, key.data
if reader is not None:
future = _process_selector_handle(loop, reader)
if future is not None:
yield future, "readable"
if writer is not None:
future = _process_selector_handle(loop, writer)
if future is not None:
yield future, "writable"
def _get_selector_callbacks(loop) -> dict[Callable, tuple[int, str]]:
"""Use the selector object's internal data structures to figure out what
file descriptors it is watching, and what it will do when they become ready.
"""
ret = dict()
selector_mapping = loop._selector.get_map()
for fd, key in selector_mapping.items():
for future, action in _process_single_selector(loop, key):
ret[future] = (fd, action)
return ret
def get_wakeable_tasks_from_future(future):
    """Look at a future object and try to figure out what will wake up when
    it completes.

    Yields every ``_asyncio.Task`` whose builtin ``task_wakeup`` callback is
    registered on `future`. This is the simple case of a Task (or several)
    being woken upon the completion of `future`; any other kind of callback
    is ignored.

    Nothing is yielded when `future` has no ``_callbacks`` attribute (it is
    not a Future) or when its callback list is None/empty.
    """
    try:
        callbacks = future._callbacks
    except AttributeError:
        # Not a Future, so nothing can be woken through it. (A value returned
        # from a generator is never seen by iterating callers, so this is a
        # bare return.)
        return
    if not callbacks:
        return
    for callback, _context in callbacks:
        if not inspect.isbuiltin(callback) or callback.__name__ != "task_wakeup":
            continue
        owner = callback.__self__
        if isinstance(owner, _asyncio.Task):
            yield owner
def _collect_ready_tasks(loop) -> set:
    """Return the Tasks that callbacks already queued in ``loop._ready`` will
    run or wake up (this iteration of the loop or the next)."""
    ready = set()
    for handle in loop._ready:
        # Callbacks without a __self__ (plain functions, for instance) cannot
        # be tied to a Task.
        target = getattr(handle._callback, "__self__", None)
        if target is loop:
            # If the loop is targeted, it's probably a non-Task callback.
            # Try to resolve its first argument (if any) into actual tasks.
            args = handle._args or ()
            if args:
                ready.update(get_wakeable_tasks_from_future(args[0]))
        elif isinstance(target, asyncio.Task):
            ready.add(target)
        # Any other target is not a Task and is skipped: the report calls
        # Task-only methods (get_name, get_coro, _fut_waiter) on every entry.
    return ready


def _collect_sleep_timers(loop) -> dict:
    """Map each Future that a pending ``asyncio.sleep`` timer will resolve to
    ``(result, when)``, where ``when`` is in ``loop.time()`` units."""
    schedule = {}
    for handle in loop._scheduled:
        if handle._callback is not asyncio.futures._set_result_unless_cancelled:
            # We currently only handle the popular case of asyncio.sleep
            continue
        future, result = handle._args
        schedule[future] = (result, handle._when)
    return schedule


def get_activity_digest():
    """Return a plain-text report of what the running event loop will do next.

    Tasks are discovered from three sources:

    1. callbacks already queued to run (``loop._ready``);
    2. futures/callbacks waiting on file descriptors in the selector;
    3. futures that pending ``asyncio.sleep`` timers will resolve.

    Each task is printed with its await chain and the event (if any) it is
    waiting on. Selector callbacks that could not be tied to a task are
    listed at the end.

    Must be called from code running on the loop being inspected, because it
    uses ``asyncio.get_running_loop()``.
    """
    loop = asyncio.get_running_loop()
    output = StringIO()

    def print(*args, **kwargs):
        # Shadow the builtin so every report line is written to `output`.
        kwargs["file"] = output
        return builtins.print(*args, **kwargs)

    # Source 1: ready callbacks ("run this now").
    ready = _collect_ready_tasks(loop)
    # Source 2: file-backed futures/callbacks.
    callbacks = _get_selector_callbacks(loop)
    # Source 3: the event loop's heap of scheduled callbacks.
    schedule = _collect_sleep_timers(loop)

    # Resolve file and timer futures into the tasks they will wake. The ready
    # set was already resolved (slightly differently) above.
    tasks = set(ready)
    for future in chain(callbacks, schedule):
        tasks.update(get_wakeable_tasks_from_future(future))

    now = loop.time()
    for task in sorted(tasks, key=id):
        print(task.get_name())
        stack = _get_frames(task.get_coro())
        future = task._fut_waiter
        print("".join(stack.format()))
        is_ready = task in ready
        if is_ready:
            print(" Is runnable")
        if is_ready != (future is None or future.done()):
            # This is supposedly an invariant within asyncio.
            print(" ERROR: Mismatch between ready state and future")
        # pop() so that whatever remains in `callbacks` afterwards is known
        # not to belong to any reported task.
        fd_wait = callbacks.pop(future, None)
        if fd_wait is not None:
            fd, desired_state = fd_wait
            print(f" Waiting on FD {fd} to become {desired_state}")
        timer = schedule.pop(future, None)
        if timer is not None:
            result, when = timer
            delay = when - now
            print(
                f" Completes with result {result!r} in {delay:0.06f} seconds"
            )
        print()

    if callbacks:
        print("Non-task file callbacks")
        for callback, (fd, desired_state) in callbacks.items():
            protocol = ""
            try:
                protocol = callback.__self__.get_protocol().__class__.__name__
            except AttributeError:
                pass
            if protocol:
                protocol = f" protocol:{protocol}"
            print(
                f" Waiting on FD {fd} to become {desired_state}: "
                f"{callback!r}{protocol}"
            )
    return output.getvalue()
app = FastAPI()


@app.get("/tasks")
async def get_tasks():
    """Serve the event-loop activity digest as a plain-text response."""
    digest = get_activity_digest()
    return PlainTextResponse(content=digest)
# time.sleep is a suitable stand-in for monopolizing the event loop with CPU
# usage
use_cpu_for = time.sleep


@app.get("/block")
async def blocker(amount: float = 0.0, nice: bool = True):
    """Block the main thread, so it looks like we're doing something CPU-bound.

    With ``nice`` (the default), the "work" is done in slices of at most
    0.01 seconds, and control goes back to the event loop before each slice
    so other tasks can still run. With ``nice=False``, the event loop is
    blocked for the whole ``amount`` seconds in one go.
    """
    # If not nice, we'll literally just block the event loop for `amount`.
    step = 0.01 if nice else amount
    while amount > 0:
        if nice:
            await asyncio.sleep(0)
        # Never overshoot: the final slice only covers what is left.
        chunk = min(step, amount)
        use_cpu_for(chunk)
        amount -= chunk
@app.get("/sleep")
async def sleeper(amount: float = 0.0):
    """Park this request on an asyncio timer for `amount` seconds."""
    return await asyncio.sleep(delay=amount)
@app.get("/io")
async def sleep_on_timer(amount: float = 0.0):
    """Sleep, but in another thread so it looks like file I/O.

    A helper thread sleeps for `amount` seconds and then writes one byte to a
    pipe. Meanwhile this coroutine waits, via ``loop.add_reader``, for the
    read end to become readable. The reader is unregistered and the read end
    closed even if the request is cancelled while waiting, and the thread
    always closes the write end.
    """
    r, w = os.pipe()

    def wakeup():
        # While anyone with a brain can tell that this will sleep for `amount`
        # seconds, it will be scheduled by the OS scheduler, so our event loop
        # will see the main thread as just waiting for file I/O
        try:
            time.sleep(amount)
            os.write(w, b"\x00")
        except BrokenPipeError:
            # The waiting request was cancelled and already closed `r`.
            pass
        finally:
            # Once started, this thread owns the write end.
            os.close(w)

    Thread(target=wakeup).start()
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.add_reader(r, future.set_result, None)
    try:
        await future
    finally:
        # Without this, a cancelled request would leave the reader
        # registered on the loop and leak `r`.
        loop.remove_reader(r)
        os.close(r)
async def main():
    """Construct a uvicorn server for `app` and await its serve() coroutine."""
    await uvicorn.Server(uvicorn.Config(app)).serve()
if __name__ == "__main__":
    # Run main() on a fresh event loop created by asyncio.run().
    asyncio.run(main())