Coverage for pyodmongo/engines/engines.py: 100%

214 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-16 15:08 +0000

1from motor.motor_asyncio import AsyncIOMotorClient 

2from pymongo import MongoClient, UpdateMany, DeleteOne, DeleteMany 

3from pymongo.results import BulkWriteResult 

4from datetime import datetime, timezone 

5from bson import ObjectId 

6from bson.codec_options import CodecOptions 

7from ..models.db_model import DbModel 

8from ..models.id_model import Id 

9from ..models.responses import DbResponse 

10from ..models.query_operators import QueryOperator 

11from ..models.sort_operators import SortOperator 

12from ..models.paginate import ResponsePaginate 

13from ..models.db_field_info import DbField 

14from typing import TypeVar, Type, Union 

15from concurrent.futures import ThreadPoolExecutor 

16from .utils import consolidate_dict, mount_base_pipeline 

17from ..services.verify_subclasses import is_subclass 

18from asyncio import gather 

19from math import ceil 

20 

21 

# Generic type variable for model classes/instances handled by the engines;
# bound to DbModel so only DbModel subclasses satisfy it.
Model = TypeVar("Model", bound=DbModel)

23 

24 

class _Engine:
    """
    Shared building blocks for the synchronous and asynchronous engines.

    The class itself never awaits anything: it only prepares filters, bulk
    operations and aggregation pipelines, and converts driver results. The
    concrete engines decide how the resulting driver calls are executed.

    Attributes:
        _client: Instance of the client class handed to ``__init__``.
        _db: Database handle obtained by indexing the client with the database name.
        _tz_info (timezone | None): Default timezone used for timestamps and
            for decoding datetimes when a call does not provide its own.
    """

    def __init__(self, Client, mongo_uri, db_name, tz_info: timezone = None):
        """
        Create the client and select the database.

        Args:
            Client (type): The MongoDB client class to instantiate.
            mongo_uri (str): The MongoDB URI passed to ``Client``.
            db_name (str): The name of the database to use.
            tz_info (timezone, optional): Default timezone. Defaults to None.
        """
        self._client = Client(mongo_uri)
        self._db = self._client[db_name]
        self._tz_info = tz_info

    def _query(self, query: QueryOperator, raw_query: dict) -> dict:
        """
        Resolve the filter dictionary from a QueryOperator or a raw query.

        ``query`` takes precedence; ``raw_query`` is only used when no query
        operator is given.

        Args:
            query (QueryOperator): The query operator, or None.
            raw_query (dict): The raw query dictionary, or None.

        Returns:
            dict: The filter dictionary (empty when neither argument is given).

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        # None means "no query operator supplied" (save_all always calls this
        # with query=None), so the type check only applies to real values.
        if query is not None and not is_subclass(
            class_to_verify=query.__class__, subclass=QueryOperator
        ):
            raise TypeError(
                'query argument must be a valid query operator from pyodmongo.queries. If you really need to make a very specific query, use "raw_query" argument'
            )
        if query:
            return query.to_dict()
        return raw_query if raw_query else {}

    def _sort(self, sort: SortOperator, raw_sort: dict) -> dict:
        """
        Resolve the sort dictionary from a SortOperator or a raw sort.

        ``sort`` takes precedence; ``raw_sort`` is only used when no sort
        operator is given.

        Args:
            sort (SortOperator): The sort operator, or None.
            raw_sort (dict): The raw sort dictionary, or None.

        Returns:
            dict: The sort dictionary (empty when neither argument is given).

        Raises:
            TypeError: If ``sort`` is truthy but is not a SortOperator.
        """
        if sort and not isinstance(sort, SortOperator):
            raise TypeError(
                'sort argument must be a SortOperator from pyodmongo.queries. If you really need to make a very specific sort, use "raw_sort" argument'
            )
        if sort:
            return sort.to_dict()
        return raw_sort if raw_sort else {}

    def _set_tz_info(self, tz_info: timezone):
        """
        Pick the timezone to use for a single call.

        Args:
            tz_info (timezone): Per-call timezone, or None.

        Returns:
            timezone: ``tz_info`` when it is truthy, otherwise the engine default
            (which may itself be None).
        """
        return tz_info or self._tz_info

    def _update_many_operation(
        self, obj: Model, query_dict: dict, now, upsert: bool
    ):
        """
        Build the UpdateMany operation that persists one object.

        Args:
            obj (DbModel): The database model object to persist.
            query_dict (dict): Filter selecting the documents to update. When
                empty, the object's own ``_id`` is used as the filter.
            now (datetime): Timestamp written to the updated/created fields.
            upsert (bool): Whether a document is inserted when nothing matches.

        Returns:
            UpdateMany: The UpdateMany operation.
        """
        model_cls = obj.__class__
        created_at_key = model_cls.created_at.field_alias
        updated_at_key = model_cls.updated_at.field_alias

        dct = consolidate_dict(obj=obj, dct={})
        # Without an explicit filter, target the object's own _id. For an
        # unsaved object that value is None and ObjectId(None) generates a
        # fresh id, so the filter matches nothing and the upsert inserts.
        find_filter = query_dict or {"_id": ObjectId(dct.get("_id"))}
        dct[updated_at_key] = now
        # _id must not be part of $set, and created_at is written only through
        # $setOnInsert below (the same path cannot appear in both operators).
        dct.pop("_id", None)
        dct.pop(created_at_key, None)
        to_save = {
            "$set": dct,
            "$setOnInsert": {created_at_key: now},
        }
        return UpdateMany(filter=find_filter, update=to_save, upsert=upsert)

    def _create_delete_operations_list(
        self, query: QueryOperator, raw_query: dict, delete_one: bool
    ):
        """
        Build the bulk operation list for a delete.

        Args:
            query (QueryOperator): The query operator, or None.
            raw_query (dict): The raw query dictionary, or None.
            delete_one (bool): True to delete a single document, False to
                delete every matching document.

        Returns:
            list: A one-element list holding a DeleteOne or DeleteMany operation.

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        delete_filter = self._query(query=query, raw_query=raw_query)
        operation_cls = DeleteOne if delete_one else DeleteMany
        return [operation_cls(filter=delete_filter)]

    def _create_save_operations_list(
        self,
        objs: list[Model],
        query: QueryOperator,
        raw_query: dict,
        upsert: bool,
    ):
        """
        Group save operations and index definitions by collection.

        Args:
            objs (list[DbModel]): The database model objects to persist.
            query (QueryOperator): The query operator, or None.
            raw_query (dict): The raw query dictionary, or None.
            upsert (bool): Whether documents are inserted when nothing matches.

        Returns:
            tuple: ``(indexes, operations, now)`` where ``indexes`` maps each
            collection name to its index definitions, ``operations`` maps each
            collection name to its list of UpdateMany operations (in the order
            of ``objs``), and ``now`` is the timestamp used for all of them.

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        operations = {}
        indexes = {}
        query_dict = self._query(query=query, raw_query=raw_query)
        now = datetime.now(self._tz_info)
        # Truncate to whole milliseconds -- presumably so the value assigned to
        # the objects equals what BSON (millisecond precision) stores.
        now = now.replace(microsecond=(now.microsecond // 1000) * 1000)
        for obj in objs:
            collection_name = obj._collection
            operation = self._update_many_operation(
                obj=obj, query_dict=query_dict, now=now, upsert=upsert
            )
            operations.setdefault(collection_name, []).append(operation)

            # Prefer _indexes and fall back to _init_indexes when the object
            # does not expose it.
            try:
                obj_indexes = obj._indexes
            except AttributeError:
                obj_indexes = obj._init_indexes
            indexes[collection_name] = obj_indexes
        return indexes, operations, now

    def _after_save(
        self, result: BulkWriteResult, objs: list[Model], collection_name: str, now
    ):
        """
        Copy server-generated data back onto objects that were inserted.

        Args:
            result (BulkWriteResult): The bulk write result for one collection.
            objs (list[DbModel]): All objects that were part of the save.
            collection_name (str): The collection ``result`` belongs to.
            now (datetime): The timestamp used for the save.
        """
        # upserted_ids is keyed by the position of the operation in the bulk
        # list; that list was built from the objects of this collection in the
        # same order, so positions line up with this filtered list.
        objs_from_collection = [
            obj for obj in objs if obj._collection == collection_name
        ]
        for position, obj_id in result.upserted_ids.items():
            inserted = objs_from_collection[position]
            inserted.id = Id(obj_id)
            inserted.created_at = now
            inserted.updated_at = now

    def _db_response(self, result: BulkWriteResult):
        """
        Convert a bulk write result into a DbResponse.

        Args:
            result (BulkWriteResult): The bulk write result.

        Returns:
            DbResponse: The database response object.
        """
        return DbResponse(
            acknowledged=result.acknowledged,
            deleted_count=result.deleted_count,
            inserted_count=result.inserted_count,
            matched_count=result.matched_count,
            modified_count=result.modified_count,
            upserted_count=result.upserted_count,
            upserted_ids=result.upserted_ids,
        )

    def _aggregate_cursor(
        self,
        Model: Type[Model],
        pipeline,
        tz_info: timezone,
    ):
        """
        Run an aggregation on the model's collection.

        Datetimes are decoded as timezone-aware only when a timezone is
        available, either from ``tz_info`` or from the engine default.

        Args:
            Model (DbModel): The database model class.
            pipeline (list): The aggregation pipeline.
            tz_info (timezone): Per-call timezone, or None for the engine default.

        Returns:
            The object returned by the driver's ``aggregate`` call.
        """
        tz_info = self._set_tz_info(tz_info=tz_info)
        codec_options = CodecOptions(tz_aware=bool(tz_info), tzinfo=tz_info)
        collection = self._db[Model._collection].with_options(
            codec_options=codec_options
        )
        return collection.aggregate(pipeline)

    def _aggregate_pipeline(
        self,
        Model: Type[Model],
        query: QueryOperator,
        raw_query: dict,
        sort: SortOperator,
        raw_sort: dict,
        populate: bool,
        populate_db_fields: list[DbField] | None,
    ) -> tuple[list, dict, dict]:
        """
        Build the base aggregation pipeline for a find.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator): The query operator, or None.
            raw_query (dict): The raw query dictionary, or None.
            sort (SortOperator): The sort operator, or None.
            raw_sort (dict): The raw sort dictionary, or None.
            populate (bool): Flag to indicate whether to populate related documents.
            populate_db_fields (list[DbField] | None): Forwarded unchanged to
                ``mount_base_pipeline``.

        Returns:
            tuple: ``(pipeline, query_dict, sort_dict)``.

        Raises:
            TypeError: If ``query`` or ``sort`` has an unsupported type.
        """
        query_dict = self._query(query=query, raw_query=raw_query)
        sort_dict = self._sort(sort=sort, raw_sort=raw_sort)
        pipeline = mount_base_pipeline(
            Model=Model,
            query=query_dict,
            sort=sort_dict,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )
        return pipeline, query_dict, sort_dict

    def _add_paginate_to_pipeline(
        self, pipeline: list, current_page: int, docs_per_page: int
    ):
        """
        Append ``$skip`` and ``$limit`` stages to ``pipeline`` in place.

        Out-of-range values are normalized instead of rejected: pages below 1
        become page 1 and the page size is clamped to the range 1..1000, so
        the generated stages never carry a negative skip or a zero limit.

        Args:
            pipeline (list): The aggregation pipeline to extend.
            current_page (int): The 1-based page number.
            docs_per_page (int): The requested number of documents per page.
        """
        max_docs_per_page = 1000
        current_page = max(current_page, 1)
        docs_per_page = min(max(docs_per_page, 1), max_docs_per_page)
        skip = docs_per_page * (current_page - 1)
        # $limit must stay the last stage appended here.
        pipeline.extend([{"$skip": skip}, {"$limit": docs_per_page}])

304 

305 

class AsyncDbEngine(_Engine):
    """
    Asynchronous engine: the base-engine operations executed through
    ``AsyncIOMotorClient``, with every database call awaited.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the asynchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): Default timezone. Defaults to None.
        """
        super().__init__(
            Client=AsyncIOMotorClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    async def save_all(
        self,
        obj_list: list[Model],
    ) -> dict[str, DbResponse]:
        """
        Upsert a list of objects, one bulk write per collection.

        Objects that get inserted receive their new id and timestamps.

        Args:
            obj_list (list[DbModel]): The database model objects to save.

        Returns:
            dict[str, DbResponse]: One response per collection name. Empty when
            ``obj_list`` is empty.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True
        )
        # Create all indexes first, then write.
        for collection_name, index_list in indexes.items():
            if index_list:
                await self._db[collection_name].create_indexes(index_list)

        response = {}
        for collection_name, operation_list in operations.items():
            result: BulkWriteResult = await self._db[collection_name].bulk_write(
                operation_list
            )
            self._after_save(
                result=result, objs=obj_list, collection_name=collection_name, now=now
            )
            response[collection_name] = self._db_response(result=result)
        return response

    async def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Without ``query``/``raw_query`` the object's own id is used as filter.
        With a filter, every matching document is updated with the object's
        data.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool, optional): Insert the object when nothing matches.
                Defaults to True.

        Returns:
            DbResponse: The database response object.

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj], query=query, raw_query=raw_query, upsert=upsert
        )
        collection_name = obj._collection
        collection = self._db[collection_name]
        index_list = indexes[collection_name]
        if index_list:
            await collection.create_indexes(index_list)
        result: BulkWriteResult = await collection.bulk_write(
            operations[collection_name]
        )
        self._after_save(
            result=result, objs=[obj], collection_name=collection_name, now=now
        )
        return self._db_response(result=result)

    async def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Find a single document in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Return the raw document instead of a model
                instance. Defaults to False.
            tz_info (timezone, optional): Timezone used to decode datetimes.
                Defaults to None (engine default).

        Returns:
            DbModel | dict | None: The first matching document, or None when
            nothing matches.

        Raises:
            TypeError: If ``query`` or ``sort`` has an unsupported type.
        """
        pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )
        pipeline += [{"$limit": 1}]
        cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
        docs = await cursor.to_list(length=1)
        if not docs:
            return None
        first_doc = docs[0]
        return first_doc if as_dict else Model(**first_doc)

    async def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Find multiple documents in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Return raw documents instead of model
                instances. Defaults to False.
            tz_info (timezone, optional): Timezone used to decode datetimes.
                Defaults to None (engine default).
            paginate (bool, optional): Flag to enable pagination. Defaults to False.
            current_page (int, optional): The 1-based page number; values below 1
                are treated as 1. Defaults to 1.
            docs_per_page (int, optional): The requested number of documents per
                page; the pagination helper caps it. Defaults to 1000.

        Returns:
            list[DbModel] or ResponsePaginate: All matching documents, or a
            paginated response whose ``current_page`` and ``page_quantity``
            reflect the page and page size that were actually applied.

        Raises:
            TypeError: If ``query`` or ``sort`` has an unsupported type.
        """
        pipeline, query_dict, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )

        async def _result():
            # Reads ``pipeline`` when called, i.e. after pagination stages
            # have been appended on the paginated path.
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=pipeline, tz_info=tz_info
            )
            if as_dict:
                return await cursor.to_list(length=None)
            return [Model(**doc) async for doc in cursor]

        if not paginate:
            return await _result()

        self._add_paginate_to_pipeline(
            pipeline=pipeline, current_page=current_page, docs_per_page=docs_per_page
        )
        # Report what was really applied: the helper treats pages below 1 as
        # page 1 and appends the effective page size as the final $limit stage.
        current_page = max(current_page, 1)
        docs_per_page = pipeline[-1]["$limit"]

        async def _count():
            # With an empty filter, hint the default ``_id_`` index --
            # presumably so the count can be answered from that index.
            kwargs = {} if query_dict else {"hint": "_id_"}
            return await self._db[Model._collection].count_documents(
                filter=query_dict, **kwargs
            )

        # The page of documents and the total count are independent queries,
        # so run them concurrently.
        docs, count = await gather(_result(), _count())
        return ResponsePaginate(
            current_page=current_page,
            page_quantity=ceil(count / docs_per_page),
            docs_quantity=count,
            docs=docs,
        )

    async def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Note that with neither ``query`` nor ``raw_query`` the filter is empty,
        which matches every document in the collection.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Delete only a single matching document.
                Defaults to False.

        Returns:
            DbResponse: The database response object.

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection = self._db[Model._collection]
        result: BulkWriteResult = await collection.bulk_write(operations)
        return self._db_response(result=result)

537 

538 

class DbEngine(_Engine):
    """
    Synchronous engine: the base-engine operations executed through
    ``MongoClient`` with blocking database calls.
    """

    def __init__(self, mongo_uri, db_name, tz_info: timezone = None):
        """
        Initialize the synchronous database engine.

        Args:
            mongo_uri (str): The MongoDB URI.
            db_name (str): The database name.
            tz_info (timezone, optional): Default timezone. Defaults to None.
        """
        super().__init__(
            Client=MongoClient,
            mongo_uri=mongo_uri,
            db_name=db_name,
            tz_info=tz_info,
        )

    def save_all(
        self,
        obj_list: list[Model],
    ) -> dict[str, DbResponse]:
        """
        Upsert a list of objects, one bulk write per collection.

        Objects that get inserted receive their new id and timestamps.

        Args:
            obj_list (list[DbModel]): The database model objects to save.

        Returns:
            dict[str, DbResponse]: One response per collection name. Empty when
            ``obj_list`` is empty.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=obj_list, query=None, raw_query=None, upsert=True
        )
        # Create all indexes first, then write.
        for collection_name, index_list in indexes.items():
            if index_list:
                self._db[collection_name].create_indexes(index_list)

        response = {}
        for collection_name, operation_list in operations.items():
            result: BulkWriteResult = self._db[collection_name].bulk_write(
                operation_list
            )
            self._after_save(
                result=result, objs=obj_list, collection_name=collection_name, now=now
            )
            response[collection_name] = self._db_response(result=result)
        return response

    def save(
        self,
        obj: Model,
        query: QueryOperator = None,
        raw_query: dict = None,
        upsert: bool = True,
    ) -> DbResponse:
        """
        Save a single object to the database.

        Without ``query``/``raw_query`` the object's own id is used as filter.
        With a filter, every matching document is updated with the object's
        data.

        Args:
            obj (DbModel): The database model object.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            upsert (bool, optional): Insert the object when nothing matches.
                Defaults to True.

        Returns:
            DbResponse: The database response object.

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        indexes, operations, now = self._create_save_operations_list(
            objs=[obj], query=query, raw_query=raw_query, upsert=upsert
        )
        collection_name = obj._collection
        collection = self._db[collection_name]
        index_list = indexes[collection_name]
        if index_list:
            collection.create_indexes(index_list)
        result: BulkWriteResult = collection.bulk_write(operations[collection_name])
        self._after_save(
            result=result, objs=[obj], collection_name=collection_name, now=now
        )
        return self._db_response(result=result)

    def find_one(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
    ) -> Model:
        """
        Find a single document in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Return the raw document instead of a model
                instance. Defaults to False.
            tz_info (timezone, optional): Timezone used to decode datetimes.
                Defaults to None (engine default).

        Returns:
            DbModel | dict | None: The first matching document, or None when
            nothing matches.

        Raises:
            TypeError: If ``query`` or ``sort`` has an unsupported type.
        """
        pipeline, _, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )
        pipeline += [{"$limit": 1}]
        cursor = self._aggregate_cursor(Model=Model, pipeline=pipeline, tz_info=tz_info)
        docs = list(cursor)
        if not docs:
            return None
        first_doc = docs[0]
        return first_doc if as_dict else Model(**first_doc)

    def find_many(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        sort: SortOperator = None,
        raw_sort: dict = None,
        populate: bool = False,
        populate_db_fields: list[DbField] | None = None,
        as_dict: bool = False,
        tz_info: timezone = None,
        paginate: bool = False,
        current_page: int = 1,
        docs_per_page: int = 1000,
    ) -> Union[list[Model], ResponsePaginate]:
        """
        Find multiple documents in the database.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            sort (SortOperator, optional): The sort operator. Defaults to None.
            raw_sort (dict, optional): The raw sort dictionary. Defaults to None.
            populate (bool, optional): Flag to indicate whether to populate related documents. Defaults to False.
            populate_db_fields (list[DbField] | None, optional): Forwarded to the
                pipeline builder. Defaults to None.
            as_dict (bool, optional): Return raw documents instead of model
                instances. Defaults to False.
            tz_info (timezone, optional): Timezone used to decode datetimes.
                Defaults to None (engine default).
            paginate (bool, optional): Flag to enable pagination. Defaults to False.
            current_page (int, optional): The 1-based page number; values below 1
                are treated as 1. Defaults to 1.
            docs_per_page (int, optional): The requested number of documents per
                page; the pagination helper caps it. Defaults to 1000.

        Returns:
            list[DbModel] or ResponsePaginate: All matching documents, or a
            paginated response whose ``current_page`` and ``page_quantity``
            reflect the page and page size that were actually applied.

        Raises:
            TypeError: If ``query`` or ``sort`` has an unsupported type.
        """
        pipeline, query_dict, _ = self._aggregate_pipeline(
            Model=Model,
            query=query,
            raw_query=raw_query,
            sort=sort,
            raw_sort=raw_sort,
            populate=populate,
            populate_db_fields=populate_db_fields,
        )

        def _result():
            # Reads ``pipeline`` when called, i.e. after pagination stages
            # have been appended on the paginated path.
            cursor = self._aggregate_cursor(
                Model=Model, pipeline=pipeline, tz_info=tz_info
            )
            if as_dict:
                return list(cursor)
            return [Model(**doc) for doc in cursor]

        if not paginate:
            return _result()

        self._add_paginate_to_pipeline(
            pipeline=pipeline, current_page=current_page, docs_per_page=docs_per_page
        )
        # Report what was really applied: the helper treats pages below 1 as
        # page 1 and appends the effective page size as the final $limit stage.
        current_page = max(current_page, 1)
        docs_per_page = pipeline[-1]["$limit"]

        def _count():
            # With an empty filter, hint the default ``_id_`` index --
            # presumably so the count can be answered from that index.
            kwargs = {} if query_dict else {"hint": "_id_"}
            return self._db[Model._collection].count_documents(
                filter=query_dict, **kwargs
            )

        # The page of documents and the total count are independent queries,
        # so run them on two worker threads. No cursor is opened here:
        # _result() opens the only one needed.
        with ThreadPoolExecutor() as executor:
            future_result = executor.submit(_result)
            future_count = executor.submit(_count)
            docs = future_result.result()
            count = future_count.result()

        return ResponsePaginate(
            current_page=current_page,
            page_quantity=ceil(count / docs_per_page),
            docs_quantity=count,
            docs=docs,
        )

    def delete(
        self,
        Model: Type[Model],
        query: QueryOperator = None,
        raw_query: dict = None,
        delete_one: bool = False,
    ) -> DbResponse:
        """
        Delete documents from the database.

        Note that with neither ``query`` nor ``raw_query`` the filter is empty,
        which matches every document in the collection.

        Args:
            Model (DbModel): The database model class.
            query (QueryOperator, optional): The query operator. Defaults to None.
            raw_query (dict, optional): The raw query dictionary. Defaults to None.
            delete_one (bool, optional): Delete only a single matching document.
                Defaults to False.

        Returns:
            DbResponse: The database response object.

        Raises:
            TypeError: If ``query`` is given but is not a QueryOperator.
        """
        operations = self._create_delete_operations_list(
            query=query, raw_query=raw_query, delete_one=delete_one
        )
        collection = self._db[Model._collection]
        result: BulkWriteResult = collection.bulk_write(operations)
        return self._db_response(result=result)