Generate Python models from a table map and result set.
Generate Python models from a table map and result set.
:param table_data: Table data for the returned model type.
:param table_map: Map of tablenames and models.
:param result_set: SQLAlchemy cursor result.
:param is_array: Whether to deserialize as a list of models rather than a single model.
:param depth: Model tree depth.
def __init__(
    self,
    table_data: OrmTable,  # type: ignore
    table_map: Map,
    # TODO: Missing type parameters for generic type "CursorResult".
    result_set: CursorResult[Any],
    is_array: bool,
    depth: int,
) -> None:
    """Set up a deserializer that turns a cursor result into models.

    :param table_data: Table data of the model type to return.
    :param table_map: Map of table names and models.
    :param result_set: SQLAlchemy cursor result to read rows from.
    :param is_array: Whether to produce a list of models rather than
        a single model.
    :param depth: Model tree depth.
    """
    self._table_data = table_data
    self._table_map = table_map
    self._result_set = result_set
    self._is_array = is_array
    self._depth = depth

    # The top-level schema holds exactly one reference: the schema of the
    # requested table, keyed by that table's name.
    root_name = table_data.tablename
    root_schema = self._get_result_schema(table_data, depth, is_array)
    self._result_schema = ResultSchema(
        is_array=is_array,
        references={root_name: root_schema},
    )

    # Column labels are the first item of each cursor description entry.
    description = self._result_set.cursor.description
    self._columns = [entry[0] for entry in description]

    # Nested dictionary that `deserialize` fills in row by row.
    self._return_dict: dict[str, Any] = {}
def deserialize(self) -> SerializedType:
    """Build Python models out of the rows of the result set.

    Every column label is split on a backslash into a branch path and a
    column name; the branch path is itself split on "/". Rows are folded
    into a nested dictionary that follows those branch paths, which is
    then passed through ``_prep_result`` and unpacked into the model.

    :return: ``None`` when nothing was collected, a list of models when
        the result schema is an array, a single model otherwise.
    """
    for row in self._result_set:
        # Primary key values found in this row, keyed by branch path.
        pk_by_path = {}
        for idx, label in enumerate(self._columns):
            # Start from the top of both the output and the schema.
            cursor = self._return_dict
            level = self._result_schema
            path, column = label.split("\\")
            full_path = f"/{path}"
            walked = ""
            for branch in path.split("/"):
                walked = f"{walked}/{branch}"
                # Step one level down in the schema.
                level = level.references[branch]
                # A pk column records its value for its own full path.
                if (
                    column == level.table_data.pk  # type: ignore
                    and walked == full_path
                ):
                    pk_by_path[walked] = row[idx]
                pk_value = pk_by_path[walked]
                # A missing pk means this branch is absent from the row.
                if pk_value is None:
                    break
                # Create the container for this branch on first use.
                if cursor.get(branch) is None:
                    cursor[branch] = {}
                cursor = cursor[branch]
                # Array branches keep one entry per primary key value.
                if level.is_array:
                    if cursor.get(pk_value) is None:
                        cursor[pk_value] = {}
                    cursor = cursor[pk_value]
            else:
                # No branch was absent, so the value can be stored.
                if column:
                    cursor[column] = row[idx]

    if not self._return_dict:
        return None  # type: ignore

    as_list = self._result_schema.is_array
    prepared = self._prep_result(self._return_dict, self._result_schema)
    payload = prepared[self._table_data.tablename]  # type: ignore
    build = self._table_data.model
    if as_list:
        return [build(**record) for record in payload]
    return build(**payload)