Coverage for pyodmongo/engines/engines.py: 100%

201 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-27 14:31 +0000

1from motor.motor_asyncio import AsyncIOMotorClient 

2from pymongo import MongoClient, UpdateMany, DeleteOne, DeleteMany 

3from pymongo.results import BulkWriteResult 

4from datetime import datetime, timezone 

5from bson import ObjectId 

6from bson.codec_options import CodecOptions 

7from ..models.db_model import DbModel 

8from ..models.id_model import Id 

9from ..models.responses import DbResponse 

10from ..models.query_operators import QueryOperator 

11from ..models.sort_operators import SortOperator 

12from ..models.paginate import ResponsePaginate 

13from ..models.db_field_info import DbField 

14from typing import TypeVar, Type, Union 

15from concurrent.futures import ThreadPoolExecutor 

16from .utils import consolidate_dict, mount_base_pipeline 

17from ..services.verify_subclasses import is_subclass 

18from asyncio import gather 

19from math import ceil 

20 

21 

22Model = TypeVar("Model", bound=DbModel) 

23 

24 

25class _Engine: 

26 """ 

27 Base class for database operations, providing common functionality for both synchronous and asynchronous engines. 

28 

29 Attributes: 

30 _client (MongoClient): The MongoDB client. 

31 _db (Database): The database instance. 

32 _tz_info (timezone): The timezone information. 

33 """ 

34 

35 def __init__(self, Client, mongo_uri, db_name, tz_info: timezone = None): 

36 """ 

37 Initialize the database engine. 

38 

39 Args: 

40 Client (type): The MongoDB client class. 

41 mongo_uri (str): The MongoDB URI. 

42 db_name (str): The database name. 

43 tz_info (timezone, optional): The timezone information. Defaults to None. 

44 """ 

45 self._client = Client(mongo_uri) 

46 self._db = self._client[db_name] 

47 self._tz_info = tz_info 

48 

49 def _query(self, query: QueryOperator, raw_query: dict) -> dict: 

50 """ 

51 Construct a query dictionary from a QueryOperator or a raw query. 

52 

53 Args: 

54 query (QueryOperator): The query operator. 

55 raw_query (dict): The raw query dictionary. 

56 

57 Returns: 

58 dict: The constructed query dictionary. 

59 """ 

60 if not is_subclass(class_to_verify=query.__class__, subclass=QueryOperator): 

61 raise TypeError( 

62 'query argument must be a valid query operator from pyodmongo.queries. If you really need to make a very specific query, use "raw_query" argument' 

63 ) 

64 raw_query = {} if not raw_query else raw_query 

65 return query.to_dict() if query else raw_query 

66 

67 def _sort(self, sort: QueryOperator, raw_sort: dict) -> dict: 

68 """ 

69 Construct a sort dictionary from a SortOperator or a raw sort. 

70 

71 Args: 

72 sort (QueryOperator): The sort operator. 

73 raw_sort (dict): The raw sort dictionary. 

74 

75 Returns: 

76 dict: The constructed sort dictionary. 

77 """ 

78 if sort and (type(sort) != SortOperator): 

79 raise TypeError( 

80 'sort argument must be a SortOperator from pyodmongo.queries. If you really need to make a very specific sort, use "raw_sort" argument' 

81 ) 

82 raw_sort = {} if not raw_sort else raw_sort 

83 return sort.to_dict() if sort else raw_sort 

84 

85 def _set_tz_info(self, tz_info: timezone): 

86 """ 

87 Set the timezone information. 

88 

89 Args: 

90 tz_info (timezone): The timezone information. 

91 

92 Returns: 

93 timezone: The set timezone information. 

94 """ 

95 return tz_info if tz_info else self._tz_info 

96 

97 def _update_many_operation( 

98 self, obj: Type[Model], query_dict: dict, now, upsert: bool, populate: bool 

99 ): 

100 """ 

101 Create an UpdateMany operation for bulk updates. 

102 

103 Args: 

104 obj (DbModel): The database model object. 

105 query_dict (dict): The query dictionary. 

106 now (datetime): The current datetime. 

107 

108 Returns: 

109 UpdateMany: The UpdateMany operation. 

110 """ 

111 dct = consolidate_dict(obj=obj, dct={}, populate=populate) 

112 find_filter = query_dict or {"_id": ObjectId(dct.get("_id"))} 

113 dct[obj.__class__.updated_at.field_alias] = now 

114 dct.pop("_id") 

115 dct.pop(obj.__class__.created_at.field_alias) 

116 to_save = { 

117 "$set": dct, 

118 "$setOnInsert": {obj.__class__.created_at.field_alias: now}, 

119 } 

120 return UpdateMany(filter=find_filter, update=to_save, upsert=upsert) 

121 

122 def _create_delete_operations_list( 

123 self, query: QueryOperator, raw_query: dict, delete_one: bool 

124 ): 

125 """ 

126 Create a list of delete operations. 

127 

128 Args: 

129 query (QueryOperator): The query operator. 

130 raw_query (dict): The raw query dictionary. 

131 delete_one (bool): Flag to indicate whether to delete one or many documents. 

132 

133 Returns: 

134 list: The list of delete operations. 

135 """ 

136 query = self._query(query=query, raw_query=raw_query) 

137 if delete_one: 

138 return [DeleteOne(filter=query)] 

139 return [DeleteMany(filter=query)] 

140 

141 def _create_save_operations_list( 

142 self, 

143 objs: list[Type[Model]], 

144 query: QueryOperator, 

145 raw_query: dict, 

146 upsert: bool, 

147 populate: bool, 

148 ): 

149 """ 

150 Create lists of indexes and save operations for bulk writes. 

151 

152 Args: 

153 objs (list[DbModel]): The list of database model objects. 

154 query (QueryOperator): The query operator. 

155 raw_query (dict): The raw query dictionary. 

156 

157 Returns: 

158 tuple: A tuple containing indexes, operations, and the current datetime. 

159 """ 

160 operations = {} 

161 indexes = {} 

162 query = self._query(query=query, raw_query=raw_query) 

163 now = datetime.now(self._tz_info) 

164 now = now.replace(microsecond=int(now.microsecond / 1000) * 1000) 

165 for obj in objs: 

166 obj: Model 

167 operation = self._update_many_operation( 

168 obj=obj, query_dict=query, now=now, upsert=upsert, populate=populate 

169 ) 

170 collection_name = obj._collection 

171 try: 

172 operations[collection_name] += [operation] 

173 except KeyError: 

174 operations[collection_name] = [operation] 

175 

176 try: 

177 obj_indexes = obj._indexes 

178 except AttributeError: 

179 obj_indexes = obj._init_indexes 

180 indexes[collection_name] = obj_indexes 

181 return indexes, operations, now 

182 

183 def _after_save( 

184 self, result: BulkWriteResult, objs: list[Model], collection_name: str, now 

185 ): 

186 """ 

187 Perform post-save operations. 

188 

189 Args: 

190 result (BulkWriteResult): The bulk write result. 

191 objs (list[DbModel]): The list of database model objects. 

192 collection_name (str): The name of the collection. 

193 now (datetime): The current datetime. 

194 """ 

195 objs_from_collection = list( 

196 filter(lambda x: x._collection == collection_name, objs) 

197 ) 

198 for index, obj_id in result.upserted_ids.items(): 

199 objs_from_collection[index].id = Id(obj_id) 

200 objs_from_collection[index].created_at = now 

201 objs_from_collection[index].updated_at = now 

202 

203 def _db_response(self, result: BulkWriteResult): 

204 """ 

205 Create a database response object from a bulk write result. 

206 

207 Args: 

208 result (BulkWriteResult): The bulk write result. 

209 

210 Returns: 

211 DbResponse: The database response object. 

212 """ 

213 return DbResponse( 

214 acknowledged=result.acknowledged, 

215 deleted_count=result.deleted_count, 

216 inserted_count=result.inserted_count, 

217 matched_count=result.matched_count, 

218 modified_count=result.modified_count, 

219 upserted_count=result.upserted_count, 

220 upserted_ids=result.upserted_ids, 

221 ) 

222 

223 def _aggregate_cursor( 

224 self, 

225 Model: Type[Model], 

226 pipeline, 

227 tz_info: timezone, 

228 ): 

229 """ 

230 Create an aggregation cursor with the specified pipeline and timezone information. 

231 

232 Args: 

233 Model (DbModel): The database model class. 

234 pipeline (list): The aggregation pipeline. 

235 tz_info (timezone): The timezone information. 

236 

237 Returns: 

238 CommandCursor: The aggregation cursor. 

239 """ 

240 tz_info = self._set_tz_info(tz_info=tz_info) 

241 tz_aware = True if tz_info else False 

242 collection = self._db[Model._collection].with_options( 

243 codec_options=CodecOptions(tz_aware=tz_aware, tzinfo=tz_info) 

244 ) 

245 return collection.aggregate(pipeline) 

246 

247 def _aggregate_pipeline( 

248 self, 

249 Model: Type[Model], 

250 query: QueryOperator, 

251 raw_query: dict, 

252 sort: SortOperator, 

253 raw_sort: dict, 

254 populate: bool, 

255 pipeline: list | None, 

256 populate_db_fields: list[DbField] | None, 

257 paginate: int, 

258 current_page: int, 

259 docs_per_page: int, 

260 is_find_one: bool, 

261 ) -> dict: 

262 """ 

263 Construct an aggregation pipeline. 

264 

265 Args: 

266 Model (DbModel): The database model class. 

267 query (QueryOperator): The query operator. 

268 raw_query (dict): The raw query dictionary. 

269 sort (SortOperator): The sort operator. 

270 raw_sort (dict): The raw sort dictionary. 

271 populate (bool): Flag to indicate whether to populate related documents. 

272 

273 Returns: 

274 tuple: A tuple containing the pipeline, query, and sort dictionaries. 

275 """ 

276 query = self._query(query=query, raw_query=raw_query) 

277 sort = self._sort(sort=sort, raw_sort=raw_sort) 

278 return ( 

279 mount_base_pipeline( 

280 Model=Model, 

281 query=query, 

282 sort=sort, 

283 populate=populate, 

284 pipeline=pipeline, 

285 populate_db_fields=populate_db_fields, 

286 paginate=paginate, 

287 current_page=current_page, 

288 docs_per_page=docs_per_page, 

289 is_find_one=is_find_one, 

290 ), 

291 query, 

292 sort, 

293 ) 

294 

295 

296class AsyncDbEngine(_Engine): 

297 """ 

298 Asynchronous database engine class that extends the base engine to provide asynchronous operations. 

299 """ 

300 

301 def __init__(self, mongo_uri, db_name, tz_info: timezone = None): 

302 """ 

303 Initialize the asynchronous database engine. 

304 

305 Args: 

306 mongo_uri (str): The MongoDB URI. 

307 db_name (str): The database name. 

308 tz_info (timezone, optional): The timezone information. Defaults to None. 

309 """ 

310 super().__init__( 

311 Client=AsyncIOMotorClient, 

312 mongo_uri=mongo_uri, 

313 db_name=db_name, 

314 tz_info=tz_info, 

315 ) 

316 

317 async def save_all( 

318 self, 

319 obj_list: list[Model], 

320 populate: bool = False, 

321 ) -> dict[str, DbResponse]: 

322 """ 

323 Save a list of objects to the database. 

324 

325 Args: 

326 obj_list (list[DbModel]): The list of database model objects. 

327 """ 

328 response = {} 

329 indexes, operations, now = self._create_save_operations_list( 

330 objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate 

331 ) 

332 for collection_name, index_list in indexes.items(): 

333 if index_list: 

334 await self._db[collection_name].create_indexes(index_list) 

335 for collection_name, operation_list in operations.items(): 

336 result: BulkWriteResult = await self._db[collection_name].bulk_write( 

337 operation_list 

338 ) 

339 self._after_save( 

340 result=result, objs=obj_list, collection_name=collection_name, now=now 

341 ) 

342 response[collection_name] = self._db_response(result=result) 

343 return response 

344 

345 async def save( 

346 self, 

347 obj: Model, 

348 query: QueryOperator = None, 

349 raw_query: dict = None, 

350 upsert: bool = True, 

351 populate: bool = False, 

352 ) -> DbResponse: 

353 """ 

354 Save a single object to the database. 

355 

356 Args: 

357 obj (DbModel): The database model object. 

358 query (QueryOperator, optional): The query operator. Defaults to None. 

359 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

360 

361 Returns: 

362 DbResponse: The database response object. 

363 """ 

364 indexes, operations, now = self._create_save_operations_list( 

365 objs=[obj], 

366 query=query, 

367 raw_query=raw_query, 

368 upsert=upsert, 

369 populate=populate, 

370 ) 

371 collection_name = obj._collection 

372 index_list = indexes[collection_name] 

373 if index_list: 

374 await self._db[collection_name].create_indexes(index_list) 

375 operation_list = operations[collection_name] 

376 result: BulkWriteResult = await self._db[collection_name].bulk_write( 

377 operation_list 

378 ) 

379 self._after_save( 

380 result=result, objs=[obj], collection_name=collection_name, now=now 

381 ) 

382 return self._db_response(result=result) 

383 

384 async def find_one( 

385 self, 

386 Model: Type[Model], 

387 query: QueryOperator = None, 

388 raw_query: dict = None, 

389 sort: SortOperator = None, 

390 raw_sort: dict = None, 

391 populate: bool = False, 

392 pipeline: list = None, 

393 populate_db_fields: list[DbField] | None = None, 

394 as_dict: bool = False, 

395 tz_info: timezone = None, 

396 ) -> Model: 

397 """ 

398 Find a single document in the database. 

399 

400 Args: 

401 Model (DbModel): The database model class. 

402 query (QueryOperator, optional): The query operator. Defaults to None. 

403 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

404 sort (SortOperator, optional): The sort operator. Defaults to None. 

405 raw_sort (dict, optional): The raw sort dictionary. Defaults to None. 

406 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False. 

407 as_dict (bool, optional): Flag to return the result as a dictionary. Defaults to False. 

408 tz_info (timezone, optional): The timezone information. Defaults to None. 

409 

410 Returns: 

411 DbModel: The found database model object. 

412 """ 

413 pipeline, _, _ = self._aggregate_pipeline( 

414 Model=Model, 

415 query=query, 

416 raw_query=raw_query, 

417 sort=sort, 

418 raw_sort=raw_sort, 

419 populate=populate, 

420 pipeline=pipeline, 

421 populate_db_fields=populate_db_fields, 

422 paginate=False, 

423 current_page=1, 

424 docs_per_page=1, 

425 is_find_one=True, 

426 ) 

427 cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info) 

428 if as_dict: 

429 result = await cursor.to_list(length=None) 

430 else: 

431 result = [Model(**doc) async for doc in cursor] 

432 try: 

433 return result[0] 

434 except IndexError: 

435 return None 

436 

437 async def find_many( 

438 self, 

439 Model: Type[Model], 

440 query: QueryOperator = None, 

441 raw_query: dict = None, 

442 sort: SortOperator = None, 

443 raw_sort: dict = None, 

444 populate: bool = False, 

445 pipeline: list = None, 

446 populate_db_fields: list[DbField] | None = None, 

447 as_dict: bool = False, 

448 tz_info: timezone = None, 

449 paginate: bool = False, 

450 current_page: int = 1, 

451 docs_per_page: int = 1000, 

452 ) -> Union[list[Model], ResponsePaginate]: 

453 """ 

454 Find multiple documents in the database. 

455 

456 Args: 

457 Model (DbModel): The database model class. 

458 query (QueryOperator, optional): The query operator. Defaults to None. 

459 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

460 sort (SortOperator, optional): The sort operator. Defaults to None. 

461 raw_sort (dict, optional): The raw sort dictionary. Defaults to None. 

462 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False. 

463 as_dict (bool, optional): Flag to return the results as dictionaries. Defaults to False. 

464 tz_info (timezone, optional): The timezone information. Defaults to None. 

465 paginate (bool, optional): Flag to enable pagination. Defaults to False. 

466 current_page (int, optional): The current page number. Defaults to 1. 

467 docs_per_page (int, optional): The number of documents per page. Defaults to 1000. 

468 

469 Returns: 

470 list[DbModel] or ResponsePaginate: The list of found database model objects or a paginated response. 

471 """ 

472 pipeline, query, _ = self._aggregate_pipeline( 

473 Model=Model, 

474 query=query, 

475 raw_query=raw_query, 

476 sort=sort, 

477 raw_sort=raw_sort, 

478 populate=populate, 

479 pipeline=pipeline, 

480 populate_db_fields=populate_db_fields, 

481 paginate=paginate, 

482 current_page=current_page, 

483 docs_per_page=docs_per_page, 

484 is_find_one=False, 

485 ) 

486 

487 async def _result(): 

488 cursor = self._aggregate_cursor( 

489 Model=Model, pipeline=pipeline, tz_info=tz_info 

490 ) 

491 if as_dict: 

492 result = await cursor.to_list(length=None) 

493 else: 

494 result = [Model(**doc) async for doc in cursor] 

495 return result 

496 

497 if not paginate: 

498 return await _result() 

499 

500 async def _count(): 

501 kwargs = {"hint": "_id_"} if not query else {} 

502 return await self._db[Model._collection].count_documents( 

503 filter=query, **kwargs 

504 ) 

505 

506 result, count = await gather(_result(), _count()) 

507 page_quantity = ceil(count / docs_per_page) 

508 return ResponsePaginate( 

509 current_page=current_page, 

510 page_quantity=page_quantity, 

511 docs_quantity=count, 

512 docs=result, 

513 ) 

514 

515 async def delete( 

516 self, 

517 Model: Type[Model], 

518 query: QueryOperator = None, 

519 raw_query: dict = None, 

520 delete_one: bool = False, 

521 ) -> DbResponse: 

522 """ 

523 Delete documents from the database. 

524 

525 Args: 

526 Model (DbModel): The database model class. 

527 query (QueryOperator, optional): The query operator. Defaults to None. 

528 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

529 delete_one (bool, optional): Flag to delete a single document. Defaults to False. 

530 

531 Returns: 

532 DbResponse: The database response object. 

533 """ 

534 operations = self._create_delete_operations_list( 

535 query=query, raw_query=raw_query, delete_one=delete_one 

536 ) 

537 collection_name = Model._collection 

538 result: BulkWriteResult = await self._db[collection_name].bulk_write(operations) 

539 return self._db_response(result=result) 

540 

541 

542class DbEngine(_Engine): 

543 """ 

544 Synchronous database engine class that extends the base engine to provide synchronous operations. 

545 """ 

546 

547 def __init__(self, mongo_uri, db_name, tz_info: timezone = None): 

548 """ 

549 Initialize the synchronous database engine. 

550 

551 Args: 

552 mongo_uri (str): The MongoDB URI. 

553 db_name (str): The database name. 

554 tz_info (timezone, optional): The timezone information. Defaults to None. 

555 """ 

556 super().__init__( 

557 Client=MongoClient, 

558 mongo_uri=mongo_uri, 

559 db_name=db_name, 

560 tz_info=tz_info, 

561 ) 

562 

563 def save_all( 

564 self, 

565 obj_list: list[Model], 

566 populate: bool = False, 

567 ) -> dict[str, DbResponse]: 

568 """ 

569 Save a list of objects to the database. 

570 

571 Args: 

572 obj_list (list[DbModel]): The list of database model objects. 

573 """ 

574 response = {} 

575 indexes, operations, now = self._create_save_operations_list( 

576 objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate 

577 ) 

578 for collection_name, index_list in indexes.items(): 

579 if index_list: 

580 self._db[collection_name].create_indexes(index_list) 

581 for collection_name, operation_list in operations.items(): 

582 result: BulkWriteResult = self._db[collection_name].bulk_write( 

583 operation_list 

584 ) 

585 self._after_save( 

586 result=result, objs=obj_list, collection_name=collection_name, now=now 

587 ) 

588 response[collection_name] = self._db_response(result=result) 

589 return response 

590 

591 def save( 

592 self, 

593 obj: Model, 

594 query: QueryOperator = None, 

595 raw_query: dict = None, 

596 upsert: bool = True, 

597 populate: bool = False, 

598 ) -> DbResponse: 

599 """ 

600 Save a single object to the database. 

601 

602 Args: 

603 obj (DbModel): The database model object. 

604 query (QueryOperator, optional): The query operator. Defaults to None. 

605 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

606 

607 Returns: 

608 DbResponse: The database response object. 

609 """ 

610 indexes, operations, now = self._create_save_operations_list( 

611 objs=[obj], 

612 query=query, 

613 raw_query=raw_query, 

614 upsert=upsert, 

615 populate=populate, 

616 ) 

617 collection_name = obj._collection 

618 index_list = indexes[collection_name] 

619 if index_list: 

620 self._db[collection_name].create_indexes(index_list) 

621 operation_list = operations[collection_name] 

622 result: BulkWriteResult = self._db[collection_name].bulk_write(operation_list) 

623 self._after_save( 

624 result=result, objs=[obj], collection_name=collection_name, now=now 

625 ) 

626 return self._db_response(result=result) 

627 

628 def find_one( 

629 self, 

630 Model: Type[Model], 

631 query: QueryOperator = None, 

632 raw_query: dict = None, 

633 sort: SortOperator = None, 

634 raw_sort: dict = None, 

635 populate: bool = False, 

636 pipeline: list = None, 

637 populate_db_fields: list[DbField] | None = None, 

638 as_dict: bool = False, 

639 tz_info: timezone = None, 

640 ) -> Model: 

641 """ 

642 Find a single document in the database. 

643 

644 Args: 

645 Model (DbModel): The database model class. 

646 query (QueryOperator, optional): The query operator. Defaults to None. 

647 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

648 sort (SortOperator, optional): The sort operator. Defaults to None. 

649 raw_sort (dict, optional): The raw sort dictionary. Defaults to None. 

650 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False. 

651 as_dict (bool, optional): Flag to return the result as a dictionary. Defaults to False. 

652 tz_info (timezone, optional): The timezone information. Defaults to None. 

653 

654 Returns: 

655 DbModel: The found database model object. 

656 """ 

657 pipeline, _, _ = self._aggregate_pipeline( 

658 Model=Model, 

659 query=query, 

660 raw_query=raw_query, 

661 sort=sort, 

662 raw_sort=raw_sort, 

663 populate=populate, 

664 pipeline=pipeline, 

665 populate_db_fields=populate_db_fields, 

666 paginate=False, 

667 current_page=1, 

668 docs_per_page=1, 

669 is_find_one=True, 

670 ) 

671 cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info) 

672 if as_dict: 

673 result = list(cursor) 

674 else: 

675 result = [Model(**doc) for doc in cursor] 

676 try: 

677 return result[0] 

678 except IndexError: 

679 return None 

680 

681 def find_many( 

682 self, 

683 Model: Type[Model], 

684 query: QueryOperator = None, 

685 raw_query: dict = None, 

686 sort: SortOperator = None, 

687 raw_sort: dict = None, 

688 populate: bool = False, 

689 pipeline: list = None, 

690 populate_db_fields: list[DbField] | None = None, 

691 as_dict: bool = False, 

692 tz_info: timezone = None, 

693 paginate: bool = False, 

694 current_page: int = 1, 

695 docs_per_page: int = 1000, 

696 ) -> Union[list[Model], ResponsePaginate]: 

697 """ 

698 Find multiple documents in the database. 

699 

700 Args: 

701 Model (DbModel): The database model class. 

702 query (QueryOperator, optional): The query operator. Defaults to None. 

703 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

704 sort (SortOperator, optional): The sort operator. Defaults to None. 

705 raw_sort (dict, optional): The raw sort dictionary. Defaults to None. 

706 populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False. 

707 as_dict (bool, optional): Flag to return the results as dictionaries. Defaults to False. 

708 tz_info (timezone, optional): The timezone information. Defaults to None. 

709 paginate (bool, optional): Flag to enable pagination. Defaults to False. 

710 current_page (int, optional): The current page number. Defaults to 1. 

711 docs_per_page (int, optional): The number of documents per page. Defaults to 1000. 

712 

713 Returns: 

714 list[DbModel] or ResponsePaginate: The list of found database model objects or a paginated response. 

715 """ 

716 pipeline, query, _ = self._aggregate_pipeline( 

717 Model=Model, 

718 query=query, 

719 raw_query=raw_query, 

720 sort=sort, 

721 raw_sort=raw_sort, 

722 populate=populate, 

723 pipeline=pipeline, 

724 populate_db_fields=populate_db_fields, 

725 paginate=paginate, 

726 current_page=current_page, 

727 docs_per_page=docs_per_page, 

728 is_find_one=False, 

729 ) 

730 

731 def _result(): 

732 cursor = self._aggregate_cursor( 

733 Model=Model, pipeline=pipeline, tz_info=tz_info 

734 ) 

735 if as_dict: 

736 result = list(cursor) 

737 else: 

738 result = [Model(**doc) for doc in cursor] 

739 return result 

740 

741 if not paginate: 

742 return _result() 

743 

744 cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info) 

745 

746 def _count(): 

747 kwargs = {"hint": "_id_"} if not query else {} 

748 return self._db[Model._collection].count_documents(filter=query, **kwargs) 

749 

750 with ThreadPoolExecutor() as executor: 

751 future_result = executor.submit(_result) 

752 future_count = executor.submit(_count) 

753 result = future_result.result() 

754 count = future_count.result() 

755 

756 page_quantity = ceil(count / docs_per_page) 

757 return ResponsePaginate( 

758 current_page=current_page, 

759 page_quantity=page_quantity, 

760 docs_quantity=count, 

761 docs=result, 

762 ) 

763 

764 def delete( 

765 self, 

766 Model: Type[Model], 

767 query: QueryOperator = None, 

768 raw_query: dict = None, 

769 delete_one: bool = False, 

770 ) -> DbResponse: 

771 """ 

772 Delete documents from the database. 

773 

774 Args: 

775 Model (DbModel): The database model class. 

776 query (QueryOperator, optional): The query operator. Defaults to None. 

777 raw_query (dict, optional): The raw query dictionary. Defaults to None. 

778 delete_one (bool, optional): Flag to delete a single document. Defaults to False. 

779 

780 Returns: 

781 DbResponse: The database response object. 

782 """ 

783 operations = self._create_delete_operations_list( 

784 query=query, raw_query=raw_query, delete_one=delete_one 

785 ) 

786 collection_name = Model._collection 

787 result: BulkWriteResult = self._db[collection_name].bulk_write(operations) 

788 return self._db_response(result=result)