Coverage for pyodmongo/engines/engines.py: 100%

201 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 15:48 +0000

1from motor.motor_asyncio import AsyncIOMotorClient 

2from pymongo import MongoClient, UpdateMany, DeleteOne, DeleteMany 

3from pymongo.results import BulkWriteResult 

4from datetime import datetime, timezone 

5from bson import ObjectId 

6from bson.codec_options import CodecOptions 

7from ..models.db_model import DbModel 

8from ..models.id_model import Id 

9from ..models.responses import DbResponse 

10from ..models.query_operators import QueryOperator 

11from ..models.sort_operators import SortOperator 

12from ..models.paginate import ResponsePaginate 

13from ..models.db_field_info import DbField 

14from typing import TypeVar, Type, Union 

15from concurrent.futures import ThreadPoolExecutor 

16from .utils import consolidate_dict, mount_base_pipeline 

17from ..services.verify_subclasses import is_subclass 

18from asyncio import gather 

19from math import ceil 

20 

21 

# Generic placeholder for "any DbModel subclass"; used in the engine signatures
# so that the model type passed in is the model type handed back.
Model = TypeVar(
    "Model",
    bound=DbModel,
)

23 

24 

class _Engine:
    """
    Base class for database operations, providing common functionality for both synchronous and asynchronous engines.

    Attributes:
        _client: The MongoDB client instance built from the ``Client`` class given to ``__init__``.
        _db: The database handle obtained from the client by name.
        _tz_info (timezone): The default timezone information.
    """

    def __init__(self, Client, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the database engine.

        Args:
            Client (type): The MongoDB client class.
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        self._client = Client(mongo_uri)
        self._db = self._client[db_name]
        self._tz_info = tz_info

    def _query(self, query: QueryOperator, raw_query: dict) -> dict:
        """
        Construct a query dictionary from a QueryOperator or a raw query.

        Args:
            query (QueryOperator): The query operator. May be None, in which
                case ``raw_query`` is used.
            raw_query (dict): The raw query dictionary. May be None.

        Returns:
            dict: ``query.to_dict()`` when a query operator is given, otherwise
            ``raw_query`` (or an empty dict when that is falsy too).

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        # Validate only when a query object was actually supplied. Callers such
        # as save_all()/find_many() legitimately pass query=None and rely on
        # raw_query (or on no filter at all). This mirrors the guard in _sort().
        if query and not is_subclass(
            class_to_verify=query.__class__, subclass=QueryOperator
        ):
            raise TypeError(
                'query argument must be a valid query operator from pyodmongo.queries. If you really need to make a very specific query, use "raw_query" argument'
            )
        raw_query = {} if not raw_query else raw_query
        return query.to_dict() if query else raw_query

    def _sort(self, sort: SortOperator, raw_sort: dict) -> dict:
        """
        Construct a sort dictionary from a SortOperator or a raw sort.

        Args:
            sort (SortOperator): The sort operator. May be None, in which case
                ``raw_sort`` is used.
            raw_sort (dict): The raw sort dictionary. May be None.

        Returns:
            dict: ``sort.to_dict()`` when a sort operator is given, otherwise
            ``raw_sort`` (or an empty dict when that is falsy too).

        Raises:
            TypeError: If ``sort`` is given but is not a SortOperator.
        """
        if sort and not isinstance(sort, SortOperator):
            raise TypeError(
                'sort argument must be a SortOperator from pyodmongo.queries. If you really need to make a very specific sort, use "raw_sort" argument'
            )
        raw_sort = {} if not raw_sort else raw_sort
        return sort.to_dict() if sort else raw_sort

    def _set_tz_info(self, tz_info: timezone):
        """
        Resolve the timezone to use for an operation.

        Args:
            tz_info (timezone): The per-call timezone information. May be None.

        Returns:
            timezone: ``tz_info`` when it is truthy, otherwise the engine-wide
            default stored in ``_tz_info``.
        """
        return tz_info if tz_info else self._tz_info

    def _update_many_operation(
        self, obj: Type[Model], query_dict: dict, now, upsert: bool, populate: bool
    ):
        """
        Create an UpdateMany operation for bulk updates.

        Args:
            obj (DbModel): The database model object.
            query_dict (dict): The query dictionary. When empty, the object's
                own ``_id`` is used as the filter.
            now (datetime): The timestamp written to the updated/created fields.
            upsert (bool): Whether the operation may insert when nothing matches.
            populate (bool): Forwarded to ``consolidate_dict``.

        Returns:
            UpdateMany: The UpdateMany operation.
        """
        model_cls = obj.__class__
        dct = consolidate_dict(obj=obj, dct={}, populate=populate)
        # NOTE(review): when the document has no "_id" yet, ObjectId(None) is
        # expected to generate a fresh id (see bson docs) - confirm.
        find_filter = query_dict or {"_id": ObjectId(dct.get("_id"))}
        dct[model_cls.updated_at.field_alias] = now
        # "_id" is matched through the filter and the creation timestamp is
        # written only through $setOnInsert, so neither belongs in $set. The
        # pops are tolerant, matching the tolerant .get("_id") above.
        dct.pop("_id", None)
        dct.pop(model_cls.created_at.field_alias, None)
        to_save = {
            "$set": dct,
            "$setOnInsert": {model_cls.created_at.field_alias: now},
        }
        return UpdateMany(filter=find_filter, update=to_save, upsert=upsert)

    def _create_delete_operations_list(
        self, query: QueryOperator, raw_query: dict, delete_one: bool
    ):
        """
        Create a list of delete operations.

        Args:
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            delete_one (bool): Flag to indicate whether to delete one or many documents.

        Returns:
            list: A single-element list holding a DeleteOne or a DeleteMany.
        """
        query = self._query(query=query, raw_query=raw_query)
        if delete_one:
            return [DeleteOne(filter=query)]
        return [DeleteMany(filter=query)]

    def _create_save_operations_list(
        self,
        objs: list[Type[Model]],
        query: QueryOperator,
        raw_query: dict,
        upsert: bool,
        populate: bool,
    ):
        """
        Create lists of indexes and save operations for bulk writes.

        Args:
            objs (list[DbModel]): The list of database model objects.
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            upsert (bool): Whether the generated operations may insert.
            populate (bool): Forwarded to ``consolidate_dict``.

        Returns:
            tuple: ``(indexes, operations, now)`` where ``indexes`` and
            ``operations`` are dicts keyed by collection name and ``now`` is
            the timestamp applied to every operation.
        """
        operations = {}
        indexes = {}
        query = self._query(query=query, raw_query=raw_query)
        now = datetime.now(self._tz_info)
        # Truncate to whole milliseconds.
        # NOTE(review): presumably to match the precision MongoDB stores, so
        # in-memory objects equal what is read back - confirm.
        now = now.replace(microsecond=now.microsecond // 1000 * 1000)
        for obj in objs:
            operation = self._update_many_operation(
                obj=obj, query_dict=query, now=now, upsert=upsert, populate=populate
            )
            collection_name = obj._collection
            operations.setdefault(collection_name, []).append(operation)

            try:
                obj_indexes = obj._indexes
            except AttributeError:
                # Fall back to the indexes declared at model initialisation.
                obj_indexes = obj._init_indexes
            indexes[collection_name] = obj_indexes
        return indexes, operations, now

    def _after_save(
        self, result: BulkWriteResult, objs: list[Model], collection_name: str, now
    ):
        """
        Write server-generated values back onto freshly inserted objects.

        Args:
            result (BulkWriteResult): The bulk write result.
            objs (list[DbModel]): The list of database model objects.
            collection_name (str): The name of the collection.
            now (datetime): The timestamp used for the save.
        """
        # Keys of upserted_ids are positions in the per-collection operation
        # list, which was built in the same order as this filtered list.
        objs_from_collection = [
            obj for obj in objs if obj._collection == collection_name
        ]
        for index, obj_id in result.upserted_ids.items():
            saved = objs_from_collection[index]
            saved.id = Id(obj_id)
            saved.created_at = now
            saved.updated_at = now

    def _db_response(self, result: BulkWriteResult):
        """
        Create a database response object from a bulk write result.

        Args:
            result (BulkWriteResult): The bulk write result.

        Returns:
            DbResponse: The database response object.
        """
        return DbResponse(
            acknowledged=result.acknowledged,
            deleted_count=result.deleted_count,
            inserted_count=result.inserted_count,
            matched_count=result.matched_count,
            modified_count=result.modified_count,
            upserted_count=result.upserted_count,
            upserted_ids=result.upserted_ids,
        )

    def _aggregate_cursor(
        self,
        Model: Type[Model],
        pipeline,
        tz_info: timezone,
    ):
        """
        Create an aggregation cursor with the specified pipeline and timezone information.

        Args:
            Model (DbModel): The database model class.
            pipeline (list): The aggregation pipeline.
            tz_info (timezone): The timezone information; falls back to the
                engine default when falsy.

        Returns:
            The cursor returned by the collection's ``aggregate`` call.
        """
        tz_info = self._set_tz_info(tz_info=tz_info)
        # Datetimes are decoded as timezone-aware only when a timezone is set.
        tz_aware = bool(tz_info)
        collection = self._db[Model._collection].with_options(
            codec_options=CodecOptions(tz_aware=tz_aware, tzinfo=tz_info)
        )
        return collection.aggregate(pipeline)

    def _aggregate_pipeline(
        self,
        Model: Type[Model],
        query: QueryOperator,
        raw_query: dict,
        sort: SortOperator,
        raw_sort: dict,
        populate: bool,
        pipeline: list | None,
        populate_db_fields: list[DbField] | None,
        paginate: bool,
        current_page: int,
        docs_per_page: int,
        no_paginate_limit: int | None,
    ) -> tuple:
        """
        Construct an aggregation pipeline.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator): The query operator.
            raw_query (dict): The raw query dictionary.
            sort (SortOperator): The sort operator.
            raw_sort (dict): The raw sort dictionary.
            populate (bool): Flag to indicate whether to populate related documents.
            pipeline (list | None): A custom aggregation pipeline, if any.
            populate_db_fields (list[DbField] | None): Specific fields to populate.
            paginate (bool): Whether pagination is enabled.
            current_page (int): The page number to retrieve.
            docs_per_page (int): The number of documents per page.
            no_paginate_limit (int | None): Document limit when pagination is disabled.

        Returns:
            tuple: A tuple containing the pipeline, query, and sort dictionaries.
        """
        query = self._query(query=query, raw_query=raw_query)
        sort = self._sort(sort=sort, raw_sort=raw_sort)
        return (
            mount_base_pipeline(
                Model=Model,
                query=query,
                sort=sort,
                populate=populate,
                pipeline=pipeline,
                populate_db_fields=populate_db_fields,
                paginate=paginate,
                current_page=current_page,
                docs_per_page=docs_per_page,
                no_paginate_limit=no_paginate_limit,
            ),
            query,
            sort,
        )

294 

295 

class AsyncDbEngine(_Engine):
    """
    Asynchronous flavour of the engine: every public operation is a coroutine
    and the underlying client is ``AsyncIOMotorClient``.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Build the engine on top of an ``AsyncIOMotorClient``.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): Default timezone information. Defaults to None.
        """
        super().__init__(
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
            Client=AsyncIOMotorClient,
        )

    async def save_all(
        self,
        obj_list: list[Model],
        populate: bool = False,
    ) -> dict[str, DbResponse]:
        """
        Persist several objects, grouping the writes by collection.

        Args:
            obj_list (list[DbModel]): The objects to save.
            populate (bool): Forwarded to the save-operation builder. Defaults to False.

        Returns:
            dict[str, DbResponse]: One response per collection name.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate
        )
        # Indexes are created for every collection before any write is issued.
        for name, index_models in indexes.items():
            if not index_models:
                continue
            await self._db[name].create_indexes(index_models)

        responses = {}
        for name, bulk_ops in operations.items():
            write_result: BulkWriteResult = await self._db[name].bulk_write(bulk_ops)
            self._after_save(
                result=write_result, objs=obj_list, collection_name=name, now=now
            )
            responses[name] = self._db_response(result=write_result)
        return responses

    async def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
        populate: bool = False,
    ) -> DbResponse:
        """
        Persist one object.

        Args:
            obj (DbModel): The object to save.
            query (QueryOperator, optional): Filter selecting what to update. Defaults to None.
            raw_query (dict, optional): Raw filter dictionary. Defaults to None.
            upsert (bool): Whether to insert when nothing matches. Defaults to True.
            populate (bool): Forwarded to the save-operation builder. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj],
            query=query,
            raw_query=raw_query,
            upsert=upsert,
            populate=populate,
        )
        name = obj._collection
        index_models = indexes[name]
        if index_models:
            await self._db[name].create_indexes(index_models)
        bulk_ops = operations[name]
        write_result: BulkWriteResult = await self._db[name].bulk_write(bulk_ops)
        self._after_save(result=write_result, objs=[obj], collection_name=name, now=now)
        return self._db_response(result=write_result)

    async def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Asynchronously fetch the first document matching the criteria.

        The lookup is executed as an aggregation pipeline built with a
        non-paginated limit of one document.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the result. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns the document as a dictionary. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.

        Returns:
            The matched document as a model instance or a dictionary, or None if no
            document is found.
        """
        stages, _unused_query, _unused_sort = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=False,
            current_page=1,
            docs_per_page=1,
            no_paginate_limit=1,
        )
        cursor = self._aggregate_cursor(Model=Model, pipeline=stages, tz_info=tz_info)
        if as_dict:
            docs = await cursor.to_list(length=None)
        else:
            docs = [Model(**doc) async for doc in cursor]
        return docs[0] if docs else None

    async def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
        no_paginate_limit: int | None = None,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Asynchronously fetch every document matching the criteria.

        Without pagination the documents are returned as a plain list. With
        pagination the page of documents and the total count are awaited
        together and wrapped in a ResponsePaginate.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the results. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns documents as dictionaries. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.
            paginate: If True, enables pagination. Defaults to False.
            current_page: The page number to retrieve. Defaults to 1.
            docs_per_page: The number of documents per page. Defaults to 1000.
            no_paginate_limit: The maximum number of documents to return when pagination is disabled. Defaults to None.

        Returns:
            A list of documents or a ResponsePaginate object with the paginated results.
        """
        stages, query_dict, _unused_sort = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=paginate,
            current_page=current_page,
            docs_per_page=docs_per_page,
            no_paginate_limit=no_paginate_limit,
        )

        async def fetch_docs():
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=stages, tz_info=tz_info
            )
            if as_dict:
                return await cursor.to_list(length=None)
            return [Model(**doc) async for doc in cursor]

        if not paginate:
            return await fetch_docs()

        async def count_docs():
            # With an empty filter the count is hinted to use the "_id_" index.
            count_kwargs = {} if query_dict else {"hint": "_id_"}
            return await self._db[Model._collection].count_documents(
                filter=query_dict, **count_kwargs
            )

        docs, total = await gather(fetch_docs(), count_docs())
        return ResponsePaginate(
            current_page=current_page,
            page_quantity=ceil(total / docs_per_page),
            docs_quantity=total,
            docs=docs,
        )

    async def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Remove documents of ``Model`` that match the filter.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Delete only a single document. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        delete_ops = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        target = self._db[Model._collection]
        write_result: BulkWriteResult = await target.bulk_write(delete_ops)
        return self._db_response(result=write_result)

553 

554 

class DbEngine(_Engine):
    """
    Synchronous database engine class that extends the base engine to provide synchronous operations.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the synchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): The timezone information. Defaults to None.
        """
        super().__init__(
            Client=MongoClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    def save_all(
        self,
        obj_list: list[Model],
        populate: bool = False,
    ) -> dict[str, DbResponse]:
        """
        Save a list of objects to the database.

        Args:
            obj_list (list[DbModel]): The list of database model objects.
            populate (bool): Forwarded to the save-operation builder. Defaults to False.

        Returns:
            dict[str, DbResponse]: One response per collection name.
        """
        response = {}
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True, populate=populate
        )
        # Indexes are created for every collection before any write is issued.
        for collection_name, index_list in indexes.items():
            if index_list:
                self._db[collection_name].create_indexes(index_list)
        for collection_name, operation_list in operations.items():
            result: BulkWriteResult = self._db[collection_name].bulk_write(
                operation_list
            )
            self._after_save(
                result=result, objs=obj_list, collection_name=collection_name, now=now
            )
            response[collection_name] = self._db_response(result=result)
        return response

    def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
        populate: bool = False,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool): Whether to insert when nothing matches. Defaults to True.
            populate (bool): Forwarded to the save-operation builder. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj],
            query=query,
            raw_query=raw_query,
            upsert=upsert,
            populate=populate,
        )
        collection_name = obj._collection
        index_list = indexes[collection_name]
        if index_list:
            self._db[collection_name].create_indexes(index_list)
        operation_list = operations[collection_name]
        result: BulkWriteResult = self._db[collection_name].bulk_write(operation_list)
        self._after_save(
            result=result, objs=[obj], collection_name=collection_name, now=now
        )
        return self._db_response(result=result)

    def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Synchronously finds a single document in the database that matches the query criteria.

        This method constructs and executes an aggregation pipeline to find one document.
        It supports complex queries, sorting, and populating referenced fields.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the result. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns the document as a dictionary. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.

        Returns:
            The matched document as a model instance or a dictionary, or None if no
            document is found.
        """
        pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=False,
            current_page=1,
            docs_per_page=1,
            no_paginate_limit=1,
        )
        cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
        if as_dict:
            result = list(cursor)
        else:
            result = [Model(**doc) for doc in cursor]
        try:
            return result[0]
        except IndexError:
            return None

    def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        pipeline: list = None,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
        no_paginate_limit: int | None = None,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Synchronously finds multiple documents in the database that match the query criteria.

        This method supports filtering, sorting, populating referenced fields, and pagination.
        If pagination is enabled, the page of documents and the total count are
        fetched on two worker threads and returned as a paginated response object.

        Args:
            Model: The DbModel class to perform the query on.
            query: A QueryOperator for filtering documents. Defaults to None.
            raw_query: A raw dictionary for MongoDB query. Defaults to None.
            sort: A SortOperator for sorting the results. Defaults to None.
            raw_sort: A raw dictionary for MongoDB sorting. Defaults to None.
            populate: If True, populates referenced documents. Defaults to False.
            pipeline: A custom aggregation pipeline to use. Defaults to None.
            populate_db_fields: A list of specific DbFields to populate. Defaults to None.
            as_dict: If True, returns documents as dictionaries. Defaults to False.
            tz_info: Timezone information for decoding datetime objects. Defaults to None.
            paginate: If True, enables pagination. Defaults to False.
            current_page: The page number to retrieve. Defaults to 1.
            docs_per_page: The number of documents per page. Defaults to 1000.
            no_paginate_limit: The maximum number of documents to return when pagination is disabled. Defaults to None.

        Returns:
            A list of documents or a ResponsePaginate object with the paginated results.
        """
        pipeline, query, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            pipeline=pipeline,
            populate_db_fields=populate_db_fields,
            paginate=paginate,
            current_page=current_page,
            docs_per_page=docs_per_page,
            no_paginate_limit=no_paginate_limit,
        )

        def _result():
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=pipeline, tz_info=tz_info
            )
            if as_dict:
                result = list(cursor)
            else:
                result = [Model(**doc) for doc in cursor]
            return result

        if not paginate:
            return _result()

        # The only aggregation cursor needed is the one opened inside
        # _result(); no additional cursor is created here, so the pipeline
        # runs exactly once per call.

        def _count():
            # With an empty filter the count is hinted to use the "_id_" index.
            kwargs = {"hint": "_id_"} if not query else {}
            return self._db[Model._collection].count_documents(filter=query, **kwargs)

        # Fetch the page and the total count concurrently.
        with ThreadPoolExecutor() as executor:
            future_result = executor.submit(_result)
            future_count = executor.submit(_count)
            result = future_result.result()
            count = future_count.result()

        page_quantity = ceil(count / docs_per_page)
        return ResponsePaginate(
            current_page=current_page,
            page_quantity=page_quantity,
            docs_quantity=count,
            docs=result,
        )

    def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Flag to delete a single document. Defaults to False.

        Returns:
            DbResponse: The database response object.
        """
        operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection_name = Model._collection
        result: BulkWriteResult = self._db[collection_name].bulk_write(operations)
        return self._db_response(result=result)