Table¶
Table is the CRUD and query handle returned by db[Model].
Most application code does not instantiate Table directly:
flavors = db[Flavor]
result = await flavors.find_many({"rating": {"gte": 4}})
Use table handles for:
- primary-key lookup with
find_one; - filtered lists with
find_many; - writes with
insert,update,upsert, anddelete; - counts with
count; - expression-backed reads with
select; - expression-backed bulk writes with
update_where.
ormdantic.table.Order
¶
ormdantic.table.Table
¶
Table(
*,
table_data,
table_map,
rust_handle,
events,
runtime=None,
connection=None,
debug=False,
log_queries=False
)
Bases: Generic[ModelType]
User-facing table handle backed by a Rust PyTableHandle.
Source code in ormdantic/table.py
def __init__(
self,
*,
table_data: OrmTable[ModelType],
table_map: Map,
rust_handle: Any,
events: EventRegistry,
runtime: Any | None = None,
connection: str | None = None,
debug: bool = False,
log_queries: bool = False,
) -> None:
self._table_data = table_data
self._table_map = table_map
self._rust_handle = rust_handle
self._events = events
self._runtime = runtime
self._connection = connection
self._debug = debug
self._log_queries = log_queries
self.tablename = table_data.tablename
self.columns = table_data.columns
find_one
async
¶
find_one(pk, depth=0, load=None)
Find a model by primary key.
Source code in ormdantic/table.py
async def find_one(
self,
pk: Any,
depth: int = 0,
load: list[LoaderOption] | None = None,
) -> ModelType | None:
"""Find a model by primary key."""
load_plan = self._resolve_load_plan(depth, load)
if load_plan.paths:
joined_filters, joined_order_by, joined_values = (
self._joined_loader_query_parts(load_plan)
)
joined_values[self._table_data.pk] = py_type_to_sql(self._table_map, pk)
result = await self._execute_rust(
"select_one",
lambda: self._rust_handle.find_one_with_paths(
joined_values,
list(load_plan.paths),
joined_filters,
joined_order_by,
),
parameters=joined_values,
context={"load_paths": list(load_plan.paths)},
)
else:
primary_key = py_type_to_sql(self._table_map, pk)
values = {self._table_data.pk: primary_key}
result = await self._execute_rust(
"select_one",
lambda: self._rust_handle.find_one(primary_key, load_plan.depth),
parameters=values,
compile_query=(
lambda: (
self._compile_select_pk_query()
if load_plan.depth == 0
else None
)
),
context={"depth": load_plan.depth},
)
model = await self._deserialize(
result,
is_array=False,
depth=load_plan.depth,
load_paths=load_plan.paths,
load_options=load_plan.options,
)
if model is not None and load_plan.selectin_paths:
await self._load_selectin_graph([model], load_plan)
return model
find_many
async
¶
find_many(
where=None,
order_by=None,
order=asc,
limit=0,
offset=0,
depth=0,
load=None,
)
Find many model instances.
Source code in ormdantic/table.py
async def find_many(
self,
where: dict[str, Any] | QueryExpression | None = None,
order_by: list[str | OrderExpression] | None = None,
order: Order = Order.asc,
limit: int = 0,
offset: int = 0,
depth: int = 0,
load: list[LoaderOption] | None = None,
) -> Result[ModelType]:
"""Find many model instances."""
load_plan = self._resolve_load_plan(depth, load)
if self._requires_expression_select(where, order_by):
return await self._find_many_expression(
where=where if isinstance(where, QueryExpression) else None,
order_by=order_by or [],
order=order,
limit=limit,
offset=offset,
load_plan=load_plan,
)
filters, values = self._compile_where(where)
legacy_order_by = self._legacy_order_columns(order_by)
if load_plan.paths:
joined_filters, joined_order_by, joined_values = (
self._joined_loader_query_parts(load_plan)
)
values.update(joined_values)
result = await self._execute_rust(
"select_many",
lambda: self._rust_handle.find_many_with_paths(
filters,
values,
legacy_order_by,
order.value,
limit or None,
offset or None,
list(load_plan.paths),
joined_filters,
joined_order_by,
),
parameters=values,
context={
"limit": limit or None,
"offset": offset or None,
"load_paths": list(load_plan.paths),
},
)
data = (
await self._deserialize(
result,
is_array=True,
depth=load_plan.depth,
load_paths=load_plan.paths,
load_options=load_plan.options,
)
or []
)
if load_plan.selectin_paths:
await self._load_selectin_graph(data, load_plan)
else:
result = await self._execute_rust(
"select_many",
lambda: self._rust_handle.find_many(
filters,
values,
legacy_order_by,
order.value,
limit or None,
offset or None,
load_plan.depth,
),
parameters=values,
compile_query=lambda: self._compile_find_many_query(
filters,
legacy_order_by,
order.value,
limit or None,
offset or None,
),
context={
"limit": limit or None,
"offset": offset or None,
"depth": load_plan.depth,
},
)
data = (
await self._deserialize(
result,
is_array=True,
depth=load_plan.depth,
load_paths=load_plan.paths,
load_options=load_plan.options,
)
or []
)
if load_plan.selectin_paths:
await self._load_selectin_graph(data, load_plan)
return Result(offset=offset, limit=limit, data=data)
insert
async
¶
insert(model_instance)
Insert a model instance.
Source code in ormdantic/table.py
async def insert(self, model_instance: ModelType) -> ModelType:
"""Insert a model instance."""
await self._events.dispatch(
"before_create", model=model_instance, table=self._table_data
)
await self._events.dispatch(
"before_insert", model=model_instance, table=self._table_data
)
payload = self._payload(model_instance, mode="insert")
await self._execute_rust(
"insert",
lambda: self._rust_handle.insert(payload),
parameters=payload,
compile_query=lambda: self._compile_insert_query(payload),
)
await self._events.dispatch(
"after_insert", model=model_instance, table=self._table_data
)
await self._events.dispatch(
"after_create", model=model_instance, table=self._table_data
)
return model_instance
update
async
¶
update(model_instance)
Update a model instance.
Source code in ormdantic/table.py
async def update(self, model_instance: ModelType) -> ModelType:
"""Update a model instance."""
await self._events.dispatch(
"before_update", model=model_instance, table=self._table_data
)
payload = self._payload(model_instance, mode="update")
await self._execute_rust(
"update",
lambda: self._rust_handle.update(payload),
parameters=payload,
compile_query=lambda: self._compile_update_query(payload),
)
await self._events.dispatch(
"after_update", model=model_instance, table=self._table_data
)
return model_instance
upsert
async
¶
upsert(model_instance)
Insert or update a model instance.
Source code in ormdantic/table.py
async def upsert(self, model_instance: ModelType) -> ModelType:
"""Insert or update a model instance."""
await self._events.dispatch(
"before_upsert", model=model_instance, table=self._table_data
)
payload = self._payload(model_instance, mode="upsert")
await self._execute_rust(
"upsert",
lambda: self._rust_handle.upsert(payload),
parameters=payload,
compile_query=lambda: self._compile_upsert_query(payload),
)
await self._events.dispatch(
"after_upsert", model=model_instance, table=self._table_data
)
return model_instance
delete
async
¶
delete(pk)
Delete a model by primary key.
Source code in ormdantic/table.py
async def delete(self, pk: Any) -> bool:
"""Delete a model by primary key."""
await self._events.dispatch("before_delete", pk=pk, table=self._table_data)
primary_key = py_type_to_sql(self._table_map, pk)
await self._execute_rust(
"delete",
lambda: self._rust_handle.delete(primary_key),
parameters={self._table_data.pk: primary_key},
compile_query=self._compile_delete_query,
)
await self._events.dispatch("after_delete", pk=pk, table=self._table_data)
return True
count
async
¶
count(where=None, depth=0)
Count records matching an optional filter.
Source code in ormdantic/table.py
async def count(
self, where: dict[str, Any] | QueryExpression | None = None, depth: int = 0
) -> int:
"""Count records matching an optional filter."""
if isinstance(where, QueryExpression) and not where.supports_legacy_filters():
result = await self.select(count_expr(), where=where)
return int(result.scalar())
filters, values = self._compile_where(where)
result = await self._execute_rust(
"count",
lambda: self._rust_handle.count(filters, values),
parameters=values,
compile_query=lambda: self._compile_count_query(filters),
)
return NativeResult(
columns=list(result["columns"]),
rows=[tuple(row) for row in result["rows"]],
).scalar()
select
async
¶
select(
*projections,
query=None,
where=None,
group_by=None,
having=None,
order_by=None,
limit=None,
offset=None,
distinct=False
)
Execute a typed projection query and return raw projected rows.
Source code in ormdantic/table.py
async def select(
self,
*projections: ProjectionExpression | SerializableExpression,
query: SelectExpressionQuery | None = None,
where: QueryExpression | None = None,
group_by: list[SerializableExpression] | None = None,
having: QueryExpression | None = None,
order_by: list[OrderExpression] | None = None,
limit: int | None = None,
offset: int | None = None,
distinct: bool = False,
) -> NativeResult:
"""Execute a typed projection query and return raw projected rows."""
if query is None:
query = select_query(
self.tablename,
*projections,
where=where,
group_by=group_by or (),
having=having,
order_by=order_by or (),
limit=limit,
offset=offset,
distinct=distinct,
)
elif projections:
raise ValueError("pass either query= or projection arguments, not both")
payload = query.to_query_payload()
if payload["table"] != self.tablename:
raise ValueError(
f"typed query targets table '{payload['table']}', not '{self.tablename}'"
)
result = await self._execute_rust(
"select",
lambda: self._rust_handle.select_expression(payload),
parameters=dict(payload.get("values") or {}),
compile_query=lambda: self._compile_typed_select_query(payload),
)
return NativeResult(
columns=list(result["columns"]),
rows=[tuple(row) for row in result["rows"]],
)
update_where
async
¶
update_where(*assignments, query=None, where=None)
Execute a typed UPDATE expression query.
Source code in ormdantic/table.py
async def update_where(
self,
*assignments: AssignmentExpression,
query: UpdateExpressionQuery | None = None,
where: QueryExpression | None = None,
) -> NativeResult:
"""Execute a typed UPDATE expression query."""
if query is None:
query = update_query(self.tablename, *assignments, where=where)
elif assignments:
raise ValueError("pass either query= or assignment arguments, not both")
payload = query.to_query_payload()
if payload["table"] != self.tablename:
raise ValueError(
f"typed update targets table '{payload['table']}', not '{self.tablename}'"
)
result = await self._execute_rust(
"update",
lambda: self._rust_handle.update_expression(payload),
parameters=dict(payload.get("values") or {}),
compile_query=lambda: self._compile_typed_update_query(payload),
)
return NativeResult(
columns=list(result["columns"]),
rows=[tuple(row) for row in result["rows"]],
)