Scaling databases is one of the most challenging aspects of application growth. What works for 1,000 users breaks at 100,000, and solutions for 100,000 users become bottlenecks at millions. This comprehensive guide shares proven strategies for scaling databases from startup to enterprise scale, covering vertical scaling, read replicas, caching, sharding, and knowing when to adopt alternative database technologies.
The Scaling Reality
Instagram scaled to 14 million users on a single PostgreSQL database before needing sharding. Twitter rewrote their architecture three times to handle growth. Database scaling isn't just technical—it's about making strategic decisions at the right time.
Understanding Database Performance Bottlenecks
Before scaling, understand what's actually slow. Database performance problems typically stem from CPU saturation, disk I/O limits, memory constraints, network bandwidth, or lock contention. Each requires different solutions.
Profile before optimizing. Use database-specific tools: PostgreSQL's pg_stat_statements, MySQL's slow query log, or MongoDB's profiler. Identify the queries consuming the most time and resources. Often, adding indexes or optimizing queries eliminates the need for complex scaling strategies.
- CPU bottlenecks: Inefficient queries, missing indexes, complex aggregations
- Disk I/O bottlenecks: Too many random reads, insufficient IOPS, large table scans
- Memory bottlenecks: Working set exceeds RAM, inefficient buffer pool usage
- Network bottlenecks: Transferring too much data, chatty applications
- Lock contention: Long-running transactions, row-level lock conflicts
- Connection exhaustion: Too many concurrent connections, connection pool issues
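When profiling, it usually pays to rank queries by their share of total execution time rather than by the single slowest run. A fast query called millions of times can cost more than a slow one called rarely. The sketch below assumes rows have already been fetched from `pg_stat_statements` (for example with the Stage 1 query) into plain objects with times in milliseconds. The function name and the sample data are hypothetical.

```javascript
// Hypothetical helper: rank pg_stat_statements-style rows by share of total time.
// Each row: { query, calls, total_exec_time } with total_exec_time in milliseconds.
function topQueriesByTimeShare(rows, limit = 5) {
  const total = rows.reduce((sum, r) => sum + r.total_exec_time, 0);
  return [...rows]
    .sort((a, b) => b.total_exec_time - a.total_exec_time)
    .slice(0, limit)
    .map(r => ({
      query: r.query,
      // Percentage of all measured execution time spent in this query
      sharePct: total === 0 ? 0 : (100 * r.total_exec_time) / total,
      // Average time per call
      meanMs: r.calls === 0 ? 0 : r.total_exec_time / r.calls,
    }));
}

// Illustrative sample, not real measurements
const sample = [
  { query: 'SELECT * FROM orders WHERE user_id = $1', calls: 50000, total_exec_time: 90000 },
  { query: 'SELECT * FROM users WHERE id = $1', calls: 200000, total_exec_time: 8000 },
  { query: 'UPDATE users SET last_active = now() WHERE id = $1', calls: 1000, total_exec_time: 2000 },
];
console.log(topQueriesByTimeShare(sample, 2));
```

Here the orders lookup accounts for 90% of measured time even though its average call is under 2 ms, which makes it the first candidate for an index.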
Stage 1: Optimize First, Scale Second
Most database performance problems are solved through optimization, not scaling. Start here before investing in complex infrastructure.
-- Identify slow queries in PostgreSQL
SELECT
query,
calls,
total_exec_time,
mean_exec_time,
max_exec_time,
stddev_exec_time
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 20;
-- Analyze query execution plan
EXPLAIN (ANALYZE, BUFFERS)
SELECT u.*, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.created_at > NOW() - INTERVAL '30 days'
GROUP BY u.id;
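-- Find high-cardinality, poorly clustered columns (candidates to compare against your WHERE/JOIN columns when looking for missing indexes)
SELECT
schemaname,
tablename,
attname,
n_distinct,
correlation
FROM pg_stats
WHERE schemaname = 'public'
AND tablename = 'orders'
-- A negative n_distinct is a fraction of the row count (-1 means every value is unique)
AND (n_distinct > 100 OR n_distinct < -0.1)
AND ABS(correlation) < 0.1;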
-- Create appropriate indexes
CREATE INDEX CONCURRENTLY idx_orders_user_created
ON orders(user_id, created_at DESC);
-- Partial index for common filters
CREATE INDEX idx_active_orders
ON orders(user_id, created_at)
WHERE status = 'active';
-- Composite index for common query pattern
CREATE INDEX idx_orders_composite
ON orders(user_id, status, created_at DESC);
Stage 2: Vertical Scaling - Bigger Hardware
Vertical scaling (bigger servers) is the simplest approach and is often underused. Major cloud providers offer database instances with more than 100 vCPUs and up to a terabyte or more of RAM. Depending on the workload, a single well-tuned instance of that size can support tens of millions of users before horizontal scaling becomes necessary.
Advantages: No application changes required, maintains ACID guarantees, simpler operations. Disadvantages: Limited by hardware maximums, single point of failure, expensive at extreme sizes, downtime during upgrades.
- Start with storage optimization: Use SSD/NVMe for faster I/O
- Add RAM: More memory for buffer cache means fewer disk reads
- Increase CPU: Better query performance and more concurrent queries
- Monitor resource utilization: Scale when consistently over 70% usage
- Use provisioned IOPS for predictable performance
- Consider AWS RDS, Google Cloud SQL, or Azure Database for managed scaling
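To make "consistently over 70%" concrete, one option is to require that most samples in a window exceed the threshold, so a single spike does not trigger a resize. The following is a minimal sketch. The helper name, the 90% required fraction, and the sample values are assumptions. In practice the samples would come from your monitoring system as CPU, memory, or IOPS utilization expressed as a 0 to 1 fraction.

```javascript
// Hypothetical helper: is utilization "consistently" above a threshold?
// samples: utilization readings in [0, 1], e.g. one per minute over an hour.
function isConsistentlyOver(samples, threshold = 0.7, requiredFraction = 0.9) {
  if (samples.length === 0) return false;
  const over = samples.filter(u => u > threshold).length;
  return over / samples.length >= requiredFraction;
}

// One brief spike does not justify a bigger instance...
const spiky = [0.4, 0.45, 0.95, 0.5, 0.42, 0.48, 0.5, 0.44, 0.46, 0.41];
// ...but load that stays high does.
const sustained = [0.75, 0.8, 0.78, 0.82, 0.9, 0.77, 0.74, 0.81, 0.79, 0.69];

console.log(isConsistentlyOver(spiky));     // false
console.log(isConsistentlyOver(sustained)); // true
```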
Stage 3: Read Replicas for Read-Heavy Workloads
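Many web applications are read-heavy, often with 90% or more of queries being reads. Read replicas receive a copy of the primary's changes through replication and serve read queries, while the primary handles all writes. Replication is usually asynchronous, so replicas can lag slightly behind the primary.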
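Replica lag means a user who has just written data may not see it on a replica yet. One common mitigation is "read-your-writes" routing: for a short window after a user's write, route that user's reads to the primary. The sketch below works under stated assumptions. The class name, the 2-second default window, and the string pool identifiers are hypothetical, and the window should be longer than the replica lag you actually observe. The state lives in process memory, so an application running several instances would need to keep the last-write timestamp somewhere shared, for example in the user's session.

```javascript
// Hypothetical read-your-writes router: after a user writes, that user's reads
// go to the primary for `stickyMs`, which should exceed typical replica lag.
class ReadYourWritesRouter {
  constructor({ primary, replicas, stickyMs = 2000, now = () => Date.now() }) {
    this.primary = primary;
    this.replicas = replicas;
    this.stickyMs = stickyMs;
    this.now = now;                 // injectable clock, handy for tests
    this.lastWriteAt = new Map();   // userId -> time of that user's last write
    this.nextReplica = 0;
  }

  recordWrite(userId) {
    this.lastWriteAt.set(userId, this.now());
  }

  routeRead(userId) {
    const last = this.lastWriteAt.get(userId);
    if (last !== undefined && this.now() - last < this.stickyMs) {
      return this.primary;          // a replica may not have this user's write yet
    }
    this.lastWriteAt.delete(userId); // window expired (or never set); stop tracking
    const replica = this.replicas[this.nextReplica];
    this.nextReplica = (this.nextReplica + 1) % this.replicas.length;
    return replica;                 // round-robin across replicas
  }
}

let clock = 0;
const router = new ReadYourWritesRouter({
  primary: 'primary', replicas: ['replica-1', 'replica-2'], now: () => clock,
});
console.log(router.routeRead('u1')); // replica-1
router.recordWrite('u1');
console.log(router.routeRead('u1')); // primary
console.log(router.routeRead('u2')); // replica-2
clock = 2500;
console.log(router.routeRead('u1')); // replica-1
```

In an application like the one below, the value returned by `routeRead` would decide whether to pass `{ write: true }` to force a primary read.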
// Database connection pool with read replica routing
import pg from 'pg';
const { Pool } = pg;
class DatabasePool {
constructor() {
// Primary for writes
this.primaryPool = new Pool({
host: process.env.DB_PRIMARY_HOST,
port: 5432,
database: 'myapp',
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 20,
idleTimeoutMillis: 30000,
});
// Read replicas for reads
this.replicaPools = [
new Pool({
host: process.env.DB_REPLICA1_HOST,
port: 5432,
database: 'myapp',
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 50, // More connections for read-heavy load
idleTimeoutMillis: 30000,
}),
new Pool({
host: process.env.DB_REPLICA2_HOST,
port: 5432,
database: 'myapp',
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 50,
idleTimeoutMillis: 30000,
}),
];
this.replicaIndex = 0;
}
// Round-robin load balancing across replicas
getReadPool() {
const pool = this.replicaPools[this.replicaIndex];
this.replicaIndex = (this.replicaIndex + 1) % this.replicaPools.length;
return pool;
}
// Always use primary for writes
getWritePool() {
return this.primaryPool;
}
// Route to a replica by default; pass { write: true } to use the primary
async query(sql, params, options = {}) {
const pool = options.write ? this.getWritePool() : this.getReadPool();
try {
return await pool.query(sql, params);
} catch (error) {
// Fallback to primary on replica failure
if (!options.write && this.isReplicaError(error)) {
console.warn('Replica query failed, falling back to primary');
return await this.primaryPool.query(sql, params);
}
throw error;
}
}
isReplicaError(error) {
return error.code === 'ECONNREFUSED' ||
error.code === 'ETIMEDOUT';
}
}
// Usage in application
const db = new DatabasePool();
// Read queries use replicas
const users = await db.query(
'SELECT * FROM users WHERE status = $1',
['active']
);
// Write queries use primary
const result = await db.query(
'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING *',
['John Doe', 'john@example.com'],
{ write: true }
);
// Handle replication lag
class UserService {
async createUser(data) {
// Write to primary
const user = await db.query(
'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING *',
[data.name, data.email],
{ write: true }
);
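// RETURNING * already returns the row; if you must re-read right after a write (e.g. for trigger-populated columns), read from the primary, since a replica may not have applied the write yet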
const fullUser = await db.query(
'SELECT * FROM users WHERE id = $1',
[user.rows[0].id],
{ write: true } // Force primary read
);
return fullUser.rows[0];
}
async getUser(id) {
// Reads can use replicas
const result = await db.query(
'SELECT * FROM users WHERE id = $1',
[id]
);
return result.rows[0];
}
}
Stage 4: Caching for Performance
Caching is often the most effective way to reduce read load on the database. An in-memory store such as Redis or Memcached can serve very high request rates with sub-millisecond latency, far faster than queries against a disk-backed database.
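The `getUserWithStampedeProtection` method below uses a Redis lock to coordinate cache refills across processes. Within a single process, concurrent misses for the same key can also be collapsed so that only one database query runs and every caller shares its result. This is often called "single-flight" or request coalescing. The following is a minimal sketch. `SingleFlight` and `fakeLoadUser` are hypothetical names, and the fake loader stands in for a real query.

```javascript
// In-process request coalescing: concurrent callers asking for the same key
// share one in-flight Promise instead of each running the loader.
class SingleFlight {
  constructor() {
    this.inFlight = new Map(); // key -> pending Promise
  }

  run(key, loader) {
    const existing = this.inFlight.get(key);
    if (existing) return existing;
    // The executor runs synchronously, so loader() starts immediately and a
    // synchronous throw becomes a rejection.
    const promise = new Promise(resolve => resolve(loader()))
      .finally(() => this.inFlight.delete(key)); // allow a fresh load next time
    this.inFlight.set(key, promise);
    return promise;
  }
}

let dbCalls = 0;
// Stand-in for a real SELECT (hypothetical)
async function fakeLoadUser(id) {
  dbCalls++;
  await new Promise(resolve => setTimeout(resolve, 10));
  return { id, name: `user-${id}` };
}

const flights = new SingleFlight();
const pending = Array.from({ length: 5 }, () =>
  flights.run('user:42', () => fakeLoadUser(42))
);
Promise.all(pending).then(users => {
  console.log(dbCalls, users.length); // 1 5
});
```

This deduplicates only within one process. Across many application instances, a shared lock like the Redis-based one is still needed.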
// Multi-layer caching strategy
import Redis from 'ioredis';
import { LRUCache } from 'lru-cache';
class CachedDatabase {
constructor(db) {
this.db = db;
// L1: In-memory cache (fastest, limited size)
this.l1Cache = new LRUCache({
max: 1000, // Store 1000 items
ttl: 60000, // 1 minute TTL
updateAgeOnGet: true,
});
// L2: Redis cache (fast, larger, shared across instances)
this.redis = new Redis({
host: process.env.REDIS_HOST,
port: 6379,
maxRetriesPerRequest: 3,
});
}
async getUser(id) {
const cacheKey = `user:${id}`;
// Check L1 cache
let user = this.l1Cache.get(cacheKey);
if (user) {
console.log('L1 cache hit');
return user;
}
// Check L2 cache (Redis)
const cached = await this.redis.get(cacheKey);
if (cached) {
console.log('L2 cache hit');
user = JSON.parse(cached);
// Populate L1 cache
this.l1Cache.set(cacheKey, user);
return user;
}
// Cache miss - query database
console.log('Cache miss - querying database');
const result = await this.db.query(
'SELECT * FROM users WHERE id = $1',
[id]
);
user = result.rows[0];
if (!user) return null;
// Store in both cache layers
this.l1Cache.set(cacheKey, user);
await this.redis.setex(cacheKey, 3600, JSON.stringify(user));
return user;
}
async invalidateUser(id) {
const cacheKey = `user:${id}`;
// Invalidate both cache layers
this.l1Cache.delete(cacheKey);
await this.redis.del(cacheKey);
}
async updateUser(id, data) {
// Update database
const result = await this.db.query(
'UPDATE users SET name = $1, email = $2 WHERE id = $3 RETURNING *',
[data.name, data.email, id],
{ write: true }
);
const user = result.rows[0];
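// Option 1 (used here): invalidate, so the next read repopulates both layers.
// Note: this clears only this process's L1 cache; other instances may serve
// their L1 copy until its 1-minute TTL expires.
await this.invalidateUser(id);
// Option 2 (write-through), as an alternative to option 1:
// this.l1Cache.set(`user:${id}`, user);
// await this.redis.setex(`user:${id}`, 3600, JSON.stringify(user));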
return user;
}
// Cache warm-up for frequently accessed data
async warmCache() {
const result = await this.db.query(
'SELECT * FROM users WHERE last_active > NOW() - INTERVAL \'1 day\' LIMIT 1000'
);
const pipeline = this.redis.pipeline();
for (const user of result.rows) {
const cacheKey = `user:${user.id}`;
pipeline.setex(cacheKey, 3600, JSON.stringify(user));
}
await pipeline.exec();
console.log(`Warmed cache with ${result.rows.length} users`);
}
// Cache-aside pattern with stampede protection
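async getUserWithStampedeProtection(id, attempt = 0) {
const cacheKey = `user:${id}`;
const lockKey = `lock:${cacheKey}`;
// Check cache
const cached = await this.redis.get(cacheKey);
if (cached) return JSON.parse(cached);
// Acquire lock to prevent cache stampede; a unique token lets us release only our own lock
const token = `${process.pid}:${Date.now()}:${Math.random()}`;
const lockAcquired = await this.redis.set(
lockKey,
token,
'EX', 10, // 10 second lock
'NX' // Only set if not exists
);
if (lockAcquired) {
try {
// Query database
const result = await this.db.query(
'SELECT * FROM users WHERE id = $1',
[id]
);
const user = result.rows[0];
// Update cache
if (user) {
await this.redis.setex(cacheKey, 3600, JSON.stringify(user));
}
return user;
} finally {
// Release the lock only if we still own it (it may have expired and been taken by another process)
await this.redis.eval(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
1, lockKey, token
);
}
} else if (attempt < 20) {
// Another process is loading - wait and retry (bounded to about 2 seconds)
await new Promise(resolve => setTimeout(resolve, 100));
return this.getUserWithStampedeProtection(id, attempt + 1);
} else {
// Stop waiting and read directly from the database
const result = await this.db.query('SELECT * FROM users WHERE id = $1', [id]);
return result.rows[0];
}
}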
}
Stage 5: Connection Pooling
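Database connections are expensive to establish: each one requires a network round trip, authentication, and, in PostgreSQL, a dedicated server process. Opening and closing a connection for every query wastes these resources. Connection pooling keeps a set of open connections and reuses them across requests, which can significantly improve throughput and latency.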
- Size pools appropriately: a common starting point is a total of roughly 2-3x the database server's CPU cores across all application instances, adjusted based on metrics
- Set connection timeouts to prevent connection leaks
- Monitor pool utilization and queue depth
- Use a pooling proxy in front of the database: PgBouncer for PostgreSQL, ProxySQL for MySQL
- Implement circuit breakers for connection failures
- Close idle connections to free resources
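In practice you would use the driver's pool (such as node-postgres `Pool`) or a proxy like PgBouncer. The mechanics behind the checklist above still fit in a few lines:

- a cap on open connections
- a queue of waiting callers whose depth you can monitor
- an acquire timeout, so a saturated pool fails fast instead of hanging

This `SimplePool` is a hypothetical illustration, not a library API, and `create` stands in for whatever opens a real connection.

```javascript
// Illustrative pool: caps open connections at `max`, queues waiters, and
// rejects an acquire() that cannot be served within `acquireTimeoutMs`.
class SimplePool {
  constructor({ create, max = 10, acquireTimeoutMs = 5000 }) {
    this.create = create;   // hypothetical factory that opens a connection
    this.max = max;
    this.acquireTimeoutMs = acquireTimeoutMs;
    this.idle = [];         // open connections not currently in use
    this.total = 0;         // open connections (idle + in use)
    this.waiters = [];      // queued acquire() calls
  }

  async acquire() {
    if (this.idle.length > 0) return this.idle.pop(); // reuse before opening
    if (this.total < this.max) {
      this.total++;
      try {
        return await this.create();
      } catch (err) {
        this.total--;       // failed open does not count against the cap
        throw err;
      }
    }
    // Pool is saturated: wait in line, but not forever
    return new Promise((resolve, reject) => {
      const waiter = { resolve, timer: null };
      waiter.timer = setTimeout(() => {
        this.waiters.splice(this.waiters.indexOf(waiter), 1);
        reject(new Error('Timed out waiting for a connection'));
      }, this.acquireTimeoutMs);
      this.waiters.push(waiter);
    });
  }

  release(conn) {
    const waiter = this.waiters.shift();
    if (waiter) {
      clearTimeout(waiter.timer);
      waiter.resolve(conn); // hand the connection straight to the next caller
    } else {
      this.idle.push(conn);
    }
  }

  // Numbers worth exporting to monitoring: utilization and queue depth
  stats() {
    return { total: this.total, idle: this.idle.length, queued: this.waiters.length };
  }
}

let nextId = 0;
const pool = new SimplePool({ create: async () => ({ id: ++nextId }), max: 2, acquireTimeoutMs: 50 });

(async () => {
  const a = await pool.acquire();
  const b = await pool.acquire();
  const waiting = pool.acquire(); // a third caller has to queue
  console.log(pool.stats());      // { total: 2, idle: 0, queued: 1 }
  pool.release(a);                // handed directly to the queued caller
  const c = await waiting;
  console.log(c === a);           // true: reused, not reopened
  await pool.acquire().catch(err => console.log(err.message)); // times out after 50 ms
  pool.release(b);
  pool.release(c);
})();
```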
Stage 6: Database Sharding - Horizontal Partitioning
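Sharding splits data across multiple database instances. Each shard holds a subset of the data, so storage and write capacity can grow roughly linearly with the number of shards. The price is complexity: queries and transactions that span shards become expensive or impractical, and changing the shard count means moving data. Sharding is usually worth it only at very large scale.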
// Sharding implementation (hash-based, fixed shard count)
import pg from 'pg';
const { Pool } = pg;
class ShardedDatabase {
constructor() {
// Multiple database shards
this.shards = [
new Pool({ host: 'shard1.db.internal', database: 'myapp_shard1' }),
new Pool({ host: 'shard2.db.internal', database: 'myapp_shard2' }),
new Pool({ host: 'shard3.db.internal', database: 'myapp_shard3' }),
new Pool({ host: 'shard4.db.internal', database: 'myapp_shard4' }),
];
}
// Hash-based sharding by user ID
getShardForUser(userId) {
const hash = this.hashUserId(userId);
const shardIndex = hash % this.shards.length;
return this.shards[shardIndex];
}
hashUserId(userId) {
// Simple 32-bit string hash. Note: hash % shardCount is NOT consistent hashing;
// changing the shard count remaps most users to different shards.
let hash = 0;
const str = userId.toString();
for (let i = 0; i < str.length; i++) {
hash = ((hash << 5) - hash) + str.charCodeAt(i);
hash |= 0; // Keep the value a 32-bit integer
}
return Math.abs(hash);
}
// Query specific shard
async getUserOrders(userId) {
const shard = this.getShardForUser(userId);
const result = await shard.query(
'SELECT * FROM orders WHERE user_id = $1 ORDER BY created_at DESC',
[userId]
);
return result.rows;
}
// Query across all shards (expensive!)
async getAllRecentOrders() {
const promises = this.shards.map(shard =>
shard.query(
'SELECT * FROM orders WHERE created_at > NOW() - INTERVAL \'1 hour\''
)
);
const results = await Promise.all(promises);
// Merge results from all shards
const allOrders = results.flatMap(r => r.rows);
// Sort merged results
return allOrders.sort((a, b) =>
b.created_at.getTime() - a.created_at.getTime()
);
}
// Transactions within a single shard
async createOrder(userId, items) {
const shard = this.getShardForUser(userId);
const client = await shard.connect();
try {
await client.query('BEGIN');
// Insert order
const orderResult = await client.query(
'INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *',
[userId, items.reduce((sum, item) => sum + item.price * item.quantity, 0)] // price is per unit
);
const order = orderResult.rows[0];
// Insert order items
for (const item of items) {
await client.query(
'INSERT INTO order_items (order_id, product_id, quantity, price) VALUES ($1, $2, $3, $4)',
[order.id, item.productId, item.quantity, item.price]
);
}
await client.query('COMMIT');
return order;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
// Resharding - moving data to new shard count
async reshard(newShardCount) {
// This is complex and requires careful planning!
// 1. Create new shards
// 2. Copy data to new shard assignment
// 3. Maintain dual writes during migration
// 4. Switch reads to new shards
// 5. Remove old shards
console.warn('Resharding is a complex operation requiring downtime or dual-write period');
}
}
Sharding Strategies
Different sharding approaches balance simplicity, performance, and flexibility:
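- Range-based sharding: Shard 1 holds IDs 1 to 1,000,000, Shard 2 holds 1,000,001 to 2,000,000, and so on. Simple, but newly created (and often busiest) records all land on the newest shard, creating hotspots.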
- Hash-based sharding: Hash user ID to determine shard. Even distribution but resharding is complex.
- Geographic sharding: Shard by region. Good for latency, compliance. Complex cross-region queries.
- Entity-based sharding: Shard by tenant or customer. A natural fit for multi-tenant SaaS, though very large tenants can still create hotspots.
- Consistent hashing: A hash-based variant that minimizes data movement when adding or removing shards.
- Hybrid approaches: Combine strategies for specific needs.
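Consistent hashing places each shard at many points (virtual nodes) on a hash ring and assigns a key to the first shard point at or after the key's hash. Adding a shard only takes over the ring segments just before its new points. Only keys in those segments move, and all of them move to the new shard.

With plain `hash % N`, growing from 4 to 5 shards leaves a key in place only when `h % 4 === h % 5`. Under a uniform hash, that means roughly 80% of keys move.

The sketch below is illustrative, not a specific library's API. `HashRing`, the 100 virtual nodes per shard, and the hash (FNV-1a followed by a MurmurHash3-style finalizer) are all choices made for this example.

```javascript
// 32-bit FNV-1a followed by the MurmurHash3 finalizer (fmix32) for better bit mixing.
function hash32(str) {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  h ^= h >>> 16;
  h = Math.imul(h, 0x85ebca6b);
  h ^= h >>> 13;
  h = Math.imul(h, 0xc2b2ae35);
  h ^= h >>> 16;
  return h >>> 0; // unsigned 32-bit
}

class HashRing {
  constructor(shards, vnodesPerShard = 100) {
    this.points = []; // { hash, shard }, sorted by hash
    for (const shard of shards) {
      for (let v = 0; v < vnodesPerShard; v++) {
        this.points.push({ hash: hash32(`${shard}#${v}`), shard });
      }
    }
    this.points.sort((a, b) => a.hash - b.hash);
  }

  // Owner is the first point whose hash is >= the key's hash, wrapping to the start.
  getShard(key) {
    const h = hash32(String(key));
    let lo = 0;
    let hi = this.points.length;
    while (lo < hi) { // binary search for the first point with hash >= h
      const mid = (lo + hi) >>> 1;
      if (this.points[mid].hash < h) lo = mid + 1;
      else hi = mid;
    }
    return this.points[lo % this.points.length].shard;
  }
}

const keys = Array.from({ length: 10000 }, (_, i) => `user:${i}`);
const before = new HashRing(['shard1', 'shard2', 'shard3', 'shard4']);
const after = new HashRing(['shard1', 'shard2', 'shard3', 'shard4', 'shard5']);
const moved = keys.filter(k => before.getShard(k) !== after.getShard(k));
console.log(`${moved.length} of ${keys.length} keys moved`);
```

For this example the moved count should land somewhere near one fifth of the keys, although the exact figure depends on the hash and the number of virtual nodes.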
Stage 7: Database Partitioning
Partitioning splits tables within a single database. Unlike sharding, partitions are transparent to applications. PostgreSQL and MySQL support native partitioning.
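Range partitions use half-open bounds: `FOR VALUES FROM (a) TO (b)` includes `a` and excludes `b`. That is why consecutive monthly partitions can share a boundary date. A small helper that computes a timestamp's monthly partition name and bounds can be handy for application-side tooling or for generating DDL ahead of time. This is a hypothetical sketch. It assumes partition names follow the `orders_YYYY_MM` pattern used below and that timestamps are interpreted in UTC. It also interpolates the table name directly, so it should only be used with trusted, internal names.

```javascript
// Hypothetical helper: monthly partition name and half-open [from, to) bounds.
function monthlyPartition(table, date) {
  const year = date.getUTCFullYear();
  const month = date.getUTCMonth(); // 0-based
  const from = new Date(Date.UTC(year, month, 1));
  const to = new Date(Date.UTC(year, month + 1, 1)); // month 12 rolls over to next January
  const day = d => d.toISOString().slice(0, 10);     // YYYY-MM-DD
  return {
    name: `${table}_${year}_${String(month + 1).padStart(2, '0')}`,
    from: day(from),
    to: day(to),
  };
}

// DDL for the partition that would hold `date` (trusted table names only)
function partitionDDL(table, date) {
  const p = monthlyPartition(table, date);
  return `CREATE TABLE IF NOT EXISTS ${p.name} PARTITION OF ${table} ` +
    `FOR VALUES FROM ('${p.from}') TO ('${p.to}');`;
}

console.log(partitionDDL('orders', new Date(Date.UTC(2024, 1, 29, 23, 59))));
// CREATE TABLE IF NOT EXISTS orders_2024_02 PARTITION OF orders FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
console.log(monthlyPartition('orders', new Date(Date.UTC(2024, 11, 31))).to); // 2025-01-01
```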
-- PostgreSQL table partitioning by date range
CREATE TABLE orders (
id BIGSERIAL,
user_id BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL,
total DECIMAL(10,2),
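status VARCHAR(50),
-- A primary key on a partitioned table must include the partition key
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);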
-- Create partitions for each month
CREATE TABLE orders_2024_01 PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
CREATE TABLE orders_2024_03 PARTITION OF orders
FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');
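-- Indexes on each partition (on PostgreSQL 11+, an index created once on the parent, e.g. CREATE INDEX ON orders(user_id), is created on every existing and future partition automatically)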
CREATE INDEX idx_orders_2024_01_user ON orders_2024_01(user_id);
CREATE INDEX idx_orders_2024_02_user ON orders_2024_02(user_id);
CREATE INDEX idx_orders_2024_03_user ON orders_2024_03(user_id);
-- Queries automatically use the appropriate partition
SELECT * FROM orders
WHERE created_at >= '2024-02-01' AND created_at < '2024-03-01';
-- Only scans the orders_2024_02 partition; the half-open range covers all of February, including Feb 29, 2024
-- Partition pruning improves performance
EXPLAIN SELECT * FROM orders
WHERE created_at > '2024-02-15';
-- Shows which partitions are scanned
-- Automated partition management
CREATE OR REPLACE FUNCTION create_monthly_partition()
RETURNS void AS $$
DECLARE
partition_date DATE := DATE_TRUNC('month', CURRENT_DATE + INTERVAL '1 month');
partition_name TEXT := 'orders_' || TO_CHAR(partition_date, 'YYYY_MM');
start_date TEXT := partition_date::TEXT;
end_date TEXT := (partition_date + INTERVAL '1 month')::TEXT;
BEGIN
EXECUTE format(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF orders FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
EXECUTE format(
'CREATE INDEX IF NOT EXISTS %I ON %I (user_id)',
'idx_' || partition_name || '_user', partition_name
);
END;
$$ LANGUAGE plpgsql;
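-- Schedule monthly partition creation (requires the pg_cron extension).
-- Running on the 1st creates next month's partition a month ahead.
SELECT cron.schedule(
'create-monthly-partition',
'0 0 1 * *', -- First day of each month
'SELECT create_monthly_partition()'
);
When to Use NoSQL Databases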
Sometimes the right scaling strategy is switching to a database designed for your workload. NoSQL databases excel at specific use cases where relational databases struggle.
MongoDB for flexible schemas and document storage. Cassandra for massive write throughput and multi-datacenter replication. DynamoDB for serverless, auto-scaling key-value storage. Elasticsearch for full-text search and analytics. Neo4j for graph relationships and traversals. Choose based on access patterns, not trends.
Monitoring and Metrics
You can't optimize what you don't measure. Comprehensive monitoring is essential for database scaling.
- Query Performance: Slow query log, query execution time distribution
- Resource Utilization: CPU, memory, disk I/O, network
- Connection Pool: Active connections, queue depth, wait times
- Replication Lag: Monitor lag on read replicas
- Cache Hit Ratio: Measure cache effectiveness
- Lock Waits: Identify lock contention and blocking queries
- Database Size: Track growth rate and plan capacity
- Error Rates: Failed queries, connection errors, timeouts
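Several of these metrics are easiest to act on as simple threshold checks. The sketch below evaluates a hypothetical metrics snapshot with three inputs:

- the buffer cache hit ratio, computed from PostgreSQL's `pg_stat_database` counters as `blks_hit / (blks_hit + blks_read)`
- per-replica lag in seconds
- connection-pool queue depth

The field names and default thresholds are assumptions to tune for your workload.

```javascript
// Hypothetical snapshot shape and thresholds; adapt names to your monitoring stack.
function evaluateDbHealth(m, {
  minCacheHitRatio = 0.9,
  maxReplicationLagSec = 5,
  maxPoolQueueDepth = 10,
} = {}) {
  const alerts = [];

  // Buffer cache hit ratio from pg_stat_database counters
  const reads = m.blksHit + m.blksRead;
  const hitRatio = reads === 0 ? 1 : m.blksHit / reads;
  if (hitRatio < minCacheHitRatio) {
    alerts.push(`cache hit ratio ${hitRatio.toFixed(3)} below ${minCacheHitRatio}`);
  }

  // Replication lag per replica, in seconds
  for (const [replica, lagSec] of Object.entries(m.replicationLagSec)) {
    if (lagSec > maxReplicationLagSec) alerts.push(`${replica} lag ${lagSec}s`);
  }

  // Callers waiting for a pooled connection
  if (m.poolQueueDepth > maxPoolQueueDepth) {
    alerts.push(`pool queue depth ${m.poolQueueDepth} above ${maxPoolQueueDepth}`);
  }
  return alerts;
}

console.log(evaluateDbHealth({
  blksHit: 9500,
  blksRead: 500,
  replicationLagSec: { 'replica-1': 0.4, 'replica-2': 12 },
  poolQueueDepth: 3,
})); // [ 'replica-2 lag 12s' ]
```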
Database Migrations at Scale
Schema changes on large databases require careful planning. On a billion-row table, an ALTER TABLE that rewrites the table can hold locks for hours and block application queries.
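- Use online schema change tools: pt-online-schema-change or gh-ost for MySQL; for PostgreSQL, pg_repack rebuilds tables and indexes online, and tools such as pgroll manage zero-downtime schema changes
- Avoid changes that rewrite the whole table: PostgreSQL 11+ adds a column with a constant default without a rewrite, but volatile defaults (such as random()) and many column type changes still rewrite it
- Create indexes CONCURRENTLY (PostgreSQL) to avoid blocking writes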
- Break large migrations into smaller, incremental steps
- Test migrations on production-sized datasets
- Have rollback procedures ready
- Monitor replication lag during migrations
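Breaking a large data migration into smaller steps usually means processing rows in bounded id ranges. Each range runs in its own short transaction, with a pause between batches so replicas can keep up. The following is a minimal sketch, assuming an integer `id` primary key. `idBatches`, `backfillInBatches`, the `total_cents` column, and the injected `exec` function are hypothetical.

```javascript
// Hypothetical helper: split [minId, maxId] into half-open [start, end) batches.
function idBatches(minId, maxId, batchSize) {
  const batches = [];
  for (let start = minId; start <= maxId; start += batchSize) {
    batches.push([start, Math.min(start + batchSize, maxId + 1)]);
  }
  return batches;
}

// Hypothetical backfill of a new column, one small UPDATE per batch.
// `total_cents` is illustrative, not part of the schema shown earlier.
const backfillSql =
  'UPDATE orders SET total_cents = (total * 100)::bigint ' +
  'WHERE id >= $1 AND id < $2 AND total_cents IS NULL';

// `exec` is assumed to run one statement in its own short transaction,
// e.g. (sql, params) => pool.query(sql, params) with node-postgres.
async function backfillInBatches({ minId, maxId, batchSize, exec, pauseMs = 100 }) {
  for (const [start, end] of idBatches(minId, maxId, batchSize)) {
    await exec(backfillSql, [start, end]);
    // Pause between batches so replicas keep up and locks stay short
    await new Promise(resolve => setTimeout(resolve, pauseMs));
  }
}

console.log(idBatches(1, 10, 4)); // [ [ 1, 5 ], [ 5, 9 ], [ 9, 11 ] ]
```

The `total_cents IS NULL` guard makes the backfill safe to re-run after an interruption.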
Common Scaling Mistakes
- Premature optimization: Sharding before exhausting simpler options
- Ignoring indexes: Most performance issues are solved with proper indexes
- Over-normalizing: Excessive joins hurt performance at scale
- Not monitoring: Flying blind without metrics
- Forgetting about backups: Scaling increases backup time and complexity
- Ignoring replication lag: Stale data causes bugs
- No connection pooling: Overwhelming database with connections
- Inconsistent sharding keys: Creating unbalanced shards
The Scaling Decision Tree
Follow this progression when scaling databases:
- Step 1: Optimize queries and add indexes (hours of work; often an order-of-magnitude improvement for the affected queries)
- Step 2: Add a caching layer (days of work; can offload most read traffic for cacheable data)
- Step 3: Vertical scaling - bigger hardware (minutes to hours, usually with a brief restart or failover; commonly a 2-4x improvement)
- Step 4: Read replicas for read-heavy loads (days of work; read capacity grows roughly linearly with replica count)
- Step 5: Connection pooling (hours of work, better resource utilization)
- Step 6: Partitioning for large tables (weeks of work; large gains for queries that can prune partitions)
- Step 7: Sharding (months of work; near-linear horizontal scaling at the cost of high complexity)
- Step 8: Consider NoSQL for specific workloads (often a major rewrite; pays off when the access pattern fits)
Real-World Scaling Examples
Instagram ran on a single PostgreSQL database up to 14M users, relying on aggressive caching and read replicas. Discord stored billions of messages in Cassandra and later migrated its message storage to ScyllaDB. Slack uses MySQL sharded by workspace for isolation and compliance. Pinterest uses sharded MySQL for user data and HBase for massive time-series data.
Conclusion
Database scaling is a journey requiring careful planning and execution. Start simple with query optimization and vertical scaling. Add complexity only when necessary, following the proven progression: caching, read replicas, partitioning, and finally sharding. Monitor religiously, measure everything, and make data-driven decisions. Most importantly, optimize before scaling—the best performance improvement is often a well-placed index, not a distributed database architecture.
Need Database Scaling Help?
At Jishu Labs, we've scaled databases from thousands to millions of users across PostgreSQL, MySQL, MongoDB, and Cassandra. Our database team can audit your architecture and design a scaling strategy. Contact us to discuss your database challenges.
David Kumar
David Kumar is a Cloud Infrastructure Architect at Jishu Labs with extensive experience scaling databases for high-traffic applications. He has designed database architectures handling billions of queries daily for Fortune 500 companies.