A good transaction is a stateless transaction
We know that a good database is a stateless database, but what does that tell us about transactions? Probably that they should be stateless as well. Which is a problem, because RDBMS transactions are very stateful.
When I open a transaction against a MySQL or Postgres DB, I’m forcing the server to maintain state on my behalf. You might be thinking “managing state is a database’s whole job! Why is this so terrible?”. It’s because transaction state represents an interdependency between a single client and a single server that only ends when the transaction finishes1. State as “it’s the server’s problem now” is good, state as “it’s both of our problems” is very bad.
As with every other database system, the client’s work isn’t finished until it has sent its COMMIT message to the server and received an acknowledgment back. Until then, the server can always terminate the transaction2 (e.g. due to deadlock), refuse to commit it, or fail and go away entirely.
Illustrating this point, the Postgres docs have this to say about both the repeatable read and serializable isolation levels:
Applications using this level must be prepared to retry transactions due to serialization failures.
Most client applications completely ignore this reality. Non-trivial client apps that correctly handle retries and idempotency against their RDBMSes are extremely rare; I can count the number of such codebases I have interacted with on zero hands. If we truly cared about correctness, we’d do all DB interactions as calls to “pure” stored procedures, where each call executes a single transaction and returns an atomic result. That way, transactional logic would always be trivially retryable.
But we’re already working in two runtime environments (our app, and a SQL dialect). Managing versioning, deployment, monitoring, optimization, and error handling in yet another runtime environment (e.g. PL/pgSQL or plv8) sounds like something to be avoided if at all possible. If only there were a way to force our application to interact with the DB in the same way that a stored procedure would…
Some non-SQL DBs approximate this. DynamoDB’s TransactWriteItems API doesn’t allow the client to open a stateful transaction against the DB at all. Instead the client submits a one-shot API call with (a) a list of conditions that must be satisfied, like user_1.balance == 250 && user_2.balance == 80, and (b) a set of writes, like user_1.balance = 150; user_2.balance = 180. The DB itself doesn’t need a stored procedure language in this scenario. Instead the client performs optimistic reads in advance of the mutation: in this example it retrieves the two users’ balances. It then computes the new desired balances and submits the transaction to DynamoDB as a single TransactWriteItems API call. A coordinator node within DynamoDB is responsible for all of the stateful 2PC aspects, and the storage nodes even know how to kick a coordinator that is operating too slowly:
Storage nodes also invoke recovery when local items have stalled transactions…If the accepted transaction has a timestamp that is older than some threshold, the storage node sends a message with the key for the item and the pending transaction id. The recovery manager receiving this message checks the ledger for the state of the transaction and, if the transaction has not been completed, resumes its execution.
Granted, this approach makes the client developer’s life harder, since they must now follow a procedure of (sketched in code below):

1. Assign a transaction ID as an idempotency key3
2. Perform a non-transactional GetItem to check a ledger for the transaction ID. If it is already present, return success.
3. Perform non-transactional GetItem calls on the account balances
4. Do a local computation of updated account balances
5. Formulate all of the items you read in steps 2-3 as ConditionCheck clauses
6. Send a TransactWriteItems API call to DynamoDB
7. On timeout or failure, go to step 2
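Here’s roughly what that procedure looks like in code. This is a minimal boto3 sketch, not a drop-in implementation: the ledger and balances table names, the key shapes, and the hardcoded accounts and amounts are all illustrative assumptions.

import uuid

import boto3  # assumes AWS credentials and the two tables already exist

dynamodb = boto3.client("dynamodb")

def transfer(amount=100, tx_id=None):
    # Step 1: assign a transaction ID as an idempotency key
    tx_id = tx_id or str(uuid.uuid4())
    # Step 2: non-transactional read of the ledger
    if "Item" in dynamodb.get_item(
        TableName="ledger", Key={"tx_id": {"S": tx_id}}
    ):
        return tx_id  # already applied; report success
    # Step 3: non-transactional reads of the balances
    src = dynamodb.get_item(
        TableName="balances", Key={"account_id": {"N": "1"}}
    )["Item"]
    dst = dynamodb.get_item(
        TableName="balances", Key={"account_id": {"N": "2"}}
    )["Item"]
    # Step 4: local computation of the updated balances
    new_src = str(int(src["balance"]["N"]) - amount)
    new_dst = str(int(dst["balance"]["N"]) + amount)
    # Steps 5-6: everything we read becomes a condition, and all writes
    # ship in a single one-shot call. No state outlives the request.
    dynamodb.transact_write_items(
        TransactItems=[
            {"Put": {
                "TableName": "ledger",
                "Item": {"tx_id": {"S": tx_id}},
                "ConditionExpression": "attribute_not_exists(tx_id)",
            }},
            {"Update": {
                "TableName": "balances",
                "Key": {"account_id": {"N": "1"}},
                "ConditionExpression": "balance = :old",
                "UpdateExpression": "SET balance = :new",
                "ExpressionAttributeValues": {
                    ":old": src["balance"], ":new": {"N": new_src},
                },
            }},
            {"Update": {
                "TableName": "balances",
                "Key": {"account_id": {"N": "2"}},
                "ConditionExpression": "balance = :old",
                "UpdateExpression": "SET balance = :new",
                "ExpressionAttributeValues": {
                    ":old": dst["balance"], ":new": {"N": new_dst},
                },
            }},
        ]
    )
    # Step 7: on TransactionCanceledException or timeout, the caller
    # re-enters at step 2 with the same tx_id.
    return tx_id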
We can emulate the TransactWriteItems API statelessly in a single PL/pgSQL DO block. Postgres will even automatically roll back the effects of prior statements if subsequent statements fail: the DO block gets its own transaction.
DO $$
DECLARE
    rc BIGINT := 0;
BEGIN
    INSERT INTO transactions (id) VALUES ('abcdefg') ON CONFLICT DO NOTHING;
    GET DIAGNOSTICS rc = ROW_COUNT;
    IF rc <> 1 THEN
        RAISE EXCEPTION 'Condition 1 failed';
    END IF;

    UPDATE balances
    SET balance = 150
    WHERE id = 1
    AND balance = 250;
    GET DIAGNOSTICS rc = ROW_COUNT;
    IF rc <> 1 THEN
        RAISE EXCEPTION 'Condition 2 failed';
    END IF;

    UPDATE balances
    SET balance = 180
    WHERE id = 2
    AND balance = 80;
    GET DIAGNOSTICS rc = ROW_COUNT;
    IF rc <> 1 THEN
        RAISE EXCEPTION 'Condition 3 failed';
    END IF;
END
$$;
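Invoking this from a client is then a single round trip, and retrying is trivial because no state survives between attempts. A minimal sketch, assuming psycopg 3 and a hypothetical bank database; autocommit mode ensures the DO statement runs as its own implicit transaction:

import psycopg

# The DO block above, as a string (generate a fresh idempotency key per
# logical transfer, not per retry attempt).
TRANSFER_SQL = "DO $$ ... $$;"

def run_transfer(dsn="dbname=bank"):
    # autocommit means each statement is its own implicit transaction, so
    # the DO block commits or rolls back atomically in one round trip.
    with psycopg.connect(dsn, autocommit=True) as conn:
        while True:
            try:
                conn.execute(TRANSFER_SQL)
                return
            except psycopg.errors.SerializationFailure:
                continue  # stateless: just resend the whole thing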
Of course this means rewriting our application to no longer use an ORM, which sounds like a drag, especially if we have an existing app. What we’d really like is a way to continue to use our familiar ORM tools, and continue to have the database be responsible for doing transactional conflict-checking. For reference, here’s the in-ORM logic:
1. Assign a transaction ID as an idempotency key
2. Open a transaction
3. Perform a SELECT against a ledger to check whether the transaction ID has already been added to the DB. If it is already present, return success.
4. Perform SELECTs to gather account balances
5. Do a local computation of updated account balances
6. Send an UPDATE
7. Send a COMMIT
8. On timeout or failure, go to step 2
The big problem with the ORM abstraction isn’t that it executes within the client: the problem with the ORM is that it provides so few facilities for correct retries and idempotency. We can solve this by “fixating” our application on the database for the duration of the transaction. While the transaction is open, the application is only allowed to: (a) send commands to the DB and (b) do on-CPU work.
Fixating the app produces exactly the desired benefit: it prevents the application from causing side-effects while executing its transaction against its RDBMS. There’s no chance4 of doing something like making an RPC call to another internal service, calling a third party API, or otherwise acting on the result of the transaction before it has been committed. In other words, the DB logic will always be retryable, and will always execute without delay from external factors5.
The rigorous way to do this would be to watch the asyncio Task that interacts with the database, and throw an error if it ever returns into the event loop without first registering a callback on the DB connection’s file descriptor. This is achievable, but I have gone with the simpler method of just checking that the Task code has sent off a query before returning control to the event loop.
The webapp below implements a little “bank” using SQLAlchemy, with accounts and transfers between them. You can call the transfer endpoint to move balances between accounts, but if you pass the error=true URL parameter, the app will attempt to perform asyncio.sleep() during its DB transaction, which will produce a ConnectionFeelsLonely error.
#!/usr/bin/env python3.12
# Copyright 2024 Josh Snyder
# Licensed under https://www.apache.org/licenses/LICENSE-2.0.txt
import asyncio
import functools
import inspect
import traceback
import uuid
from collections import abc
from contextlib import asynccontextmanager
from contextvars import ContextVar
from enum import Enum, auto
from typing import Self
import uvicorn # 0.34.0
from fastapi import Body, Depends, FastAPI, HTTPException # 0.115.6
from pydantic import BaseModel, Field # 2.10.4
from sqlalchemy import ( # 2.0.36
Column,
DateTime,
Integer,
Numeric,
String,
create_engine,
event,
func,
)
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import (
AsyncConnection,
AsyncEngine,
AsyncSession,
AsyncSessionTransaction,
)
from sqlalchemy.orm import declarative_base, sessionmaker
active_db_connection = ContextVar("active_db_connection")
txn_began_trace = ContextVar("txn_began_trace")
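# active_db_connection: the (sync) connection whose transaction the current
# task is inside, if any. txn_began_trace: a stack trace of where that
# transaction began, used to make ConnectionFeelsLonely errors readable.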
class ConnectionState(Enum):
"""Represents the state of a connection."""
WAITING_FOR_ME = auto() # in a transaction. Waiting for me to query.
WAITING_FOR_DB = auto() # in a transaction. Waiting for the DB to respond.
class ConnectionFeelsLonely(Exception):
"""Raised when the application attempts to context switch while in a
transaction, but the database hasn't been given anything to do."""
@classmethod
def create(cls, conn) -> Self | None:
"""Evaluate whether the connection `conn` feels lonely, returning an
exception if so."""
if conn is None:
return None
connection_state = conn.info.get("fixation", None)
if connection_state != ConnectionState.WAITING_FOR_ME:
# If the ball isn't in my court, we're all good.
return None
# Prefer a traceback on the connection itself, but fall back to using a
# ContextVar
trace = conn.info.get("txn_began_trace", None)
if trace is None:
trace = txn_began_trace.get(None)
return cls(conn, trace)
def __str__(self):
lines = [f"{self.args[0]!r}"]
if self.args[1] is not None:
tb = "".join(
[" {}\n".format(line) for line in self.args[1].split("\n")]
)
lines.append(" See above for offending stack trace.")
lines.append(f" Transaction began at:\n{tb}")
return "\n".join(lines)
class ConnectionFeelsScorned(Exception):
"""Raised when the application attempts to begin a transaction against a
connection while already in a transaction on a different connection."""
@classmethod
def create(cls, conn) -> Self | None:
if conn is None:
return None
return cls(conn)
def before_cursor_execute(
conn, cursor, statement, parameters, context, executemany
):
if "fixation" not in conn.info:
# Ignore connections that don't have a fixation
return
# This is our heuristic that the database has been given something to do.
conn.info["fixation"] = ConnectionState.WAITING_FOR_DB
def after_cursor_execute(
conn, cursor, statement, parameters, context, executemany
):
if "fixation" not in conn.info:
# Ignore connections that don't have a fixation
return
# The database has returned a result to us, so the ball is in our court
# to give it something to do again.
conn.info["fixation"] = ConnectionState.WAITING_FOR_ME
def on_transaction_begin(conn):
exc = ConnectionFeelsScorned.create(active_db_connection.get(None))
if exc:
# It's actually really hard to reach here. You have to create a new
# connection _without_ going to sleep, which is impossible if you have
# to do a connect() that raises EINPROGRESS/EAGAIN.
# Either that, or you forgot to wrap your coroutine with
# ConnectionFixatingWrapper
raise exc
conn.info["fixation"] = ConnectionState.WAITING_FOR_ME
# We're currently executing in a greenlet off of the main asyncio event
# loop, so the fact that we're able to set variables in the same context is
# pretty nice/useful.
active_db_connection.set(conn)
def on_transaction_end(conn):
# After the transaction ends we're back to being a normal connection with
# no fixation.
conn.info.pop("fixation", None)
if active_db_connection.get(None) is conn:
active_db_connection.set(None)
class FixatedAsyncConnection(AsyncConnection):
def begin(self):
# Store a representation of where the transaction began. This provides
# contextual information so that people can see why
# ConnectionFeelsLonely was raised.
# We need to access conn._proxied so that we can store data on the
# underlying synchronous connection, rather than the async one.
self._proxied.info["txn_began_trace"] = "".join(
traceback.format_stack()[:-1]
)
return super().begin()
class FixatedAsyncEngine(AsyncEngine):
_connection_cls = FixatedAsyncConnection
class FixatedAsyncSessionTransaction(AsyncSessionTransaction):
async def start(self, *args, **kwargs):
# When we're in an AsyncSessionTransaction, there's actually no
# connection yet, necessarily. The connection will be created lazily.
# So instead we save the stack trace on a ContextVar
txn_began_trace.set("".join(traceback.format_stack()[:-1]))
return await super().start(*args, **kwargs)
class FixatedAsyncSession(AsyncSession):
def begin(self):
return FixatedAsyncSessionTransaction(self)
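# This mirrors what create_async_engine() does under the hood: build a sync
# Engine on an async dialect, then wrap it in (our subclass of) AsyncEngine.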
sync_engine = create_engine("postgresql+asyncpg:///", echo=True)
engine = FixatedAsyncEngine(sync_engine)
event.listen(sync_engine, "begin", on_transaction_begin)
event.listen(sync_engine, "commit", on_transaction_end)
event.listen(sync_engine, "rollback", on_transaction_end)
event.listen(sync_engine, "before_cursor_execute", before_cursor_execute)
event.listen(sync_engine, "after_cursor_execute", after_cursor_execute)
class ConnectionFixatingWrapper(abc.Coroutine):
"""A coroutine that wraps an underlying coroutine. It throws exceptions
when the underlying coroutine creates "fixated" SQLAlchemy connections and
leaves them with no work to do."""
def __init__(self, coro):
# Use the `cr_await` field so that we look like a regular coroutine
# This allows any debuggers/profilers to walk the stack.
self.cr_await = coro
def send(self, v):
# Do the underlying coroutine's business
ret = self.cr_await.send(v)
# Check whether `self.cr_await` left a lonely connection
exc = ConnectionFeelsLonely.create(active_db_connection.get(None))
if not exc:
# It isn't. We're all good
return ret
return self.cr_await.throw(exc)
# Boilerplate
def throw(self, *exc):
return self.cr_await.throw(*exc)
def close(self):
return self.cr_await.close()
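    # Returning self from __await__ means anyone awaiting this wrapper
    # iterates it directly, so the lonely-connection check in send() runs
    # at every suspension point.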
def __await__(self):
return self
def __iter__(self):
return self
def __next__(self):
return self.send(None)
# --- Normal DB setup ---
Session = sessionmaker(bind=engine, class_=FixatedAsyncSession)
Base = declarative_base()
async def get_db():
"""Yields a database session, rolling back on error."""
async with Session() as session:
yield session
def retry_serialization_errors(fn):
@functools.wraps(fn)
async def wrapper(session, *args, **kwargs):
while True:
try:
async with session.begin():
return await fn(session, *args, **kwargs)
except DBAPIError as e:
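                # SQLSTATE 40001 = serialization_failure: safe to retry.
                # Anything else is a real error.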
                if e.orig.sqlstate != "40001":
raise
return wrapper
# --- Models ---
class Balance(Base):
__tablename__ = "balances"
account_id = Column(Integer, primary_key=True)
balance = Column(Numeric, nullable=False, default=0)
class Transaction(Base):
__tablename__ = "transactions"
id = Column(String, primary_key=True, nullable=False)
source_account = Column(Integer, nullable=False)
target_account = Column(Integer, nullable=False)
amount = Column(Integer, nullable=False)
created_at = Column(DateTime, server_default=func.now())
# --- Pydantic Schemas ---
class TransferRequest(BaseModel):
source: int
target: int
amount: int
    idempotency_key: str | None = Field(default=None)
class CreateAccountRequest(BaseModel):
initial_balance: int = Field(default=0)
# --- App setup ---
@asynccontextmanager
async def app_lifespan(app):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
app = FastAPI(lifespan=app_lifespan)
@retry_serialization_errors
async def _create_account_logic(
session: AsyncSession, data: CreateAccountRequest
):
account = Balance(balance=data.initial_balance)
session.add(account)
await session.flush()
return account.account_id
@app.post("/accounts")
async def create_account(
session: AsyncSession = Depends(get_db),
data: CreateAccountRequest = Body(...),
):
"""Create an account.
echo '{"initial_balance": 100}' | jq -c | \
curl -X POST http://localhost:8000/accounts \
-H "Content-Type: application/json" \
-d @-
"""
return await _create_account_logic(session, data)
@retry_serialization_errors
async def _transfer_logic(
session: AsyncSession, data: TransferRequest, error: bool
):
# Check idempotency
existing_txn = await session.get(Transaction, data.idempotency_key)
if existing_txn is not None:
return data.idempotency_key
src_bal = await session.get(Balance, data.source)
if src_bal is None:
raise HTTPException(status_code=404, detail="Source account not found")
dst_bal = await session.get(Balance, data.target)
if dst_bal is None:
raise HTTPException(
status_code=404, detail="Destination account not found"
)
if src_bal.balance < data.amount:
raise HTTPException(status_code=400, detail="Insufficient funds")
src_bal.balance -= data.amount
dst_bal.balance += data.amount
tx = Transaction(
id=data.idempotency_key,
source_account=data.source,
target_account=data.target,
amount=data.amount,
)
session.add(tx)
if error:
await session.flush()
# Should produce a ConnectionFeelsLonely
await asyncio.sleep(0)
return dict(id=data.idempotency_key)
@app.post("/transfer")
async def transfer(
db: AsyncSession = Depends(get_db),
data: TransferRequest = Body(...),
# Passing error=true will result in a ConnectionFeelsLonely error
error: bool = False,
):
"""Transfer funds
echo '{"source": 1, "target": 2, "amount": 50}' | jq -c | \
curl -X POST http://localhost:8000/transfer \
-H "Content-Type: application/json" \
-d @-
"""
# Generate an idempotency key if not provided
if data.idempotency_key is None:
data.idempotency_key = str(uuid.uuid4())
return await _transfer_logic(db, data, error)
# --- Server Setup ---
def _iscoroutinefunction(fn):
"""Determine if `fn` is a coroutine function, thoroughly."""
if asyncio.iscoroutinefunction(fn):
return True
try:
fn = fn.__call__
except AttributeError:
return False
return asyncio.iscoroutinefunction(fn)
def compose(fn1):
"""Make a function that composes `fn1` onto an underlying `fn2`."""
def wrapper(fn2):
@functools.wraps(fn2)
def wrapped(*args, **kwargs):
return fn1(fn2(*args, **kwargs))
# uvicorn wants to see a coroutine function, so we oblige.
if _iscoroutinefunction(fn2):
wrapped = inspect.markcoroutinefunction(wrapped)
return wrapped
return wrapper
async def main():
app_ = compose(ConnectionFixatingWrapper)(app)
config = uvicorn.Config(app_, port=8000)
server = uvicorn.Server(config)
await server.serve()
if __name__ == "__main__":
raise SystemExit(asyncio.run(main()))
1. This has all sorts of negative implications for the DB server, which has to hold additional data for MVCC purposes (e.g. the undo log in MySQL/InnoDB, old tuples in Postgres). It also cannot fail over while a transaction is held open. More modern DBs just put hard limits on this kind of resource utilization: in FoundationDB your transactions can last at most 5 seconds.
2. TIL that this is true even of read-only transactions, under certain circumstances. Basically Postgres can realize midway through a transaction that the read view it is providing you cannot be assigned to a serial moment in time. The docs say: “When relying on Serializable transactions to prevent anomalies, it is important that any data read from a permanent user table not be considered valid until the transaction which read it has successfully committed.” So you should be prepared to retry even your read-only transactions.
3. In both the DynamoDB case and the transactional case, the idempotency key should probably be computed by the upstream client before the request even reaches our app. That way the upstream client can safely replay their request multiple times against our “account balances service” without needing to worry about the transfer being applied twice.
4. “No chance” might be too strong, since a sufficiently motivated application developer could certainly launch a background worker that does something side-effecting while the transaction is ongoing.
5. In practice “external factors” just means no other services can slow down our DB transaction logic. The app code can still be starved of CPU, or experience network latency making roundtrips to the DB.