Coverage for pyodmongo/services/aggregate_stages.py: 100%
16 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-16 15:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-16 15:08 +0000
1def unwind(path: str, array_index: str, preserve_empty: bool):
2 """
3 Constructs an unwind stage for a MongoDB aggregation pipeline which deconstructs an array field
4 from the input documents to output a document for each element.
6 Args:
7 path (str): The field path to an array to be unwound (without the '$' prefix).
8 array_index (str): The name of a new field to hold the array index of the element.
9 preserve_empty (bool): If True, includes the path as null on documents where the array is missing,
10 empty, or null; otherwise, such documents are excluded from the resulting
11 set.
13 Returns:
14 list[dict]: A list containing the unwind stage of the aggregation pipeline.
16 Description:
17 The unwind operation is useful for creating a flat view of array contents in order to apply
18 further aggregation operations like match, project, or group.
19 """
20 return [
21 {
22 "$unwind": {
23 "path": f"${path}",
24 "includeArrayIndex": array_index,
25 "preserveNullAndEmptyArrays": preserve_empty,
26 }
27 }
28 ]
31def lookup(from_: str, local_field: str, foreign_field: str, as_: str, pipeline: list):
32 """
33 Constructs a lookup stage for a MongoDB aggregation pipeline which performs a join from another
34 collection in the same database.
36 Args:
37 from_ (str): The target collection from which to fetch documents.
38 local_field (str): The local field to match against the foreign field.
39 foreign_field (str): The field from the documents of the "from" collection to match against.
40 as_ (str): The output array field which contains the joined documents.
41 pipeline (list): A list defining additional aggregation pipeline stages to filter the documents
42 from the "from" collection before joining.
44 Returns:
45 list[dict]: A list containing the lookup stage of the aggregation pipeline.
47 Description:
48 The lookup operation allows for integrating data from multiple collections based on a related field.
49 """
50 return [
51 {
52 "$lookup": {
53 "from": from_,
54 "localField": local_field,
55 "foreignField": foreign_field,
56 "as": as_,
57 "pipeline": pipeline,
58 }
59 }
60 ]
63def set_(local_field: str, as_: str):
64 """
65 Constructs a set stage for a MongoDB aggregation pipeline that redefines a field with a specified value.
67 Args:
68 as_ (str): The name of the field to redefine or create.
70 Returns:
71 list[dict]: A list containing the set stage of the aggregation pipeline.
73 Description:
74 This operation allows setting or replacing values of fields within documents during aggregation.
75 """
76 fields = local_field.split(".")
77 pipeline = [{"$set": {as_: {"$arrayElemAt": [f"${as_}", 0]}}}]
78 for i in range(len(fields), 0, -1):
79 field_str = ".".join(fields[:i])
80 then = "$$REMOVE" if i > 1 else None
81 pipeline.append(
82 {
83 "$set": {
84 field_str: {
85 "$cond": {
86 "if": {
87 "$or": [
88 {"$eq": [f"${field_str}", None]},
89 {"$eq": [f"${field_str}", {}]},
90 ]
91 },
92 "then": then,
93 "else": f"${field_str}",
94 }
95 }
96 }
97 }
98 )
100 return pipeline
103def group_set_replace_root(
104 to_sort: dict, id_: list[str], array_index: str, field: str, path_str: str
105):
106 """
107 Constructs a combination of group, set, and replaceRoot stages for a MongoDB aggregation pipeline.
109 Args:
110 id_ (str): The field to use as the identifier for grouping documents.
111 field (str): The field name to create or redefine with grouped values.
112 path_str (str): The path string representing where to place or replace data in documents.
114 Returns:
115 list[dict]: A list containing the group, set, and replaceRoot stages of the aggregation pipeline.
117 Description:
118 This sequence of stages first groups documents by the specified identifier, aggregates
119 specific fields, then modifies document structure, and finally promotes a nested document
120 to the top-level.
121 """
122 return [
123 {"$sort": to_sort},
124 {
125 "$group": {
126 "_id": id_,
127 "_document": {"$first": "$$ROOT"},
128 field: {"$push": f"${path_str}"},
129 }
130 },
131 # {"$set": {f"_document.{path_str}": f"${field}"}},
132 {
133 "$set": {
134 f"_document.{path_str}": {
135 "$cond": {
136 "if": {"$eq": [f"$_document.{array_index}", None]},
137 "then": [],
138 # "then": None,
139 "else": f"${field}",
140 }
141 }
142 }
143 },
144 {"$replaceRoot": {"newRoot": "$_document"}},
145 ]
148def unset(fields: list):
149 """
150 Constructs an unset stage for a MongoDB aggregation pipeline which removes the specified fields from documents.
152 Args:
153 fields (list): A list of field names to be removed from the documents.
155 Returns:
156 list[dict]: A list containing the unset stage of the aggregation pipeline.
158 Description:
159 The unset operation is used to delete fields from documents during aggregation, simplifying
160 documents or removing unwanted data.
161 """
162 return [{"$unset": fields}]