Table¶
Table is the Python facade over a Rust-owned table handle. It exposes CRUD,
counting, ordering, filtering, and relationship-depth loading while Rust owns
SQL planning and execution.
ormdantic.table.Order
¶
ormdantic.table.Table
¶
Table(
*,
table_data,
table_map,
rust_handle,
events,
runtime=None
)
Bases: Generic[ModelType]
User-facing table handle backed by a Rust PyTableHandle.
Source code in ormdantic/table.py
def __init__(
self,
*,
table_data: OrmTable[ModelType],
table_map: Map,
rust_handle: Any,
events: EventRegistry,
runtime: Any | None = None,
) -> None:
self._table_data = table_data
self._table_map = table_map
self._rust_handle = rust_handle
self._events = events
self._runtime = runtime
self.tablename = table_data.tablename
self.columns = table_data.columns
find_one
async
¶
find_one(pk, depth=0, load=None)
Find a model by primary key.
Source code in ormdantic/table.py
async def find_one(
self,
pk: Any,
depth: int = 0,
load: list[LoaderOption] | None = None,
) -> ModelType | None:
"""Find a model by primary key."""
load_plan = self._resolve_load_plan(depth, load)
if load_plan.paths:
joined_filters, joined_order_by, joined_values = (
self._joined_loader_query_parts(load_plan)
)
joined_values[self._table_data.pk] = py_type_to_sql(self._table_map, pk)
result = self._rust_handle.find_one_with_paths(
joined_values,
list(load_plan.paths),
joined_filters,
joined_order_by,
)
else:
result = self._rust_handle.find_one(
py_type_to_sql(self._table_map, pk), load_plan.depth
)
model = self._deserialize(
result,
is_array=False,
depth=load_plan.depth,
load_paths=load_plan.paths,
load_options=load_plan.options,
)
if model is not None and load_plan.selectin_paths:
await self._load_selectin_graph([model], load_plan)
return model
find_many
async
¶
find_many(
where=None,
order_by=None,
order=asc,
limit=0,
offset=0,
depth=0,
load=None,
)
Find many model instances.
Source code in ormdantic/table.py
async def find_many(
self,
where: dict[str, Any] | QueryExpression | None = None,
order_by: list[str] | None = None,
order: Order = Order.asc,
limit: int = 0,
offset: int = 0,
depth: int = 0,
load: list[LoaderOption] | None = None,
) -> Result[ModelType]:
"""Find many model instances."""
load_plan = self._resolve_load_plan(depth, load)
filters, values = self._compile_where(where)
if load_plan.paths:
joined_filters, joined_order_by, joined_values = (
self._joined_loader_query_parts(load_plan)
)
values.update(joined_values)
result = self._rust_handle.find_many_with_paths(
filters,
values,
order_by or [],
order.value,
limit or None,
offset or None,
list(load_plan.paths),
joined_filters,
joined_order_by,
)
data = (
self._deserialize(
result,
is_array=True,
depth=load_plan.depth,
load_paths=load_plan.paths,
load_options=load_plan.options,
)
or []
)
if load_plan.selectin_paths:
await self._load_selectin_graph(data, load_plan)
else:
result = self._rust_handle.find_many(
filters,
values,
order_by or [],
order.value,
limit or None,
offset or None,
load_plan.depth,
)
data = (
self._deserialize(
result,
is_array=True,
depth=load_plan.depth,
load_paths=load_plan.paths,
load_options=load_plan.options,
)
or []
)
if load_plan.selectin_paths:
await self._load_selectin_graph(data, load_plan)
return Result(offset=offset, limit=limit, data=data)
insert
async
¶
insert(model_instance)
Insert a model instance.
Source code in ormdantic/table.py
async def insert(self, model_instance: ModelType) -> ModelType:
"""Insert a model instance."""
await self._events.dispatch(
"before_insert", model=model_instance, table=self._table_data
)
self._rust_handle.insert(self._payload(model_instance))
await self._events.dispatch(
"after_insert", model=model_instance, table=self._table_data
)
return model_instance
update
async
¶
update(model_instance)
Update a model instance.
Source code in ormdantic/table.py
async def update(self, model_instance: ModelType) -> ModelType:
"""Update a model instance."""
await self._events.dispatch(
"before_update", model=model_instance, table=self._table_data
)
self._rust_handle.update(self._payload(model_instance))
await self._events.dispatch(
"after_update", model=model_instance, table=self._table_data
)
return model_instance
upsert
async
¶
upsert(model_instance)
Insert or update a model instance.
Source code in ormdantic/table.py
async def upsert(self, model_instance: ModelType) -> ModelType:
"""Insert or update a model instance."""
await self._events.dispatch(
"before_upsert", model=model_instance, table=self._table_data
)
self._rust_handle.upsert(self._payload(model_instance))
await self._events.dispatch(
"after_upsert", model=model_instance, table=self._table_data
)
return model_instance
delete
async
¶
delete(pk)
Delete a model by primary key.
Source code in ormdantic/table.py
async def delete(self, pk: Any) -> bool:
"""Delete a model by primary key."""
await self._events.dispatch("before_delete", pk=pk, table=self._table_data)
self._rust_handle.delete(py_type_to_sql(self._table_map, pk))
await self._events.dispatch("after_delete", pk=pk, table=self._table_data)
return True
count
async
¶
count(where=None, depth=0)
Count records matching an optional filter.
Source code in ormdantic/table.py
async def count(
self, where: dict[str, Any] | QueryExpression | None = None, depth: int = 0
) -> int:
"""Count records matching an optional filter."""
filters, values = self._compile_where(where)
result = self._rust_handle.count(filters, values)
return NativeResult(
columns=list(result["columns"]),
rows=[tuple(row) for row in result["rows"]],
).scalar()
select
async
¶
select(
*projections,
query=None,
where=None,
group_by=None,
having=None,
order_by=None,
limit=None,
offset=None,
distinct=False
)
Execute a typed projection query and return raw projected rows.
Source code in ormdantic/table.py
async def select(
self,
*projections: ProjectionExpression | SerializableExpression,
query: SelectExpressionQuery | None = None,
where: QueryExpression | None = None,
group_by: list[SerializableExpression] | None = None,
having: QueryExpression | None = None,
order_by: list[OrderExpression] | None = None,
limit: int | None = None,
offset: int | None = None,
distinct: bool = False,
) -> NativeResult:
"""Execute a typed projection query and return raw projected rows."""
if query is None:
query = select_query(
self.tablename,
*projections,
where=where,
group_by=group_by or (),
having=having,
order_by=order_by or (),
limit=limit,
offset=offset,
distinct=distinct,
)
elif projections:
raise ValueError("pass either query= or projection arguments, not both")
payload = query.to_query_payload()
if payload["table"] != self.tablename:
raise ValueError(
f"typed query targets table '{payload['table']}', not '{self.tablename}'"
)
result = self._rust_handle.select_expression(payload)
return NativeResult(
columns=list(result["columns"]),
rows=[tuple(row) for row in result["rows"]],
)
update_where
async
¶
update_where(*assignments, query=None, where=None)
Execute a typed UPDATE expression query.
Source code in ormdantic/table.py
async def update_where(
self,
*assignments: AssignmentExpression,
query: UpdateExpressionQuery | None = None,
where: QueryExpression | None = None,
) -> NativeResult:
"""Execute a typed UPDATE expression query."""
if query is None:
query = update_query(self.tablename, *assignments, where=where)
elif assignments:
raise ValueError("pass either query= or assignment arguments, not both")
payload = query.to_query_payload()
if payload["table"] != self.tablename:
raise ValueError(
f"typed update targets table '{payload['table']}', not '{self.tablename}'"
)
result = self._rust_handle.update_expression(payload)
return NativeResult(
columns=list(result["columns"]),
rows=[tuple(row) for row in result["rows"]],
)