Coverage for pyodmongo/services/aggregate_stages.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-16 15:08 +0000

def unwind(path: str, array_index: str, preserve_empty: bool):
    """
    Build the ``$unwind`` stage of a MongoDB aggregation pipeline.

    Args:
        path (str): Field path of the array to unwind, given without the
            leading '$'; the prefix is added here.
        array_index (str): Name of the output field that receives the position
            of each element, emitted as ``includeArrayIndex``.
        preserve_empty (bool): Emitted unchanged as
            ``preserveNullAndEmptyArrays``.

    Returns:
        list[dict]: A one-element list holding the ``$unwind`` stage, ready to
            be concatenated into a larger pipeline.
    """
    prefixed_path = f"${path}"
    options = {
        "path": prefixed_path,
        "includeArrayIndex": array_index,
        "preserveNullAndEmptyArrays": preserve_empty,
    }
    stage = {"$unwind": options}
    return [stage]

29 

30 

def lookup(from_: str, local_field: str, foreign_field: str, as_: str, pipeline: list):
    """
    Build the ``$lookup`` stage of a MongoDB aggregation pipeline.

    Args:
        from_ (str): Name of the collection to join with (emitted as ``from``).
        local_field (str): Field of the input documents used for the match
            (emitted as ``localField``).
        foreign_field (str): Field of the ``from`` collection's documents used
            for the match (emitted as ``foreignField``).
        as_ (str): Name of the output array field that receives the joined
            documents (emitted as ``as``).
        pipeline (list): Aggregation stages emitted unchanged under the
            ``pipeline`` key of the lookup.

    Returns:
        list[dict]: A one-element list holding the ``$lookup`` stage.
    """
    # Built as a literal because "from" and "as" are Python keywords and
    # cannot be passed as keyword arguments to dict().
    join_spec = {
        "from": from_,
        "localField": local_field,
        "foreignField": foreign_field,
        "as": as_,
        "pipeline": pipeline,
    }
    stage = {"$lookup": join_spec}
    return [stage]

61 

62 

def set_(local_field: str, as_: str):
    """
    Build the ``$set`` stages that collapse a joined array into a single value
    and then tidy up empty parent paths.

    Args:
        local_field (str): Dotted field path. Each of its prefixes (the full
            path first, then progressively shorter ones down to the top-level
            name) gets its own clean-up stage.
        as_ (str): Name of the array field to replace with its first element.

    Returns:
        list[dict]: ``1 + N`` ``$set`` stages, where ``N`` is the number of
            dot-separated segments in ``local_field``.

    Description:
        The first stage overwrites ``as_`` with ``$arrayElemAt`` index 0 of
        itself. Every following stage inspects one prefix of ``local_field``:
        when that field equals ``None`` or an empty document it is replaced by
        ``"$$REMOVE"`` for nested prefixes or by ``None`` for the top-level
        prefix; otherwise the field is left as it is.
    """
    stages = [{"$set": {as_: {"$arrayElemAt": [f"${as_}", 0]}}}]

    segments = local_field.split(".")
    # Walk from the deepest prefix to the shallowest so that a nested field is
    # cleaned up before the parent that contains it is inspected.
    for depth in range(len(segments), 0, -1):
        prefix = ".".join(segments[:depth])
        prefix_ref = f"${prefix}"
        # Nested prefixes use "$$REMOVE"; the top-level field is set to None.
        when_empty = "$$REMOVE" if depth > 1 else None
        is_empty = {
            "$or": [
                {"$eq": [prefix_ref, None]},
                {"$eq": [prefix_ref, {}]},
            ]
        }
        stages.append(
            {
                "$set": {
                    prefix: {
                        "$cond": {
                            "if": is_empty,
                            "then": when_empty,
                            "else": prefix_ref,
                        }
                    }
                }
            }
        )

    return stages

101 

102 

def group_set_replace_root(
    to_sort: dict, id_: list[str], array_index: str, field: str, path_str: str
):
    """
    Build the ``$sort`` / ``$group`` / ``$set`` / ``$replaceRoot`` sequence that
    folds previously unwound documents back into one document per group.

    Args:
        to_sort (dict): Sort specification emitted unchanged as the ``$sort``
            stage that runs before grouping.
        id_ (list[str]): Value emitted unchanged as the ``_id`` expression of
            the ``$group`` stage.
        array_index (str): Name of the index field looked up on the grouped
            ``_document``; when it equals ``None`` the rebuilt array is set to
            an empty list.
        field (str): Name of the temporary accumulator field that collects the
            pushed values inside ``$group``.
        path_str (str): Field path whose values are pushed into ``field`` and
            which is overwritten on ``_document`` with the rebuilt array.

    Returns:
        list[dict]: The four stages, in execution order.

    Description:
        Documents are sorted, then grouped by ``id_`` while keeping the first
        full document of each group as ``_document`` and pushing every
        ``path_str`` value into ``field``. The collected array (or ``[]`` when
        the index field is ``None``) is written to ``_document.<path_str>``,
        and ``_document`` is finally promoted to the root.
    """
    sort_stage = {"$sort": to_sort}

    group_stage = {
        "$group": {
            "_id": id_,
            "_document": {"$first": "$$ROOT"},
            field: {"$push": f"${path_str}"},
        }
    }

    # When the index field on the kept document equals None, store an empty
    # list instead of the pushed values.
    rebuilt_array = {
        "$cond": {
            "if": {"$eq": [f"$_document.{array_index}", None]},
            "then": [],
            "else": f"${field}",
        }
    }
    set_stage = {"$set": {f"_document.{path_str}": rebuilt_array}}

    replace_root_stage = {"$replaceRoot": {"newRoot": "$_document"}}

    return [sort_stage, group_stage, set_stage, replace_root_stage]

146 

147 

def unset(fields: list):
    """
    Build the ``$unset`` stage of a MongoDB aggregation pipeline.

    Args:
        fields (list): Names of the fields to drop from the documents; emitted
            unchanged as the stage's value.

    Returns:
        list[dict]: A one-element list holding the ``$unset`` stage.
    """
    stage = {"$unset": fields}
    return [stage]