Celery Integration

This tutorial shows how to run EnergyPlus simulations as distributed Celery tasks. The pattern works well when you need to:

  • Run hundreds or thousands of simulations across a cluster of machines.
  • Integrate simulation jobs into a larger web application or data pipeline.
  • Get automatic retries, rate limiting, and monitoring for free.

When to use Celery vs simulate_batch()

idfkit's built-in simulate_batch() is the simplest way to run simulations in parallel on a single machine. Reach for Celery when you need to distribute work across multiple machines, integrate with an existing task queue, or require features like retries, priority queues, and persistent result storage.

Prerequisites

Install idfkit and Celery with a Redis broker:

pip install idfkit "celery[redis]"

You also need a running Redis instance. The fastest way to get one locally:

docker run -d -p 6379:6379 redis:7-alpine
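
To confirm the broker is reachable before starting workers, a quick check from Python works (redis-py is installed as part of celery[redis]):

import redis

print(redis.Redis(host="localhost", port=6379).ping())  # True when the broker is up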

Project Layout

# project/
# ├── celeryconfig.py       # Celery configuration
# ├── tasks.py              # Task definitions
# ├── submit.py             # Client that submits jobs
# ├── models/               # EnergyPlus IDF/epJSON files
# │   └── office.idf
# └── weather/              # Weather files
#     └── chicago.epw

Step 1: Configure Celery

# celeryconfig.py
import os

# Allow Docker/production deployments to override the broker via environment
broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")

# Serialization — JSON is safe and human-readable
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# Prevent a slow simulation from being acknowledged before it finishes.
# With acks_late the message is re-delivered if the worker crashes mid-run.
task_acks_late = True
task_reject_on_worker_lost = True

# One simulation per worker process — EnergyPlus is CPU-bound
worker_concurrency = 1

# Long timeout for annual simulations (4 hours)
task_time_limit = 14400
task_soft_time_limit = 14000

# Result expiry (24 hours)
result_expires = 86400

Key choices explained:

  • task_acks_late = True: The message stays in Redis until the task finishes. If a worker crashes mid-simulation, another worker picks up the job automatically.
  • worker_concurrency = 1: EnergyPlus is CPU-bound. Running one simulation per worker process avoids CPU contention. Scale by adding more worker processes or machines instead.
  • task_serializer = "json": Task arguments must be JSON-serializable (strings, numbers, bools). This avoids pickle security issues and makes task payloads inspectable.
  • task_time_limit: Kills the worker process if a simulation exceeds the hard limit, preventing runaway jobs from blocking the queue.

Step 2: Define a Simulation Task

# tasks.py
from pathlib import Path

from celery import Celery

from idfkit import load_idf
from idfkit.simulation import simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True, name="simulate_building")
def simulate_building(
    self,
    idf_path: str,
    weather_path: str,
    output_dir: str,
    design_day: bool = False,
) -> dict:
    """Run a single EnergyPlus simulation and return a result summary."""
    model = load_idf(idf_path)
    result = simulate(
        model,
        weather_path,
        output_dir=output_dir,
        design_day=design_day,
        timeout=14000.0,
    )
    return {
        "success": result.success,
        "runtime": result.runtime_seconds,
        "output_dir": str(Path(result.run_dir).resolve()),
    }

Pass file paths, not objects

Celery serializes task arguments to JSON. Pass file paths (strings) to the task and call load_idf() inside the worker. Never try to pass an IDFDocument directly — it is not JSON-serializable.
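
Before submitting jobs, start at least one worker from the project directory so there is something to consume the queue:

celery -A tasks worker --loglevel=info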

Step 3: Submit Jobs

Single Simulation

# submit.py — send a single simulation to the queue
from tasks import simulate_building

result = simulate_building.delay(
    idf_path="models/office.idf",
    weather_path="weather/chicago.epw",
    output_dir="/tmp/sim-results/run-001",
    design_day=True,
)

# Block until done (or poll with result.ready())
summary = result.get(timeout=3600)
print(summary)
# {'success': True, 'runtime': 42.3, 'output_dir': '/tmp/sim-results/run-001'}
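
In a web application or pipeline you usually do not want to block on result.get(). A common pattern, sketched below, is to store the task id and look the result up later:

# Fire-and-forget: keep the task id (e.g. in a database row) and check it later
task_id = result.id

# ... later, possibly from a different process
from celery.result import AsyncResult
from tasks import app

status = AsyncResult(task_id, app=app)
print(status.state)        # PENDING, SUCCESS, FAILURE, ...
if status.successful():
    print(status.result)   # the summary dict returned by the task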

Fan-Out Batch

Use Celery's group primitive to submit many simulations at once:

from celery import group
from tasks import simulate_building

# Fan-out: submit many simulations at once
jobs = group(
    simulate_building.s(
        idf_path=f"models/variant_{i}.idf",
        weather_path="weather/chicago.epw",
        output_dir=f"/tmp/sim-results/variant-{i}",
    )
    for i in range(20)
)

batch = jobs.apply_async()

# Wait for all results
results = batch.get(timeout=7200)
succeeded = [r for r in results if r["success"]]
print(f"{len(succeeded)}/{len(results)} simulations succeeded")

Each job runs on whichever worker is available. With 4 workers, up to 4 simulations run in parallel.
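
By default batch.get() raises as soon as any job fails. If some variants are expected to fail, pass propagate=False to collect exceptions alongside successful results:

# Failed tasks come back as exception instances instead of result dicts
results = batch.get(timeout=7200, propagate=False)
for r in results:
    if isinstance(r, Exception):
        print(f"Failed: {r!r}")
    else:
        print(f"OK: {r['output_dir']}")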

Parametric Studies

For parametric sweeps, pass scalar parameters to the task and build the model variant on the worker:

Task Definition

# tasks.py — parametric task that builds variants on the worker
from pathlib import Path

from celery import Celery

from idfkit import load_idf
from idfkit.simulation import simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True, name="run_parametric_case")
def run_parametric_case(
    self,
    base_idf_path: str,
    weather_path: str,
    output_dir: str,
    wall_conductivity: float | None = None,
    window_u_factor: float | None = None,
    infiltration_rate: float | None = None,
    label: str = "",
) -> dict:
    """Apply parameter overrides to a base model and simulate."""
    model = load_idf(base_idf_path)

    # Apply overrides
    if wall_conductivity is not None:
        for mat in model["Material"]:
            if "wall" in mat.name.lower():
                mat.conductivity = wall_conductivity

    if window_u_factor is not None:
        for win in model["WindowMaterial:SimpleGlazingSystem"]:
            win.u_factor = window_u_factor

    if infiltration_rate is not None:
        for inf in model["ZoneInfiltration:DesignFlowRate"]:
            inf.design_flow_rate = infiltration_rate

    result = simulate(model, weather_path, output_dir=output_dir, design_day=True)
    return {
        "label": label,
        "success": result.success,
        "runtime": result.runtime_seconds,
        "output_dir": str(Path(result.run_dir).resolve()),
    }

Submitting a Parameter Grid

import itertools

from celery import group
from tasks import run_parametric_case

conductivities = [0.5, 1.0, 1.5]
u_factors = [1.5, 2.5, 3.5]

jobs = group(
    run_parametric_case.s(
        base_idf_path="models/office.idf",
        weather_path="weather/chicago.epw",
        output_dir=f"/tmp/parametric/k{k}_u{u}",
        wall_conductivity=k,
        window_u_factor=u,
        label=f"k={k}, U={u}",
    )
    for k, u in itertools.product(conductivities, u_factors)
)

batch = jobs.apply_async()
results = batch.get(timeout=7200)

for r in sorted(results, key=lambda x: x["label"]):
    status = "OK" if r["success"] else "FAIL"
    print(f"[{status}] {r['label']}  ({r['runtime']:.1f}s)")

This fans out 9 jobs (3 conductivities × 3 U-factors) across all available workers.
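
The returned dicts are easy to tabulate for analysis. For example, with pandas (optional, not required by idfkit):

import pandas as pd

df = pd.DataFrame(results)            # columns: label, success, runtime, output_dir
print(df.sort_values("runtime"))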

Error Handling and Retries

EnergyPlus can fail for transient reasons (file system issues, resource exhaustion). Celery's built-in retry mechanism handles this:

from pathlib import Path

from celery import Celery

from idfkit import load_idf
from idfkit.exceptions import SimulationError
from idfkit.simulation import simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(
    bind=True,
    name="simulate_with_retry",
    autoretry_for=(SimulationError, OSError),
    retry_backoff=60,
    retry_backoff_max=600,
    max_retries=3,
)
def simulate_with_retry(
    self,
    idf_path: str,
    weather_path: str,
    output_dir: str,
) -> dict:
    """Simulate with automatic retry on transient failures."""
    model = load_idf(idf_path)
    result = simulate(model, weather_path, output_dir=output_dir, design_day=True)

    if not result.success:
        # Raise to trigger retry for EnergyPlus errors that may be transient
        raise SimulationError(
            f"Simulation failed (attempt {self.request.retries + 1})",
            exit_code=1,
        )

    return {
        "success": True,
        "runtime": result.runtime_seconds,
        "output_dir": str(Path(result.run_dir).resolve()),
        "retries": self.request.retries,
    }

The autoretry_for parameter catches both idfkit's SimulationError and OS-level errors. With retry_backoff=60, the task retries up to 3 times with exponential backoff (60 s, 120 s, then 240 s; retry_backoff_max caps the delay at 600 s). Celery applies jitter by default, so actual delays are randomized up to those values.
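
autoretry_for only reacts to the listed exception types. For finer-grained control you can call self.retry() yourself; the sketch below (a hypothetical simulate_manual_retry task, not part of the earlier tasks.py) waits five minutes after an OS-level failure:

# tasks.py — manual retry for failures not covered by autoretry_for
from celery import Celery

from idfkit import load_idf
from idfkit.simulation import simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True, name="simulate_manual_retry", max_retries=3)
def simulate_manual_retry(self, idf_path: str, weather_path: str, output_dir: str) -> dict:
    """Retry explicitly with a fixed delay when the failure looks transient."""
    model = load_idf(idf_path)
    try:
        result = simulate(model, weather_path, output_dir=output_dir, design_day=True)
    except OSError as exc:
        # e.g. a full disk or an unreachable network share:
        # wait five minutes, then let Celery re-deliver the task
        raise self.retry(exc=exc, countdown=300)
    return {"success": result.success, "output_dir": output_dir}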

Progress Reporting

idfkit's on_progress callback integrates naturally with Celery's custom state updates:

Task with Progress

from pathlib import Path

from celery import Celery

from idfkit import load_idf
from idfkit.simulation import SimulationProgress, simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True, name="simulate_with_progress")
def simulate_with_progress(
    self,
    idf_path: str,
    weather_path: str,
    output_dir: str,
) -> dict:
    """Report EnergyPlus progress back to Celery task state."""

    def report_progress(progress: SimulationProgress) -> None:
        self.update_state(
            state="SIMULATING",
            meta={
                "environment": progress.environment,
                "percent": progress.percent,
            },
        )

    model = load_idf(idf_path)
    result = simulate(
        model,
        weather_path,
        output_dir=output_dir,
        on_progress=report_progress,
    )

    return {
        "success": result.success,
        "runtime": result.runtime_seconds,
        "output_dir": str(Path(result.run_dir).resolve()),
    }

Polling Progress from the Client

import time

from tasks import simulate_with_progress

result = simulate_with_progress.delay(
    idf_path="models/office.idf",
    weather_path="weather/chicago.epw",
    output_dir="/tmp/sim-results/progress-demo",
)

while not result.ready():
    meta = result.info  # dict with progress metadata
    if isinstance(meta, dict) and meta.get("percent") is not None:
        pct = meta["percent"]
        env = meta.get("environment", "")
        print(f"  {pct:.0f}%  —  {env}")
    time.sleep(2)

print("Done:", result.get())

Caching

Share a SimulationCache across workers to skip duplicate simulations. If workers run on the same machine (or share a network file system), point the cache at a shared directory:

from pathlib import Path

from celery import Celery

from idfkit import load_idf
from idfkit.simulation import SimulationCache, simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")

# Shared cache directory — must be accessible by all workers.
# Use a network file system (NFS, EFS) or a local directory if
# workers run on the same machine.
CACHE_DIR = Path("/shared/simulation-cache")
cache = SimulationCache(cache_dir=CACHE_DIR)


@app.task(bind=True, name="simulate_cached")
def simulate_cached(
    self,
    idf_path: str,
    weather_path: str,
    output_dir: str,
    design_day: bool = False,
) -> dict:
    """Simulate with content-addressed caching to skip duplicate work."""
    model = load_idf(idf_path)
    result = simulate(
        model,
        weather_path,
        output_dir=output_dir,
        design_day=design_day,
        cache=cache,
    )
    return {
        "success": result.success,
        "runtime": result.runtime_seconds,
        "output_dir": str(Path(result.run_dir).resolve()),
    }

When the same model + weather combination is submitted again, the worker returns the cached result instantly instead of re-running EnergyPlus.
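
A quick way to see the cache working is to submit the same job twice and compare wall-clock time on the client; the second call should return almost immediately:

import time

from tasks import simulate_cached

kwargs = dict(
    idf_path="models/office.idf",
    weather_path="weather/chicago.epw",
    output_dir="/tmp/sim-results/cached-demo",
    design_day=True,
)

for attempt in ("first", "second"):
    start = time.perf_counter()
    simulate_cached.delay(**kwargs).get(timeout=3600)
    print(f"{attempt} run took {time.perf_counter() - start:.1f}s")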

Cloud Storage (S3)

Upload simulation results directly to S3 from workers:

from celery import Celery

from idfkit import load_idf
from idfkit.simulation import S3FileSystem, simulate

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True, name="simulate_to_s3")
def simulate_to_s3(
    self,
    idf_path: str,
    weather_path: str,
    s3_prefix: str,
    design_day: bool = False,
) -> dict:
    """Run a simulation and upload results to S3."""
    model = load_idf(idf_path)

    fs = S3FileSystem(bucket="my-sim-bucket", prefix=s3_prefix)
    result = simulate(
        model,
        weather_path,
        output_dir="results",
        design_day=design_day,
        fs=fs,
    )

    return {
        "success": result.success,
        "runtime": result.runtime_seconds,
        "s3_prefix": s3_prefix,
    }

Workers need AWS credentials (environment variables, IAM role, or ~/.aws/credentials). See Cloud Simulations (S3) for details.
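
When workers run under Docker Compose (see Deployment with Docker below), one option is to forward credentials from the host environment into the worker service; the region below is a placeholder:

# docker-compose.yml (worker service): forward AWS credentials from the host
environment:
  - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
  - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
  - AWS_DEFAULT_REGION=us-east-1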

Task Composition

Celery's chain primitive lets you compose multi-step workflows — for example, running a simulation and then post-processing the results:

from celery import Celery, chain

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(name="collect_results")
def collect_results(sim_result: dict) -> dict:
    """Post-process a simulation result (runs after the simulation task)."""
    if not sim_result["success"]:
        return {"error": "simulation failed", **sim_result}

    from idfkit.simulation import SimulationResult

    result = SimulationResult.from_directory(sim_result["output_dir"])
    heating = result.sql.get_timeseries(
        variable_name="Zone Ideal Loads Heating Energy",
        key_value="OFFICE",
    )
    return {
        **sim_result,
        "peak_heating_W": float(heating.max()),
    }


# Compose: simulate → collect_results
from tasks import simulate_building

workflow = chain(
    simulate_building.s(
        idf_path="models/office.idf",
        weather_path="weather/chicago.epw",
        output_dir="/tmp/sim-results/chained",
        design_day=True,
    ),
    collect_results.s(),
)

final = workflow.apply_async()
print(final.get(timeout=3600))
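
To fan out a batch and post-process everything in a single aggregation step, combine a group of simulations with a chord callback. The summarize_batch task below is a hypothetical example (it would need to live in tasks.py so workers can execute it):

from celery import Celery, chord

from tasks import simulate_building

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(name="summarize_batch")
def summarize_batch(sim_results: list[dict]) -> dict:
    """Receives the list of all simulation summaries once the group finishes."""
    succeeded = [r for r in sim_results if r["success"]]
    return {"total": len(sim_results), "succeeded": len(succeeded)}


# chord: run all simulations, then call summarize_batch with the collected results
workflow = chord(
    simulate_building.s(
        idf_path=f"models/variant_{i}.idf",
        weather_path="weather/chicago.epw",
        output_dir=f"/tmp/sim-results/chord-{i}",
    )
    for i in range(5)
)(summarize_batch.s())

print(workflow.get(timeout=7200))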

Deployment with Docker

Dockerfile

# Dockerfile
FROM nrel/energyplus:25.2.0

# Install Python and uv
RUN apt-get update && apt-get install -y python3 python3-pip curl \
    && curl -LsSf https://astral.sh/uv/install.sh | sh

# The uv installer places the binary in ~/.local/bin
ENV PATH="/root/.local/bin:${PATH}"

WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen

COPY celeryconfig.py tasks.py ./
COPY models/ ./models/
COPY weather/ ./weather/

CMD ["uv", "run", "celery", "-A", "tasks", "worker", "--loglevel=info"]

Docker Compose

# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    command: celery -A tasks worker --loglevel=info --concurrency=1
    environment:
      - ENERGYPLUS_DIR=/usr/local/EnergyPlus-25-2-0
      # Point workers at the redis service, not localhost
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    volumes:
      - ./models:/app/models:ro
      - ./weather:/app/weather:ro
      - sim-results:/tmp/sim-results
    depends_on:
      - redis
    deploy:
      replicas: 4          # 4 workers = 4 parallel simulations
      resources:
        limits:
          cpus: "1"         # 1 CPU per worker
          memory: 2G

  flower:
    image: mher/flower
    command: celery --broker=redis://redis:6379/0 flower
    ports:
      - "5555:5555"
    depends_on:
      - redis

volumes:
  sim-results:

Start everything with:

docker compose up -d --scale worker=4

This starts 4 worker containers (4 concurrent simulations), a Redis broker, and the Flower monitoring dashboard at http://localhost:5555.

Monitoring

Flower Dashboard

Flower provides a real-time web UI for monitoring workers, tasks, and queues:

pip install flower
celery -A tasks flower --port=5555

Open http://localhost:5555 to see active workers, task history, success rates, and runtime distributions.

CLI Inspection

# List active workers
celery -A tasks inspect active

# Purge all pending tasks
celery -A tasks purge

# View task result
celery -A tasks result <task-id>

Best Practices

  1. One simulation per worker process. Set worker_concurrency = 1. EnergyPlus is CPU-bound, and running multiple simulations in one process causes contention. Scale horizontally by adding workers.

  2. Use acks_late with reject_on_worker_lost. This ensures that if a worker dies mid-simulation, the job is re-queued and retried by another worker.

  3. Pass paths, not objects. All task arguments must be JSON-serializable. Pass IDF file paths and scalar parameters — load models inside the worker.

  4. Set time limits. Use task_time_limit (hard kill) and task_soft_time_limit (raises SoftTimeLimitExceeded) to prevent runaway simulations from blocking workers indefinitely.

  5. Use a shared cache for parametric studies. Point SimulationCache at a network file system or shared volume so workers skip duplicate model + weather combinations.

  6. Pin the EnergyPlus version. Ensure all workers use the same EnergyPlus version (set ENERGYPLUS_DIR or embed it in the Docker image) to avoid inconsistent results.

  7. Separate queues for fast and slow jobs. Route design-day simulations to a "fast" queue and annual simulations to a "slow" queue using Celery's task routing; see the sketch after this list.

  8. Monitor with Flower. Run Flower to track worker health, task throughput, and failure rates in production.
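
A minimal routing setup for practice 7 could look like the sketch below; the queue names and the task-to-queue mapping are only examples:

# celeryconfig.py: route tasks to dedicated queues by task name
task_routes = {
    "simulate_building": {"queue": "fast"},       # design-day runs
    "simulate_with_retry": {"queue": "slow"},     # annual runs
}

# Start one worker per queue
celery -A tasks worker -Q fast --loglevel=info
celery -A tasks worker -Q slow --loglevel=info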

See Also