Should all databases be shard-per-core?
Our previous post concluded that we could quantify the statefulness of any given service by the length of time needed to bring a new node into service in a given cluster. I dubbed this metric “time to first response” (TTFR). The natural next question to ask would be: can I assume that two services with the same TTFR will scale in the same way? Like, if I have a “user profiles” service and it is backed by a ScyllaDB cluster, should I be doing the same math to scale both the application and its backing database?
The answer is an emphatic no! Let’s make these assumptions about both services, for simplicity:
- They should be capable of handling 960k requests per second (RPS).
- Requests arrive according to a steady and perfectly random (Poisson) process.
- The user being queried in any given request is sampled according to a uniform distribution.
- The amount of time required for a CPU to execute each request is sampled from an exponential distribution that is identical across each CPU.
- The average request time is 100 microseconds, implying a per-CPU saturation limit of 10k RPS.
- We are willing to allow up to 150 microseconds of latency for each request, which means that up to 50 microseconds can be spent queuing. This represents a 50% slowdown.
- Each CPU handles requests on a first-come-first-served basis, and context-switching between requests is free.
In other words, these are two of the most well-behaved services to ever grace this planet. Since we’re planning to send 960k requests per second to each service, and each CPU saturates at 10k RPS, our bare minimum resource requirement is 960k / 10k = 96 CPUs. But 96 CPUs would be just barely enough to keep up with load, not enough to ensure that we meet our latency SLO.
Remarkably, the application service needs only 2 extra CPUs to reach 50% slowdown: we can do the job with just 98 CPUs! Add two more CPUs (making an even 100), and you’ll reliably achieve less than 20% slowdown. The ability to load-balance work perfectly evenly is incredibly powerful here. In Kendall’s notation, this is an idealized M/M/k system.
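As a rough cross-check on those CPU counts, the mean queueing delay of an idealized M/M/k system can be computed in closed form with the Erlang C formula. The sketch below is an illustration under that assumption, not the simulator used for this post; the names `erlang_c`, `mmk_slowdown`, and `OFFERED_LOAD` are made up here.

```python
def erlang_c(servers: int, offered_load: float) -> float:
    """Probability that an arriving request has to queue in an M/M/k system."""
    if offered_load >= servers:
        raise ValueError("unstable: offered load must be below the server count")
    # Erlang B computed with the standard, numerically stable recursion.
    blocking = 1.0
    for n in range(1, servers + 1):
        blocking = offered_load * blocking / (n + offered_load * blocking)
    utilization = offered_load / servers
    # Convert the Erlang B value into Erlang C.
    return blocking / (1.0 - utilization * (1.0 - blocking))


def mmk_slowdown(servers: int, offered_load: float) -> float:
    """Mean queueing delay expressed as a fraction of the mean service time."""
    return erlang_c(servers, offered_load) / (servers - offered_load)


# Hypothetical constant: 960k RPS times 100 microseconds keeps about 96 CPUs busy.
OFFERED_LOAD = 960_000 * 100e-6

for cpus in (97, 98, 100):
    print(cpus, round(mmk_slowdown(cpus, OFFERED_LOAD), 3))
```

Under this model, 97 CPUs still exceeds the 50% slowdown budget, 98 CPUs fits inside it, and 100 CPUs lands below 20%.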
Switching over to ScyllaDB, we quickly run into a problem. With the application service, each CPU was fungible with all of the other CPUs, so all of the CPUs would need to be simultaneously busy in order for a queue to build up. But ScyllaDB has a shard-per-core architecture[1], meaning any given query has only one suitable CPU to query within its latency domain[2]. If we’re willing to accept a 50% slowdown, we actually need to overprovision servers by a factor of three: 288 CPUs. The general trend is a factor of \( 1 + \frac{1}{S} \), where \( S \) is the tolerated slowdown expressed as a fraction of the mean service time: achieving a 33% slowdown implies 4x servers, a 25% slowdown implies 5x, and a 20% slowdown implies 6x. This is stunning, and I had trouble believing the numbers myself.
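The overprovisioning factor follows from the standard mean-delay result for an M/M/1 queue. The derivation below is a sketch that assumes each shard behaves as an independent M/M/1 queue; the symbols \( \rho \) (per-CPU utilization), \( \mu \) (per-CPU service rate), and \( W_q \) (mean queueing delay) are introduced here and are not part of the original text.

```latex
\begin{aligned}
W_q &= \frac{\rho}{\mu\,(1-\rho)} && \text{(mean M/M/1 queueing delay)} \\
S &= \mu\, W_q = \frac{\rho}{1-\rho} && \text{(delay relative to the mean service time } 1/\mu\text{)} \\
\rho &= \frac{S}{1+S} && \text{(highest utilization that meets the target)} \\
\frac{1}{\rho} &= 1 + \frac{1}{S} && \text{(overprovisioning factor)}
\end{aligned}
```

With \( S = 0.5 \), each CPU may run at most one third busy, so the 96-CPU baseline becomes \( 96 \times 3 = 288 \).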
ScyllaDB implements one M/M/1 queue per shard, meaning that each CPU is on its own, and cannot recruit help from other CPUs. But if we assign CPUs to shards in pairs, then the total required core count drops to 168 (1.75x). As quads, the requirement is 132 (1.375x). As octets, the requirement is only 104 (1.083x); this is optimal because it is the raw resource requirement of 96, plus a single extra group of 8.
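One way to estimate these grouped figures analytically is to treat each group of g cores as its own M/M/g queue that receives an equal, random share of the traffic. The sketch below makes exactly that assumption and reuses the Erlang C formula; `grouped_slowdown` and `min_cores` are hypothetical helpers, and the small tolerance only guards against floating-point rounding at the boundary.

```python
import math


def erlang_c(servers: int, offered_load: float) -> float:
    """Probability that an arrival queues in an M/M/k system (load in erlangs)."""
    blocking = 1.0
    for n in range(1, servers + 1):
        blocking = offered_load * blocking / (n + offered_load * blocking)
    utilization = offered_load / servers
    return blocking / (1.0 - utilization * (1.0 - blocking))


def grouped_slowdown(group_size: int, groups: int, total_load: float) -> float:
    """Mean queueing delay over mean service time when load splits evenly."""
    load_per_group = total_load / groups
    return erlang_c(group_size, load_per_group) / (group_size - load_per_group)


def min_cores(group_size: int, total_load: float = 96.0,
              max_slowdown: float = 0.5) -> int:
    """Smallest core count, in whole groups, that meets the slowdown target."""
    # Start from the fewest groups that keep every group below saturation.
    groups = math.floor(total_load / group_size) + 1
    while grouped_slowdown(group_size, groups, total_load) > max_slowdown + 1e-9:
        groups += 1
    return groups * group_size


for size in (1, 2, 4, 8):
    print(size, min_cores(size))
```

Under this model, groups of 1, 2, and 4 come out at 288, 168, and 132 cores, matching the figures above. For octets the same closed form asks for 120 cores rather than 104, so it is worth treating the analytic and quoted numbers as separate estimates.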
In retrospect, I suppose these numbers shouldn’t surprise me, since it’s the same “balls into bins” discussion that we have when designing request routers for unsharded services, like our hypothetical user profiles service. Random load balancing is bad, because it fragments the entire system into M/M/1 queues. Its opposite—a single queue—is optimal, since it implements a perfect M/M/k. Within that spectrum, two random choices is an improvement. More generally, an operator gets to navigate a course between the two extremes, which we’ll name:
- scylla: assign requests to single CPUs. Pay costs in the form of queueing overhead.
- charybdis: assign requests to a single global queue. Pay costs in the form of coordination entering and exiting the queue.
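The “balls into bins” intuition can be illustrated with a static toy experiment: throw as many balls as there are bins, once choosing a single random bin and once choosing the emptier of two random bins. This is only a sketch of the occupancy argument, not a queueing simulation, and `max_bin_load` is a name invented for this example.

```python
import random


def max_bin_load(num_bins: int, num_balls: int, choices: int, seed: int = 0) -> int:
    """Place each ball in the least-loaded of `choices` randomly sampled bins."""
    rng = random.Random(seed)
    loads = [0] * num_bins
    for _ in range(num_balls):
        candidates = [rng.randrange(num_bins) for _ in range(choices)]
        target = min(candidates, key=loads.__getitem__)
        loads[target] += 1
    return max(loads)


one_choice = max_bin_load(10_000, 10_000, choices=1)
two_choices = max_bin_load(10_000, 10_000, choices=2)
print("fullest bin with one choice:", one_choice)
print("fullest bin with two choices:", two_choices)
```

The fullest bin stands in for the longest queue: sampling two candidates costs one extra load lookup per request and sits between the two extremes named above.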
Is Cassandra vulnerable to this same problem? Certainly if you deploy a separate Cassandra JVM-per-core, you’ll run into this issue[3]. But a database can also easily encounter this problem with its own internal data structures. In the ScyllaDB example, the queues form on a per-CPU basis. For Cassandra, the queues would form on locks within the JVM.
Let’s hypothesize that you need to hold a lock on a partition in order to query it[4]. In that case, the access to the partition is gated by the lock’s own queue. So long as the load on each partition is small, this shouldn’t be a problem. But queueing delays would build up if you were to deploy either: (1) a very small number of partitions per JVM, or (2) a “hot” partition. The same oversubscription math applies (reciprocally):
- a lock that is occupied 16.67% of the time produces a 20% slowdown
- a lock that is occupied 20% of the time produces a 25% slowdown
- a lock that is occupied 25% of the time produces a 33% slowdown
- a lock that is occupied 33% of the time produces a 50% slowdown
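The list above can be regenerated from the M/M/1 relationship between occupancy and slowdown. This is a minimal sketch that assumes the lock behaves as a single first-come-first-served M/M/1 server; both function names are made up for this example.

```python
def slowdown_from_occupancy(occupancy: float) -> float:
    """Mean wait for the lock as a fraction of the mean hold time."""
    if not 0.0 <= occupancy < 1.0:
        raise ValueError("occupancy must be in [0, 1)")
    return occupancy / (1.0 - occupancy)


def occupancy_for_slowdown(slowdown: float) -> float:
    """Highest lock occupancy that keeps the mean wait within `slowdown`."""
    if slowdown < 0.0:
        raise ValueError("slowdown must be non-negative")
    return slowdown / (1.0 + slowdown)


for occupancy in (1 / 6, 0.20, 0.25, 1 / 3):
    print(f"{occupancy:.2%} occupied -> "
          f"{slowdown_from_occupancy(occupancy):.0%} slowdown")
```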
(In case anyone is curious, the general formula for waiting time on a mutex with Poisson arrivals is the Pollaczek–Khinchine formula. Among other things, it teaches us that we can reduce queueing time by a factor of two if each query were to take a constant amount of time, rather than being drawn from an exponential distribution. Variance is costly, apparently.)
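To make the factor of two concrete, here is a small sketch of the Pollaczek–Khinchine mean-wait formula for an M/G/1 queue, evaluated for exponential and constant service times with the same mean. The function name and the example arrival rate (a lock that is 25% occupied) are assumptions chosen for illustration.

```python
def pk_mean_wait(arrival_rate: float, mean_service: float,
                 second_moment_service: float) -> float:
    """Mean queueing delay for an M/G/1 queue (Pollaczek-Khinchine)."""
    utilization = arrival_rate * mean_service
    if utilization >= 1.0:
        raise ValueError("unstable: utilization must be below 1")
    return arrival_rate * second_moment_service / (2.0 * (1.0 - utilization))


MEAN = 100e-6   # mean service time from the post, in seconds
RATE = 2500.0   # hypothetical arrival rate: 25% occupancy

# Exponential service time: E[S^2] = 2 * mean^2.
exp_wait = pk_mean_wait(RATE, MEAN, 2 * MEAN ** 2)
# Constant service time: E[S^2] = mean^2.
const_wait = pk_mean_wait(RATE, MEAN, MEAN ** 2)

print(f"exponential: {exp_wait * 1e6:.1f} us, constant: {const_wait * 1e6:.1f} us")
```

Only the second moment of the service time changes between the two calls, which is where the cost of variance enters.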
So yeah, mutexes are bad. And if you are lucky enough to not use mutexes, node shape matters: 36x 4-core machines will be meaningfully different from 9x 16-core machines, even though the cluster is 144 cores either way. Intuitively, this is because the bigger machines have the ability to perform parallel reads on the same data across multiple cores.
Methods
This simulator was written in part by GPT-4o1 (preview), which did a reasonably good job IMHO.
```python
import heapq
import itertools
import random
from collections import deque

import numpy as np

# Monotonic tiebreaker so that two events with identical timestamps never
# fall through to comparing Server or Request objects inside the heap.
_event_seq = itertools.count()


class Request:
    def __init__(self, arrival_time):
        self.arrival_time = arrival_time
        self.start_service_time = None
        self.completion_time = None


class Server:
    def __init__(self, concurrency, service_time_fn):
        self.active_requests = 0
        self.queue = deque()  # FIFO queue of waiting requests
        self.max_concurrency = concurrency
        self.service_time_fn = service_time_fn

    def process_request(self, current_time, request, event_list):
        if self.active_requests < self.max_concurrency:
            # A slot is free: start service now and schedule the departure.
            self.active_requests += 1
            request.start_service_time = current_time
            service_time = self.service_time_fn()
            request.completion_time = current_time + service_time
            heapq.heappush(
                event_list,
                (
                    request.completion_time,
                    next(_event_seq),
                    "departure",
                    self,
                    request,
                ),
            )
        else:
            # All slots are busy: wait in this server's own queue.
            self.queue.append(request)

    def finish_request(self, current_time, event_list):
        self.active_requests -= 1
        if self.queue:
            next_request = self.queue.popleft()
            self.process_request(current_time, next_request, event_list)


def simulate_queueing():
    num_servers = 288
    simulation_time = 1  # second
    arrival_rate = 960000  # per second
    service_rate = 1 / 100e-6  # per second
    service_time_fn = lambda: np.random.exponential(scale=1 / service_rate)
    max_concurrency = 1
    servers = [
        Server(max_concurrency, service_time_fn) for i in range(num_servers)
    ]
    event_list = []

    # Initialize variables
    current_time = 0.0
    all_requests = []

    # Schedule first arrival
    interarrival_time = np.random.exponential(scale=1 / arrival_rate)
    heapq.heappush(
        event_list,
        (current_time + interarrival_time, next(_event_seq), "arrival"),
    )

    while event_list:
        current_time, _, event_type, *event_info = heapq.heappop(event_list)
        if event_type == "arrival" and current_time < simulation_time:
            # Create new request and route it to a uniformly random server
            request = Request(current_time)
            all_requests.append(request)
            server = random.choice(servers)
            server.process_request(current_time, request, event_list)

            # Schedule next arrival
            interarrival_time = np.random.exponential(scale=1 / arrival_rate)
            heapq.heappush(
                event_list,
                (
                    current_time + interarrival_time,
                    next(_event_seq),
                    "arrival",
                ),
            )
        elif event_type == "departure":
            server, request = event_info
            server.finish_request(current_time, event_list)

    # After simulation ends
    total_jobs_processed = len(all_requests)
    total_queueing_delay = sum(
        request.start_service_time - request.arrival_time
        for request in all_requests
    )
    total_expected_service_time = (
        total_jobs_processed * 100e-6
    )  # Mean service time in seconds

    print(f"Total queueing delay: {total_queueing_delay:.6f} seconds")
    print(
        f"Total expected service time: "
        f"{total_expected_service_time:.6f} seconds"
    )
    print(f"Number of jobs processed: {total_jobs_processed}")
    ratio = total_queueing_delay / total_expected_service_time
    print(
        f"Total queueing delay divided by total expected service time: "
        f"{ratio:.6f}"
    )
    print("Simulation complete.")


simulate_queueing()
```
1. We should note at this point that I’ve never run ScyllaDB personally, and I can’t speak to its actual in-the-wild performance. I chose it because it is a good example of a high performance thread-per-core database.
2. To be specific, I’m assuming an RF=3 and that each latency domain (“rack”) is far enough from its peers that we want to minimize cross-rack traffic. YMMV.
3. There are other good reasons not to do this: you’ll be overpaying massively for on-heap RAM and GC time. For better or for worse, JVMs were built to amortize fixed costs across many CPUs.
4. It would be interesting to analyze to what extent this hypothesis is true. It’s actually a bit difficult, since many “lock-free” data structures are actually taking advantage of locking within your memory controller.