Coverage for pyodmongo/engines/engines.py: 100%
201 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 15:48 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 15:48 +0000
1from motor.motor_asyncio import AsyncIOMotorClient
2from pymongo import MongoClient, UpdateMany, DeleteOne, DeleteMany
3from pymongo.results import BulkWriteResult
4from datetime import datetime, timezone
5from bson import ObjectId
6from bson.codec_options import CodecOptions
7from ..models.db_model import DbModel
8from ..models.id_model import Id
9from ..models.responses import DbResponse
10from ..models.query_operators import QueryOperator
11from ..models.sort_operators import SortOperator
12from ..models.paginate import ResponsePaginate
13from ..models.db_field_info import DbField
14from typing import TypeVar, Type, Union
15from concurrent.futures import ThreadPoolExecutor
16from .utils import consolidate_dict, mount_base_pipeline
17from ..services.verify_subclasses import is_subclass
18from asyncio import gather
19from math import ceil
# Type variable bound to DbModel: it stands for DbModel or any subclass of it.
# The engine signatures use it to annotate both model classes (Type[Model])
# and model instances (Model).
Model = TypeVar("Model", bound=DbModel)
class _Engine:
    """
    Base class for database operations, providing common functionality for both synchronous and asynchronous engines.

    The helpers here only build queries, bulk operations and pipelines; the
    subclasses decide whether the resulting driver calls are awaited.

    Attributes:
        _client: The client instance built from the ``Client`` class passed to ``__init__``.
        _db: The database handle obtained by indexing the client with the database name.
        _tz_info (timezone): The default timezone information.
    """

    def __init__(self, Client, mongo_uri, db_name, tz_info: timezone | None = None):
        """
        Initialize the database engine.

        Args:
            Client (type): The MongoDB client class; it is called with ``mongo_uri``.
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        self._client = Client(mongo_uri)
        self._db = self._client[db_name]
        self._tz_info = tz_info

    def _query(self, query: QueryOperator, raw_query: dict) -> dict:
        """
        Construct a query dictionary from a QueryOperator or a raw query.

        Args:
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary, used when ``query`` is falsy.

        Returns:
            dict: ``query.to_dict()`` when ``query`` is truthy, otherwise
            ``raw_query`` (or an empty dict when ``raw_query`` is falsy).

        Raises:
            TypeError: If the class of ``query`` is rejected by ``is_subclass``.
        """
        # NOTE(review): this check also runs when query is None (the default used
        # by save/find/delete). Whether NoneType is accepted depends on
        # is_subclass, which is defined in another module - confirm.
        if not is_subclass(class_to_verify=query.__class__, subclass=QueryOperator):
            raise TypeError(
                'query argument must be a valid query operator from pyodmongo.queries. If you really need to make a very specific query, use "raw_query" argument'
            )
        return query.to_dict() if query else (raw_query or {})

    def _sort(self, sort: SortOperator, raw_sort: dict) -> dict:
        """
        Construct a sort dictionary from a SortOperator or a raw sort.

        Args:
            sort (SortOperator): The sort operator.
            raw_sort (dict): The raw sort dictionary, used when ``sort`` is falsy.

        Returns:
            dict: ``sort.to_dict()`` when ``sort`` is truthy, otherwise
            ``raw_sort`` (or an empty dict when ``raw_sort`` is falsy).

        Raises:
            TypeError: If ``sort`` is truthy and is not a SortOperator.
        """
        # isinstance (rather than an exact type comparison) also accepts
        # subclasses of SortOperator.
        if sort and not isinstance(sort, SortOperator):
            raise TypeError(
                'sort argument must be a SortOperator from pyodmongo.queries. If you really need to make a very specific sort, use "raw_sort" argument'
            )
        return sort.to_dict() if sort else (raw_sort or {})

    def _set_tz_info(self, tz_info: timezone | None):
        """
        Resolve the timezone to use for an operation.

        Args:
            tz_info (timezone): The timezone requested for this call, if any.

        Returns:
            timezone: ``tz_info`` when it is truthy, otherwise the engine default.
        """
        return tz_info if tz_info else self._tz_info

    def _update_many_operation(
        self, obj: Model, query_dict: dict, now, upsert: bool, populate: bool
    ):
        """
        Create an UpdateMany operation for bulk updates.

        Args:
            obj (DbModel): The database model object (an instance, not the class).
            query_dict (dict): The query dictionary. When empty, the document is
                matched by the object's ``_id`` instead.
            now (datetime): The current datetime.
            upsert (bool): Forwarded to ``UpdateMany`` as its ``upsert`` flag.
            populate (bool): Forwarded to ``consolidate_dict``.

        Returns:
            UpdateMany: The UpdateMany operation.
        """
        dct = consolidate_dict(obj=obj, dct={}, populate=populate)
        # NOTE(review): for an object without an id this relies on ObjectId(None)
        # producing a fresh id - confirm against the bson documentation.
        find_filter = query_dict or {"_id": ObjectId(dct.get("_id"))}
        dct[obj.__class__.updated_at.field_alias] = now
        # _id is only used in the filter, and created_at is written exclusively
        # through $setOnInsert below, so both are removed from the $set payload.
        dct.pop("_id")
        dct.pop(obj.__class__.created_at.field_alias)
        to_save = {
            "$set": dct,
            "$setOnInsert": {obj.__class__.created_at.field_alias: now},
        }
        return UpdateMany(filter=find_filter, update=to_save, upsert=upsert)

    def _create_delete_operations_list(
        self, query: QueryOperator, raw_query: dict, delete_one: bool
    ):
        """
        Create a list of delete operations.

        Args:
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            delete_one (bool): Flag to indicate whether to delete one or many documents.

        Returns:
            list: A single-element list holding a DeleteOne or a DeleteMany.
        """
        query = self._query(query=query, raw_query=raw_query)
        if delete_one:
            return [DeleteOne(filter=query)]
        return [DeleteMany(filter=query)]

    def _create_save_operations_list(
        self,
        objs: list[Model],
        query: QueryOperator,
        raw_query: dict,
        upsert: bool,
        populate: bool,
    ):
        """
        Create lists of indexes and save operations for bulk writes.

        Args:
            objs (list[DbModel]): The list of database model objects.
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            upsert (bool): Forwarded to every generated UpdateMany operation.
            populate (bool): Forwarded to ``consolidate_dict`` for every object.

        Returns:
            tuple: ``(indexes, operations, now)`` where ``indexes`` and
            ``operations`` are dicts keyed by collection name and ``now`` is the
            timestamp shared by all operations.
        """
        operations = {}
        indexes = {}
        query = self._query(query=query, raw_query=raw_query)
        now = datetime.now(self._tz_info)
        # Truncate to whole milliseconds - presumably so the timestamps kept on
        # the objects match the precision stored by the database.
        now = now.replace(microsecond=now.microsecond // 1000 * 1000)
        for obj in objs:
            operation = self._update_many_operation(
                obj=obj, query_dict=query, now=now, upsert=upsert, populate=populate
            )
            collection_name = obj._collection
            # Operations are grouped per collection, in the order of ``objs``.
            operations.setdefault(collection_name, []).append(operation)
            try:
                obj_indexes = obj._indexes
            except AttributeError:
                obj_indexes = obj._init_indexes
            indexes[collection_name] = obj_indexes
        return indexes, operations, now

    def _after_save(
        self, result: BulkWriteResult, objs: list[Model], collection_name: str, now
    ):
        """
        Perform post-save operations.

        Writes the generated id and both timestamps back onto every object that
        was inserted by an upsert.

        Args:
            result (BulkWriteResult): The bulk write result.
            objs (list[DbModel]): The list of database model objects.
            collection_name (str): The name of the collection.
            now (datetime): The current datetime.
        """
        objs_from_collection = [
            obj for obj in objs if obj._collection == collection_name
        ]
        # The keys of upserted_ids are used as positions into the objects of
        # this collection, which were queued in the same order as the operations.
        for index, obj_id in result.upserted_ids.items():
            saved_obj = objs_from_collection[index]
            saved_obj.id = Id(obj_id)
            saved_obj.created_at = now
            saved_obj.updated_at = now

    def _db_response(self, result: BulkWriteResult):
        """
        Create a database response object from a bulk write result.

        Args:
            result (BulkWriteResult): The bulk write result.

        Returns:
            DbResponse: The database response object.
        """
        return DbResponse(
            acknowledged=result.acknowledged,
            deleted_count=result.deleted_count,
            inserted_count=result.inserted_count,
            matched_count=result.matched_count,
            modified_count=result.modified_count,
            upserted_count=result.upserted_count,
            upserted_ids=result.upserted_ids,
        )

    def _aggregate_cursor(
        self,
        Model: Type[Model],
        pipeline,
        tz_info: timezone,
    ):
        """
        Create an aggregation cursor with the specified pipeline and timezone information.

        Args:
            Model (DbModel): The database model class.
            pipeline (list): The aggregation pipeline.
            tz_info (timezone): The timezone information; falls back to the
                engine default when falsy.

        Returns:
            The object returned by the driver's ``aggregate`` call.
        """
        tz_info = self._set_tz_info(tz_info=tz_info)
        # Datetimes are decoded as timezone-aware only when a timezone is known.
        codec_options = CodecOptions(tz_aware=bool(tz_info), tzinfo=tz_info)
        collection = self._db[Model._collection].with_options(
            codec_options=codec_options
        )
        return collection.aggregate(pipeline)

    def _aggregate_pipeline(
        self,
        Model: Type[Model],
        query: QueryOperator,
        raw_query: dict,
        sort: SortOperator,
        raw_sort: dict,
        populate: bool,
        pipeline: list | None,
        populate_db_fields: list[DbField] | None,
        paginate: bool,
        current_page: int,
        docs_per_page: int,
        no_paginate_limit: int | None,
    ) -> tuple:
        """
        Construct an aggregation pipeline.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            sort (SortOperator): The sort operator.
            raw_sort (dict): The raw sort dictionary.
            populate (bool): Flag to indicate whether to populate related documents.
            pipeline (list | None): A custom aggregation pipeline, forwarded to
                ``mount_base_pipeline``.
            populate_db_fields (list[DbField] | None): Specific fields to populate.
            paginate (bool): Whether pagination is enabled.
            current_page (int): The page number to retrieve.
            docs_per_page (int): The number of documents per page.
            no_paginate_limit (int | None): Limit applied when pagination is disabled.

        Returns:
            tuple: A tuple containing the pipeline, query, and sort dictionaries.
        """
        query = self._query(query=query, raw_query=raw_query)
        sort = self._sort(sort=sort, raw_sort=raw_sort)
        mounted_pipeline = mount_base_pipeline(
            Model=Model,
            query=query,
            sort=sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=paginate,
            current_page=current_page,
            docs_per_page=docs_per_page,
            no_paginate_limit=no_paginate_limit,
        )
        return mounted_pipeline, query, sort
class AsyncDbEngine(_Engine):
    """
    Asynchronous database engine class that extends the base engine to provide asynchronous operations.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the asynchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        super().__init__(
            Client=AsyncIOMotorClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    async def save_all(
        self,
        obj_list: list[Model],
        populate: bool = False,
    ) -> dict[str, DbResponse]:
        """
        Save a list of objects to the database.

        Indexes are created for every collection first; afterwards one bulk
        write is issued per collection.

        Args:
            obj_list (list[DbModel]): The list of database model objects.
            populate (bool, optional): Forwarded to the save-operation builder.
                Defaults to False.

        Returns:
            dict[str, DbResponse]: One response per collection name.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate
        )
        for name, collection_indexes in indexes.items():
            if not collection_indexes:
                continue
            await self._db[name].create_indexes(collection_indexes)
        responses = {}
        for name, bulk_operations in operations.items():
            write_result: BulkWriteResult = await self._db[name].bulk_write(
                bulk_operations
            )
            self._after_save(
                result=write_result, objs=obj_list, collection_name=name, now=now
            )
            responses[name] = self._db_response(result=write_result)
        return responses

    async def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
        populate: bool = False,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool, optional): Forwarded to the generated update operation.
                Defaults to True.
            populate (bool, optional): Forwarded to the save-operation builder.
                Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj],
            query=query,
            raw_query=raw_query,
            upsert=upsert,
            populate=populate,
        )
        name = obj._collection
        collection = self._db[name]
        collection_indexes = indexes[name]
        if collection_indexes:
            await collection.create_indexes(collection_indexes)
        write_result: BulkWriteResult = await collection.bulk_write(operations[name])
        self._after_save(result=write_result, objs=[obj], collection_name=name, now=now)
        return self._db_response(result=write_result)

    async def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Asynchronously finds a single document in the database that matches the query criteria.

        The lookup runs as an aggregation pipeline built without pagination and
        with a no-paginate limit of 1.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the result. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns the document as a dictionary. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.

        Returns:
            The matched document as a model instance or a dictionary, or None if no
            document is found.
        """
        mounted_pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=False,
            current_page=1,
            docs_per_page=1,
            no_paginate_limit=1,
        )
        cursor = self._aggregate_cursor(
            Model=Model, pipeline=mounted_pipeline, tz_info=tz_info
        )
        if as_dict:
            docs = await cursor.to_list(length=None)
        else:
            docs = [Model(**doc) async for doc in cursor]
        # An empty result means nothing matched.
        return docs[0] if docs else None

    async def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
        no_paginate_limit: int | None = None,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Asynchronously finds multiple documents in the database that match the query criteria.

        Without pagination the documents are returned as a plain list. With
        pagination the documents and the total count are fetched concurrently
        and wrapped in a ResponsePaginate.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the results. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns documents as dictionaries. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.
            paginate: If True, enables pagination. Defaults to False.
            current_page: The page number to retrieve. Defaults to 1.
            docs_per_page: The number of documents per page. Defaults to 1000.
            no_paginate_limit: The maximum number of documents to return when pagination is disabled. Defaults to None.

        Returns:
            A list of documents or a ResponsePaginate object with the paginated results.
        """
        mounted_pipeline, query_dict, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=paginate,
            current_page=current_page,
            docs_per_page=docs_per_page,
            no_paginate_limit=no_paginate_limit,
        )

        async def fetch_docs():
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=mounted_pipeline, tz_info=tz_info
            )
            if as_dict:
                return await cursor.to_list(length=None)
            return [Model(**doc) async for doc in cursor]

        async def count_docs():
            # With an empty filter the count is hinted to use the "_id_" index.
            count_kwargs = {} if query_dict else {"hint": "_id_"}
            return await self._db[Model._collection].count_documents(
                filter=query_dict, **count_kwargs
            )

        if paginate:
            docs, total = await gather(fetch_docs(), count_docs())
            page_quantity = ceil(total / docs_per_page)
            return ResponsePaginate(
                current_page=current_page,
                page_quantity=page_quantity,
                docs_quantity=total,
                docs=docs,
            )
        return await fetch_docs()

    async def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Flag to delete a single document. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        delete_operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection = self._db[Model._collection]
        write_result: BulkWriteResult = await collection.bulk_write(delete_operations)
        return self._db_response(result=write_result)
class DbEngine(_Engine):
    """
    Synchronous database engine class that extends the base engine to provide synchronous operations.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the synchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        super().__init__(
            Client=MongoClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    def save_all(
        self,
        obj_list: list[Model],
        populate: bool = False,
    ) -> dict[str, DbResponse]:
        """
        Save a list of objects to the database.

        Indexes are created for every collection first; afterwards one bulk
        write is issued per collection.

        Args:
            obj_list (list[DbModel]): The list of database model objects.
            populate (bool, optional): Forwarded to the save-operation builder.
                Defaults to False.

        Returns:
            dict[str, DbResponse]: One response per collection name.
        """
        response = {}
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate
        )
        for collection_name, index_list in indexes.items():
            if index_list:
                self._db[collection_name].create_indexes(index_list)
        for collection_name, operation_list in operations.items():
            result: BulkWriteResult = self._db[collection_name].bulk_write(
                operation_list
            )
            self._after_save(
                result=result, objs=obj_list, collection_name=collection_name, now=now
            )
            response[collection_name] = self._db_response(result=result)
        return response

    def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
        populate: bool = False,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool, optional): Forwarded to the generated update operation.
                Defaults to True.
            populate (bool, optional): Forwarded to the save-operation builder.
                Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj],
            query=query,
            raw_query=raw_query,
            upsert=upsert,
            populate=populate,
        )
        collection_name = obj._collection
        index_list = indexes[collection_name]
        if index_list:
            self._db[collection_name].create_indexes(index_list)
        operation_list = operations[collection_name]
        result: BulkWriteResult = self._db[collection_name].bulk_write(operation_list)
        self._after_save(
            result=result, objs=[obj], collection_name=collection_name, now=now
        )
        return self._db_response(result=result)

    def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Synchronously finds a single document in the database that matches the query criteria.

        The lookup runs as an aggregation pipeline built without pagination and
        with a no-paginate limit of 1.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the result. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns the document as a dictionary. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.

        Returns:
            The matched document as a model instance or a dictionary, or None if no
            document is found.
        """
        pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=False,
            current_page=1,
            docs_per_page=1,
            no_paginate_limit=1,
        )
        cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
        if as_dict:
            result = list(cursor)
        else:
            result = [Model(**doc) for doc in cursor]
        # An empty result means nothing matched.
        return result[0] if result else None

    def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
        no_paginate_limit: int | None = None,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Synchronously finds multiple documents in the database that match the query criteria.

        Without pagination the documents are returned as a plain list. With
        pagination the documents and the total count are fetched in two worker
        threads and wrapped in a ResponsePaginate.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the results. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns documents as dictionaries. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.
            paginate: If True, enables pagination. Defaults to False.
            current_page: The page number to retrieve. Defaults to 1.
            docs_per_page: The number of documents per page. Defaults to 1000.
            no_paginate_limit: The maximum number of documents to return when pagination is disabled. Defaults to None.

        Returns:
            A list of documents or a ResponsePaginate object with the paginated results.
        """
        pipeline, query, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=paginate,
            current_page=current_page,
            docs_per_page=docs_per_page,
            no_paginate_limit=no_paginate_limit,
        )

        def _result():
            # The only place where the aggregation is executed.
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=pipeline, tz_info=tz_info
            )
            if as_dict:
                return list(cursor)
            return [Model(**doc) for doc in cursor]

        if not paginate:
            return _result()

        def _count():
            # With an empty filter the count is hinted to use the "_id_" index.
            kwargs = {"hint": "_id_"} if not query else {}
            return self._db[Model._collection].count_documents(filter=query, **kwargs)

        # Fetch the page and the total count concurrently.
        with ThreadPoolExecutor() as executor:
            future_result = executor.submit(_result)
            future_count = executor.submit(_count)
            result = future_result.result()
            count = future_count.result()

        page_quantity = ceil(count / docs_per_page)
        return ResponsePaginate(
            current_page=current_page,
            page_quantity=page_quantity,
            docs_quantity=count,
            docs=result,
        )

    def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Flag to delete a single document. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection_name = Model._collection
        result: BulkWriteResult = self._db[collection_name].bulk_write(operations)
        return self._db_response(result=result)