Coverage for pyodmongo/queries/query_string.py: 98%

80 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-27 14:31 +0000

1from ..models.db_model import DbModel 

2from ..models.db_field_info import DbField 

3from ..models.query_operators import QueryOperator, LogicalOperator 

4from .operators import and_, or_, sort 

5from typing import Type, Literal 

6from datetime import datetime 

7import re 

8 

9 

10def is_inheritance_of_db_model(Model): 

11 """ 

12 Checks if the provided class is a subclass of DbModel or is DbModel itself. 

13 

14 Args: 

15 Model (type): The class to check for inheritance. 

16 

17 Returns: 

18 bool: True if the class is DbModel or a subclass of DbModel, False otherwise. 

19 

20 Description: 

21 This function recursively checks the inheritance chain of the provided class to 

22 determine if it is derived from DbModel. This is used to ensure that models used 

23 in database operations inherit from the base DbModel class, enforcing a certain 

24 structure. 

25 """ 

26 if Model == DbModel: 

27 return True 

28 bases = Model.__bases__ 

29 for base in bases: 

30 if is_inheritance_of_db_model(Model=base): 

31 return True 

32 return False 

33 

34 

35def js_regex_to_python(js_regex_str): 

36 """ 

37 Converts a JavaScript-style regex string to a Python regex pattern. 

38 

39 Args: 

40 js_regex_str (str): The JavaScript regex string to convert. 

41 

42 Returns: 

43 Pattern: A compiled Python regex pattern, or the original string if conversion is not possible. 

44 

45 Description: 

46 This function attempts to parse a JavaScript regex string and convert it into a Python 

47 regex pattern. JavaScript flags are converted to their Python equivalents where applicable. 

48 If the string cannot be parsed as a JavaScript regex, the original string is returned. 

49 """ 

50 try: 

51 match = re.match(r"/([^/]+)/([a-z]*)$", js_regex_str) 

52 except TypeError: 

53 return js_regex_str 

54 try: 

55 pattern, js_flags = match.groups() 

56 except AttributeError: 

57 return js_regex_str 

58 flags = 0 

59 if "i" in js_flags: 

60 flags |= re.IGNORECASE 

61 if "m" in js_flags: 

62 flags |= re.MULTILINE 

63 if "s" in js_flags: 

64 flags |= re.DOTALL 

65 

66 return re.compile(pattern, flags) 

67 

68 

69def mount_query_filter( 

70 Model: Type[DbModel], 

71 items: dict, 

72 query_operator: Literal["and", "or"] = "and", 

73 initial_comparison_operators: list[QueryOperator] = [], 

74) -> QueryOperator: 

75 """ 

76 Constructs a MongoDB query filter from a dictionary of conditions and initializes 

77 additional comparison operators based on the Model's field definitions. 

78 

79 Args: 

80 Model (Type[DbModel]): The model class that fields are checked against. 

81 items (dict): A dictionary containing field names and their corresponding filter values. 

82 initial_comparison_operators (list[ComparisonOperator]): A list to which new comparison 

83 operators are added. 

84 

85 Returns: 

86 LogicalOperator, sort_operators: A logical operator combining all comparison operators, 

87 and sorting operators if '$sort' is found in the keys. 

88 

89 Raises: 

90 TypeError: If the Model is not a subclass of DbModel. 

91 AttributeError: If a field specified does not exist in the Model. 

92 

93 Description: 

94 This function interprets and converts query conditions specified in `items` into 

95 MongoDB query operators. It supports conversion of ISO date strings, evaluation of 

96 strings into Python expressions, and handling of JavaScript-style regex patterns. 

97 It also processes sorting instructions if provided. 

98 """ 

99 is_inheritance = is_inheritance_of_db_model(Model=Model) 

100 if not is_inheritance: 

101 raise TypeError("Model must be a DbModel") 

102 sort_operators = None 

103 for key, value in items.items(): 

104 key = key.strip() 

105 try: 

106 value = value.strip() 

107 except AttributeError: 

108 ... 

109 # if value == "": 

110 # continue 

111 split_result = key.strip().rsplit(sep="_", maxsplit=1) 

112 operator = f"{split_result[-1]}" 

113 if operator not in [ 

114 "eq", 

115 "gt", 

116 "gte", 

117 "in", 

118 "lt", 

119 "lte", 

120 "ne", 

121 "nin", 

122 "and", 

123 "or", 

124 ]: 

125 if operator in ["sort"]: 

126 value = eval(value) 

127 for v in value: 

128 v[0] = getattr(Model, v[0]) 

129 sort_operators = sort(*value) 

130 continue 

131 if operator in ["and", "or"]: 

132 value, _ = mount_query_filter( 

133 Model=Model, 

134 items=eval(value), 

135 query_operator=operator, 

136 initial_comparison_operators=[], 

137 ) 

138 try: 

139 value = datetime.fromisoformat(value) 

140 except (TypeError, ValueError): 

141 try: 

142 if type(value) == str and ( 

143 value.capitalize() == "True" or value.capitalize() == "False" 

144 ): 

145 value = value.capitalize() 

146 value = eval(value) 

147 except (NameError, SyntaxError, TypeError): 

148 value = value 

149 field_name = split_result[0] 

150 if type(value) is list: 

151 for index, item in enumerate(value): 

152 value[index] = js_regex_to_python(item) 

153 if type(value) != LogicalOperator: 

154 try: 

155 db_field_info: DbField = eval(f"Model.{field_name}") 

156 except AttributeError: 

157 raise AttributeError( 

158 f"There's no field '{field_name}' in {Model.__name__}" 

159 ) 

160 initial_comparison_operators.append( 

161 db_field_info.comparison_operator(operator="$" + operator, value=value) 

162 ) 

163 else: 

164 initial_comparison_operators.append(value) 

165 pass 

166 if len(initial_comparison_operators) == 0: 

167 return None, sort_operators 

168 if query_operator == "or": 

169 return or_(*initial_comparison_operators), sort_operators 

170 else: 

171 return and_(*initial_comparison_operators), sort_operators