# Async Simulation
The async simulation API provides non-blocking counterparts to simulate()
and simulate_batch(), built on Python's asyncio module. Use these when
you need to run EnergyPlus simulations inside an async application (FastAPI,
Jupyter async, event-driven orchestrators) or when you want streaming
progress without callbacks.
## When to Use Async vs Sync

| Scenario | Recommended |
|---|---|
| Scripts, CLI tools | simulate() / simulate_batch() |
| Async web servers (FastAPI, aiohttp) | async_simulate() / async_simulate_batch() |
| Real-time progress UI without callbacks | async_simulate_batch_stream() |
| Jupyter with await support | Either; async avoids blocking the notebook |
| Mixing simulations with other async I/O | async_simulate() |
## Basic Usage

```python
import asyncio

from idfkit import load_idf
from idfkit.simulation import async_simulate

async def main():
    model = load_idf("building.idf")
    result = await async_simulate(model, "weather.epw", design_day=True)
    print(f"Success: {result.success}")
    print(f"Runtime: {result.runtime_seconds:.1f}s")

asyncio.run(main())
```
async_simulate() accepts exactly the same parameters as simulate() and
returns the same SimulationResult. The only difference is that EnergyPlus
runs as an asyncio subprocess, so the event loop is free to do other work
while waiting.
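To see why a free event loop matters, here is a stdlib-only sketch (no idfkit calls): fake_simulation() and heartbeat() are hypothetical stand-ins, with the sleep in fake_simulation() playing the role of an awaited EnergyPlus subprocess. The heartbeat keeps running while the "simulation" is awaited, which is exactly what the event loop can do for other work during a real async_simulate() call:

```python
import asyncio

async def fake_simulation() -> str:
    # Stand-in for async_simulate(): the await point releases the event loop.
    await asyncio.sleep(0.1)
    return "done"

async def heartbeat(ticks: list) -> None:
    # Runs while the "simulation" is awaited, showing the loop is not blocked.
    for _ in range(5):
        ticks.append("tick")
        await asyncio.sleep(0.01)

async def main():
    ticks: list = []
    result, _ = await asyncio.gather(fake_simulation(), heartbeat(ticks))
    return result, ticks

result, ticks = asyncio.run(main())
print(result, len(ticks))
```

With a blocking call in place of the first coroutine, the heartbeat could not run until the simulation finished.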
## Async Batch Processing
async_simulate_batch() mirrors simulate_batch() but uses an
asyncio.Semaphore for concurrency control instead of a thread pool:
```python
import asyncio

from idfkit.simulation import async_simulate_batch, SimulationJob

async def main():
    jobs = [
        SimulationJob(model=model1, weather="weather.epw", label="baseline"),
        SimulationJob(model=model2, weather="weather.epw", label="improved"),
    ]
    batch = await async_simulate_batch(jobs, max_concurrent=4)
    print(f"Completed: {len(batch.succeeded)}/{len(batch)}")
    for i, result in enumerate(batch):
        print(f"  Job {i}: {'Success' if result.success else 'Failed'}")

asyncio.run(main())
```
Results are returned in the same order as the input jobs, identical to
simulate_batch().
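The ordering guarantee falls out of the semaphore pattern described above. This stdlib-only sketch (run_bounded() is a hypothetical illustration, not an idfkit function) bounds concurrency with asyncio.Semaphore and shows that asyncio.gather() returns results in input order even when jobs finish in a different order:

```python
import asyncio

async def run_bounded(delays, max_concurrent: int):
    # At most max_concurrent "jobs" run at once; gather() preserves
    # input order regardless of completion order.
    sem = asyncio.Semaphore(max_concurrent)

    async def run_one(delay: float) -> float:
        async with sem:
            await asyncio.sleep(delay)  # stand-in for one simulation
            return delay

    return await asyncio.gather(*(run_one(d) for d in delays))

results = asyncio.run(run_bounded([0.03, 0.01, 0.02], max_concurrent=2))
print(results)  # matches input order: [0.03, 0.01, 0.02]
```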
## Concurrency

Control how many simulations run at once:

```python
# Use all CPUs (default)
batch = await async_simulate_batch(jobs)

# Limit to 4 concurrent simulations
batch = await async_simulate_batch(jobs, max_concurrent=4)

# Sequential (useful for debugging)
batch = await async_simulate_batch(jobs, max_concurrent=1)
```

Default: min(len(jobs), os.cpu_count())
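For reference, the default formula can be computed directly (default_concurrency() here is an illustrative helper, not part of idfkit; the `or 1` guard covers platforms where os.cpu_count() returns None):

```python
import os

def default_concurrency(n_jobs: int) -> int:
    # Mirrors the documented default: min(len(jobs), os.cpu_count())
    return min(n_jobs, os.cpu_count() or 1)

print(default_concurrency(2), default_concurrency(10_000))
```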
## Streaming Progress
async_simulate_batch_stream() is an async generator that yields
SimulationEvent objects as each simulation completes — no callbacks needed:
```python
import asyncio

from idfkit.simulation import async_simulate_batch_stream, SimulationJob

async def main():
    jobs = [
        SimulationJob(model=variant, weather="weather.epw", label=f"case-{i}")
        for i, variant in enumerate(variants)
    ]
    async for event in async_simulate_batch_stream(jobs, max_concurrent=4):
        status = "OK" if event.result.success else "FAIL"
        print(f"[{event.completed}/{event.total}] {event.label}: {status}")

asyncio.run(main())
```
### SimulationEvent

Each event contains:

| Attribute | Type | Description |
|---|---|---|
| index | int | Position of this job in the original sequence |
| label | str | Human-readable label from SimulationJob |
| result | SimulationResult | The simulation result |
| completed | int | Number of jobs completed so far |
| total | int | Total number of jobs |
Events arrive in completion order, not submission order. Use event.index
to map back to the original job.
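Completion-order delivery is standard asyncio behaviour, illustrated here with a stdlib-only sketch (completion_order() is hypothetical; the returned index plays the role of event.index). asyncio.as_completed() yields whichever task finishes first, so the fastest job surfaces first even though it was submitted second:

```python
import asyncio

async def completion_order(delays):
    async def job(index: int, delay: float) -> int:
        await asyncio.sleep(delay)
        return index  # stands in for event.index

    # Submission order is the list order; as_completed yields fastest-first.
    tasks = [asyncio.create_task(job(i, d)) for i, d in enumerate(delays)]
    return [await t for t in asyncio.as_completed(tasks)]

# The middle job is fastest, so it completes first despite being submitted second.
print(asyncio.run(completion_order([0.10, 0.0, 0.05])))  # [1, 2, 0]
```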
### Early Termination

Breaking out of the stream cancels remaining simulations:

```python
async for event in async_simulate_batch_stream(jobs, max_concurrent=4):
    if not event.result.success:
        print(f"Job {event.label} failed; aborting remaining jobs")
        break  # Remaining tasks are cancelled automatically
```
## Cancellation

Async tasks support cancellation natively. Cancelling a task kills the underlying EnergyPlus subprocess:

```python
async def run_with_timeout():
    task = asyncio.create_task(async_simulate(model, "weather.epw"))
    try:
        result = await asyncio.wait_for(task, timeout=120)
    except asyncio.TimeoutError:
        # wait_for() cancels the task on timeout, terminating EnergyPlus
        print("Simulation cancelled after 120s")
```
## Parametric Study
Create model variants and analyze results — the async equivalent of the pattern shown in Batch Processing:
```python
import asyncio

from idfkit.simulation import async_simulate_batch, SimulationJob

async def main():
    # Create variants
    jobs = []
    for insulation in [0.05, 0.10, 0.15, 0.20]:
        variant = model.copy()
        variant["Material"]["Insulation"].thickness = insulation
        jobs.append(
            SimulationJob(
                model=variant,
                weather="weather.epw",
                label=f"insulation-{insulation}m",
                design_day=True,
            )
        )

    # Run all variants
    batch = await async_simulate_batch(jobs, max_concurrent=4)

    # Analyze results
    for job, result in zip(jobs, batch):
        if result.success:
            ts = result.sql.get_timeseries(
                "Zone Mean Air Temperature",
                "ZONE 1",
            )
            print(f"{job.label}: Max temp {max(ts.values):.1f}°C")

asyncio.run(main())
```
## Running Simulations Alongside Other Async Work
A key benefit of the async API is running simulations concurrently with other I/O-bound tasks — database queries, HTTP requests, file uploads — without blocking:
```python
import asyncio

from idfkit.simulation import async_simulate

async def fetch_weather_data(station_id: str) -> dict:
    """Fetch weather metadata from a remote API."""
    ...

async def main():
    # Run a simulation and an API call concurrently
    sim_task = async_simulate(model, "weather.epw", design_day=True)
    api_task = fetch_weather_data("725300")
    result, weather_meta = await asyncio.gather(sim_task, api_task)
    print(f"Simulation: {result.runtime_seconds:.1f}s")
    print(f"Weather station: {weather_meta}")

asyncio.run(main())
```
## Collecting Streaming Results
The streaming API yields events in completion order. To collect and reorder results for analysis:
```python
import asyncio

from idfkit.simulation import async_simulate_batch_stream, SimulationJob

async def main():
    jobs = [
        SimulationJob(model=variant, weather="weather.epw", label=f"case-{i}")
        for i, variant in enumerate(variants)
    ]

    # Collect events and reorder by original index
    results = [None] * len(jobs)
    async for event in async_simulate_batch_stream(jobs, max_concurrent=4):
        results[event.index] = event.result
        pct = event.completed / event.total * 100
        print(f"[{pct:3.0f}%] {event.label}: {'OK' if event.result.success else 'FAIL'}")

    # Results are now in submission order
    for i, result in enumerate(results):
        if result.success:
            ts = result.sql.get_timeseries("Zone Mean Air Temperature", "ZONE 1")
            print(f"Case {i}: max temp {max(ts.values):.1f}°C")

asyncio.run(main())
```
## Caching and Cloud Storage
All async functions accept the same cache and fs parameters as their
sync counterparts. For best results in async code, use an AsyncFileSystem
to avoid blocking the event loop during file uploads and result reads:
```python
from idfkit.simulation import SimulationCache, AsyncLocalFileSystem

cache = SimulationCache()
fs = AsyncLocalFileSystem()

result = await async_simulate(
    model, "weather.epw",
    cache=cache,
    output_dir="run-001",
    fs=fs,
)

# Use async accessors to read results without blocking
errors = await result.async_errors()
sql = await result.async_sql()
```
For S3 storage, use AsyncS3FileSystem (requires pip install idfkit[async-s3]):
```python
from idfkit.simulation import AsyncS3FileSystem, SimulationCache

cache = SimulationCache()

async with AsyncS3FileSystem(bucket="my-bucket", prefix="study/") as fs:
    result = await async_simulate(
        model, "weather.epw",
        cache=cache,
        output_dir="run-001",
        fs=fs,
    )
```
A sync FileSystem (e.g., S3FileSystem) is still accepted — the upload
step is automatically wrapped in asyncio.to_thread() so it doesn't block
the event loop. However, using AsyncFileSystem is recommended because it
provides true non-blocking I/O for both uploads and result reads (via the
async_errors(), async_sql(), etc. accessors), and avoids thread-pool
overhead.
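The asyncio.to_thread() wrapping described above is plain stdlib behaviour, sketched here without idfkit (blocking_upload() is a hypothetical stand-in for a sync FileSystem upload, and the paths are illustrative): the blocking call runs in the default thread pool, so two uploads overlap and the event loop stays free.

```python
import asyncio
import time

def blocking_upload(path: str) -> str:
    # Stand-in for a sync FileSystem upload: blocks its worker thread,
    # not the event loop.
    time.sleep(0.05)
    return f"uploaded {path}"

async def main():
    # asyncio.to_thread() runs each blocking call in the default thread
    # pool; gather() awaits both concurrently.
    return await asyncio.gather(
        asyncio.to_thread(blocking_upload, "run-001/eplusout.sql"),
        asyncio.to_thread(blocking_upload, "run-002/eplusout.sql"),
    )

print(asyncio.run(main()))
```

An AsyncFileSystem avoids this thread hop entirely, which is why it is the recommended option.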
## FastAPI Integration
A minimal example of an async simulation endpoint:
```python
from fastapi import FastAPI

from idfkit import load_idf
from idfkit.simulation import async_simulate

app = FastAPI()

@app.post("/simulate")
async def run_simulation(idf_path: str, weather_path: str):
    model = load_idf(idf_path)
    result = await async_simulate(model, weather_path, design_day=True)
    return {
        "success": result.success,
        "runtime": result.runtime_seconds,
        "errors": result.errors.summary(),
    }
```
Because async_simulate doesn't block the event loop, the server remains
responsive to other requests while EnergyPlus runs.
## Preprocessing
When expand_objects=True (the default), the Slab and Basement
preprocessors run in a background thread via asyncio.to_thread() so they
don't block the event loop. This is transparent — no user action required.
## Error Handling
Error handling is identical to the sync API:
```python
from idfkit.exceptions import SimulationError

try:
    result = await async_simulate(model, weather, timeout=60)
except SimulationError as e:
    if e.exit_code is None:
        print("Simulation timed out")
    else:
        print(f"Failed: {e}")
```
In batch mode, individual failures are captured in the results — the batch never raises due to a single job failing.
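The per-result capture means a failed job is just data you can filter after the batch returns. A minimal sketch with a stand-in result type (FakeResult is not part of idfkit; a real batch also exposes batch.succeeded, as shown earlier):

```python
from dataclasses import dataclass

@dataclass
class FakeResult:
    # Stand-in for SimulationResult; only label and success matter here.
    label: str
    success: bool

batch = [FakeResult("baseline", True), FakeResult("improved", False)]

# Failures are captured per result, so partition after the batch returns:
failed = [r for r in batch if not r.success]
for r in failed:
    print(f"{r.label} failed")
```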
## See Also
- Running Simulations — Sync simulation guide
- Batch Processing — Sync batch guide
- Simulation Architecture — Design decisions