Coverage for pyodmongo/queries/query_string.py: 98%
80 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-27 14:31 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-27 14:31 +0000
1from ..models.db_model import DbModel
2from ..models.db_field_info import DbField
3from ..models.query_operators import QueryOperator, LogicalOperator
4from .operators import and_, or_, sort
5from typing import Type, Literal
6from datetime import datetime
7import re
def is_inheritance_of_db_model(Model):
    """
    Tell whether a class is DbModel itself or has DbModel among its ancestors.

    Args:
        Model (type): The class whose inheritance chain is inspected.

    Returns:
        bool: True when Model equals DbModel or when any class reachable
            through ``__bases__`` does; False otherwise.

    Description:
        The class is compared against DbModel first. If that fails, every
        direct base class is examined the same way, recursively, so the
        whole inheritance graph is walked until a match is found or the
        bases are exhausted.
    """
    # Guard clause: the class under inspection is DbModel itself.
    if Model == DbModel:
        return True
    # Otherwise one matching ancestor is enough; any() stops at the first hit.
    return any(
        is_inheritance_of_db_model(Model=parent) for parent in Model.__bases__
    )
def js_regex_to_python(js_regex_str):
    """
    Turn a JavaScript-style regex literal (``/pattern/flags``) into a compiled Python pattern.

    Args:
        js_regex_str (str): Text that may hold a JavaScript regex literal.

    Returns:
        Pattern: The compiled Python pattern when the input has the
            ``/pattern/flags`` shape; otherwise the input is handed back untouched.

    Description:
        The input must consist of a slash, a non-empty pattern containing no
        slash, a closing slash and an optional run of lowercase flag letters.
        The flags ``i``, ``m`` and ``s`` are mapped to ``re.IGNORECASE``,
        ``re.MULTILINE`` and ``re.DOTALL``; any other flag letter is ignored.
        Inputs that are not text, or that do not have that shape, are
        returned as they were received.
    """
    try:
        parsed = re.match(r"/([^/]+)/([a-z]*)$", js_regex_str)
    except TypeError:
        # Not something a text pattern can be matched against (int, None, bytes, ...).
        return js_regex_str
    else:
        if parsed is None:
            # Plain text that is not written as /pattern/flags.
            return js_regex_str

    body = parsed.group(1)
    letters = parsed.group(2)

    # JavaScript flag letter -> Python flag constant.
    flag_table = (
        ("i", re.IGNORECASE),
        ("m", re.MULTILINE),
        ("s", re.DOTALL),
    )
    combined = 0
    for letter, python_flag in flag_table:
        if letter in letters:
            combined |= python_flag

    return re.compile(body, combined)
def mount_query_filter(
    Model: Type[DbModel],
    items: dict,
    query_operator: Literal["and", "or"] = "and",
    initial_comparison_operators: list[QueryOperator] = None,
) -> QueryOperator:
    """
    Constructs a MongoDB query filter from a dictionary of conditions, resolving
    each condition against the Model's field definitions.

    Args:
        Model (Type[DbModel]): The model class that fields are checked against.
        items (dict): Maps ``<field>_<operator>`` keys to filter values. Recognised
            operators are eq, gt, gte, in, lt, lte, ne, nin, and, or. A key ending
            in ``sort`` carries sorting instructions. Any other key is ignored.
        query_operator (Literal["and", "or"]): How the collected conditions are
            combined; anything other than "or" combines them with ``and_``.
        initial_comparison_operators (list[QueryOperator] | None): Optional list
            to which the new operators are appended (it is mutated in place).
            When omitted, a fresh list is created for this call.

    Returns:
        tuple: ``(query, sort_operators)``. ``query`` is the result of ``and_`` /
            ``or_`` over every collected operator, or None when nothing was
            collected. ``sort_operators`` is the result of ``sort(...)`` when a
            ``sort`` key was present, otherwise None. (The signature annotation
            names only the first element.)

    Raises:
        TypeError: If the Model is not a subclass of DbModel.
        AttributeError: If a field specified does not exist in the Model.

    Description:
        String values are stripped, then converted in this order: ISO date
        string -> datetime; "true"/"false" in any casing -> bool; any other text
        that evaluates as a Python expression -> the evaluated object; otherwise
        the text is kept. Items of list values are passed through
        ``js_regex_to_python``. Values of ``and`` / ``or`` keys are evaluated
        to a dict and processed recursively as a nested group.

        SECURITY: values (and field names) are passed to ``eval``. Only feed this
        function input from a trusted source, or validate it beforehand.
    """
    if not is_inheritance_of_db_model(Model=Model):
        raise TypeError("Model must be a DbModel")

    # A literal ``[]`` default is created once and shared by every call, so
    # operators from earlier calls would leak into later ones. Build the list
    # per call; a list supplied by the caller is still extended in place.
    if initial_comparison_operators is None:
        initial_comparison_operators = []

    supported_operators = {
        "eq",
        "gt",
        "gte",
        "in",
        "lt",
        "lte",
        "ne",
        "nin",
        "and",
        "or",
    }
    sort_operators = None

    for key, value in items.items():
        try:
            value = value.strip()
        except AttributeError:
            # Non-text values (numbers, lists, ...) are used as they are.
            pass

        # "<field>_<operator>": split on the LAST underscore so field names may
        # themselves contain underscores. A key without any underscore yields
        # the same text for both parts.
        split_result = key.strip().rsplit(sep="_", maxsplit=1)
        field_name = split_result[0]
        operator = split_result[-1]

        if operator not in supported_operators:
            if operator == "sort":
                # SECURITY: eval() on caller-supplied text.
                # Presumably a list of mutable [field_name, direction] pairs;
                # the first entry of each is swapped for the Model attribute.
                value = eval(value)
                for v in value:
                    v[0] = getattr(Model, v[0])
                sort_operators = sort(*value)
            # Unknown suffixes are not filter conditions; skip them.
            continue

        if operator in ("and", "or"):
            # Nested group: its own conditions are combined with `operator`.
            # SECURITY: eval() on caller-supplied text.
            value, _ = mount_query_filter(
                Model=Model,
                items=eval(value),
                query_operator=operator,
                initial_comparison_operators=[],
            )
            if value is None:
                # An empty nested group contributes no condition. Letting None
                # through would end in a field lookup for a key such as "_and",
                # which has no field name at all.
                continue

        try:
            value = datetime.fromisoformat(value)
        except (TypeError, ValueError):
            try:
                if isinstance(value, str) and value.capitalize() in (
                    "True",
                    "False",
                ):
                    # Accept "true"/"FALSE"/... as Python booleans.
                    value = value.capitalize()
                # SECURITY: eval() on caller-supplied text.
                value = eval(value)
            except (NameError, SyntaxError, TypeError):
                # Not a Python expression: keep the value unchanged.
                pass

        if isinstance(value, list):
            # Build a new list rather than editing in place, so a list passed
            # directly inside `items` is not altered for the caller.
            value = [js_regex_to_python(item) for item in value]

        if isinstance(value, LogicalOperator):
            # Result of a nested and/or group: already a complete operator.
            initial_comparison_operators.append(value)
            continue

        try:
            # eval (not getattr) so dotted paths to nested fields resolve.
            # SECURITY: the field name comes from a caller-supplied key.
            db_field_info: DbField = eval(f"Model.{field_name}")
        except AttributeError as err:
            raise AttributeError(
                f"There's no field '{field_name}' in {Model.__name__}"
            ) from err
        initial_comparison_operators.append(
            db_field_info.comparison_operator(operator="$" + operator, value=value)
        )

    if not initial_comparison_operators:
        return None, sort_operators
    combine = or_ if query_operator == "or" else and_
    return combine(*initial_comparison_operators), sort_operators