Coverage for pyodmongo/queries/query_string.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-16 15:08 +0000

1from ..models.db_model import DbModel 

2from ..models.db_field_info import DbField 

3from ..models.query_operators import QueryOperator, LogicalOperator 

4from .operators import and_, or_, sort 

5from typing import Type, Literal 

6from datetime import datetime 

7import re 

8 

9 

def is_inheritance_of_db_model(Model):
    """
    Checks if the provided class is DbModel itself or a subclass of it.

    Args:
        Model (type): The class to check for inheritance.

    Returns:
        bool: True if the class is DbModel or a subclass of DbModel, False
            otherwise. A value that is not a class at all also yields False.

    Description:
        Used to ensure that models handed to the query helpers derive from the
        base DbModel class. The check relies on the built-in ``issubclass``
        instead of walking ``__bases__`` by hand, which covers the whole
        inheritance chain (direct and indirect parents) in one call.
    """
    # issubclass() raises TypeError when its first argument is not a class,
    # so non-class inputs are rejected up front and simply reported as False.
    return isinstance(Model, type) and issubclass(Model, DbModel)

33 

34 

def js_regex_to_python(js_regex_str):
    """
    Converts a JavaScript-style regex literal string to a Python regex pattern.

    Args:
        js_regex_str (str): The JavaScript regex string to convert, written as
            ``/pattern/flags`` (for example ``"/^abc/i"``).

    Returns:
        Pattern: A compiled Python regex pattern, or the original value unchanged
        if it is not a string or does not look like a JavaScript regex literal.

    Raises:
        re.error: If the text between the slashes is not a valid Python regex.

    Description:
        The pattern body may contain escaped characters, including escaped
        slashes (``\\/``), which are common in JavaScript literals. The flags
        ``i``, ``m`` and ``s`` are translated to ``re.IGNORECASE``,
        ``re.MULTILINE`` and ``re.DOTALL``; any other lowercase flag letters
        are accepted but ignored.
    """
    # Only strings can be regex literals; anything else is passed through.
    if not isinstance(js_regex_str, str):
        return js_regex_str

    # Body: one or more of either an escape pair (backslash + any character)
    # or a character that is neither a slash nor a backslash. The two
    # alternatives start with different characters, so matching is linear.
    literal = re.match(
        r"/((?:\\.|[^/\\])+)/([a-z]*)$", js_regex_str, re.DOTALL
    )
    if literal is None:
        return js_regex_str

    pattern, js_flags = literal.groups()
    flags = 0
    for letter, python_flag in (
        ("i", re.IGNORECASE),
        ("m", re.MULTILINE),
        ("s", re.DOTALL),
    ):
        if letter in js_flags:
            flags |= python_flag

    return re.compile(pattern, flags)

67 

68 

def mount_query_filter(
    Model: Type[DbModel],
    items: dict,
    query_operator: Literal["and", "or"] = "and",
    initial_comparison_operators: list[QueryOperator] = None,
) -> tuple:
    """
    Constructs a MongoDB query filter from a dictionary of conditions and initializes
    additional comparison operators based on the Model's field definitions.

    Args:
        Model (Type[DbModel]): The model class that fields are checked against.
        items (dict): A dictionary mapping ``<field>_<operator>`` keys to string
            filter values. Keys whose suffix is not a supported operator are
            ignored, except for the ``sort`` suffix.
        query_operator (Literal["and", "or"]): How the resulting comparison
            operators are combined. Defaults to "and".
        initial_comparison_operators (list[QueryOperator]): A list to which new
            comparison operators are appended. When omitted, a fresh list is
            created for each call.

    Returns:
        tuple: ``(query, sort_operators)``. ``query`` is the logical operator
        combining all comparison operators, or None when no operator was
        produced. ``sort_operators`` is the result of ``sort(...)`` if a key
        with the ``sort`` suffix was found, otherwise None.

    Raises:
        TypeError: If the Model is not a subclass of DbModel.
        AttributeError: If a field specified does not exist in the Model.

    Description:
        This function interprets and converts query conditions specified in `items` into
        MongoDB query operators. It supports conversion of ISO date strings, evaluation of
        strings into Python expressions, and handling of JavaScript-style regex patterns
        inside list values. Keys ending in ``and`` / ``or`` are evaluated into a nested
        dictionary and processed recursively. It also processes sorting instructions
        if provided.
    """
    if not is_inheritance_of_db_model(Model=Model):
        raise TypeError("Model must be a DbModel")

    # A mutable ``[]`` default would be shared by every call and leak
    # operators from one query into the next, so the list is created here.
    if initial_comparison_operators is None:
        initial_comparison_operators = []

    supported_operators = (
        "eq",
        "gt",
        "gte",
        "in",
        "lt",
        "lte",
        "ne",
        "nin",
        "and",
        "or",
    )
    sort_operators = None

    for key, value in items.items():
        value = value.strip()
        if value == "":
            continue

        # "<field>_<operator>": the operator is whatever follows the last "_".
        split_result = key.strip().rsplit(sep="_", maxsplit=1)
        operator = split_result[-1]

        if operator not in supported_operators:
            if operator == "sort":
                # NOTE(security): eval() on a request-derived string executes
                # arbitrary code; callers must only pass trusted input.
                value = eval(value)
                for v in value:
                    # Replace the field name with the Model's field descriptor.
                    v[0] = getattr(Model, v[0])
                sort_operators = sort(*value)
            continue

        if operator in ("and", "or"):
            # NOTE(security): eval() on a request-derived string, see above.
            value, _ = mount_query_filter(
                Model=Model,
                items=eval(value),
                query_operator=operator,
                initial_comparison_operators=[],
            )
            if value is None:
                # An empty nested group contributes no condition; falling
                # through would attempt a field lookup with a bogus name.
                continue

        # Coerce the raw value: ISO datetime first, then a Python literal /
        # expression, otherwise keep it as it is.
        try:
            value = datetime.fromisoformat(value)
        except (TypeError, ValueError):
            try:
                if isinstance(value, str) and value.capitalize() in (
                    "True",
                    "False",
                ):
                    # Accept "true"/"false" in any letter case as booleans.
                    value = value.capitalize()
                # NOTE(security): eval() on a request-derived string, see above.
                value = eval(value)
            except (NameError, SyntaxError, TypeError):
                # Not evaluable (plain text, or a non-string such as a nested
                # logical operator): keep the value unchanged.
                pass

        field_name = split_result[0]

        if type(value) is list:
            # List items written as "/pattern/flags" become compiled regexes.
            for index, item in enumerate(value):
                value[index] = js_regex_to_python(item)

        if type(value) is not LogicalOperator:
            try:
                # eval (not getattr) so dotted paths such as "a.b" resolve.
                # NOTE(security): field_name comes from the request key.
                db_field_info: DbField = eval(f"Model.{field_name}")
            except AttributeError as err:
                raise AttributeError(
                    f"There's no field '{field_name}' in {Model.__name__}"
                ) from err
            initial_comparison_operators.append(
                db_field_info.comparison_operator(operator="$" + operator, value=value)
            )
        else:
            # Nested and/or group: already a complete logical operator.
            initial_comparison_operators.append(value)

    if len(initial_comparison_operators) == 0:
        return None, sort_operators
    if query_operator == "or":
        return or_(*initial_comparison_operators), sort_operators
    return and_(*initial_comparison_operators), sort_operators