Skip to content

Session

The session API provides a small async unit-of-work wrapper around an Ormdantic database instance.

ormdantic.session.Session

Session(database)

Minimal async unit-of-work session for Ormdantic models.

Create a session bound to an Ormdantic database instance.

Source code in ormdantic/session.py
def __init__(self, database: Any) -> None:
    """Bind the session to an `Ormdantic` database and start with nothing staged.

    Args:
        database: The `Ormdantic` database instance this session works against.
    """
    # The session starts open.
    self._closed = False
    self._database = database
    # Identity map keyed by (model class, primary-key value).
    self._identity_map: dict[tuple[type[BaseModel], Any], BaseModel] = {}
    # Three independent staging queues, all empty to begin with.
    self._deleted: list[BaseModel] = []
    self._dirty: list[BaseModel] = []
    self._new: list[BaseModel] = []

add

add(model)

Stage a new model for insertion on flush.

Source code in ormdantic/session.py
def add(self, model: BaseModel) -> None:
    """Stage a new model for insertion on flush.

    Staging the same instance twice is a no-op. Deduplication is by object
    identity rather than ``==``: a distinct instance that merely compares
    equal to one already staged (for example two unsaved models carrying the
    same field values) is a separate pending row and is still queued.

    Args:
        model: The model instance to queue for insertion.
    """
    # `model not in self._new` would fall back to __eq__ and silently drop
    # value-equal but distinct instances, so compare identities explicitly.
    if not any(staged is model for staged in self._new):
        self._new.append(model)

mark_dirty

mark_dirty(model)

Stage an existing model for update on flush.

Source code in ormdantic/session.py
def mark_dirty(self, model: BaseModel) -> None:
    """Queue an existing model so that it is updated on flush.

    A model that is already in the queue is not queued a second time.
    """
    queue = self._dirty
    if model in queue:
        return
    queue.append(model)

delete

delete(model)

Stage an existing model for deletion on flush.

Source code in ormdantic/session.py
def delete(self, model: BaseModel) -> None:
    """Queue an existing model so that it is deleted on flush.

    A model that is already in the queue is not queued a second time.
    """
    queue = self._deleted
    if model in queue:
        return
    queue.append(model)

merge

merge(model)

Merge a detached model into the identity map and stage it as dirty.

Source code in ormdantic/session.py
def merge(self, model: BaseModel) -> BaseModel:
    """Merge a detached model into the identity map and stage it as dirty.

    If the session already tracks an instance with the same type and primary
    key, every attribute in ``model.__dict__`` is copied onto that tracked
    instance, which is then staged and returned. Otherwise ``model`` itself
    is remembered, staged and returned.

    Args:
        model: The detached model whose state should be merged.

    Returns:
        The instance the session tracks for this primary key: the previously
        cached one when present, otherwise ``model``.

    Raises:
        KeyError: If ``type(model)`` has no entry in the database table map.
    """
    table = self._database._table_map.model_to_data[type(model)]
    key = (type(model), getattr(model, table.pk))
    cached = self._identity_map.get(key)
    # Compare against None explicitly: a tracked model may be falsy (its class
    # can define __bool__ or __len__) and must still count as cached.
    if cached is not None:
        for field, value in model.__dict__.items():
            setattr(cached, field, value)
        self.mark_dirty(cached)
        return cached
    self._remember(model)
    self.mark_dirty(model)
    return model

expire

expire(model)

Remove a model from the identity map.

Source code in ormdantic/session.py
def expire(self, model: BaseModel) -> None:
    """Forget *model* by dropping its entry from the identity map.

    The entry is located by the model's type and the value of its primary-key
    attribute; a model that was never remembered is ignored.
    """
    pk_field = self._database._table_map.model_to_data[type(model)].pk
    identity = (type(model), getattr(model, pk_field))
    # pop() with a default keeps this a no-op when the key is absent.
    self._identity_map.pop(identity, None)

flush async

flush()

Write staged inserts, updates, and deletes without ending the transaction.

Source code in ormdantic/session.py
async def flush(self) -> None:
    """Write staged inserts, updates, and deletes without ending the transaction.

    Dispatches ``before_flush``, drains the insert, update and delete queues
    in that order, then dispatches ``after_flush``. Instances returned by
    insert and update are remembered; deleted rows are dropped from the
    identity map.
    """
    await self._database._events.dispatch("before_flush", session=self)

    # Each queue is snapshotted before iteration and emptied only after every
    # entry in it has been written.
    for pending in tuple(self._new):
        inserted = await self._database[type(pending)].insert(pending)
        self._remember(inserted)
    self._new.clear()

    for pending in tuple(self._dirty):
        updated = await self._database[type(pending)].update(pending)
        self._remember(updated)
    self._dirty.clear()

    for doomed in tuple(self._deleted):
        pk_field = self._database._table_map.model_to_data[type(doomed)].pk
        pk_value = getattr(doomed, pk_field)
        await self._database[type(doomed)].delete(pk_value)
        self._identity_map.pop((type(doomed), pk_value), None)
    self._deleted.clear()

    await self._database._events.dispatch("after_flush", session=self)

commit async

commit()

Flush changes and commit the active transaction.

Source code in ormdantic/session.py
async def commit(self) -> None:
    """Flush pending work, commit the active transaction, and close the session.

    Calling this on a session that is already closed does nothing.
    """
    if not self._closed:
        await self.flush()
        await self._database._commit()
        # Only mark the session closed once the commit has gone through.
        self._closed = True

rollback async

rollback()

Discard staged changes and roll back the active transaction.

Source code in ormdantic/session.py
async def rollback(self) -> None:
    """Discard staged changes and roll back the active transaction.

    The staging queues and the identity map are emptied, the database
    transaction is rolled back, and the session is marked closed. Calling
    this on a session that is already closed does nothing.
    """
    if self._closed:
        return
    self._new.clear()
    self._dirty.clear()
    self._deleted.clear()
    # Rows written by an earlier flush() are undone by the rollback, so the
    # instances remembered for them no longer reflect the database. Drop them
    # rather than let get()/get_cached() serve phantom rows.
    self._identity_map.clear()
    await self._database._rollback()
    self._closed = True

refresh async

refresh(model, *, depth=0)

Reload a model by primary key and remember the refreshed instance.

Source code in ormdantic/session.py
async def refresh(self, model: BaseModel, *, depth: int = 0) -> BaseModel | None:
    """Reload *model* from the database by primary key.

    Args:
        model: The instance whose primary-key value identifies the row.
        depth: Forwarded unchanged to ``find_one``.

    Returns:
        The freshly loaded instance, which is also remembered by the session,
        or ``None`` when ``find_one`` returns nothing.
    """
    pk_field = self._database._table_map.model_to_data[type(model)].pk
    reloaded = await self._database[type(model)].find_one(
        getattr(model, pk_field), depth=depth
    )
    if reloaded is None:
        return None
    self._remember(reloaded)
    return reloaded

get_cached

get_cached(model_type, pk)

Return a model from the identity map if it has been remembered.

Source code in ormdantic/session.py
def get_cached(self, model_type: type[BaseModel], pk: Any) -> BaseModel | None:
    """Look up a remembered model in the identity map.

    Returns:
        The instance stored under ``(model_type, pk)``, or ``None`` when the
        identity map has no such entry.
    """
    identity = (model_type, pk)
    return self._identity_map.get(identity)

get async

get(model_type, pk, *, depth=0)

Return a cached model or load it by primary key.

Source code in ormdantic/session.py
async def get(
    self, model_type: type[BaseModel], pk: Any, *, depth: int = 0
) -> BaseModel | None:
    """Return a cached model or load it by primary key.

    Args:
        model_type: The model class to look up.
        pk: The primary-key value of the wanted row.
        depth: Forwarded unchanged to ``find_one`` when the database is hit.

    Returns:
        The instance already held in the identity map if there is one;
        otherwise the instance loaded from the database (which is then
        remembered), or ``None`` when ``find_one`` returns nothing.
    """
    cached = self.get_cached(model_type, pk)
    # Compare against None explicitly: a cached model may be falsy (its class
    # can define __bool__ or __len__); a truthiness test would reload it and
    # replace the tracked instance.
    if cached is not None:
        return cached
    loaded = await self._database[model_type].find_one(pk, depth=depth)
    if loaded is not None:
        self._remember(loaded)
    return loaded