Coverage for pyodmongo/engines/engines.py: 100%
214 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-16 15:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-16 15:08 +0000
1from motor.motor_asyncio import AsyncIOMotorClient
2from pymongo import MongoClient, UpdateMany, DeleteOne, DeleteMany
3from pymongo.results import BulkWriteResult
4from datetime import datetime, timezone
5from bson import ObjectId
6from bson.codec_options import CodecOptions
7from ..models.db_model import DbModel
8from ..models.id_model import Id
9from ..models.responses import DbResponse
10from ..models.query_operators import QueryOperator
11from ..models.sort_operators import SortOperator
12from ..models.paginate import ResponsePaginate
13from ..models.db_field_info import DbField
14from typing import TypeVar, Type, Union
15from concurrent.futures import ThreadPoolExecutor
16from .utils import consolidate_dict, mount_base_pipeline
17from ..services.verify_subclasses import is_subclass
18from asyncio import gather
19from math import ceil
# Type variable bound to DbModel; used in the engine signatures below to
# annotate model classes (Type[Model]) and model instances (Model).
Model = TypeVar("Model", bound=DbModel)
class _Engine:
    """
    Base class for database operations, providing common functionality for both
    synchronous and asynchronous engines.

    The helpers here only build queries, pipelines and bulk operations; the
    subclasses decide whether the resulting database calls are awaited or not.

    Attributes:
        _client: The MongoDB client instance created from the given client class.
        _db: The database handle obtained from the client.
        _tz_info (timezone): The default timezone information.
    """

    def __init__(self, Client, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the database engine.

        Args:
            Client (type): The MongoDB client class.
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        self._client = Client(mongo_uri)
        self._db = self._client[db_name]
        self._tz_info = tz_info

    def _query(self, query: QueryOperator, raw_query: dict) -> dict:
        """
        Construct a query dictionary from a QueryOperator or a raw query.

        Args:
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary, used when `query` is falsy.

        Returns:
            dict: The constructed query dictionary.

        Raises:
            TypeError: If the class of `query` is not accepted by `is_subclass`
                as a QueryOperator.
        """
        # NOTE(review): this check also runs when query is None (the default of
        # every public method); whether None passes depends on is_subclass,
        # which is not visible here - confirm.
        if not is_subclass(class_to_verify=query.__class__, subclass=QueryOperator):
            raise TypeError(
                'query argument must be a valid query operator from pyodmongo.queries. If you really need to make a very specific query, use "raw_query" argument'
            )
        raw_query = {} if not raw_query else raw_query
        return query.to_dict() if query else raw_query

    def _sort(self, sort: SortOperator, raw_sort: dict) -> dict:
        """
        Construct a sort dictionary from a SortOperator or a raw sort.

        Args:
            sort (SortOperator): The sort operator.
            raw_sort (dict): The raw sort dictionary, used when `sort` is falsy.

        Returns:
            dict: The constructed sort dictionary.

        Raises:
            TypeError: If `sort` is truthy and its type is not exactly SortOperator.
        """
        # Exact type match on purpose: subclasses of SortOperator are rejected,
        # exactly as before.
        if sort and type(sort) is not SortOperator:
            raise TypeError(
                'sort argument must be a SortOperator from pyodmongo.queries. If you really need to make a very specific sort, use "raw_sort" argument'
            )
        raw_sort = {} if not raw_sort else raw_sort
        return sort.to_dict() if sort else raw_sort

    def _set_tz_info(self, tz_info: timezone):
        """
        Resolve the timezone to use for an operation.

        Args:
            tz_info (timezone): The timezone requested for this call.

        Returns:
            timezone: `tz_info` when it is truthy, otherwise the engine default.
        """
        return tz_info if tz_info else self._tz_info

    def _update_many_operation(self, obj: Model, query_dict: dict, now, upsert: bool):
        """
        Create an UpdateMany operation for bulk updates.

        Args:
            obj (DbModel): The database model object.
            query_dict (dict): The query dictionary. When empty, the document is
                matched by the object's own `_id`.
            now (datetime): The current datetime.
            upsert (bool): Whether a document is inserted when nothing matches.

        Returns:
            UpdateMany: The UpdateMany operation.
        """
        model_cls = obj.__class__
        dct = consolidate_dict(obj=obj, dct={})
        find_filter = query_dict or {"_id": ObjectId(dct.get("_id"))}
        dct[model_cls.updated_at.field_alias] = now
        # _id and created_at are kept out of $set; created_at is only written
        # through $setOnInsert below.
        dct.pop("_id")
        dct.pop(model_cls.created_at.field_alias)
        to_save = {
            "$set": dct,
            "$setOnInsert": {model_cls.created_at.field_alias: now},
        }
        return UpdateMany(filter=find_filter, update=to_save, upsert=upsert)

    def _create_delete_operations_list(
        self, query: QueryOperator, raw_query: dict, delete_one: bool
    ):
        """
        Create a list of delete operations.

        Args:
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            delete_one (bool): Flag to indicate whether to delete one or many documents.

        Returns:
            list: A single-element list holding a DeleteOne or DeleteMany operation.
        """
        query = self._query(query=query, raw_query=raw_query)
        if delete_one:
            return [DeleteOne(filter=query)]
        return [DeleteMany(filter=query)]

    def _create_save_operations_list(
        self,
        objs: list[Model],
        query: QueryOperator,
        raw_query: dict,
        upsert: bool,
    ):
        """
        Create the per-collection indexes and save operations for bulk writes.

        Args:
            objs (list[DbModel]): The list of database model objects.
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            upsert (bool): Whether each operation inserts when nothing matches.

        Returns:
            tuple: `(indexes, operations, now)` where `indexes` and `operations`
                are dictionaries keyed by collection name and `now` is the
                timestamp applied to every operation.
        """
        operations = {}
        indexes = {}
        query = self._query(query=query, raw_query=raw_query)
        now = datetime.now(self._tz_info)
        # Truncate to whole milliseconds - presumably so the in-memory
        # timestamp matches what MongoDB stores; confirm.
        now = now.replace(microsecond=now.microsecond // 1000 * 1000)
        for obj in objs:
            operation = self._update_many_operation(
                obj=obj, query_dict=query, now=now, upsert=upsert
            )
            collection_name = obj._collection
            operations.setdefault(collection_name, []).append(operation)

            # Fall back to _init_indexes only when _indexes is missing.
            try:
                obj_indexes = obj._indexes
            except AttributeError:
                obj_indexes = obj._init_indexes
            indexes[collection_name] = obj_indexes
        return indexes, operations, now

    def _after_save(
        self, result: BulkWriteResult, objs: list[Model], collection_name: str, now
    ):
        """
        Write the generated id and timestamps back to objects that were upserted.

        Args:
            result (BulkWriteResult): The bulk write result.
            objs (list[DbModel]): The list of database model objects.
            collection_name (str): The name of the collection.
            now (datetime): The current datetime.
        """
        # upserted_ids is keyed by the operation's position in the bulk write,
        # which lines up with the position among this collection's objects.
        objs_from_collection = [
            obj for obj in objs if obj._collection == collection_name
        ]
        for index, obj_id in result.upserted_ids.items():
            saved = objs_from_collection[index]
            saved.id = Id(obj_id)
            saved.created_at = now
            saved.updated_at = now

    def _db_response(self, result: BulkWriteResult):
        """
        Create a database response object from a bulk write result.

        Args:
            result (BulkWriteResult): The bulk write result.

        Returns:
            DbResponse: The database response object.
        """
        return DbResponse(
            acknowledged=result.acknowledged,
            deleted_count=result.deleted_count,
            inserted_count=result.inserted_count,
            matched_count=result.matched_count,
            modified_count=result.modified_count,
            upserted_count=result.upserted_count,
            upserted_ids=result.upserted_ids,
        )

    def _aggregate_cursor(
        self,
        Model: Type[Model],
        pipeline,
        tz_info: timezone,
    ):
        """
        Create an aggregation cursor with the specified pipeline and timezone information.

        Args:
            Model (DbModel): The database model class.
            pipeline (list): The aggregation pipeline.
            tz_info (timezone): The timezone information; the engine default is
                used when this is falsy.

        Returns:
            The object returned by the collection's `aggregate` call.
        """
        tz_info = self._set_tz_info(tz_info=tz_info)
        # Datetimes are decoded as timezone-aware only when a timezone is set.
        tz_aware = bool(tz_info)
        collection = self._db[Model._collection].with_options(
            codec_options=CodecOptions(tz_aware=tz_aware, tzinfo=tz_info)
        )
        return collection.aggregate(pipeline)

    def _aggregate_pipeline(
        self,
        Model: Type[Model],
        query: QueryOperator,
        raw_query: dict,
        sort: SortOperator,
        raw_sort: dict,
        populate: bool,
        populate_db_fields: list[DbField] | None,
    ) -> tuple:
        """
        Construct an aggregation pipeline.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            sort (SortOperator): The sort operator.
            raw_sort (dict): The raw sort dictionary.
            populate (bool): Flag to indicate whether to populate related documents.
            populate_db_fields (list[DbField] | None): Passed through to
                `mount_base_pipeline` unchanged.

        Returns:
            tuple: A tuple containing the pipeline, query, and sort dictionaries.
        """
        query = self._query(query=query, raw_query=raw_query)
        sort = self._sort(sort=sort, raw_sort=raw_sort)
        pipeline = mount_base_pipeline(
            Model=Model,
            query=query,
            sort=sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )
        return pipeline, query, sort

    def _add_paginate_to_pipeline(
        self, pipeline: list, current_page: int, docs_per_page: int
    ):
        """
        Append `$skip` and `$limit` pagination stages to the pipeline in place.

        Out-of-range arguments are clamped rather than rejected: pages below 1
        become page 1 and the page size is kept between 1 and 1000.

        Args:
            pipeline (list): The aggregation pipeline; mutated in place.
            current_page (int): The current page number.
            docs_per_page (int): The number of documents per page.
        """
        max_docs_per_page = 1000
        current_page = max(current_page, 1)
        # The lower bound keeps $limit positive and $skip non-negative.
        docs_per_page = min(max(docs_per_page, 1), max_docs_per_page)
        skip = docs_per_page * (current_page - 1)
        pipeline.extend([{"$skip": skip}, {"$limit": docs_per_page}])
class AsyncDbEngine(_Engine):
    """
    Asynchronous database engine class that extends the base engine to provide
    asynchronous operations.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the asynchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        super().__init__(
            Client=AsyncIOMotorClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    async def save_all(
        self,
        obj_list: list[Model],
    ) -> dict[str, DbResponse]:
        """
        Save a list of objects to the database.

        Indexes are created first, then one bulk write is issued per collection.

        Args:
            obj_list (list[DbModel]): The list of database model objects.

        Returns:
            dict[str, DbResponse]: One response per collection name.
        """
        response = {}
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True
        )
        for collection_name, index_list in indexes.items():
            if index_list:
                await self._db[collection_name].create_indexes(index_list)
        for collection_name, operation_list in operations.items():
            result: BulkWriteResult = await self._db[collection_name].bulk_write(
                operation_list
            )
            self._after_save(
                result=result, objs=obj_list, collection_name=collection_name, now=now
            )
            response[collection_name] = self._db_response(result=result)
        return response

    async def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool, optional): Whether to insert when nothing matches.
                Defaults to True.

        Returns:
            DbResponse: The database response object.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj], query=query, raw_query=raw_query, upsert=upsert
        )
        collection_name = obj._collection
        collection = self._db[collection_name]
        index_list = indexes[collection_name]
        if index_list:
            await collection.create_indexes(index_list)
        result: BulkWriteResult = await collection.bulk_write(
            operations[collection_name]
        )
        self._after_save(
            result=result, objs=[obj], collection_name=collection_name, now=now
        )
        return self._db_response(result=result)

    async def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Find a single document in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Flag to return the result as a dictionary. Defaults to False.
            tz_info (timezone, optional): The timezone information. Defaults to None.

        Returns:
            DbModel: The found database model object (or a dict when `as_dict`
                is set), or None when nothing matches.
        """
        pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )
        pipeline += [{"$limit": 1}]
        cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
        if as_dict:
            result = await cursor.to_list(length=None)
        else:
            result = [Model(**doc) async for doc in cursor]
        return result[0] if result else None

    async def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Find multiple documents in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Flag to return the results as dictionaries. Defaults to False.
            tz_info (timezone, optional): The timezone information. Defaults to None.
            paginate (bool, optional): Flag to enable pagination. Defaults to False.
            current_page (int, optional): The current page number. Defaults to 1.
            docs_per_page (int, optional): The number of documents per page. Defaults to 1000.

        Returns:
            list[DbModel] or ResponsePaginate: The list of found database model objects or a paginated response.
        """
        pipeline, query, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )

        async def _result():
            # Runs the pipeline as it stands when this coroutine executes.
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=pipeline, tz_info=tz_info
            )
            if as_dict:
                return await cursor.to_list(length=None)
            return [Model(**doc) async for doc in cursor]

        if not paginate:
            return await _result()

        self._add_paginate_to_pipeline(
            pipeline=pipeline, current_page=current_page, docs_per_page=docs_per_page
        )
        # The helper clamps its arguments locally and appends $skip then $limit.
        # Report the values that were actually queried, so page_quantity and
        # current_page stay consistent with the returned documents.
        current_page = max(current_page, 1)
        docs_per_page = pipeline[-1]["$limit"]

        async def _count():
            # With an empty filter, hint the _id index for the count.
            kwargs = {"hint": "_id_"} if not query else {}
            return await self._db[Model._collection].count_documents(
                filter=query, **kwargs
            )

        result, count = await gather(_result(), _count())
        page_quantity = ceil(count / docs_per_page)
        return ResponsePaginate(
            current_page=current_page,
            page_quantity=page_quantity,
            docs_quantity=count,
            docs=result,
        )

    async def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Flag to delete a single document. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection_name = Model._collection
        result: BulkWriteResult = await self._db[collection_name].bulk_write(operations)
        return self._db_response(result=result)
class DbEngine(_Engine):
    """
    Synchronous database engine class that extends the base engine to provide
    synchronous operations.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the synchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        super().__init__(
            Client=MongoClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    def save_all(
        self,
        obj_list: list[Model],
    ) -> dict[str, DbResponse]:
        """
        Save a list of objects to the database.

        Indexes are created first, then one bulk write is issued per collection.

        Args:
            obj_list (list[DbModel]): The list of database model objects.

        Returns:
            dict[str, DbResponse]: One response per collection name.
        """
        response = {}
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True
        )
        for collection_name, index_list in indexes.items():
            if index_list:
                self._db[collection_name].create_indexes(index_list)
        for collection_name, operation_list in operations.items():
            result: BulkWriteResult = self._db[collection_name].bulk_write(
                operation_list
            )
            self._after_save(
                result=result, objs=obj_list, collection_name=collection_name, now=now
            )
            response[collection_name] = self._db_response(result=result)
        return response

    def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool, optional): Whether to insert when nothing matches.
                Defaults to True.

        Returns:
            DbResponse: The database response object.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj], query=query, raw_query=raw_query, upsert=upsert
        )
        collection_name = obj._collection
        collection = self._db[collection_name]
        index_list = indexes[collection_name]
        if index_list:
            collection.create_indexes(index_list)
        result: BulkWriteResult = collection.bulk_write(operations[collection_name])
        self._after_save(
            result=result, objs=[obj], collection_name=collection_name, now=now
        )
        return self._db_response(result=result)

    def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Find a single document in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Flag to return the result as a dictionary. Defaults to False.
            tz_info (timezone, optional): The timezone information. Defaults to None.

        Returns:
            DbModel: The found database model object (or a dict when `as_dict`
                is set), or None when nothing matches.
        """
        pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )
        pipeline += [{"$limit": 1}]
        cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
        if as_dict:
            result = list(cursor)
        else:
            result = [Model(**doc) for doc in cursor]
        return result[0] if result else None

    def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Find multiple documents in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Flag to return the results as dictionaries. Defaults to False.
            tz_info (timezone, optional): The timezone information. Defaults to None.
            paginate (bool, optional): Flag to enable pagination. Defaults to False.
            current_page (int, optional): The current page number. Defaults to 1.
            docs_per_page (int, optional): The number of documents per page. Defaults to 1000.

        Returns:
            list[DbModel] or ResponsePaginate: The list of found database model objects or a paginated response.
        """
        pipeline, query, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )

        def _result():
            # Runs the pipeline as it stands when this function executes.
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=pipeline, tz_info=tz_info
            )
            if as_dict:
                return list(cursor)
            return [Model(**doc) for doc in cursor]

        if not paginate:
            return _result()

        self._add_paginate_to_pipeline(
            pipeline=pipeline, current_page=current_page, docs_per_page=docs_per_page
        )
        # No cursor is opened here: PyMongo's aggregate() runs the command when
        # called, so the unused cursor that used to be created at this point
        # cost one extra aggregation per paginated call.

        # The helper clamps its arguments locally and appends $skip then $limit.
        # Report the values that were actually queried, so page_quantity and
        # current_page stay consistent with the returned documents.
        current_page = max(current_page, 1)
        docs_per_page = pipeline[-1]["$limit"]

        def _count():
            # With an empty filter, hint the _id index for the count.
            kwargs = {"hint": "_id_"} if not query else {}
            return self._db[Model._collection].count_documents(filter=query, **kwargs)

        # Fetch the page and the total count concurrently.
        with ThreadPoolExecutor() as executor:
            future_result = executor.submit(_result)
            future_count = executor.submit(_count)
            result = future_result.result()
            count = future_count.result()

        page_quantity = ceil(count / docs_per_page)
        return ResponsePaginate(
            current_page=current_page,
            page_quantity=page_quantity,
            docs_quantity=count,
            docs=result,
        )

    def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Flag to delete a single document. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection_name = Model._collection
        result: BulkWriteResult = self._db[collection_name].bulk_write(operations)
        return self._db_response(result=result)