Database Sharding Strategies: Range, Hash, and Directory-Based Approaches
Concept
Horizontal data partitioning — sharding — is the act of splitting a single logical dataset across multiple physical database instances, each owning a subset of the data. The goal is to exceed the storage, throughput, or concurrency limits of a single node without giving up the data model.
Every sharding decision begins with the partition key: the attribute of the data used to route each record to its shard. The partition key determines every downstream property of the system — query distribution, hotspot risk, cross-shard join cost, and rebalancing complexity. A wrong partition key cannot be corrected without a full data migration.
There are three canonical sharding strategies, each with a different trade-off profile.
Range-Based Sharding
Records are partitioned by consecutive ranges of the partition key. Shard 1 owns user IDs 1–1,000,000; Shard 2 owns 1,000,001–2,000,000; and so on. For time-series data, Shard 1 owns January 2024, Shard 2 owns February 2024, etc.
Strengths:
- Range queries are efficient. With orders range-partitioned by month, "give me all orders from January 1–31" touches exactly one shard.
- Simple routing logic: compare key against range boundaries.
- Excellent for time-series and audit log patterns where data is written sequentially and queried by time window.
Weaknesses:
- Hot shard problem. If the partition key correlates with write activity (current month, recently created user IDs), all writes land on the same shard while older shards sit idle.
- Rebalancing complexity. Splitting a hot shard requires migrating half its data to a new shard and updating routing configuration — not atomic, not instant.
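The "compare key against range boundaries" routing described above can be sketched as a binary search over sorted upper bounds. This is a minimal illustration, not a production router: the class name RangeShardRouter, the inclusive-upper-bound convention, and the shard IDs are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical range router: shard i owns every key up to and including
// its upper bound that is not already owned by an earlier shard.
public sealed class RangeShardRouter
{
    private readonly long[] _upperBounds; // sorted ascending, inclusive
    private readonly string[] _shardIds;  // _shardIds[i] pairs with _upperBounds[i]

    public RangeShardRouter(IEnumerable<(long UpperBoundInclusive, string ShardId)> ranges)
    {
        var sorted = ranges.OrderBy(r => r.UpperBoundInclusive).ToArray();
        if (sorted.Length == 0)
            throw new ArgumentException("At least one range is required.", nameof(ranges));
        _upperBounds = sorted.Select(r => r.UpperBoundInclusive).ToArray();
        _shardIds = sorted.Select(r => r.ShardId).ToArray();
    }

    // Point lookup: exactly one shard owns the key.
    public string GetShardForKey(long key) => _shardIds[IndexFor(key)];

    // Range query: returns every shard whose range overlaps [fromKey, toKey], in key order.
    public IReadOnlyList<string> GetShardsForRange(long fromKey, long toKey)
    {
        if (fromKey > toKey)
            throw new ArgumentException("fromKey must not exceed toKey.");
        int first = IndexFor(fromKey);
        int last = IndexFor(toKey);
        return new ArraySegment<string>(_shardIds, first, last - first + 1);
    }

    private int IndexFor(long key)
    {
        // BinarySearch returns the index on an exact match; otherwise the bitwise
        // complement of the index of the first bound larger than the key.
        int index = Array.BinarySearch(_upperBounds, key);
        if (index < 0) index = ~index;
        if (index >= _shardIds.Length)
            throw new ArgumentOutOfRangeException(nameof(key), "Key is above the last range boundary.");
        return index;
    }
}
```

With bounds at 1,000,000 and 2,000,000, key 1,000,000 routes to the first shard and key 1,000,001 to the second. Splitting a hot shard in this model means inserting a new boundary and moving the keys on one side of it, which is the non-atomic step the weaknesses list warns about.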
Hash-Based Sharding
The partition key is hashed, and the hash is modulo-divided by the shard count to determine the target shard. shard_index = hash(partition_key) % shard_count.
Strengths:
- Uniform write distribution. Hash functions distribute keys pseudo-randomly, eliminating hot shards caused by sequential keys.
- Predictable query routing: deterministic, no lookup required.
Weaknesses:
- Range queries are catastrophically expensive. "All orders from January" requires querying every shard and merging results.
- Rebalancing with naïve modulo is destructive. Adding a shard changes shard_count, so most keys map to a different shard: growing from N to N+1 shards remaps roughly N/(N+1) of the existing keys. This is the modulo rehash problem, effectively a full data migration on every scale event.
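To make the modulo rehash problem concrete, the sketch below counts how many keys change shard when the shard count changes. It is an illustration under a simplifying assumption: the integer key itself stands in for a uniformly distributed hash(partition_key). The class and method names are hypothetical.

```csharp
using System;

public static class ModuloRehashDemo
{
    // Returns the fraction of keys 0..keyCount-1 whose shard index changes
    // when the shard count goes from oldShardCount to newShardCount.
    // Assumption: the key value stands in for a uniformly distributed hash.
    public static double FractionMoved(int keyCount, int oldShardCount, int newShardCount)
    {
        if (keyCount <= 0) throw new ArgumentOutOfRangeException(nameof(keyCount));
        if (oldShardCount <= 0) throw new ArgumentOutOfRangeException(nameof(oldShardCount));
        if (newShardCount <= 0) throw new ArgumentOutOfRangeException(nameof(newShardCount));

        int moved = 0;
        for (int key = 0; key < keyCount; key++)
        {
            // Same formula as the routing rule: shard_index = hash % shard_count
            if (key % oldShardCount != key % newShardCount)
                moved++;
        }
        return (double)moved / keyCount;
    }
}
```

For 20 keys going from 4 to 5 shards, only keys 0 through 3 keep their shard index, so 16 of the 20 keys (80%) would have to move.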
Consistent hashing addresses the modulo problem by placing shards on a hash ring, where each key belongs to the first shard found clockwise from the key's hash. When a new shard is added, only the keys between the new node's position and its predecessor need to migrate: roughly 1/N of the dataset (with N the new shard count), not most of it. Dynamo (as described by DeCandia et al.), Cassandra, and Riak build their partitioning on this idea.
Directory-Based Sharding
A centralized lookup table (the directory service) maps each partition key — or range of keys — to its shard. The application queries the directory before every data request.
Strengths:
- Maximum flexibility. Routing logic is data, not code. You can move individual records between shards without changing application logic.
- Supports heterogeneous shard sizes. VIP customers can be isolated to dedicated shards without any hash function change.
Weaknesses:
- Directory becomes a single point of failure and a performance bottleneck. Every read and write first hits the directory. Without aggressive caching and high availability, the directory's reliability ceiling becomes the system's ceiling.
- Cache consistency: A stale directory cache routes requests to the wrong shard. This is potentially the worst failure mode, because it can surface as silently missing or misplaced data rather than as an explicit error.
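A directory can be as simple as a key-to-shard map with a fallback for keys that have no explicit entry. The sketch below is an in-memory stand-in for the directory service, assuming a single default shard for unmapped keys. The class name, the fallback behavior, and the shard IDs are illustrative assumptions, and a real directory would also need persistence, replication, and cache invalidation.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory directory: routing is data, so moving a tenant
// is a map update rather than a code change.
public sealed class DirectoryShardRouter
{
    private readonly ConcurrentDictionary<string, string> _assignments = new();
    private readonly string _defaultShardId;

    public DirectoryShardRouter(string defaultShardId)
    {
        _defaultShardId = defaultShardId ?? throw new ArgumentNullException(nameof(defaultShardId));
    }

    // Pins a partition key to a specific shard, for example to isolate a hot tenant.
    // The caller is responsible for copying the data before switching the entry.
    public void Assign(string partitionKey, string shardId) =>
        _assignments[partitionKey] = shardId;

    // Removes an explicit assignment so the key falls back to the default shard.
    public bool Unassign(string partitionKey) =>
        _assignments.TryRemove(partitionKey, out _);

    public string GetShardForKey(string partitionKey) =>
        _assignments.TryGetValue(partitionKey, out var shardId) ? shardId : _defaultShardId;
}
```

Isolating a VIP customer then amounts to a call such as Assign("tenant-42", "shard-vip") once that tenant's rows have been copied to the dedicated shard.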
Constraints
The Cross-Shard Query Problem
Sharding breaks the relational model's global query capability. A SQL join between two tables that live on different shards cannot be executed inside the database. Options:
- Application-level join: Fetch from both shards in parallel, merge in memory. Doubles the query round-trips and moves join computation to the application server.
- Denormalization: Pre-join data into a single record that co-locates on one shard. Eliminates cross-shard joins but increases storage and write amplification.
- Broadcast query: Send the query to all shards, merge results. Correct but O(N_shards) cost — degrades linearly as you add shards.
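The broadcast option can be sketched as a scatter-gather helper: send the same query to every shard in parallel, then merge and sort in the application. To stay independent of any data-access library, the per-shard query is passed in as a delegate. The helper name and its signature are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ScatterGather
{
    // Runs queryShard against every shard concurrently, then merges the
    // per-shard results and sorts them by sortKey in application memory.
    // Cost grows with the number of shards: one round-trip per shard.
    public static async Task<IReadOnlyList<TRow>> QueryAllShardsAsync<TRow, TSortKey>(
        IEnumerable<string> shardIds,
        Func<string, CancellationToken, Task<IReadOnlyList<TRow>>> queryShard,
        Func<TRow, TSortKey> sortKey,
        CancellationToken cancellationToken = default)
    {
        // Materialize the tasks so every shard query has started before awaiting.
        var tasks = shardIds.Select(id => queryShard(id, cancellationToken)).ToList();
        IReadOnlyList<TRow>[] perShard = await Task.WhenAll(tasks);

        return perShard
            .SelectMany(rows => rows)
            .OrderBy(sortKey)
            .ToList();
    }
}
```

The slowest shard determines the latency of the whole call, and a single failing shard fails the merged result, so callers typically pair this with timeouts passed through the cancellation token.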
The fundamental constraint: your partition key must align with your most frequent query patterns. If your primary query is "get all orders for a customer," your partition key must be customer_id. If it's "get all orders in a date range," the partition key must be time-based. When the two most frequent query patterns require different partition keys, you either choose one and accept cross-shard cost for the other, or you maintain two physical copies of the data with different partition keys (a common pattern in analytics systems).
Hot Shard Problem — The Operational Reality
The hot shard problem manifests when a single partition key value accounts for a disproportionate fraction of traffic. For a multi-tenant SaaS product: one enterprise customer with 10,000 active users lands on a single shard. That shard's CPU is pegged at 95%; every other shard runs at 15%. The system's capacity is effectively the capacity of the hot shard, not the aggregate capacity.
Mitigations:
- Key suffixing: Append a random suffix (0 to N-1) to the partition key on write so that writes spread across N sub-shards. Reads must query all N suffixed keys and merge the results. This adds read fan-out but removes the write hotspot.
- Dedicated shard for hot tenants: Directory-based sharding specifically to isolate known-hot partitions.
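- Write-ahead buffering: Buffer writes for hot keys in an in-memory queue or Redis and flush them to the database periodically in batches. Buffered writes can be lost if the buffer fails before a flush, so this suits data that tolerates that risk.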
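The buffering mitigation can be sketched as a small in-memory queue that is drained in batches. This is a deliberately minimal illustration: the class name and the writeBatch delegate are hypothetical, the buffer is not durable, and items taken for a batch are lost if the batch write throws.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory buffer for writes to a hot key.
public sealed class HotKeyWriteBuffer<T>
{
    private readonly ConcurrentQueue<T> _pending = new();
    private readonly int _maxBatchSize;

    public HotKeyWriteBuffer(int maxBatchSize)
    {
        if (maxBatchSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxBatchSize));
        _maxBatchSize = maxBatchSize;
    }

    // Cheap, non-blocking: the hot path never touches the database.
    public void Enqueue(T item) => _pending.Enqueue(item);

    // Drains up to maxBatchSize items and hands them to writeBatch as one call.
    // Returns the number of items written. Intended to be called on a timer.
    // Caveat: if writeBatch throws, the dequeued items are not re-queued.
    public async Task<int> FlushAsync(
        Func<IReadOnlyList<T>, CancellationToken, Task> writeBatch,
        CancellationToken cancellationToken = default)
    {
        var batch = new List<T>(_maxBatchSize);
        while (batch.Count < _maxBatchSize && _pending.TryDequeue(out var item))
            batch.Add(item);

        if (batch.Count == 0) return 0;

        await writeBatch(batch, cancellationToken);
        return batch.Count;
    }
}
```

Turning many single-row writes into one batched write per flush interval is what relieves the hot shard; the price is a window during which acknowledged writes exist only in the buffer.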
Rebalancing: The Hidden Operational Cost
Most sharding decisions are made at system inception. The data grows. The original shard count is no longer adequate. Rebalancing — redistributing data across a larger shard set — is the most expensive operational event in a sharded system.
The rebalancing process requires:
1. A mechanism to stream data from old shards to new shards while the system is live.
2. A dual-write window where writes go to both old and new shards.
3. A validation phase confirming new shard data is consistent.
4. A cutover event that switches routing to the new shards.
5. A cleanup phase removing data from old shards.
Steps 2–4 are the dangerous window. Any bug in the dual-write logic produces data divergence that's difficult to detect and expensive to repair.
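One way to keep the dual-write window easy to reason about is to make the migration phase explicit and derive read and write targets from it. The sketch below assumes a three-phase model for a single key range moving from one shard to another. The enum, the method names, and the choice to keep reading from the old shard until cutover are illustrative assumptions, not a prescribed procedure.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical phases for one key range moving from an old shard to a new one.
// DualWrite covers the backfill and validation steps as well.
public enum MigrationPhase { BeforeMigration, DualWrite, CutOver }

public static class MigrationRouting
{
    // Shards that must receive every write in the given phase.
    public static IReadOnlyList<string> WriteTargets(
        MigrationPhase phase, string oldShardId, string newShardId) =>
        phase switch
        {
            MigrationPhase.BeforeMigration => new[] { oldShardId },
            MigrationPhase.DualWrite => new[] { oldShardId, newShardId },
            MigrationPhase.CutOver => new[] { newShardId },
            _ => throw new ArgumentOutOfRangeException(nameof(phase)),
        };

    // Reads stay on the old shard until cutover, because the new shard
    // is not known to be complete before validation has passed.
    public static string ReadTarget(
        MigrationPhase phase, string oldShardId, string newShardId) =>
        phase == MigrationPhase.CutOver ? newShardId : oldShardId;
}
```

Keeping the phase as data means the cutover is a single state change that can be rolled back to DualWrite. It does not remove the underlying risk: a write that succeeds on one target and fails on the other still has to be detected and repaired.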
Trade-offs
When NOT to Shard
Sharding is architecturally expensive. Before accepting that cost, exhaust these alternatives:
| Alternative | What It Buys, and Its Limit | Operational Cost |
|---|---|---|
| Read replicas | Scales reads; writes are still constrained to the primary | Low |
| Vertical scaling (larger instance) | Roughly 10–20 TB and ~100K IOPS on cloud instances | Medium |
| Caching layer (Redis) | Can eliminate 80%+ of read load for cache-friendly workloads; does not help writes | Medium |
| Partitioning within a single DB | Table partitioning is supported by most RDBMS; still bounded by one node's resources | Low |
| Columnar store for analytics | Redirects analytics queries off the OLTP database | Medium |
Sharding is the right choice when:
- Write throughput has exceeded the capacity of a single node (including vertical scaling ceiling)
- Data volume has exceeded what a single node can economically store
- Regulatory requirements mandate geographic data isolation
Sharding is premature when you haven't yet saturated a well-configured, vertically-scaled primary with read replicas.
Code
The following implements a consistent hash ring router in C#, demonstrating the core algorithm behind hash-based sharding with virtual nodes for uniform distribution:
// ConsistentHashShardRouter.cs
// Implements consistent hashing with virtual nodes to minimize rehash on rebalance
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public class ConsistentHashShardRouter
{
    // Ring position -> shard ID, kept sorted by position.
    // Not thread-safe: register shards at startup or guard mutations with a lock.
    private readonly SortedDictionary<uint, string> _ring = new();
    private readonly int _virtualNodesPerShard;

    public ConsistentHashShardRouter(int virtualNodesPerShard = 150)
    {
        if (virtualNodesPerShard <= 0)
            throw new ArgumentOutOfRangeException(nameof(virtualNodesPerShard));
        _virtualNodesPerShard = virtualNodesPerShard;
    }

    // connectionString is unused by the routing logic itself; the caller maps
    // shard IDs to connections separately.
    public void AddShard(string shardId, string connectionString)
    {
        for (int vNode = 0; vNode < _virtualNodesPerShard; vNode++)
        {
            var virtualNodeKey = $"{shardId}#vnode{vNode}";
            uint hash = ComputeHash(virtualNodeKey);
            // On a (rare) 32-bit collision the later virtual node overwrites the earlier one.
            _ring[hash] = shardId;
        }
    }

    public void RemoveShard(string shardId)
    {
        var keysToRemove = _ring
            .Where(kvp => kvp.Value == shardId)
            .Select(kvp => kvp.Key)
            .ToList();

        foreach (var key in keysToRemove)
            _ring.Remove(key);
    }

    /// <summary>
    /// Routes a partition key to its shard.
    /// After adding a new shard: only keys between each new virtual node and its
    /// predecessor are affected, approximately (1 / N) of total keys.
    /// </summary>
    public string GetShardForKey(string partitionKey)
    {
        if (_ring.Count == 0)
            throw new InvalidOperationException("No shards registered.");

        uint keyHash = ComputeHash(partitionKey);

        // Find the first virtual node clockwise from keyHash.
        // This is a linear scan over the sorted ring; a sorted array with
        // binary search is the usual choice for large rings.
        foreach (var entry in _ring)
        {
            if (entry.Key >= keyHash)
                return entry.Value;
        }

        // Wrap around: nothing is clockwise, so take the first node on the ring.
        return _ring.First().Value;
    }

    private static uint ComputeHash(string input)
    {
        // MD5 is used only to spread keys around the ring, not for security.
        // The static HashData call (.NET 5+) is thread-safe, unlike a shared
        // HashAlgorithm instance.
        byte[] digest = MD5.HashData(Encoding.UTF8.GetBytes(input));
        // Fixed byte order so every host computes the same ring position.
        return BinaryPrimitives.ReadUInt32LittleEndian(digest);
    }
}
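// ShardedOrderRepository.cs
// Usage in a repository routing layer.
// QuerySingleOrDefaultAsync and CommandDefinition are assumed to come from Dapper;
// Order is an application type defined elsewhere.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;

public class ShardedOrderRepository
{
    private readonly ConsistentHashShardRouter _router;
    private readonly IReadOnlyDictionary<string, IDbConnection> _shardConnections;

    public ShardedOrderRepository(
        ConsistentHashShardRouter router,
        IReadOnlyDictionary<string, IDbConnection> shardConnections)
    {
        _router = router;
        _shardConnections = shardConnections;
    }

    public async Task<Order?> FindByIdAsync(
        Guid orderId,
        Guid customerId, // Partition key: co-locate orders with their customer
        CancellationToken cancellationToken = default)
    {
        string shardId = _router.GetShardForKey(customerId.ToString());
        var connection = _shardConnections[shardId];

        // CommandDefinition carries the cancellation token through to the query.
        var command = new CommandDefinition(
            "SELECT * FROM orders WHERE order_id = @OrderId AND customer_id = @CustomerId",
            new { OrderId = orderId, CustomerId = customerId },
            cancellationToken: cancellationToken);

        return await connection.QuerySingleOrDefaultAsync<Order>(command);
    }
}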
The second example shows the hot shard write-distribution pattern using key suffixing:
// HotShardMitigationWriter.cs
// Distributes writes for hot partition keys across N sub-shards
// Reads must fan out to all sub-shards and merge results
// ExecuteAsync, QueryAsync, and CommandDefinition are assumed to come from Dapper;
// DomainEvent is an application type assumed to round-trip through System.Text.Json.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapper;

public class HotShardMitigationWriter
{
    private readonly ConsistentHashShardRouter _router;
    private readonly IReadOnlyDictionary<string, IDbConnection> _shardConnections;
    private readonly int _hotShardFanOut;

    public HotShardMitigationWriter(
        ConsistentHashShardRouter router,
        IReadOnlyDictionary<string, IDbConnection> shardConnections,
        int hotShardFanOut = 10) // Spread hot key across 10 sub-shards
    {
        _router = router;
        _shardConnections = shardConnections;
        _hotShardFanOut = hotShardFanOut;
    }

    /// <summary>
    /// Write with hot shard mitigation: appends a random suffix to the partition key
    /// to distribute writes across _hotShardFanOut virtual partitions.
    /// </summary>
    public async Task WriteEventAsync(
        string hotPartitionKey,
        DomainEvent domainEvent,
        CancellationToken cancellationToken = default)
    {
        // Random suffix in [0, _hotShardFanOut) distributes writes across N sub-shards
        int suffix = Random.Shared.Next(0, _hotShardFanOut);
        string distributedKey = $"{hotPartitionKey}#{suffix}";
        string shardId = _router.GetShardForKey(distributedKey);
        var connection = _shardConnections[shardId];

        var command = new CommandDefinition(
            "INSERT INTO domain_events (partition_key, suffix, event_type, payload, occurred_at) " +
            "VALUES (@PartitionKey, @Suffix, @EventType, @Payload, @OccurredAt)",
            new
            {
                PartitionKey = hotPartitionKey,
                Suffix = suffix,
                EventType = domainEvent.EventType,
                Payload = JsonSerializer.Serialize(domainEvent),
                OccurredAt = domainEvent.OccurredAt
            },
            cancellationToken: cancellationToken);

        await connection.ExecuteAsync(command);
    }

    /// <summary>
    /// Read requires fan-out across all N sub-shards, then merge and sort.
    /// Trade-off: write throughput improved; read cost increased by up to factor N.
    /// </summary>
    public async Task<IReadOnlyList<DomainEvent>> ReadAllEventsAsync(
        string partitionKey,
        CancellationToken cancellationToken = default)
    {
        // Several suffixes can hash to the same shard. Group them so each
        // connection runs one query instead of several concurrent ones.
        var suffixesByShard = Enumerable.Range(0, _hotShardFanOut)
            .GroupBy(suffix => _router.GetShardForKey($"{partitionKey}#{suffix}"));

        var tasks = suffixesByShard.Select(async group =>
        {
            var connection = _shardConnections[group.Key];
            var command = new CommandDefinition(
                "SELECT payload FROM domain_events " +
                "WHERE partition_key = @Key AND suffix IN @Suffixes",
                new { Key = partitionKey, Suffixes = group.ToArray() },
                cancellationToken: cancellationToken);
            return await connection.QueryAsync<string>(command);
        });

        var payloadsPerShard = await Task.WhenAll(tasks);

        // Events were stored as serialized JSON, so deserialize the payload column
        // and sort the merged result in memory.
        return payloadsPerShard
            .SelectMany(payloads => payloads)
            .Select(payload => JsonSerializer.Deserialize<DomainEvent>(payload)!)
            .OrderBy(e => e.OccurredAt)
            .ToList();
    }
}
Further Reading
- Module 3 – Distributed Systems Fundamentals — consistency models and the CAP theorem in sharded systems
- Module 5 – Storage Engines & Database Internals — B-tree and LSM-tree behavior under partitioned workloads
- Module 6 – Caching Strategies — offloading reads before sharding becomes necessary
- Module 13 – Reliability Engineering — operating sharded systems: backup, recovery, and rebalancing runbooks
External references:
- Kleppmann, M. (2017). Designing Data-Intensive Applications, Ch. 6. O'Reilly.
- Amazon DynamoDB Documentation: "Choosing a Partition Key."
- DeCandia, G. et al. (2007). "Dynamo: Amazon's Highly Available Key-Value Store." SOSP '07.