Cloud & Remote Storage

idfkit's simulation module supports pluggable storage backends through the FileSystem protocol, enabling cloud-native workflows with S3 and other storage systems.

The FileSystem Protocol

The FileSystem protocol defines a minimal interface for file operations:

from pathlib import Path
from typing import Protocol

class FileSystem(Protocol):
    def read_bytes(self, path: str | Path) -> bytes: ...
    def write_bytes(self, path: str | Path, data: bytes) -> None: ...
    def read_text(self, path: str | Path, encoding: str = "utf-8") -> str: ...
    def write_text(self, path: str | Path, text: str, encoding: str = "utf-8") -> None: ...
    def exists(self, path: str | Path) -> bool: ...
    def makedirs(self, path: str | Path, *, exist_ok: bool = False) -> None: ...
    def copy(self, src: str | Path, dst: str | Path) -> None: ...
    def glob(self, path: str | Path, pattern: str) -> list[str]: ...
    def remove(self, path: str | Path) -> None: ...
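
Any object providing these nine methods can be passed to simulate() through the fs argument. To get a feel for the surface area, here is a small sketch that exercises the protocol against the default local backend (introduced below); the paths are arbitrary.

from idfkit.simulation import LocalFileSystem

fs = LocalFileSystem()

fs.makedirs("scratch/run-001", exist_ok=True)
fs.write_text("scratch/run-001/notes.txt", "hello")

assert fs.exists("scratch/run-001/notes.txt")
print(fs.glob("scratch/run-001", "*.txt"))  # matching paths, as strings

fs.copy("scratch/run-001/notes.txt", "scratch/run-001/notes.bak")
fs.remove("scratch/run-001/notes.txt")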

Built-in Implementations

LocalFileSystem

The default backend, wrapping pathlib.Path operations:

from idfkit.simulation import LocalFileSystem, simulate

fs = LocalFileSystem()  # This is the default
result = simulate(model, weather, fs=fs)

result = simulate(model, weather)  # Equivalent: implicitly uses LocalFileSystem

S3FileSystem

Amazon S3 backend for cloud workflows:

from idfkit.simulation import S3FileSystem, simulate

fs = S3FileSystem(
    bucket="my-simulations",
    prefix="batch-42/",
)

result = simulate(model, weather, output_dir="run-001", fs=fs)

Requires the boto3 package: pip install idfkit[s3]
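
Because uploaded result files sit behind the same protocol, you can inspect them with the fs methods directly. A small sketch; the eplusout.err file name is illustrative and depends on your output configuration:

# List what the run uploaded, then read the error file back from S3
for key in fs.glob("run-001", "*"):
    print(key)

if fs.exists("run-001/eplusout.err"):
    print(fs.read_text("run-001/eplusout.err"))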

Cloud Workflow Pattern

For cloud-based parametric simulations (AWS Batch, Kubernetes, etc.), the typical workflow is:

1. Local Preparation

Create simulation jobs with S3 output paths:

from idfkit.simulation import SimulationJob, S3FileSystem

fs = S3FileSystem(bucket="simulations", prefix="study-001/")

jobs = [
    SimulationJob(
        model=variant,
        weather="weather.epw",
        label=f"case-{i}",
        output_dir=f"case-{i}",
        fs=fs,
    )
    for i, variant in enumerate(variants)
]

2. Cloud Execution

Workers run simulations locally and upload the results to S3:

# In your AWS Batch / Kubernetes job:
from idfkit.simulation import simulate, S3FileSystem

fs = S3FileSystem(bucket="simulations", prefix="study-001/")
result = simulate(model, weather, output_dir="case-42", fs=fs)

# Result files are now in s3://simulations/study-001/case-42/

3. Result Collection

Retrieve results from S3 on any machine:

from idfkit.simulation import SimulationResult, S3FileSystem

fs = S3FileSystem(bucket="simulations", prefix="study-001/")

# Reconstruct result from S3
result = SimulationResult.from_directory("case-42", fs=fs)

# Query data (transparently reads from S3)
ts = result.sql.get_timeseries("Zone Mean Air Temperature", "ZONE 1")
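
The same call works in a loop to collect a whole study; a small sketch, assuming the cases were labelled case-0 onward as in step 1 (n_cases is whatever number of variants you submitted):

results = {}
for i in range(n_cases):  # n_cases: number of variants submitted in step 1
    results[f"case-{i}"] = SimulationResult.from_directory(f"case-{i}", fs=fs)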

S3 Configuration

Authentication

S3FileSystem uses boto3's credential chain:

  1. Explicit credentials in constructor
  2. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  3. IAM role (on EC2/ECS/Lambda)
  4. Shared credentials file (~/.aws/credentials)

# IAM role (recommended for cloud)
fs = S3FileSystem(bucket="my-bucket")

# Explicit credentials (for testing)
fs = S3FileSystem(
    bucket="my-bucket",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
)

S3-Compatible Services

Works with MinIO, LocalStack, and other S3-compatible services:

# MinIO
fs = S3FileSystem(
    bucket="local-bucket",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# LocalStack
fs = S3FileSystem(
    bucket="test-bucket",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
)

Key Prefixes

Use prefixes to namespace simulations:

# All files stored under "project-x/batch-42/"
fs = S3FileSystem(
    bucket="simulations",
    prefix="project-x/batch-42/",
)

# output_dir="run-001" → s3://simulations/project-x/batch-42/run-001/

Implementing Custom Backends

Implement the FileSystem protocol for other storage systems:

from pathlib import Path

class AzureBlobFileSystem:
    """Azure Blob Storage backend."""

    def __init__(self, container: str, connection_string: str):
        from azure.storage.blob import ContainerClient

        self._client = ContainerClient.from_connection_string(connection_string, container)

    def read_bytes(self, path: str | Path) -> bytes:
        blob = self._client.get_blob_client(str(path))
        return blob.download_blob().readall()

    def write_bytes(self, path: str | Path, data: bytes) -> None:
        blob = self._client.get_blob_client(str(path))
        blob.upload_blob(data, overwrite=True)

    # ... implement remaining methods
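
A custom backend plugs into simulate() like the built-in ones. A usage sketch; the container name and connection string are placeholders:

from idfkit.simulation import simulate

fs = AzureBlobFileSystem(
    container="simulations",
    connection_string="<your-connection-string>",
)
result = simulate(model, weather, output_dir="run-001", fs=fs)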

Async File System

For use with async_simulate() and the async batch functions, an AsyncFileSystem protocol is available. This avoids blocking the event loop during file uploads and result reads — important for network-backed storage like S3.
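
The AsyncFileSystem protocol mirrors FileSystem with awaitable methods. A sketch of the presumed shape, matching the sync protocol above and the custom backend example further down:

from pathlib import Path
from typing import Protocol

class AsyncFileSystem(Protocol):
    async def read_bytes(self, path: str | Path) -> bytes: ...
    async def write_bytes(self, path: str | Path, data: bytes) -> None: ...
    async def read_text(self, path: str | Path, encoding: str = "utf-8") -> str: ...
    async def write_text(self, path: str | Path, text: str, encoding: str = "utf-8") -> None: ...
    async def exists(self, path: str | Path) -> bool: ...
    async def makedirs(self, path: str | Path, *, exist_ok: bool = False) -> None: ...
    async def copy(self, src: str | Path, dst: str | Path) -> None: ...
    async def glob(self, path: str | Path, pattern: str) -> list[str]: ...
    async def remove(self, path: str | Path) -> None: ...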

Built-in: AsyncLocalFileSystem

Wraps LocalFileSystem via asyncio.to_thread():

from idfkit.simulation import AsyncLocalFileSystem, async_simulate

fs = AsyncLocalFileSystem()
result = await async_simulate(
    model, "weather.epw",
    output_dir="run-001",
    fs=fs,
)

# Non-blocking result access
errors = await result.async_errors()
sql = await result.async_sql()

Built-in: AsyncS3FileSystem

Non-blocking S3 backend powered by aiobotocore:

from idfkit.simulation import AsyncS3FileSystem, async_simulate

async with AsyncS3FileSystem(bucket="my-bucket", prefix="sims/") as fs:
    result = await async_simulate(
        model, "weather.epw",
        output_dir="run-001",
        fs=fs,
    )
    errors = await result.async_errors()

Requires: pip install idfkit[async-s3]

The AsyncS3FileSystem must be used as an async context manager (async with), which manages the underlying aiobotocore client lifecycle. It accepts the same **boto_kwargs as S3FileSystem (e.g., region_name, endpoint_url, explicit credentials).

S3-compatible services (MinIO, LocalStack) work identically:

async with AsyncS3FileSystem(
    bucket="local-bucket",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
) as fs:
    ...

Custom Async Backend

Implement the AsyncFileSystem protocol for other storage systems (Azure Blob Storage, GCS, etc.):

from pathlib import Path

from idfkit.simulation import AsyncFileSystem


class AsyncGCSFileSystem:
    """Example async GCS backend — implements AsyncFileSystem."""

    async def read_bytes(self, path: str | Path) -> bytes:
        ...

    async def write_bytes(self, path: str | Path, data: bytes) -> None:
        ...

    async def read_text(self, path: str | Path, encoding: str = "utf-8") -> str:
        return (await self.read_bytes(path)).decode(encoding)

    async def write_text(self, path: str | Path, text: str, encoding: str = "utf-8") -> None:
        await self.write_bytes(path, text.encode(encoding))

    async def exists(self, path: str | Path) -> bool:
        ...

    async def makedirs(self, path: str | Path, *, exist_ok: bool = False) -> None:
        ...

    async def copy(self, src: str | Path, dst: str | Path) -> None:
        ...

    async def glob(self, path: str | Path, pattern: str) -> list[str]:
        ...

    async def remove(self, path: str | Path) -> None:
        ...
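
Once the method bodies are filled in, the backend is passed to async_simulate() just like the built-in ones; a usage sketch:

from idfkit.simulation import async_simulate

fs = AsyncGCSFileSystem()
result = await async_simulate(
    model, "weather.epw",
    output_dir="run-001",
    fs=fs,
)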

Backward Compatibility

A sync FileSystem passed to async_simulate() is automatically wrapped in asyncio.to_thread() for the upload step, so existing code continues to work without changes. However, using AsyncFileSystem avoids the thread-pool overhead and provides true non-blocking I/O.
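
For example, the sync S3 backend from earlier can still be passed directly; a sketch (the upload happens in a worker thread):

from idfkit.simulation import S3FileSystem, async_simulate

fs = S3FileSystem(bucket="simulations", prefix="study-001/")  # sync backend
result = await async_simulate(
    model, "weather.epw",
    output_dir="case-42",
    fs=fs,  # wrapped in asyncio.to_thread() internally
)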

EnergyPlus Execution

Important: EnergyPlus always runs locally. The FileSystem abstraction covers:

  • Pre-simulation: Preparing run directory
  • Post-simulation: Uploading results
  • Result reading: Downloading files on demand

The actual simulation runs in a local temporary directory; the results are then copied to the configured FileSystem.

Performance Considerations

Lazy Loading

Result files are read on-demand, so only accessed data is downloaded:

# s3_fs is an S3FileSystem configured as shown above
result = SimulationResult.from_directory("run-001", fs=s3_fs)

# Nothing downloaded yet
# ...

# Downloads only the SQLite file
ts = result.sql.get_timeseries(...)

Local Caching

For repeated access, consider downloading to local disk:

import tempfile
from pathlib import Path

# Download entire result directory
with tempfile.TemporaryDirectory() as tmp:
    # Copy from S3 to local
    for path in s3_fs.glob("run-001", "*"):
        data = s3_fs.read_bytes(path)
        local_path = Path(tmp) / Path(path).name
        local_path.write_bytes(data)

    # Use local result
    result = SimulationResult.from_directory(tmp)
    # Multiple queries without network calls

See Also