Session¶
The session API provides a small async unit-of-work wrapper around an Ormdantic database instance.
ormdantic.session.Session
¶
Session(database)
Minimal async unit-of-work session for Ormdantic models.
Create a session bound to an Ormdantic database instance.
Source code in ormdantic/session.py
def __init__(self, database: Any) -> None:
    """Bind the session to ``database`` and start with nothing staged.

    Args:
        database: The `Ormdantic` database instance this session works
            against.
    """
    self._closed = False
    self._database = database
    # Identity map keyed by (model class, primary-key value).
    self._identity_map: dict[tuple[type[BaseModel], Any], BaseModel] = {}
    # Staging queues for pending inserts, updates and deletes.
    self._new: list[BaseModel] = []
    self._dirty: list[BaseModel] = []
    self._deleted: list[BaseModel] = []
add
¶
add(model)
Stage a new model for insertion on flush.
Source code in ormdantic/session.py
def add(self, model: BaseModel) -> None:
    """Queue ``model`` for insertion on the next flush.

    Adding a model that is already queued is a no-op.
    """
    pending = self._new
    if model in pending:
        return
    pending.append(model)
mark_dirty
¶
mark_dirty(model)
Stage an existing model for update on flush.
Source code in ormdantic/session.py
def mark_dirty(self, model: BaseModel) -> None:
    """Queue an existing ``model`` for update on the next flush.

    A model that is already queued is not queued a second time.
    """
    already_staged = model in self._dirty
    if not already_staged:
        self._dirty.append(model)
delete
¶
delete(model)
Stage an existing model for deletion on flush.
Source code in ormdantic/session.py
def delete(self, model: BaseModel) -> None:
    """Queue an existing ``model`` for deletion on the next flush.

    Queuing the same model again has no effect.
    """
    doomed = self._deleted
    if model in doomed:
        return
    doomed.append(model)
merge
¶
merge(model)
Merge a detached model into the identity map and stage it as dirty.
Source code in ormdantic/session.py
def merge(self, model: BaseModel) -> BaseModel:
    """Merge a detached model into the session and stage it as dirty.

    The model's type and primary-key value form the identity-map key. If an
    instance is already tracked under that key, every entry of
    ``model.__dict__`` is copied onto the tracked instance, which is then
    staged as dirty and returned. Otherwise ``model`` itself is handed to
    ``self._remember``, staged as dirty and returned.

    Args:
        model: The detached model whose state should be merged.

    Returns:
        The tracked instance when one exists for the key, else ``model``.

    Raises:
        KeyError: If ``type(model)`` is not present in the database's
            table map.
    """
    table = self._database._table_map.model_to_data[type(model)]
    key = (type(model), getattr(model, table.pk))
    cached = self._identity_map.get(key)
    # Compare with None explicitly: a tracked instance whose class defines a
    # falsy __bool__/__len__ must still be treated as a cache hit.
    if cached is None:
        self._remember(model)
        self.mark_dirty(model)
        return model
    for field, value in model.__dict__.items():
        setattr(cached, field, value)
    self.mark_dirty(cached)
    return cached
expire
¶
expire(model)
Remove a model from the identity map.
Source code in ormdantic/session.py
def expire(self, model: BaseModel) -> None:
    """Drop ``model``'s (type, primary key) entry from the identity map.

    Does nothing when no entry exists under that key.
    """
    pk_name = self._database._table_map.model_to_data[type(model)].pk
    identity = (type(model), getattr(model, pk_name))
    self._identity_map.pop(identity, None)
flush
async
¶
flush()
Write staged inserts, updates, and deletes without ending the transaction.
Source code in ormdantic/session.py
async def flush(self) -> None:
    """Send every staged change to the database, leaving the transaction open.

    Dispatches ``before_flush``, then processes staged inserts, updates and
    deletes in that order, then dispatches ``after_flush``. Each staging
    list is cleared once all of its models have been processed.
    """
    await self._database._events.dispatch("before_flush", session=self)

    # Inserts: iterate over a snapshot of the staging list.
    for pending in tuple(self._new):
        inserted = await self._database[type(pending)].insert(pending)
        self._remember(inserted)
    self._new.clear()

    # Updates.
    for changed in tuple(self._dirty):
        updated = await self._database[type(changed)].update(changed)
        self._remember(updated)
    self._dirty.clear()

    # Deletes: remove the row by primary key, then drop the identity-map entry.
    for doomed in tuple(self._deleted):
        pk_name = self._database._table_map.model_to_data[type(doomed)].pk
        pk_value = getattr(doomed, pk_name)
        await self._database[type(doomed)].delete(pk_value)
        self._identity_map.pop((type(doomed), pk_value), None)
    self._deleted.clear()

    await self._database._events.dispatch("after_flush", session=self)
commit
async
¶
commit()
Flush changes and commit the active transaction.
Source code in ormdantic/session.py
async def commit(self) -> None:
    """Flush pending work, commit the transaction, and mark the session closed.

    A session that is already closed is left untouched.
    """
    if not self._closed:
        await self.flush()
        await self._database._commit()
        self._closed = True
rollback
async
¶
rollback()
Discard staged changes and roll back the active transaction.
Source code in ormdantic/session.py
async def rollback(self) -> None:
    """Drop everything staged, roll back the transaction, and mark the session closed.

    A session that is already closed is left untouched.
    """
    if not self._closed:
        for staged in (self._new, self._dirty, self._deleted):
            staged.clear()
        await self._database._rollback()
        self._closed = True
refresh
async
¶
refresh(model, *, depth=0)
Reload a model by primary key and remember the refreshed instance.
Source code in ormdantic/session.py
async def refresh(self, model: BaseModel, *, depth: int = 0) -> BaseModel | None:
    """Load the current row for ``model``'s primary key and remember the result.

    Args:
        model: The model whose primary-key value is used for the lookup.
        depth: Forwarded unchanged to ``find_one``.

    Returns:
        The instance returned by ``find_one``, or ``None`` when it found
        nothing (in which case nothing is remembered).
    """
    pk_name = self._database._table_map.model_to_data[type(model)].pk
    find_one = self._database[type(model)].find_one
    reloaded = await find_one(getattr(model, pk_name), depth=depth)
    if reloaded is None:
        return None
    self._remember(reloaded)
    return reloaded
get_cached
¶
get_cached(model_type, pk)
Return a model from the identity map if it has been remembered.
Source code in ormdantic/session.py
def get_cached(self, model_type: type[BaseModel], pk: Any) -> BaseModel | None:
    """Look up ``(model_type, pk)`` in the identity map.

    Returns:
        The stored instance, or ``None`` when the key is absent.
    """
    identity = (model_type, pk)
    return self._identity_map.get(identity)
get
async
¶
get(model_type, pk, *, depth=0)
Return a cached model or load it by primary key.
Source code in ormdantic/session.py
async def get(
    self, model_type: type[BaseModel], pk: Any, *, depth: int = 0
) -> BaseModel | None:
    """Return a cached model or load it by primary key.

    Args:
        model_type: The model class to look up.
        pk: The primary-key value to look up.
        depth: Forwarded unchanged to ``find_one`` when the model has to be
            loaded; ignored on a cache hit.

    Returns:
        The instance from ``get_cached`` when there is one; otherwise the
        result of ``find_one``, which is passed to ``self._remember`` unless
        it is ``None``.
    """
    cached = self.get_cached(model_type, pk)
    # Compare with None explicitly: an instance whose class defines a falsy
    # __bool__/__len__ is still a cache hit and must not trigger a reload.
    if cached is not None:
        return cached
    loaded = await self._database[model_type].find_one(pk, depth=depth)
    if loaded is not None:
        self._remember(loaded)
    return loaded