Celery Integration¶
This tutorial shows how to run EnergyPlus simulations as distributed Celery tasks. The pattern works well when you need to:
- Run hundreds or thousands of simulations across a cluster of machines.
- Integrate simulation jobs into a larger web application or data pipeline.
- Get automatic retries, rate limiting, and monitoring for free.
When to use Celery vs simulate_batch()
idfkit's built-in simulate_batch() is the
simplest way to run simulations in parallel on a single machine.
Reach for Celery when you need to distribute work across multiple
machines, integrate with an existing task queue, or require features
like retries, priority queues, and persistent result storage.
Prerequisites¶
Install idfkit and Celery with a Redis broker:
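# Package spec is an assumption: adjust if idfkit is not published on PyPI under this name.
pip install idfkit "celery[redis]"
# or, with uv (the Docker image later in this tutorial also uses uv):
uv add idfkit "celery[redis]"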
You also need a running Redis instance. The fastest way to get one locally:
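# A throwaway local Redis instance (same image the Docker Compose file below uses)
docker run --rm -d -p 6379:6379 redis:7-alpine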
Project Layout¶
project/
├── celeryconfig.py   # Celery configuration
├── tasks.py          # Task definitions
├── submit.py         # Client that submits jobs
├── models/           # EnergyPlus IDF/epJSON files
│   └── office.idf
└── weather/          # Weather files
    └── chicago.epw
Step 1: Configure Celery¶
# celeryconfig.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
# Serialisation — JSON is safe and human-readable
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
# Prevent a slow simulation from being acknowledged before it finishes.
# With acks_late the message is re-delivered if the worker crashes mid-run.
task_acks_late = True
task_reject_on_worker_lost = True
# One simulation per worker process — EnergyPlus is CPU-bound
worker_concurrency = 1
# Long timeout for annual simulations (4 hours)
task_time_limit = 14400
task_soft_time_limit = 14000
# Result expiry (24 hours)
result_expires = 86400
Key choices explained:
| Setting | Why |
|---|---|
| task_acks_late = True | The message stays in Redis until the task finishes. If a worker crashes mid-simulation, another worker picks up the job automatically. |
| worker_concurrency = 1 | EnergyPlus is CPU-bound. Running one simulation per worker process avoids CPU contention. Scale by adding more worker processes or machines instead. |
| task_serializer = "json" | Task arguments must be JSON-serializable (strings, numbers, bools). This avoids pickle security issues and makes task payloads inspectable. |
| task_time_limit | Kills the worker process if a simulation exceeds the hard limit, preventing runaway jobs from blocking the queue. |
Step 2: Define a Simulation Task¶
# tasks.py
from pathlib import Path
from celery import Celery
from idfkit import load_idf
from idfkit.simulation import simulate
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(bind=True, name="simulate_building")
def simulate_building(
self,
idf_path: str,
weather_path: str,
output_dir: str,
design_day: bool = False,
) -> dict:
"""Run a single EnergyPlus simulation and return a result summary."""
model = load_idf(idf_path)
result = simulate(
model,
weather_path,
output_dir=output_dir,
design_day=design_day,
timeout=14000.0,
)
return {
"success": result.success,
"runtime": result.runtime_seconds,
"output_dir": str(Path(result.run_dir).resolve()),
}
Pass file paths, not objects
Celery serializes task arguments to JSON. Pass file paths (strings)
to the task and call load_idf() inside the worker. Never try to pass
an IDFDocument directly — it is not JSON-serializable.
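With the task defined, start a worker to consume jobs from this module (the same command the Docker image later in this tutorial uses):
# Start a worker; the "tasks" app comes from tasks.py above
celery -A tasks worker --loglevel=info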
Step 3: Submit Jobs¶
Single Simulation¶
# submit.py — send a single simulation to the queue
from tasks import simulate_building
result = simulate_building.delay(
idf_path="models/office.idf",
weather_path="weather/chicago.epw",
output_dir="/tmp/sim-results/run-001",
design_day=True,
)
# Block until done (or poll with result.ready())
summary = result.get(timeout=3600)
print(summary)
# {'success': True, 'runtime': 42.3, 'output_dir': '/tmp/sim-results/run-001'}
Fan-Out Batch¶
Use Celery's group primitive to submit many simulations at once:
from celery import group
from tasks import simulate_building
# Fan-out: submit many simulations at once
jobs = group(
simulate_building.s(
idf_path=f"models/variant_{i}.idf",
weather_path="weather/chicago.epw",
output_dir=f"/tmp/sim-results/variant-{i}",
)
for i in range(20)
)
batch = jobs.apply_async()
# Wait for all results
results = batch.get(timeout=7200)
succeeded = [r for r in results if r["success"]]
print(f"{len(succeeded)}/{len(results)} simulations succeeded")
Each job runs on whichever worker is available. With 4 workers, up to 4 simulations run in parallel.
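One way to get those four workers on a single machine is Celery's multi helper; in production you would more often run one worker per container, as in the Docker section later. A minimal sketch:
# Start four worker processes (celery1..celery4), each limited to one simulation at a time
celery multi start 4 -A tasks --loglevel=info
# Stop them once the batch is done
celery multi stopwait 4 -A tasks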
Parametric Studies¶
For parametric sweeps, pass scalar parameters to the task and build the model variant on the worker:
Task Definition¶
# tasks.py — parametric task that builds variants on the worker
from pathlib import Path
from celery import Celery
from idfkit import load_idf
from idfkit.simulation import simulate
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(bind=True, name="run_parametric_case")
def run_parametric_case(
self,
base_idf_path: str,
weather_path: str,
output_dir: str,
wall_conductivity: float | None = None,
window_u_factor: float | None = None,
infiltration_rate: float | None = None,
label: str = "",
) -> dict:
"""Apply parameter overrides to a base model and simulate."""
model = load_idf(base_idf_path)
# Apply overrides
if wall_conductivity is not None:
for mat in model["Material"]:
if "wall" in mat.name.lower():
mat.conductivity = wall_conductivity
if window_u_factor is not None:
for win in model["WindowMaterial:SimpleGlazingSystem"]:
win.u_factor = window_u_factor
if infiltration_rate is not None:
for inf in model["ZoneInfiltration:DesignFlowRate"]:
inf.design_flow_rate = infiltration_rate
result = simulate(model, weather_path, output_dir=output_dir, design_day=True)
return {
"label": label,
"success": result.success,
"runtime": result.runtime_seconds,
"output_dir": str(Path(result.run_dir).resolve()),
}
Submitting a Parameter Grid¶
import itertools
from celery import group
from tasks import run_parametric_case
conductivities = [0.5, 1.0, 1.5]
u_factors = [1.5, 2.5, 3.5]
jobs = group(
run_parametric_case.s(
base_idf_path="models/office.idf",
weather_path="weather/chicago.epw",
output_dir=f"/tmp/parametric/k{k}_u{u}",
wall_conductivity=k,
window_u_factor=u,
label=f"k={k}, U={u}",
)
for k, u in itertools.product(conductivities, u_factors)
)
batch = jobs.apply_async()
results = batch.get(timeout=7200)
for r in sorted(results, key=lambda x: x["label"]):
status = "OK" if r["success"] else "FAIL"
print(f"[{status}] {r['label']} ({r['runtime']:.1f}s)")
This fans out 9 jobs (3 conductivities x 3 U-factors) across all available workers.
Error Handling and Retries¶
EnergyPlus can fail for transient reasons (file system issues, resource exhaustion). Celery's built-in retry mechanism handles this:
from pathlib import Path
from celery import Celery
from idfkit import load_idf
from idfkit.exceptions import SimulationError
from idfkit.simulation import simulate
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(
bind=True,
name="simulate_with_retry",
autoretry_for=(SimulationError, OSError),
retry_backoff=60,
retry_backoff_max=600,
max_retries=3,
)
def simulate_with_retry(
self,
idf_path: str,
weather_path: str,
output_dir: str,
) -> dict:
"""Simulate with automatic retry on transient failures."""
model = load_idf(idf_path)
result = simulate(model, weather_path, output_dir=output_dir, design_day=True)
if not result.success:
# Raise to trigger retry for EnergyPlus errors that may be transient
raise SimulationError(
f"Simulation failed (attempt {self.request.retries + 1})",
exit_code=1,
)
return {
"success": True,
"runtime": result.runtime_seconds,
"output_dir": str(Path(result.run_dir).resolve()),
"retries": self.request.retries,
}
The autoretry_for parameter catches both idfkit SimulationError and
OS-level errors. The task retries up to 3 times with exponential backoff
(up to 60 s, then 120 s, then 240 s, capped at 600 s; Celery randomizes the exact delay by default).
Progress Reporting¶
idfkit's on_progress callback integrates naturally with Celery's custom
state updates:
Task with Progress¶
from pathlib import Path
from celery import Celery
from idfkit import load_idf
from idfkit.simulation import SimulationProgress, simulate
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(bind=True, name="simulate_with_progress")
def simulate_with_progress(
self,
idf_path: str,
weather_path: str,
output_dir: str,
) -> dict:
"""Report EnergyPlus progress back to Celery task state."""
def report_progress(progress: SimulationProgress) -> None:
self.update_state(
state="SIMULATING",
meta={
"environment": progress.environment,
"percent": progress.percent,
},
)
model = load_idf(idf_path)
result = simulate(
model,
weather_path,
output_dir=output_dir,
on_progress=report_progress,
)
return {
"success": result.success,
"runtime": result.runtime_seconds,
"output_dir": str(Path(result.run_dir).resolve()),
}
Polling Progress from the Client¶
import time
from tasks import simulate_with_progress
result = simulate_with_progress.delay(
idf_path="models/office.idf",
weather_path="weather/chicago.epw",
output_dir="/tmp/sim-results/progress-demo",
)
while not result.ready():
meta = result.info # dict with progress metadata
if isinstance(meta, dict) and meta.get("percent") is not None:
pct = meta["percent"]
env = meta.get("environment", "")
print(f" {pct:.0f}% — {env}")
time.sleep(2)
print("Done:", result.get())
Caching¶
Share a SimulationCache across workers to
skip duplicate simulations. If workers run on the same machine (or share a
network file system), point the cache at a shared directory:
from pathlib import Path
from celery import Celery
from idfkit import load_idf
from idfkit.simulation import SimulationCache, simulate
app = Celery("tasks")
app.config_from_object("celeryconfig")
# Shared cache directory — must be accessible by all workers.
# Use a network file system (NFS, EFS) or a local directory if
# workers run on the same machine.
CACHE_DIR = Path("/shared/simulation-cache")
cache = SimulationCache(cache_dir=CACHE_DIR)
@app.task(bind=True, name="simulate_cached")
def simulate_cached(
self,
idf_path: str,
weather_path: str,
output_dir: str,
design_day: bool = False,
) -> dict:
"""Simulate with content-addressed caching to skip duplicate work."""
model = load_idf(idf_path)
result = simulate(
model,
weather_path,
output_dir=output_dir,
design_day=design_day,
cache=cache,
)
return {
"success": result.success,
"runtime": result.runtime_seconds,
"output_dir": str(Path(result.run_dir).resolve()),
}
When the same model + weather combination is submitted again, the worker returns the cached result instantly instead of re-running EnergyPlus.
Cloud Storage (S3)¶
Upload simulation results directly to S3 from workers:
from celery import Celery
from idfkit import load_idf
from idfkit.simulation import S3FileSystem, simulate
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(bind=True, name="simulate_to_s3")
def simulate_to_s3(
self,
idf_path: str,
weather_path: str,
s3_prefix: str,
design_day: bool = False,
) -> dict:
"""Run a simulation and upload results to S3."""
model = load_idf(idf_path)
fs = S3FileSystem(bucket="my-sim-bucket", prefix=s3_prefix)
result = simulate(
model,
weather_path,
output_dir="results",
design_day=design_day,
fs=fs,
)
return {
"success": result.success,
"runtime": result.runtime_seconds,
"s3_prefix": s3_prefix,
}
Workers need AWS credentials (environment variables, IAM role, or
~/.aws/credentials). See Cloud Simulations (S3)
for details.
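If the workers do not run under an IAM role, the standard AWS SDK environment variables are usually the simplest option; the values below are placeholders:
# Standard AWS SDK credential variables for the worker environment (placeholders)
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=us-east-1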
Task Composition¶
Celery's chain primitive lets you compose multi-step workflows — for
example, running a simulation and then post-processing the results:
from celery import Celery, chain
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(name="collect_results")
def collect_results(sim_result: dict) -> dict:
"""Post-process a simulation result (runs after the simulation task)."""
if not sim_result["success"]:
return {"error": "simulation failed", **sim_result}
from idfkit.simulation import SimulationResult
result = SimulationResult.from_directory(sim_result["output_dir"])
heating = result.sql.get_timeseries(
variable_name="Zone Ideal Loads Heating Energy",
key_value="OFFICE",
)
return {
**sim_result,
"peak_heating_W": float(heating.max()),
}
# Compose: simulate → collect_results
from tasks import simulate_building
workflow = chain(
simulate_building.s(
idf_path="models/office.idf",
weather_path="weather/chicago.epw",
output_dir="/tmp/sim-results/chained",
design_day=True,
),
collect_results.s(),
)
final = workflow.apply_async()
print(final.get(timeout=3600))
Deployment with Docker¶
Dockerfile¶
# Dockerfile
FROM nrel/energyplus:25.2.0

# Install Python and uv
RUN apt-get update && apt-get install -y python3 python3-pip curl \
    && curl -LsSf https://astral.sh/uv/install.sh | sh

WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen

COPY celeryconfig.py tasks.py ./
COPY models/ ./models/
COPY weather/ ./weather/

CMD ["uv", "run", "celery", "-A", "tasks", "worker", "--loglevel=info"]
Docker Compose¶
# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    command: celery -A tasks worker --loglevel=info --concurrency=1
    environment:
      - ENERGYPLUS_DIR=/usr/local/EnergyPlus-25-2-0
    volumes:
      - ./models:/app/models:ro
      - ./weather:/app/weather:ro
      - sim-results:/tmp/sim-results
    depends_on:
      - redis
    deploy:
      replicas: 4  # 4 workers = 4 parallel simulations
      resources:
        limits:
          cpus: "1"  # 1 CPU per worker
          memory: 2G

  flower:
    image: mher/flower
    command: celery --broker=redis://redis:6379/0 flower
    ports:
      - "5555:5555"
    depends_on:
      - redis

volumes:
  sim-results:
Start everything with:
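# Build the image and start Redis, four workers, and Flower in the background
docker compose up --build -d
# (Compose v2 honors deploy.replicas; with the legacy docker-compose v1 binary,
#  use --scale worker=4 instead.)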
This starts 4 worker containers (4 concurrent simulations), a Redis
broker, and the Flower monitoring
dashboard at http://localhost:5555.
Monitoring¶
Flower Dashboard¶
Flower provides a real-time web UI for monitoring workers, tasks, and queues:
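# If you are not using the Compose stack above, run Flower locally
# (requires the flower package):
pip install flower
celery -A tasks flower --port=5555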
Open http://localhost:5555 to see active workers, task history, success
rates, and runtime distributions.
CLI Inspection¶
# List active workers
celery -A tasks inspect active
# Purge all pending tasks
celery -A tasks purge
# View task result
celery -A tasks result <task-id>
Best Practices¶
- One simulation per worker process. Set worker_concurrency = 1. EnergyPlus is CPU-bound, and running multiple simulations in one process causes contention. Scale horizontally by adding workers.
- Use acks_late with reject_on_worker_lost. This ensures that if a worker dies mid-simulation, the job is re-queued and retried by another worker.
- Pass paths, not objects. All task arguments must be JSON-serializable. Pass IDF file paths and scalar parameters; load models inside the worker.
- Set time limits. Use task_time_limit (hard kill) and task_soft_time_limit (raises SoftTimeLimitExceeded) to prevent runaway simulations from blocking workers indefinitely.
- Use a shared cache for parametric studies. Point SimulationCache at a network file system or shared volume so workers skip duplicate model + weather combinations.
- Pin the EnergyPlus version. Ensure all workers use the same EnergyPlus version (set ENERGYPLUS_DIR or embed it in the Docker image) to avoid inconsistent results.
- Separate queues for fast and slow jobs. Route design-day simulations to a "fast" queue and annual simulations to a "slow" queue using Celery's task routing, as sketched after this list.
- Monitor with Flower. Run Flower to track worker health, task throughput, and failure rates in production.
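A minimal sketch of the fast/slow routing mentioned above; the queue names and the annual-run task name (simulate_annual) are illustrative, not part of this tutorial's tasks.py:
# celeryconfig.py: route tasks to dedicated queues (names are illustrative)
task_routes = {
    "simulate_building": {"queue": "fast"},   # quick design-day runs
    "simulate_annual": {"queue": "slow"},     # hypothetical annual-simulation task
}
Start dedicated workers with celery -A tasks worker -Q fast and celery -A tasks worker -Q slow so long annual runs never block the quick design-day jobs.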
See Also¶
- Batch Processing — Single-machine parallel execution
- Async Simulation — Non-blocking execution for async apps
- Cloud Simulations (S3) — S3 result storage
- Caching — Content-addressed simulation caching