Migration API

Forward-migrate IDF models across EnergyPlus versions by orchestrating the Transition-VX-to-VY binaries shipped in PreProcess/IDFVersionUpdater.

migrate

idfkit.migration.runner.migrate(model, target_version, *, energyplus=None, migrator=None, on_progress=None, work_dir=None, keep_work_dir=False)

Forward-migrate model to target_version through EnergyPlus transition binaries.

Plans the chain of transition steps between model.version and target_version, runs each one in sequence via the supplied migrator (defaulting to a SubprocessMigrator rooted at the EnergyPlus installation's PreProcess/IDFVersionUpdater directory), re-parses the final IDF into a fresh IDFDocument, and returns a structured MigrationReport.

The input model is never mutated.

Parameters:

Name Type Description Default
model IDFDocument

The document to migrate. Its version defines the source.

required
target_version tuple[int, int, int] | str

Either a (major, minor, patch) tuple or a dotted string (e.g. "25.2.0").

required
energyplus EnergyPlusConfig | None

Pre-configured EnergyPlus installation. Ignored when migrator is provided. When neither is supplied, find_energyplus() is used to discover an installation automatically.

None
migrator Migrator | None

Custom backend implementing Migrator. Overrides energyplus when provided.

None
on_progress OnMigrationProgress | None

Optional callback invoked with MigrationProgress events on every phase transition.

None
work_dir str | Path | None

Directory in which to stage per-step work subdirectories. Defaults to a newly-created temporary directory, which is removed after migration unless keep_work_dir is True. A caller-supplied directory is never removed.

None
keep_work_dir bool

Set True to preserve intermediate files for inspection.

False

Returns:

Type Description
MigrationReport

A MigrationReport whose migrated_model is None only for a no-op migration (source equals target); in that case the caller's original model is the result.

Raises:

Type Description
UnsupportedVersionError

If either the source or target version is not in ENERGYPLUS_VERSIONS.

ValueError

If target_version is older than model.version (backward migration is not supported).

MigrationError

If any transition step fails. The exception's completed_steps attribute records the prefix of steps that succeeded; inspect it to recover partial progress.

EnergyPlusNotFoundError

If migrator is None and no EnergyPlus installation can be discovered.

Source code in src/idfkit/migration/runner.py
def migrate(
    model: IDFDocument,
    target_version: tuple[int, int, int] | str,
    *,
    energyplus: EnergyPlusConfig | None = None,
    migrator: Migrator | None = None,
    on_progress: OnMigrationProgress | None = None,
    work_dir: str | Path | None = None,
    keep_work_dir: bool = False,
) -> MigrationReport:
    """Forward-migrate *model* to *target_version* through EnergyPlus transition binaries.

    Plans the chain of transition steps between ``model.version`` and
    *target_version*, runs each one in sequence via the supplied *migrator*
    (defaulting to a [SubprocessMigrator][idfkit.migration.subprocess_backend.SubprocessMigrator]
    rooted at the EnergyPlus installation's ``PreProcess/IDFVersionUpdater``
    directory), re-parses the final IDF into a fresh
    [IDFDocument][idfkit.document.IDFDocument], and returns a structured
    [MigrationReport][idfkit.migration.report.MigrationReport].

    The input *model* is never mutated.

    Args:
        model: The document to migrate. Its ``version`` defines the source.
        target_version: Either a ``(major, minor, patch)`` tuple or a dotted
            string (e.g. ``"25.2.0"``).
        energyplus: Pre-configured EnergyPlus installation. Ignored when
            *migrator* is provided. When neither is supplied,
            [find_energyplus()][idfkit.simulation.config.find_energyplus] is used
            to discover an installation automatically.
        migrator: Custom backend implementing
            [Migrator][idfkit.migration.protocol.Migrator]. Overrides
            *energyplus* when provided.
        on_progress: Optional callback invoked with
            [MigrationProgress][idfkit.migration.progress.MigrationProgress]
            events on every phase transition.
        work_dir: Directory in which to stage per-step work subdirectories.
            Defaults to a newly-created temporary directory, which is
            removed after migration unless *keep_work_dir* is ``True``. A
            caller-supplied directory is never removed.
        keep_work_dir: Set ``True`` to preserve intermediate files for
            inspection.

    Returns:
        A [MigrationReport][idfkit.migration.report.MigrationReport] whose
        ``migrated_model`` is ``None`` only for a no-op migration (source
        equals target) — in that case the caller's original *model* is the
        result.

    Raises:
        idfkit.exceptions.UnsupportedVersionError: If either the source or
            target version is not in
            `ENERGYPLUS_VERSIONS`.
        ValueError: If *target_version* is older than ``model.version``
            (backward migration is not supported).
        idfkit.exceptions.MigrationError: If any transition step fails. The
            exception's ``completed_steps`` attribute records the prefix of
            steps that succeeded; inspect it to recover partial progress.
        idfkit.exceptions.EnergyPlusNotFoundError: If *migrator* is ``None``
            and no EnergyPlus installation can be discovered.
    """
    target = normalize_target(target_version)
    source = model.version

    _emit(on_progress, MigrationProgress(phase="planning", message="Planning migration chain"))
    chain = plan_migration_chain(source, target)

    if not chain:
        _emit(
            on_progress,
            MigrationProgress(phase="complete", message="Source equals target; nothing to do", percent=100.0),
        )
        return MigrationReport(
            migrated_model=None,
            source_version=source,
            target_version=target,
            requested_target=target,
            steps=(),
            diff=MigrationDiff(),
        )

    resolved_migrator = _resolve_migrator(migrator, energyplus)
    total_steps = len(chain)
    _emit(
        on_progress,
        MigrationProgress(
            phase="preparing",
            message=f"Running {total_steps} transition step{'s' if total_steps != 1 else ''}",
            total_steps=total_steps,
        ),
    )

    owned_work_dir = work_dir is None
    root = Path(tempfile.mkdtemp(prefix="idfkit_migrate_")) if owned_work_dir else Path(work_dir).resolve()
    if not owned_work_dir:
        root.mkdir(parents=True, exist_ok=True)

    try:
        report = _run_chain(
            model=model,
            chain=chain,
            migrator=resolved_migrator,
            work_root=root,
            on_progress=on_progress,
            source=source,
            target=target,
        )
    finally:
        if owned_work_dir and not keep_work_dir:
            shutil.rmtree(root, ignore_errors=True)

    return report
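
The planning step can be pictured as slicing an ordered version list into consecutive pairs. The sketch below is a hypothetical stand-in, not idfkit's `plan_migration_chain`, whose implementation is not shown here; the `KNOWN_VERSIONS` list and the `plan_chain` name are assumptions made for illustration.

```python
from typing import List, Tuple

Version = Tuple[int, int, int]

# Illustrative subset only (assumption); the real registry is ENERGYPLUS_VERSIONS.
KNOWN_VERSIONS: List[Version] = [(9, 4, 0), (9, 5, 0), (9, 6, 0), (22, 1, 0)]


def plan_chain(source: Version, target: Version) -> List[Tuple[Version, Version]]:
    """Return consecutive (from, to) pairs leading from source to target."""
    if source not in KNOWN_VERSIONS or target not in KNOWN_VERSIONS:
        raise LookupError(f"unsupported version: {source} -> {target}")
    if target < source:
        raise ValueError("backward migration is not supported")
    lo = KNOWN_VERSIONS.index(source)
    hi = KNOWN_VERSIONS.index(target)
    path = KNOWN_VERSIONS[lo : hi + 1]
    # Pair each version with its successor; a single-element path yields [].
    return list(zip(path, path[1:]))
```

An empty result corresponds to the no-op branch in `migrate()`, where the report is returned with `migrated_model=None` and no migrator is resolved.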

async_migrate

idfkit.migration.async_runner.async_migrate(model, target_version, *, energyplus=None, migrator=None, on_progress=None, work_dir=None, keep_work_dir=False) async

Async counterpart to migrate().

Same semantics as the sync version; the only difference is that each transition step is driven via asyncio.create_subprocess_exec (or a caller-supplied AsyncMigrator), and progress callbacks may be async.

Parameters:

Name Type Description Default
model IDFDocument

The document to migrate.

required
target_version tuple[int, int, int] | str

Target version (tuple or dotted string).

required
energyplus EnergyPlusConfig | None

EnergyPlus installation. Ignored when migrator is set.

None
migrator AsyncMigrator | None

Optional AsyncMigrator. When None, defaults to an AsyncSubprocessMigrator rooted at the installed EnergyPlus's IDFVersionUpdater directory.

None
on_progress OnMigrationProgress | None

Optional sync or async callback receiving MigrationProgress events.

None
work_dir str | Path | None

Directory in which to stage per-step working directories. Defaults to a newly-created temporary directory that is removed after the migration unless keep_work_dir is True.

None
keep_work_dir bool

Preserve intermediate files for inspection.

False

Returns:

Type Description
MigrationReport

A MigrationReport, as for migrate().

Raises:

Type Description
UnsupportedVersionError

If either the source or target version is unsupported.

ValueError

If the target is older than model.version.

MigrationError

On any transition-step failure; the exception's completed_steps records the prefix that succeeded.

EnergyPlusNotFoundError

If migrator is None and no EnergyPlus installation can be discovered.

Source code in src/idfkit/migration/async_runner.py
async def async_migrate(
    model: IDFDocument,
    target_version: tuple[int, int, int] | str,
    *,
    energyplus: EnergyPlusConfig | None = None,
    migrator: AsyncMigrator | None = None,
    on_progress: OnMigrationProgress | None = None,
    work_dir: str | Path | None = None,
    keep_work_dir: bool = False,
) -> MigrationReport:
    """Async counterpart to [migrate()][idfkit.migration.runner.migrate].

    Same semantics as the sync version; the only difference is that each
    transition step is driven via [asyncio.create_subprocess_exec][] (or a
    caller-supplied [AsyncMigrator][idfkit.migration.protocol.AsyncMigrator]),
    and progress callbacks may be async.

    Args:
        model: The document to migrate.
        target_version: Target version (tuple or dotted string).
        energyplus: EnergyPlus installation. Ignored when *migrator* is set.
        migrator: Optional [AsyncMigrator][idfkit.migration.protocol.AsyncMigrator].
            When ``None``, defaults to an
            [AsyncSubprocessMigrator][idfkit.migration.async_subprocess_backend.AsyncSubprocessMigrator]
            rooted at the installed EnergyPlus's IDFVersionUpdater directory.
        on_progress: Optional sync or async callback receiving
            [MigrationProgress][idfkit.migration.progress.MigrationProgress]
            events.
        work_dir: Directory in which to stage per-step working directories.
            Defaults to a newly-created temporary directory that is removed
            after the migration unless *keep_work_dir* is ``True``.
        keep_work_dir: Preserve intermediate files for inspection.

    Returns:
        A [MigrationReport][idfkit.migration.report.MigrationReport].

    Raises:
        idfkit.exceptions.UnsupportedVersionError: For an unsupported version.
        ValueError: If the target is older than ``model.version``.
        idfkit.exceptions.MigrationError: On any transition-step failure; the
            exception's ``completed_steps`` records the prefix that succeeded.
        idfkit.exceptions.EnergyPlusNotFoundError: If *migrator* is ``None``
            and no EnergyPlus installation can be discovered.
    """
    target = normalize_target(target_version)
    source = model.version

    await _emit(on_progress, MigrationProgress(phase="planning", message="Planning migration chain"))
    chain = plan_migration_chain(source, target)

    if not chain:
        await _emit(
            on_progress,
            MigrationProgress(phase="complete", message="Source equals target; nothing to do", percent=100.0),
        )
        return MigrationReport(
            migrated_model=None,
            source_version=source,
            target_version=target,
            requested_target=target,
            steps=(),
            diff=MigrationDiff(),
        )

    resolved_migrator = await asyncio.to_thread(_resolve_async_migrator, migrator, energyplus)
    total_steps = len(chain)
    await _emit(
        on_progress,
        MigrationProgress(
            phase="preparing",
            message=f"Running {total_steps} transition step{'s' if total_steps != 1 else ''}",
            total_steps=total_steps,
        ),
    )

    owned_work_dir = work_dir is None
    root = Path(tempfile.mkdtemp(prefix="idfkit_migrate_")) if owned_work_dir else Path(work_dir).resolve()
    if not owned_work_dir:
        root.mkdir(parents=True, exist_ok=True)

    try:
        report = await _run_chain_async(
            model=model,
            chain=chain,
            migrator=resolved_migrator,
            work_root=root,
            on_progress=on_progress,
            source=source,
            target=target,
        )
    finally:
        if owned_work_dir and not keep_work_dir:
            await asyncio.to_thread(shutil.rmtree, root, True)

    return report
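
Accepting both sync and async progress callbacks usually comes down to awaiting the callback's return value only when it is awaitable. The following is a minimal sketch of that pattern under that assumption; it is not the `_emit` helper used above, whose body is not shown, and the `emit` and `demo` names are illustrative.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, List, Optional, Union

ProgressCallback = Callable[[Any], Union[None, Awaitable[None]]]


async def emit(callback: Optional[ProgressCallback], event: Any) -> None:
    """Invoke callback with event, awaiting the result if it is awaitable."""
    if callback is None:
        return
    result = callback(event)
    if inspect.isawaitable(result):
        await result


async def demo() -> List[str]:
    seen: List[str] = []

    def sync_cb(event: str) -> None:
        seen.append(f"sync:{event}")

    async def async_cb(event: str) -> None:
        await asyncio.sleep(0)  # yield to the event loop once
        seen.append(f"async:{event}")

    await emit(sync_cb, "planning")
    await emit(async_cb, "planning")
    await emit(None, "planning")  # no callback registered: nothing happens
    return seen
```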

MigrationReport

idfkit.migration.report.MigrationReport dataclass

Result of a full migration run.

Attributes:

Name Type Description
migrated_model IDFDocument | None

The IDFDocument at target_version (None only on a no-op migration where source equals target — in that case the caller's original model is the result).

source_version tuple[int, int, int]

The version the model started at.

target_version tuple[int, int, int]

The version the model was migrated to. On a partial failure this is the last successfully reached version, which may be earlier than the originally-requested target.

requested_target tuple[int, int, int]

The originally-requested target version.

steps tuple[MigrationStep, ...]

Ordered record of every transition step that ran.

diff MigrationDiff

Structural diff computed after migration. Empty when no steps ran (source == target).
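
Because migrated_model is None for a no-op migration, callers typically need a fallback to the original document. The helper below is a small sketch of that pattern; `effective_model` is a hypothetical name, and the `SimpleNamespace` objects are stand-ins that carry only the one attribute the helper reads.

```python
from types import SimpleNamespace
from typing import Any


def effective_model(report: Any, original: Any) -> Any:
    """Return the model to keep working with after a migration."""
    # migrated_model is None only for a no-op migration (source == target).
    if report.migrated_model is None:
        return original
    return report.migrated_model


# Stand-ins for MigrationReport (assumption: only migrated_model is needed here).
noop_report = SimpleNamespace(migrated_model=None)
real_report = SimpleNamespace(migrated_model="migrated-doc")
```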

Source code in src/idfkit/migration/report.py
@dataclass(frozen=True, slots=True)
class MigrationReport:
    """Result of a full migration run.

    Attributes:
        migrated_model: The [IDFDocument][idfkit.document.IDFDocument] at
            ``target_version`` (``None`` only on a no-op migration where
            source equals target — in that case the caller's original model is
            the result).
        source_version: The version the model started at.
        target_version: The version the model was migrated to. On a partial
            failure this is the last successfully reached version, which may
            be earlier than the originally-requested target.
        requested_target: The originally-requested target version.
        steps: Ordered record of every transition step that ran.
        diff: Structural diff computed after migration. Empty when no
            steps ran (source == target).
    """

    migrated_model: IDFDocument | None
    source_version: tuple[int, int, int]
    target_version: tuple[int, int, int]
    requested_target: tuple[int, int, int]
    steps: tuple[MigrationStep, ...] = ()
    diff: MigrationDiff = field(default_factory=MigrationDiff)

    @property
    def success(self) -> bool:
        """``True`` when every transition step succeeded (or none ran)."""
        return all(s.success for s in self.steps)

    @property
    def completed_steps(self) -> tuple[MigrationStep, ...]:
        """Steps that completed successfully, in order."""
        return tuple(s for s in self.steps if s.success)

    @property
    def failed_step(self) -> MigrationStep | None:
        """The first step that failed, if any."""
        for s in self.steps:
            if not s.success:
                return s
        return None

    def summary(self) -> str:
        """Return a short multi-line summary suitable for logs or CLI output."""
        s_ver = ".".join(str(x) for x in self.source_version)
        t_ver = ".".join(str(x) for x in self.target_version)
        lines = [
            f"Migration: {s_ver} -> {t_ver}"
            + (
                ""
                if self.target_version == self.requested_target
                else f" (requested {'.'.join(str(x) for x in self.requested_target)})"
            ),
            f"Steps: {len(self.completed_steps)}/{len(self.steps)} succeeded",
        ]
        if self.diff.added_object_types:
            lines.append(f"  + object types: {', '.join(self.diff.added_object_types)}")
        if self.diff.removed_object_types:
            lines.append(f"  - object types: {', '.join(self.diff.removed_object_types)}")
        return "\n".join(lines)

migrated_model instance-attribute

source_version instance-attribute

target_version instance-attribute

requested_target instance-attribute

steps = () class-attribute instance-attribute

diff = field(default_factory=MigrationDiff) class-attribute instance-attribute

success property

True when every transition step succeeded (or none ran).

completed_steps property

Steps that completed successfully, in order.

failed_step property

The first step that failed, if any.

summary()

Return a short multi-line summary suitable for logs or CLI output.

Source code in src/idfkit/migration/report.py
def summary(self) -> str:
    """Return a short multi-line summary suitable for logs or CLI output."""
    s_ver = ".".join(str(x) for x in self.source_version)
    t_ver = ".".join(str(x) for x in self.target_version)
    lines = [
        f"Migration: {s_ver} -> {t_ver}"
        + (
            ""
            if self.target_version == self.requested_target
            else f" (requested {'.'.join(str(x) for x in self.requested_target)})"
        ),
        f"Steps: {len(self.completed_steps)}/{len(self.steps)} succeeded",
    ]
    if self.diff.added_object_types:
        lines.append(f"  + object types: {', '.join(self.diff.added_object_types)}")
    if self.diff.removed_object_types:
        lines.append(f"  - object types: {', '.join(self.diff.removed_object_types)}")
    return "\n".join(lines)

MigrationStep

idfkit.migration.report.MigrationStep dataclass

Record of a single transition step.

Attributes:

Name Type Description
from_version tuple[int, int, int]

Source version for this step.

to_version tuple[int, int, int]

Target version for this step.

success bool

Whether the step completed without error.

binary Path | None

Path to the transition binary that was invoked, if any. None for pure-Python backends or steps that were skipped.

stdout str

Captured standard output (may be empty).

stderr str

Captured standard error (may be empty).

audit_text str | None

Contents of the per-step .audit file if produced, else None.

runtime_seconds float

Wall-clock runtime of the step.

Source code in src/idfkit/migration/report.py
@dataclass(frozen=True, slots=True)
class MigrationStep:
    """Record of a single transition step.

    Attributes:
        from_version: Source version for this step.
        to_version: Target version for this step.
        success: Whether the step completed without error.
        binary: Path to the transition binary that was invoked, if any. ``None``
            for pure-Python backends or steps that were skipped.
        stdout: Captured standard output (may be empty).
        stderr: Captured standard error (may be empty).
        audit_text: Contents of the per-step ``.audit`` file if produced,
            else ``None``.
        runtime_seconds: Wall-clock runtime of the step.
    """

    from_version: tuple[int, int, int]
    to_version: tuple[int, int, int]
    success: bool
    binary: Path | None = None
    stdout: str = ""
    stderr: str = ""
    audit_text: str | None = None
    runtime_seconds: float = 0.0
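
The per-step records lend themselves to a compact log listing. This sketch formats any objects exposing the `from_version`, `to_version`, `success`, and `runtime_seconds` attributes documented above; `format_steps` is a hypothetical helper, not part of idfkit.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def format_steps(steps: Iterable[Any]) -> List[str]:
    """Render one line per transition step for logs or CLI output."""
    lines = []
    for step in steps:
        src = ".".join(str(x) for x in step.from_version)
        dst = ".".join(str(x) for x in step.to_version)
        status = "ok" if step.success else "FAILED"
        lines.append(f"{src} -> {dst}: {status} ({step.runtime_seconds:.2f}s)")
    return lines


# Stand-in for a MigrationStep (assumption: duck typing on four attributes).
example_step = SimpleNamespace(
    from_version=(9, 5, 0), to_version=(9, 6, 0), success=True, runtime_seconds=1.5
)
```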

MigrationDiff

idfkit.migration.report.MigrationDiff dataclass

Structural diff between pre- and post-migration documents.

Attributes:

Name Type Description
added_object_types tuple[str, ...]

Types present after migration but not before.

removed_object_types tuple[str, ...]

Types present before migration but not after.

object_count_delta dict[str, int]

Per-type signed change in object count. Includes only types with a nonzero delta.

field_changes dict[str, FieldDelta]

Per-type schema-level field changes (only types present in both the before and after schemas are included).

Source code in src/idfkit/migration/report.py
@dataclass(frozen=True, slots=True)
class MigrationDiff:
    """Structural diff between pre- and post-migration documents.

    Attributes:
        added_object_types: Types present after migration but not before.
        removed_object_types: Types present before migration but not after.
        object_count_delta: Per-type signed change in object count.
            Includes only types with a nonzero delta.
        field_changes: Per-type schema-level field changes (only types present
            in both the before and after schemas are included).
    """

    added_object_types: tuple[str, ...] = ()
    removed_object_types: tuple[str, ...] = ()
    object_count_delta: dict[str, int] = field(default_factory=_empty_count_delta)
    field_changes: dict[str, FieldDelta] = field(default_factory=_empty_field_changes)

    @property
    def is_empty(self) -> bool:
        """``True`` when the migration produced no observable structural change."""
        return (
            not self.added_object_types
            and not self.removed_object_types
            and not self.object_count_delta
            and not self.field_changes
        )

added_object_types = () class-attribute instance-attribute

removed_object_types = () class-attribute instance-attribute

object_count_delta = field(default_factory=_empty_count_delta) class-attribute instance-attribute

field_changes = field(default_factory=_empty_field_changes) class-attribute instance-attribute

is_empty property

True when the migration produced no observable structural change.
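
The type-level part of such a diff can be computed from two multisets of object-type names. The sketch below shows one way to do that with `collections.Counter`; it is an illustration, not the library's diff routine, and it assumes that newly added or removed types also appear in the count delta, which the documentation above does not state.

```python
from collections import Counter
from typing import Dict, List, Tuple


def diff_object_counts(
    before: List[str], after: List[str]
) -> Tuple[Tuple[str, ...], Tuple[str, ...], Dict[str, int]]:
    """Compare two lists of object-type names, one entry per object."""
    b, a = Counter(before), Counter(after)
    added = tuple(sorted(set(a) - set(b)))
    removed = tuple(sorted(set(b) - set(a)))
    # Keep only types whose count changed; Counter returns 0 for missing keys.
    delta = {t: a[t] - b[t] for t in sorted(set(a) | set(b)) if a[t] != b[t]}
    return added, removed, delta
```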

FieldDelta

idfkit.migration.report.FieldDelta dataclass

Per-object-type summary of field changes introduced by the migration.

Attributes:

Name Type Description
added tuple[str, ...]

Field names present in to_version schema but not in from_version.

removed tuple[str, ...]

Field names present in from_version schema but not in to_version.

Source code in src/idfkit/migration/report.py
@dataclass(frozen=True, slots=True)
class FieldDelta:
    """Per-object-type summary of field changes introduced by the migration.

    Attributes:
        added: Field names present in *to_version* schema but not in *from_version*.
        removed: Field names present in *from_version* schema but not in *to_version*.
    """

    added: tuple[str, ...] = ()
    removed: tuple[str, ...] = ()

MigrationProgress

idfkit.migration.progress.MigrationProgress dataclass

Progress event emitted during a migration run.

Attributes:

Name Type Description
phase MigrationPhase

Current migration phase.

message str

Short human-readable description of the current step.

step_index int | None

0-based index of the current transition step (only set during the "transitioning" phase).

total_steps int | None

Total number of transition steps in the chain (only set once the chain has been planned).

from_version tuple[int, int, int] | None

Source version of the current step, if known.

to_version tuple[int, int, int] | None

Target version of the current step, if known.

percent float | None

Estimated completion percentage (0.0 to 100.0) or None when progress is indeterminate.

Source code in src/idfkit/migration/progress.py
@dataclass(frozen=True, slots=True)
class MigrationProgress:
    """Progress event emitted during a migration run.

    Attributes:
        phase: Current migration phase.
        message: Short human-readable description of the current step.
        step_index: 0-based index of the current transition step
            (only set during the ``"transitioning"`` phase).
        total_steps: Total number of transition steps in the chain
            (only set once the chain has been planned).
        from_version: Source version of the current step, if known.
        to_version: Target version of the current step, if known.
        percent: Estimated completion percentage (``0.0`` to ``100.0``) or
            ``None`` when progress is indeterminate.
    """

    phase: MigrationPhase
    message: str
    step_index: int | None = None
    total_steps: int | None = None
    from_version: tuple[int, int, int] | None = None
    to_version: tuple[int, int, int] | None = None
    percent: float | None = None
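
A progress callback usually needs to cope with the optional fields being None. The sketch below formats an event into a single log line; `ProgressEvent` is a local stand-in mirroring a subset of the MigrationProgress fields so the example runs on its own, and `format_progress` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ProgressEvent:
    """Local stand-in mirroring the MigrationProgress fields used below."""

    phase: str
    message: str
    step_index: Optional[int] = None
    total_steps: Optional[int] = None
    percent: Optional[float] = None


def format_progress(event: Any) -> str:
    """Build a one-line description, skipping fields that are None."""
    parts = [f"[{event.phase}]"]
    if event.step_index is not None and event.total_steps is not None:
        # step_index is 0-based; show it 1-based for readers.
        parts.append(f"step {event.step_index + 1}/{event.total_steps}")
    if event.percent is not None:
        parts.append(f"{event.percent:.0f}%")
    parts.append(event.message)
    return " ".join(parts)
```

A callable such as `lambda event: print(format_progress(event))` could then be passed as on_progress.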

Migrator

idfkit.migration.protocol.Migrator

Bases: Protocol

Protocol implemented by backends that migrate IDF text one version at a time.

Source code in src/idfkit/migration/protocol.py
@runtime_checkable
class Migrator(Protocol):
    """Protocol implemented by backends that migrate IDF text one version at a time."""

    def migrate_step(
        self,
        idf_text: str,
        from_version: tuple[int, int, int],
        to_version: tuple[int, int, int],
        *,
        work_dir: Path,
    ) -> MigrationStepResult:
        """Migrate *idf_text* from *from_version* to *to_version*.

        Args:
            idf_text: The source IDF text.
            from_version: The source version; the migrator should assume the
                IDF conforms to this version's IDD.
            to_version: The target version; the migrator must emit IDF text
                that conforms to this version's IDD.
            work_dir: A clean working directory the backend may use for
                intermediate files. Caller owns cleanup.

        Returns:
            The [MigrationStepResult][idfkit.migration.protocol.MigrationStepResult]
            for this single step.

        Raises:
            idfkit.exceptions.MigrationError: If the migration fails.
        """
        ...

migrate_step(idf_text, from_version, to_version, *, work_dir)

Migrate idf_text from from_version to to_version.

Parameters:

Name Type Description Default
idf_text str

The source IDF text.

required
from_version tuple[int, int, int]

The source version; the migrator should assume the IDF conforms to this version's IDD.

required
to_version tuple[int, int, int]

The target version; the migrator must emit IDF text that conforms to this version's IDD.

required
work_dir Path

A clean working directory the backend may use for intermediate files. Caller owns cleanup.

required

Returns:

Type Description
MigrationStepResult

The MigrationStepResult for this single step.

Raises:

Type Description
MigrationError

If the migration fails.

Source code in src/idfkit/migration/protocol.py
def migrate_step(
    self,
    idf_text: str,
    from_version: tuple[int, int, int],
    to_version: tuple[int, int, int],
    *,
    work_dir: Path,
) -> MigrationStepResult:
    """Migrate *idf_text* from *from_version* to *to_version*.

    Args:
        idf_text: The source IDF text.
        from_version: The source version; the migrator should assume the
            IDF conforms to this version's IDD.
        to_version: The target version; the migrator must emit IDF text
            that conforms to this version's IDD.
        work_dir: A clean working directory the backend may use for
            intermediate files. Caller owns cleanup.

    Returns:
        The [MigrationStepResult][idfkit.migration.protocol.MigrationStepResult]
        for this single step.

    Raises:
        idfkit.exceptions.MigrationError: If the migration fails.
    """
    ...
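
Since Migrator is a runtime-checkable protocol, any object with a matching migrate_step method qualifies as a backend. The toy backend below illustrates the shape only: `StepResult` and `StepMigrator` are local stand-ins for MigrationStepResult and Migrator so the example is self-contained, and `VersionStampMigrator` is hypothetical. It rewrites a version string and nothing else, so it is useful for tests but is not a real transition.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Protocol, Tuple, runtime_checkable

Version = Tuple[int, int, int]


@dataclass(frozen=True)
class StepResult:
    """Local stand-in for MigrationStepResult (same field names)."""

    idf_text: str
    stdout: str = ""
    stderr: str = ""
    audit_text: Optional[str] = None


@runtime_checkable
class StepMigrator(Protocol):
    """Local stand-in for the Migrator protocol."""

    def migrate_step(
        self, idf_text: str, from_version: Version, to_version: Version, *, work_dir: Path
    ) -> StepResult: ...


class VersionStampMigrator:
    """Toy backend: rewrites only the version string, nothing else."""

    def migrate_step(
        self, idf_text: str, from_version: Version, to_version: Version, *, work_dir: Path
    ) -> StepResult:
        old = f"{from_version[0]}.{from_version[1]}"
        new = f"{to_version[0]}.{to_version[1]}"
        return StepResult(
            idf_text=idf_text.replace(f"Version,{old};", f"Version,{new};"),
            stdout=f"stamped {old} -> {new}",
        )
```

Note that a runtime isinstance check against a protocol only verifies that the method exists, not that its signature matches.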

AsyncMigrator

idfkit.migration.protocol.AsyncMigrator

Bases: Protocol

Async counterpart to Migrator.

Implementations perform a single version-step migration without blocking the event loop. The default implementation, AsyncSubprocessMigrator, uses asyncio.create_subprocess_exec to drive the Transition-VX-to-VY binaries shipped with EnergyPlus.

Source code in src/idfkit/migration/protocol.py
@runtime_checkable
class AsyncMigrator(Protocol):
    """Async counterpart to [Migrator][idfkit.migration.protocol.Migrator].

    Implementations perform a single version-step migration without blocking
    the event loop. The default implementation,
    [AsyncSubprocessMigrator][idfkit.migration.async_subprocess_backend.AsyncSubprocessMigrator],
    uses [asyncio.create_subprocess_exec][] to drive the ``Transition-VX-to-VY``
    binaries shipped with EnergyPlus.
    """

    async def migrate_step(
        self,
        idf_text: str,
        from_version: tuple[int, int, int],
        to_version: tuple[int, int, int],
        *,
        work_dir: Path,
    ) -> MigrationStepResult:
        """Async counterpart to [Migrator.migrate_step][idfkit.migration.protocol.Migrator.migrate_step]."""
        ...

migrate_step(idf_text, from_version, to_version, *, work_dir) async

Async counterpart to Migrator.migrate_step.

Source code in src/idfkit/migration/protocol.py
async def migrate_step(
    self,
    idf_text: str,
    from_version: tuple[int, int, int],
    to_version: tuple[int, int, int],
    *,
    work_dir: Path,
) -> MigrationStepResult:
    """Async counterpart to [Migrator.migrate_step][idfkit.migration.protocol.Migrator.migrate_step]."""
    ...
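
One way to satisfy the async shape without writing a new backend is to run an existing blocking migrate_step in a worker thread. The adapter below is a sketch of that idea using `asyncio.to_thread` (Python 3.9+); `ThreadedAsyncMigrator` and `UpperCaseMigrator` are hypothetical names, and the inner backend returns a plain string rather than a MigrationStepResult purely to keep the example short.

```python
import asyncio
from pathlib import Path
from typing import Any, Tuple

Version = Tuple[int, int, int]


class ThreadedAsyncMigrator:
    """Adapt a blocking migrate_step to the async shape via a worker thread."""

    def __init__(self, inner: Any) -> None:
        self._inner = inner

    async def migrate_step(
        self, idf_text: str, from_version: Version, to_version: Version, *, work_dir: Path
    ) -> Any:
        # to_thread forwards positional and keyword arguments to the callable.
        return await asyncio.to_thread(
            self._inner.migrate_step, idf_text, from_version, to_version, work_dir=work_dir
        )


class UpperCaseMigrator:
    """Hypothetical blocking backend used only to exercise the adapter."""

    def migrate_step(
        self, idf_text: str, from_version: Version, to_version: Version, *, work_dir: Path
    ) -> str:
        return idf_text.upper()
```

This keeps the event loop responsive, although unlike a subprocess-based async backend it occupies a thread for the duration of each step.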

MigrationStepResult

idfkit.migration.protocol.MigrationStepResult dataclass

Result of a single-step migration.

Attributes:

Name Type Description
idf_text str

The migrated IDF source text.

stdout str

Captured standard output (may be empty).

stderr str

Captured standard error (may be empty).

audit_text str | None

Contents of the per-step .audit file if produced by the backend, else None.

Source code in src/idfkit/migration/protocol.py
@dataclass(frozen=True, slots=True)
class MigrationStepResult:
    """Result of a single-step migration.

    Attributes:
        idf_text: The migrated IDF source text.
        stdout: Captured standard output (may be empty).
        stderr: Captured standard error (may be empty).
        audit_text: Contents of the per-step ``.audit`` file if produced by
            the backend, else ``None``.
    """

    idf_text: str
    stdout: str = ""
    stderr: str = ""
    audit_text: str | None = None

SubprocessMigrator

idfkit.migration.subprocess_backend.SubprocessMigrator dataclass

Migrator backend that shells out to EnergyPlus's transition binaries.

Attributes:

Name Type Description
version_updater_dir Path

Path to PreProcess/IDFVersionUpdater. Must contain the Transition-V*-to-V* binaries and matching V*-Energy+.idd files.

step_timeout float

Maximum wall-clock seconds for a single transition step.
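
Before constructing a migrator it can help to confirm that a candidate directory actually contains the files described above. The sketch below lists them with the same glob-style patterns used in the attribute description; `inspect_updater_dir` is a hypothetical helper and not part of idfkit.

```python
from pathlib import Path
from typing import Dict, List


def inspect_updater_dir(directory: Path) -> Dict[str, List[str]]:
    """List transition binaries and versioned IDD files found in directory."""
    binaries = sorted(
        p.name for p in directory.glob("Transition-V*-to-V*") if p.is_file()
    )
    idds = sorted(p.name for p in directory.glob("V*-Energy+.idd") if p.is_file())
    return {"binaries": binaries, "idds": idds}
```

An empty "binaries" list suggests the path does not point at an IDFVersionUpdater directory.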

Source code in src/idfkit/migration/subprocess_backend.py
@dataclass(frozen=True, slots=True)
class SubprocessMigrator:
    """Migrator backend that shells out to EnergyPlus's transition binaries.

    Attributes:
        version_updater_dir: Path to ``PreProcess/IDFVersionUpdater``. Must contain
            the ``Transition-V*-to-V*`` binaries and matching
            ``V*-Energy+.idd`` files.
        step_timeout: Maximum wall-clock seconds for a single transition step.
    """

    version_updater_dir: Path
    step_timeout: float = DEFAULT_STEP_TIMEOUT

    def migrate_step(
        self,
        idf_text: str,
        from_version: tuple[int, int, int],
        to_version: tuple[int, int, int],
        *,
        work_dir: Path,
    ) -> MigrationStepResult:
        """Run a single ``Transition-VX-to-VY`` binary on *idf_text*."""
        binary = self.locate_binary(from_version, to_version)

        work_dir.mkdir(parents=True, exist_ok=True)
        stage_idd_symlinks(self.version_updater_dir, work_dir)
        input_idf = work_dir / "in.idf"
        input_idf.write_text(idf_text, encoding="latin-1")

        try:
            proc = subprocess.run(  # noqa: S603
                [str(binary), str(input_idf)],
                capture_output=True,
                text=True,
                timeout=self.step_timeout,
                cwd=str(work_dir),
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            msg = f"Transition timed out after {self.step_timeout} seconds"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
                stderr=str(exc.stderr) if exc.stderr else None,
            ) from exc
        except OSError as exc:
            msg = f"Failed to start transition binary: {exc}"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
            ) from exc

        if proc.returncode != 0:
            msg = "Transition binary exited with non-zero status"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
                exit_code=proc.returncode,
                stderr=proc.stderr,
            )

        if not input_idf.is_file():
            msg = f"Transition binary produced no output at {input_idf}"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
                exit_code=proc.returncode,
                stderr=proc.stderr,
            )

        migrated_text = input_idf.read_text(encoding="latin-1")
        return MigrationStepResult(
            idf_text=migrated_text,
            stdout=proc.stdout,
            stderr=proc.stderr,
            audit_text=collect_audit_text(work_dir),
        )

    def locate_binary(
        self,
        from_version: tuple[int, int, int],
        to_version: tuple[int, int, int],
    ) -> Path:
        """Return the absolute path to the transition binary for a single step.

        Tries the registry-exact name first, then falls back to ``(major, minor, 0)``
        for either side -- EnergyPlus's transition binaries are always named with
        ``patch=0`` (notably, v9.0.1 uses the ``V9-0-0`` binary name).
        """
        candidates = binary_candidates(from_version, to_version)
        for name in candidates:
            p = self.version_updater_dir / name
            if p.is_file():
                return p
        if not self.version_updater_dir.is_dir():
            msg = f"IDFVersionUpdater directory not found: {self.version_updater_dir}"
            raise MigrationError(msg, from_version=from_version, to_version=to_version)
        msg = f"No transition binary found. Tried: {', '.join(candidates)}"
        raise MigrationError(msg, from_version=from_version, to_version=to_version)

locate_binary(from_version, to_version)

Return the absolute path to the transition binary for a single step.

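Tries the registry-exact name first, then falls back to (major, minor, 0) for either side, because EnergyPlus's transition binaries are always named with patch=0 (notably, v9.0.1 uses the V9-0-0 binary name). Raises MigrationError if the IDFVersionUpdater directory is missing or none of the candidate names exists in it.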
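The fallback order described above can be sketched as follows. This is an illustration only: `candidate_binary_names` is a hypothetical stand-in, not idfkit's `binary_candidates`, and it ignores platform details such as an `.exe` suffix on Windows.

```python
Version = tuple[int, int, int]


def candidate_binary_names(from_version: Version, to_version: Version) -> list[str]:
    """Hypothetical sketch: exact name first, then patch=0 fallbacks for either side."""

    def tag(version: Version) -> str:
        # (9, 0, 1) -> "V9-0-1"
        return "V" + "-".join(str(part) for part in version)

    def zero_patch(version: Version) -> Version:
        return (version[0], version[1], 0)

    pairs = [
        (from_version, to_version),                          # registry-exact
        (zero_patch(from_version), to_version),              # patch=0 on the source
        (from_version, zero_patch(to_version)),              # patch=0 on the target
        (zero_patch(from_version), zero_patch(to_version)),  # patch=0 on both
    ]
    names: list[str] = []
    for src, dst in pairs:
        name = f"Transition-{tag(src)}-to-{tag(dst)}"
        if name not in names:  # drop duplicates while keeping the order
            names.append(name)
    return names


print(candidate_binary_names((9, 0, 1), (9, 1, 0)))
# ['Transition-V9-0-1-to-V9-1-0', 'Transition-V9-0-0-to-V9-1-0']
```

Under this assumed ordering, a 9.0.1 source yields the exact name first and the `V9-0-0` name second, so the lookup settles on whichever file actually exists in the updater directory.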

Source code in src/idfkit/migration/subprocess_backend.py
def locate_binary(
    self,
    from_version: tuple[int, int, int],
    to_version: tuple[int, int, int],
) -> Path:
    """Return the absolute path to the transition binary for a single step.

    Tries the registry-exact name first, then falls back to ``(major, minor, 0)``
    for either side -- EnergyPlus's transition binaries are always named with
    ``patch=0`` (notably, v9.0.1 uses the ``V9-0-0`` binary name).
    """
    candidates = binary_candidates(from_version, to_version)
    for name in candidates:
        p = self.version_updater_dir / name
        if p.is_file():
            return p
    if not self.version_updater_dir.is_dir():
        msg = f"IDFVersionUpdater directory not found: {self.version_updater_dir}"
        raise MigrationError(msg, from_version=from_version, to_version=to_version)
    msg = f"No transition binary found. Tried: {', '.join(candidates)}"
    raise MigrationError(msg, from_version=from_version, to_version=to_version)

migrate_step(idf_text, from_version, to_version, *, work_dir)

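Run a single Transition-VX-to-VY binary on idf_text. The text is written to in.idf inside work_dir, the binary runs with work_dir as its working directory, and the migrated text is read back from the same in.idf path. A timeout, a failure to start the binary, a non-zero exit status, or a missing output file raises MigrationError.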
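The error handling in this method follows a common pattern: run the child with a timeout, then translate each failure mode into one exception type. A minimal, self-contained sketch of that pattern is below. `StepError` and `run_step` are hypothetical names used for illustration, and the Python interpreter stands in for a transition binary so the example runs without EnergyPlus installed.

```python
from __future__ import annotations

import subprocess
import sys


class StepError(RuntimeError):
    """Hypothetical stand-in for MigrationError."""

    def __init__(self, message: str, *, exit_code: int | None = None, stderr: str | None = None):
        super().__init__(message)
        self.exit_code = exit_code
        self.stderr = stderr


def run_step(argv: list[str], timeout: float) -> str:
    """Run a command and map timeout, launch failure, and bad exit to StepError."""
    try:
        proc = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,  # inspect the return code ourselves
        )
    except subprocess.TimeoutExpired as exc:
        raise StepError(f"timed out after {timeout} seconds") from exc
    except OSError as exc:
        # Raised when the executable is missing or not runnable.
        raise StepError(f"failed to start: {exc}") from exc

    if proc.returncode != 0:
        raise StepError("non-zero exit status", exit_code=proc.returncode, stderr=proc.stderr)
    return proc.stdout


# The interpreter plays the role of the transition binary here.
print(run_step([sys.executable, "-c", "print('ok')"], timeout=30.0).strip())
```

Passing `check=False` and testing `returncode` by hand keeps the exit code and captured stderr available for the raised error, which is the same choice the method above makes.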

Source code in src/idfkit/migration/subprocess_backend.py
def migrate_step(
    self,
    idf_text: str,
    from_version: tuple[int, int, int],
    to_version: tuple[int, int, int],
    *,
    work_dir: Path,
) -> MigrationStepResult:
    """Run a single ``Transition-VX-to-VY`` binary on *idf_text*."""
    binary = self.locate_binary(from_version, to_version)

    work_dir.mkdir(parents=True, exist_ok=True)
    stage_idd_symlinks(self.version_updater_dir, work_dir)
    input_idf = work_dir / "in.idf"
    input_idf.write_text(idf_text, encoding="latin-1")

    try:
        proc = subprocess.run(  # noqa: S603
            [str(binary), str(input_idf)],
            capture_output=True,
            text=True,
            timeout=self.step_timeout,
            cwd=str(work_dir),
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        msg = f"Transition timed out after {self.step_timeout} seconds"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
            stderr=str(exc.stderr) if exc.stderr else None,
        ) from exc
    except OSError as exc:
        msg = f"Failed to start transition binary: {exc}"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
        ) from exc

    if proc.returncode != 0:
        msg = "Transition binary exited with non-zero status"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
            exit_code=proc.returncode,
            stderr=proc.stderr,
        )

    if not input_idf.is_file():
        msg = f"Transition binary produced no output at {input_idf}"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
            exit_code=proc.returncode,
            stderr=proc.stderr,
        )

    migrated_text = input_idf.read_text(encoding="latin-1")
    return MigrationStepResult(
        idf_text=migrated_text,
        stdout=proc.stdout,
        stderr=proc.stderr,
        audit_text=collect_audit_text(work_dir),
    )

AsyncSubprocessMigrator

idfkit.migration.async_subprocess_backend.AsyncSubprocessMigrator dataclass

Async migrator backend built on asyncio.create_subprocess_exec.

Attributes:

Name Type Description
version_updater_dir Path

Path to PreProcess/IDFVersionUpdater.

step_timeout float

Maximum wall-clock seconds for a single transition step.

Source code in src/idfkit/migration/async_subprocess_backend.py
@dataclass(frozen=True, slots=True)
class AsyncSubprocessMigrator:
    """Async migrator backend built on [asyncio.create_subprocess_exec][].

    Attributes:
        version_updater_dir: Path to ``PreProcess/IDFVersionUpdater``.
        step_timeout: Maximum wall-clock seconds for a single transition step.
    """

    version_updater_dir: Path
    step_timeout: float = DEFAULT_STEP_TIMEOUT

    async def migrate_step(
        self,
        idf_text: str,
        from_version: tuple[int, int, int],
        to_version: tuple[int, int, int],
        *,
        work_dir: Path,
    ) -> MigrationStepResult:
        """Run a single ``Transition-VX-to-VY`` binary without blocking."""
        binary = self.locate_binary(from_version, to_version)

        await asyncio.to_thread(work_dir.mkdir, parents=True, exist_ok=True)
        await asyncio.to_thread(stage_idd_symlinks, self.version_updater_dir, work_dir)
        input_idf = work_dir / "in.idf"
        await asyncio.to_thread(input_idf.write_text, idf_text, "latin-1")

        try:
            proc = await asyncio.create_subprocess_exec(
                str(binary),
                str(input_idf),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(work_dir),
            )
        except OSError as exc:
            msg = f"Failed to start transition binary: {exc}"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
            ) from exc

        try:
            stdout_bytes, stderr_bytes = await asyncio.wait_for(
                proc.communicate(),
                timeout=self.step_timeout,
            )
        except asyncio.TimeoutError as exc:
            proc.kill()
            with contextlib.suppress(ProcessLookupError):
                await proc.wait()
            msg = f"Transition timed out after {self.step_timeout} seconds"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
            ) from exc
        except asyncio.CancelledError:
            proc.kill()
            with contextlib.suppress(ProcessLookupError):
                await proc.wait()
            raise

        stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
        stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""
        returncode = proc.returncode if proc.returncode is not None else -1

        if returncode != 0:
            msg = "Transition binary exited with non-zero status"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
                exit_code=returncode,
                stderr=stderr,
            )

        if not input_idf.is_file():
            msg = f"Transition binary produced no output at {input_idf}"
            raise MigrationError(
                msg,
                from_version=from_version,
                to_version=to_version,
                exit_code=returncode,
                stderr=stderr,
            )

        migrated_text = await asyncio.to_thread(input_idf.read_text, "latin-1")
        audit_text = await asyncio.to_thread(collect_audit_text, work_dir)
        return MigrationStepResult(
            idf_text=migrated_text,
            stdout=stdout,
            stderr=stderr,
            audit_text=audit_text,
        )

    def locate_binary(
        self,
        from_version: tuple[int, int, int],
        to_version: tuple[int, int, int],
    ) -> Path:
        """Return the absolute path to the transition binary for a single step."""
        candidates = binary_candidates(from_version, to_version)
        for name in candidates:
            p = self.version_updater_dir / name
            if p.is_file():
                return p
        if not self.version_updater_dir.is_dir():
            msg = f"IDFVersionUpdater directory not found: {self.version_updater_dir}"
            raise MigrationError(msg, from_version=from_version, to_version=to_version)
        msg = f"No transition binary found. Tried: {', '.join(candidates)}"
        raise MigrationError(msg, from_version=from_version, to_version=to_version)

locate_binary(from_version, to_version)

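Return the absolute path to the transition binary for a single step. The lookup matches SubprocessMigrator.locate_binary: each name from binary_candidates is tried in order, and MigrationError is raised if none exists.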

Source code in src/idfkit/migration/async_subprocess_backend.py
def locate_binary(
    self,
    from_version: tuple[int, int, int],
    to_version: tuple[int, int, int],
) -> Path:
    """Return the absolute path to the transition binary for a single step."""
    candidates = binary_candidates(from_version, to_version)
    for name in candidates:
        p = self.version_updater_dir / name
        if p.is_file():
            return p
    if not self.version_updater_dir.is_dir():
        msg = f"IDFVersionUpdater directory not found: {self.version_updater_dir}"
        raise MigrationError(msg, from_version=from_version, to_version=to_version)
    msg = f"No transition binary found. Tried: {', '.join(candidates)}"
    raise MigrationError(msg, from_version=from_version, to_version=to_version)

migrate_step(idf_text, from_version, to_version, *, work_dir) async

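Run a single Transition-VX-to-VY binary without blocking the event loop. Filesystem work is offloaded with asyncio.to_thread, and the child process is killed and awaited if the step times out or the task is cancelled. Failures raise MigrationError, as in SubprocessMigrator.migrate_step; cancellation re-raises asyncio.CancelledError.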
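The timeout and cancellation handling can be illustrated in isolation. The sketch below assumes nothing about idfkit: `run_with_timeout` is a hypothetical helper, and the Python interpreter stands in for a transition binary. It shows the child being killed and reaped before the exception propagates, so no orphaned process is left behind.

```python
import asyncio
import contextlib
import sys


async def run_with_timeout(argv: list[str], timeout: float) -> tuple[int, str]:
    """Run a command asynchronously; kill and reap it on timeout or cancellation."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout_bytes, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # The child may already be gone; ignore that case, then re-raise.
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
            await proc.wait()
        raise
    returncode = proc.returncode if proc.returncode is not None else -1
    return returncode, stdout_bytes.decode("utf-8", errors="replace")


async def main() -> None:
    code, out = await run_with_timeout([sys.executable, "-c", "print('ok')"], timeout=30.0)
    print(code, out.strip())


asyncio.run(main())
```

Unlike the method above, this sketch re-raises the timeout unchanged instead of wrapping it in a domain-specific error; the cleanup sequence is the part it is meant to show.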

Source code in src/idfkit/migration/async_subprocess_backend.py
async def migrate_step(
    self,
    idf_text: str,
    from_version: tuple[int, int, int],
    to_version: tuple[int, int, int],
    *,
    work_dir: Path,
) -> MigrationStepResult:
    """Run a single ``Transition-VX-to-VY`` binary without blocking."""
    binary = self.locate_binary(from_version, to_version)

    await asyncio.to_thread(work_dir.mkdir, parents=True, exist_ok=True)
    await asyncio.to_thread(stage_idd_symlinks, self.version_updater_dir, work_dir)
    input_idf = work_dir / "in.idf"
    await asyncio.to_thread(input_idf.write_text, idf_text, "latin-1")

    try:
        proc = await asyncio.create_subprocess_exec(
            str(binary),
            str(input_idf),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(work_dir),
        )
    except OSError as exc:
        msg = f"Failed to start transition binary: {exc}"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
        ) from exc

    try:
        stdout_bytes, stderr_bytes = await asyncio.wait_for(
            proc.communicate(),
            timeout=self.step_timeout,
        )
    except asyncio.TimeoutError as exc:
        proc.kill()
        with contextlib.suppress(ProcessLookupError):
            await proc.wait()
        msg = f"Transition timed out after {self.step_timeout} seconds"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
        ) from exc
    except asyncio.CancelledError:
        proc.kill()
        with contextlib.suppress(ProcessLookupError):
            await proc.wait()
        raise

    stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else ""
    stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else ""
    returncode = proc.returncode if proc.returncode is not None else -1

    if returncode != 0:
        msg = "Transition binary exited with non-zero status"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
            exit_code=returncode,
            stderr=stderr,
        )

    if not input_idf.is_file():
        msg = f"Transition binary produced no output at {input_idf}"
        raise MigrationError(
            msg,
            from_version=from_version,
            to_version=to_version,
            exit_code=returncode,
            stderr=stderr,
        )

    migrated_text = await asyncio.to_thread(input_idf.read_text, "latin-1")
    audit_text = await asyncio.to_thread(collect_audit_text, work_dir)
    return MigrationStepResult(
        idf_text=migrated_text,
        stdout=stdout,
        stderr=stderr,
        audit_text=audit_text,
    )

plan_migration_chain

idfkit.migration.chain.plan_migration_chain(source, target)

Plan the ordered list of transition steps from source to target.

The chain walks forward through ENERGYPLUS_VERSIONS from source to target, emitting a (from, to) pair for each consecutive version boundary.

Parameters:

Name Type Description Default
source tuple[int, int, int]

The model's current version. Must be in ENERGYPLUS_VERSIONS.

required
target tuple[int, int, int]

The desired version. Must be in ENERGYPLUS_VERSIONS and >= source.

required

Returns:

Type Description
tuple[TransitionStep, ...]

A tuple of (from, to) pairs. Empty when source == target.

Raises:

Type Description
UnsupportedVersionError

If source or target is not a supported version.

ValueError

If target < source (backward migration is not supported).
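The planning rule described above can be sketched as a walk over an ordered version list. The `VERSIONS` tuple here is a short hypothetical excerpt, not the real contents of `ENERGYPLUS_VERSIONS`, and `plan_chain` raises `LookupError` where the library raises `UnsupportedVersionError`.

```python
Version = tuple[int, int, int]

# Hypothetical, abbreviated registry for illustration only.
VERSIONS: tuple[Version, ...] = ((9, 0, 1), (9, 1, 0), (9, 2, 0), (9, 3, 0))


def plan_chain(source: Version, target: Version) -> tuple[tuple[Version, Version], ...]:
    """Return consecutive (from, to) pairs between source and target."""
    for version in (source, target):
        if version not in VERSIONS:
            raise LookupError(f"unsupported version: {version}")
    if target < source:
        raise ValueError("backward migration is not supported")

    start = VERSIONS.index(source)
    stop = VERSIONS.index(target)
    # Pair each version in [start, stop) with its immediate successor.
    return tuple(zip(VERSIONS[start:stop], VERSIONS[start + 1 : stop + 1]))


print(plan_chain((9, 0, 1), (9, 2, 0)))
# (((9, 0, 1), (9, 1, 0)), ((9, 1, 0), (9, 2, 0)))
print(plan_chain((9, 1, 0), (9, 1, 0)))
# ()
```

Each pair corresponds to one transition binary, so a chain of n pairs means n subprocess runs in sequence.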

Source code in src/idfkit/migration/chain.py
def plan_migration_chain(
    source: tuple[int, int, int],
    target: tuple[int, int, int],
) -> tuple[TransitionStep, ...]:
    """Plan the ordered list of transition steps from *source* to *target*.

    The chain walks forward through `ENERGYPLUS_VERSIONS`
    from *source* to *target*, emitting a ``(from, to)`` pair for each
    consecutive version boundary.

    Args:
        source: The model's current version. Must be in ``ENERGYPLUS_VERSIONS``.
        target: The desired version. Must be in ``ENERGYPLUS_VERSIONS`` and
            ``>= source``.

    Returns:
        A tuple of ``(from, to)`` pairs. Empty when ``source == target``.

    Raises:
        UnsupportedVersionError: If *source* or *target* is not a supported version.
        ValueError: If ``target < source`` (backward migration is not supported).
    """
    if not is_supported_version(source):
        raise UnsupportedVersionError(source, ENERGYPLUS_VERSIONS)
    if not is_supported_version(target):
        raise UnsupportedVersionError(target, ENERGYPLUS_VERSIONS)

    if source == target:
        return ()
    if target < source:
        msg = (
            f"Backward migration is not supported: {source[0]}.{source[1]}.{source[2]}"
            f" -> {target[0]}.{target[1]}.{target[2]}."
        )
        raise ValueError(msg)

    source_idx = ENERGYPLUS_VERSIONS.index(source)
    target_idx = ENERGYPLUS_VERSIONS.index(target)
    steps: list[TransitionStep] = []
    for i in range(source_idx, target_idx):
        steps.append((ENERGYPLUS_VERSIONS[i], ENERGYPLUS_VERSIONS[i + 1]))
    return tuple(steps)

document_diff

idfkit.migration.diff.document_diff(before, after)

Compute a structural diff between before and after.

The diff compares the two documents at three levels:

  1. Object types present in one document but not the other.
  2. Per-type signed change in object count.
  3. Per-type schema-level field renames (computed from each document's bundled schema — so after picks up fields that only exist in the newer IDD, and before may carry fields that have since been removed).

Parameters:

Name Type Description Default
before IDFDocument

Document before migration.

required
after IDFDocument

Document after migration.

required

Returns:

Type Description
MigrationDiff

A MigrationDiff capturing the observable changes.
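The first two comparison levels (type presence and per-type count change) reduce to set and dictionary arithmetic. The sketch below uses plain dictionaries mapping an object type name to its objects as a stand-in for `IDFDocument.collections`; `count_diff` and the type names in the sample data are hypothetical, and the schema-level field comparison is not covered.

```python
def count_diff(
    before: dict[str, list[str]],
    after: dict[str, list[str]],
) -> tuple[tuple[str, ...], tuple[str, ...], dict[str, int]]:
    """Return (added types, removed types, signed per-type count deltas)."""
    added = tuple(sorted(after.keys() - before.keys()))
    removed = tuple(sorted(before.keys() - after.keys()))

    delta: dict[str, int] = {}
    for obj_type in sorted(before.keys() | after.keys()):
        change = len(after.get(obj_type, ())) - len(before.get(obj_type, ()))
        if change != 0:  # unchanged types are omitted
            delta[obj_type] = change
    return added, removed, delta


# Made-up sample data: one type dropped, one introduced, one grown.
before = {"Zone": ["Z1", "Z2"], "OldType": ["a"]}
after = {"Zone": ["Z1", "Z2", "Z3"], "NewType": ["b"]}

print(count_diff(before, after))
# (('NewType',), ('OldType',), {'NewType': 1, 'OldType': -1, 'Zone': 1})
```

Note that an added or removed type also appears in the delta mapping, since its count moves away from or down to zero.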

Source code in src/idfkit/migration/diff.py
def document_diff(before: IDFDocument, after: IDFDocument) -> MigrationDiff:
    """Compute a structural diff between *before* and *after*.

    The diff compares the two documents at three levels:

    1. Object types present in one document but not the other.
    2. Per-type signed change in object count.
    3. Per-type schema-level field renames (computed from each document's
       bundled schema — so ``after`` picks up fields that only exist in the
       newer IDD, and ``before`` may carry fields that have since been removed).

    Args:
        before: Document before migration.
        after: Document after migration.

    Returns:
        A [MigrationDiff][idfkit.migration.report.MigrationDiff] capturing
        the observable changes.
    """
    before_types = set(before.collections.keys())
    after_types = set(after.collections.keys())

    added = tuple(sorted(after_types - before_types))
    removed = tuple(sorted(before_types - after_types))

    count_delta: dict[str, int] = {}
    for obj_type in sorted(before_types | after_types):
        before_count = len(before.collections.get(obj_type, ()))
        after_count = len(after.collections.get(obj_type, ()))
        delta = after_count - before_count
        if delta != 0:
            count_delta[obj_type] = delta

    field_changes = _compute_field_changes(before, after, shared_types=before_types & after_types)

    return MigrationDiff(
        added_object_types=added,
        removed_object_types=removed,
        object_count_delta=count_delta,
        field_changes=field_changes,
    )