1. Requirements & Scope (5 min)

Functional Requirements

  1. Schedule jobs to run at a specific time, on a recurring cron schedule, or after a delay (e.g., “run in 30 minutes”)
  2. Execute jobs with at-least-once semantics — every scheduled job must run, even if a worker crashes mid-execution
  3. Support job dependencies (DAGs) — Job B runs only after Job A completes successfully
  4. Provide job priority levels (critical, high, normal, low) with priority-based queue ordering
  5. Support job lifecycle management: create, pause, resume, cancel, retry; expose job status, logs, and execution history

Non-Functional Requirements

  • Availability: 99.99% — missed or delayed jobs can cost real money (billing runs, report generation, data pipelines)
  • Latency: Job dispatch within 1 second of scheduled time. Job completion depends on the job itself.
  • Consistency: Exactly-once scheduling (no duplicate dispatches), at-least-once execution (idempotent jobs handle retries)
  • Scale: 10M scheduled jobs, 100K job executions per minute at peak, ~50K concurrent running jobs
  • Durability: Job definitions and execution history must survive any single node failure. Zero job loss.

2. Estimation (3 min)

Traffic

  • 10M total scheduled jobs (mix of one-time and recurring)
  • Recurring jobs: 1M cron jobs, average frequency = every hour → 1M/3600 = ~278 job dispatches/sec from cron alone
  • One-time jobs: 100K scheduled per hour → ~28 dispatches/sec
  • Peak: 5× average → ~1500 dispatches/sec
  • API calls (CRUD + status checks): ~5000 QPS

Storage

  • Job definition: ~2 KB (name, schedule, payload, config, dependencies)
  • 10M jobs × 2 KB = 20 GB — fits in a single PostgreSQL instance
  • Execution history: 100K executions/min × 1 KB per record = 100 MB/min = 144 GB/day
  • Retention: 30 days of execution history = ~4.3 TB (needs archival strategy)

Worker Pool

  • Average job duration: 30 seconds
  • 100K executions/min ÷ 60 = 1,667 jobs/sec
  • 1,667 jobs/sec × 30 sec average = 50,000 concurrent job slots needed
  • With 16 slots per worker: ~3,125 worker machines
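The worker-pool sizing above is just Little's law (concurrency = arrival rate × average duration); a quick sanity check:

```python
# Sanity-check the worker pool estimate with Little's law:
# concurrent_jobs = arrival_rate (jobs/sec) * average_duration (sec)

executions_per_min = 100_000
avg_duration_sec = 30
slots_per_worker = 16

arrival_rate = executions_per_min / 60              # ~1,667 jobs/sec
concurrent_jobs = arrival_rate * avg_duration_sec   # ~50,000 slots in flight
workers = concurrent_jobs / slots_per_worker        # ~3,125 machines
```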

Key Insight

The scheduler itself is not compute-heavy — the challenge is reliable, timely dispatch at scale with exactly-once semantics. The workers that execute jobs are a separate scaling concern. The hardest problems are: (1) distributing scheduling responsibility without missing or duplicating jobs, (2) handling worker failures mid-execution, and (3) orchestrating DAG dependencies efficiently.


3. API Design (3 min)

Job Management

// Create a one-time job
POST /jobs
Body: {
  "name": "monthly_billing_report",
  "type": "one_time",
  "scheduled_at": "2024-02-22T18:00:00Z",
  "payload": { "report_type": "billing", "month": "2024-01" },
  "callback_url": "https://billing.internal/webhook",
  "priority": "critical",
  "timeout_seconds": 300,
  "max_retries": 3,
  "retry_backoff": "exponential",      // 30s, 60s, 120s
  "idempotency_key": "billing-2024-01", // prevent duplicate creation
  "tags": ["billing", "monthly"]
}
Response 201: { "job_id": "job_abc123", "status": "scheduled", ... }

// Create a recurring job
POST /jobs
Body: {
  "name": "hourly_etl_pipeline",
  "type": "recurring",
  "cron": "0 * * * *",                 // every hour
  "payload": { "pipeline": "user_events" },
  "priority": "high",
  "timeout_seconds": 3600,
  "max_retries": 2
}

// Create a DAG
POST /dags
Body: {
  "dag_id": "daily_data_pipeline",
  "schedule": "0 2 * * *",             // 2 AM daily
  "jobs": [
    { "job_id": "extract",  "depends_on": [] },
    { "job_id": "transform", "depends_on": ["extract"] },
    { "job_id": "load",      "depends_on": ["transform"] },
    { "job_id": "validate",  "depends_on": ["load"] },
    { "job_id": "notify",    "depends_on": ["validate"] }
  ]
}

// Job lifecycle
PATCH /jobs/{job_id}    Body: { "status": "paused" }
DELETE /jobs/{job_id}

// Query
GET /jobs/{job_id}
GET /jobs/{job_id}/executions?limit=50
GET /jobs?tag=billing&status=failed&page=1

Key Decisions

  • Idempotency key on job creation prevents duplicate scheduling from retried API calls
  • Callback URL for async notification when job completes (webhook)
  • Tags for filtering and grouping in the dashboard
  • DAGs are first-class entities, not just linked jobs

4. Data Model (3 min)

Job Definitions (PostgreSQL)

Table: jobs
  job_id              | UUID (PK)
  name                | varchar(255)
  type                | enum('one_time', 'recurring')
  status              | enum('scheduled', 'dispatched', 'running', 'paused', 'completed', 'failed', 'cancelled', 'dead_lettered')
  cron_expression     | varchar(100)    -- NULL for one-time
  next_run_at         | timestamp       -- when to dispatch next (indexed)
  payload             | jsonb
  callback_url        | varchar(500)
  priority            | enum('critical', 'high', 'normal', 'low')
  timeout_seconds     | int
  max_retries         | int
  idempotency_key     | varchar(255)    -- unique index
  dag_id              | UUID            -- NULL if standalone
  depends_on          | UUID[]          -- job_ids this depends on
  tags                | text[]
  created_at          | timestamp
  updated_at          | timestamp
  owner_partition     | int             -- which scheduler shard owns this job

Index: (status, next_run_at) WHERE status = 'scheduled'   -- for poll query
Index: (dag_id, status)                                     -- for DAG orchestration
Index: (owner_partition, next_run_at)                       -- for partitioned scheduling

Execution History (PostgreSQL, partitioned by date)

Table: job_executions (partitioned by started_at monthly)
  execution_id        | UUID (PK)
  job_id              | UUID (FK)
  status              | enum('dispatched', 'running', 'succeeded', 'failed', 'timed_out')
  attempt_number      | int
  started_at          | timestamp
  completed_at        | timestamp
  worker_id           | varchar(100)
  result              | jsonb           -- output or error details
  duration_ms         | int

Dead Letter Queue (PostgreSQL)

Table: dead_letter_jobs
  job_id              | UUID
  last_execution_id   | UUID
  failure_reason      | text
  failed_at           | timestamp
  payload             | jsonb
  retry_count         | int

Why PostgreSQL

  • Strong consistency (ACID) — critical for job scheduling to prevent duplicates
  • SELECT ... FOR UPDATE SKIP LOCKED — built-in support for distributed queue patterns
  • JSONB for flexible payloads
  • Partitioning for execution history (millions of rows/day, partition by month, drop old partitions)
  • For 10M jobs and 4.3TB execution history, use PostgreSQL with TimescaleDB extension or partition aggressively

5. High-Level Design (12 min)

Architecture

Client/API
  → API Service (stateless) → PostgreSQL (job definitions)

Scheduler Service (partitioned, leader per partition):
  → Poll: SELECT jobs WHERE next_run_at ≤ now AND status='scheduled'
           AND owner_partition = my_partition
           FOR UPDATE SKIP LOCKED
  → For each due job:
       1. Mark status = 'dispatched'
       2. Create execution record
       3. Enqueue to Kafka (priority topic)
       4. If recurring: compute next_run_at, keep scheduled

Kafka (priority topics):
  → topic: jobs.critical
  → topic: jobs.high
  → topic: jobs.normal
  → topic: jobs.low

Worker Pool:
  → Workers consume from priority topics (critical first)
  → Execute job payload
  → Report result back (success/failure)
  → Send heartbeat every 10 seconds while running

Heartbeat Monitor:
  → Track worker heartbeats
  → If no heartbeat for 30 seconds: assume worker dead
  → Re-enqueue job (increment attempt counter)
  → If max retries exceeded: move to dead letter queue

Partitioned Scheduling

10M jobs split across 100 scheduler partitions:
  Partition 0: jobs WHERE owner_partition = 0 (100K jobs)
  Partition 1: jobs WHERE owner_partition = 1 (100K jobs)
  ...

Each partition has a single active scheduler (leader):
  Leader election via PostgreSQL advisory locks or ZooKeeper
  If leader dies, standby takes over within 5 seconds

Why partition?
  A single scheduler is both a dispatch bottleneck for 10M jobs and a single point of failure.
  100 schedulers each polling ~100K rows: fast, parallelizable, failure-isolated.
  Jobs assigned to partitions by hash(job_id) % 100
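The assignment must be stable across restarts and identical on every node. A minimal sketch (note: Python's built-in `hash()` is salted per process, so a fixed hash like CRC32 is used instead):

```python
import zlib

NUM_PARTITIONS = 100

def partition_for(job_id: str) -> int:
    """Stable partition assignment: hash(job_id) % 100.

    zlib.crc32 is deterministic across processes and restarts;
    Python's built-in hash() is not, and would reshuffle ownership
    on every scheduler restart.
    """
    return zlib.crc32(job_id.encode("utf-8")) % NUM_PARTITIONS
```

The API service computes this once at creation time and stores it in `owner_partition`, so schedulers never re-derive ownership at poll time.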

DAG Execution Flow

DAG: A → B → C
              ↘ D

1. Scheduler dispatches A (no dependencies)
2. A completes → DAG Orchestrator checks:
   "What jobs depend on A?" → [B]
   "Are all of B's dependencies completed?" → [A: yes] → dispatch B
3. B completes → check dependents: [C, D]
   C dependencies: [B] → all done → dispatch C
   D dependencies: [B] → all done → dispatch D
4. C and D run in parallel
5. When all terminal nodes complete → DAG status = 'completed'

Components

  1. API Service: REST/gRPC, stateless. CRUD for jobs and DAGs. Validates cron expressions, assigns partition.
  2. Scheduler Service (100 instances): Each owns a partition. Polls PostgreSQL every second for due jobs. Dispatches to Kafka. Computes next cron run time.
  3. Kafka: Message queue with priority topics. Durability ensures jobs survive worker crashes. Consumer groups for worker scaling.
  4. Worker Pool (3000+ machines): Heterogeneous executors (Docker containers, Lambda functions, process pools). Pull jobs from Kafka, execute, report status.
  5. Heartbeat Monitor: Tracks liveness of workers. Detects stuck/dead jobs. Re-queues or dead-letters.
  6. DAG Orchestrator: Listens for job completion events. Evaluates DAG dependency graph. Dispatches next eligible jobs.
  7. PostgreSQL: Source of truth for job definitions, execution history, and scheduling state.

6. Deep Dives (15 min)

Deep Dive 1: Exactly-Once Scheduling, At-Least-Once Execution

The scheduling problem: How do we ensure a job is dispatched exactly once, even with multiple scheduler instances and potential crashes?

Approach: Transactional outbox pattern

BEGIN;
  -- Claim the job (SKIP LOCKED prevents other schedulers from seeing it)
  SELECT * FROM jobs
  WHERE owner_partition = 7
    AND status = 'scheduled'
    AND next_run_at <= now()
  ORDER BY priority, next_run_at
  LIMIT 100
  FOR UPDATE SKIP LOCKED;

  -- For each claimed job:
  UPDATE jobs SET status = 'dispatched', updated_at = now() WHERE job_id = ?;

  INSERT INTO job_executions (execution_id, job_id, status, attempt_number, started_at)
  VALUES (gen_random_uuid(), ?, 'dispatched', 1, now());

  INSERT INTO outbox (event_id, topic, payload, created_at)
  VALUES (gen_random_uuid(), 'jobs.high', '{"job_id": "...", "payload": ...}', now());

  -- If recurring, compute the next run and return the job to the schedulable pool
  UPDATE jobs SET next_run_at = compute_next_cron(cron_expression, now()),
                  status = 'scheduled'
  WHERE job_id = ? AND type = 'recurring';
COMMIT;

Outbox relay: A separate process reads the outbox table and publishes to Kafka. After successful publish, marks the outbox row as sent. This guarantees that the database state and the Kafka message are consistent — no orphaned dispatches.
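The relay loop can be sketched with in-memory stand-ins for the outbox table and the Kafka producer (`outbox`, `publish`, and `relay_once` are illustrative names, not a real client API):

```python
import uuid

# In-memory stand-ins for the outbox table and the Kafka producer.
outbox = [
    {"event_id": str(uuid.uuid4()), "topic": "jobs.high",
     "payload": '{"job_id": "job_abc123"}', "sent": False},
]
published = []  # messages the fake "Kafka" has accepted

def publish(topic, payload):
    """Fake producer; a real relay would call a Kafka client here."""
    published.append((topic, payload))

def relay_once():
    """One pass of the outbox relay: publish unsent rows, then mark
    them sent. If the process dies between publish and mark, the row
    is published again on the next pass -- at-least-once delivery,
    which is why consumers must deduplicate on event_id."""
    for row in outbox:
        if not row["sent"]:
            publish(row["topic"], row["payload"])
            row["sent"] = True  # in SQL: UPDATE outbox SET sent = true WHERE event_id = ...

relay_once()
```

Note the crash window between publish and mark-sent means the relay itself is at-least-once; exactly-once dispatch comes from workers deduplicating on `event_id` (or `execution_id`).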

At-least-once execution:

  • Worker pulls job from Kafka → executes → reports success/failure
  • If worker crashes mid-execution: Kafka consumer offset not committed → message redelivered to another worker
  • Jobs must be idempotent: running the same job twice produces the same result (use idempotency keys, check-before-write patterns)
  • For non-idempotent jobs: use a fencing token (execution_id). The job checks if its execution_id is still the active one before committing results.
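A minimal sketch of the fencing check for non-idempotent jobs; `active_execution` is an in-memory dict standing in for a row in PostgreSQL:

```python
# Fencing-token check. active_execution maps job_id -> the execution_id
# currently allowed to commit (in production, a row in PostgreSQL).
active_execution = {"job_abc123": "exec_2"}

def commit_result(job_id, execution_id, result, store):
    """Commit only if this execution is still the active one. A stale
    retry (e.g. exec_1, redelivered after a timeout) is fenced out
    and its result is silently dropped."""
    if active_execution.get(job_id) != execution_id:
        return False  # stale execution; do not commit
    store[job_id] = result
    return True

results = {}
commit_result("job_abc123", "exec_1", "stale", results)   # fenced out
commit_result("job_abc123", "exec_2", "fresh", results)   # commits
```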

Deep Dive 2: Handling Worker Failures and Timeouts

The problem: A worker picks up a job and then crashes, hangs, or loses network. The job is “in progress” forever.

Heartbeat-based liveness detection:

Worker lifecycle:
  1. Pull job from Kafka
  2. Start heartbeat thread: every 10 seconds, send:
     POST /heartbeat { execution_id, worker_id, progress_pct }
  3. Execute job
  4. On completion: report result, stop heartbeat
  5. On failure: report error, stop heartbeat

Heartbeat Monitor:
  - Maintains in-memory map: execution_id → last_heartbeat_time
  - Every 15 seconds, scan for stale entries:
    if now() - last_heartbeat > 30 seconds:
      → Mark execution as 'timed_out'
      → Re-enqueue job (if attempts < max_retries)
      → Increment attempt counter
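The stale-entry scan is a simple threshold filter over the heartbeat map; a sketch (timestamps are plain epoch seconds for brevity):

```python
def find_stale(heartbeats, now, threshold_sec=30):
    """Return execution_ids whose last heartbeat is older than the
    threshold -- these workers are presumed dead and their jobs get
    re-enqueued with an incremented attempt counter."""
    return [eid for eid, last in heartbeats.items()
            if now - last > threshold_sec]

# exec_a heartbeated 40s ago (stale); exec_b 5s ago (healthy)
heartbeats = {"exec_a": 1000.0, "exec_b": 1035.0}
stale = find_stale(heartbeats, now=1040.0)
```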

Timeout handling:

Job has timeout_seconds = 300 (5 minutes)

Worker-side: wraps job execution in a timer
  If execution exceeds timeout → kill the process/thread → report timeout

Server-side (defense in depth):
  Scheduler checks: jobs WHERE status = 'running'
    AND started_at + timeout_seconds < now()
  → Force-timeout these jobs. This check deliberately ignores heartbeats:
    a worker can be alive and heartbeating while stuck in an infinite loop,
    so heartbeat liveness alone would never flag it.

Retry with exponential backoff:

Attempt 1: immediate
Attempt 2: after 30 seconds
Attempt 3: after 60 seconds
Attempt 4: after 120 seconds (if max_retries = 4)
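The schedule above is base × 2^(n−2) with a 30-second base; as a function:

```python
def retry_delay_sec(attempt: int, base: int = 30) -> int:
    """Exponential backoff matching the schedule above:
    attempt 1 runs immediately; attempt n waits base * 2**(n-2)."""
    if attempt <= 1:
        return 0
    return base * 2 ** (attempt - 2)

delays = [retry_delay_sec(a) for a in range(1, 5)]  # attempts 1..4
```

In practice a random jitter is usually added on top so a burst of failures doesn't retry in lockstep.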

If all retries exhausted:
  → Move to dead letter queue
  → Alert oncall (PagerDuty webhook)
  → Job marked as 'dead_lettered'
  → Manual intervention required (inspect logs, fix payload, re-enqueue)

Poison pill protection:

  • If a job fails 3 times in < 1 minute, it’s likely a poison pill (bad payload causing immediate crash)
  • Circuit breaker: stop retrying, dead-letter immediately
  • Don’t let a poison pill consume all retry slots and worker capacity
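The circuit-breaker condition is just "N failures inside one window"; a sketch with illustrative parameter names:

```python
def is_poison_pill(failure_times, window_sec=60, threshold=3):
    """True if the most recent `threshold` failures all landed inside
    one window -- the job is crashing immediately on every attempt and
    should be dead-lettered instead of burning retries and capacity."""
    if len(failure_times) < threshold:
        return False
    recent = sorted(failure_times)[-threshold:]
    return recent[-1] - recent[0] < window_sec

is_poison_pill([100.0, 105.0, 110.0])   # 3 failures in 10s -> True
is_poison_pill([100.0, 200.0, 400.0])   # spread out -> False
```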

Deep Dive 3: DAG Scheduling and Dependency Resolution

The problem: Jobs with dependencies form a Directed Acyclic Graph (DAG). How do we efficiently resolve dependencies and dispatch jobs as their prerequisites complete?

DAG representation:

DAG stored in PostgreSQL:
  dag_id: "daily_pipeline"
  jobs: [
    { id: "extract",   depends_on: [] },
    { id: "transform",  depends_on: ["extract"] },
    { id: "load_dim",   depends_on: ["transform"] },
    { id: "load_fact",  depends_on: ["transform"] },
    { id: "aggregate",  depends_on: ["load_dim", "load_fact"] },
    { id: "report",     depends_on: ["aggregate"] }
  ]

  Execution graph:
  extract → transform → load_dim  → aggregate → report
                      ↘ load_fact ↗

DAG execution lifecycle:

1. Trigger (cron fires at 2 AM):
   → Create dag_run record: { dag_id, run_id, status: 'running' }
   → Create job_instance for each job in DAG: { run_id, job_id, status: 'pending' }
   → Find root jobs (no dependencies): ["extract"]
   → Dispatch root jobs

2. On job completion event:
   UPDATE job_instances SET status = 'completed' WHERE run_id = ? AND job_id = 'extract';

   -- Find next dispatchable jobs
   SELECT ji.job_id
   FROM job_instances ji
   JOIN job_dependencies jd ON ji.job_id = jd.job_id
   WHERE ji.run_id = ? AND ji.status = 'pending'
   GROUP BY ji.job_id
   HAVING COUNT(*) = (
     SELECT COUNT(*) FROM job_dependencies WHERE job_id = ji.job_id
     AND depends_on_job_id IN (
       SELECT job_id FROM job_instances WHERE run_id = ? AND status = 'completed'
     )
   );

   → Dispatch: ["transform"]

3. On job failure:
   → Mark job as 'failed'
   → Check failure policy:
     - "fail_fast": cancel all pending jobs in DAG, mark DAG as 'failed'
     - "continue": only fail downstream dependents, continue independent branches
     - "retry": retry failed job up to max_retries before failing DAG
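The dependency check in step 2 fits in a few lines; this in-memory version mirrors the SQL above (maps stand in for the `job_instances` and `job_dependencies` tables):

```python
def dispatchable(jobs, status):
    """Return pending jobs whose dependencies have all completed.
    `jobs` maps job_id -> list of dependency job_ids; `status` maps
    job_id -> current state ('pending', 'running', 'completed', ...)."""
    return [j for j, deps in jobs.items()
            if status[j] == "pending"
            and all(status[d] == "completed" for d in deps)]

dag = {"extract": [], "transform": ["extract"],
       "load_dim": ["transform"], "load_fact": ["transform"],
       "aggregate": ["load_dim", "load_fact"], "report": ["aggregate"]}
status = {j: "pending" for j in dag}
status["extract"] = "completed"
# transform is now dispatchable; nothing downstream is
```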

Cycle detection: On DAG creation, run topological sort. If it fails (cycle detected), reject the DAG with an error.
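Kahn's algorithm doubles as the cycle check: keep removing nodes with no unresolved dependencies; any nodes left over form a cycle. A sketch:

```python
from collections import deque

def has_cycle(jobs):
    """Kahn's algorithm. `jobs` maps job_id -> list of dependency
    job_ids. Repeatedly remove nodes whose dependencies are all
    resolved; if some nodes can never be removed, they form a cycle."""
    indegree = {j: len(deps) for j, deps in jobs.items()}
    dependents = {j: [] for j in jobs}
    for j, deps in jobs.items():
        for d in deps:
            dependents[d].append(j)
    queue = deque(j for j, n in indegree.items() if n == 0)
    removed = 0
    while queue:
        node = queue.popleft()
        removed += 1
        for dep in dependents[node]:
            indegree[dep] -= 1
            if indegree[dep] == 0:
                queue.append(dep)
    return removed != len(jobs)
```

The same traversal yields a valid dispatch order as a by-product, so creation-time validation and the orchestrator can share it.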

Dynamic DAGs: Some workflows need runtime-determined structure (e.g., “run transform for each country, then aggregate”). Support this via a “fan-out” operator that creates sub-jobs dynamically based on the output of a parent job.


7. Extensions (2 min)

  • Multi-tenancy and quotas: Each team/tenant gets a job quota (max concurrent, max per day). Enforce quotas at the scheduler level. Prevent one team from monopolizing the worker pool.
  • Job chaining and hooks: Pre-execution hooks (validate environment, acquire locks) and post-execution hooks (cleanup, trigger downstream). Webhook notifications on state transitions.
  • Delayed jobs with sub-second precision: For delay < 1 second, use an in-memory timing wheel instead of polling PostgreSQL. Hashed Timing Wheel supports O(1) insertion and expiration.
  • Cost-aware scheduling: Tag jobs with resource requirements (CPU, memory, GPU). Schedule on appropriate worker pools. Autoscale worker pools based on queue depth. Spot instances for low-priority jobs.
  • Observability and debugging: Gantt chart visualization of DAG execution. Distributed tracing (each job carries a trace_id). Log aggregation per execution. Historical analytics: P50/P99 execution times, failure rates by job type.
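The timing-wheel idea from the sub-second-delay extension can be sketched minimally (slot count, tick size, and the fold of "rounds" into stored expiries are simplifications for brevity):

```python
class TimingWheel:
    """Minimal hashed timing wheel: O(1) insert, one bucket scanned
    per tick. Delays hash into slots by expiry tick; entries in a
    slot are checked against their exact expiry on each pass."""
    def __init__(self, slots=100, tick_ms=10):
        self.slots = [[] for _ in range(slots)]
        self.tick_ms = tick_ms
        self.current = 0  # absolute tick counter

    def schedule(self, job, delay_ms):
        expiry = self.current + max(1, delay_ms // self.tick_ms)
        self.slots[expiry % len(self.slots)].append((expiry, job))

    def tick(self):
        """Advance one tick; return jobs that are due now."""
        self.current += 1
        slot = self.slots[self.current % len(self.slots)]
        due = [j for exp, j in slot if exp <= self.current]
        slot[:] = [(exp, j) for exp, j in slot if exp > self.current]
        return due

wheel = TimingWheel()
wheel.schedule("ping", delay_ms=30)  # due in 3 ticks at 10 ms/tick
```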