1. Requirements & Scope (5 min)
Functional Requirements
- Schedule jobs to run at a specific time, on a recurring cron schedule, or after a delay (e.g., “run in 30 minutes”)
- Execute jobs with at-least-once semantics — every scheduled job must run, even if a worker crashes mid-execution
- Support job dependencies (DAGs) — Job B runs only after Job A completes successfully
- Provide job priority levels (critical, high, normal, low) with priority-based queue ordering
- Support job lifecycle management: create, pause, resume, cancel, retry; expose job status, logs, and execution history
Non-Functional Requirements
- Availability: 99.99% — missed or delayed jobs can cost real money (billing runs, report generation, data pipelines)
- Latency: Job dispatch within 1 second of scheduled time. Job completion depends on the job itself.
- Consistency: Exactly-once scheduling (no duplicate dispatches), at-least-once execution (idempotent jobs handle retries)
- Scale: 10M scheduled jobs, 100K job executions per minute at peak, ~50K concurrently running jobs
- Durability: Job definitions and execution history must survive any single node failure. Zero job loss.
2. Estimation (3 min)
Traffic
- 10M total scheduled jobs (mix of one-time and recurring)
- Recurring jobs: 1M cron jobs, average frequency = every hour → 1M/3600 = ~278 job dispatches/sec from cron alone
- One-time jobs: 100K scheduled per hour → ~28 dispatches/sec
- Peak: 5× average → ~1500 dispatches/sec
- API calls (CRUD + status checks): ~5000 QPS
Storage
- Job definition: ~2 KB (name, schedule, payload, config, dependencies)
- 10M jobs × 2 KB = 20 GB — fits in a single PostgreSQL instance
- Execution history: 100K executions/min × 1 KB per record = 100 MB/min = 144 GB/day
- Retention: 30 days of execution history = ~4.3 TB (needs archival strategy)
Worker Pool
- Average job duration: 30 seconds
- 100K executions/min ÷ 60 = 1,667 jobs/sec
- 1,667 jobs/sec × 30 sec average = 50,000 concurrent job slots needed
- With 16 slots per worker: ~3,125 worker machines
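The worker-pool math above is Little's Law (concurrency = arrival rate × average duration); a quick sanity check with the numbers from the estimate (the 16-slots-per-worker density is the assumption stated above):

```python
# Sanity-check the worker-pool sizing via Little's Law:
# concurrent jobs = arrival rate x average duration.
executions_per_min = 100_000
avg_duration_sec = 30
slots_per_worker = 16                                # assumed slot density

jobs_per_sec = executions_per_min / 60               # ~1,667 dispatches/sec
concurrent_slots = jobs_per_sec * avg_duration_sec   # ~50,000 in-flight jobs
workers = concurrent_slots / slots_per_worker        # ~3,125 machines

print(round(jobs_per_sec), round(concurrent_slots), round(workers))
```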
Key Insight
The scheduler itself is not compute-heavy — the challenge is reliable, timely dispatch at scale with exactly-once semantics. The workers that execute jobs are a separate scaling concern. The hardest problems are: (1) distributing scheduling responsibility without missing or duplicating jobs, (2) handling worker failures mid-execution, and (3) orchestrating DAG dependencies efficiently.
3. API Design (3 min)
Job Management
// Create a one-time job
POST /jobs
Body: {
"name": "monthly_billing_report",
"type": "one_time",
"scheduled_at": "2024-02-22T18:00:00Z",
"payload": { "report_type": "billing", "month": "2024-01" },
"callback_url": "https://billing.internal/webhook",
"priority": "critical",
"timeout_seconds": 300,
"max_retries": 3,
"retry_backoff": "exponential", // 30s, 60s, 120s
"idempotency_key": "billing-2024-01", // prevent duplicate creation
"tags": ["billing", "monthly"]
}
Response 201: { "job_id": "job_abc123", "status": "scheduled", ... }
// Create a recurring job
POST /jobs
Body: {
"name": "hourly_etl_pipeline",
"type": "recurring",
"cron": "0 * * * *", // every hour
"payload": { "pipeline": "user_events" },
"priority": "high",
"timeout_seconds": 3600,
"max_retries": 2
}
// Create a DAG
POST /dags
Body: {
"dag_id": "daily_data_pipeline",
"schedule": "0 2 * * *", // 2 AM daily
"jobs": [
{ "job_id": "extract", "depends_on": [] },
{ "job_id": "transform", "depends_on": ["extract"] },
{ "job_id": "load", "depends_on": ["transform"] },
{ "job_id": "validate", "depends_on": ["load"] },
{ "job_id": "notify", "depends_on": ["validate"] }
]
}
// Job lifecycle
PATCH /jobs/{job_id} Body: { "status": "paused" }
DELETE /jobs/{job_id}
// Query
GET /jobs/{job_id}
GET /jobs/{job_id}/executions?limit=50
GET /jobs?tag=billing&status=failed&page=1
Key Decisions
- Idempotency key on job creation prevents duplicate scheduling from retried API calls
- Callback URL for async notification when job completes (webhook)
- Tags for filtering and grouping in the dashboard
- DAGs are first-class entities, not just linked jobs
4. Data Model (3 min)
Job Definitions (PostgreSQL)
Table: jobs
job_id | UUID (PK)
name | varchar(255)
type | enum('one_time', 'recurring')
status | enum('scheduled', 'running', 'paused', 'completed', 'failed', 'cancelled')
cron_expression | varchar(100) -- NULL for one-time
next_run_at | timestamp -- when to dispatch next (indexed)
payload | jsonb
callback_url | varchar(500)
priority | enum('critical', 'high', 'normal', 'low')
timeout_seconds | int
max_retries | int
idempotency_key | varchar(255) -- unique index
dag_id | UUID -- NULL if standalone
depends_on | UUID[] -- job_ids this depends on
tags | text[]
created_at | timestamp
updated_at | timestamp
owner_partition | int -- which scheduler shard owns this job
Index: (status, next_run_at) WHERE status = 'scheduled' -- for poll query
Index: (dag_id, status) -- for DAG orchestration
Index: (owner_partition, next_run_at) -- for partitioned scheduling
Execution History (PostgreSQL, partitioned by date)
Table: job_executions (partitioned by started_at monthly)
execution_id | UUID (PK)
job_id | UUID (FK)
status | enum('dispatched', 'running', 'succeeded', 'failed', 'timed_out')
attempt_number | int
started_at | timestamp
completed_at | timestamp
worker_id | varchar(100)
result | jsonb -- output or error details
duration_ms | int
Dead Letter Queue (PostgreSQL)
Table: dead_letter_jobs
job_id | UUID
last_execution_id | UUID
failure_reason | text
failed_at | timestamp
payload | jsonb
retry_count | int
Why PostgreSQL
- Strong consistency (ACID) — critical for job scheduling to prevent duplicates
- SELECT ... FOR UPDATE SKIP LOCKED — built-in support for distributed queue patterns
- JSONB for flexible payloads
- Partitioning for execution history (millions of rows/day, partition by month, drop old partitions)
- For 10M jobs and 4.3TB execution history, use PostgreSQL with TimescaleDB extension or partition aggressively
5. High-Level Design (12 min)
Architecture
Client/API
→ API Service (stateless) → PostgreSQL (job definitions)
Scheduler Service (partitioned, leader per partition):
→ Poll: SELECT jobs WHERE next_run_at ≤ now AND status='scheduled'
AND owner_partition = my_partition
FOR UPDATE SKIP LOCKED
→ For each due job:
1. Mark status = 'dispatched'
2. Create execution record
3. Enqueue to Kafka (priority topic)
4. If recurring: compute next_run_at, keep scheduled
Kafka (priority topics):
→ topic: jobs.critical
→ topic: jobs.high
→ topic: jobs.normal
→ topic: jobs.low
Worker Pool:
→ Workers consume from priority topics (critical first)
→ Execute job payload
→ Report result back (success/failure)
→ Send heartbeat every 10 seconds while running
Heartbeat Monitor:
→ Track worker heartbeats
→ If no heartbeat for 30 seconds: assume worker dead
→ Re-enqueue job (increment attempt counter)
→ If max retries exceeded: move to dead letter queue
Partitioned Scheduling
10M jobs split across 100 scheduler partitions:
Partition 0: jobs WHERE owner_partition = 0 (100K jobs)
Partition 1: jobs WHERE owner_partition = 1 (100K jobs)
...
Each partition has a single active scheduler (leader):
Leader election via PostgreSQL advisory locks or ZooKeeper
If leader dies, standby takes over within 5 seconds
Why partition?
A single scheduler polling across 10M rows is a throughput bottleneck and a single point of failure.
100 schedulers, each polling ~100K rows, keep the poll query fast and parallelize dispatch.
Jobs are assigned to partitions by hash(job_id) % 100.
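The hash(job_id) % 100 assignment needs a hash that is stable across processes (Python's built-in hash() is salted per process, for example). A minimal sketch — md5 is an illustrative choice here, not something the design prescribes:

```python
# Stable partition assignment: every API node must compute the same
# partition for the same job_id, so use a deterministic digest.
import hashlib

NUM_PARTITIONS = 100

def owner_partition(job_id: str) -> int:
    digest = hashlib.md5(job_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS

p = owner_partition("job_abc123")
assert 0 <= p < NUM_PARTITIONS
# Deterministic: a second call (or another process) agrees.
assert owner_partition("job_abc123") == p
```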
DAG Execution Flow
DAG: A → B → C
          ↘ D
1. Scheduler dispatches A (no dependencies)
2. A completes → DAG Orchestrator checks:
"What jobs depend on A?" → [B]
"Are all of B's dependencies completed?" → [A: yes] → dispatch B
3. B completes → check dependents: [C, D]
C dependencies: [B] → all done → dispatch C
D dependencies: [B] → all done → dispatch D
4. C and D run in parallel
5. When all terminal nodes complete → DAG status = 'completed'
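The dependency check the orchestrator runs on each completion event can be sketched in memory: a pending job becomes dispatchable once every job it depends on is in the completed set. DAG shape taken from the flow above:

```python
# Which jobs can be dispatched, given what has completed so far?
deps = {"A": [], "B": ["A"], "C": ["B"], "D": ["B"]}

def dispatchable(completed: set, dispatched: set) -> list:
    return [job for job, parents in deps.items()
            if job not in completed and job not in dispatched
            and all(p in completed for p in parents)]

assert dispatchable(set(), set()) == ["A"]        # only the root
assert dispatchable({"A"}, {"A"}) == ["B"]        # A done -> B ready
assert sorted(dispatchable({"A", "B"}, {"A", "B"})) == ["C", "D"]  # parallel
```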
Components
- API Service: REST/gRPC, stateless. CRUD for jobs and DAGs. Validates cron expressions, assigns partition.
- Scheduler Service (100 instances): Each owns a partition. Polls PostgreSQL every second for due jobs. Dispatches to Kafka. Computes next cron run time.
- Kafka: Message queue with priority topics. Durability ensures jobs survive worker crashes. Consumer groups for worker scaling.
- Worker Pool (3000+ machines): Heterogeneous executors (Docker containers, Lambda functions, process pools). Pull jobs from Kafka, execute, report status.
- Heartbeat Monitor: Tracks liveness of workers. Detects stuck/dead jobs. Re-queues or dead-letters.
- DAG Orchestrator: Listens for job completion events. Evaluates DAG dependency graph. Dispatches next eligible jobs.
- PostgreSQL: Source of truth for job definitions, execution history, and scheduling state.
6. Deep Dives (15 min)
Deep Dive 1: Exactly-Once Scheduling, At-Least-Once Execution
The scheduling problem: How do we ensure a job is dispatched exactly once, even with multiple scheduler instances and potential crashes?
Approach: Transactional outbox pattern
BEGIN;
-- Claim the job (SKIP LOCKED prevents other schedulers from seeing it)
SELECT * FROM jobs
WHERE owner_partition = 7
AND status = 'scheduled'
AND next_run_at <= now()
ORDER BY priority, next_run_at
LIMIT 100
FOR UPDATE SKIP LOCKED;
-- For each claimed job:
UPDATE jobs SET status = 'dispatched', updated_at = now() WHERE job_id = ?;
INSERT INTO job_executions (execution_id, job_id, status, attempt_number, started_at)
VALUES (gen_uuid(), ?, 'dispatched', 1, now());
INSERT INTO outbox (event_id, topic, payload, created_at)
VALUES (gen_uuid(), 'jobs.high', '{"job_id": "...", "payload": ...}', now());
-- If recurring, schedule the next run (status goes back to 'scheduled')
UPDATE jobs SET status = 'scheduled',
       next_run_at = compute_next_cron(cron_expression, now())
WHERE job_id = ? AND type = 'recurring';
COMMIT;
Outbox relay: A separate process reads the outbox table and publishes to Kafka. After successful publish, marks the outbox row as sent. This guarantees that the database state and the Kafka message are consistent — no orphaned dispatches.
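The relay loop is small enough to sketch. This uses sqlite3 as a stand-in for PostgreSQL and a list as a stand-in for the Kafka producer; the sent_at column is an assumed detail of the outbox table:

```python
# Outbox relay sketch: read unsent rows, publish, then mark sent.
# Marking AFTER publish gives at-least-once delivery into Kafka, which the
# idempotent consumers downstream are designed to tolerate.
import sqlite3, uuid
from datetime import datetime, timezone

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE outbox (
    event_id TEXT PRIMARY KEY, topic TEXT, payload TEXT,
    created_at TEXT, sent_at TEXT)""")
db.execute(
    "INSERT INTO outbox VALUES (?, 'jobs.high', ?, ?, NULL)",
    (str(uuid.uuid4()), '{"job_id": "job_abc123"}',
     datetime.now(timezone.utc).isoformat()))
db.commit()

published = []  # stand-in for a Kafka producer

def relay_once(conn, batch=100):
    rows = conn.execute(
        "SELECT event_id, topic, payload FROM outbox "
        "WHERE sent_at IS NULL ORDER BY created_at LIMIT ?", (batch,)).fetchall()
    for event_id, topic, payload in rows:
        published.append((topic, payload))   # producer.send(topic, payload)
        conn.execute("UPDATE outbox SET sent_at = ? WHERE event_id = ?",
                     (datetime.now(timezone.utc).isoformat(), event_id))
    conn.commit()
    return len(rows)

assert relay_once(db) == 1 and published[0][0] == "jobs.high"
assert relay_once(db) == 0   # already marked sent; not re-published
```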
At-least-once execution:
- Worker pulls job from Kafka → executes → reports success/failure
- If worker crashes mid-execution: Kafka consumer offset not committed → message redelivered to another worker
- Jobs must be idempotent: running the same job twice produces the same result (use idempotency keys, check-before-write patterns)
- For non-idempotent jobs: use a fencing token (execution_id). The job checks if its execution_id is still the active one before committing results.
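The fencing-token check boils down to a compare-before-commit. The active-execution map below stands in for a column on the jobs row, read inside the same transaction that writes the result:

```python
# Fencing-token sketch: a worker may only commit results if its execution_id
# is still the active one for the job (a retry supersedes older attempts).
active_execution = {"job_abc123": "exec_2"}   # exec_1 was superseded

def commit_result(job_id: str, execution_id: str, result: str) -> bool:
    if active_execution.get(job_id) != execution_id:
        return False          # stale worker: a newer attempt owns the job
    # ... atomically persist `result` here ...
    return True

assert commit_result("job_abc123", "exec_1", "stale") is False
assert commit_result("job_abc123", "exec_2", "fresh") is True
```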
Deep Dive 2: Handling Worker Failures and Timeouts
The problem: A worker picks up a job and then crashes, hangs, or loses network. The job is “in progress” forever.
Heartbeat-based liveness detection:
Worker lifecycle:
1. Pull job from Kafka
2. Start heartbeat thread: every 10 seconds, send:
POST /heartbeat { execution_id, worker_id, progress_pct }
3. Execute job
4. On completion: report result, stop heartbeat
5. On failure: report error, stop heartbeat
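The worker-side heartbeat thread from the lifecycle above, sketched with a list standing in for the POST /heartbeat call, and the 10-second interval shortened so the sketch runs quickly:

```python
# Heartbeat thread: beat on an interval until the main thread signals stop.
import threading, time

beats = []

def heartbeat_loop(execution_id, stop, interval=0.01):
    while not stop.is_set():
        beats.append((execution_id, time.time()))  # POST /heartbeat in real life
        stop.wait(interval)                        # sleep, but wake early on stop

stop = threading.Event()
t = threading.Thread(target=heartbeat_loop, args=("exec_1", stop), daemon=True)
t.start()
time.sleep(0.05)           # "execute the job"
stop.set(); t.join()       # job done -> stop heartbeating promptly

assert len(beats) >= 2 and beats[0][0] == "exec_1"
```

Using stop.wait(interval) instead of time.sleep(interval) lets the thread exit immediately on completion rather than finishing a full sleep.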
Heartbeat Monitor:
- Maintains in-memory map: execution_id → last_heartbeat_time
- Every 15 seconds, scan for stale entries:
if now() - last_heartbeat > 30 seconds:
→ Mark execution as 'timed_out'
→ Re-enqueue job (if attempts < max_retries)
→ Increment attempt counter
Timeout handling:
Job has timeout_seconds = 300 (5 minutes)
Worker-side: wraps job execution in a timer
If execution exceeds timeout → kill the process/thread → report timeout
Server-side (defense in depth):
Scheduler checks: jobs WHERE status = 'running'
AND started_at + timeout_seconds < now()
→ Force-timeout these jobs. This catches what the heartbeat monitor cannot:
a worker stuck in an infinite loop whose heartbeat thread is still beating.
Retry with exponential backoff:
Attempt 1: immediate
Attempt 2: after 30 seconds
Attempt 3: after 60 seconds
Attempt 4: after 120 seconds (if max_retries = 3; attempt 1 is the initial run)
If all retries exhausted:
→ Move to dead letter queue
→ Alert oncall (PagerDuty webhook)
→ Job marked as 'dead_lettered'
→ Manual intervention required (inspect logs, fix payload, re-enqueue)
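The backoff schedule above — base 30s, doubling per attempt, first attempt immediate — as a function. The cap is an assumption to keep the worst-case delay bounded:

```python
# Exponential backoff: 0s, 30s, 60s, 120s, ... capped at one hour (assumed).
def retry_delay(attempt: int, base: int = 30, cap: int = 3600) -> int:
    if attempt <= 1:
        return 0                          # first attempt runs immediately
    return min(base * 2 ** (attempt - 2), cap)

assert [retry_delay(a) for a in (1, 2, 3, 4)] == [0, 30, 60, 120]
```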
Poison pill protection:
- If a job fails 3 times in < 1 minute, it’s likely a poison pill (bad payload causing immediate crash)
- Circuit breaker: stop retrying, dead-letter immediately
- Don’t let a poison pill consume all retry slots and worker capacity
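The 3-failures-in-under-a-minute rule is a sliding-window counter per job. A sketch, with the window and threshold taken from above:

```python
# Poison-pill detector: 3 failures inside a 60-second window trips the
# breaker and routes the job to the dead letter queue instead of retrying.
from collections import defaultdict, deque

WINDOW_SEC, THRESHOLD = 60, 3
failures = defaultdict(deque)   # job_id -> recent failure timestamps

def record_failure(job_id: str, now: float) -> str:
    q = failures[job_id]
    q.append(now)
    while q and now - q[0] > WINDOW_SEC:   # drop failures outside the window
        q.popleft()
    return "dead_letter" if len(q) >= THRESHOLD else "retry"

assert record_failure("job_x", 0.0) == "retry"
assert record_failure("job_x", 5.0) == "retry"
assert record_failure("job_x", 10.0) == "dead_letter"   # 3 failures in < 1 min
```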
Deep Dive 3: DAG Scheduling and Dependency Resolution
The problem: Jobs with dependencies form a Directed Acyclic Graph (DAG). How do we efficiently resolve dependencies and dispatch jobs as their prerequisites complete?
DAG representation:
DAG stored in PostgreSQL:
dag_id: "daily_pipeline"
jobs: [
{ id: "extract", depends_on: [] },
{ id: "transform", depends_on: ["extract"] },
{ id: "load_dim", depends_on: ["transform"] },
{ id: "load_fact", depends_on: ["transform"] },
{ id: "aggregate", depends_on: ["load_dim", "load_fact"] },
{ id: "report", depends_on: ["aggregate"] }
]
Execution graph:
extract → transform → load_dim  → aggregate → report
                    ↘ load_fact ↗
DAG execution lifecycle:
1. Trigger (cron fires at 2 AM):
→ Create dag_run record: { dag_id, run_id, status: 'running' }
→ Create job_instance for each job in DAG: { run_id, job_id, status: 'pending' }
→ Find root jobs (no dependencies): ["extract"]
→ Dispatch root jobs
2. On job completion event:
UPDATE job_instances SET status = 'completed' WHERE run_id = ? AND job_id = 'extract';
-- Find next dispatchable jobs
SELECT ji.job_id
FROM job_instances ji
JOIN job_dependencies jd ON ji.job_id = jd.job_id
WHERE ji.run_id = ? AND ji.status = 'pending'
GROUP BY ji.job_id
HAVING COUNT(*) = (
SELECT COUNT(*) FROM job_dependencies WHERE job_id = ji.job_id
AND depends_on_job_id IN (
SELECT job_id FROM job_instances WHERE run_id = ? AND status = 'completed'
)
);
→ Dispatch: ["transform"]
3. On job failure:
→ Mark job as 'failed'
→ Check failure policy:
- "fail_fast": cancel all pending jobs in DAG, mark DAG as 'failed'
- "continue": only fail downstream dependents, continue independent branches
- "retry": retry failed job up to max_retries before failing DAG
Cycle detection: On DAG creation, run topological sort. If it fails (cycle detected), reject the DAG with an error.
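Kahn's algorithm makes this check direct: if the topological sort cannot consume every node, some nodes sit on a cycle and the DAG is rejected. A sketch over the depends_on representation used above:

```python
# Cycle detection at DAG-creation time via Kahn's topological sort.
from collections import deque

def has_cycle(deps: dict) -> bool:
    """deps maps job_id -> list of job_ids it depends on."""
    indegree = {job: len(parents) for job, parents in deps.items()}
    dependents = {job: [] for job in deps}
    for job, parents in deps.items():
        for p in parents:
            dependents[p].append(job)
    ready = deque(job for job, d in indegree.items() if d == 0)
    seen = 0
    while ready:
        job = ready.popleft()
        seen += 1
        for child in dependents[job]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    return seen != len(deps)   # leftover nodes -> cycle

assert not has_cycle({"extract": [], "transform": ["extract"], "load": ["transform"]})
assert has_cycle({"a": ["b"], "b": ["a"]})   # a <-> b: reject with an error
```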
Dynamic DAGs: Some workflows need runtime-determined structure (e.g., “run transform for each country, then aggregate”). Support this via a “fan-out” operator that creates sub-jobs dynamically based on the output of a parent job.
7. Extensions (2 min)
- Multi-tenancy and quotas: Each team/tenant gets a job quota (max concurrent, max per day). Enforce quotas at the scheduler level. Prevent one team from monopolizing the worker pool.
- Job chaining and hooks: Pre-execution hooks (validate environment, acquire locks) and post-execution hooks (cleanup, trigger downstream). Webhook notifications on state transitions.
- Delayed jobs with sub-second precision: For delay < 1 second, use an in-memory timing wheel instead of polling PostgreSQL. Hashed Timing Wheel supports O(1) insertion and expiration.
- Cost-aware scheduling: Tag jobs with resource requirements (CPU, memory, GPU). Schedule on appropriate worker pools. Autoscale worker pools based on queue depth. Spot instances for low-priority jobs.
- Observability and debugging: Gantt chart visualization of DAG execution. Distributed tracing (each job carries a trace_id). Log aggregation per execution. Historical analytics: P50/P99 execution times, failure rates by job type.
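The hashed timing wheel from the sub-second scheduling extension can be sketched as a ring of buckets advanced by a fixed tick; insertion and expiry are O(1) amortized. Bucket count and tick size below are illustrative choices:

```python
# Hashed timing wheel: delay -> bucket slot; each tick drains one slot.
# Entries scheduled more than one lap out carry a remaining-rounds counter.
class TimingWheel:
    def __init__(self, num_buckets=512, tick_ms=10):
        self.buckets = [[] for _ in range(num_buckets)]
        self.num_buckets = num_buckets
        self.tick_ms = tick_ms
        self.current_tick = 0

    def schedule(self, job, delay_ms):
        ticks = delay_ms // self.tick_ms
        slot = (self.current_tick + ticks) % self.num_buckets
        rounds = ticks // self.num_buckets      # full laps before firing
        self.buckets[slot].append((rounds, job))

    def advance(self):                          # called every tick_ms
        self.current_tick += 1
        slot = self.current_tick % self.num_buckets
        due = [job for r, job in self.buckets[slot] if r == 0]
        self.buckets[slot] = [(r - 1, job) for r, job in self.buckets[slot] if r > 0]
        return due

wheel = TimingWheel()
wheel.schedule("dispatch_job_abc123", delay_ms=30)
assert wheel.advance() == []                        # 10 ms: not yet
assert wheel.advance() == []                        # 20 ms
assert wheel.advance() == ["dispatch_job_abc123"]   # 30 ms: due
```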