What is my asyncio app doing?

It turns out that asyncio tasks don’t really have stack traces‽ The documentation for Task.get_stack says:

Only one stack frame is returned for a suspended coroutine.

This is kinda annoying when you want to know what your asyncio event loop is doing (or not doing, as the case may be). Moreover, the one frame that does get returned is the outermost one: func_c in the example below, rather than the inner frames that show where the task is actually waiting.

import asyncio

async def func_a():
    await asyncio.sleep(1)

async def func_b():
    await func_a()

async def func_c():
    await func_b()

asyncio.run(func_c())
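To see the one-frame limitation concretely, here is a small sketch that runs the example as a background task and asks for its stack while it is parked inside asyncio.sleep. The main() wrapper and the 0.1-second delay are illustration choices, not part of the original example.

```python
import asyncio


async def func_a():
    await asyncio.sleep(1)


async def func_b():
    await func_a()


async def func_c():
    await func_b()


async def main():
    task = asyncio.create_task(func_c())
    await asyncio.sleep(0.1)  # let the task start and suspend inside asyncio.sleep

    # For a suspended coroutine, get_stack() returns only the task's own
    # (outermost) frame; func_b, func_a and sleep are invisible here.
    names = [frame.f_code.co_name for frame in task.get_stack()]
    print(names)  # -> ['func_c']

    # Tidy up the background task before the loop shuts down.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return names


names = asyncio.run(main())
```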

In theory, the newly resumed code in func_c can decide to do whatever it wants, like dancing the Macarena or some such. In practice, the newly resumed coroutine is just going to pass control to the unfinished coroutine it was most recently awaiting, which is func_b in this case (and func_b passes it on to func_a, and so on). The Python maintainers are aware of this fact.
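Because each suspended coroutine records what it is awaiting in its cr_await attribute, following that attribute from the outermost coroutine inward recovers the chain that get_stack() hides. This sketch uses only public coroutine attributes (cr_await, cr_code); the helper name await_chain is hypothetical. It stops as soon as cr_await is no longer an async def coroutine, for example the Future that asyncio.sleep is parked on.

```python
import asyncio
import types


async def func_a():
    await asyncio.sleep(1)


async def func_b():
    await func_a()


async def func_c():
    await func_b()


def await_chain(coro) -> list[str]:
    """Hypothetical helper: follow cr_await inward, collecting coroutine names."""
    names = []
    while isinstance(coro, types.CoroutineType):
        names.append(coro.cr_code.co_name)
        # None if the coroutine isn't suspended; a Future at the bottom of the chain.
        coro = coro.cr_await
    return names


async def main():
    task = asyncio.create_task(func_c())
    await asyncio.sleep(0.1)  # let the task suspend inside asyncio.sleep
    chain = await_chain(task.get_coro())
    print(chain)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return chain


chain = asyncio.run(main())
```

The walk ends at a Future rather than at a frame, which is exactly why the rest of this post turns to the event loop: the Future says nothing about what will eventually resolve it.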

But it turns out that if you want to know what your event loop is up to, the coroutines themselves are pretty useless. The thing you really want to know is: which coroutine is my event loop going to resume in response to any given event? And that requires interrogating the event loop itself.

In the code below, I do that bottom-up, starting with the events that can trigger a coroutine to run:

  1. a task can be runnable right now, meaning that the event loop is planning to switch to it within the current iteration, or the next one.
  2. a task can become runnable at a known time in the future (e.g. when an asyncio.sleep expires), tracked by the event loop.
  3. a task can become runnable in response to an outside stimulus on a file descriptor (e.g. a socket becoming readable or writable).

Then (and this is the tricky part) figure out which chain¹ of callbacks the event loop is planning to call as a result of any given event. In the happy case you can just see “X file descriptor is being watched by Y asyncio Task”. But it’s also typical for the event loop itself to execute some Python code (e.g. read from the socket and decode some TLS records) before dispatching the resulting data somewhere else. Asking where Turing-complete code is planning to jump is literally undecidable, so we’re stuck doing our best with heuristics.

The good news is that the heuristics mostly work! I have hooked this up to make a basic debugger/profiler for a FastAPI webapp. I can hit the /tasks endpoint and see what all of the other requests are doing.

#!/usr/bin/env python3.12
# Copyright 2024 Josh Snyder
# Licensed under https://www.apache.org/licenses/LICENSE-2.0.txt
import _asyncio
import asyncio
import asyncio.futures
import builtins
import inspect
import os
import time
from io import StringIO
from itertools import chain
from threading import Thread
from typing import Callable
from traceback import FrameSummary, StackSummary

import uvicorn # 0.34.0
from fastapi import FastAPI # 0.115.6
from fastapi.responses import PlainTextResponse


def _get_frame(coro) -> FrameSummary | None:
    try:
        return coro.frame_summary
    except AttributeError:
        pass

    try:
        frame = coro.cr_frame
    except AttributeError:
        return None

    if frame is None:
        # A finished coroutine no longer has a frame to report.
        return None

    return StackSummary.extract(
        [
            (frame, frame.f_lineno),
        ]
    )[0]


# Other discussions/implementations at:
#   https://github.com/python/cpython/issues/91048
#   https://stackoverflow.com/questions/41748443
def _get_frames(coro) -> StackSummary:
    """Produces a "stack trace" of a running coroutine based on what it is
    awaiting.
    """
    frames = []
    while True:
        frame = _get_frame(coro)
        if frame is None:
            break

        frames.append(frame)
        try:
            coro = coro.cr_await
        except AttributeError:
            break

    return StackSummary.from_list(frames)


def _process_selector_handle(loop, handle):
    """Process a handle object from within the Selector, attempting to
    determine which Future (or other callback) it targets."""
    callback = handle._callback
    if inspect.isbuiltin(callback):
        target = callback.__self__
        if hasattr(target, "set_result"):
            # It targets a builtin Future.
            return target

        return None

    if inspect.ismethod(callback):
        target = callback.__self__
        # It targets a future of some other kind
        if hasattr(target, "set_result"):
            return target

        name = callback.__name__
        if target is not loop:
            return callback

        if name.startswith("_sock_") and isinstance(
            handle._args[0], _asyncio.Future
        ):
            # This is a socket-handling method with an associated Future.
            # Just return the Future.
            return handle._args[0]

        return callback

    return callback


def _process_single_selector(loop, key):
    # Each selector key holds two callbacks: one for reading and another for
    # writing. Process them in turn.
    reader, writer = key.data
    if reader is not None:
        future = _process_selector_handle(loop, reader)
        if future is not None:
            yield future, "readable"

    if writer is not None:
        future = _process_selector_handle(loop, writer)
        if future is not None:
            yield future, "writable"


def _get_selector_callbacks(loop) -> dict[Callable, tuple[int, str]]:
    """Use the selector object's internal data structures to figure out what
    file descriptors it is watching, and what it will do when they become ready.
    """
    ret = dict()
    selector_mapping = loop._selector.get_map()
    for fd, key in selector_mapping.items():
        for future, action in _process_single_selector(loop, key):
            ret[future] = (fd, action)

    return ret


def get_wakeable_tasks_from_future(future):
    """Look at a future object and try to figure out what will wake up when
    it completes. Currently just implements the simple case of a Task or Tasks
    being woken upon the completion of `future`.
    """
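    try:
        if future._callbacks is None:
            return
    except AttributeError:
        # Not a Future (e.g. a transport's bound method). Since this is a
        # generator, a bare `return` just ends iteration; such callbacks are
        # reported separately as "Non-task file callbacks".
        return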

    for callback, ctx in future._callbacks:
        if not inspect.isbuiltin(callback):
            continue

        if callback.__name__ != "task_wakeup":
            continue

        if not isinstance(callback.__self__, _asyncio.Task):
            continue

        yield callback.__self__


def get_activity_digest():
    loop = asyncio.get_running_loop()

    output = StringIO()

    def print(*args, **kwargs):
        kwargs["file"] = output
        return builtins.print(*args, **kwargs)

    tasks = set()  # All "tasks" that we've seen on the loop
    ready = set()  # The set of ready "tasks"

    # "Tasks" in quotation marks because they might be one-shot callbacks,
    # rather than asyncio.Task objects

    # Source 1: ready callbacks
    # loop._ready contains the list of "run this now" callbacks
    for handle in loop._ready:
        callback = handle._callback
        try:
            target = callback.__self__
        except AttributeError:
            continue

        if target is loop:
            # If the loop is targeted, it's probably a non-Task callback.
            # Try to resolve its first argument into an actual task.
            if handle._args:
                for task in get_wakeable_tasks_from_future(handle._args[0]):
                    ready.add(task)
        else:
            ready.add(target)

    tasks.update(ready)

    # Source 2: file-backed tasks
    callbacks = _get_selector_callbacks(loop)

    # Source 3: this is the event loop's heap of scheduled callbacks
    schedule = dict()
    for handle in loop._scheduled:
        if handle._callback is not asyncio.futures._set_result_unless_cancelled:
            # We currently only handle the popular case of asyncio.sleep
            continue

        future, result = handle._args
        schedule[future] = (result, handle._when)

    # Take a look at the list of callbacks we found and attempt to resolve
    # them into tasks.
    # The ready list already had its tasks resolved (slightly differently)
    for future in chain(callbacks, schedule):
        for task in get_wakeable_tasks_from_future(future):
            tasks.add(task)

    now = loop.time()
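    task_list = sorted(tasks, key=id)
    for task in task_list:
        if not isinstance(task, asyncio.Task):
            # A ready callback bound to some other object (e.g. a transport);
            # it has no name, coroutine or _fut_waiter, so just show it.
            print(f"{task!r}\n")
            continue

        print(task.get_name())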
        stack = _get_frames(task.get_coro())
        future = task._fut_waiter
        print("".join(stack.format()))

        is_ready = task in ready
        if is_ready:
            print("  Is runnable")

        if is_ready != (future is None or future.done()):
            # This is supposedly an invariant within asyncio.
            print("  ERROR: Mismatch between ready state and future")

        try:
            fd, desired_state = callbacks.pop(future)
        except KeyError:
            pass
        else:
            print(f"  Waiting on FD {fd} to become {desired_state}")

        try:
            result, when = schedule.pop(future)
        except KeyError:
            pass
        else:
            delay = when - now
            print(
                f"  Completes with result {result!r} in {delay:0.06f} seconds"
            )

        print()

    if callbacks:
        print("Non-task file callbacks")
        for callback, (fd, desired_state) in callbacks.items():
            protocol = ""
            try:
                protocol = callback.__self__.get_protocol().__class__.__name__
            except AttributeError:
                pass

            if protocol:
                protocol = f" protocol:{protocol}"
            print(
                f"  Waiting on FD {fd} to become {desired_state}: "
                f"{callback!r}{protocol}"
            )

    return output.getvalue()


app = FastAPI()


@app.get("/tasks")
async def get_tasks():
    return PlainTextResponse(get_activity_digest())


# time.sleep is a suitable stand-in for monopolizing the event loop with CPU
# usage
use_cpu_for = time.sleep


@app.get("/block")
async def blocker(amount: float = 0.0, nice: bool = True):
    """Block the main thread, so it looks like we're doing something CPU-bound.
    """
    # If not nice, we'll literally just block the event loop for `amount`
    step = amount
    if nice:
        # Otherwise, yield to the event loop between 10 ms slices of "CPU" work
        step = 0.01

    while amount > 0:
        if nice:
            await asyncio.sleep(0)
        use_cpu_for(step)
        amount -= step


@app.get("/sleep")
async def sleeper(amount: float = 0.0):
    """Sleep in asyncio."""
    await asyncio.sleep(amount)


@app.get("/io")
async def sleep_on_timer(amount: float = 0.0):
    """Sleep, but in another thread so it looks like file I/O."""
    r, w = os.pipe()

    def wakeup():
        # While anyone with a brain can tell that this will sleep for `amount`
        # seconds, it will be scheduled by the OS scheduler, so our event loop
        # will see the main thread as just waiting for file I/O
        time.sleep(amount)
        os.write(w, b"\x00")
        os.close(w)

    Thread(target=wakeup).start()

    loop = asyncio.get_running_loop()
    future = loop.create_future()

    loop.add_reader(r, future.set_result, None)
    await future
    loop.remove_reader(r)
    os.close(r)


async def main():
    config = uvicorn.Config(app)
    server = uvicorn.Server(config)
    await server.serve()


if __name__ == "__main__":
    asyncio.run(main())

  1. As this comment recognizes, it’s not actually a chain: it’s a directed graph. The kind with cycles.