Skip to content

Table

Table is the Python facade over a Rust-owned table handle. It exposes CRUD, counting, ordering, filtering, and relationship-depth loading while Rust owns SQL planning and execution.

ormdantic.table.Order

Bases: Enum

Sort direction for table queries.

asc class-attribute instance-attribute

asc = 'asc'

desc class-attribute instance-attribute

desc = 'desc'

ormdantic.table.Table

Table(
    *,
    table_data,
    table_map,
    rust_handle,
    events,
    runtime=None
)

Bases: Generic[ModelType]

User-facing table handle backed by a Rust PyTableHandle.

Source code in ormdantic/table.py
def __init__(
    self,
    *,
    table_data: OrmTable[ModelType],
    table_map: Map,
    rust_handle: Any,
    events: EventRegistry,
    runtime: Any | None = None,
) -> None:
    self._table_data = table_data
    self._table_map = table_map
    self._rust_handle = rust_handle
    self._events = events
    self._runtime = runtime
    self.tablename = table_data.tablename
    self.columns = table_data.columns

tablename instance-attribute

tablename = tablename

columns instance-attribute

columns = columns

find_one async

find_one(pk, depth=0, load=None)

Find a model by primary key.

Source code in ormdantic/table.py
async def find_one(
    self,
    pk: Any,
    depth: int = 0,
    load: list[LoaderOption] | None = None,
) -> ModelType | None:
    """Find a model by primary key."""
    load_plan = self._resolve_load_plan(depth, load)
    if load_plan.paths:
        joined_filters, joined_order_by, joined_values = (
            self._joined_loader_query_parts(load_plan)
        )
        joined_values[self._table_data.pk] = py_type_to_sql(self._table_map, pk)
        result = self._rust_handle.find_one_with_paths(
            joined_values,
            list(load_plan.paths),
            joined_filters,
            joined_order_by,
        )
    else:
        result = self._rust_handle.find_one(
            py_type_to_sql(self._table_map, pk), load_plan.depth
        )
    model = self._deserialize(
        result,
        is_array=False,
        depth=load_plan.depth,
        load_paths=load_plan.paths,
        load_options=load_plan.options,
    )
    if model is not None and load_plan.selectin_paths:
        await self._load_selectin_graph([model], load_plan)
    return model

find_many async

find_many(
    where=None,
    order_by=None,
    order=asc,
    limit=0,
    offset=0,
    depth=0,
    load=None,
)

Find many model instances.

Source code in ormdantic/table.py
async def find_many(
    self,
    where: dict[str, Any] | QueryExpression | None = None,
    order_by: list[str] | None = None,
    order: Order = Order.asc,
    limit: int = 0,
    offset: int = 0,
    depth: int = 0,
    load: list[LoaderOption] | None = None,
) -> Result[ModelType]:
    """Find many model instances."""
    load_plan = self._resolve_load_plan(depth, load)
    filters, values = self._compile_where(where)
    if load_plan.paths:
        joined_filters, joined_order_by, joined_values = (
            self._joined_loader_query_parts(load_plan)
        )
        values.update(joined_values)
        result = self._rust_handle.find_many_with_paths(
            filters,
            values,
            order_by or [],
            order.value,
            limit or None,
            offset or None,
            list(load_plan.paths),
            joined_filters,
            joined_order_by,
        )
        data = (
            self._deserialize(
                result,
                is_array=True,
                depth=load_plan.depth,
                load_paths=load_plan.paths,
                load_options=load_plan.options,
            )
            or []
        )
        if load_plan.selectin_paths:
            await self._load_selectin_graph(data, load_plan)
    else:
        result = self._rust_handle.find_many(
            filters,
            values,
            order_by or [],
            order.value,
            limit or None,
            offset or None,
            load_plan.depth,
        )
        data = (
            self._deserialize(
                result,
                is_array=True,
                depth=load_plan.depth,
                load_paths=load_plan.paths,
                load_options=load_plan.options,
            )
            or []
        )
        if load_plan.selectin_paths:
            await self._load_selectin_graph(data, load_plan)
    return Result(offset=offset, limit=limit, data=data)

insert async

insert(model_instance)

Insert a model instance.

Source code in ormdantic/table.py
async def insert(self, model_instance: ModelType) -> ModelType:
    """Insert a model instance."""
    await self._events.dispatch(
        "before_insert", model=model_instance, table=self._table_data
    )
    self._rust_handle.insert(self._payload(model_instance))
    await self._events.dispatch(
        "after_insert", model=model_instance, table=self._table_data
    )
    return model_instance

update async

update(model_instance)

Update a model instance.

Source code in ormdantic/table.py
async def update(self, model_instance: ModelType) -> ModelType:
    """Update a model instance."""
    await self._events.dispatch(
        "before_update", model=model_instance, table=self._table_data
    )
    self._rust_handle.update(self._payload(model_instance))
    await self._events.dispatch(
        "after_update", model=model_instance, table=self._table_data
    )
    return model_instance

upsert async

upsert(model_instance)

Insert or update a model instance.

Source code in ormdantic/table.py
async def upsert(self, model_instance: ModelType) -> ModelType:
    """Insert or update a model instance."""
    await self._events.dispatch(
        "before_upsert", model=model_instance, table=self._table_data
    )
    self._rust_handle.upsert(self._payload(model_instance))
    await self._events.dispatch(
        "after_upsert", model=model_instance, table=self._table_data
    )
    return model_instance

delete async

delete(pk)

Delete a model by primary key.

Source code in ormdantic/table.py
async def delete(self, pk: Any) -> bool:
    """Delete a model by primary key."""
    await self._events.dispatch("before_delete", pk=pk, table=self._table_data)
    self._rust_handle.delete(py_type_to_sql(self._table_map, pk))
    await self._events.dispatch("after_delete", pk=pk, table=self._table_data)
    return True

count async

count(where=None, depth=0)

Count records matching an optional filter.

Source code in ormdantic/table.py
async def count(
    self, where: dict[str, Any] | QueryExpression | None = None, depth: int = 0
) -> int:
    """Count records matching an optional filter."""
    filters, values = self._compile_where(where)
    result = self._rust_handle.count(filters, values)
    return NativeResult(
        columns=list(result["columns"]),
        rows=[tuple(row) for row in result["rows"]],
    ).scalar()

select async

select(
    *projections,
    query=None,
    where=None,
    group_by=None,
    having=None,
    order_by=None,
    limit=None,
    offset=None,
    distinct=False
)

Execute a typed projection query and return raw projected rows.

Source code in ormdantic/table.py
async def select(
    self,
    *projections: ProjectionExpression | SerializableExpression,
    query: SelectExpressionQuery | None = None,
    where: QueryExpression | None = None,
    group_by: list[SerializableExpression] | None = None,
    having: QueryExpression | None = None,
    order_by: list[OrderExpression] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    distinct: bool = False,
) -> NativeResult:
    """Execute a typed projection query and return raw projected rows."""
    if query is None:
        query = select_query(
            self.tablename,
            *projections,
            where=where,
            group_by=group_by or (),
            having=having,
            order_by=order_by or (),
            limit=limit,
            offset=offset,
            distinct=distinct,
        )
    elif projections:
        raise ValueError("pass either query= or projection arguments, not both")
    payload = query.to_query_payload()
    if payload["table"] != self.tablename:
        raise ValueError(
            f"typed query targets table '{payload['table']}', not '{self.tablename}'"
        )
    result = self._rust_handle.select_expression(payload)
    return NativeResult(
        columns=list(result["columns"]),
        rows=[tuple(row) for row in result["rows"]],
    )

update_where async

update_where(*assignments, query=None, where=None)

Execute a typed UPDATE expression query.

Source code in ormdantic/table.py
async def update_where(
    self,
    *assignments: AssignmentExpression,
    query: UpdateExpressionQuery | None = None,
    where: QueryExpression | None = None,
) -> NativeResult:
    """Execute a typed UPDATE expression query."""
    if query is None:
        query = update_query(self.tablename, *assignments, where=where)
    elif assignments:
        raise ValueError("pass either query= or assignment arguments, not both")
    payload = query.to_query_payload()
    if payload["table"] != self.tablename:
        raise ValueError(
            f"typed update targets table '{payload['table']}', not '{self.tablename}'"
        )
    result = self._rust_handle.update_expression(payload)
    return NativeResult(
        columns=list(result["columns"]),
        rows=[tuple(row) for row in result["rows"]],
    )