
Async API

Non-blocking simulation execution using asyncio.

async_simulate

idfkit.simulation.async_runner.async_simulate(model, weather, *, output_dir=None, energyplus=None, expand_objects=True, annual=False, design_day=False, output_prefix='eplus', output_suffix='C', readvars=False, timeout=3600.0, extra_args=None, cache=None, fs=None, on_progress=None) async

Run an EnergyPlus simulation without blocking the event loop.

This is the async counterpart to simulate. All parameters and return values are identical; the only difference is that EnergyPlus runs as an asyncio subprocess, allowing the caller to await the result while other coroutines continue executing.

Cancellation is supported: if the wrapping asyncio.Task is cancelled, the EnergyPlus subprocess is killed and cleaned up.
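A minimal usage sketch, assuming an IDFDocument has already been loaded elsewhere and using a placeholder weather path:

    from idfkit.simulation.async_runner import async_simulate

    async def run_one(model, epw_path):
        # "model" is an IDFDocument prepared elsewhere; "epw_path" points at a
        # local .epw file.
        result = await async_simulate(
            model,
            epw_path,
            design_day=True,   # design-day-only run (-D)
            timeout=600.0,     # raise SimulationError after 10 minutes
        )
        print(result.success, result.exit_code, result.run_dir)
        return result

Because the subprocess does not block the event loop, several such coroutines can run side by side via asyncio.gather, or be wrapped in an asyncio.Task and cancelled, which kills the underlying EnergyPlus process as described above.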

Parameters:

model (IDFDocument, required)
    The EnergyPlus model to simulate.
weather (str | Path, required)
    Path to the weather file (.epw).
output_dir (str | Path | None, default None)
    Directory for output files (default: auto temp dir).
energyplus (EnergyPlusConfig | None, default None)
    Pre-configured EnergyPlus installation. If None, uses find_energyplus for auto-discovery.
expand_objects (bool, default True)
    Run ExpandObjects before simulation. When True, also runs the Slab and Basement ground heat-transfer preprocessors if the model contains the corresponding objects.
annual (bool, default False)
    Run annual simulation (-a flag).
design_day (bool, default False)
    Run design-day-only simulation (-D flag).
output_prefix (str, default 'eplus')
    Prefix for output files (default "eplus").
output_suffix (Literal['C', 'L', 'D'], default 'C')
    Output file naming suffix: "C" for combined table files (default), "L" for legacy separate table files, or "D" for timestamped separate files.
readvars (bool, default False)
    Run ReadVarsESO after simulation (-r flag).
timeout (float, default 3600.0)
    Maximum runtime in seconds (default 3600).
extra_args (list[str] | None, default None)
    Additional command-line arguments.
cache (SimulationCache | None, default None)
    Optional simulation cache for content-hash lookups.
fs (FileSystem | AsyncFileSystem | None, default None)
    Optional file system backend for storing results on remote storage (e.g., S3). Both sync FileSystem and async AsyncFileSystem are accepted. When an AsyncFileSystem is provided, uploads and result reads are truly non-blocking; a sync FileSystem is automatically wrapped in asyncio.to_thread to avoid blocking the event loop.
on_progress (Callable[[SimulationProgress], Any] | Literal['tqdm'] | None, default None)
    Optional callback invoked with a SimulationProgress event each time EnergyPlus emits a progress line. Both synchronous and async callables are accepted; async callables are awaited. Pass "tqdm" to use a built-in tqdm progress bar (requires pip install idfkit[progress]). A sketch of such a callback follows this list.
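As a sketch of the on_progress hook, the callback below is synchronous and simply prints each event; an async callable would be awaited instead. No particular SimulationProgress fields are assumed here:

    from idfkit.simulation.async_runner import async_simulate

    def log_progress(event):
        # Called once per progress line emitted by EnergyPlus.
        print(event)

    async def run_with_progress(model, epw_path):
        return await async_simulate(model, epw_path, on_progress=log_progress)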

Returns:

SimulationResult
    SimulationResult with paths to output files.

Raises:

SimulationError
    On timeout, OS error, or missing weather file.
ExpandObjectsError
    If a preprocessing step fails.
EnergyPlusNotFoundError
    If EnergyPlus cannot be found.
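A hedged error-handling sketch based on the exceptions listed above; the import path used for SimulationError is an assumption and may differ in the actual package:

    from idfkit.simulation.async_runner import async_simulate
    # Assumed import location; adjust to the package's real exceptions module.
    from idfkit.simulation.exceptions import SimulationError

    async def run_guarded(model, epw_path):
        try:
            return await async_simulate(model, epw_path, timeout=900.0)
        except SimulationError as exc:
            # Raised on timeout, OS error, or a missing weather file.
            print(f"Simulation could not run: {exc}")
            return None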

Source code in src/idfkit/simulation/async_runner.py
async def async_simulate(
    model: IDFDocument,
    weather: str | Path,
    *,
    output_dir: str | Path | None = None,
    energyplus: EnergyPlusConfig | None = None,
    expand_objects: bool = True,
    annual: bool = False,
    design_day: bool = False,
    output_prefix: str = "eplus",
    output_suffix: Literal["C", "L", "D"] = "C",
    readvars: bool = False,
    timeout: float = 3600.0,
    extra_args: list[str] | None = None,
    cache: SimulationCache | None = None,
    fs: FileSystem | AsyncFileSystem | None = None,
    on_progress: Callable[[SimulationProgress], Any] | Literal["tqdm"] | None = None,
) -> SimulationResult:
    """Run an EnergyPlus simulation without blocking the event loop.

    This is the async counterpart to [simulate][idfkit.simulation.runner.simulate].
    All parameters and return values are identical; the only difference is that
    EnergyPlus runs as an [asyncio][] subprocess, allowing the caller to
    ``await`` the result while other coroutines continue executing.

    Cancellation is supported: if the wrapping [asyncio.Task][] is
    cancelled, the EnergyPlus subprocess is killed and cleaned up.

    Args:
        model: The EnergyPlus model to simulate.
        weather: Path to the weather file (.epw).
        output_dir: Directory for output files (default: auto temp dir).
        energyplus: Pre-configured EnergyPlus installation. If None,
            uses [find_energyplus][idfkit.simulation.config.find_energyplus] for auto-discovery.
        expand_objects: Run ExpandObjects before simulation.  When
            ``True``, also runs the Slab and Basement ground heat-transfer
            preprocessors if the model contains the corresponding objects.
        annual: Run annual simulation (``-a`` flag).
        design_day: Run design-day-only simulation (``-D`` flag).
        output_prefix: Prefix for output files (default "eplus").
        output_suffix: Output file naming suffix: ``"C"`` for combined table
            files (default), ``"L"`` for legacy separate table files, or
            ``"D"`` for timestamped separate files.
        readvars: Run ReadVarsESO after simulation (``-r`` flag).
        timeout: Maximum runtime in seconds (default 3600).
        extra_args: Additional command-line arguments.
        cache: Optional simulation cache for content-hash lookups.
        fs: Optional file system backend for storing results on remote
            storage (e.g., S3).  Both sync [FileSystem][idfkit.simulation.fs.FileSystem]
            and async [AsyncFileSystem][idfkit.simulation.fs.AsyncFileSystem] are accepted.
            When an ``AsyncFileSystem`` is provided, uploads and result reads
            are truly non-blocking.  A sync ``FileSystem`` is automatically
            wrapped in [asyncio.to_thread][] to avoid blocking the event
            loop.
        on_progress: Optional callback invoked with a
            [SimulationProgress][idfkit.simulation.progress.SimulationProgress] event
            each time EnergyPlus emits a progress line.  Both synchronous
            and async callables are accepted -- async callables are awaited.
            Pass ``"tqdm"`` to use a built-in tqdm progress bar (requires
            ``pip install idfkit[progress]``).

    Returns:
        SimulationResult with paths to output files.

    Raises:
        SimulationError: On timeout, OS error, or missing weather file.
        ExpandObjectsError: If a preprocessing step fails.
        EnergyPlusNotFoundError: If EnergyPlus cannot be found.
    """
    if fs is not None and output_dir is None:
        msg = "output_dir is required when using a file system backend"
        raise ValueError(msg)

    progress_cb, progress_cleanup = resolve_on_progress(on_progress)

    try:
        config = resolve_config(energyplus)
        weather_path = Path(weather).resolve()

        if not weather_path.is_file():
            msg = f"Weather file not found: {weather_path}"
            raise SimulationError(msg)

        logger.info("Starting async simulation with weather %s", weather_path.name)

        cache_key: CacheKey | None = None
        if cache is not None:
            cache_key = cache.compute_key(
                model,
                weather_path,
                expand_objects=expand_objects,
                annual=annual,
                design_day=design_day,
                output_suffix=output_suffix,
                readvars=readvars,
                extra_args=extra_args,
            )
            cached = cache.get(cache_key)
            if cached is not None:
                return cached

        # Copy model to avoid mutation
        sim_model = model.copy()
        ensure_sql_output(sim_model)

        # Preprocessing may invoke subprocesses synchronously — delegate to a
        # thread so we don't block the event loop.
        sim_model, ep_expand = await asyncio.to_thread(
            maybe_preprocess, model, sim_model, config, weather_path, expand_objects
        )

        # When using a remote fs, always run locally in a temp dir
        local_output_dir = None if fs is not None else output_dir
        run_dir = prepare_run_directory(local_output_dir, weather_path)
        idf_path = run_dir / "model.idf"

        from ..writers import write_idf

        write_idf(sim_model, idf_path)

        cmd = build_command(
            config=config,
            idf_path=idf_path,
            weather_path=run_dir / weather_path.name,
            output_dir=run_dir,
            output_prefix=output_prefix,
            output_suffix=output_suffix,
            expand_objects=ep_expand,
            annual=annual,
            design_day=design_day,
            readvars=readvars,
            extra_args=extra_args,
        )

        start = time.monotonic()

        if progress_cb is not None:
            stdout, stderr, returncode = await _run_with_progress(cmd, run_dir, timeout, progress_cb)
        else:
            stdout, stderr, returncode = await _run_simple(cmd, run_dir, timeout)
    finally:
        if progress_cleanup is not None:
            progress_cleanup()

    elapsed = time.monotonic() - start

    logger.info("Async simulation finished (exit_code=%d) in %.1fs", returncode, elapsed)

    if fs is not None:
        remote_dir = Path(str(output_dir))
        if _is_async_fs(fs):
            await async_upload_results(run_dir, remote_dir, fs)  # type: ignore[arg-type]
            result = SimulationResult(
                run_dir=remote_dir,
                success=returncode == 0,
                exit_code=returncode,
                stdout=stdout,
                stderr=stderr,
                runtime_seconds=elapsed,
                output_prefix=output_prefix,
                async_fs=fs,  # type: ignore[arg-type]
            )
        else:
            await asyncio.to_thread(upload_results, run_dir, remote_dir, fs)  # type: ignore[arg-type]
            result = SimulationResult(
                run_dir=remote_dir,
                success=returncode == 0,
                exit_code=returncode,
                stdout=stdout,
                stderr=stderr,
                runtime_seconds=elapsed,
                output_prefix=output_prefix,
                fs=fs,  # type: ignore[arg-type]
            )
    else:
        result = SimulationResult(
            run_dir=run_dir,
            success=returncode == 0,
            exit_code=returncode,
            stdout=stdout,
            stderr=stderr,
            runtime_seconds=elapsed,
            output_prefix=output_prefix,
        )
    if cache is not None and cache_key is not None and result.success:
        cache.put(cache_key, result)
    return result

async_simulate_batch

idfkit.simulation.async_batch.async_simulate_batch(jobs, *, energyplus=None, max_concurrent=None, cache=None, fs=None, on_progress=None) async

Run multiple EnergyPlus simulations concurrently using asyncio.

This is the async counterpart to simulate_batch. Concurrency is controlled with an asyncio.Semaphore instead of a thread pool.

Individual job failures are captured as failed SimulationResult entries; the batch never raises due to a single job failing.
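A sketch of consuming the returned BatchResult; the succeeded, failed, results, and total_runtime_seconds attributes are taken from the source below, while construction of the jobs sequence itself is not shown:

    from idfkit.simulation.async_batch import async_simulate_batch

    async def run_batch(jobs):
        batch = await async_simulate_batch(jobs, max_concurrent=4)
        print(
            f"{len(batch.succeeded)} succeeded, {len(batch.failed)} failed "
            f"in {batch.total_runtime_seconds:.1f}s"
        )
        # Results come back in the same order as the input jobs, so they can
        # be paired with the jobs that produced them.
        for job, result in zip(jobs, batch.results):
            if not result.success:
                print(f"{job.label}: exit code {result.exit_code}")
        return batch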

Parameters:

jobs (Sequence[SimulationJob], required)
    Sequence of simulation jobs to execute.
energyplus (EnergyPlusConfig | None, default None)
    Shared EnergyPlus configuration (auto-discovered if None).
max_concurrent (int | None, default None)
    Maximum number of concurrent simulations. Defaults to min(len(jobs), os.cpu_count() or 1).
cache (SimulationCache | None, default None)
    Optional simulation cache for content-hash lookups.
fs (FileSystem | AsyncFileSystem | None, default None)
    Optional file system backend passed through to each async_simulate call.
on_progress (Callable[[SimulationProgress], Any] | None, default None)
    Optional callback invoked with SimulationProgress events during each individual simulation. Events include job_index and job_label to identify which batch job they belong to. Both sync and async callables are accepted. The "tqdm" shorthand is not supported for batch runners; use tqdm_progress with a custom per-job callback instead. A sketch of such a callback follows this list.
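A sketch of a per-job progress callback; only the job_index and job_label fields mentioned above are relied on:

    from idfkit.simulation.async_batch import async_simulate_batch

    def per_job_progress(event):
        # job_index / job_label identify which batch job emitted this event.
        print(f"[job {event.job_index}] {event.job_label}: {event}")

    async def run_batch_with_progress(jobs):
        return await async_simulate_batch(jobs, on_progress=per_job_progress)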

Returns:

BatchResult
    A BatchResult with results in the same order as jobs.

Raises:

ValueError
    If jobs is empty.

Source code in src/idfkit/simulation/async_batch.py
async def async_simulate_batch(
    jobs: Sequence[SimulationJob],
    *,
    energyplus: EnergyPlusConfig | None = None,
    max_concurrent: int | None = None,
    cache: SimulationCache | None = None,
    fs: FileSystem | AsyncFileSystem | None = None,
    on_progress: Callable[[SimulationProgress], Any] | None = None,
) -> BatchResult:
    """Run multiple EnergyPlus simulations concurrently using asyncio.

    This is the async counterpart to
    [simulate_batch][idfkit.simulation.batch.simulate_batch].  Concurrency is
    controlled with an [asyncio.Semaphore][] instead of a thread pool.

    Individual job failures are captured as failed
    [SimulationResult][idfkit.simulation.result.SimulationResult] entries -- the batch
    never raises due to a single job failing.

    Args:
        jobs: Sequence of simulation jobs to execute.
        energyplus: Shared EnergyPlus configuration (auto-discovered if
            ``None``).
        max_concurrent: Maximum number of concurrent simulations.  Defaults
            to ``min(len(jobs), os.cpu_count() or 1)``.
        cache: Optional simulation cache for content-hash lookups.
        fs: Optional file system backend passed through to each
            [async_simulate][idfkit.simulation.async_runner.async_simulate] call.
        on_progress: Optional callback invoked with
            [SimulationProgress][idfkit.simulation.progress.SimulationProgress] events
            during each individual simulation.  Events include
            ``job_index`` and ``job_label`` to identify which batch job
            they belong to.  Both sync and async callables are accepted.
            The ``"tqdm"`` shorthand is not supported for batch runners;
            use [tqdm_progress][idfkit.simulation.progress_bars.tqdm_progress]
            with a custom per-job callback instead.

    Returns:
        A [BatchResult][idfkit.simulation.batch.BatchResult] with results in the
        same order as *jobs*.

    Raises:
        ValueError: If *jobs* is empty.
    """
    if not jobs:
        msg = "jobs must not be empty"
        raise ValueError(msg)

    if on_progress == "tqdm":
        msg = (
            'on_progress="tqdm" is not supported for batch simulations because a single '
            "progress bar cannot represent multiple concurrent jobs. Use the tqdm_progress() "
            "context manager with a custom callback instead."
        )
        raise ValueError(msg)

    progress_cb, progress_cleanup = resolve_on_progress(on_progress)

    try:
        if max_concurrent is None:
            max_concurrent = min(len(jobs), os.cpu_count() or 1)

        logger.info("Starting async batch of %d jobs with concurrency %d", len(jobs), max_concurrent)

        semaphore = asyncio.Semaphore(max_concurrent)
        results: list[SimulationResult | None] = [None] * len(jobs)
        start = time.monotonic()

        async def _run_one(idx: int, job: SimulationJob) -> None:
            async with semaphore:
                results[idx] = await _async_run_job(idx, job, energyplus, cache, fs, progress_cb)

        tasks = [asyncio.create_task(_run_one(i, job)) for i, job in enumerate(jobs)]
        await asyncio.gather(*tasks)
    finally:
        if progress_cleanup is not None:
            progress_cleanup()

    elapsed = time.monotonic() - start

    final: list[SimulationResult] = []
    for r in results:
        assert r is not None  # noqa: S101
        final.append(r)

    batch_result = BatchResult(results=tuple(final), total_runtime_seconds=elapsed)
    logger.info(
        "Async batch complete: %d succeeded, %d failed in %.1fs",
        len(batch_result.succeeded),
        len(batch_result.failed),
        elapsed,
    )
    return batch_result

async_simulate_batch_stream

idfkit.simulation.async_batch.async_simulate_batch_stream(jobs, *, energyplus=None, max_concurrent=None, cache=None, fs=None, on_progress=None) async

Run simulations concurrently, yielding events as each one completes.

This is an async generator variant of async_simulate_batch that yields SimulationEvent objects in completion order. This enables real-time progress reporting without needing a callback:

async for event in async_simulate_batch_stream(jobs, max_concurrent=4):
    print(f"[{event.completed}/{event.total}] {event.label}")

Parameters:

jobs (Sequence[SimulationJob], required)
    Sequence of simulation jobs to execute.
energyplus (EnergyPlusConfig | None, default None)
    Shared EnergyPlus configuration (auto-discovered if None).
max_concurrent (int | None, default None)
    Maximum number of concurrent simulations. Defaults to min(len(jobs), os.cpu_count() or 1).
cache (SimulationCache | None, default None)
    Optional simulation cache for content-hash lookups.
fs (FileSystem | AsyncFileSystem | None, default None)
    Optional file system backend.
on_progress (Callable[[SimulationProgress], Any] | None, default None)
    Optional callback invoked with SimulationProgress events during each individual simulation. Events include job_index and job_label. The "tqdm" shorthand is not supported for batch runners; use tqdm_progress with a custom per-job callback instead.

Yields:

AsyncIterator[SimulationEvent]
    A SimulationEvent for each completed simulation, in the order they finish.

Raises:

ValueError
    If jobs is empty.

Source code in src/idfkit/simulation/async_batch.py
async def async_simulate_batch_stream(
    jobs: Sequence[SimulationJob],
    *,
    energyplus: EnergyPlusConfig | None = None,
    max_concurrent: int | None = None,
    cache: SimulationCache | None = None,
    fs: FileSystem | AsyncFileSystem | None = None,
    on_progress: Callable[[SimulationProgress], Any] | None = None,
) -> AsyncIterator[SimulationEvent]:
    """Run simulations concurrently, yielding events as each one completes.

    This is an async generator variant of [async_simulate_batch][idfkit.simulation.async_batch.async_simulate_batch] that
    yields [SimulationEvent][idfkit.simulation.async_batch.SimulationEvent] objects in *completion order*.  This
    enables real-time progress reporting without needing a callback:

    .. code-block:: python

        async for event in async_simulate_batch_stream(jobs, max_concurrent=4):
            print(f"[{event.completed}/{event.total}] {event.label}")

    Args:
        jobs: Sequence of simulation jobs to execute.
        energyplus: Shared EnergyPlus configuration (auto-discovered if
            ``None``).
        max_concurrent: Maximum number of concurrent simulations.  Defaults
            to ``min(len(jobs), os.cpu_count() or 1)``.
        cache: Optional simulation cache for content-hash lookups.
        fs: Optional file system backend.
        on_progress: Optional callback invoked with
            [SimulationProgress][idfkit.simulation.progress.SimulationProgress] events
            during each individual simulation.  Events include
            ``job_index`` and ``job_label``.  The ``"tqdm"`` shorthand
            is not supported for batch runners; use
            [tqdm_progress][idfkit.simulation.progress_bars.tqdm_progress]
            with a custom per-job callback instead.

    Yields:
        [SimulationEvent][idfkit.simulation.async_batch.SimulationEvent] for each completed simulation, in the order
        they finish.

    Raises:
        ValueError: If *jobs* is empty.
    """
    if not jobs:
        msg = "jobs must not be empty"
        raise ValueError(msg)

    if on_progress == "tqdm":
        msg = (
            'on_progress="tqdm" is not supported for batch simulations because a single '
            "progress bar cannot represent multiple concurrent jobs. Use the tqdm_progress() "
            "context manager with a custom callback instead."
        )
        raise ValueError(msg)

    progress_cb, progress_cleanup = resolve_on_progress(on_progress)

    if max_concurrent is None:
        max_concurrent = min(len(jobs), os.cpu_count() or 1)

    semaphore = asyncio.Semaphore(max_concurrent)
    total = len(jobs)
    queue: asyncio.Queue[SimulationEvent] = asyncio.Queue()
    completed_count = 0

    async def _run_one(idx: int, job: SimulationJob) -> None:
        nonlocal completed_count
        async with semaphore:
            result = await _async_run_job(idx, job, energyplus, cache, fs, progress_cb)
        completed_count += 1
        await queue.put(
            SimulationEvent(
                index=idx,
                label=job.label,
                result=result,
                completed=completed_count,
                total=total,
            )
        )

    tasks = [asyncio.create_task(_run_one(i, job)) for i, job in enumerate(jobs)]

    try:
        for _ in range(total):
            event = await queue.get()
            yield event
    finally:
        if progress_cleanup is not None:
            progress_cleanup()
        # If the consumer breaks out early, cancel remaining tasks.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

SimulationEvent

idfkit.simulation.async_batch.SimulationEvent dataclass

Progress event emitted by async_simulate_batch_stream.

Each event represents a single simulation that has finished (successfully or not). Events are yielded in completion order, not submission order.

Attributes:

index (int)
    Zero-based position of this job in the original jobs sequence (see the sketch after this list).
label (str)
    Human-readable label from the SimulationJob.
result (SimulationResult)
    The simulation result.
completed (int)
    Number of jobs completed so far (including this one).
total (int)
    Total number of jobs in the batch.
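Because index records each job's position in the original jobs sequence, streamed events can be folded back into submission order. A small sketch:

    from idfkit.simulation.async_batch import async_simulate_batch_stream

    async def collect_in_submission_order(jobs):
        results = [None] * len(jobs)
        async for event in async_simulate_batch_stream(jobs):
            results[event.index] = event.result  # index = original job position
        return results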

Source code in src/idfkit/simulation/async_batch.py
@dataclass(frozen=True, slots=True)
class SimulationEvent:
    """Progress event emitted by [async_simulate_batch_stream][idfkit.simulation.async_batch.async_simulate_batch_stream].

    Each event represents a single simulation that has finished (successfully
    or not).  Events are yielded in *completion order*, not submission order.

    Attributes:
        index: Zero-based position of this job in the original *jobs* sequence.
        label: Human-readable label from the [SimulationJob][idfkit.simulation.batch.SimulationJob].
        result: The simulation result.
        completed: Number of jobs completed so far (including this one).
        total: Total number of jobs in the batch.
    """

    index: int
    label: str
    result: SimulationResult
    completed: int
    total: int
