Skip to content

Migrations

The migration API provides snapshots, diffs, generated plans, migration artifacts, history operations, rollback, repair, and squash helpers.

ormdantic.migrations.MigrationManager

MigrationManager(database)

Generate, review, apply, and roll back SQL migration plans.

Source code in ormdantic/migrations.py
def __init__(self, database: Any) -> None:
    """Store the database handle that the other manager methods operate on."""
    self._database = database

snapshot

snapshot()

Return a serializable snapshot for the currently registered models.

Source code in ormdantic/migrations.py
def snapshot(self) -> SchemaSnapshot:
    """Build a serializable snapshot from the models registered on the database.

    Native enum types are requested only when the active dialect is
    ``"postgresql"``.
    """
    use_native_enums = self._dialect() == "postgresql"
    return SchemaSnapshot.from_database(
        self._database, native_enum_types=use_native_enums
    )

live_snapshot

live_snapshot(
    *, include_tables=None, exclude_tables=None, schema=None
)

Return a live schema snapshot reflected from the current database.

Source code in ormdantic/migrations.py
def live_snapshot(
    self,
    *,
    include_tables: Sequence[str] | None = None,
    exclude_tables: Sequence[str] | None = None,
    schema: str | None = None,
) -> SchemaSnapshot:
    """Reflect the current database and return its schema as a snapshot.

    ``include_tables``, ``exclude_tables`` and ``schema`` are forwarded
    unchanged to the reflection helper.
    """
    reflection_scope = {
        "include_tables": include_tables,
        "exclude_tables": exclude_tables,
        "schema": schema,
    }
    return _reflect_schema_snapshot(
        self._connection_url,
        dialect=self._dialect(),
        **reflection_scope,
    )

autogenerate

autogenerate(
    revision,
    *,
    dialect=None,
    include_tables=None,
    exclude_tables=None,
    schema=None,
    description=None,
    depends_on=None,
    branch_labels=None,
    path=None,
    skip_noop=True
)

Generate a migration artifact by diffing live schema against models.

Source code in ormdantic/migrations.py
def autogenerate(
    self,
    revision: str,
    *,
    dialect: str | None = None,
    include_tables: Sequence[str] | None = None,
    exclude_tables: Sequence[str] | None = None,
    schema: str | None = None,
    description: str | None = None,
    depends_on: Sequence[str] | None = None,
    branch_labels: Sequence[str] | None = None,
    path: str | PathLike[str] | None = None,
    skip_noop: bool = True,
) -> MigrationArtifact | None:
    """Build a migration artifact that moves the live schema toward the models.

    The reflected database schema is the "before" state; the registered
    models, filtered to the same table/schema scope, are the "after" state.

    Returns ``None`` when the resulting plan is empty and ``skip_noop`` is
    true. Otherwise the artifact is returned, and it is written to ``path``
    first when a path is given.
    """
    scope = {
        "include_tables": include_tables,
        "exclude_tables": exclude_tables,
        "schema": schema,
    }
    before = self.live_snapshot(**scope)
    after = _filter_snapshot_for_autogenerate_scope(self.snapshot(), **scope)

    if schema is not None:
        # Model objects that declare no schema of their own are pinned to the
        # requested schema; objects with an explicit schema are left alone.
        for attribute in ("tables", "enum_types", "sequences", "views"):
            members = getattr(after, attribute)
            if not members:
                continue
            pinned = [
                replace(member, schema=schema) if member.schema is None else member
                for member in members
            ]
            after = replace(after, **{attribute: pinned})

    # Object kinds that the model snapshot does not register at all are carried
    # over from the reflected snapshot so autogenerate does not plan accidental
    # drops: non-PostgreSQL target snapshots do not infer native enum types, and
    # namespaces, sequences and views are only owned when the model snapshot
    # registers its own target for them.
    for attribute in ("enum_types", "namespaces", "sequences", "views"):
        reflected = getattr(before, attribute)
        if reflected and not getattr(after, attribute):
            after = replace(after, **{attribute: list(reflected)})

    active_dialect = dialect or self._connection_url
    plan = _build_plan(active_dialect, before, after)
    if plan.is_empty() and skip_noop:
        return None

    artifact = MigrationArtifact.from_plan(
        revision,
        plan,
        before,
        after,
        dialect=active_dialect,
        description=description,
        depends_on=depends_on,
        branch_labels=branch_labels,
        metadata={"autogenerated": True, "schema": schema},
    )
    if path is not None:
        artifact.write(path)
    return artifact

diff

diff(from_snapshot=None, to_snapshot=None)

Return structured schema changes between two snapshots.

Source code in ormdantic/migrations.py
def diff(
    self,
    from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
) -> SchemaDiff:
    """Compute the structured schema changes from one snapshot to another.

    Both arguments are first passed through ``_resolve_snapshots``, which
    decides how omitted or mapping-typed snapshots are handled.
    """
    source, target = self._resolve_snapshots(from_snapshot, to_snapshot)
    connection = self._database._connection
    return diff_snapshots(source, target, dialect=connection)

generate_plan

generate_plan(
    from_snapshot=None, to_snapshot=None, *, dialect=None
)

Generate a migration plan and rollback SQL between two snapshots.

Source code in ormdantic/migrations.py
def generate_plan(
    self,
    from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    *,
    dialect: str | None = None,
) -> MigrationPlan:
    """Build the migration plan (with rollback SQL) between two snapshots.

    An explicit ``dialect`` wins; otherwise the database connection value is
    handed to the plan builder in its place.
    """
    source, target = self._resolve_snapshots(from_snapshot, to_snapshot)
    plan_dialect = dialect or self._database._connection
    return _build_plan(plan_dialect, source, target)

create_migration

create_migration(
    revision,
    from_snapshot=None,
    to_snapshot=None,
    *,
    dialect=None,
    description=None,
    depends_on=None,
    branch_labels=None,
    path=None
)

Generate a serializable migration artifact.

Source code in ormdantic/migrations.py
def create_migration(
    self,
    revision: str,
    from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    *,
    dialect: str | None = None,
    description: str | None = None,
    depends_on: Sequence[str] | None = None,
    branch_labels: Sequence[str] | None = None,
    path: str | PathLike[str] | None = None,
) -> MigrationArtifact:
    """Create a serializable migration artifact between two snapshots.

    The artifact is always returned; it is additionally written to ``path``
    when a path is supplied.
    """
    source, target = self._resolve_snapshots(from_snapshot, to_snapshot)
    artifact_options = {
        "dialect": dialect or self._database._connection,
        "description": description,
        "depends_on": depends_on,
        "branch_labels": branch_labels,
    }
    artifact = create_migration_artifact(
        revision, source, target, **artifact_options
    )
    if path is not None:
        artifact.write(path)
    return artifact

dry_run

dry_run(
    from_snapshot=None, to_snapshot=None, *, dialect=None
)

Return generated SQL without applying it.

Source code in ormdantic/migrations.py
def dry_run(
    self,
    from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
    *,
    dialect: str | None = None,
) -> list[str]:
    """Return the SQL a generated plan would run, without applying anything.

    This delegates to ``generate_plan`` and returns that plan's ``dry_run()``
    result.
    """
    plan = self.generate_plan(from_snapshot, to_snapshot, dialect=dialect)
    return plan.dry_run()

ensure_revision_table async

ensure_revision_table()

Create or upgrade the migration history table as needed.

Source code in ormdantic/migrations.py
async def ensure_revision_table(self) -> None:
    """Ensure the migration history table exists for the active dialect.

    The work is done by ``_ensure_migration_history_table`` on the native
    connection.
    """
    _ensure_migration_history_table(self._native_connection(), self._dialect())

applied_revisions async

applied_revisions()

Return applied migration revisions ordered by apply time.

Source code in ormdantic/migrations.py
async def applied_revisions(self) -> list[str]:
    """Return the revisions recorded as applied and not dirty.

    Revisions come back in the order ``history()`` yields its entries.
    """
    revisions: list[str] = []
    for entry in await self.history():
        if entry.status != MIGRATION_STATUS_APPLIED or entry.dirty:
            continue
        revisions.append(entry.revision)
    return revisions

history async

history()

Return migration history rows.

Source code in ormdantic/migrations.py
async def history(self) -> list[MigrationHistoryEntry]:
    """Return the rows of the migration history table.

    The history table is ensured first so that reading it cannot fail merely
    because it has not been created yet.
    """
    native, dialect_name = self._native_connection(), self._dialect()
    _ensure_migration_history_table(native, dialect_name)
    return _history_entries(native, dialect_name)

current async

current()

Return the latest successfully applied revision.

Source code in ormdantic/migrations.py
async def current(self) -> MigrationHistoryEntry | None:
    """Return the history entry for the current revision, or ``None``.

    The history table is ensured first; the entry itself is selected by
    ``_current_entry``.
    """
    native, dialect_name = self._native_connection(), self._dialect()
    _ensure_migration_history_table(native, dialect_name)
    return _current_entry(native, dialect_name)

is_dirty async

is_dirty()

Return whether migration history is currently marked dirty.

Source code in ormdantic/migrations.py
async def is_dirty(self) -> bool:
    """Report whether the migration history is currently marked dirty.

    The history table is ensured before the dirty flag is read.
    """
    native, dialect_name = self._native_connection(), self._dialect()
    _ensure_migration_history_table(native, dialect_name)
    return _is_dirty(native, dialect_name)

status async

status()

Return current migration status metadata.

Source code in ormdantic/migrations.py
async def status(self) -> dict[str, Any]:
    """Summarize the migration state as a dictionary.

    Keys: ``dirty`` (result of ``is_dirty``), ``current`` (revision of the
    current entry, or ``None``), ``current_entry`` (the entry itself) and
    ``applied`` (result of ``applied_revisions``).
    """
    latest = await self.current()
    dirty = await self.is_dirty()
    current_revision = None if not latest else latest.revision
    applied = await self.applied_revisions()
    return {
        "dirty": dirty,
        "current": current_revision,
        "current_entry": latest,
        "applied": applied,
    }

repair async

repair(
    *,
    revision=None,
    status=None,
    clear_dirty=True,
    checksum=None
)

Repair migration metadata after a failed/manual migration.

Source code in ormdantic/migrations.py
async def repair(
    self,
    *,
    revision: str | None = None,
    status: str | None = None,
    clear_dirty: bool = True,
    checksum: str | None = None,
) -> int:
    """Rewrite migration history metadata after a failed or manual migration.

    All keyword arguments are forwarded to ``_repair_history``. Its integer
    result is returned once ``_commit_migration_history_if_needed`` has run.
    """
    native = self._native_connection()
    dialect_name = self._dialect()
    _ensure_migration_history_table(native, dialect_name)

    repair_options = {
        "revision": revision,
        "status": status,
        "clear_dirty": clear_dirty,
        "checksum": checksum,
    }
    repaired_count = _repair_history(native, dialect_name, **repair_options)
    _commit_migration_history_if_needed(native, dialect_name)
    return repaired_count

apply_artifact async

apply_artifact(artifact, *, allow_destructive=False)

Apply a migration artifact and record its revision.

Source code in ormdantic/migrations.py
async def apply_artifact(
    self,
    artifact: MigrationArtifact | Mapping[str, Any] | str | PathLike[str],
    *,
    allow_destructive: bool = False,
) -> bool:
    """Apply a migration artifact through ``apply`` and return its result.

    The input is coerced to an artifact and its checksum is validated before
    anything is applied. The artifact's checksum, description, version and
    metadata are passed along so they are recorded with the revision.
    """
    migration = _coerce_artifact(artifact)
    # Validation happens before the plan is built or any SQL is run.
    migration.validate_checksum()

    revision = migration.revision
    plan = migration.to_plan()
    record_fields = {
        "checksum": migration.checksum,
        "description": migration.description,
        "artifact_version": migration.artifact_version,
        "metadata": migration.metadata,
    }
    return await self.apply(
        revision,
        plan,
        allow_destructive=allow_destructive,
        **record_fields,
    )

apply_file async

apply_file(path, *, allow_destructive=False)

Apply a migration artifact from disk.

Source code in ormdantic/migrations.py
async def apply_file(
    self,
    path: str | PathLike[str],
    *,
    allow_destructive: bool = False,
) -> bool:
    """Read a migration artifact from ``path`` and apply it.

    Returns whatever ``apply_artifact`` returns for the loaded artifact.
    """
    loaded = MigrationArtifact.read(path)
    return await self.apply_artifact(loaded, allow_destructive=allow_destructive)

apply_directory async

apply_directory(
    path, *, pattern=None, allow_destructive=False
)

Apply migration artifacts in filename order.

Source code in ormdantic/migrations.py
async def apply_directory(
    self,
    path: str | PathLike[str],
    *,
    pattern: str | None = None,
    allow_destructive: bool = False,
) -> list[str]:
    """Apply the migration artifacts found under ``path``, one after another.

    Files are visited in the order ``_migration_files`` yields them. Returns
    the revisions that were actually applied by this call; artifacts for which
    ``apply_artifact`` returns ``False`` are not listed.

    Raises:
        ValueError: If the history is dirty, or if an artifact depends on a
            revision that is neither already applied nor applied earlier in
            this run.
    """
    if await self.is_dirty():
        raise ValueError(
            "migration history is dirty; run `ormdantic migrations repair` before apply-dir"
        )

    newly_applied = []
    # Seeded from history and extended as this run applies artifacts, so later
    # files may depend on earlier ones in the same directory.
    known_revisions = set(await self.applied_revisions())
    for artifact_path in _migration_files(path, pattern):
        artifact = MigrationArtifact.read(artifact_path)
        unmet = [
            dependency
            for dependency in artifact.depends_on
            if dependency not in known_revisions
        ]
        if unmet:
            raise ValueError(
                f"migration {artifact.revision} has missing dependencies: "
                + ", ".join(unmet)
            )
        was_applied = await self.apply_artifact(
            artifact, allow_destructive=allow_destructive
        )
        if not was_applied:
            continue
        newly_applied.append(artifact.revision)
        known_revisions.add(artifact.revision)
    return newly_applied

apply async

apply(
    revision,
    plan,
    *,
    allow_destructive=False,
    checksum=None,
    description=None,
    artifact_version=MIGRATION_ARTIFACT_VERSION,
    metadata=None
)

Apply a migration plan and record its revision.

Returns False when the revision is already recorded as applied and is not dirty.

Source code in ormdantic/migrations.py
async def apply(
    self,
    revision: str,
    plan: MigrationPlan,
    *,
    allow_destructive: bool = False,
    checksum: str | None = None,
    description: str | None = None,
    artifact_version: int = MIGRATION_ARTIFACT_VERSION,
    metadata: Mapping[str, Any] | None = None,
) -> bool:
    """Run a migration plan and record ``revision`` with applied status.

    Returns ``True`` after the operations have been handed to
    ``_run_migration_operations``. Returns ``False`` without running anything
    when the revision is already recorded as applied and not dirty.

    Raises:
        ValueError: If the history is marked dirty, if the revision was
            already applied with a different checksum, or if the plan has
            destructive operations and ``allow_destructive`` is false.
    """
    native = self._native_connection()
    dialect_name = self._dialect()
    _ensure_migration_history_table(native, dialect_name)

    if _is_dirty(native, dialect_name):
        raise ValueError(
            "database is marked dirty from a failed migration; run repair before applying"
        )

    recorded = _history_entry(native, dialect_name, revision)
    # Without a caller-supplied checksum, one is derived from the plan itself.
    expected_checksum = checksum or _plan_checksum(revision, plan)

    already_applied = (
        recorded is not None
        and recorded.status == MIGRATION_STATUS_APPLIED
        and not recorded.dirty
    )
    if already_applied:
        # A recorded entry without a checksum is accepted as-is.
        if recorded.checksum and recorded.checksum != expected_checksum:
            raise ValueError(
                f"revision {revision} already applied with checksum "
                f"{recorded.checksum}, requested {expected_checksum}"
            )
        return False

    if plan.has_destructive_operations and not allow_destructive:
        raise ValueError(
            "migration contains destructive operations; pass "
            "allow_destructive=True to apply it"
        )
    _raise_if_unsupported_sqlite_plan(dialect_name, plan.operations)

    _run_migration_operations(
        connection=native,
        dialect=dialect_name,
        revision=revision,
        operations=plan.operations,
        status=MIGRATION_STATUS_APPLIED,
        description=description,
        checksum=expected_checksum,
        artifact_version=artifact_version,
        metadata=dict(metadata or {}),
    )
    return True

rollback async

rollback(
    revision,
    plan,
    *,
    allow_destructive=False,
    checksum=None,
    description=None,
    artifact_version=MIGRATION_ARTIFACT_VERSION,
    metadata=None
)

Run rollback SQL and record the migration revision as rolled back.

Source code in ormdantic/migrations.py
async def rollback(
    self,
    revision: str,
    plan: MigrationPlan,
    *,
    allow_destructive: bool = False,
    checksum: str | None = None,
    description: str | None = None,
    artifact_version: int = MIGRATION_ARTIFACT_VERSION,
    metadata: Mapping[str, Any] | None = None,
) -> bool:
    """Run a plan's rollback operations for an applied revision.

    Returns ``False`` without running anything when the revision has no
    history entry or its entry is not in applied status. Otherwise the
    rollback operations are run with rolled-back status and ``True`` is
    returned.

    Raises:
        ValueError: If the plan has no rollback available, or if the rollback
            has destructive operations and ``allow_destructive`` is false.
    """
    native = self._native_connection()
    dialect_name = self._dialect()
    _ensure_migration_history_table(native, dialect_name)

    recorded = _history_entry(native, dialect_name, revision)
    if recorded is None or recorded.status != MIGRATION_STATUS_APPLIED:
        return False

    if not plan.rollback_available:
        raise ValueError(
            "rollback SQL is unavailable for this migration; "
            "provide explicit down SQL or generated rollback operations"
        )

    # The rollback operations become the forward operations of a new plan so
    # the same destructive and SQLite checks used by apply can be reused.
    reverse_plan = MigrationPlan(
        operations=list(plan.rollback_operations),
        diff=plan.diff,
        warnings=plan.warnings,
        safety=plan.safety,
    )
    if reverse_plan.has_destructive_operations and not allow_destructive:
        raise ValueError(
            "rollback contains destructive operations; pass "
            "allow_destructive=True to apply it"
        )
    _raise_if_unsupported_sqlite_plan(dialect_name, reverse_plan.operations)

    # Explicit arguments win, then values from the recorded entry, and for the
    # checksum a value derived from the rollback plan as the last resort.
    recorded_description = description or recorded.description
    recorded_checksum = (
        checksum or recorded.checksum or _plan_checksum(revision, reverse_plan)
    )
    _run_migration_operations(
        connection=native,
        dialect=dialect_name,
        revision=revision,
        operations=reverse_plan.operations,
        status=MIGRATION_STATUS_ROLLED_BACK,
        description=recorded_description,
        checksum=recorded_checksum,
        artifact_version=artifact_version,
        metadata=dict(metadata or {}),
    )
    return True

rollback_artifact async

rollback_artifact(artifact, *, allow_destructive=False)

Roll back a migration artifact when rollback SQL is available.

Source code in ormdantic/migrations.py
async def rollback_artifact(
    self,
    artifact: MigrationArtifact | Mapping[str, Any] | str | PathLike[str],
    *,
    allow_destructive: bool = False,
) -> bool:
    """Roll back a migration artifact through ``rollback``.

    The input is coerced to an artifact; its revision, plan, checksum,
    description, version and metadata are forwarded to ``rollback``, whose
    result is returned.
    """
    migration = _coerce_artifact(artifact)
    revision = migration.revision
    plan = migration.to_plan()
    record_fields = {
        "checksum": migration.checksum,
        "description": migration.description,
        "artifact_version": migration.artifact_version,
        "metadata": migration.metadata,
    }
    return await self.rollback(
        revision,
        plan,
        allow_destructive=allow_destructive,
        **record_fields,
    )

rollback_file async

rollback_file(path, *, allow_destructive=False)

Roll back a migration artifact from disk.

Source code in ormdantic/migrations.py
async def rollback_file(
    self, path: str | PathLike[str], *, allow_destructive: bool = False
) -> bool:
    """Read a migration artifact from ``path`` and roll it back.

    Returns whatever ``rollback_artifact`` returns for the loaded artifact.
    """
    loaded = MigrationArtifact.read(path)
    return await self.rollback_artifact(loaded, allow_destructive=allow_destructive)

squash

squash(revision, artifacts, *, dialect=None, path=None)

Squash contiguous migration artifacts into one net migration.

Source code in ormdantic/migrations.py
def squash(
    self,
    revision: str,
    artifacts: Sequence[
        MigrationArtifact | Mapping[str, Any] | str | PathLike[str]
    ],
    *,
    dialect: str | None = None,
    path: str | PathLike[str] | None = None,
) -> MigrationArtifact:
    """Combine several migration artifacts into one checksummed artifact.

    The squashed artifact is always returned; it is additionally written to
    ``path`` when a path is supplied.
    """
    squash_dialect = dialect or self._database._connection
    squashed = squash_migrations(revision, artifacts, dialect=squash_dialect)
    artifact = squashed.with_checksum()
    if path is not None:
        artifact.write(path)
    return artifact

Snapshot Types

The snapshot and plan models live in ormdantic._migrations.models.

ormdantic._migrations.models.SchemaSnapshot dataclass

SchemaSnapshot(
    tables=list(),
    namespaces=list(),
    enum_types=list(),
    sequences=list(),
    views=list(),
    version=1,
)

Serializable database schema snapshot.

tables class-attribute instance-attribute

tables = field(default_factory=list)

namespaces class-attribute instance-attribute

namespaces = field(default_factory=list)

enum_types class-attribute instance-attribute

enum_types = field(default_factory=list)

sequences class-attribute instance-attribute

sequences = field(default_factory=list)

views class-attribute instance-attribute

views = field(default_factory=list)

version class-attribute instance-attribute

version = 1

empty classmethod

empty()
Source code in ormdantic/_migrations/models.py
@classmethod
def empty(cls) -> "SchemaSnapshot":
    """Return an instance built with no arguments, i.e. all field defaults."""
    return cls()

from_runtime classmethod

from_runtime(tables)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, tables: Sequence[Sequence[Any]]) -> "SchemaSnapshot":
    """Build a snapshot whose tables are converted from runtime table specs.

    Each element of ``tables`` is converted with
    ``TableSnapshot.from_runtime``; every other field keeps its default.
    """
    table_snapshots = [TableSnapshot.from_runtime(spec) for spec in tables]
    return cls(table_snapshots)

from_database classmethod

from_database(
    database, *, native_enum_types=False, enum_schema=None
)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_database(
    cls,
    database: Any,
    *,
    native_enum_types: bool = False,
    enum_schema: str | None = None,
) -> "SchemaSnapshot":
    """Build a snapshot from the runtime specs exposed by ``database``.

    Enum type snapshots are collected only when ``native_enum_types`` is
    true. Namespaces, sequences, views and tables are always collected.
    """
    prepare_database_relationships(database)

    enum_snapshots: list[EnumTypeSnapshot] = []
    if native_enum_types:
        from ormdantic.schema import enum_type_descriptors

        descriptors = enum_type_descriptors(
            database._table_map,
            schema=enum_schema,
        )
        enum_snapshots = [
            EnumTypeSnapshot.from_runtime(descriptor) for descriptor in descriptors
        ]

    namespace_snapshots = [
        NamespaceSnapshot.from_runtime(spec)
        for spec in database._runtime_namespace_specs()
    ]
    sequence_snapshots = [
        SequenceSnapshot.from_runtime(spec)
        for spec in database._runtime_sequence_specs()
    ]
    view_snapshots = [
        ViewSnapshot.from_runtime(spec) for spec in database._runtime_view_specs()
    ]
    # Table specs are requested last, with the same enum options as above.
    table_snapshots = [
        TableSnapshot.from_runtime(spec)
        for spec in database._runtime_table_specs(
            native_enum_types=native_enum_types,
            enum_schema=enum_schema,
        )
    ]
    return cls(
        tables=table_snapshots,
        namespaces=namespace_snapshots,
        enum_types=enum_snapshots,
        sequences=sequence_snapshots,
        views=view_snapshots,
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "SchemaSnapshot":
    """Rebuild a snapshot from its dictionary form.

    Missing collection keys are treated as empty lists, and a missing
    ``"version"`` defaults to ``1``.
    """

    def load(snapshot_type: Any, key: str) -> list[Any]:
        # Convert every item stored under ``key`` with the matching type.
        return [snapshot_type.from_dict(item) for item in payload.get(key, [])]

    return cls(
        tables=load(TableSnapshot, "tables"),
        namespaces=load(NamespaceSnapshot, "namespaces"),
        enum_types=load(EnumTypeSnapshot, "enum_types"),
        sequences=load(SequenceSnapshot, "sequences"),
        views=load(ViewSnapshot, "views"),
        version=int(payload.get("version", 1)),
    )

from_json classmethod

from_json(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_json(cls, payload: str | bytes | bytearray) -> "SchemaSnapshot":
    return cls.from_dict(json.loads(payload))

from_toml classmethod

from_toml(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_toml(cls, payload: str | bytes | bytearray) -> "SchemaSnapshot":
    """Parse a TOML document and build a snapshot from the decoded mapping."""
    decoded = toml_loads(payload)
    return cls.from_dict(decoded)

read classmethod

read(path, *, format=None)
Source code in ormdantic/_migrations/models.py
@classmethod
def read(
    cls, path: str | PathLike[str], *, format: str | None = None
) -> "SchemaSnapshot":
    """Load a snapshot from a file on disk.

    The file is parsed as TOML when ``document_format`` reports ``"toml"``
    for the path and ``format``; anything else is parsed as JSON.
    """
    source = Path(path)
    text = source.read_text()
    is_toml = document_format(str(source), format) == "toml"
    loader = cls.from_toml if is_toml else cls.from_json
    return loader(text)

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> list[RuntimeTableSpec]:
    """Convert every table snapshot back into its runtime table spec."""
    runtime_specs = [snapshot.to_runtime() for snapshot in self.tables]
    return runtime_specs

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Return the snapshot as a plain dictionary.

    ``version`` and ``tables`` are always present. ``namespaces``,
    ``enum_types``, ``sequences`` and ``views`` are included only when they
    are non-empty.
    """
    payload: dict[str, Any] = {
        "version": self.version,
        "tables": [table.to_dict() for table in self.tables],
    }
    for key in ("namespaces", "enum_types", "sequences", "views"):
        members = getattr(self, key)
        if members:
            payload[key] = [member.to_dict() for member in members]
    return payload

to_json

to_json(*, indent=2)
Source code in ormdantic/_migrations/models.py
def to_json(self, *, indent: int | None = 2) -> str:
    return json.dumps(self.to_dict(), indent=indent, sort_keys=True)

to_toml

to_toml()
Source code in ormdantic/_migrations/models.py
def to_toml(self) -> str:
    """Serialize the snapshot dictionary to a TOML document."""
    document = self.to_dict()
    return toml_dumps(document)

write

write(path, *, format=None)
Source code in ormdantic/_migrations/models.py
def write(self, path: str | PathLike[str], *, format: str | None = None) -> None:
    """Write the snapshot to ``path``, creating parent directories as needed.

    TOML is written when ``document_format`` reports ``"toml"`` for the path
    and ``format``; anything else is written as JSON.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    as_toml = document_format(str(destination), format) == "toml"
    destination.write_text(self.to_toml() if as_toml else self.to_json())

ormdantic._migrations.models.TableSnapshot dataclass

TableSnapshot(
    model_key,
    name,
    primary_key,
    schema=None,
    columns=list(),
    indexes=list(),
    unique_constraints=list(),
    named_unique_constraints=list(),
    check_constraints=list(),
    foreign_key_constraints=list(),
    exclusion_constraints=list(),
    relationships=list(),
    comment=None,
    tablespace=None,
    mysql_engine=None,
    mysql_charset=None,
    mysql_collation=None,
    mysql_row_format=None,
    mysql_key_block_size=None,
    mysql_pack_keys=None,
    mysql_checksum=None,
    mysql_delay_key_write=None,
    mysql_stats_persistent=None,
    mysql_stats_auto_recalc=None,
    mysql_stats_sample_pages=None,
    mysql_avg_row_length=None,
    mysql_max_rows=None,
    mysql_min_rows=None,
    mysql_insert_method=None,
    mysql_data_directory=None,
    mysql_index_directory=None,
    mysql_connection=None,
    mysql_union=list(),
    mysql_partition_by=None,
    mysql_partitions=None,
    mysql_subpartition_by=None,
    mysql_subpartitions=None,
    mysql_auto_increment=None,
    postgres_inherits=list(),
    postgres_with=list(),
    postgres_using=None,
    postgres_partition_by=None,
    postgres_partition_of=None,
    postgres_partition_for=None,
    postgres_unlogged=False,
    sqlite_strict=False,
    sqlite_without_rowid=False,
    oracle_compress=None,
)

Serializable table metadata used by migration snapshots.

model_key instance-attribute

model_key

name instance-attribute

name

primary_key instance-attribute

primary_key

schema class-attribute instance-attribute

schema = None

columns class-attribute instance-attribute

columns = field(default_factory=list)

indexes class-attribute instance-attribute

indexes = field(default_factory=list)

unique_constraints class-attribute instance-attribute

unique_constraints = field(default_factory=list)

named_unique_constraints class-attribute instance-attribute

named_unique_constraints = field(default_factory=list)

check_constraints class-attribute instance-attribute

check_constraints = field(default_factory=list)

foreign_key_constraints class-attribute instance-attribute

foreign_key_constraints = field(default_factory=list)

exclusion_constraints class-attribute instance-attribute

exclusion_constraints = field(default_factory=list)

relationships class-attribute instance-attribute

relationships = field(default_factory=list)

comment class-attribute instance-attribute

comment = None

tablespace class-attribute instance-attribute

tablespace = None

mysql_engine class-attribute instance-attribute

mysql_engine = None

mysql_charset class-attribute instance-attribute

mysql_charset = None

mysql_collation class-attribute instance-attribute

mysql_collation = None

mysql_row_format class-attribute instance-attribute

mysql_row_format = None

mysql_key_block_size class-attribute instance-attribute

mysql_key_block_size = None

mysql_pack_keys class-attribute instance-attribute

mysql_pack_keys = None

mysql_checksum class-attribute instance-attribute

mysql_checksum = None

mysql_delay_key_write class-attribute instance-attribute

mysql_delay_key_write = None

mysql_stats_persistent class-attribute instance-attribute

mysql_stats_persistent = None

mysql_stats_auto_recalc class-attribute instance-attribute

mysql_stats_auto_recalc = None

mysql_stats_sample_pages class-attribute instance-attribute

mysql_stats_sample_pages = None

mysql_avg_row_length class-attribute instance-attribute

mysql_avg_row_length = None

mysql_max_rows class-attribute instance-attribute

mysql_max_rows = None

mysql_min_rows class-attribute instance-attribute

mysql_min_rows = None

mysql_insert_method class-attribute instance-attribute

mysql_insert_method = None

mysql_data_directory class-attribute instance-attribute

mysql_data_directory = None

mysql_index_directory class-attribute instance-attribute

mysql_index_directory = None

mysql_connection class-attribute instance-attribute

mysql_connection = None

mysql_union class-attribute instance-attribute

mysql_union = field(default_factory=list)

mysql_partition_by class-attribute instance-attribute

mysql_partition_by = None

mysql_partitions class-attribute instance-attribute

mysql_partitions = None

mysql_subpartition_by class-attribute instance-attribute

mysql_subpartition_by = None

mysql_subpartitions class-attribute instance-attribute

mysql_subpartitions = None

mysql_auto_increment class-attribute instance-attribute

mysql_auto_increment = None

postgres_inherits class-attribute instance-attribute

postgres_inherits = field(default_factory=list)

postgres_with class-attribute instance-attribute

postgres_with = field(default_factory=list)

postgres_using class-attribute instance-attribute

postgres_using = None

postgres_partition_by class-attribute instance-attribute

postgres_partition_by = None

postgres_partition_of class-attribute instance-attribute

postgres_partition_of = None

postgres_partition_for class-attribute instance-attribute

postgres_partition_for = None

postgres_unlogged class-attribute instance-attribute

postgres_unlogged = False

sqlite_strict class-attribute instance-attribute

sqlite_strict = False

sqlite_without_rowid class-attribute instance-attribute

sqlite_without_rowid = False

oracle_compress class-attribute instance-attribute

oracle_compress = None

from_runtime classmethod

from_runtime(table)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, table: Sequence[Any]) -> "TableSnapshot":
    """Build a table snapshot from a positional runtime table spec.

    Slots 0-5 always hold the model key, table name, primary key, columns,
    indexes and unnamed unique constraints. The slots after that depend on
    ``len(table)``:

    * 12 or more: named unique, check, foreign-key and exclusion
      constraints at 6-9, table options at 10, relationships at 11.
    * 11: the same four constraint lists at 6-9, relationships at 10.
    * 10: named unique, check and foreign-key constraints at 6-8,
      relationships at 9.
    * 9: named unique and check constraints at 6-7, relationships at 8.
    * 8: check constraints at 6, relationships at 7.
    * anything shorter: relationships at 6.

    Slot 10 is treated as an options sequence when
    ``is_runtime_table_options`` accepts it; otherwise it is read as a bare
    table comment. An options sequence with more than 11 entries carries
    ``mysql_row_format`` at position 5, which pushes every later entry one
    position to the right. Position ``15 + shift`` of the options sequence
    is not read by this method.
    """

    def parent_names(parents: Any) -> list[str]:
        return [str(parent) for parent in parents]

    # Value of every table option that the runtime spec does not carry.
    options: dict[str, Any] = {
        "schema": None,
        "comment": None,
        "tablespace": None,
        "mysql_engine": None,
        "mysql_charset": None,
        "mysql_collation": None,
        "mysql_row_format": None,
        "mysql_key_block_size": None,
        "mysql_pack_keys": None,
        "mysql_checksum": None,
        "mysql_delay_key_write": None,
        "mysql_stats_persistent": None,
        "mysql_stats_auto_recalc": None,
        "mysql_stats_sample_pages": None,
        "mysql_avg_row_length": None,
        "mysql_max_rows": None,
        "mysql_min_rows": None,
        "mysql_insert_method": None,
        "mysql_data_directory": None,
        "mysql_index_directory": None,
        "mysql_connection": None,
        "mysql_union": [],
        "mysql_partition_by": None,
        "mysql_partitions": None,
        "mysql_subpartition_by": None,
        "mysql_subpartitions": None,
        "mysql_auto_increment": None,
        "postgres_inherits": [],
        "postgres_with": [],
        "postgres_using": None,
        "postgres_partition_by": None,
        "postgres_partition_of": None,
        "postgres_partition_for": None,
        "postgres_unlogged": False,
        "sqlite_strict": False,
        "sqlite_without_rowid": False,
        "oracle_compress": None,
    }

    size = len(table)

    # Constraint lists appear progressively as the spec grows.
    named_unique_constraints: Any = []
    check_constraints: Any = []
    foreign_key_constraints: Any = []
    exclusion_constraints: Any = []
    if size > 8:
        named_unique_constraints = table[6]
        check_constraints = table[7]
    elif size > 7:
        check_constraints = table[6]
    if size > 9:
        foreign_key_constraints = table[8]
    if size > 10:
        exclusion_constraints = table[9]

    if size > 11:
        raw = table[10]
        if is_runtime_table_options(raw):
            count = len(raw)
            # The longer layout inserts mysql_row_format at position 5.
            shift = 1 if count > 11 else 0
            # The first two entries are read unconditionally.
            options["comment"] = optional_str(raw[0])
            options["tablespace"] = optional_str(raw[1])
            fixed_slots = (
                ("mysql_engine", 2, optional_str),
                ("mysql_charset", 3, optional_str),
                ("mysql_collation", 4, optional_str),
            )
            # Base positions; ``shift`` is added before they are read.
            shifted_slots = (
                ("postgres_inherits", 5, parent_names),
                ("postgres_with", 6, postgres_storage_parameters),
                ("postgres_using", 7, optional_str),
                ("postgres_partition_by", 8, optional_str),
                ("postgres_partition_of", 9, optional_str),
                ("postgres_partition_for", 10, optional_str),
                ("postgres_unlogged", 11, bool),
                ("sqlite_strict", 12, bool),
                ("sqlite_without_rowid", 13, bool),
                ("schema", 14, optional_str),
                ("oracle_compress", 16, oracle_table_compress),
                ("mysql_key_block_size", 17, optional_int),
                ("mysql_pack_keys", 18, optional_bool),
                ("mysql_checksum", 19, optional_bool),
                ("mysql_delay_key_write", 20, optional_bool),
                ("mysql_stats_persistent", 21, optional_bool),
                ("mysql_stats_auto_recalc", 22, optional_bool),
                ("mysql_stats_sample_pages", 23, optional_int),
                ("mysql_avg_row_length", 24, optional_int),
                ("mysql_max_rows", 25, optional_int),
                ("mysql_min_rows", 26, optional_int),
                ("mysql_insert_method", 27, optional_str),
                ("mysql_data_directory", 28, optional_str),
                ("mysql_index_directory", 29, optional_str),
                ("mysql_connection", 30, optional_str),
                ("mysql_union", 31, string_list),
                ("mysql_partition_by", 32, optional_str),
                ("mysql_partitions", 33, optional_int),
                ("mysql_subpartition_by", 34, optional_str),
                ("mysql_subpartitions", 35, optional_int),
                ("mysql_auto_increment", 36, optional_int),
            )
            slots = list(fixed_slots)
            if shift:
                slots.append(("mysql_row_format", 5, optional_str))
            slots.extend(
                (key, base + shift, convert)
                for key, base, convert in shifted_slots
            )
            # An entry beyond the end of the sequence keeps its default.
            for key, position, convert in slots:
                if count > position:
                    options[key] = convert(raw[position])
        else:
            options["comment"] = optional_str(raw)

    # Relationships always occupy the last slot of the recognised layout.
    if size > 11:
        relationships = table[11]
    elif size > 7:
        relationships = table[size - 1]
    else:
        relationships = table[6]

    return cls(
        model_key=str(table[0]),
        name=str(table[1]),
        primary_key=str(table[2]),
        columns=[ColumnSnapshot.from_runtime(entry) for entry in table[3]],
        indexes=[IndexSnapshot.from_runtime(entry) for entry in table[4]],
        unique_constraints=[
            [str(member) for member in group] for group in table[5]
        ],
        named_unique_constraints=[
            UniqueConstraintSnapshot.from_runtime(entry)
            for entry in named_unique_constraints
        ],
        check_constraints=[
            TableCheckSnapshot.from_runtime(entry) for entry in check_constraints
        ],
        foreign_key_constraints=[
            ForeignKeyConstraintSnapshot.from_runtime(entry)
            for entry in foreign_key_constraints
        ],
        exclusion_constraints=[
            ExclusionConstraintSnapshot.from_runtime(entry)
            for entry in exclusion_constraints
        ],
        relationships=[
            RelationshipSnapshot.from_runtime(entry) for entry in relationships
        ],
        **options,
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "TableSnapshot":
    """Rebuild a table snapshot from its mapping form.

    ``model_key``, ``name`` and ``primary_key`` are looked up with
    ``payload[...]`` and must therefore be present. Every other key is
    optional: scalar options are read with ``payload.get`` and passed
    through their converter, list-valued entries default to an empty list,
    and the boolean flags default to ``False``.
    """
    identity = {
        "model_key": str(payload["model_key"]),
        "name": str(payload["name"]),
        "primary_key": str(payload["primary_key"]),
    }

    def parent_names(parents: Any) -> list[str]:
        return [str(parent) for parent in parents]

    # (key, converter, value handed to the converter when the key is absent)
    option_readers = (
        ("schema", optional_str, None),
        ("comment", optional_str, None),
        ("tablespace", optional_str, None),
        ("mysql_engine", optional_str, None),
        ("mysql_charset", optional_str, None),
        ("mysql_collation", optional_str, None),
        ("mysql_row_format", optional_str, None),
        ("mysql_key_block_size", optional_int, None),
        ("mysql_pack_keys", optional_bool, None),
        ("mysql_checksum", optional_bool, None),
        ("mysql_delay_key_write", optional_bool, None),
        ("mysql_stats_persistent", optional_bool, None),
        ("mysql_stats_auto_recalc", optional_bool, None),
        ("mysql_stats_sample_pages", optional_int, None),
        ("mysql_avg_row_length", optional_int, None),
        ("mysql_max_rows", optional_int, None),
        ("mysql_min_rows", optional_int, None),
        ("mysql_insert_method", optional_str, None),
        ("mysql_data_directory", optional_str, None),
        ("mysql_index_directory", optional_str, None),
        ("mysql_connection", optional_str, None),
        ("mysql_union", string_list, []),
        ("mysql_partition_by", optional_str, None),
        ("mysql_partitions", optional_int, None),
        ("mysql_subpartition_by", optional_str, None),
        ("mysql_subpartitions", optional_int, None),
        ("mysql_auto_increment", optional_int, None),
        ("postgres_inherits", parent_names, []),
        ("postgres_with", postgres_storage_parameters, []),
        ("postgres_using", optional_str, None),
        ("postgres_partition_by", optional_str, None),
        ("postgres_partition_of", optional_str, None),
        ("postgres_partition_for", optional_str, None),
        ("postgres_unlogged", bool, False),
        ("sqlite_strict", bool, False),
        ("sqlite_without_rowid", bool, False),
        ("oracle_compress", oracle_table_compress, None),
    )
    options = {
        key: convert(payload.get(key, missing))
        for key, convert, missing in option_readers
    }

    return cls(
        **identity,
        **options,
        columns=[
            ColumnSnapshot.from_dict(entry) for entry in payload.get("columns", [])
        ],
        indexes=[
            IndexSnapshot.from_dict(entry) for entry in payload.get("indexes", [])
        ],
        unique_constraints=[
            [str(member) for member in group]
            for group in payload.get("unique_constraints", [])
        ],
        named_unique_constraints=[
            UniqueConstraintSnapshot.from_dict(entry)
            for entry in payload.get("named_unique_constraints", [])
        ],
        check_constraints=[
            TableCheckSnapshot.from_dict(entry)
            for entry in payload.get("check_constraints", [])
        ],
        foreign_key_constraints=[
            ForeignKeyConstraintSnapshot.from_dict(entry)
            for entry in payload.get("foreign_key_constraints", [])
        ],
        exclusion_constraints=[
            ExclusionConstraintSnapshot.from_dict(entry)
            for entry in payload.get("exclusion_constraints", [])
        ],
        relationships=[
            RelationshipSnapshot.from_dict(entry)
            for entry in payload.get("relationships", [])
        ],
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeTableSpec:
    """Serialise this snapshot into the positional runtime table spec.

    The result is a 12-element tuple: model key, table name, primary key,
    columns, indexes, unnamed unique constraints, named unique constraints,
    check constraints, foreign-key constraints, exclusion constraints, the
    table-options tuple and finally the relationships.

    The options tuple has 38 positions. Position 16 is not stored on the
    snapshot; it is computed here from the ``mssql_clustered`` markers on
    the indexes and named unique constraints.
    """
    column_specs = [column.to_runtime() for column in self.columns]
    index_specs = [index.to_runtime() for index in self.indexes]
    unique_groups = [list(group) for group in self.unique_constraints]
    named_unique_specs = [
        constraint.to_runtime() for constraint in self.named_unique_constraints
    ]
    check_specs = [check.to_runtime() for check in self.check_constraints]
    foreign_key_specs = [
        constraint.to_runtime() for constraint in self.foreign_key_constraints
    ]
    exclusion_specs = [
        constraint.to_runtime() for constraint in self.exclusion_constraints
    ]

    # Positions 0-15: general, MySQL basics, PostgreSQL, SQLite and schema.
    leading_options = (
        self.comment,
        self.tablespace,
        self.mysql_engine,
        self.mysql_charset,
        self.mysql_collation,
        self.mysql_row_format,
        list(self.postgres_inherits),
        list(self.postgres_with),
        self.postgres_using,
        self.postgres_partition_by,
        self.postgres_partition_of,
        self.postgres_partition_for,
        self.postgres_unlogged,
        self.sqlite_strict,
        self.sqlite_without_rowid,
        self.schema,
    )

    # Position 16: true when any index is marked clustered or any named
    # unique constraint has ``mssql_clustered`` set to exactly ``True``.
    has_clustered = any(index.mssql_clustered for index in self.indexes) or any(
        constraint.mssql_clustered is True
        for constraint in self.named_unique_constraints
    )

    # Positions 17-37: Oracle compression followed by extended MySQL options.
    trailing_options = (
        oracle_table_compress_runtime(self.oracle_compress),
        self.mysql_key_block_size,
        self.mysql_pack_keys,
        self.mysql_checksum,
        self.mysql_delay_key_write,
        self.mysql_stats_persistent,
        self.mysql_stats_auto_recalc,
        self.mysql_stats_sample_pages,
        self.mysql_avg_row_length,
        self.mysql_max_rows,
        self.mysql_min_rows,
        self.mysql_insert_method,
        self.mysql_data_directory,
        self.mysql_index_directory,
        self.mysql_connection,
        list(self.mysql_union),
        self.mysql_partition_by,
        self.mysql_partitions,
        self.mysql_subpartition_by,
        self.mysql_subpartitions,
        self.mysql_auto_increment,
    )
    table_options = leading_options + (has_clustered,) + trailing_options

    relationship_specs = [
        relationship.to_runtime() for relationship in self.relationships
    ]
    return (
        self.model_key,
        self.name,
        self.primary_key,
        column_specs,
        index_specs,
        unique_groups,
        named_unique_specs,
        check_specs,
        foreign_key_specs,
        exclusion_specs,
        table_options,
        relationship_specs,
    )

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Return the mapping form of this table snapshot.

    The identity fields and the collection fields are always present.
    Table options are only written when they carry information: optional
    values when they are not ``None``, list-valued options when non-empty,
    and boolean flags when truthy. Keys are inserted in a fixed order.
    """

    def is_set(value: Any) -> bool:
        return value is not None

    def unchanged(value: Any) -> Any:
        return value

    def name_value_pairs(entries: Any) -> list[list[Any]]:
        return [[name, value] for name, value in entries]

    payload: dict[str, Any] = {
        "model_key": self.model_key,
        "name": self.name,
        "primary_key": self.primary_key,
    }
    for key in ("columns", "indexes"):
        payload[key] = [item.to_dict() for item in getattr(self, key)]
    payload["unique_constraints"] = [
        list(group) for group in self.unique_constraints
    ]
    for key in (
        "named_unique_constraints",
        "check_constraints",
        "foreign_key_constraints",
        "exclusion_constraints",
        "relationships",
    ):
        payload[key] = [item.to_dict() for item in getattr(self, key)]

    # (attribute, test deciding whether to emit it, renderer), in output order.
    optional_entries = (
        ("schema", is_set, unchanged),
        ("comment", is_set, unchanged),
        ("tablespace", is_set, unchanged),
        ("mysql_engine", is_set, unchanged),
        ("mysql_charset", is_set, unchanged),
        ("mysql_collation", is_set, unchanged),
        ("mysql_row_format", is_set, unchanged),
        ("mysql_key_block_size", is_set, unchanged),
        ("mysql_pack_keys", is_set, unchanged),
        ("mysql_checksum", is_set, unchanged),
        ("mysql_delay_key_write", is_set, unchanged),
        ("mysql_stats_persistent", is_set, unchanged),
        ("mysql_stats_auto_recalc", is_set, unchanged),
        ("mysql_stats_sample_pages", is_set, unchanged),
        ("mysql_avg_row_length", is_set, unchanged),
        ("mysql_max_rows", is_set, unchanged),
        ("mysql_min_rows", is_set, unchanged),
        ("mysql_insert_method", is_set, unchanged),
        ("mysql_data_directory", is_set, unchanged),
        ("mysql_index_directory", is_set, unchanged),
        ("mysql_connection", is_set, unchanged),
        ("mysql_union", bool, list),
        ("mysql_partition_by", is_set, unchanged),
        ("mysql_partitions", is_set, unchanged),
        ("mysql_subpartition_by", is_set, unchanged),
        ("mysql_subpartitions", is_set, unchanged),
        ("mysql_auto_increment", is_set, unchanged),
        ("postgres_inherits", bool, list),
        ("postgres_with", bool, name_value_pairs),
        ("postgres_using", is_set, unchanged),
        ("postgres_partition_by", is_set, unchanged),
        ("postgres_partition_of", is_set, unchanged),
        ("postgres_partition_for", is_set, unchanged),
        ("postgres_unlogged", bool, unchanged),
        ("sqlite_strict", bool, unchanged),
        ("sqlite_without_rowid", bool, unchanged),
        ("oracle_compress", is_set, unchanged),
    )
    for key, should_emit, render in optional_entries:
        value = getattr(self, key)
        if should_emit(value):
            payload[key] = render(value)
    return payload

ormdantic._migrations.models.ColumnSnapshot dataclass

ColumnSnapshot(
    name,
    kind,
    nullable,
    primary_key,
    comment=None,
    foreign_table=None,
    foreign_column=None,
    max_length=None,
    unique=False,
    checks=list(),
    server_default=None,
    computed=None,
    computed_persisted=False,
    autoincrement=False,
    identity=False,
    identity_always=False,
    identity_start=None,
    identity_increment=None,
    identity_min_value=None,
    identity_max_value=None,
    identity_no_min_value=False,
    identity_no_max_value=False,
    identity_cycle=False,
    identity_cache=None,
    identity_order=False,
    identity_on_null=False,
    collation=None,
    numeric_precision=None,
    numeric_scale=None,
    foreign_key_name=None,
    on_delete=None,
    on_update=None,
    deferrable=None,
    initially_deferred=False,
    sqlite_on_conflict_primary_key=None,
    sqlite_on_conflict_not_null=None,
    sqlite_on_conflict_unique=None,
)

Serializable table column metadata used by migration snapshots.

name instance-attribute

name

kind instance-attribute

kind

nullable instance-attribute

nullable

primary_key instance-attribute

primary_key

comment class-attribute instance-attribute

comment = None

foreign_table class-attribute instance-attribute

foreign_table = None

foreign_column class-attribute instance-attribute

foreign_column = None

max_length class-attribute instance-attribute

max_length = None

unique class-attribute instance-attribute

unique = False

checks class-attribute instance-attribute

checks = field(default_factory=list)

server_default class-attribute instance-attribute

server_default = None

computed class-attribute instance-attribute

computed = None

computed_persisted class-attribute instance-attribute

computed_persisted = False

autoincrement class-attribute instance-attribute

autoincrement = False

identity class-attribute instance-attribute

identity = False

identity_always class-attribute instance-attribute

identity_always = False

identity_start class-attribute instance-attribute

identity_start = None

identity_increment class-attribute instance-attribute

identity_increment = None

identity_min_value class-attribute instance-attribute

identity_min_value = None

identity_max_value class-attribute instance-attribute

identity_max_value = None

identity_no_min_value class-attribute instance-attribute

identity_no_min_value = False

identity_no_max_value class-attribute instance-attribute

identity_no_max_value = False

identity_cycle class-attribute instance-attribute

identity_cycle = False

identity_cache class-attribute instance-attribute

identity_cache = None

identity_order class-attribute instance-attribute

identity_order = False

identity_on_null class-attribute instance-attribute

identity_on_null = False

collation class-attribute instance-attribute

collation = None

numeric_precision class-attribute instance-attribute

numeric_precision = None

numeric_scale class-attribute instance-attribute

numeric_scale = None

foreign_key_name class-attribute instance-attribute

foreign_key_name = None

on_delete class-attribute instance-attribute

on_delete = None

on_update class-attribute instance-attribute

on_update = None

deferrable class-attribute instance-attribute

deferrable = None

initially_deferred class-attribute instance-attribute

initially_deferred = False

sqlite_on_conflict_primary_key class-attribute instance-attribute

sqlite_on_conflict_primary_key = None

sqlite_on_conflict_not_null class-attribute instance-attribute

sqlite_on_conflict_not_null = None

sqlite_on_conflict_unique class-attribute instance-attribute

sqlite_on_conflict_unique = None

has_identity property

has_identity

from_runtime classmethod

from_runtime(column)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, column: Sequence[Any]) -> "ColumnSnapshot":
    """Build a column snapshot from a positional runtime column record.

    The first nine slots are read by index (name, kind, nullable, primary
    key, foreign table, foreign column, max length, unique, checks). Every
    remaining keyword argument is supplied by
    ``runtime_column_options(column)``.
    """
    leading_fields: dict[str, Any] = {
        "name": str(column[0]),
        "kind": str(column[1]),
        "nullable": bool(column[2]),
        "primary_key": bool(column[3]),
        "foreign_table": optional_str(column[4]),
        "foreign_column": optional_str(column[5]),
        "max_length": optional_int(column[6]),
        "unique": bool(column[7]),
        "checks": list(map(runtime_check, column[8])),
    }
    return cls(**leading_fields, **runtime_column_options(column))

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ColumnSnapshot":
    """Rebuild a column snapshot from its serialized mapping.

    ``name``, ``kind``, ``nullable`` and ``primary_key`` are required keys
    and raise ``KeyError`` when missing. Every other key is read with
    ``.get``: a missing key reaches its converter as ``None`` (``False``
    for flags, an empty list for ``checks``).
    """
    lookup = payload.get

    def text(key: str):
        # Optional string field.
        return optional_str(lookup(key))

    def number(key: str):
        # Optional integer field.
        return optional_int(lookup(key))

    def flag(key: str) -> bool:
        # Boolean field that is False when the key is absent.
        return bool(lookup(key, False))

    return cls(
        name=str(payload["name"]),
        kind=str(payload["kind"]),
        nullable=bool(payload["nullable"]),
        primary_key=bool(payload["primary_key"]),
        foreign_table=text("foreign_table"),
        foreign_column=text("foreign_column"),
        max_length=number("max_length"),
        unique=flag("unique"),
        comment=text("comment"),
        checks=[runtime_check(entry) for entry in lookup("checks", [])],
        server_default=text("server_default"),
        computed=text("computed"),
        computed_persisted=flag("computed_persisted"),
        autoincrement=flag("autoincrement"),
        identity=flag("identity"),
        identity_always=flag("identity_always"),
        identity_start=number("identity_start"),
        identity_increment=number("identity_increment"),
        identity_min_value=number("identity_min_value"),
        identity_max_value=number("identity_max_value"),
        identity_no_min_value=flag("identity_no_min_value"),
        identity_no_max_value=flag("identity_no_max_value"),
        identity_cycle=flag("identity_cycle"),
        identity_cache=number("identity_cache"),
        identity_order=flag("identity_order"),
        identity_on_null=flag("identity_on_null"),
        collation=text("collation"),
        numeric_precision=number("numeric_precision"),
        numeric_scale=number("numeric_scale"),
        foreign_key_name=text("foreign_key_name"),
        on_delete=text("on_delete"),
        on_update=text("on_update"),
        deferrable=optional_bool(lookup("deferrable")),
        initially_deferred=flag("initially_deferred"),
        sqlite_on_conflict_primary_key=text("sqlite_on_conflict_primary_key"),
        sqlite_on_conflict_not_null=text("sqlite_on_conflict_not_null"),
        sqlite_on_conflict_unique=text("sqlite_on_conflict_unique"),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeColumn:
    """Return the positional runtime record for this column.

    The record has nine leading slots followed by one nested options
    tuple. Inside the options tuple, the identity settings, the deferral
    pair and the SQLite conflict clauses are each collapsed to ``None``
    when none of their members is set.
    """
    # Identity settings are emitted only for identity columns.
    identity_options = None
    if self.has_identity:
        identity_options = (
            self.identity_always,
            self.identity_start,
            self.identity_increment,
            self.identity_min_value,
            self.identity_max_value,
            self.identity_cycle,
            self.identity_cache,
            self.identity_order,
            self.identity_on_null,
            self.identity_no_min_value,
            self.identity_no_max_value,
        )

    # The deferral pair is present when either half carries information.
    deferral = None
    if self.deferrable is not None or self.initially_deferred:
        deferral = (self.deferrable, self.initially_deferred)

    # SQLite ON CONFLICT clauses travel together or not at all.
    conflict_clauses = (
        self.sqlite_on_conflict_primary_key,
        self.sqlite_on_conflict_not_null,
        self.sqlite_on_conflict_unique,
    )
    sqlite_conflicts = None
    if any(clause is not None for clause in conflict_clauses):
        sqlite_conflicts = conflict_clauses

    options = (
        self.server_default,
        self.computed,
        self.computed_persisted,
        self.autoincrement,
        self.collation,
        self.numeric_precision,
        self.numeric_scale,
        identity_options,
        self.foreign_key_name,
        self.on_delete,
        self.on_update,
        (deferral, self.comment, sqlite_conflicts),
    )
    return (
        self.name,
        self.kind,
        self.nullable,
        self.primary_key,
        self.foreign_table,
        self.foreign_column,
        self.max_length,
        self.unique,
        list(self.checks),
        options,
    )

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the column into a plain dict, omitting unset options.

    The nine core keys are always written. Optional string/integer fields
    are written only when they are not ``None``, flag fields only when
    truthy, and ``identity`` is written as ``True`` whenever
    ``has_identity`` is truthy.
    """
    payload: dict[str, Any] = {
        "name": self.name,
        "kind": self.kind,
        "nullable": self.nullable,
        "primary_key": self.primary_key,
        "foreign_table": self.foreign_table,
        "foreign_column": self.foreign_column,
        "max_length": self.max_length,
        "unique": self.unique,
        "checks": [list(entry) for entry in self.checks],
    }

    def emit_if_set(*attributes: str) -> None:
        # Copy each attribute whose value is not None.
        for attribute in attributes:
            value = getattr(self, attribute)
            if value is not None:
                payload[attribute] = value

    def emit_if_true(*attributes: str) -> None:
        # Copy each attribute whose value is truthy.
        for attribute in attributes:
            value = getattr(self, attribute)
            if value:
                payload[attribute] = value

    # The call order below fixes the key order of the serialized payload.
    emit_if_set("comment", "server_default", "computed")
    emit_if_true("computed_persisted", "autoincrement")
    if self.has_identity:
        payload["identity"] = True
    emit_if_true("identity_always")
    emit_if_set(
        "identity_start",
        "identity_increment",
        "identity_min_value",
        "identity_max_value",
    )
    emit_if_true("identity_no_min_value", "identity_no_max_value", "identity_cycle")
    emit_if_set("identity_cache")
    emit_if_true("identity_order", "identity_on_null")
    emit_if_set(
        "collation",
        "numeric_precision",
        "numeric_scale",
        "foreign_key_name",
        "on_delete",
        "on_update",
        "deferrable",
    )
    emit_if_true("initially_deferred")
    emit_if_set(
        "sqlite_on_conflict_primary_key",
        "sqlite_on_conflict_not_null",
        "sqlite_on_conflict_unique",
    )
    return payload

ormdantic._migrations.models.IndexSnapshot dataclass

IndexSnapshot(
    name,
    columns,
    unique=False,
    where=None,
    include_columns=list(),
    method=None,
    expressions=list(),
    postgres_with=list(),
    comment=None,
    postgres_tablespace=None,
    mssql_filegroup=None,
    mssql_clustered=False,
    oracle_tablespace=None,
    mysql_prefix=None,
    mysql_length=dict(),
    mysql_using=None,
    postgres_ops=dict(),
    mysql_visible=None,
    oracle_bitmap=False,
    oracle_compress=None,
    postgres_nulls_not_distinct=False,
)

Serializable index metadata used by migration snapshots.

name instance-attribute

name

columns instance-attribute

columns

unique class-attribute instance-attribute

unique = False

where class-attribute instance-attribute

where = None

include_columns class-attribute instance-attribute

include_columns = field(default_factory=list)

method class-attribute instance-attribute

method = None

expressions class-attribute instance-attribute

expressions = field(default_factory=list)

postgres_with class-attribute instance-attribute

postgres_with = field(default_factory=list)

comment class-attribute instance-attribute

comment = None

postgres_tablespace class-attribute instance-attribute

postgres_tablespace = None

mssql_filegroup class-attribute instance-attribute

mssql_filegroup = None

mssql_clustered class-attribute instance-attribute

mssql_clustered = False

oracle_tablespace class-attribute instance-attribute

oracle_tablespace = None

mysql_prefix class-attribute instance-attribute

mysql_prefix = None

mysql_length class-attribute instance-attribute

mysql_length = field(default_factory=dict)

mysql_using class-attribute instance-attribute

mysql_using = None

postgres_ops class-attribute instance-attribute

postgres_ops = field(default_factory=dict)

mysql_visible class-attribute instance-attribute

mysql_visible = None

oracle_bitmap class-attribute instance-attribute

oracle_bitmap = False

oracle_compress class-attribute instance-attribute

oracle_compress = None

postgres_nulls_not_distinct class-attribute instance-attribute

postgres_nulls_not_distinct = False

from_runtime classmethod

from_runtime(index)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, index: Sequence[Any]) -> "IndexSnapshot":
    """Build an index snapshot from a positional runtime index record.

    Slots 0-2 (name, columns, unique) are required. Every later slot is
    optional and falls back to the field default when the record is too
    short to contain it.
    """
    size = len(index)

    def slot(position: int, convert: Any, fallback: Any) -> Any:
        # Convert index[position] when present, otherwise use the fallback.
        if size > position:
            return convert(index[position])
        return fallback

    def text_or_none(value: Any) -> Any:
        return None if value is None else str(value)

    def as_text_list(values: Any) -> list[str]:
        return [str(item) for item in values]

    # A mapping in slot 13 means the record has no prefix slot: the length
    # mapping sits at 13 and the USING value at 14 instead of 14 and 15.
    # NOTE(review): slots 16+ are read at the same positions for both
    # layouts; confirm the mapping-at-13 layout never carries them.
    if size > 13 and isinstance(index[13], Mapping):
        mysql_prefix = None
        mysql_lengths = mysql_index_lengths(index[13])
        mysql_using = slot(14, optional_str, None)
    else:
        mysql_prefix = slot(13, optional_str, None)
        mysql_lengths = slot(14, mysql_index_lengths, {})
        mysql_using = slot(15, optional_str, None)

    return cls(
        str(index[0]),
        as_text_list(index[1]),
        bool(index[2]),
        slot(3, text_or_none, None),
        slot(4, as_text_list, []),
        slot(5, text_or_none, None),
        slot(6, as_text_list, []),
        slot(7, postgres_storage_parameters, []),
        slot(8, optional_str, None),
        slot(9, optional_str, None),
        slot(10, optional_str, None),
        slot(11, bool, False),
        slot(12, optional_str, None),
        mysql_prefix,
        mysql_lengths,
        mysql_using,
        slot(16, postgres_index_ops, {}),
        slot(17, optional_bool, None),
        slot(18, bool, False),
        slot(19, oracle_index_compress, None),
        slot(20, bool, False),
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "IndexSnapshot":
    """Rebuild an index snapshot from its serialized mapping.

    ``name`` and ``columns`` are required keys. The predicate is taken from
    ``where`` and, when that is absent or ``None``, from the alternate
    ``where_expr`` key.
    """
    predicate = payload.get("where")
    if predicate is None:
        predicate = payload.get("where_expr")
    access_method = payload.get("method")

    return cls(
        name=str(payload["name"]),
        columns=[str(column) for column in payload["columns"]],
        unique=bool(payload.get("unique", False)),
        where=None if predicate is None else str(predicate),
        include_columns=[
            str(column) for column in payload.get("include_columns", [])
        ],
        method=None if access_method is None else str(access_method),
        expressions=[str(term) for term in payload.get("expressions", [])],
        postgres_with=postgres_storage_parameters(payload.get("postgres_with", [])),
        comment=optional_str(payload.get("comment")),
        postgres_tablespace=optional_str(payload.get("postgres_tablespace")),
        mssql_filegroup=optional_str(payload.get("mssql_filegroup")),
        mssql_clustered=bool(payload.get("mssql_clustered", False)),
        oracle_tablespace=optional_str(payload.get("oracle_tablespace")),
        mysql_prefix=optional_str(payload.get("mysql_prefix")),
        mysql_length=mysql_index_lengths(payload.get("mysql_length", {})),
        mysql_using=optional_str(payload.get("mysql_using")),
        postgres_ops=postgres_index_ops(payload.get("postgres_ops", {})),
        mysql_visible=optional_bool(payload.get("mysql_visible")),
        oracle_bitmap=bool(payload.get("oracle_bitmap", False)),
        oracle_compress=oracle_index_compress(payload.get("oracle_compress")),
        postgres_nulls_not_distinct=bool(
            payload.get("postgres_nulls_not_distinct", False)
        ),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeIndex:
    """Return the eight-slot runtime record for this index.

    The record carries name, columns, unique, where, include columns,
    method, expressions and PostgreSQL storage parameters; the remaining
    snapshot fields are not part of it. List-valued fields are copied so
    the caller cannot mutate the snapshot through the record.
    """
    key_columns = list(self.columns)
    covering_columns = list(self.include_columns)
    expression_terms = list(self.expressions)
    storage_parameters = list(self.postgres_with)
    return (
        self.name,
        key_columns,
        self.unique,
        self.where,
        covering_columns,
        self.method,
        expression_terms,
        storage_parameters,
    )

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the index into a plain dict, leaving out unset options.

    ``name``, ``columns`` and ``unique`` are always written. Optional
    scalars are written when they are not ``None``; flags and collections
    are written when truthy, with collections copied into fresh
    lists/dicts.
    """

    def is_set(value: Any) -> bool:
        return value is not None

    def unchanged(value: Any) -> Any:
        return value

    def as_pairs(parameters: Any) -> list[list[Any]]:
        return [[name, value] for name, value in parameters]

    # (key, inclusion test, rendering), listed in the order keys are written.
    optional_entries = (
        ("where", is_set, unchanged),
        ("include_columns", bool, list),
        ("method", is_set, unchanged),
        ("expressions", bool, list),
        ("postgres_with", bool, as_pairs),
        ("comment", is_set, unchanged),
        ("postgres_tablespace", is_set, unchanged),
        ("mssql_filegroup", is_set, unchanged),
        ("mssql_clustered", bool, unchanged),
        ("oracle_tablespace", is_set, unchanged),
        ("oracle_bitmap", bool, unchanged),
        ("oracle_compress", is_set, unchanged),
        ("mysql_prefix", is_set, unchanged),
        ("mysql_length", bool, dict),
        ("mysql_using", is_set, unchanged),
        ("postgres_ops", bool, dict),
        ("mysql_visible", is_set, unchanged),
        ("postgres_nulls_not_distinct", bool, unchanged),
    )

    payload: dict[str, Any] = {
        "name": self.name,
        "columns": list(self.columns),
        "unique": self.unique,
    }
    for key, wanted, render in optional_entries:
        value = getattr(self, key)
        if wanted(value):
            payload[key] = render(value)
    return payload

ormdantic._migrations.models.TableCheckSnapshot dataclass

TableCheckSnapshot(
    name,
    expression,
    validated=True,
    no_inherit=False,
    comment=None,
)

Serializable table-level CHECK constraint metadata.

name instance-attribute

name

expression instance-attribute

expression

validated class-attribute instance-attribute

validated = True

no_inherit class-attribute instance-attribute

no_inherit = False

comment class-attribute instance-attribute

comment = None

from_runtime classmethod

from_runtime(check)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, check: Sequence[Any]) -> "TableCheckSnapshot":
    """Build a table CHECK snapshot from a positional runtime record.

    Slots 0 and 1 (name, expression) are required. ``validated``,
    ``no_inherit`` and ``comment`` come from slots 2-4 when the record is
    long enough, otherwise they keep the defaults ``True``, ``False`` and
    ``None``.
    """
    size = len(check)
    validated = True
    no_inherit = False
    comment = None
    if size > 2:
        validated = bool(check[2])
    if size > 3:
        no_inherit = bool(check[3])
    if size > 4:
        comment = optional_str(check[4])
    return cls(str(check[0]), str(check[1]), validated, no_inherit, comment)

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "TableCheckSnapshot":
    """Rebuild a table CHECK snapshot from its serialized mapping.

    ``name`` and ``expression`` are required keys. ``validated`` defaults
    to ``True`` and ``no_inherit`` to ``False`` when absent.
    """
    is_validated = bool(payload.get("validated", True))
    skips_inheritance = bool(payload.get("no_inherit", False))
    return cls(
        name=str(payload["name"]),
        expression=str(payload["expression"]),
        validated=is_validated,
        no_inherit=skips_inheritance,
        comment=optional_str(payload.get("comment")),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeTableCheck:
    """Return the four-slot runtime record for this CHECK constraint.

    The slots are name, expression, validated and no_inherit; the comment
    is not part of the record.
    """
    slot_names = ("name", "expression", "validated", "no_inherit")
    return tuple(getattr(self, slot_name) for slot_name in slot_names)

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the CHECK constraint, writing only non-default options.

    ``validated`` is written (as ``False``) only when the constraint is not
    validated, ``no_inherit`` (as ``True``) only when set, and ``comment``
    only when it is not ``None``.
    """
    payload: dict[str, Any] = dict(name=self.name, expression=self.expression)
    # (key, should it be written, value to write), in output order.
    optional_entries = (
        ("validated", not self.validated, False),
        ("no_inherit", self.no_inherit, True),
        ("comment", self.comment is not None, self.comment),
    )
    for key, include, value in optional_entries:
        if include:
            payload[key] = value
    return payload

ormdantic._migrations.models.UniqueConstraintSnapshot dataclass

UniqueConstraintSnapshot(
    name,
    columns,
    deferrable=None,
    initially_deferred=False,
    nulls_not_distinct=False,
    sqlite_on_conflict=None,
    mssql_filegroup=None,
    mssql_clustered=None,
    comment=None,
    postgres_include=list(),
    oracle_tablespace=None,
    oracle_compress=None,
)

Serializable named UNIQUE constraint metadata.

name instance-attribute

name

columns instance-attribute

columns

deferrable class-attribute instance-attribute

deferrable = None

initially_deferred class-attribute instance-attribute

initially_deferred = False

nulls_not_distinct class-attribute instance-attribute

nulls_not_distinct = False

sqlite_on_conflict class-attribute instance-attribute

sqlite_on_conflict = None

mssql_filegroup class-attribute instance-attribute

mssql_filegroup = None

mssql_clustered class-attribute instance-attribute

mssql_clustered = None

comment class-attribute instance-attribute

comment = None

postgres_include class-attribute instance-attribute

postgres_include = field(default_factory=list)

oracle_tablespace class-attribute instance-attribute

oracle_tablespace = None

oracle_compress class-attribute instance-attribute

oracle_compress = None

from_runtime classmethod

from_runtime(constraint)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, constraint: Sequence[Any]) -> "UniqueConstraintSnapshot":
    """Build a unique-constraint snapshot from a positional runtime record.

    Slots 0-5 hold name, columns, deferrable, initially_deferred,
    nulls_not_distinct and sqlite_on_conflict (slots 2-5 are optional).
    The meaning of the remaining slots depends on the record length and on
    whether a slot holds a non-string sequence:

    * 11+ slots: filegroup, clustered, comment, include columns,
      Oracle tablespace and, with 12+ slots, Oracle compress.
    * 10 slots, slot 9 a sequence: filegroup, clustered, comment,
      include columns.
    * 10 slots otherwise: filegroup, clustered, Oracle tablespace,
      Oracle compress.
    * 9 slots: filegroup, clustered, Oracle tablespace.
    * 8 slots, slot 7 a sequence: comment, include columns.
    * anything shorter (or 8 slots with a scalar in slot 7): slots 6 and 7
      are classified by value, a ``bool`` being the clustered flag.
    """
    size = len(constraint)
    mssql_filegroup: str | None = None
    mssql_clustered: bool | None = None
    oracle_tablespace: str | None = None
    oracle_compress: int | bool | None = None
    comment: str | None = None
    postgres_include: list[str] = []

    if size >= 9:
        # Every layout with nine or more slots keeps the MSSQL options in
        # slots 6 and 7; only the tail differs.
        mssql_filegroup = optional_str(constraint[6])
        mssql_clustered = optional_bool(constraint[7])
        if size > 10:
            comment = optional_str(constraint[8])
            if constraint[9] is not None:
                postgres_include = [str(column) for column in constraint[9]]
            oracle_tablespace = optional_str(constraint[10])
            if size > 11:
                oracle_compress = oracle_index_compress(constraint[11])
        elif size == 10 and is_non_string_sequence(constraint[9]):
            comment = optional_str(constraint[8])
            postgres_include = [str(column) for column in constraint[9]]
        elif size == 10:
            oracle_tablespace = optional_str(constraint[8])
            oracle_compress = oracle_index_compress(constraint[9])
        else:
            oracle_tablespace = optional_str(constraint[8])
    elif size == 8 and is_non_string_sequence(constraint[7]):
        comment = optional_str(constraint[6])
        postgres_include = [str(column) for column in constraint[7]]
    else:
        # At most eight slots remain here, so only slots 6 and 7 can carry
        # trailing options and their role is inferred from the value.
        if size > 6:
            first_extra = constraint[6]
            if isinstance(first_extra, bool):
                mssql_clustered = optional_bool(first_extra)
            elif first_extra is not None:
                # A non-bool slot 6 is the filegroup when another slot
                # follows it, otherwise it is the comment.
                if size > 7:
                    mssql_filegroup = optional_str(first_extra)
                else:
                    comment = optional_str(first_extra)
        if size > 7:
            second_extra = constraint[7]
            if isinstance(second_extra, bool):
                mssql_clustered = optional_bool(second_extra)
            elif second_extra is not None:
                comment = optional_str(second_extra)

    return cls(
        str(constraint[0]),
        [str(column) for column in constraint[1]],
        optional_bool(constraint[2]) if size > 2 else None,
        bool(constraint[3]) if size > 3 else False,
        bool(constraint[4]) if size > 4 else False,
        optional_str(constraint[5]) if size > 5 else None,
        mssql_filegroup,
        mssql_clustered,
        comment,
        postgres_include,
        oracle_tablespace,
        oracle_compress,
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "UniqueConstraintSnapshot":
    """Rebuild a unique-constraint snapshot from its serialized mapping.

    ``name`` and ``columns`` are required keys. A missing or falsy
    ``postgres_include`` value becomes an empty list.
    """
    covering_columns = payload.get("postgres_include") or []
    return cls(
        name=str(payload["name"]),
        columns=[str(column) for column in payload["columns"]],
        deferrable=optional_bool(payload.get("deferrable")),
        initially_deferred=bool(payload.get("initially_deferred", False)),
        nulls_not_distinct=bool(payload.get("nulls_not_distinct", False)),
        sqlite_on_conflict=optional_str(payload.get("sqlite_on_conflict")),
        mssql_filegroup=optional_str(payload.get("mssql_filegroup")),
        mssql_clustered=optional_bool(payload.get("mssql_clustered")),
        comment=optional_str(payload.get("comment")),
        postgres_include=[str(column) for column in covering_columns],
        oracle_tablespace=optional_str(payload.get("oracle_tablespace")),
        oracle_compress=oracle_index_compress(payload.get("oracle_compress")),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeUniqueConstraint:
    """Return the ten-slot runtime record for this unique constraint.

    The record ends with the Oracle tablespace and the compress value
    converted by ``oracle_index_compress_runtime``; the comment and the
    PostgreSQL include columns are not part of it.
    """
    column_names = list(self.columns)
    compress_setting = oracle_index_compress_runtime(self.oracle_compress)
    leading_slots = (
        self.name,
        column_names,
        self.deferrable,
        self.initially_deferred,
        self.nulls_not_distinct,
        self.sqlite_on_conflict,
    )
    trailing_slots = (
        self.mssql_filegroup,
        self.mssql_clustered,
        self.oracle_tablespace,
        compress_setting,
    )
    return leading_slots + trailing_slots

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the unique constraint, leaving out unset options.

    ``name`` and ``columns`` are always written. Optional values are
    written when they are not ``None``, the two boolean flags and
    ``postgres_include`` only when truthy.
    """

    def is_set(value: Any) -> bool:
        return value is not None

    def unchanged(value: Any) -> Any:
        return value

    # (key, inclusion test, rendering), listed in the order keys are written.
    optional_entries = (
        ("deferrable", is_set, unchanged),
        ("initially_deferred", bool, unchanged),
        ("nulls_not_distinct", bool, unchanged),
        ("sqlite_on_conflict", is_set, unchanged),
        ("mssql_filegroup", is_set, unchanged),
        ("mssql_clustered", is_set, unchanged),
        ("oracle_tablespace", is_set, unchanged),
        ("oracle_compress", is_set, unchanged),
        ("comment", is_set, unchanged),
        ("postgres_include", bool, list),
    )

    payload: dict[str, Any] = {"name": self.name, "columns": list(self.columns)}
    for key, wanted, render in optional_entries:
        value = getattr(self, key)
        if wanted(value):
            payload[key] = render(value)
    return payload

ormdantic._migrations.models.ForeignKeyConstraintSnapshot dataclass

ForeignKeyConstraintSnapshot(
    name,
    columns,
    foreign_table,
    foreign_columns,
    on_delete=None,
    on_update=None,
    deferrable=None,
    initially_deferred=False,
    validated=True,
    match=None,
    comment=None,
)

Serializable table-level FOREIGN KEY constraint metadata.

name instance-attribute

name

columns instance-attribute

columns

foreign_table instance-attribute

foreign_table

foreign_columns instance-attribute

foreign_columns

on_delete class-attribute instance-attribute

on_delete = None

on_update class-attribute instance-attribute

on_update = None

deferrable class-attribute instance-attribute

deferrable = None

initially_deferred class-attribute instance-attribute

initially_deferred = False

validated class-attribute instance-attribute

validated = True

match class-attribute instance-attribute

match = None

comment class-attribute instance-attribute

comment = None

from_runtime classmethod

from_runtime(constraint)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(
    cls,
    constraint: Sequence[Any],
) -> "ForeignKeyConstraintSnapshot":
    """Build a foreign-key snapshot from a positional runtime record.

    Slots 0-3 (name, columns, foreign table, foreign columns) are
    required. Slots 4-10 are optional and fall back to the field defaults
    when the record is too short to contain them.
    """
    size = len(constraint)

    def tail(position: int, convert: Any, fallback: Any) -> Any:
        # Convert constraint[position] when present, else use the fallback.
        if size > position:
            return convert(constraint[position])
        return fallback

    return cls(
        str(constraint[0]),
        [str(column) for column in constraint[1]],
        str(constraint[2]),
        [str(column) for column in constraint[3]],
        tail(4, optional_str, None),
        tail(5, optional_str, None),
        tail(6, optional_bool, None),
        tail(7, bool, False),
        tail(8, bool, True),
        tail(9, optional_str, None),
        tail(10, optional_str, None),
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ForeignKeyConstraintSnapshot":
    """Rebuild a foreign-key snapshot from its serialized mapping.

    ``name``, ``columns``, ``foreign_table`` and ``foreign_columns`` are
    required keys. ``validated`` defaults to ``True`` and
    ``initially_deferred`` to ``False`` when absent.
    """
    local_columns = [str(column) for column in payload["columns"]]
    referenced_columns = [str(column) for column in payload["foreign_columns"]]
    return cls(
        name=str(payload["name"]),
        columns=local_columns,
        foreign_table=str(payload["foreign_table"]),
        foreign_columns=referenced_columns,
        on_delete=optional_str(payload.get("on_delete")),
        on_update=optional_str(payload.get("on_update")),
        deferrable=optional_bool(payload.get("deferrable")),
        initially_deferred=bool(payload.get("initially_deferred", False)),
        validated=bool(payload.get("validated", True)),
        match=optional_str(payload.get("match")),
        comment=optional_str(payload.get("comment")),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeForeignKeyConstraint:
    """Return the ten-slot runtime record for this foreign key.

    Both column lists are copied. The comment is not part of the record.
    """
    local_columns = list(self.columns)
    referenced_columns = list(self.foreign_columns)
    referential_actions = (self.on_delete, self.on_update)
    timing = (self.deferrable, self.initially_deferred)
    return (
        self.name,
        local_columns,
        self.foreign_table,
        referenced_columns,
        *referential_actions,
        *timing,
        self.validated,
        self.match,
    )

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the foreign key, leaving out options at their defaults.

    The four identifying keys are always written. ``validated`` is written
    (as ``False``) only when the constraint is not validated, and
    ``initially_deferred`` only when truthy; the remaining options are
    written when they are not ``None``.
    """
    payload: dict[str, Any] = dict(
        name=self.name,
        columns=list(self.columns),
        foreign_table=self.foreign_table,
        foreign_columns=list(self.foreign_columns),
    )
    # (key, should it be written, value to write), in output order.
    optional_entries = (
        ("on_delete", self.on_delete is not None, self.on_delete),
        ("on_update", self.on_update is not None, self.on_update),
        ("deferrable", self.deferrable is not None, self.deferrable),
        ("initially_deferred", self.initially_deferred, self.initially_deferred),
        ("validated", not self.validated, False),
        ("match", self.match is not None, self.match),
        ("comment", self.comment is not None, self.comment),
    )
    for key, include, value in optional_entries:
        if include:
            payload[key] = value
    return payload

ormdantic._migrations.models.ExclusionConstraintSnapshot dataclass

ExclusionConstraintSnapshot(
    name,
    columns=list(),
    expressions=list(),
    using="gist",
    where=None,
    deferrable=None,
    initially_deferred=False,
    ops=dict(),
    comment=None,
)

Serializable PostgreSQL EXCLUDE constraint metadata.

name instance-attribute

name

columns class-attribute instance-attribute

columns = field(default_factory=list)

expressions class-attribute instance-attribute

expressions = field(default_factory=list)

using class-attribute instance-attribute

using = 'gist'

where class-attribute instance-attribute

where = None

deferrable class-attribute instance-attribute

deferrable = None

initially_deferred class-attribute instance-attribute

initially_deferred = False

ops class-attribute instance-attribute

ops = field(default_factory=dict)

comment class-attribute instance-attribute

comment = None

from_runtime classmethod

from_runtime(constraint)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(
    cls,
    constraint: Sequence[Any],
) -> "ExclusionConstraintSnapshot":
    """Build an exclusion-constraint snapshot from a positional runtime record.

    Slots 0-2 (name, columns, expressions) are required and slots 3-6 are
    optional. Slot 7 is read as the operator-class mapping when it is a
    ``Mapping``, with the comment then taken from slot 8; any other value
    in slot 7 is read as the comment itself.
    """
    size = len(constraint)
    ops: dict[str, str] = {}
    comment: str | None = None
    if size > 7:
        trailing = constraint[7]
        if isinstance(trailing, Mapping):
            ops = postgres_index_ops(trailing)
            if size > 8:
                comment = optional_str(constraint[8])
        else:
            comment = optional_str(trailing)

    def tail(position: int, convert: Any, fallback: Any) -> Any:
        # Convert constraint[position] when present, else use the fallback.
        if size > position:
            return convert(constraint[position])
        return fallback

    return cls(
        str(constraint[0]),
        exclusion_elements(constraint[1]),
        exclusion_elements(constraint[2]),
        tail(3, str, "gist"),
        tail(4, optional_str, None),
        tail(5, optional_bool, None),
        tail(6, bool, False),
        ops,
        comment,
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ExclusionConstraintSnapshot":
    """Rebuild an exclusion-constraint snapshot from its serialized mapping.

    Only ``name`` is required. ``columns`` and ``expressions`` default to
    empty lists, ``using`` to ``"gist"`` and ``ops`` to an empty mapping
    before being passed to their converters.
    """
    column_elements = exclusion_elements(payload.get("columns", []))
    expression_elements = exclusion_elements(payload.get("expressions", []))
    return cls(
        name=str(payload["name"]),
        columns=column_elements,
        expressions=expression_elements,
        using=str(payload.get("using", "gist")),
        where=optional_str(payload.get("where")),
        deferrable=optional_bool(payload.get("deferrable")),
        initially_deferred=bool(payload.get("initially_deferred", False)),
        ops=postgres_index_ops(payload.get("ops", {})),
        comment=optional_str(payload.get("comment")),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeExclusionConstraint:
    """Return the constraint as an 8-item positional runtime tuple.

    The element lists and the ops mapping are shallow-copied so the tuple
    does not share those containers with the snapshot.

    NOTE(review): ``comment`` is not included in the tuple, although
    ``from_runtime`` is able to read one -- confirm this is intended.
    """
    columns = [*self.columns]
    expressions = [*self.expressions]
    ops = dict(self.ops)
    return (
        self.name,
        columns,
        expressions,
        self.using,
        self.where,
        self.deferrable,
        self.initially_deferred,
        ops,
    )

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the constraint to a plain dictionary.

    ``name``, ``columns``, ``expressions`` and ``using`` are always emitted.
    ``where``, ``deferrable`` and ``comment`` are emitted when not ``None``;
    ``initially_deferred`` and ``ops`` are emitted only when truthy.
    """
    payload: dict[str, Any] = {
        "name": self.name,
        "columns": [list(item) for item in self.columns],
        "expressions": [list(item) for item in self.expressions],
        "using": self.using,
    }
    # (key, value, keep?) listed in emission order.
    optional_entries = (
        ("where", self.where, self.where is not None),
        ("deferrable", self.deferrable, self.deferrable is not None),
        (
            "initially_deferred",
            self.initially_deferred,
            bool(self.initially_deferred),
        ),
        ("ops", self.ops, bool(self.ops)),
        ("comment", self.comment, self.comment is not None),
    )
    for key, value, keep in optional_entries:
        if not keep:
            continue
        # The ops mapping is copied so the payload does not alias it.
        payload[key] = dict(value) if key == "ops" else value
    return payload

ormdantic._migrations.models.NamespaceSnapshot dataclass

NamespaceSnapshot(name, comment=None)

Serializable database namespace/schema metadata used by migration snapshots.

name instance-attribute

name

comment class-attribute instance-attribute

comment = None

from_runtime classmethod

from_runtime(namespace)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, namespace: Sequence[Any]) -> "NamespaceSnapshot":
    """Build a snapshot from a positional ``(name, comment)`` sequence.

    The comment slot is optional; a one-item sequence yields ``None``.
    """
    name = str(namespace[0])
    comment = None
    if len(namespace) > 1:
        comment = optional_str(namespace[1])
    return cls(name, comment)

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "NamespaceSnapshot":
    """Rebuild a snapshot from a dictionary; only ``"name"`` is required."""
    name = str(payload["name"])
    comment = optional_str(payload.get("comment"))
    return cls(name, comment)

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeNamespace:
    """Return the namespace as a positional ``(name, comment)`` tuple."""
    name, comment = self.name, self.comment
    return (name, comment)

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the namespace; ``comment`` is omitted when it is ``None``."""
    if self.comment is None:
        return {"name": self.name}
    return {"name": self.name, "comment": self.comment}

ormdantic._migrations.models.SequenceSnapshot dataclass

SequenceSnapshot(
    name,
    schema=None,
    start=None,
    increment=None,
    min_value=None,
    max_value=None,
    cycle=False,
    cache=None,
    comment=None,
    data_type=None,
    order=False,
    no_min_value=False,
    no_max_value=False,
)

Serializable database sequence metadata used by migration snapshots.

name instance-attribute

name

schema class-attribute instance-attribute

schema = None

start class-attribute instance-attribute

start = None

increment class-attribute instance-attribute

increment = None

min_value class-attribute instance-attribute

min_value = None

max_value class-attribute instance-attribute

max_value = None

cycle class-attribute instance-attribute

cycle = False

cache class-attribute instance-attribute

cache = None

comment class-attribute instance-attribute

comment = None

data_type class-attribute instance-attribute

data_type = None

order class-attribute instance-attribute

order = False

no_min_value class-attribute instance-attribute

no_min_value = False

no_max_value class-attribute instance-attribute

no_max_value = False

from_runtime classmethod

from_runtime(sequence)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, sequence: Sequence[Any]) -> "SequenceSnapshot":
    """Build a snapshot from a positional runtime sequence.

    Slot 0 (the name) is required.  Slots 1-12 are optional: a slot that is
    beyond the end of ``sequence`` yields ``None`` for the optional
    string/int fields and ``False`` for the boolean flags.
    """
    size = len(sequence)

    def slot(index: int, convert: Any, missing: Any = None) -> Any:
        # Convert the positional slot when present, else use the fallback.
        return convert(sequence[index]) if size > index else missing

    return cls(
        name=str(sequence[0]),
        schema=slot(1, optional_str),
        start=slot(2, optional_int),
        increment=slot(3, optional_int),
        min_value=slot(4, optional_int),
        max_value=slot(5, optional_int),
        cycle=slot(6, bool, False),
        cache=slot(7, optional_int),
        comment=slot(8, optional_str),
        data_type=slot(9, optional_str),
        order=slot(10, bool, False),
        no_min_value=slot(11, bool, False),
        no_max_value=slot(12, bool, False),
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "SequenceSnapshot":
    """Rebuild a snapshot from its dictionary form.

    Only ``"name"`` is required.  Missing optional keys are read as
    ``None`` (strings/ints) or ``False`` (flags).
    """
    lookup = payload.get
    return cls(
        name=str(payload["name"]),
        schema=optional_str(lookup("schema")),
        start=optional_int(lookup("start")),
        increment=optional_int(lookup("increment")),
        min_value=optional_int(lookup("min_value")),
        max_value=optional_int(lookup("max_value")),
        cycle=bool(lookup("cycle", False)),
        cache=optional_int(lookup("cache")),
        comment=optional_str(lookup("comment")),
        data_type=optional_str(lookup("data_type")),
        order=bool(lookup("order", False)),
        no_min_value=bool(lookup("no_min_value", False)),
        no_max_value=bool(lookup("no_max_value", False)),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeSequence:
    """Return the sequence as a 13-item positional runtime tuple.

    The slot order mirrors the one ``from_runtime`` reads.
    """
    slot_order = (
        "name",
        "schema",
        "start",
        "increment",
        "min_value",
        "max_value",
        "cycle",
        "cache",
        "comment",
        "data_type",
        "order",
        "no_min_value",
        "no_max_value",
    )
    return tuple(getattr(self, attribute) for attribute in slot_order)

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the sequence to a plain dictionary.

    ``name`` is always present.  Flag fields (``cycle``, ``order``,
    ``no_min_value``, ``no_max_value``) are emitted only when truthy; every
    other optional field is emitted when it is not ``None``.
    """
    flag_keys = {"cycle", "order", "no_min_value", "no_max_value"}
    # Listed in emission order.
    optional_keys = (
        "schema",
        "start",
        "increment",
        "min_value",
        "max_value",
        "cycle",
        "cache",
        "comment",
        "data_type",
        "order",
        "no_min_value",
        "no_max_value",
    )
    payload: dict[str, Any] = {"name": self.name}
    for key in optional_keys:
        value = getattr(self, key)
        keep = bool(value) if key in flag_keys else value is not None
        if keep:
            payload[key] = value
    return payload

ormdantic._migrations.models.ViewSnapshot dataclass

ViewSnapshot(
    name,
    definition,
    schema=None,
    materialized=False,
    comment=None,
)

Serializable database view metadata used by migration snapshots.

name instance-attribute

name

definition instance-attribute

definition

schema class-attribute instance-attribute

schema = None

materialized class-attribute instance-attribute

materialized = False

comment class-attribute instance-attribute

comment = None

from_runtime classmethod

from_runtime(view)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, view: Sequence[Any]) -> "ViewSnapshot":
    """Build a snapshot from a positional runtime view sequence.

    Runtime order is ``(name, schema, definition, materialized, comment)``.
    The name and definition slots are read unconditionally; ``materialized``
    and ``comment`` fall back to ``False`` / ``None`` when absent.
    """
    size = len(view)
    return cls(
        name=str(view[0]),
        definition=normalized_view_definition(str(view[2])),
        schema=optional_str(view[1]) if size > 1 else None,
        materialized=bool(view[3]) if size > 3 else False,
        comment=optional_str(view[4]) if size > 4 else None,
    )

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ViewSnapshot":
    """Rebuild a snapshot from its dictionary form.

    ``"name"`` and ``"definition"`` are required; the definition is passed
    through ``normalized_view_definition``.
    """
    lookup = payload.get
    return cls(
        name=str(payload["name"]),
        definition=normalized_view_definition(str(payload["definition"])),
        schema=optional_str(lookup("schema")),
        materialized=bool(lookup("materialized", False)),
        comment=optional_str(lookup("comment")),
    )

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeView:
    """Return the view as a positional runtime tuple.

    Note the runtime order puts ``schema`` before ``definition``, unlike the
    dataclass field order.
    """
    slot_order = ("name", "schema", "definition", "materialized", "comment")
    return tuple(getattr(self, attribute) for attribute in slot_order)

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the view to a plain dictionary.

    ``name`` and ``definition`` are always emitted; ``schema`` and
    ``comment`` only when not ``None``; ``materialized`` only when truthy.
    """
    payload: dict[str, Any] = {
        "name": self.name,
        "definition": self.definition,
    }
    optional_values = {
        "schema": self.schema,
        # A falsy flag is mapped to None so the single filter below drops it.
        "materialized": self.materialized or None,
        "comment": self.comment,
    }
    payload.update(
        (key, value) for key, value in optional_values.items() if value is not None
    )
    return payload

ormdantic._migrations.models.EnumTypeSnapshot dataclass

EnumTypeSnapshot(name, values, schema=None, comment=None)

Serializable native enum type metadata used by migration snapshots.

name instance-attribute

name

values instance-attribute

values

schema class-attribute instance-attribute

schema = None

comment class-attribute instance-attribute

comment = None

from_runtime classmethod

from_runtime(enum_type)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, enum_type: Sequence[Any]) -> "EnumTypeSnapshot":
    """Build a snapshot from a positional runtime enum sequence.

    Runtime order is ``(name, values, schema, comment)``; the last two
    slots are optional and default to ``None``.
    """
    size = len(enum_type)
    schema = None
    if size > 2:
        schema = optional_str(enum_type[2])
    comment = None
    if size > 3:
        comment = optional_str(enum_type[3])
    name = str(enum_type[0])
    values = list(map(str, enum_type[1]))
    return cls(name, values, schema, comment)

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "EnumTypeSnapshot":
    """Rebuild a snapshot from its dictionary form.

    ``"name"`` and ``"values"`` are required; each value is coerced to
    ``str``.
    """
    name = str(payload["name"])
    values = list(map(str, payload["values"]))
    schema = optional_str(payload.get("schema"))
    comment = optional_str(payload.get("comment"))
    return cls(name, values, schema, comment)

to_runtime

to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeEnumType:
    """Return ``(name, values, schema, comment)`` with ``values`` copied."""
    values_copy = [*self.values]
    return (self.name, values_copy, self.schema, self.comment)

to_dict

to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the enum type to a plain dictionary.

    ``name`` and a copy of ``values`` are always emitted; ``schema`` and
    ``comment`` are emitted only when they are not ``None``.
    """
    payload: dict[str, Any] = {"name": self.name, "values": [*self.values]}
    for key in ("schema", "comment"):
        value = getattr(self, key)
        if value is not None:
            payload[key] = value
    return payload

ormdantic._migrations.models.SchemaDiff dataclass

SchemaDiff(changes=list(), warnings=list())

Structured schema diff output.

changes class-attribute instance-attribute

changes = field(default_factory=list)

warnings class-attribute instance-attribute

warnings = field(default_factory=list)

destructive_changes property

destructive_changes

unsafe_changes property

unsafe_changes

has_unsafe_operations property

has_unsafe_operations

has_destructive_operations property

has_destructive_operations

is_empty

is_empty()
Source code in ormdantic/_migrations/models.py
def is_empty(self) -> bool:
    """Return ``True`` when the diff holds no changes."""
    if self.changes:
        return False
    return True

summary

summary()
Source code in ormdantic/_migrations/models.py
def summary(self) -> list[str]:
    """Return the ``message`` of every change, in diff order."""
    messages: list[str] = []
    for change in self.changes:
        messages.append(change.message)
    return messages

ormdantic._migrations.models.MigrationPlan dataclass

MigrationPlan(
    operations=list(),
    rollback_operations=list(),
    diff=SchemaDiff(),
    warnings=list(),
    safety=dict(),
)

A generated migration plan.

operations class-attribute instance-attribute

operations = field(default_factory=list)

rollback_operations class-attribute instance-attribute

rollback_operations = field(default_factory=list)

diff class-attribute instance-attribute

diff = field(default_factory=SchemaDiff)

warnings class-attribute instance-attribute

warnings = field(default_factory=list)

safety class-attribute instance-attribute

safety = field(default_factory=dict)

rollback_available property

rollback_available

has_unsafe_operations property

has_unsafe_operations

has_destructive_operations property

has_destructive_operations

is_empty

is_empty()
Source code in ormdantic/_migrations/models.py
def is_empty(self) -> bool:
    """Return ``True`` when the plan holds no forward operations."""
    if self.operations:
        return False
    return True

dry_run

dry_run()
Source code in ormdantic/_migrations/models.py
def dry_run(self) -> list[str]:
    """Return the SQL text of each forward operation, in plan order."""
    statements: list[str] = []
    for operation in self.operations:
        statements.append(operation.sql)
    return statements

rollback_sql

rollback_sql()
Source code in ormdantic/_migrations/models.py
def rollback_sql(self) -> list[str]:
    """Return the SQL text of each rollback operation, in plan order."""
    statements: list[str] = []
    for operation in self.rollback_operations:
        statements.append(operation.sql)
    return statements

ormdantic._migrations.models.MigrationOperation dataclass

MigrationOperation(
    sql,
    values=(),
    description=None,
    unsafe=False,
    destructive=False,
    kind="statement",
    table=None,
    object_name=None,
    reversible=True,
    requires_lock=True,
    requires_rebuild=False,
    generated_rollback=False,
    metadata=dict(),
)

A SQL migration operation.

sql instance-attribute

sql

values class-attribute instance-attribute

values = ()

description class-attribute instance-attribute

description = None

unsafe class-attribute instance-attribute

unsafe = False

destructive class-attribute instance-attribute

destructive = False

kind class-attribute instance-attribute

kind = 'statement'

table class-attribute instance-attribute

table = None

object_name class-attribute instance-attribute

object_name = None

reversible class-attribute instance-attribute

reversible = True

requires_lock class-attribute instance-attribute

requires_lock = True

requires_rebuild class-attribute instance-attribute

requires_rebuild = False

generated_rollback class-attribute instance-attribute

generated_rollback = False

metadata class-attribute instance-attribute

metadata = field(default_factory=dict)

ormdantic._migrations.models.MigrationWarning dataclass

MigrationWarning(code, message, table=None, name=None)

A migration safety warning.

code instance-attribute

code

message instance-attribute

message

table class-attribute instance-attribute

table = None

name class-attribute instance-attribute

name = None

ormdantic._migrations.models.MigrationHistoryEntry dataclass

MigrationHistoryEntry(
    revision,
    description=None,
    checksum=None,
    applied_at=None,
    execution_time_ms=None,
    status=MIGRATION_STATUS_APPLIED,
    dirty=False,
    artifact_version=None,
    ormdantic_version=None,
    metadata=dict(),
)

One row from the durable migration history table.

revision instance-attribute

revision

description class-attribute instance-attribute

description = None

checksum class-attribute instance-attribute

checksum = None

applied_at class-attribute instance-attribute

applied_at = None

execution_time_ms class-attribute instance-attribute

execution_time_ms = None

status class-attribute instance-attribute

status = MIGRATION_STATUS_APPLIED

dirty class-attribute instance-attribute

dirty = False

artifact_version class-attribute instance-attribute

artifact_version = None

ormdantic_version class-attribute instance-attribute

ormdantic_version = None

metadata class-attribute instance-attribute

metadata = field(default_factory=dict)

Artifact Type

ormdantic._migrations.artifacts.MigrationArtifact dataclass

MigrationArtifact(
    revision,
    from_snapshot,
    to_snapshot,
    operations=list(),
    rollback_operations=list(),
    diff=SchemaDiff(),
    warnings=list(),
    description=None,
    created_at=(lambda: isoformat())(),
    dialect=None,
    checksum=None,
    depends_on=list(),
    branch_labels=list(),
    safety=dict(),
    metadata=dict(),
    artifact_version=MIGRATION_ARTIFACT_VERSION,
    version=MIGRATION_ARTIFACT_VERSION,
)

A serializable migration file with snapshots, SQL, and safety metadata.

revision instance-attribute

revision

from_snapshot instance-attribute

from_snapshot

to_snapshot instance-attribute

to_snapshot

operations class-attribute instance-attribute

operations = field(default_factory=list)

rollback_operations class-attribute instance-attribute

rollback_operations = field(default_factory=list)

diff class-attribute instance-attribute

diff = field(default_factory=SchemaDiff)

warnings class-attribute instance-attribute

warnings = field(default_factory=list)

description class-attribute instance-attribute

description = None

created_at class-attribute instance-attribute

created_at = field(default_factory=lambda: isoformat())

dialect class-attribute instance-attribute

dialect = None

checksum class-attribute instance-attribute

checksum = None

depends_on class-attribute instance-attribute

depends_on = field(default_factory=list)

branch_labels class-attribute instance-attribute

branch_labels = field(default_factory=list)

safety class-attribute instance-attribute

safety = field(default_factory=dict)

metadata class-attribute instance-attribute

metadata = field(default_factory=dict)

artifact_version class-attribute instance-attribute

artifact_version = MIGRATION_ARTIFACT_VERSION

version class-attribute instance-attribute

version = MIGRATION_ARTIFACT_VERSION

from_plan classmethod

from_plan(
    revision,
    plan,
    from_snapshot,
    to_snapshot,
    *,
    dialect=None,
    description=None,
    depends_on=None,
    branch_labels=None,
    metadata=None,
    created_at=None
)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_plan(
    cls,
    revision: str,
    plan: MigrationPlan,
    from_snapshot: SchemaSnapshot,
    to_snapshot: SchemaSnapshot,
    *,
    dialect: str | None = None,
    description: str | None = None,
    depends_on: Sequence[str] | None = None,
    branch_labels: Sequence[str] | None = None,
    metadata: Mapping[str, Any] | None = None,
    created_at: str | None = None,
) -> MigrationArtifact:
    """Wrap a generated plan in a checksummed migration artifact.

    Operations, rollback operations, the diff and the warnings are each
    round-tripped through their dict form (presumably so the artifact does
    not share those objects with ``plan`` -- confirm against the helpers).
    A falsy ``created_at`` is replaced by the current UTC time truncated to
    whole seconds.  The result is returned via ``with_checksum()``.
    """
    forward = [
        operation_from_dict(operation_to_dict(item)) for item in plan.operations
    ]
    backward = [
        operation_from_dict(operation_to_dict(item))
        for item in plan.rollback_operations
    ]
    diff = diff_from_dict(diff_to_dict(plan.diff))
    warnings = [warning_from_dict(warning_to_dict(item)) for item in plan.warnings]
    stamp = created_at or datetime.now(UTC).replace(microsecond=0).isoformat()
    dependencies = [str(item) for item in depends_on or ()]
    labels = [str(item) for item in branch_labels or ()]
    unsigned = cls(
        revision=revision,
        from_snapshot=from_snapshot,
        to_snapshot=to_snapshot,
        operations=forward,
        rollback_operations=backward,
        diff=diff,
        warnings=warnings,
        description=description,
        created_at=stamp,
        dialect=dialect,
        depends_on=dependencies,
        branch_labels=labels,
        safety=dict(plan.safety),
        metadata=dict(metadata or {}),
        artifact_version=MIGRATION_ARTIFACT_VERSION,
        version=MIGRATION_ARTIFACT_VERSION,
    )
    return unsigned.with_checksum()

from_dict classmethod

from_dict(payload)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> MigrationArtifact:
    """Rebuild an artifact from its dictionary form and verify its checksum.

    ``"revision"``, ``"from_snapshot"`` and ``"to_snapshot"`` are required.
    Operations are read from ``"up"``/``"down"``, falling back to
    ``"operations"``/``"rollback_operations"``.  ``artifact_version`` falls
    back to ``"version"`` and then to ``1``.

    Raises:
        ValueError: from ``validate_checksum`` when a stored checksum does
            not match the recalculated one.
    """
    lookup = payload.get
    artifact_version = int(lookup("artifact_version", lookup("version", 1)))

    revision = str(payload["revision"])
    source_snapshot = SchemaSnapshot.from_dict(payload["from_snapshot"])
    target_snapshot = SchemaSnapshot.from_dict(payload["to_snapshot"])
    forward = [
        operation_from_dict(item)
        for item in lookup("up", lookup("operations", []))
    ]
    backward = [
        operation_from_dict(item)
        for item in lookup("down", lookup("rollback_operations", []))
    ]
    diff = diff_from_dict(lookup("diff", {}))
    warnings = [warning_from_dict(item) for item in lookup("warnings", [])]
    description = optional_str(lookup("description"))
    # The fallback timestamp is computed whether or not the key is present.
    fallback_stamp = datetime.now(UTC).replace(microsecond=0).isoformat()
    created_at = str(lookup("created_at", fallback_stamp))
    dialect = optional_str(lookup("dialect"))
    checksum = optional_str(lookup("checksum"))
    dependencies = [str(item) for item in lookup("depends_on", [])]
    labels = [str(item) for item in lookup("branch_labels", [])]
    safety = dict(lookup("safety", {}))
    metadata = dict(lookup("metadata", {}))
    version = int(lookup("version", artifact_version))

    artifact = cls(
        revision=revision,
        from_snapshot=source_snapshot,
        to_snapshot=target_snapshot,
        operations=forward,
        rollback_operations=backward,
        diff=diff,
        warnings=warnings,
        description=description,
        created_at=created_at,
        dialect=dialect,
        checksum=checksum,
        depends_on=dependencies,
        branch_labels=labels,
        safety=safety,
        metadata=metadata,
        artifact_version=artifact_version,
        version=version,
    )
    if artifact.checksum:
        artifact.validate_checksum()
    return artifact

from_json classmethod

from_json(payload)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_json(cls, payload: str | bytes | bytearray) -> MigrationArtifact:
    """Parse a JSON document and build the artifact via ``from_dict``."""
    document = json.loads(payload)
    return cls.from_dict(document)

from_toml classmethod

from_toml(payload)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_toml(cls, payload: str | bytes | bytearray) -> MigrationArtifact:
    """Parse a TOML document and build the artifact via ``from_dict``."""
    document = toml_loads(payload)
    return cls.from_dict(document)

read classmethod

read(path, *, format=None)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def read(
    cls, path: str | PathLike[str], *, format: str | None = None
) -> MigrationArtifact:
    """Load an artifact from ``path``.

    The file is parsed as TOML when ``document_format(path, format)``
    returns ``"toml"`` and as JSON otherwise.

    NOTE(review): the file is read without an explicit encoding, so the
    platform default applies -- confirm this is intended.
    """
    source = Path(path)
    text = source.read_text()
    is_toml = document_format(str(source), format) == "toml"
    parse = cls.from_toml if is_toml else cls.from_json
    return parse(text)

to_plan

to_plan()
Source code in ormdantic/_migrations/artifacts.py
def to_plan(self) -> MigrationPlan:
    """Return a ``MigrationPlan`` built from this artifact's contents.

    Operations, rollback operations, the diff and the warnings are each
    round-tripped through their dict form, and ``safety`` is shallow-copied.
    """
    forward = [
        operation_from_dict(operation_to_dict(item)) for item in self.operations
    ]
    backward = [
        operation_from_dict(operation_to_dict(item))
        for item in self.rollback_operations
    ]
    diff = diff_from_dict(diff_to_dict(self.diff))
    warnings = [warning_from_dict(warning_to_dict(item)) for item in self.warnings]
    return MigrationPlan(
        operations=forward,
        rollback_operations=backward,
        diff=diff,
        warnings=warnings,
        safety=dict(self.safety),
    )

to_dict

to_dict()
Source code in ormdantic/_migrations/artifacts.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the artifact to a plain dictionary.

    Forward operations are stored under ``"up"`` and rollback operations
    under ``"down"``.  Every key is always emitted, including those whose
    value is ``None``.
    """
    document: dict[str, Any] = {}
    document["version"] = self.version
    document["artifact_version"] = self.artifact_version
    document["revision"] = self.revision
    document["description"] = self.description
    document["created_at"] = self.created_at
    document["dialect"] = self.dialect
    document["checksum"] = self.checksum
    document["depends_on"] = list(self.depends_on)
    document["branch_labels"] = list(self.branch_labels)
    document["from_snapshot"] = self.from_snapshot.to_dict()
    document["to_snapshot"] = self.to_snapshot.to_dict()
    document["up"] = [operation_to_dict(item) for item in self.operations]
    document["down"] = [
        operation_to_dict(item) for item in self.rollback_operations
    ]
    document["diff"] = diff_to_dict(self.diff)
    document["warnings"] = [warning_to_dict(item) for item in self.warnings]
    document["safety"] = dict(self.safety)
    document["metadata"] = dict(self.metadata)
    return document

to_json

to_json(*, indent=2)
Source code in ormdantic/_migrations/artifacts.py
def to_json(self, *, indent: int | None = 2) -> str:
    return json.dumps(self.to_dict(), indent=indent, sort_keys=True)

to_toml

to_toml()
Source code in ormdantic/_migrations/artifacts.py
def to_toml(self) -> str:
    """Return ``to_dict()`` rendered as TOML via ``toml_dumps``."""
    document = self.to_dict()
    return toml_dumps(document)

write

write(path, *, format=None)
Source code in ormdantic/_migrations/artifacts.py
def write(self, path: str | PathLike[str], *, format: str | None = None) -> None:
    """Write the artifact to ``path``, creating parent directories as needed.

    TOML is written when ``document_format(path, format)`` returns
    ``"toml"``; JSON is written otherwise.

    NOTE(review): the file is written without an explicit encoding, so the
    platform default applies -- confirm this is intended.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    as_toml = document_format(str(target), format) == "toml"
    target.write_text(self.to_toml() if as_toml else self.to_json())

with_checksum

with_checksum()
Source code in ormdantic/_migrations/artifacts.py
def with_checksum(self) -> MigrationArtifact:
    """Return a new artifact carrying a freshly calculated checksum.

    The checksum is ``artifact_checksum`` of ``to_dict()`` with the
    ``"checksum"`` key removed.  All other fields are handed to the new
    ``MigrationArtifact`` as the same objects (they are not copied).
    """
    unsigned = dict(self.to_dict())
    unsigned.pop("checksum", None)
    digest = artifact_checksum(unsigned)

    carried_fields = (
        "revision",
        "from_snapshot",
        "to_snapshot",
        "operations",
        "rollback_operations",
        "diff",
        "warnings",
        "description",
        "created_at",
        "dialect",
        "depends_on",
        "branch_labels",
        "safety",
        "metadata",
        "artifact_version",
        "version",
    )
    init_values = {name: getattr(self, name) for name in carried_fields}
    init_values["checksum"] = digest
    return MigrationArtifact(**init_values)

validate_checksum

validate_checksum()
Source code in ormdantic/_migrations/artifacts.py
def validate_checksum(self) -> None:
    """Check the stored checksum against a freshly calculated one.

    Does nothing when no checksum is stored.  The calculated value is
    ``artifact_checksum`` of ``to_dict()`` without the ``"checksum"`` key.

    Raises:
        ValueError: when the stored and calculated checksums differ.
    """
    if self.checksum:
        unsigned = dict(self.to_dict())
        unsigned.pop("checksum", None)
        # Despite its name, ``expected`` holds the recalculated value.
        expected = artifact_checksum(unsigned)
        if expected != self.checksum:
            raise ValueError(
                f"migration artifact checksum mismatch for revision {self.revision}: "
                f"expected {self.checksum}, calculated {expected}"
            )