Skip to content

Table

Table is the CRUD and query handle returned by db[Model].

Most application code does not instantiate Table directly:

flavors = db[Flavor]
result = await flavors.find_many({"rating": {"gte": 4}})

Use table handles for:

  • primary-key lookup with find_one;
  • filtered lists with find_many;
  • writes with insert, update, upsert, and delete;
  • counts with count;
  • expression-backed reads with select;
  • expression-backed bulk writes with update_where.

ormdantic.table.Order

Bases: Enum

Sort direction for table queries.

asc class-attribute instance-attribute

asc = 'asc'

desc class-attribute instance-attribute

desc = 'desc'

ormdantic.table.Table

Table(
    *,
    table_data,
    table_map,
    rust_handle,
    events,
    runtime=None,
    connection=None,
    debug=False,
    log_queries=False
)

Bases: Generic[ModelType]

User-facing table handle backed by a Rust PyTableHandle.

Source code in ormdantic/table.py
def __init__(
    self,
    *,
    table_data: OrmTable[ModelType],
    table_map: Map,
    rust_handle: Any,
    events: EventRegistry,
    runtime: Any | None = None,
    connection: str | None = None,
    debug: bool = False,
    log_queries: bool = False,
) -> None:
    self._table_data = table_data
    self._table_map = table_map
    self._rust_handle = rust_handle
    self._events = events
    self._runtime = runtime
    self._connection = connection
    self._debug = debug
    self._log_queries = log_queries
    self.tablename = table_data.tablename
    self.columns = table_data.columns

tablename instance-attribute

tablename = tablename

columns instance-attribute

columns = columns

find_one async

find_one(pk, depth=0, load=None)

Find a model by primary key.

Source code in ormdantic/table.py
async def find_one(
    self,
    pk: Any,
    depth: int = 0,
    load: list[LoaderOption] | None = None,
) -> ModelType | None:
    """Find a model by primary key."""
    load_plan = self._resolve_load_plan(depth, load)
    if load_plan.paths:
        joined_filters, joined_order_by, joined_values = (
            self._joined_loader_query_parts(load_plan)
        )
        joined_values[self._table_data.pk] = py_type_to_sql(self._table_map, pk)
        result = await self._execute_rust(
            "select_one",
            lambda: self._rust_handle.find_one_with_paths(
                joined_values,
                list(load_plan.paths),
                joined_filters,
                joined_order_by,
            ),
            parameters=joined_values,
            context={"load_paths": list(load_plan.paths)},
        )
    else:
        primary_key = py_type_to_sql(self._table_map, pk)
        values = {self._table_data.pk: primary_key}
        result = await self._execute_rust(
            "select_one",
            lambda: self._rust_handle.find_one(primary_key, load_plan.depth),
            parameters=values,
            compile_query=(
                lambda: (
                    self._compile_select_pk_query()
                    if load_plan.depth == 0
                    else None
                )
            ),
            context={"depth": load_plan.depth},
        )
    model = await self._deserialize(
        result,
        is_array=False,
        depth=load_plan.depth,
        load_paths=load_plan.paths,
        load_options=load_plan.options,
    )
    if model is not None and load_plan.selectin_paths:
        await self._load_selectin_graph([model], load_plan)
    return model

find_many async

find_many(
    where=None,
    order_by=None,
    order=asc,
    limit=0,
    offset=0,
    depth=0,
    load=None,
)

Find many model instances.

Source code in ormdantic/table.py
async def find_many(
    self,
    where: dict[str, Any] | QueryExpression | None = None,
    order_by: list[str | OrderExpression] | None = None,
    order: Order = Order.asc,
    limit: int = 0,
    offset: int = 0,
    depth: int = 0,
    load: list[LoaderOption] | None = None,
) -> Result[ModelType]:
    """Find many model instances."""
    load_plan = self._resolve_load_plan(depth, load)
    if self._requires_expression_select(where, order_by):
        return await self._find_many_expression(
            where=where if isinstance(where, QueryExpression) else None,
            order_by=order_by or [],
            order=order,
            limit=limit,
            offset=offset,
            load_plan=load_plan,
        )
    filters, values = self._compile_where(where)
    legacy_order_by = self._legacy_order_columns(order_by)
    if load_plan.paths:
        joined_filters, joined_order_by, joined_values = (
            self._joined_loader_query_parts(load_plan)
        )
        values.update(joined_values)
        result = await self._execute_rust(
            "select_many",
            lambda: self._rust_handle.find_many_with_paths(
                filters,
                values,
                legacy_order_by,
                order.value,
                limit or None,
                offset or None,
                list(load_plan.paths),
                joined_filters,
                joined_order_by,
            ),
            parameters=values,
            context={
                "limit": limit or None,
                "offset": offset or None,
                "load_paths": list(load_plan.paths),
            },
        )
        data = (
            await self._deserialize(
                result,
                is_array=True,
                depth=load_plan.depth,
                load_paths=load_plan.paths,
                load_options=load_plan.options,
            )
            or []
        )
        if load_plan.selectin_paths:
            await self._load_selectin_graph(data, load_plan)
    else:
        result = await self._execute_rust(
            "select_many",
            lambda: self._rust_handle.find_many(
                filters,
                values,
                legacy_order_by,
                order.value,
                limit or None,
                offset or None,
                load_plan.depth,
            ),
            parameters=values,
            compile_query=lambda: self._compile_find_many_query(
                filters,
                legacy_order_by,
                order.value,
                limit or None,
                offset or None,
            ),
            context={
                "limit": limit or None,
                "offset": offset or None,
                "depth": load_plan.depth,
            },
        )
        data = (
            await self._deserialize(
                result,
                is_array=True,
                depth=load_plan.depth,
                load_paths=load_plan.paths,
                load_options=load_plan.options,
            )
            or []
        )
        if load_plan.selectin_paths:
            await self._load_selectin_graph(data, load_plan)
    return Result(offset=offset, limit=limit, data=data)

insert async

insert(model_instance)

Insert a model instance.

Source code in ormdantic/table.py
async def insert(self, model_instance: ModelType) -> ModelType:
    """Insert a model instance."""
    await self._events.dispatch(
        "before_create", model=model_instance, table=self._table_data
    )
    await self._events.dispatch(
        "before_insert", model=model_instance, table=self._table_data
    )
    payload = self._payload(model_instance, mode="insert")
    await self._execute_rust(
        "insert",
        lambda: self._rust_handle.insert(payload),
        parameters=payload,
        compile_query=lambda: self._compile_insert_query(payload),
    )
    await self._events.dispatch(
        "after_insert", model=model_instance, table=self._table_data
    )
    await self._events.dispatch(
        "after_create", model=model_instance, table=self._table_data
    )
    return model_instance

update async

update(model_instance)

Update a model instance.

Source code in ormdantic/table.py
async def update(self, model_instance: ModelType) -> ModelType:
    """Update a model instance."""
    await self._events.dispatch(
        "before_update", model=model_instance, table=self._table_data
    )
    payload = self._payload(model_instance, mode="update")
    await self._execute_rust(
        "update",
        lambda: self._rust_handle.update(payload),
        parameters=payload,
        compile_query=lambda: self._compile_update_query(payload),
    )
    await self._events.dispatch(
        "after_update", model=model_instance, table=self._table_data
    )
    return model_instance

upsert async

upsert(model_instance)

Insert or update a model instance.

Source code in ormdantic/table.py
async def upsert(self, model_instance: ModelType) -> ModelType:
    """Insert or update a model instance."""
    await self._events.dispatch(
        "before_upsert", model=model_instance, table=self._table_data
    )
    payload = self._payload(model_instance, mode="upsert")
    await self._execute_rust(
        "upsert",
        lambda: self._rust_handle.upsert(payload),
        parameters=payload,
        compile_query=lambda: self._compile_upsert_query(payload),
    )
    await self._events.dispatch(
        "after_upsert", model=model_instance, table=self._table_data
    )
    return model_instance

delete async

delete(pk)

Delete a model by primary key.

Source code in ormdantic/table.py
async def delete(self, pk: Any) -> bool:
    """Delete a model by primary key."""
    await self._events.dispatch("before_delete", pk=pk, table=self._table_data)
    primary_key = py_type_to_sql(self._table_map, pk)
    await self._execute_rust(
        "delete",
        lambda: self._rust_handle.delete(primary_key),
        parameters={self._table_data.pk: primary_key},
        compile_query=self._compile_delete_query,
    )
    await self._events.dispatch("after_delete", pk=pk, table=self._table_data)
    return True

count async

count(where=None, depth=0)

Count records matching an optional filter.

Source code in ormdantic/table.py
async def count(
    self, where: dict[str, Any] | QueryExpression | None = None, depth: int = 0
) -> int:
    """Count records matching an optional filter."""
    if isinstance(where, QueryExpression) and not where.supports_legacy_filters():
        result = await self.select(count_expr(), where=where)
        return int(result.scalar())
    filters, values = self._compile_where(where)
    result = await self._execute_rust(
        "count",
        lambda: self._rust_handle.count(filters, values),
        parameters=values,
        compile_query=lambda: self._compile_count_query(filters),
    )
    return NativeResult(
        columns=list(result["columns"]),
        rows=[tuple(row) for row in result["rows"]],
    ).scalar()

select async

select(
    *projections,
    query=None,
    where=None,
    group_by=None,
    having=None,
    order_by=None,
    limit=None,
    offset=None,
    distinct=False
)

Execute a typed projection query and return raw projected rows.

Source code in ormdantic/table.py
async def select(
    self,
    *projections: ProjectionExpression | SerializableExpression,
    query: SelectExpressionQuery | None = None,
    where: QueryExpression | None = None,
    group_by: list[SerializableExpression] | None = None,
    having: QueryExpression | None = None,
    order_by: list[OrderExpression] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    distinct: bool = False,
) -> NativeResult:
    """Execute a typed projection query and return raw projected rows."""
    if query is None:
        query = select_query(
            self.tablename,
            *projections,
            where=where,
            group_by=group_by or (),
            having=having,
            order_by=order_by or (),
            limit=limit,
            offset=offset,
            distinct=distinct,
        )
    elif projections:
        raise ValueError("pass either query= or projection arguments, not both")
    payload = query.to_query_payload()
    if payload["table"] != self.tablename:
        raise ValueError(
            f"typed query targets table '{payload['table']}', not '{self.tablename}'"
        )
    result = await self._execute_rust(
        "select",
        lambda: self._rust_handle.select_expression(payload),
        parameters=dict(payload.get("values") or {}),
        compile_query=lambda: self._compile_typed_select_query(payload),
    )
    return NativeResult(
        columns=list(result["columns"]),
        rows=[tuple(row) for row in result["rows"]],
    )

update_where async

update_where(*assignments, query=None, where=None)

Execute a typed UPDATE expression query.

Source code in ormdantic/table.py
async def update_where(
    self,
    *assignments: AssignmentExpression,
    query: UpdateExpressionQuery | None = None,
    where: QueryExpression | None = None,
) -> NativeResult:
    """Execute a typed UPDATE expression query."""
    if query is None:
        query = update_query(self.tablename, *assignments, where=where)
    elif assignments:
        raise ValueError("pass either query= or assignment arguments, not both")
    payload = query.to_query_payload()
    if payload["table"] != self.tablename:
        raise ValueError(
            f"typed update targets table '{payload['table']}', not '{self.tablename}'"
        )
    result = await self._execute_rust(
        "update",
        lambda: self._rust_handle.update_expression(payload),
        parameters=dict(payload.get("values") or {}),
        compile_query=lambda: self._compile_typed_update_query(payload),
    )
    return NativeResult(
        columns=list(result["columns"]),
        rows=[tuple(row) for row in result["rows"]],
    )