Coverage for pyodmongo/engines/engines.py: 100%
201 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-27 14:31 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-27 14:31 +0000
1from motor.motor_asyncio import AsyncIOMotorClient
2from pymongo import MongoClient, UpdateMany, DeleteOne, DeleteMany
3from pymongo.results import BulkWriteResult
4from datetime import datetime, timezone
5from bson import ObjectId
6from bson.codec_options import CodecOptions
7from ..models.db_model import DbModel
8from ..models.id_model import Id
9from ..models.responses import DbResponse
10from ..models.query_operators import QueryOperator
11from ..models.sort_operators import SortOperator
12from ..models.paginate import ResponsePaginate
13from ..models.db_field_info import DbField
14from typing import TypeVar, Type, Union
15from concurrent.futures import ThreadPoolExecutor
16from .utils import consolidate_dict, mount_base_pipeline
17from ..services.verify_subclasses import is_subclass
18from asyncio import gather
19from math import ceil
22Model = TypeVar("Model", bound=DbModel)
25class _Engine:
26 """
27 Base class for database operations, providing common functionality for both synchronous and asynchronous engines.
29 Attributes:
30 _client (MongoClient): The MongoDB client.
31 _db (Database): The database instance.
32 _tz_info (timezone): The timezone information.
33 """
35 def __init__(self, Client, mongo_uri, db_name, tz_info: timezone = None):
36 """
37 Initialize the database engine.
39 Args:
40 Client (type): The MongoDB client class.
41 mongo_uri (str): The MongoDB URI.
42 db_name (str): The database name.
43 tz_info (timezone, optional): The timezone information. Defaults to None.
44 """
45 self._client = Client(mongo_uri)
46 self._db = self._client[db_name]
47 self._tz_info = tz_info
49 def _query(self, query: QueryOperator, raw_query: dict) -> dict:
50 """
51 Construct a query dictionary from a QueryOperator or a raw query.
53 Args:
54 query (QueryOperator): The query operator.
55 raw_query (dict): The raw query dictionary.
57 Returns:
58 dict: The constructed query dictionary.
59 """
60 if not is_subclass(class_to_verify=query.__class__, subclass=QueryOperator):
61 raise TypeError(
62 'query argument must be a valid query operator from pyodmongo.queries. If you really need to make a very specific query, use "raw_query" argument'
63 )
64 raw_query = {} if not raw_query else raw_query
65 return query.to_dict() if query else raw_query
67 def _sort(self, sort: QueryOperator, raw_sort: dict) -> dict:
68 """
69 Construct a sort dictionary from a SortOperator or a raw sort.
71 Args:
72 sort (QueryOperator): The sort operator.
73 raw_sort (dict): The raw sort dictionary.
75 Returns:
76 dict: The constructed sort dictionary.
77 """
78 if sort and (type(sort) != SortOperator):
79 raise TypeError(
80 'sort argument must be a SortOperator from pyodmongo.queries. If you really need to make a very specific sort, use "raw_sort" argument'
81 )
82 raw_sort = {} if not raw_sort else raw_sort
83 return sort.to_dict() if sort else raw_sort
85 def _set_tz_info(self, tz_info: timezone):
86 """
87 Set the timezone information.
89 Args:
90 tz_info (timezone): The timezone information.
92 Returns:
93 timezone: The set timezone information.
94 """
95 return tz_info if tz_info else self._tz_info
97 def _update_many_operation(
98 self, obj: Type[Model], query_dict: dict, now, upsert: bool, populate: bool
99 ):
100 """
101 Create an UpdateMany operation for bulk updates.
103 Args:
104 obj (DbModel): The database model object.
105 query_dict (dict): The query dictionary.
106 now (datetime): The current datetime.
108 Returns:
109 UpdateMany: The UpdateMany operation.
110 """
111 dct = consolidate_dict(obj=obj, dct={}, populate=populate)
112 find_filter = query_dict or {"_id": ObjectId(dct.get("_id"))}
113 dct[obj.__class__.updated_at.field_alias] = now
114 dct.pop("_id")
115 dct.pop(obj.__class__.created_at.field_alias)
116 to_save = {
117 "$set": dct,
118 "$setOnInsert": {obj.__class__.created_at.field_alias: now},
119 }
120 return UpdateMany(filter=find_filter, update=to_save, upsert=upsert)
122 def _create_delete_operations_list(
123 self, query: QueryOperator, raw_query: dict, delete_one: bool
124 ):
125 """
126 Create a list of delete operations.
128 Args:
129 query (QueryOperator): The query operator.
130 raw_query (dict): The raw query dictionary.
131 delete_one (bool): Flag to indicate whether to delete one or many documents.
133 Returns:
134 list: The list of delete operations.
135 """
136 query = self._query(query=query, raw_query=raw_query)
137 if delete_one:
138 return [DeleteOne(filter=query)]
139 return [DeleteMany(filter=query)]
141 def _create_save_operations_list(
142 self,
143 objs: list[Type[Model]],
144 query: QueryOperator,
145 raw_query: dict,
146 upsert: bool,
147 populate: bool,
148 ):
149 """
150 Create lists of indexes and save operations for bulk writes.
152 Args:
153 objs (list[DbModel]): The list of database model objects.
154 query (QueryOperator): The query operator.
155 raw_query (dict): The raw query dictionary.
157 Returns:
158 tuple: A tuple containing indexes, operations, and the current datetime.
159 """
160 operations = {}
161 indexes = {}
162 query = self._query(query=query, raw_query=raw_query)
163 now = datetime.now(self._tz_info)
164 now = now.replace(microsecond=int(now.microsecond / 1000) * 1000)
165 for obj in objs:
166 obj: Model
167 operation = self._update_many_operation(
168 obj=obj, query_dict=query, now=now, upsert=upsert, populate=populate
169 )
170 collection_name = obj._collection
171 try:
172 operations[collection_name] += [operation]
173 except KeyError:
174 operations[collection_name] = [operation]
176 try:
177 obj_indexes = obj._indexes
178 except AttributeError:
179 obj_indexes = obj._init_indexes
180 indexes[collection_name] = obj_indexes
181 return indexes, operations, now
183 def _after_save(
184 self, result: BulkWriteResult, objs: list[Model], collection_name: str, now
185 ):
186 """
187 Perform post-save operations.
189 Args:
190 result (BulkWriteResult): The bulk write result.
191 objs (list[DbModel]): The list of database model objects.
192 collection_name (str): The name of the collection.
193 now (datetime): The current datetime.
194 """
195 objs_from_collection = list(
196 filter(lambda x: x._collection == collection_name, objs)
197 )
198 for index, obj_id in result.upserted_ids.items():
199 objs_from_collection[index].id = Id(obj_id)
200 objs_from_collection[index].created_at = now
201 objs_from_collection[index].updated_at = now
203 def _db_response(self, result: BulkWriteResult):
204 """
205 Create a database response object from a bulk write result.
207 Args:
208 result (BulkWriteResult): The bulk write result.
210 Returns:
211 DbResponse: The database response object.
212 """
213 return DbResponse(
214 acknowledged=result.acknowledged,
215 deleted_count=result.deleted_count,
216 inserted_count=result.inserted_count,
217 matched_count=result.matched_count,
218 modified_count=result.modified_count,
219 upserted_count=result.upserted_count,
220 upserted_ids=result.upserted_ids,
221 )
223 def _aggregate_cursor(
224 self,
225 Model: Type[Model],
226 pipeline,
227 tz_info: timezone,
228 ):
229 """
230 Create an aggregation cursor with the specified pipeline and timezone information.
232 Args:
233 Model (DbModel): The database model class.
234 pipeline (list): The aggregation pipeline.
235 tz_info (timezone): The timezone information.
237 Returns:
238 CommandCursor: The aggregation cursor.
239 """
240 tz_info = self._set_tz_info(tz_info=tz_info)
241 tz_aware = True if tz_info else False
242 collection = self._db[Model._collection].with_options(
243 codec_options=CodecOptions(tz_aware=tz_aware, tzinfo=tz_info)
244 )
245 return collection.aggregate(pipeline)
247 def _aggregate_pipeline(
248 self,
249 Model: Type[Model],
250 query: QueryOperator,
251 raw_query: dict,
252 sort: SortOperator,
253 raw_sort: dict,
254 populate: bool,
255 pipeline: list | None,
256 populate_db_fields: list[DbField] | None,
257 paginate: int,
258 current_page: int,
259 docs_per_page: int,
260 is_find_one: bool,
261 ) -> dict:
262 """
263 Construct an aggregation pipeline.
265 Args:
266 Model (DbModel): The database model class.
267 query (QueryOperator): The query operator.
268 raw_query (dict): The raw query dictionary.
269 sort (SortOperator): The sort operator.
270 raw_sort (dict): The raw sort dictionary.
271 populate (bool): Flag to indicate whether to populate related documents.
273 Returns:
274 tuple: A tuple containing the pipeline, query, and sort dictionaries.
275 """
276 query = self._query(query=query, raw_query=raw_query)
277 sort = self._sort(sort=sort, raw_sort=raw_sort)
278 return (
279 mount_base_pipeline(
280 Model=Model,
281 query=query,
282 sort=sort,
283 populate=populate,
284 pipeline=pipeline,
285 populate_db_fields=populate_db_fields,
286 paginate=paginate,
287 current_page=current_page,
288 docs_per_page=docs_per_page,
289 is_find_one=is_find_one,
290 ),
291 query,
292 sort,
293 )
296class AsyncDbEngine(_Engine):
297 """
298 Asynchronous database engine class that extends the base engine to provide asynchronous operations.
299 """
301 def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
302 """
303 Initialize the asynchronous database engine.
305 Args:
306 mongo_uri (str): The MongoDB URI.
307 db_name (str): The database name.
308 tz_info (timezone, optional): The timezone information. Defaults to None.
309 """
310 super().__init__(
311 Client=AsyncIOMotorClient,
312 mongo_uri=mongo_uri,
313 db_name=db_name,
314 tz_info=tz_info,
315 )
317 async def save_all(
318 self,
319 obj_list: list[Model],
320 populate: bool = False,
321 ) -> dict[str, DbResponse]:
322 """
323 Save a list of objects to the database.
325 Args:
326 obj_list (list[DbModel]): The list of database model objects.
327 """
328 response = {}
329 indexes, operations, now = self._create_save_operations_list(
330 objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate
331 )
332 for collection_name, index_list in indexes.items():
333 if index_list:
334 await self._db[collection_name].create_indexes(index_list)
335 for collection_name, operation_list in operations.items():
336 result: BulkWriteResult = await self._db[collection_name].bulk_write(
337 operation_list
338 )
339 self._after_save(
340 result=result, objs=obj_list, collection_name=collection_name, now=now
341 )
342 response[collection_name] = self._db_response(result=result)
343 return response
345 async def save(
346 self,
347 obj: Model,
348 query: QueryOperator = None,
349 raw_query: dict = None,
350 upsert: bool = True,
351 populate: bool = False,
352 ) -> DbResponse:
353 """
354 Save a single object to the database.
356 Args:
357 obj (DbModel): The database model object.
358 query (QueryOperator, optional): The query operator. Defaults to None.
359 raw_query (dict, optional): The raw query dictionary. Defaults to None.
361 Returns:
362 DbResponse: The database response object.
363 """
364 indexes, operations, now = self._create_save_operations_list(
365 objs=[obj],
366 query=query,
367 raw_query=raw_query,
368 upsert=upsert,
369 populate=populate,
370 )
371 collection_name = obj._collection
372 index_list = indexes[collection_name]
373 if index_list:
374 await self._db[collection_name].create_indexes(index_list)
375 operation_list = operations[collection_name]
376 result: BulkWriteResult = await self._db[collection_name].bulk_write(
377 operation_list
378 )
379 self._after_save(
380 result=result, objs=[obj], collection_name=collection_name, now=now
381 )
382 return self._db_response(result=result)
384 async def find_one(
385 self,
386 Model: Type[Model],
387 query: QueryOperator = None,
388 raw_query: dict = None,
389 sort: SortOperator = None,
390 raw_sort: dict = None,
391 populate: bool = False,
392 pipeline: list = None,
393 populate_db_fields: list[DbField] | None = None,
394 as_dict: bool = False,
395 tz_info: timezone = None,
396 ) -> Model:
397 """
398 Find a single document in the database.
400 Args:
401 Model (DbModel): The database model class.
402 query (QueryOperator, optional): The query operator. Defaults to None.
403 raw_query (dict, optional): The raw query dictionary. Defaults to None.
404 sort (SortOperator, optional): The sort operator. Defaults to None.
405 raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
406 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
407 as_dict (bool, optional): Flag to return the result as a dictionary. Defaults to False.
408 tz_info (timezone, optional): The timezone information. Defaults to None.
410 Returns:
411 DbModel: The found database model object.
412 """
413 pipeline, _, _ = self._aggregate_pipeline(
414 Model=Model,
415 query=query,
416 raw_query=raw_query,
417 sort=sort,
418 raw_sort=raw_sort,
419 populate=populate,
420 pipeline=pipeline,
421 populate_db_fields=populate_db_fields,
422 paginate=False,
423 current_page=1,
424 docs_per_page=1,
425 is_find_one=True,
426 )
427 cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
428 if as_dict:
429 result = await cursor.to_list(length=None)
430 else:
431 result = [Model(**doc) async for doc in cursor]
432 try:
433 return result[0]
434 except IndexError:
435 return None
437 async def find_many(
438 self,
439 Model: Type[Model],
440 query: QueryOperator = None,
441 raw_query: dict = None,
442 sort: SortOperator = None,
443 raw_sort: dict = None,
444 populate: bool = False,
445 pipeline: list = None,
446 populate_db_fields: list[DbField] | None = None,
447 as_dict: bool = False,
448 tz_info: timezone = None,
449 paginate: bool = False,
450 current_page: int = 1,
451 docs_per_page: int = 1000,
452 ) -> Union[list[Model], ResponsePaginate]:
453 """
454 Find multiple documents in the database.
456 Args:
457 Model (DbModel): The database model class.
458 query (QueryOperator, optional): The query operator. Defaults to None.
459 raw_query (dict, optional): The raw query dictionary. Defaults to None.
460 sort (SortOperator, optional): The sort operator. Defaults to None.
461 raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
462 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
463 as_dict (bool, optional): Flag to return the results as dictionaries. Defaults to False.
464 tz_info (timezone, optional): The timezone information. Defaults to None.
465 paginate (bool, optional): Flag to enable pagination. Defaults to False.
466 current_page (int, optional): The current page number. Defaults to 1.
467 docs_per_page (int, optional): The number of documents per page. Defaults to 1000.
469 Returns:
470 list[DbModel] or ResponsePaginate: The list of found database model objects or a paginated response.
471 """
472 pipeline, query, _ = self._aggregate_pipeline(
473 Model=Model,
474 query=query,
475 raw_query=raw_query,
476 sort=sort,
477 raw_sort=raw_sort,
478 populate=populate,
479 pipeline=pipeline,
480 populate_db_fields=populate_db_fields,
481 paginate=paginate,
482 current_page=current_page,
483 docs_per_page=docs_per_page,
484 is_find_one=False,
485 )
487 async def _result():
488 cursor = self._aggregate_cursor(
489 Model=Model, pipeline=pipeline, tz_info=tz_info
490 )
491 if as_dict:
492 result = await cursor.to_list(length=None)
493 else:
494 result = [Model(**doc) async for doc in cursor]
495 return result
497 if not paginate:
498 return await _result()
500 async def _count():
501 kwargs = {"hint": "_id_"} if not query else {}
502 return await self._db[Model._collection].count_documents(
503 filter=query, **kwargs
504 )
506 result, count = await gather(_result(), _count())
507 page_quantity = ceil(count / docs_per_page)
508 return ResponsePaginate(
509 current_page=current_page,
510 page_quantity=page_quantity,
511 docs_quantity=count,
512 docs=result,
513 )
515 async def delete(
516 self,
517 Model: Type[Model],
518 query: QueryOperator = None,
519 raw_query: dict = None,
520 delete_one: bool = False,
521 ) -> DbResponse:
522 """
523 Delete documents from the database.
525 Args:
526 Model (DbModel): The database model class.
527 query (QueryOperator, optional): The query operator. Defaults to None.
528 raw_query (dict, optional): The raw query dictionary. Defaults to None.
529 delete_one (bool, optional): Flag to delete a single document. Defaults to False.
531 Returns:
532 DbResponse: The database response object.
533 """
534 operations = self._create_delete_operations_list(
535 query=query, raw_query=raw_query, delete_one=delete_one
536 )
537 collection_name = Model._collection
538 result: BulkWriteResult = await self._db[collection_name].bulk_write(operations)
539 return self._db_response(result=result)
542class DbEngine(_Engine):
543 """
544 Synchronous database engine class that extends the base engine to provide synchronous operations.
545 """
547 def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
548 """
549 Initialize the synchronous database engine.
551 Args:
552 mongo_uri (str): The MongoDB URI.
553 db_name (str): The database name.
554 tz_info (timezone, optional): The timezone information. Defaults to None.
555 """
556 super().__init__(
557 Client=MongoClient,
558 mongo_uri=mongo_uri,
559 db_name=db_name,
560 tz_info=tz_info,
561 )
563 def save_all(
564 self,
565 obj_list: list[Model],
566 populate: bool = False,
567 ) -> dict[str, DbResponse]:
568 """
569 Save a list of objects to the database.
571 Args:
572 obj_list (list[DbModel]): The list of database model objects.
573 """
574 response = {}
575 indexes, operations, now = self._create_save_operations_list(
576 objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate
577 )
578 for collection_name, index_list in indexes.items():
579 if index_list:
580 self._db[collection_name].create_indexes(index_list)
581 for collection_name, operation_list in operations.items():
582 result: BulkWriteResult = self._db[collection_name].bulk_write(
583 operation_list
584 )
585 self._after_save(
586 result=result, objs=obj_list, collection_name=collection_name, now=now
587 )
588 response[collection_name] = self._db_response(result=result)
589 return response
591 def save(
592 self,
593 obj: Model,
594 query: QueryOperator = None,
595 raw_query: dict = None,
596 upsert: bool = True,
597 populate: bool = False,
598 ) -> DbResponse:
599 """
600 Save a single object to the database.
602 Args:
603 obj (DbModel): The database model object.
604 query (QueryOperator, optional): The query operator. Defaults to None.
605 raw_query (dict, optional): The raw query dictionary. Defaults to None.
607 Returns:
608 DbResponse: The database response object.
609 """
610 indexes, operations, now = self._create_save_operations_list(
611 objs=[obj],
612 query=query,
613 raw_query=raw_query,
614 upsert=upsert,
615 populate=populate,
616 )
617 collection_name = obj._collection
618 index_list = indexes[collection_name]
619 if index_list:
620 self._db[collection_name].create_indexes(index_list)
621 operation_list = operations[collection_name]
622 result: BulkWriteResult = self._db[collection_name].bulk_write(operation_list)
623 self._after_save(
624 result=result, objs=[obj], collection_name=collection_name, now=now
625 )
626 return self._db_response(result=result)
628 def find_one(
629 self,
630 Model: Type[Model],
631 query: QueryOperator = None,
632 raw_query: dict = None,
633 sort: SortOperator = None,
634 raw_sort: dict = None,
635 populate: bool = False,
636 pipeline: list = None,
637 populate_db_fields: list[DbField] | None = None,
638 as_dict: bool = False,
639 tz_info: timezone = None,
640 ) -> Model:
641 """
642 Find a single document in the database.
644 Args:
645 Model (DbModel): The database model class.
646 query (QueryOperator, optional): The query operator. Defaults to None.
647 raw_query (dict, optional): The raw query dictionary. Defaults to None.
648 sort (SortOperator, optional): The sort operator. Defaults to None.
649 raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
650 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
651 as_dict (bool, optional): Flag to return the result as a dictionary. Defaults to False.
652 tz_info (timezone, optional): The timezone information. Defaults to None.
654 Returns:
655 DbModel: The found database model object.
656 """
657 pipeline, _, _ = self._aggregate_pipeline(
658 Model=Model,
659 query=query,
660 raw_query=raw_query,
661 sort=sort,
662 raw_sort=raw_sort,
663 populate=populate,
664 pipeline=pipeline,
665 populate_db_fields=populate_db_fields,
666 paginate=False,
667 current_page=1,
668 docs_per_page=1,
669 is_find_one=True,
670 )
671 cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
672 if as_dict:
673 result = list(cursor)
674 else:
675 result = [Model(**doc) for doc in cursor]
676 try:
677 return result[0]
678 except IndexError:
679 return None
681 def find_many(
682 self,
683 Model: Type[Model],
684 query: QueryOperator = None,
685 raw_query: dict = None,
686 sort: SortOperator = None,
687 raw_sort: dict = None,
688 populate: bool = False,
689 pipeline: list = None,
690 populate_db_fields: list[DbField] | None = None,
691 as_dict: bool = False,
692 tz_info: timezone = None,
693 paginate: bool = False,
694 current_page: int = 1,
695 docs_per_page: int = 1000,
696 ) -> Union[list[Model], ResponsePaginate]:
697 """
698 Find multiple documents in the database.
700 Args:
701 Model (DbModel): The database model class.
702 query (QueryOperator, optional): The query operator. Defaults to None.
703 raw_query (dict, optional): The raw query dictionary. Defaults to None.
704 sort (SortOperator, optional): The sort operator. Defaults to None.
705 raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
706 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
707 as_dict (bool, optional): Flag to return the results as dictionaries. Defaults to False.
708 tz_info (timezone, optional): The timezone information. Defaults to None.
709 paginate (bool, optional): Flag to enable pagination. Defaults to False.
710 current_page (int, optional): The current page number. Defaults to 1.
711 docs_per_page (int, optional): The number of documents per page. Defaults to 1000.
713 Returns:
714 list[DbModel] or ResponsePaginate: The list of found database model objects or a paginated response.
715 """
716 pipeline, query, _ = self._aggregate_pipeline(
717 Model=Model,
718 query=query,
719 raw_query=raw_query,
720 sort=sort,
721 raw_sort=raw_sort,
722 populate=populate,
723 pipeline=pipeline,
724 populate_db_fields=populate_db_fields,
725 paginate=paginate,
726 current_page=current_page,
727 docs_per_page=docs_per_page,
728 is_find_one=False,
729 )
731 def _result():
732 cursor = self._aggregate_cursor(
733 Model=Model, pipeline=pipeline, tz_info=tz_info
734 )
735 if as_dict:
736 result = list(cursor)
737 else:
738 result = [Model(**doc) for doc in cursor]
739 return result
741 if not paginate:
742 return _result()
744 cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
746 def _count():
747 kwargs = {"hint": "_id_"} if not query else {}
748 return self._db[Model._collection].count_documents(filter=query, **kwargs)
750 with ThreadPoolExecutor() as executor:
751 future_result = executor.submit(_result)
752 future_count = executor.submit(_count)
753 result = future_result.result()
754 count = future_count.result()
756 page_quantity = ceil(count / docs_per_page)
757 return ResponsePaginate(
758 current_page=current_page,
759 page_quantity=page_quantity,
760 docs_quantity=count,
761 docs=result,
762 )
764 def delete(
765 self,
766 Model: Type[Model],
767 query: QueryOperator = None,
768 raw_query: dict = None,
769 delete_one: bool = False,
770 ) -> DbResponse:
771 """
772 Delete documents from the database.
774 Args:
775 Model (DbModel): The database model class.
776 query (QueryOperator, optional): The query operator. Defaults to None.
777 raw_query (dict, optional): The raw query dictionary. Defaults to None.
778 delete_one (bool, optional): Flag to delete a single document. Defaults to False.
780 Returns:
781 DbResponse: The database response object.
782 """
783 operations = self._create_delete_operations_list(
784 query=query, raw_query=raw_query, delete_one=delete_one
785 )
786 collection_name = Model._collection
787 result: BulkWriteResult = self._db[collection_name].bulk_write(operations)
788 return self._db_response(result=result)