Migrations¶
The migration API provides snapshots, diffs, generated plans, migration artifacts, history operations, rollback, repair, and squash helpers.
ormdantic.migrations.MigrationManager
¶
MigrationManager(database)
Generate, review, apply, and roll back SQL migration plans.
Source code in ormdantic/migrations.py
def __init__(self, database: Any) -> None:
self._database = database
snapshot
¶
snapshot()
Return a serializable snapshot for the currently registered models.
Source code in ormdantic/migrations.py
def snapshot(self) -> SchemaSnapshot:
"""Return a serializable snapshot for the currently registered models."""
return SchemaSnapshot.from_database(
self._database,
native_enum_types=self._dialect() == "postgresql",
)
live_snapshot
¶
live_snapshot(
*, include_tables=None, exclude_tables=None, schema=None
)
Return a live schema snapshot reflected from the current database.
Source code in ormdantic/migrations.py
def live_snapshot(
self,
*,
include_tables: Sequence[str] | None = None,
exclude_tables: Sequence[str] | None = None,
schema: str | None = None,
) -> SchemaSnapshot:
"""Return a live schema snapshot reflected from the current database."""
return _reflect_schema_snapshot(
self._connection_url,
dialect=self._dialect(),
include_tables=include_tables,
exclude_tables=exclude_tables,
schema=schema,
)
autogenerate
¶
autogenerate(
revision,
*,
dialect=None,
include_tables=None,
exclude_tables=None,
schema=None,
description=None,
depends_on=None,
branch_labels=None,
path=None,
skip_noop=True
)
Generate a migration artifact by diffing live schema against models.
Source code in ormdantic/migrations.py
def autogenerate(
self,
revision: str,
*,
dialect: str | None = None,
include_tables: Sequence[str] | None = None,
exclude_tables: Sequence[str] | None = None,
schema: str | None = None,
description: str | None = None,
depends_on: Sequence[str] | None = None,
branch_labels: Sequence[str] | None = None,
path: str | PathLike[str] | None = None,
skip_noop: bool = True,
) -> MigrationArtifact | None:
"""Generate a migration artifact by diffing live schema against models."""
before = self.live_snapshot(
include_tables=include_tables,
exclude_tables=exclude_tables,
schema=schema,
)
after = _filter_snapshot_for_autogenerate_scope(
self.snapshot(),
include_tables=include_tables,
exclude_tables=exclude_tables,
schema=schema,
)
if schema is not None and after.tables:
after = replace(
after,
tables=[
replace(table, schema=schema) if table.schema is None else table
for table in after.tables
],
)
if schema is not None and after.enum_types:
after = replace(
after,
enum_types=[
(
replace(enum_type, schema=schema)
if enum_type.schema is None
else enum_type
)
for enum_type in after.enum_types
],
)
if schema is not None and after.sequences:
after = replace(
after,
sequences=[
(
replace(sequence, schema=schema)
if sequence.schema is None
else sequence
)
for sequence in after.sequences
],
)
if schema is not None and after.views:
after = replace(
after,
views=[
replace(view, schema=schema) if view.schema is None else view
for view in after.views
],
)
if before.enum_types and not after.enum_types:
# Non-PostgreSQL target snapshots do not infer native enum types.
# Preserve reflected objects so autogenerate does not plan accidental drops.
after = replace(after, enum_types=list(before.enum_types))
if before.namespaces and not after.namespaces:
# Namespace ownership is explicit. Preserve reflected namespaces unless
# the model snapshot registers its own namespace target.
after = replace(after, namespaces=list(before.namespaces))
if before.sequences and not after.sequences:
# Preserve reflected sequences unless the model snapshot registers its
# own sequence target.
after = replace(after, sequences=list(before.sequences))
if before.views and not after.views:
# Preserve reflected views unless the model snapshot registers its own
# view target.
after = replace(after, views=list(before.views))
active_dialect = dialect or self._connection_url
plan = _build_plan(active_dialect, before, after)
if plan.is_empty() and skip_noop:
return None
artifact = MigrationArtifact.from_plan(
revision,
plan,
before,
after,
dialect=active_dialect,
description=description,
depends_on=depends_on,
branch_labels=branch_labels,
metadata={"autogenerated": True, "schema": schema},
)
if path is not None:
artifact.write(path)
return artifact
diff
¶
diff(from_snapshot=None, to_snapshot=None)
Return structured schema changes between two snapshots.
Source code in ormdantic/migrations.py
def diff(
self,
from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
) -> SchemaDiff:
"""Return structured schema changes between two snapshots."""
before, after = self._resolve_snapshots(from_snapshot, to_snapshot)
return diff_snapshots(before, after, dialect=self._database._connection)
generate_plan
¶
generate_plan(
from_snapshot=None, to_snapshot=None, *, dialect=None
)
Generate a migration plan and rollback SQL between two snapshots.
Source code in ormdantic/migrations.py
def generate_plan(
self,
from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
*,
dialect: str | None = None,
) -> MigrationPlan:
"""Generate a migration plan and rollback SQL between two snapshots."""
before, after = self._resolve_snapshots(from_snapshot, to_snapshot)
return _build_plan(dialect or self._database._connection, before, after)
create_migration
¶
create_migration(
revision,
from_snapshot=None,
to_snapshot=None,
*,
dialect=None,
description=None,
depends_on=None,
branch_labels=None,
path=None
)
Generate a serializable migration artifact.
Source code in ormdantic/migrations.py
def create_migration(
self,
revision: str,
from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
*,
dialect: str | None = None,
description: str | None = None,
depends_on: Sequence[str] | None = None,
branch_labels: Sequence[str] | None = None,
path: str | PathLike[str] | None = None,
) -> MigrationArtifact:
"""Generate a serializable migration artifact."""
before, after = self._resolve_snapshots(from_snapshot, to_snapshot)
artifact = create_migration_artifact(
revision,
before,
after,
dialect=dialect or self._database._connection,
description=description,
depends_on=depends_on,
branch_labels=branch_labels,
)
if path is not None:
artifact.write(path)
return artifact
dry_run
¶
dry_run(
from_snapshot=None, to_snapshot=None, *, dialect=None
)
Return generated SQL without applying it.
Source code in ormdantic/migrations.py
def dry_run(
self,
from_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
to_snapshot: SchemaSnapshot | Mapping[str, Any] | None = None,
*,
dialect: str | None = None,
) -> list[str]:
"""Return generated SQL without applying it."""
return self.generate_plan(
from_snapshot,
to_snapshot,
dialect=dialect,
).dry_run()
ensure_revision_table
async
¶
ensure_revision_table()
Create/upgrade the migration history table when missing.
Source code in ormdantic/migrations.py
async def ensure_revision_table(self) -> None:
"""Create/upgrade the migration history table when missing."""
connection = self._native_connection()
_ensure_migration_history_table(connection, self._dialect())
applied_revisions
async
¶
applied_revisions()
Return applied migration revisions ordered by apply time.
Source code in ormdantic/migrations.py
async def applied_revisions(self) -> list[str]:
"""Return applied migration revisions ordered by apply time."""
return [
entry.revision
for entry in await self.history()
if entry.status == MIGRATION_STATUS_APPLIED and not entry.dirty
]
history
async
¶
history()
Return migration history rows.
Source code in ormdantic/migrations.py
async def history(self) -> list[MigrationHistoryEntry]:
"""Return migration history rows."""
connection = self._native_connection()
dialect = self._dialect()
_ensure_migration_history_table(connection, dialect)
return _history_entries(connection, dialect)
current
async
¶
current()
Return the latest successfully applied revision.
Source code in ormdantic/migrations.py
async def current(self) -> MigrationHistoryEntry | None:
"""Return the latest successfully applied revision."""
connection = self._native_connection()
dialect = self._dialect()
_ensure_migration_history_table(connection, dialect)
return _current_entry(connection, dialect)
is_dirty
async
¶
is_dirty()
Return whether migration history is currently marked dirty.
Source code in ormdantic/migrations.py
async def is_dirty(self) -> bool:
"""Return whether migration history is currently marked dirty."""
connection = self._native_connection()
dialect = self._dialect()
_ensure_migration_history_table(connection, dialect)
return _is_dirty(connection, dialect)
status
async
¶
status()
Return current migration status metadata.
Source code in ormdantic/migrations.py
async def status(self) -> dict[str, Any]:
"""Return current migration status metadata."""
current = await self.current()
return {
"dirty": await self.is_dirty(),
"current": current.revision if current else None,
"current_entry": current,
"applied": await self.applied_revisions(),
}
repair
async
¶
repair(
*,
revision=None,
status=None,
clear_dirty=True,
checksum=None
)
Repair migration metadata after a failed/manual migration.
Source code in ormdantic/migrations.py
async def repair(
self,
*,
revision: str | None = None,
status: str | None = None,
clear_dirty: bool = True,
checksum: str | None = None,
) -> int:
"""Repair migration metadata after a failed/manual migration."""
connection = self._native_connection()
dialect = self._dialect()
_ensure_migration_history_table(connection, dialect)
repaired = _repair_history(
connection,
dialect,
revision=revision,
status=status,
clear_dirty=clear_dirty,
checksum=checksum,
)
_commit_migration_history_if_needed(connection, dialect)
return repaired
apply_artifact
async
¶
apply_artifact(artifact, *, allow_destructive=False)
Apply a migration artifact and record its revision.
Source code in ormdantic/migrations.py
async def apply_artifact(
self,
artifact: MigrationArtifact | Mapping[str, Any] | str | PathLike[str],
*,
allow_destructive: bool = False,
) -> bool:
"""Apply a migration artifact and record its revision."""
migration = _coerce_artifact(artifact)
migration.validate_checksum()
return await self.apply(
migration.revision,
migration.to_plan(),
allow_destructive=allow_destructive,
checksum=migration.checksum,
description=migration.description,
artifact_version=migration.artifact_version,
metadata=migration.metadata,
)
apply_file
async
¶
apply_file(path, *, allow_destructive=False)
Apply a migration artifact from disk.
Source code in ormdantic/migrations.py
async def apply_file(
self,
path: str | PathLike[str],
*,
allow_destructive: bool = False,
) -> bool:
"""Apply a migration artifact from disk."""
return await self.apply_artifact(
MigrationArtifact.read(path),
allow_destructive=allow_destructive,
)
apply_directory
async
¶
apply_directory(
path, *, pattern=None, allow_destructive=False
)
Apply migration artifacts in filename order.
Source code in ormdantic/migrations.py
async def apply_directory(
self,
path: str | PathLike[str],
*,
pattern: str | None = None,
allow_destructive: bool = False,
) -> list[str]:
"""Apply migration artifacts in filename order."""
if await self.is_dirty():
raise ValueError(
"migration history is dirty; run `ormdantic migrations repair` before apply-dir"
)
applied = []
applied_set = set(await self.applied_revisions())
for artifact_path in _migration_files(path, pattern):
artifact = MigrationArtifact.read(artifact_path)
missing_dependencies = [
revision
for revision in artifact.depends_on
if revision not in applied_set
]
if missing_dependencies:
raise ValueError(
f"migration {artifact.revision} has missing dependencies: "
+ ", ".join(missing_dependencies)
)
if await self.apply_artifact(
artifact,
allow_destructive=allow_destructive,
):
applied.append(artifact.revision)
applied_set.add(artifact.revision)
return applied
apply
async
¶
apply(
revision,
plan,
*,
allow_destructive=False,
checksum=None,
description=None,
artifact_version=MIGRATION_ARTIFACT_VERSION,
metadata=None
)
Apply a migration plan and record its revision.
Returns False when the revision is already recorded.
Source code in ormdantic/migrations.py
async def apply(
self,
revision: str,
plan: MigrationPlan,
*,
allow_destructive: bool = False,
checksum: str | None = None,
description: str | None = None,
artifact_version: int = MIGRATION_ARTIFACT_VERSION,
metadata: Mapping[str, Any] | None = None,
) -> bool:
"""Apply a migration plan and record its revision.
Returns ``False`` when the revision is already recorded.
"""
connection = self._native_connection()
dialect = self._dialect()
_ensure_migration_history_table(connection, dialect)
if _is_dirty(connection, dialect):
raise ValueError(
"database is marked dirty from a failed migration; run repair before applying"
)
existing = _history_entry(connection, dialect, revision)
expected_checksum = checksum or _plan_checksum(revision, plan)
if (
existing is not None
and existing.status == MIGRATION_STATUS_APPLIED
and not existing.dirty
):
if existing.checksum and existing.checksum != expected_checksum:
raise ValueError(
f"revision {revision} already applied with checksum "
f"{existing.checksum}, requested {expected_checksum}"
)
return False
if plan.has_destructive_operations and not allow_destructive:
raise ValueError(
"migration contains destructive operations; pass "
"allow_destructive=True to apply it"
)
_raise_if_unsupported_sqlite_plan(dialect, plan.operations)
_run_migration_operations(
connection=connection,
dialect=dialect,
revision=revision,
operations=plan.operations,
status=MIGRATION_STATUS_APPLIED,
description=description,
checksum=expected_checksum,
artifact_version=artifact_version,
metadata=dict(metadata or {}),
)
return True
rollback
async
¶
rollback(
revision,
plan,
*,
allow_destructive=False,
checksum=None,
description=None,
artifact_version=MIGRATION_ARTIFACT_VERSION,
metadata=None
)
Run rollback SQL and remove a migration revision.
Source code in ormdantic/migrations.py
async def rollback(
self,
revision: str,
plan: MigrationPlan,
*,
allow_destructive: bool = False,
checksum: str | None = None,
description: str | None = None,
artifact_version: int = MIGRATION_ARTIFACT_VERSION,
metadata: Mapping[str, Any] | None = None,
) -> bool:
"""Run rollback SQL and remove a migration revision."""
connection = self._native_connection()
dialect = self._dialect()
_ensure_migration_history_table(connection, dialect)
existing = _history_entry(connection, dialect, revision)
if existing is None or existing.status != MIGRATION_STATUS_APPLIED:
return False
if not plan.rollback_available:
raise ValueError(
"rollback SQL is unavailable for this migration; "
"provide explicit down SQL or generated rollback operations"
)
rollback_plan = MigrationPlan(
operations=list(plan.rollback_operations),
diff=plan.diff,
warnings=plan.warnings,
safety=plan.safety,
)
if rollback_plan.has_destructive_operations and not allow_destructive:
raise ValueError(
"rollback contains destructive operations; pass "
"allow_destructive=True to apply it"
)
_raise_if_unsupported_sqlite_plan(dialect, rollback_plan.operations)
_run_migration_operations(
connection=connection,
dialect=dialect,
revision=revision,
operations=rollback_plan.operations,
status=MIGRATION_STATUS_ROLLED_BACK,
description=description or existing.description,
checksum=checksum
or existing.checksum
or _plan_checksum(revision, rollback_plan),
artifact_version=artifact_version,
metadata=dict(metadata or {}),
)
return True
rollback_artifact
async
¶
rollback_artifact(artifact, *, allow_destructive=False)
Roll back a migration artifact when rollback SQL is available.
Source code in ormdantic/migrations.py
async def rollback_artifact(
self,
artifact: MigrationArtifact | Mapping[str, Any] | str | PathLike[str],
*,
allow_destructive: bool = False,
) -> bool:
"""Roll back a migration artifact when rollback SQL is available."""
migration = _coerce_artifact(artifact)
return await self.rollback(
migration.revision,
migration.to_plan(),
allow_destructive=allow_destructive,
checksum=migration.checksum,
description=migration.description,
artifact_version=migration.artifact_version,
metadata=migration.metadata,
)
rollback_file
async
¶
rollback_file(path, *, allow_destructive=False)
Roll back a migration artifact from disk.
Source code in ormdantic/migrations.py
async def rollback_file(
self, path: str | PathLike[str], *, allow_destructive: bool = False
) -> bool:
"""Roll back a migration artifact from disk."""
return await self.rollback_artifact(
MigrationArtifact.read(path),
allow_destructive=allow_destructive,
)
squash
¶
squash(revision, artifacts, *, dialect=None, path=None)
Squash contiguous migration artifacts into one net migration.
Source code in ormdantic/migrations.py
def squash(
self,
revision: str,
artifacts: Sequence[
MigrationArtifact | Mapping[str, Any] | str | PathLike[str]
],
*,
dialect: str | None = None,
path: str | PathLike[str] | None = None,
) -> MigrationArtifact:
"""Squash contiguous migration artifacts into one net migration."""
artifact = squash_migrations(
revision,
artifacts,
dialect=dialect or self._database._connection,
).with_checksum()
if path is not None:
artifact.write(path)
return artifact
Snapshot Types¶
The snapshot and plan models live in ormdantic._migrations.models.
ormdantic._migrations.models.SchemaSnapshot
dataclass
¶
SchemaSnapshot(
tables=list(),
namespaces=list(),
enum_types=list(),
sequences=list(),
views=list(),
version=1,
)
Serializable database schema snapshot.
empty
classmethod
¶
empty()
Source code in ormdantic/_migrations/models.py
@classmethod
def empty(cls) -> "SchemaSnapshot":
return cls()
from_runtime
classmethod
¶
from_runtime(tables)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, tables: Sequence[Sequence[Any]]) -> "SchemaSnapshot":
return cls([TableSnapshot.from_runtime(table) for table in tables])
from_database
classmethod
¶
from_database(
database, *, native_enum_types=False, enum_schema=None
)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_database(
cls,
database: Any,
*,
native_enum_types: bool = False,
enum_schema: str | None = None,
) -> "SchemaSnapshot":
prepare_database_relationships(database)
enum_types: list[EnumTypeSnapshot] = []
if native_enum_types:
from ormdantic.schema import enum_type_descriptors
enum_types = [
EnumTypeSnapshot.from_runtime(enum_type)
for enum_type in enum_type_descriptors(
database._table_map,
schema=enum_schema,
)
]
namespaces = [
NamespaceSnapshot.from_runtime(namespace)
for namespace in database._runtime_namespace_specs()
]
sequences = [
SequenceSnapshot.from_runtime(sequence)
for sequence in database._runtime_sequence_specs()
]
views = [
ViewSnapshot.from_runtime(view) for view in database._runtime_view_specs()
]
return cls(
tables=[
TableSnapshot.from_runtime(table)
for table in database._runtime_table_specs(
native_enum_types=native_enum_types,
enum_schema=enum_schema,
)
],
namespaces=namespaces,
enum_types=enum_types,
sequences=sequences,
views=views,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "SchemaSnapshot":
return cls(
tables=[
TableSnapshot.from_dict(table) for table in payload.get("tables", [])
],
namespaces=[
NamespaceSnapshot.from_dict(namespace)
for namespace in payload.get("namespaces", [])
],
enum_types=[
EnumTypeSnapshot.from_dict(enum_type)
for enum_type in payload.get("enum_types", [])
],
sequences=[
SequenceSnapshot.from_dict(sequence)
for sequence in payload.get("sequences", [])
],
views=[ViewSnapshot.from_dict(view) for view in payload.get("views", [])],
version=int(payload.get("version", 1)),
)
from_json
classmethod
¶
from_json(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_json(cls, payload: str | bytes | bytearray) -> "SchemaSnapshot":
return cls.from_dict(json.loads(payload))
from_toml
classmethod
¶
from_toml(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_toml(cls, payload: str | bytes | bytearray) -> "SchemaSnapshot":
return cls.from_dict(toml_loads(payload))
read
classmethod
¶
read(path, *, format=None)
Source code in ormdantic/_migrations/models.py
@classmethod
def read(
cls, path: str | PathLike[str], *, format: str | None = None
) -> "SchemaSnapshot":
input_path = Path(path)
document = input_path.read_text()
if document_format(str(input_path), format) == "toml":
return cls.from_toml(document)
return cls.from_json(document)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> list[RuntimeTableSpec]:
return [table.to_runtime() for table in self.tables]
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"version": self.version,
"tables": [table.to_dict() for table in self.tables],
}
if self.namespaces:
payload["namespaces"] = [
namespace.to_dict() for namespace in self.namespaces
]
if self.enum_types:
payload["enum_types"] = [
enum_type.to_dict() for enum_type in self.enum_types
]
if self.sequences:
payload["sequences"] = [sequence.to_dict() for sequence in self.sequences]
if self.views:
payload["views"] = [view.to_dict() for view in self.views]
return payload
to_json
¶
to_json(*, indent=2)
Source code in ormdantic/_migrations/models.py
def to_json(self, *, indent: int | None = 2) -> str:
return json.dumps(self.to_dict(), indent=indent, sort_keys=True)
to_toml
¶
to_toml()
Source code in ormdantic/_migrations/models.py
def to_toml(self) -> str:
return toml_dumps(self.to_dict())
write
¶
write(path, *, format=None)
Source code in ormdantic/_migrations/models.py
def write(self, path: str | PathLike[str], *, format: str | None = None) -> None:
output = Path(path)
output.parent.mkdir(parents=True, exist_ok=True)
if document_format(str(output), format) == "toml":
output.write_text(self.to_toml())
else:
output.write_text(self.to_json())
ormdantic._migrations.models.TableSnapshot
dataclass
¶
TableSnapshot(
model_key,
name,
primary_key,
schema=None,
columns=list(),
indexes=list(),
unique_constraints=list(),
named_unique_constraints=list(),
check_constraints=list(),
foreign_key_constraints=list(),
exclusion_constraints=list(),
relationships=list(),
comment=None,
tablespace=None,
mysql_engine=None,
mysql_charset=None,
mysql_collation=None,
mysql_row_format=None,
mysql_key_block_size=None,
mysql_pack_keys=None,
mysql_checksum=None,
mysql_delay_key_write=None,
mysql_stats_persistent=None,
mysql_stats_auto_recalc=None,
mysql_stats_sample_pages=None,
mysql_avg_row_length=None,
mysql_max_rows=None,
mysql_min_rows=None,
mysql_insert_method=None,
mysql_data_directory=None,
mysql_index_directory=None,
mysql_connection=None,
mysql_union=list(),
mysql_partition_by=None,
mysql_partitions=None,
mysql_subpartition_by=None,
mysql_subpartitions=None,
mysql_auto_increment=None,
postgres_inherits=list(),
postgres_with=list(),
postgres_using=None,
postgres_partition_by=None,
postgres_partition_of=None,
postgres_partition_for=None,
postgres_unlogged=False,
sqlite_strict=False,
sqlite_without_rowid=False,
oracle_compress=None,
)
Serializable table metadata used by migration snapshots.
unique_constraints
class-attribute
instance-attribute
¶
unique_constraints = field(default_factory=list)
named_unique_constraints
class-attribute
instance-attribute
¶
named_unique_constraints = field(default_factory=list)
check_constraints
class-attribute
instance-attribute
¶
check_constraints = field(default_factory=list)
foreign_key_constraints
class-attribute
instance-attribute
¶
foreign_key_constraints = field(default_factory=list)
exclusion_constraints
class-attribute
instance-attribute
¶
exclusion_constraints = field(default_factory=list)
postgres_inherits
class-attribute
instance-attribute
¶
postgres_inherits = field(default_factory=list)
from_runtime
classmethod
¶
from_runtime(table)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, table: Sequence[Any]) -> "TableSnapshot":
sqlite_strict = False
sqlite_without_rowid = False
schema = None
if len(table) > 11:
named_unique_constraints = table[6]
check_constraints = table[7]
foreign_key_constraints = table[8]
exclusion_constraints = table[9]
if is_runtime_table_options(table[10]):
comment = optional_str(table[10][0])
tablespace = optional_str(table[10][1])
mysql_engine = (
optional_str(table[10][2]) if len(table[10]) > 2 else None
)
mysql_charset = (
optional_str(table[10][3]) if len(table[10]) > 3 else None
)
mysql_collation = (
optional_str(table[10][4]) if len(table[10]) > 4 else None
)
new_mysql_options = len(table[10]) > 11
mysql_row_format = (
optional_str(table[10][5]) if new_mysql_options else None
)
postgres_offset = 1 if new_mysql_options else 0
postgres_inherits = (
[str(parent) for parent in table[10][5 + postgres_offset]]
if len(table[10]) > 5 + postgres_offset
else []
)
postgres_with = (
postgres_storage_parameters(table[10][6 + postgres_offset])
if len(table[10]) > 6 + postgres_offset
else []
)
postgres_using = (
optional_str(table[10][7 + postgres_offset])
if len(table[10]) > 7 + postgres_offset
else None
)
postgres_partition_by = (
optional_str(table[10][8 + postgres_offset])
if len(table[10]) > 8 + postgres_offset
else None
)
postgres_partition_of = (
optional_str(table[10][9 + postgres_offset])
if len(table[10]) > 9 + postgres_offset
else None
)
postgres_partition_for = (
optional_str(table[10][10 + postgres_offset])
if len(table[10]) > 10 + postgres_offset
else None
)
postgres_unlogged = (
bool(table[10][11 + postgres_offset])
if len(table[10]) > 11 + postgres_offset
else False
)
sqlite_strict = (
bool(table[10][12 + postgres_offset])
if len(table[10]) > 12 + postgres_offset
else False
)
sqlite_without_rowid = (
bool(table[10][13 + postgres_offset])
if len(table[10]) > 13 + postgres_offset
else False
)
schema = (
optional_str(table[10][14 + postgres_offset])
if len(table[10]) > 14 + postgres_offset
else None
)
oracle_compress = (
oracle_table_compress(table[10][16 + postgres_offset])
if len(table[10]) > 16 + postgres_offset
else None
)
mysql_key_block_size = (
optional_int(table[10][17 + postgres_offset])
if len(table[10]) > 17 + postgres_offset
else None
)
mysql_pack_keys = (
optional_bool(table[10][18 + postgres_offset])
if len(table[10]) > 18 + postgres_offset
else None
)
mysql_checksum = (
optional_bool(table[10][19 + postgres_offset])
if len(table[10]) > 19 + postgres_offset
else None
)
mysql_delay_key_write = (
optional_bool(table[10][20 + postgres_offset])
if len(table[10]) > 20 + postgres_offset
else None
)
mysql_stats_persistent = (
optional_bool(table[10][21 + postgres_offset])
if len(table[10]) > 21 + postgres_offset
else None
)
mysql_stats_auto_recalc = (
optional_bool(table[10][22 + postgres_offset])
if len(table[10]) > 22 + postgres_offset
else None
)
mysql_stats_sample_pages = (
optional_int(table[10][23 + postgres_offset])
if len(table[10]) > 23 + postgres_offset
else None
)
mysql_avg_row_length = (
optional_int(table[10][24 + postgres_offset])
if len(table[10]) > 24 + postgres_offset
else None
)
mysql_max_rows = (
optional_int(table[10][25 + postgres_offset])
if len(table[10]) > 25 + postgres_offset
else None
)
mysql_min_rows = (
optional_int(table[10][26 + postgres_offset])
if len(table[10]) > 26 + postgres_offset
else None
)
mysql_insert_method = (
optional_str(table[10][27 + postgres_offset])
if len(table[10]) > 27 + postgres_offset
else None
)
mysql_data_directory = (
optional_str(table[10][28 + postgres_offset])
if len(table[10]) > 28 + postgres_offset
else None
)
mysql_index_directory = (
optional_str(table[10][29 + postgres_offset])
if len(table[10]) > 29 + postgres_offset
else None
)
mysql_connection = (
optional_str(table[10][30 + postgres_offset])
if len(table[10]) > 30 + postgres_offset
else None
)
mysql_union = (
string_list(table[10][31 + postgres_offset])
if len(table[10]) > 31 + postgres_offset
else []
)
mysql_partition_by = (
optional_str(table[10][32 + postgres_offset])
if len(table[10]) > 32 + postgres_offset
else None
)
mysql_partitions = (
optional_int(table[10][33 + postgres_offset])
if len(table[10]) > 33 + postgres_offset
else None
)
mysql_subpartition_by = (
optional_str(table[10][34 + postgres_offset])
if len(table[10]) > 34 + postgres_offset
else None
)
mysql_subpartitions = (
optional_int(table[10][35 + postgres_offset])
if len(table[10]) > 35 + postgres_offset
else None
)
mysql_auto_increment = (
optional_int(table[10][36 + postgres_offset])
if len(table[10]) > 36 + postgres_offset
else None
)
else:
comment = optional_str(table[10])
tablespace = None
mysql_engine = None
mysql_charset = None
mysql_collation = None
mysql_row_format = None
mysql_key_block_size = None
mysql_pack_keys = None
mysql_checksum = None
mysql_delay_key_write = None
mysql_stats_persistent = None
mysql_stats_auto_recalc = None
mysql_stats_sample_pages = None
mysql_avg_row_length = None
mysql_max_rows = None
mysql_min_rows = None
mysql_insert_method = None
mysql_data_directory = None
mysql_index_directory = None
mysql_connection = None
mysql_union = []
mysql_partition_by = None
mysql_partitions = None
mysql_subpartition_by = None
mysql_subpartitions = None
mysql_auto_increment = None
postgres_inherits = []
postgres_with = []
postgres_using = None
postgres_partition_by = None
postgres_partition_of = None
postgres_partition_for = None
postgres_unlogged = False
schema = None
oracle_compress = None
relationships = table[11]
elif len(table) > 10:
named_unique_constraints = table[6]
check_constraints = table[7]
foreign_key_constraints = table[8]
exclusion_constraints = table[9]
comment = None
tablespace = None
mysql_engine = None
mysql_charset = None
mysql_collation = None
mysql_row_format = None
mysql_key_block_size = None
mysql_pack_keys = None
mysql_checksum = None
mysql_delay_key_write = None
mysql_stats_persistent = None
mysql_stats_auto_recalc = None
mysql_stats_sample_pages = None
mysql_avg_row_length = None
mysql_max_rows = None
mysql_min_rows = None
mysql_insert_method = None
mysql_data_directory = None
mysql_index_directory = None
mysql_connection = None
mysql_union = []
mysql_partition_by = None
mysql_partitions = None
mysql_subpartition_by = None
mysql_subpartitions = None
mysql_auto_increment = None
postgres_inherits = []
postgres_with = []
postgres_using = None
postgres_partition_by = None
postgres_partition_of = None
postgres_partition_for = None
postgres_unlogged = False
schema = None
oracle_compress = None
relationships = table[10]
elif len(table) > 9:
named_unique_constraints = table[6]
check_constraints = table[7]
foreign_key_constraints = table[8]
exclusion_constraints = []
comment = None
tablespace = None
mysql_engine = None
mysql_charset = None
mysql_collation = None
mysql_row_format = None
mysql_key_block_size = None
mysql_pack_keys = None
mysql_checksum = None
mysql_delay_key_write = None
mysql_stats_persistent = None
mysql_stats_auto_recalc = None
mysql_stats_sample_pages = None
mysql_avg_row_length = None
mysql_max_rows = None
mysql_min_rows = None
mysql_insert_method = None
mysql_data_directory = None
mysql_index_directory = None
mysql_connection = None
mysql_union = []
mysql_partition_by = None
mysql_partitions = None
mysql_subpartition_by = None
mysql_subpartitions = None
mysql_auto_increment = None
postgres_inherits = []
postgres_with = []
postgres_using = None
postgres_partition_by = None
postgres_partition_of = None
postgres_partition_for = None
postgres_unlogged = False
schema = None
oracle_compress = None
relationships = table[9]
elif len(table) > 8:
named_unique_constraints = table[6]
check_constraints = table[7]
foreign_key_constraints = []
exclusion_constraints = []
comment = None
tablespace = None
mysql_engine = None
mysql_charset = None
mysql_collation = None
mysql_row_format = None
mysql_key_block_size = None
mysql_pack_keys = None
mysql_checksum = None
mysql_delay_key_write = None
mysql_stats_persistent = None
mysql_stats_auto_recalc = None
mysql_stats_sample_pages = None
mysql_avg_row_length = None
mysql_max_rows = None
mysql_min_rows = None
mysql_insert_method = None
mysql_data_directory = None
mysql_index_directory = None
mysql_connection = None
mysql_union = []
mysql_partition_by = None
mysql_partitions = None
mysql_subpartition_by = None
mysql_subpartitions = None
mysql_auto_increment = None
postgres_inherits = []
postgres_with = []
postgres_using = None
postgres_partition_by = None
postgres_partition_of = None
postgres_partition_for = None
postgres_unlogged = False
schema = None
oracle_compress = None
relationships = table[8]
elif len(table) > 7:
named_unique_constraints = []
check_constraints = table[6]
foreign_key_constraints = []
exclusion_constraints = []
comment = None
tablespace = None
mysql_engine = None
mysql_charset = None
mysql_collation = None
mysql_row_format = None
mysql_key_block_size = None
mysql_pack_keys = None
mysql_checksum = None
mysql_delay_key_write = None
mysql_stats_persistent = None
mysql_stats_auto_recalc = None
mysql_stats_sample_pages = None
mysql_avg_row_length = None
mysql_max_rows = None
mysql_min_rows = None
mysql_insert_method = None
mysql_data_directory = None
mysql_index_directory = None
mysql_connection = None
mysql_union = []
mysql_partition_by = None
mysql_partitions = None
mysql_subpartition_by = None
mysql_subpartitions = None
mysql_auto_increment = None
postgres_inherits = []
postgres_with = []
postgres_using = None
postgres_partition_by = None
postgres_partition_of = None
postgres_partition_for = None
postgres_unlogged = False
schema = None
oracle_compress = None
relationships = table[7]
else:
named_unique_constraints = []
check_constraints = []
foreign_key_constraints = []
exclusion_constraints = []
comment = None
tablespace = None
mysql_engine = None
mysql_charset = None
mysql_collation = None
mysql_row_format = None
mysql_key_block_size = None
mysql_pack_keys = None
mysql_checksum = None
mysql_delay_key_write = None
mysql_stats_persistent = None
mysql_stats_auto_recalc = None
mysql_stats_sample_pages = None
mysql_avg_row_length = None
mysql_max_rows = None
mysql_min_rows = None
mysql_insert_method = None
mysql_data_directory = None
mysql_index_directory = None
mysql_connection = None
mysql_union = []
mysql_partition_by = None
mysql_partitions = None
mysql_subpartition_by = None
mysql_subpartitions = None
mysql_auto_increment = None
postgres_inherits = []
postgres_with = []
postgres_using = None
postgres_partition_by = None
postgres_partition_of = None
postgres_partition_for = None
postgres_unlogged = False
schema = None
oracle_compress = None
relationships = table[6]
return cls(
model_key=str(table[0]),
name=str(table[1]),
primary_key=str(table[2]),
schema=schema,
comment=comment,
tablespace=tablespace,
mysql_engine=mysql_engine,
mysql_charset=mysql_charset,
mysql_collation=mysql_collation,
mysql_row_format=mysql_row_format,
mysql_key_block_size=mysql_key_block_size,
mysql_pack_keys=mysql_pack_keys,
mysql_checksum=mysql_checksum,
mysql_delay_key_write=mysql_delay_key_write,
mysql_stats_persistent=mysql_stats_persistent,
mysql_stats_auto_recalc=mysql_stats_auto_recalc,
mysql_stats_sample_pages=mysql_stats_sample_pages,
mysql_avg_row_length=mysql_avg_row_length,
mysql_max_rows=mysql_max_rows,
mysql_min_rows=mysql_min_rows,
mysql_insert_method=mysql_insert_method,
mysql_data_directory=mysql_data_directory,
mysql_index_directory=mysql_index_directory,
mysql_connection=mysql_connection,
mysql_union=mysql_union,
mysql_partition_by=mysql_partition_by,
mysql_partitions=mysql_partitions,
mysql_subpartition_by=mysql_subpartition_by,
mysql_subpartitions=mysql_subpartitions,
mysql_auto_increment=mysql_auto_increment,
postgres_inherits=postgres_inherits,
postgres_with=postgres_with,
postgres_using=postgres_using,
postgres_partition_by=postgres_partition_by,
postgres_partition_of=postgres_partition_of,
postgres_partition_for=postgres_partition_for,
postgres_unlogged=postgres_unlogged,
sqlite_strict=sqlite_strict,
sqlite_without_rowid=sqlite_without_rowid,
oracle_compress=oracle_compress,
columns=[ColumnSnapshot.from_runtime(column) for column in table[3]],
indexes=[IndexSnapshot.from_runtime(index) for index in table[4]],
unique_constraints=[
[str(column) for column in columns] for columns in table[5]
],
named_unique_constraints=[
UniqueConstraintSnapshot.from_runtime(constraint)
for constraint in named_unique_constraints
],
check_constraints=[
TableCheckSnapshot.from_runtime(check) for check in check_constraints
],
foreign_key_constraints=[
ForeignKeyConstraintSnapshot.from_runtime(constraint)
for constraint in foreign_key_constraints
],
exclusion_constraints=[
ExclusionConstraintSnapshot.from_runtime(constraint)
for constraint in exclusion_constraints
],
relationships=[
RelationshipSnapshot.from_runtime(relationship)
for relationship in relationships
],
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "TableSnapshot":
return cls(
model_key=str(payload["model_key"]),
name=str(payload["name"]),
primary_key=str(payload["primary_key"]),
schema=optional_str(payload.get("schema")),
comment=optional_str(payload.get("comment")),
tablespace=optional_str(payload.get("tablespace")),
mysql_engine=optional_str(payload.get("mysql_engine")),
mysql_charset=optional_str(payload.get("mysql_charset")),
mysql_collation=optional_str(payload.get("mysql_collation")),
mysql_row_format=optional_str(payload.get("mysql_row_format")),
mysql_key_block_size=optional_int(payload.get("mysql_key_block_size")),
mysql_pack_keys=optional_bool(payload.get("mysql_pack_keys")),
mysql_checksum=optional_bool(payload.get("mysql_checksum")),
mysql_delay_key_write=optional_bool(payload.get("mysql_delay_key_write")),
mysql_stats_persistent=optional_bool(payload.get("mysql_stats_persistent")),
mysql_stats_auto_recalc=optional_bool(
payload.get("mysql_stats_auto_recalc")
),
mysql_stats_sample_pages=optional_int(
payload.get("mysql_stats_sample_pages")
),
mysql_avg_row_length=optional_int(payload.get("mysql_avg_row_length")),
mysql_max_rows=optional_int(payload.get("mysql_max_rows")),
mysql_min_rows=optional_int(payload.get("mysql_min_rows")),
mysql_insert_method=optional_str(payload.get("mysql_insert_method")),
mysql_data_directory=optional_str(payload.get("mysql_data_directory")),
mysql_index_directory=optional_str(payload.get("mysql_index_directory")),
mysql_connection=optional_str(payload.get("mysql_connection")),
mysql_union=string_list(payload.get("mysql_union", [])),
mysql_partition_by=optional_str(payload.get("mysql_partition_by")),
mysql_partitions=optional_int(payload.get("mysql_partitions")),
mysql_subpartition_by=optional_str(payload.get("mysql_subpartition_by")),
mysql_subpartitions=optional_int(payload.get("mysql_subpartitions")),
mysql_auto_increment=optional_int(payload.get("mysql_auto_increment")),
postgres_inherits=[
str(parent) for parent in payload.get("postgres_inherits", [])
],
postgres_with=postgres_storage_parameters(payload.get("postgres_with", [])),
postgres_using=optional_str(payload.get("postgres_using")),
postgres_partition_by=optional_str(payload.get("postgres_partition_by")),
postgres_partition_of=optional_str(payload.get("postgres_partition_of")),
postgres_partition_for=optional_str(payload.get("postgres_partition_for")),
postgres_unlogged=bool(payload.get("postgres_unlogged", False)),
sqlite_strict=bool(payload.get("sqlite_strict", False)),
sqlite_without_rowid=bool(payload.get("sqlite_without_rowid", False)),
oracle_compress=oracle_table_compress(payload.get("oracle_compress")),
columns=[
ColumnSnapshot.from_dict(column)
for column in payload.get("columns", [])
],
indexes=[
IndexSnapshot.from_dict(index) for index in payload.get("indexes", [])
],
unique_constraints=[
[str(column) for column in columns]
for columns in payload.get("unique_constraints", [])
],
named_unique_constraints=[
UniqueConstraintSnapshot.from_dict(constraint)
for constraint in payload.get("named_unique_constraints", [])
],
check_constraints=[
TableCheckSnapshot.from_dict(check)
for check in payload.get("check_constraints", [])
],
foreign_key_constraints=[
ForeignKeyConstraintSnapshot.from_dict(constraint)
for constraint in payload.get("foreign_key_constraints", [])
],
exclusion_constraints=[
ExclusionConstraintSnapshot.from_dict(constraint)
for constraint in payload.get("exclusion_constraints", [])
],
relationships=[
RelationshipSnapshot.from_dict(relationship)
for relationship in payload.get("relationships", [])
],
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeTableSpec:
return (
self.model_key,
self.name,
self.primary_key,
[column.to_runtime() for column in self.columns],
[index.to_runtime() for index in self.indexes],
[list(columns) for columns in self.unique_constraints],
[constraint.to_runtime() for constraint in self.named_unique_constraints],
[check.to_runtime() for check in self.check_constraints],
[constraint.to_runtime() for constraint in self.foreign_key_constraints],
[constraint.to_runtime() for constraint in self.exclusion_constraints],
(
self.comment,
self.tablespace,
self.mysql_engine,
self.mysql_charset,
self.mysql_collation,
self.mysql_row_format,
list(self.postgres_inherits),
list(self.postgres_with),
self.postgres_using,
self.postgres_partition_by,
self.postgres_partition_of,
self.postgres_partition_for,
self.postgres_unlogged,
self.sqlite_strict,
self.sqlite_without_rowid,
self.schema,
any(index.mssql_clustered for index in self.indexes)
or any(
constraint.mssql_clustered is True
for constraint in self.named_unique_constraints
),
oracle_table_compress_runtime(self.oracle_compress),
self.mysql_key_block_size,
self.mysql_pack_keys,
self.mysql_checksum,
self.mysql_delay_key_write,
self.mysql_stats_persistent,
self.mysql_stats_auto_recalc,
self.mysql_stats_sample_pages,
self.mysql_avg_row_length,
self.mysql_max_rows,
self.mysql_min_rows,
self.mysql_insert_method,
self.mysql_data_directory,
self.mysql_index_directory,
self.mysql_connection,
list(self.mysql_union),
self.mysql_partition_by,
self.mysql_partitions,
self.mysql_subpartition_by,
self.mysql_subpartitions,
self.mysql_auto_increment,
),
[relationship.to_runtime() for relationship in self.relationships],
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"model_key": self.model_key,
"name": self.name,
"primary_key": self.primary_key,
"columns": [column.to_dict() for column in self.columns],
"indexes": [index.to_dict() for index in self.indexes],
"unique_constraints": [
list(columns) for columns in self.unique_constraints
],
"named_unique_constraints": [
constraint.to_dict() for constraint in self.named_unique_constraints
],
"check_constraints": [check.to_dict() for check in self.check_constraints],
"foreign_key_constraints": [
constraint.to_dict() for constraint in self.foreign_key_constraints
],
"exclusion_constraints": [
constraint.to_dict() for constraint in self.exclusion_constraints
],
"relationships": [
relationship.to_dict() for relationship in self.relationships
],
}
if self.schema is not None:
payload["schema"] = self.schema
if self.comment is not None:
payload["comment"] = self.comment
if self.tablespace is not None:
payload["tablespace"] = self.tablespace
if self.mysql_engine is not None:
payload["mysql_engine"] = self.mysql_engine
if self.mysql_charset is not None:
payload["mysql_charset"] = self.mysql_charset
if self.mysql_collation is not None:
payload["mysql_collation"] = self.mysql_collation
if self.mysql_row_format is not None:
payload["mysql_row_format"] = self.mysql_row_format
if self.mysql_key_block_size is not None:
payload["mysql_key_block_size"] = self.mysql_key_block_size
if self.mysql_pack_keys is not None:
payload["mysql_pack_keys"] = self.mysql_pack_keys
if self.mysql_checksum is not None:
payload["mysql_checksum"] = self.mysql_checksum
if self.mysql_delay_key_write is not None:
payload["mysql_delay_key_write"] = self.mysql_delay_key_write
if self.mysql_stats_persistent is not None:
payload["mysql_stats_persistent"] = self.mysql_stats_persistent
if self.mysql_stats_auto_recalc is not None:
payload["mysql_stats_auto_recalc"] = self.mysql_stats_auto_recalc
if self.mysql_stats_sample_pages is not None:
payload["mysql_stats_sample_pages"] = self.mysql_stats_sample_pages
if self.mysql_avg_row_length is not None:
payload["mysql_avg_row_length"] = self.mysql_avg_row_length
if self.mysql_max_rows is not None:
payload["mysql_max_rows"] = self.mysql_max_rows
if self.mysql_min_rows is not None:
payload["mysql_min_rows"] = self.mysql_min_rows
if self.mysql_insert_method is not None:
payload["mysql_insert_method"] = self.mysql_insert_method
if self.mysql_data_directory is not None:
payload["mysql_data_directory"] = self.mysql_data_directory
if self.mysql_index_directory is not None:
payload["mysql_index_directory"] = self.mysql_index_directory
if self.mysql_connection is not None:
payload["mysql_connection"] = self.mysql_connection
if self.mysql_union:
payload["mysql_union"] = list(self.mysql_union)
if self.mysql_partition_by is not None:
payload["mysql_partition_by"] = self.mysql_partition_by
if self.mysql_partitions is not None:
payload["mysql_partitions"] = self.mysql_partitions
if self.mysql_subpartition_by is not None:
payload["mysql_subpartition_by"] = self.mysql_subpartition_by
if self.mysql_subpartitions is not None:
payload["mysql_subpartitions"] = self.mysql_subpartitions
if self.mysql_auto_increment is not None:
payload["mysql_auto_increment"] = self.mysql_auto_increment
if self.postgres_inherits:
payload["postgres_inherits"] = list(self.postgres_inherits)
if self.postgres_with:
payload["postgres_with"] = [
[name, value] for name, value in self.postgres_with
]
if self.postgres_using is not None:
payload["postgres_using"] = self.postgres_using
if self.postgres_partition_by is not None:
payload["postgres_partition_by"] = self.postgres_partition_by
if self.postgres_partition_of is not None:
payload["postgres_partition_of"] = self.postgres_partition_of
if self.postgres_partition_for is not None:
payload["postgres_partition_for"] = self.postgres_partition_for
if self.postgres_unlogged:
payload["postgres_unlogged"] = self.postgres_unlogged
if self.sqlite_strict:
payload["sqlite_strict"] = self.sqlite_strict
if self.sqlite_without_rowid:
payload["sqlite_without_rowid"] = self.sqlite_without_rowid
if self.oracle_compress is not None:
payload["oracle_compress"] = self.oracle_compress
return payload
ormdantic._migrations.models.ColumnSnapshot
dataclass
¶
ColumnSnapshot(
name,
kind,
nullable,
primary_key,
comment=None,
foreign_table=None,
foreign_column=None,
max_length=None,
unique=False,
checks=list(),
server_default=None,
computed=None,
computed_persisted=False,
autoincrement=False,
identity=False,
identity_always=False,
identity_start=None,
identity_increment=None,
identity_min_value=None,
identity_max_value=None,
identity_no_min_value=False,
identity_no_max_value=False,
identity_cycle=False,
identity_cache=None,
identity_order=False,
identity_on_null=False,
collation=None,
numeric_precision=None,
numeric_scale=None,
foreign_key_name=None,
on_delete=None,
on_update=None,
deferrable=None,
initially_deferred=False,
sqlite_on_conflict_primary_key=None,
sqlite_on_conflict_not_null=None,
sqlite_on_conflict_unique=None,
)
Serializable table column metadata used by migration snapshots.
sqlite_on_conflict_primary_key
class-attribute
instance-attribute
¶
sqlite_on_conflict_primary_key = None
from_runtime
classmethod
¶
from_runtime(column)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, column: Sequence[Any]) -> "ColumnSnapshot":
return cls(
name=str(column[0]),
kind=str(column[1]),
nullable=bool(column[2]),
primary_key=bool(column[3]),
foreign_table=optional_str(column[4]),
foreign_column=optional_str(column[5]),
max_length=optional_int(column[6]),
unique=bool(column[7]),
checks=[runtime_check(check) for check in column[8]],
**runtime_column_options(column),
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ColumnSnapshot":
return cls(
name=str(payload["name"]),
kind=str(payload["kind"]),
nullable=bool(payload["nullable"]),
primary_key=bool(payload["primary_key"]),
foreign_table=optional_str(payload.get("foreign_table")),
foreign_column=optional_str(payload.get("foreign_column")),
max_length=optional_int(payload.get("max_length")),
unique=bool(payload.get("unique", False)),
comment=optional_str(payload.get("comment")),
checks=[runtime_check(check) for check in payload.get("checks", [])],
server_default=optional_str(payload.get("server_default")),
computed=optional_str(payload.get("computed")),
computed_persisted=bool(payload.get("computed_persisted", False)),
autoincrement=bool(payload.get("autoincrement", False)),
identity=bool(payload.get("identity", False)),
identity_always=bool(payload.get("identity_always", False)),
identity_start=optional_int(payload.get("identity_start")),
identity_increment=optional_int(payload.get("identity_increment")),
identity_min_value=optional_int(payload.get("identity_min_value")),
identity_max_value=optional_int(payload.get("identity_max_value")),
identity_no_min_value=bool(payload.get("identity_no_min_value", False)),
identity_no_max_value=bool(payload.get("identity_no_max_value", False)),
identity_cycle=bool(payload.get("identity_cycle", False)),
identity_cache=optional_int(payload.get("identity_cache")),
identity_order=bool(payload.get("identity_order", False)),
identity_on_null=bool(payload.get("identity_on_null", False)),
collation=optional_str(payload.get("collation")),
numeric_precision=optional_int(payload.get("numeric_precision")),
numeric_scale=optional_int(payload.get("numeric_scale")),
foreign_key_name=optional_str(payload.get("foreign_key_name")),
on_delete=optional_str(payload.get("on_delete")),
on_update=optional_str(payload.get("on_update")),
deferrable=optional_bool(payload.get("deferrable")),
initially_deferred=bool(payload.get("initially_deferred", False)),
sqlite_on_conflict_primary_key=optional_str(
payload.get("sqlite_on_conflict_primary_key")
),
sqlite_on_conflict_not_null=optional_str(
payload.get("sqlite_on_conflict_not_null")
),
sqlite_on_conflict_unique=optional_str(
payload.get("sqlite_on_conflict_unique")
),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeColumn:
return (
self.name,
self.kind,
self.nullable,
self.primary_key,
self.foreign_table,
self.foreign_column,
self.max_length,
self.unique,
list(self.checks),
(
self.server_default,
self.computed,
self.computed_persisted,
self.autoincrement,
self.collation,
self.numeric_precision,
self.numeric_scale,
(
(
self.identity_always,
self.identity_start,
self.identity_increment,
self.identity_min_value,
self.identity_max_value,
self.identity_cycle,
self.identity_cache,
self.identity_order,
self.identity_on_null,
self.identity_no_min_value,
self.identity_no_max_value,
)
if self.has_identity
else None
),
self.foreign_key_name,
self.on_delete,
self.on_update,
(
(
(self.deferrable, self.initially_deferred)
if self.deferrable is not None or self.initially_deferred
else None
),
self.comment,
(
(
self.sqlite_on_conflict_primary_key,
self.sqlite_on_conflict_not_null,
self.sqlite_on_conflict_unique,
)
if (
self.sqlite_on_conflict_primary_key is not None
or self.sqlite_on_conflict_not_null is not None
or self.sqlite_on_conflict_unique is not None
)
else None
),
),
),
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"kind": self.kind,
"nullable": self.nullable,
"primary_key": self.primary_key,
"foreign_table": self.foreign_table,
"foreign_column": self.foreign_column,
"max_length": self.max_length,
"unique": self.unique,
"checks": [list(check) for check in self.checks],
}
if self.comment is not None:
payload["comment"] = self.comment
if self.server_default is not None:
payload["server_default"] = self.server_default
if self.computed is not None:
payload["computed"] = self.computed
if self.computed_persisted:
payload["computed_persisted"] = self.computed_persisted
if self.autoincrement:
payload["autoincrement"] = self.autoincrement
if self.has_identity:
payload["identity"] = True
if self.identity_always:
payload["identity_always"] = self.identity_always
if self.identity_start is not None:
payload["identity_start"] = self.identity_start
if self.identity_increment is not None:
payload["identity_increment"] = self.identity_increment
if self.identity_min_value is not None:
payload["identity_min_value"] = self.identity_min_value
if self.identity_max_value is not None:
payload["identity_max_value"] = self.identity_max_value
if self.identity_no_min_value:
payload["identity_no_min_value"] = self.identity_no_min_value
if self.identity_no_max_value:
payload["identity_no_max_value"] = self.identity_no_max_value
if self.identity_cycle:
payload["identity_cycle"] = self.identity_cycle
if self.identity_cache is not None:
payload["identity_cache"] = self.identity_cache
if self.identity_order:
payload["identity_order"] = self.identity_order
if self.identity_on_null:
payload["identity_on_null"] = self.identity_on_null
if self.collation is not None:
payload["collation"] = self.collation
if self.numeric_precision is not None:
payload["numeric_precision"] = self.numeric_precision
if self.numeric_scale is not None:
payload["numeric_scale"] = self.numeric_scale
if self.foreign_key_name is not None:
payload["foreign_key_name"] = self.foreign_key_name
if self.on_delete is not None:
payload["on_delete"] = self.on_delete
if self.on_update is not None:
payload["on_update"] = self.on_update
if self.deferrable is not None:
payload["deferrable"] = self.deferrable
if self.initially_deferred:
payload["initially_deferred"] = self.initially_deferred
if self.sqlite_on_conflict_primary_key is not None:
payload["sqlite_on_conflict_primary_key"] = (
self.sqlite_on_conflict_primary_key
)
if self.sqlite_on_conflict_not_null is not None:
payload["sqlite_on_conflict_not_null"] = self.sqlite_on_conflict_not_null
if self.sqlite_on_conflict_unique is not None:
payload["sqlite_on_conflict_unique"] = self.sqlite_on_conflict_unique
return payload
ormdantic._migrations.models.IndexSnapshot
dataclass
¶
IndexSnapshot(
name,
columns,
unique=False,
where=None,
include_columns=list(),
method=None,
expressions=list(),
postgres_with=list(),
comment=None,
postgres_tablespace=None,
mssql_filegroup=None,
mssql_clustered=False,
oracle_tablespace=None,
mysql_prefix=None,
mysql_length=dict(),
mysql_using=None,
postgres_ops=dict(),
mysql_visible=None,
oracle_bitmap=False,
oracle_compress=None,
postgres_nulls_not_distinct=False,
)
Serializable index metadata used by migration snapshots.
postgres_nulls_not_distinct
class-attribute
instance-attribute
¶
postgres_nulls_not_distinct = False
from_runtime
classmethod
¶
from_runtime(index)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, index: Sequence[Any]) -> "IndexSnapshot":
if len(index) > 13 and isinstance(index[13], Mapping):
mysql_prefix = None
mysql_lengths = mysql_index_lengths(index[13])
mysql_using = optional_str(index[14]) if len(index) > 14 else None
else:
mysql_prefix = optional_str(index[13]) if len(index) > 13 else None
mysql_lengths = mysql_index_lengths(index[14]) if len(index) > 14 else {}
mysql_using = optional_str(index[15]) if len(index) > 15 else None
return cls(
str(index[0]),
[str(column) for column in index[1]],
bool(index[2]),
str(index[3]) if len(index) > 3 and index[3] is not None else None,
[str(column) for column in index[4]] if len(index) > 4 else [],
str(index[5]) if len(index) > 5 and index[5] is not None else None,
[str(expression) for expression in index[6]] if len(index) > 6 else [],
postgres_storage_parameters(index[7]) if len(index) > 7 else [],
optional_str(index[8]) if len(index) > 8 else None,
optional_str(index[9]) if len(index) > 9 else None,
optional_str(index[10]) if len(index) > 10 else None,
bool(index[11]) if len(index) > 11 else False,
optional_str(index[12]) if len(index) > 12 else None,
mysql_prefix,
mysql_lengths,
mysql_using,
postgres_index_ops(index[16]) if len(index) > 16 else {},
optional_bool(index[17]) if len(index) > 17 else None,
bool(index[18]) if len(index) > 18 else False,
oracle_index_compress(index[19]) if len(index) > 19 else None,
bool(index[20]) if len(index) > 20 else False,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "IndexSnapshot":
return cls(
str(payload["name"]),
[str(column) for column in payload["columns"]],
bool(payload.get("unique", False)),
(
str(payload["where"])
if payload.get("where") is not None
else (
str(payload["where_expr"])
if payload.get("where_expr") is not None
else None
)
),
[str(column) for column in payload.get("include_columns", [])],
str(payload["method"]) if payload.get("method") is not None else None,
[str(expression) for expression in payload.get("expressions", [])],
postgres_storage_parameters(payload.get("postgres_with", [])),
optional_str(payload.get("comment")),
optional_str(payload.get("postgres_tablespace")),
optional_str(payload.get("mssql_filegroup")),
bool(payload.get("mssql_clustered", False)),
optional_str(payload.get("oracle_tablespace")),
optional_str(payload.get("mysql_prefix")),
mysql_index_lengths(payload.get("mysql_length", {})),
optional_str(payload.get("mysql_using")),
postgres_index_ops(payload.get("postgres_ops", {})),
optional_bool(payload.get("mysql_visible")),
bool(payload.get("oracle_bitmap", False)),
oracle_index_compress(payload.get("oracle_compress")),
bool(payload.get("postgres_nulls_not_distinct", False)),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeIndex:
return (
self.name,
list(self.columns),
self.unique,
self.where,
list(self.include_columns),
self.method,
list(self.expressions),
list(self.postgres_with),
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"columns": list(self.columns),
"unique": self.unique,
}
if self.where is not None:
payload["where"] = self.where
if self.include_columns:
payload["include_columns"] = list(self.include_columns)
if self.method is not None:
payload["method"] = self.method
if self.expressions:
payload["expressions"] = list(self.expressions)
if self.postgres_with:
payload["postgres_with"] = [
[name, value] for name, value in self.postgres_with
]
if self.comment is not None:
payload["comment"] = self.comment
if self.postgres_tablespace is not None:
payload["postgres_tablespace"] = self.postgres_tablespace
if self.mssql_filegroup is not None:
payload["mssql_filegroup"] = self.mssql_filegroup
if self.mssql_clustered:
payload["mssql_clustered"] = self.mssql_clustered
if self.oracle_tablespace is not None:
payload["oracle_tablespace"] = self.oracle_tablespace
if self.oracle_bitmap:
payload["oracle_bitmap"] = self.oracle_bitmap
if self.oracle_compress is not None:
payload["oracle_compress"] = self.oracle_compress
if self.mysql_prefix is not None:
payload["mysql_prefix"] = self.mysql_prefix
if self.mysql_length:
payload["mysql_length"] = dict(self.mysql_length)
if self.mysql_using is not None:
payload["mysql_using"] = self.mysql_using
if self.postgres_ops:
payload["postgres_ops"] = dict(self.postgres_ops)
if self.mysql_visible is not None:
payload["mysql_visible"] = self.mysql_visible
if self.postgres_nulls_not_distinct:
payload["postgres_nulls_not_distinct"] = self.postgres_nulls_not_distinct
return payload
ormdantic._migrations.models.TableCheckSnapshot
dataclass
¶
TableCheckSnapshot(
name,
expression,
validated=True,
no_inherit=False,
comment=None,
)
Serializable table-level CHECK constraint metadata.
from_runtime
classmethod
¶
from_runtime(check)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, check: Sequence[Any]) -> "TableCheckSnapshot":
return cls(
str(check[0]),
str(check[1]),
bool(check[2]) if len(check) > 2 else True,
bool(check[3]) if len(check) > 3 else False,
optional_str(check[4]) if len(check) > 4 else None,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "TableCheckSnapshot":
return cls(
str(payload["name"]),
str(payload["expression"]),
bool(payload.get("validated", True)),
bool(payload.get("no_inherit", False)),
optional_str(payload.get("comment")),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeTableCheck:
return (self.name, self.expression, self.validated, self.no_inherit)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"expression": self.expression,
}
if not self.validated:
payload["validated"] = False
if self.no_inherit:
payload["no_inherit"] = True
if self.comment is not None:
payload["comment"] = self.comment
return payload
ormdantic._migrations.models.UniqueConstraintSnapshot
dataclass
¶
UniqueConstraintSnapshot(
name,
columns,
deferrable=None,
initially_deferred=False,
nulls_not_distinct=False,
sqlite_on_conflict=None,
mssql_filegroup=None,
mssql_clustered=None,
comment=None,
postgres_include=list(),
oracle_tablespace=None,
oracle_compress=None,
)
Serializable named UNIQUE constraint metadata.
postgres_include
class-attribute
instance-attribute
¶
postgres_include = field(default_factory=list)
from_runtime
classmethod
¶
from_runtime(constraint)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, constraint: Sequence[Any]) -> "UniqueConstraintSnapshot":
mssql_filegroup: str | None = None
mssql_clustered: bool | None = None
oracle_tablespace: str | None = None
oracle_compress: int | bool | None = None
comment: str | None = None
postgres_include: list[str] = []
if len(constraint) > 10:
mssql_filegroup = (
optional_str(constraint[6]) if len(constraint) > 6 else None
)
mssql_clustered = (
optional_bool(constraint[7]) if len(constraint) > 7 else None
)
if len(constraint) > 8:
comment = optional_str(constraint[8])
if len(constraint) > 9 and constraint[9] is not None:
postgres_include = [str(column) for column in constraint[9]]
oracle_tablespace = optional_str(constraint[10])
if len(constraint) > 11:
oracle_compress = oracle_index_compress(constraint[11])
elif len(constraint) == 10 and is_non_string_sequence(constraint[9]):
mssql_filegroup = (
optional_str(constraint[6]) if len(constraint) > 6 else None
)
mssql_clustered = (
optional_bool(constraint[7]) if len(constraint) > 7 else None
)
comment = optional_str(constraint[8])
postgres_include = [str(column) for column in constraint[9]]
elif len(constraint) == 10:
mssql_filegroup = (
optional_str(constraint[6]) if len(constraint) > 6 else None
)
mssql_clustered = (
optional_bool(constraint[7]) if len(constraint) > 7 else None
)
oracle_tablespace = optional_str(constraint[8])
oracle_compress = oracle_index_compress(constraint[9])
elif len(constraint) == 9:
mssql_filegroup = (
optional_str(constraint[6]) if len(constraint) > 6 else None
)
mssql_clustered = (
optional_bool(constraint[7]) if len(constraint) > 7 else None
)
oracle_tablespace = optional_str(constraint[8])
elif len(constraint) > 7 and is_non_string_sequence(constraint[7]):
comment = optional_str(constraint[6]) if len(constraint) > 6 else None
postgres_include = [str(column) for column in constraint[7]]
else:
if len(constraint) > 6:
if isinstance(constraint[6], bool):
mssql_clustered = optional_bool(constraint[6])
elif constraint[6] is not None:
if len(constraint) > 7:
mssql_filegroup = optional_str(constraint[6])
else:
comment = optional_str(constraint[6])
if len(constraint) > 7:
if isinstance(constraint[7], bool):
mssql_clustered = optional_bool(constraint[7])
elif constraint[7] is not None:
comment = optional_str(constraint[7])
if len(constraint) > 8 and constraint[8] is not None:
if is_non_string_sequence(constraint[8]):
postgres_include = [str(column) for column in constraint[8]]
else:
comment = optional_str(constraint[8])
if len(constraint) > 9 and constraint[9] is not None:
postgres_include = [str(column) for column in constraint[9]]
return cls(
str(constraint[0]),
[str(column) for column in constraint[1]],
optional_bool(constraint[2]) if len(constraint) > 2 else None,
bool(constraint[3]) if len(constraint) > 3 else False,
bool(constraint[4]) if len(constraint) > 4 else False,
optional_str(constraint[5]) if len(constraint) > 5 else None,
mssql_filegroup,
mssql_clustered,
comment,
postgres_include,
oracle_tablespace,
oracle_compress,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "UniqueConstraintSnapshot":
return cls(
str(payload["name"]),
[str(column) for column in payload["columns"]],
optional_bool(payload.get("deferrable")),
bool(payload.get("initially_deferred", False)),
bool(payload.get("nulls_not_distinct", False)),
optional_str(payload.get("sqlite_on_conflict")),
optional_str(payload.get("mssql_filegroup")),
optional_bool(payload.get("mssql_clustered")),
optional_str(payload.get("comment")),
[str(column) for column in payload.get("postgres_include") or []],
optional_str(payload.get("oracle_tablespace")),
oracle_index_compress(payload.get("oracle_compress")),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeUniqueConstraint:
return (
self.name,
list(self.columns),
self.deferrable,
self.initially_deferred,
self.nulls_not_distinct,
self.sqlite_on_conflict,
self.mssql_filegroup,
self.mssql_clustered,
self.oracle_tablespace,
oracle_index_compress_runtime(self.oracle_compress),
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"columns": list(self.columns),
}
if self.deferrable is not None:
payload["deferrable"] = self.deferrable
if self.initially_deferred:
payload["initially_deferred"] = self.initially_deferred
if self.nulls_not_distinct:
payload["nulls_not_distinct"] = self.nulls_not_distinct
if self.sqlite_on_conflict is not None:
payload["sqlite_on_conflict"] = self.sqlite_on_conflict
if self.mssql_filegroup is not None:
payload["mssql_filegroup"] = self.mssql_filegroup
if self.mssql_clustered is not None:
payload["mssql_clustered"] = self.mssql_clustered
if self.oracle_tablespace is not None:
payload["oracle_tablespace"] = self.oracle_tablespace
if self.oracle_compress is not None:
payload["oracle_compress"] = self.oracle_compress
if self.comment is not None:
payload["comment"] = self.comment
if self.postgres_include:
payload["postgres_include"] = list(self.postgres_include)
return payload
ormdantic._migrations.models.ForeignKeyConstraintSnapshot
dataclass
¶
ForeignKeyConstraintSnapshot(
name,
columns,
foreign_table,
foreign_columns,
on_delete=None,
on_update=None,
deferrable=None,
initially_deferred=False,
validated=True,
match=None,
comment=None,
)
Serializable table-level FOREIGN KEY constraint metadata.
from_runtime
classmethod
¶
from_runtime(constraint)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(
cls,
constraint: Sequence[Any],
) -> "ForeignKeyConstraintSnapshot":
return cls(
str(constraint[0]),
[str(column) for column in constraint[1]],
str(constraint[2]),
[str(column) for column in constraint[3]],
optional_str(constraint[4]) if len(constraint) > 4 else None,
optional_str(constraint[5]) if len(constraint) > 5 else None,
optional_bool(constraint[6]) if len(constraint) > 6 else None,
bool(constraint[7]) if len(constraint) > 7 else False,
bool(constraint[8]) if len(constraint) > 8 else True,
optional_str(constraint[9]) if len(constraint) > 9 else None,
optional_str(constraint[10]) if len(constraint) > 10 else None,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ForeignKeyConstraintSnapshot":
return cls(
str(payload["name"]),
[str(column) for column in payload["columns"]],
str(payload["foreign_table"]),
[str(column) for column in payload["foreign_columns"]],
optional_str(payload.get("on_delete")),
optional_str(payload.get("on_update")),
optional_bool(payload.get("deferrable")),
bool(payload.get("initially_deferred", False)),
bool(payload.get("validated", True)),
optional_str(payload.get("match")),
optional_str(payload.get("comment")),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeForeignKeyConstraint:
return (
self.name,
list(self.columns),
self.foreign_table,
list(self.foreign_columns),
self.on_delete,
self.on_update,
self.deferrable,
self.initially_deferred,
self.validated,
self.match,
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"columns": list(self.columns),
"foreign_table": self.foreign_table,
"foreign_columns": list(self.foreign_columns),
}
if self.on_delete is not None:
payload["on_delete"] = self.on_delete
if self.on_update is not None:
payload["on_update"] = self.on_update
if self.deferrable is not None:
payload["deferrable"] = self.deferrable
if self.initially_deferred:
payload["initially_deferred"] = self.initially_deferred
if not self.validated:
payload["validated"] = False
if self.match is not None:
payload["match"] = self.match
if self.comment is not None:
payload["comment"] = self.comment
return payload
ormdantic._migrations.models.ExclusionConstraintSnapshot
dataclass
¶
ExclusionConstraintSnapshot(
name,
columns=list(),
expressions=list(),
using="gist",
where=None,
deferrable=None,
initially_deferred=False,
ops=dict(),
comment=None,
)
Serializable PostgreSQL EXCLUDE constraint metadata.
from_runtime
classmethod
¶
from_runtime(constraint)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(
cls,
constraint: Sequence[Any],
) -> "ExclusionConstraintSnapshot":
ops: dict[str, str] = {}
comment: str | None = None
if len(constraint) > 7:
if isinstance(constraint[7], Mapping):
ops = postgres_index_ops(constraint[7])
comment = optional_str(constraint[8]) if len(constraint) > 8 else None
else:
comment = optional_str(constraint[7])
return cls(
str(constraint[0]),
exclusion_elements(constraint[1]),
exclusion_elements(constraint[2]),
str(constraint[3]) if len(constraint) > 3 else "gist",
optional_str(constraint[4]) if len(constraint) > 4 else None,
optional_bool(constraint[5]) if len(constraint) > 5 else None,
bool(constraint[6]) if len(constraint) > 6 else False,
ops,
comment,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ExclusionConstraintSnapshot":
return cls(
str(payload["name"]),
exclusion_elements(payload.get("columns", [])),
exclusion_elements(payload.get("expressions", [])),
str(payload.get("using", "gist")),
optional_str(payload.get("where")),
optional_bool(payload.get("deferrable")),
bool(payload.get("initially_deferred", False)),
postgres_index_ops(payload.get("ops", {})),
optional_str(payload.get("comment")),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeExclusionConstraint:
return (
self.name,
list(self.columns),
list(self.expressions),
self.using,
self.where,
self.deferrable,
self.initially_deferred,
dict(self.ops),
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"columns": [list(element) for element in self.columns],
"expressions": [list(element) for element in self.expressions],
"using": self.using,
}
if self.where is not None:
payload["where"] = self.where
if self.deferrable is not None:
payload["deferrable"] = self.deferrable
if self.initially_deferred:
payload["initially_deferred"] = self.initially_deferred
if self.ops:
payload["ops"] = dict(self.ops)
if self.comment is not None:
payload["comment"] = self.comment
return payload
ormdantic._migrations.models.NamespaceSnapshot
dataclass
¶
NamespaceSnapshot(name, comment=None)
Serializable database namespace/schema metadata used by migration snapshots.
from_runtime
classmethod
¶
from_runtime(namespace)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, namespace: Sequence[Any]) -> "NamespaceSnapshot":
return cls(
str(namespace[0]),
optional_str(namespace[1]) if len(namespace) > 1 else None,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "NamespaceSnapshot":
return cls(str(payload["name"]), optional_str(payload.get("comment")))
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeNamespace:
return (self.name, self.comment)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload = {"name": self.name}
if self.comment is not None:
payload["comment"] = self.comment
return payload
ormdantic._migrations.models.SequenceSnapshot
dataclass
¶
SequenceSnapshot(
name,
schema=None,
start=None,
increment=None,
min_value=None,
max_value=None,
cycle=False,
cache=None,
comment=None,
data_type=None,
order=False,
no_min_value=False,
no_max_value=False,
)
Serializable database sequence metadata used by migration snapshots.
from_runtime
classmethod
¶
from_runtime(sequence)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, sequence: Sequence[Any]) -> "SequenceSnapshot":
return cls(
str(sequence[0]),
optional_str(sequence[1]) if len(sequence) > 1 else None,
optional_int(sequence[2]) if len(sequence) > 2 else None,
optional_int(sequence[3]) if len(sequence) > 3 else None,
optional_int(sequence[4]) if len(sequence) > 4 else None,
optional_int(sequence[5]) if len(sequence) > 5 else None,
bool(sequence[6]) if len(sequence) > 6 else False,
optional_int(sequence[7]) if len(sequence) > 7 else None,
optional_str(sequence[8]) if len(sequence) > 8 else None,
optional_str(sequence[9]) if len(sequence) > 9 else None,
bool(sequence[10]) if len(sequence) > 10 else False,
bool(sequence[11]) if len(sequence) > 11 else False,
bool(sequence[12]) if len(sequence) > 12 else False,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "SequenceSnapshot":
return cls(
str(payload["name"]),
optional_str(payload.get("schema")),
optional_int(payload.get("start")),
optional_int(payload.get("increment")),
optional_int(payload.get("min_value")),
optional_int(payload.get("max_value")),
bool(payload.get("cycle", False)),
optional_int(payload.get("cache")),
optional_str(payload.get("comment")),
optional_str(payload.get("data_type")),
bool(payload.get("order", False)),
bool(payload.get("no_min_value", False)),
bool(payload.get("no_max_value", False)),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeSequence:
return (
self.name,
self.schema,
self.start,
self.increment,
self.min_value,
self.max_value,
self.cycle,
self.cache,
self.comment,
self.data_type,
self.order,
self.no_min_value,
self.no_max_value,
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {"name": self.name}
if self.schema is not None:
payload["schema"] = self.schema
if self.start is not None:
payload["start"] = self.start
if self.increment is not None:
payload["increment"] = self.increment
if self.min_value is not None:
payload["min_value"] = self.min_value
if self.max_value is not None:
payload["max_value"] = self.max_value
if self.cycle:
payload["cycle"] = self.cycle
if self.cache is not None:
payload["cache"] = self.cache
if self.comment is not None:
payload["comment"] = self.comment
if self.data_type is not None:
payload["data_type"] = self.data_type
if self.order:
payload["order"] = self.order
if self.no_min_value:
payload["no_min_value"] = self.no_min_value
if self.no_max_value:
payload["no_max_value"] = self.no_max_value
return payload
ormdantic._migrations.models.ViewSnapshot
dataclass
¶
ViewSnapshot(
name,
definition,
schema=None,
materialized=False,
comment=None,
)
Serializable database view metadata used by migration snapshots.
from_runtime
classmethod
¶
from_runtime(view)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, view: Sequence[Any]) -> "ViewSnapshot":
return cls(
str(view[0]),
normalized_view_definition(str(view[2])),
optional_str(view[1]) if len(view) > 1 else None,
bool(view[3]) if len(view) > 3 else False,
optional_str(view[4]) if len(view) > 4 else None,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "ViewSnapshot":
return cls(
str(payload["name"]),
normalized_view_definition(str(payload["definition"])),
optional_str(payload.get("schema")),
bool(payload.get("materialized", False)),
optional_str(payload.get("comment")),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeView:
return (
self.name,
self.schema,
self.definition,
self.materialized,
self.comment,
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"definition": self.definition,
}
if self.schema is not None:
payload["schema"] = self.schema
if self.materialized:
payload["materialized"] = self.materialized
if self.comment is not None:
payload["comment"] = self.comment
return payload
ormdantic._migrations.models.EnumTypeSnapshot
dataclass
¶
EnumTypeSnapshot(name, values, schema=None, comment=None)
Serializable native enum type metadata used by migration snapshots.
from_runtime
classmethod
¶
from_runtime(enum_type)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_runtime(cls, enum_type: Sequence[Any]) -> "EnumTypeSnapshot":
schema = optional_str(enum_type[2]) if len(enum_type) > 2 else None
comment = optional_str(enum_type[3]) if len(enum_type) > 3 else None
return cls(
str(enum_type[0]),
[str(value) for value in enum_type[1]],
schema,
comment,
)
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/models.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> "EnumTypeSnapshot":
return cls(
str(payload["name"]),
[str(value) for value in payload["values"]],
optional_str(payload.get("schema")),
optional_str(payload.get("comment")),
)
to_runtime
¶
to_runtime()
Source code in ormdantic/_migrations/models.py
def to_runtime(self) -> RuntimeEnumType:
return (self.name, list(self.values), self.schema, self.comment)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/models.py
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"values": list(self.values),
}
if self.schema is not None:
payload["schema"] = self.schema
if self.comment is not None:
payload["comment"] = self.comment
return payload
ormdantic._migrations.models.SchemaDiff
dataclass
¶
SchemaDiff(changes=list(), warnings=list())
Structured schema diff output.
ormdantic._migrations.models.MigrationPlan
dataclass
¶
MigrationPlan(
operations=list(),
rollback_operations=list(),
diff=SchemaDiff(),
warnings=list(),
safety=dict(),
)
A generated migration plan.
rollback_operations
class-attribute
instance-attribute
¶
rollback_operations = field(default_factory=list)
is_empty
¶
is_empty()
Source code in ormdantic/_migrations/models.py
def is_empty(self) -> bool:
return not self.operations
dry_run
¶
dry_run()
Source code in ormdantic/_migrations/models.py
def dry_run(self) -> list[str]:
return [operation.sql for operation in self.operations]
rollback_sql
¶
rollback_sql()
Source code in ormdantic/_migrations/models.py
def rollback_sql(self) -> list[str]:
return [operation.sql for operation in self.rollback_operations]
ormdantic._migrations.models.MigrationOperation
dataclass
¶
MigrationOperation(
sql,
values=(),
description=None,
unsafe=False,
destructive=False,
kind="statement",
table=None,
object_name=None,
reversible=True,
requires_lock=True,
requires_rebuild=False,
generated_rollback=False,
metadata=dict(),
)
A SQL migration operation.
ormdantic._migrations.models.MigrationWarning
dataclass
¶
MigrationWarning(code, message, table=None, name=None)
ormdantic._migrations.models.MigrationHistoryEntry
dataclass
¶
MigrationHistoryEntry(
revision,
description=None,
checksum=None,
applied_at=None,
execution_time_ms=None,
status=MIGRATION_STATUS_APPLIED,
dirty=False,
artifact_version=None,
ormdantic_version=None,
metadata=dict(),
)
One row from the durable migration history table.
Artifact Type¶
ormdantic._migrations.artifacts.MigrationArtifact
dataclass
¶
MigrationArtifact(
revision,
from_snapshot,
to_snapshot,
operations=list(),
rollback_operations=list(),
diff=SchemaDiff(),
warnings=list(),
description=None,
created_at=(lambda: isoformat())(),
dialect=None,
checksum=None,
depends_on=list(),
branch_labels=list(),
safety=dict(),
metadata=dict(),
artifact_version=MIGRATION_ARTIFACT_VERSION,
version=MIGRATION_ARTIFACT_VERSION,
)
A serializable migration file with snapshots, SQL, and safety metadata.
rollback_operations
class-attribute
instance-attribute
¶
rollback_operations = field(default_factory=list)
created_at
class-attribute
instance-attribute
¶
created_at = field(default_factory=lambda: isoformat())
from_plan
classmethod
¶
from_plan(
revision,
plan,
from_snapshot,
to_snapshot,
*,
dialect=None,
description=None,
depends_on=None,
branch_labels=None,
metadata=None,
created_at=None
)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_plan(
cls,
revision: str,
plan: MigrationPlan,
from_snapshot: SchemaSnapshot,
to_snapshot: SchemaSnapshot,
*,
dialect: str | None = None,
description: str | None = None,
depends_on: Sequence[str] | None = None,
branch_labels: Sequence[str] | None = None,
metadata: Mapping[str, Any] | None = None,
created_at: str | None = None,
) -> MigrationArtifact:
artifact = cls(
revision=revision,
from_snapshot=from_snapshot,
to_snapshot=to_snapshot,
operations=[
operation_from_dict(operation_to_dict(operation))
for operation in plan.operations
],
rollback_operations=[
operation_from_dict(operation_to_dict(operation))
for operation in plan.rollback_operations
],
diff=diff_from_dict(diff_to_dict(plan.diff)),
warnings=[
warning_from_dict(warning_to_dict(warning)) for warning in plan.warnings
],
description=description,
created_at=created_at
or datetime.now(UTC).replace(microsecond=0).isoformat(),
dialect=dialect,
depends_on=[str(item) for item in depends_on or ()],
branch_labels=[str(item) for item in branch_labels or ()],
safety=dict(plan.safety),
metadata=dict(metadata or {}),
artifact_version=MIGRATION_ARTIFACT_VERSION,
version=MIGRATION_ARTIFACT_VERSION,
)
return artifact.with_checksum()
from_dict
classmethod
¶
from_dict(payload)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_dict(cls, payload: Mapping[str, Any]) -> MigrationArtifact:
artifact_version = int(
payload.get("artifact_version", payload.get("version", 1))
)
artifact = cls(
revision=str(payload["revision"]),
from_snapshot=SchemaSnapshot.from_dict(payload["from_snapshot"]),
to_snapshot=SchemaSnapshot.from_dict(payload["to_snapshot"]),
operations=[
operation_from_dict(operation)
for operation in payload.get("up", payload.get("operations", []))
],
rollback_operations=[
operation_from_dict(operation)
for operation in payload.get(
"down", payload.get("rollback_operations", [])
)
],
diff=diff_from_dict(payload.get("diff", {})),
warnings=[
warning_from_dict(warning) for warning in payload.get("warnings", [])
],
description=optional_str(payload.get("description")),
created_at=str(
payload.get(
"created_at",
datetime.now(UTC).replace(microsecond=0).isoformat(),
)
),
dialect=optional_str(payload.get("dialect")),
checksum=optional_str(payload.get("checksum")),
depends_on=[str(item) for item in payload.get("depends_on", [])],
branch_labels=[str(item) for item in payload.get("branch_labels", [])],
safety=dict(payload.get("safety", {})),
metadata=dict(payload.get("metadata", {})),
artifact_version=artifact_version,
version=int(payload.get("version", artifact_version)),
)
if artifact.checksum:
artifact.validate_checksum()
return artifact
from_json
classmethod
¶
from_json(payload)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_json(cls, payload: str | bytes | bytearray) -> MigrationArtifact:
return cls.from_dict(json.loads(payload))
from_toml
classmethod
¶
from_toml(payload)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def from_toml(cls, payload: str | bytes | bytearray) -> MigrationArtifact:
return cls.from_dict(toml_loads(payload))
read
classmethod
¶
read(path, *, format=None)
Source code in ormdantic/_migrations/artifacts.py
@classmethod
def read(
cls, path: str | PathLike[str], *, format: str | None = None
) -> MigrationArtifact:
input_path = Path(path)
document = input_path.read_text()
if document_format(str(input_path), format) == "toml":
return cls.from_toml(document)
return cls.from_json(document)
to_plan
¶
to_plan()
Source code in ormdantic/_migrations/artifacts.py
def to_plan(self) -> MigrationPlan:
return MigrationPlan(
operations=[
operation_from_dict(operation_to_dict(operation))
for operation in self.operations
],
rollback_operations=[
operation_from_dict(operation_to_dict(operation))
for operation in self.rollback_operations
],
diff=diff_from_dict(diff_to_dict(self.diff)),
warnings=[
warning_from_dict(warning_to_dict(warning)) for warning in self.warnings
],
safety=dict(self.safety),
)
to_dict
¶
to_dict()
Source code in ormdantic/_migrations/artifacts.py
def to_dict(self) -> dict[str, Any]:
return {
"version": self.version,
"artifact_version": self.artifact_version,
"revision": self.revision,
"description": self.description,
"created_at": self.created_at,
"dialect": self.dialect,
"checksum": self.checksum,
"depends_on": list(self.depends_on),
"branch_labels": list(self.branch_labels),
"from_snapshot": self.from_snapshot.to_dict(),
"to_snapshot": self.to_snapshot.to_dict(),
"up": [operation_to_dict(operation) for operation in self.operations],
"down": [
operation_to_dict(operation) for operation in self.rollback_operations
],
"diff": diff_to_dict(self.diff),
"warnings": [warning_to_dict(warning) for warning in self.warnings],
"safety": dict(self.safety),
"metadata": dict(self.metadata),
}
to_json
¶
to_json(*, indent=2)
Source code in ormdantic/_migrations/artifacts.py
def to_json(self, *, indent: int | None = 2) -> str:
return json.dumps(self.to_dict(), indent=indent, sort_keys=True)
to_toml
¶
to_toml()
Source code in ormdantic/_migrations/artifacts.py
def to_toml(self) -> str:
return toml_dumps(self.to_dict())
write
¶
write(path, *, format=None)
Source code in ormdantic/_migrations/artifacts.py
def write(self, path: str | PathLike[str], *, format: str | None = None) -> None:
output = Path(path)
output.parent.mkdir(parents=True, exist_ok=True)
if document_format(str(output), format) == "toml":
output.write_text(self.to_toml())
else:
output.write_text(self.to_json())
with_checksum
¶
with_checksum()
Source code in ormdantic/_migrations/artifacts.py
def with_checksum(self) -> MigrationArtifact:
payload = dict(self.to_dict())
payload.pop("checksum", None)
checksum = artifact_checksum(payload)
return MigrationArtifact(
revision=self.revision,
from_snapshot=self.from_snapshot,
to_snapshot=self.to_snapshot,
operations=self.operations,
rollback_operations=self.rollback_operations,
diff=self.diff,
warnings=self.warnings,
description=self.description,
created_at=self.created_at,
dialect=self.dialect,
checksum=checksum,
depends_on=self.depends_on,
branch_labels=self.branch_labels,
safety=self.safety,
metadata=self.metadata,
artifact_version=self.artifact_version,
version=self.version,
)
validate_checksum
¶
validate_checksum()
Source code in ormdantic/_migrations/artifacts.py
def validate_checksum(self) -> None:
if not self.checksum:
return
payload = dict(self.to_dict())
payload.pop("checksum", None)
expected = artifact_checksum(payload)
if expected != self.checksum:
raise ValueError(
f"migration artifact checksum mismatch for revision {self.revision}: "
f"expected {self.checksum}, calculated {expected}"
)