Per-task asyncio utilization statistics

Switching from handling web requests in a synchronous process-per-request1 model to the asyncio Task-per-request model loses us a fair bit, including:

  1. an MMU to keep our memory separate from our neighbors.
  2. separable security contexts (although some of this can be recovered).
  3. per-request statistics collected by the OS and exposed through interfaces like getrusage.

This post attempts to improve on that last point, by bringing some of the statistics gathering done by a standard CPU scheduler into the Python interpreter. This is especially valuable in an event-looped context, where we give up some of the facilities that would usually be provided by an OS scheduler. In particular:

  1. OS schedulers can preempt CPU-bound workers in favor of CPU-starved workers. asyncio is non-preemptive (i.e. cooperative), and so tasks must wait for another task to yield the loop before they can run.
  2. OS schedulers can migrate threads and processes from busy CPUs to free CPUs, meaning that the service is a G/G/k queueing system. An asyncio task is bound to the event loop it started on, and will only ever have access to a single CPU at a time, which means it is a G/G/1 queueing system.

Lacking these escape-hatches, the performance stakes are higher within the Python interpreter. But the statistics facilities are basically nonexistent. Let’s fix that.

One possible approach is to build a sampling profiler. This means that every now and again, we’ll pause the app and collect a stack trace, and potentially correlate that stack trace with the running asyncio task. This is the approach taken by tools like py-spy when we use it to produce an on-CPU flamegraph.

The other option is to track per-task counters, and increment the counter whenever the “scheduler” switches between tasks. This is the approach taken by tools like top. I have implemented that here.

I wrote a class UtilizationCoroutineWrapper that emulates an underlying Python coroutine, which in this case is my ASGI app. It keeps track of all of the entry+exit times for the coroutine in order to manage a per-Task running count of on-CPU time. It’s basically a middleware for my ASGI app, except that it runs in a synchronous context.

Assuming your app doesn’t block on a syscall (which it really shouldn’t: the whole point of the event loop is to never block in a syscall), a coroutine’s running time is equal to its on-CPU time. So I just track running time as a single value, and for the purposes of this demo I include the total time spent in the HTTP response.

It was a bit tricky to make the wrapper function in a way that doesn’t obscure the stack traces from any other debugging tools that we might want to use. In the end I was able to produce a stack trace that I thought was reasonable, and it looks like this:

    Task-12
      File "uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
        result = await app(  # type: ignore[func-returns-value]
      File "uvicorn/middleware/proxy_headers.py", line 60, in __call__
        return await self.app(scope, receive, send)
--->  File "loop_utilization.py", line 85, in send
--->    ret = self.cr_await.send(v)
      File "fastapi/applications.py", line 1054, in __call__
        await super().__call__(scope, receive, send)
#!/usr/bin/env python3.12
# Copyright 2024 Josh Snyder
# Licensed under https://www.apache.org/licenses/LICENSE-2.0.txt
import asyncio
import asyncio.futures
import functools
import inspect
import os
import time
from collections import abc
from contextvars import ContextVar
from dataclasses import asdict, dataclass
from threading import Thread
from traceback import StackSummary

import uvicorn # 0.34.0
from fastapi import FastAPI # 0.115.6

# time.sleep is a suitable stand-in for monopolizing the event loop with CPU
# usage
use_cpu_for = time.sleep


def decorate_traceback(cls):
    """Adds a class-level field with a string representation of a single frame's
    traceback, representing a call to the underlying coroutine's send method.
    """

    def stacktrace_getter():
        frame = inspect.currentframe().f_back  # up one frame
        summary = StackSummary.extract(((frame, frame.f_lineno),))
        # Important that this is a `yield` so that this function becomes a
        # generator, and not just a regular function.
        yield summary

    cls.frame_str = next(cls(stacktrace_getter()))
    return cls


time_tracker = ContextVar("time_tracker")


@dataclass
class TimeTracker:
    _stamp: float = 0
    _running: bool = False
    rounds: int = 0
    elapsed: float = 0

    def advance(self, running, now):
        if running:
            self.rounds += 1
        if self._running:
            self.elapsed += now - self._stamp
        self._running = running
        self._stamp = now

    def as_dict(self, now):
        if self._running:
            self.elapsed += now - self._stamp
            self._stamp = now

        ret = asdict(self)
        ret.pop("_stamp")
        ret.pop("_running")
        return ret


@decorate_traceback
class UtilizationCoroutineWrapper(abc.Coroutine):
    def __init__(self, coro, time=time.monotonic):
        self._tracker = TimeTracker()
        # Set a ContextVar, so that the tracker is available to anyone within
        # the same Context.
        time_tracker.set(self._tracker)

        # cr_await makes us look like a coroutine, so that debuggers/profilers
        # can walk the stack into our underlying coroutine.
        self.cr_await = coro

        # It might be more traditional to use time.perf_counter() here, but
        # typical asyncio event loops use time.monotonic().
        self._time = time

    def send(self, v):
        tracker = self._tracker

        tracker.advance(True, self._time())
        try:
            ret = self.cr_await.send(v)
        finally:
            tracker.advance(False, self._time())

        return ret

    def throw(self, *exc):
        return self.cr_await.throw(*exc)

    def close(self):
        return self.cr_await.close()

    def __await__(self):
        return self

    def __next__(self):
        return self.send(None)


app = FastAPI()


def _execution_info():
    # Get the collected statistics and return them
    return time_tracker.get().as_dict(time.monotonic())


@app.get("/block")
async def blocker(amount: float = 0.0, nice: bool = True):
    """Block the main thread, so it looks like we're doing something CPU-bound.
    """
    step = amount
    if nice:
        # If not nice, we'll literally just block the event loop for `amount`
        step = 0.01

    while amount > 0:
        if nice:
            await asyncio.sleep(0)
        use_cpu_for(step)
        amount -= step

    return _execution_info()


@app.get("/sleep")
async def sleeper(amount: float = 0.0):
    """Sleep in asyncio."""
    await asyncio.sleep(amount)
    return _execution_info()


@app.get("/io")
async def sleep_on_timer(amount: float = 0.0):
    """Sleep, but in another thread so it looks like file I/O."""
    r, w = os.pipe()

    def wakeup():
        # While anyone with a brain can tell that this will sleep for `amount`
        # seconds, it will be scheduled by the OS scheduler, so our event loop
        # will see the main thread as just waiting for file I/O
        time.sleep(amount)
        os.write(w, b"\x00")
        os.close(w)

    Thread(target=wakeup).start()

    loop = asyncio.get_running_loop()
    future = loop.create_future()

    loop.add_reader(r, future.set_result, None)
    await future
    loop.remove_reader(r)
    os.close(r)
    return _execution_info()


def _iscoroutinefunction(fn):
    """Determine if `fn` is a coroutine function, thoroughly."""
    if asyncio.iscoroutinefunction(fn):
        return True

    try:
        fn = fn.__call__
    except AttributeError:
        return False

    return asyncio.iscoroutinefunction(fn)


def compose(fn1):
    """Make a function that composes `fn1` onto an underlying `fn2`."""

    def wrapper(fn2):
        @functools.wraps(fn2)
        def wrapped(*args, **kwargs):
            return fn1(fn2(*args, **kwargs))

        # uvicorn wants to see a coroutine function, so we oblige.
        if _iscoroutinefunction(fn2):
            wrapped = inspect.markcoroutinefunction(wrapped)
        return wrapped

    return wrapper


async def main():
    app_ = compose(UtilizationCoroutineWrapper)(app)
    config = uvicorn.Config(app_, port=8000)
    server = uvicorn.Server(config)
    await server.serve()


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))

  1. In synchronous Python web services, it is typical to reuse a process for multiple requests over its lifetime, but each process would handle one request at a time.