Coverage for pyodmongo/queries/query_string.py: 100%
79 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-16 15:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-16 15:08 +0000
1from ..models.db_model import DbModel
2from ..models.db_field_info import DbField
3from ..models.query_operators import QueryOperator, LogicalOperator
4from .operators import and_, or_, sort
5from typing import Type, Literal
6from datetime import datetime
7import re
def is_inheritance_of_db_model(Model):
    """
    Checks if the provided class is a subclass of DbModel or is DbModel itself.

    Args:
        Model (type): The class to check for inheritance.

    Returns:
        bool: True if the class is DbModel or a subclass of DbModel, False otherwise.
            False is also returned when `Model` is not a class at all.

    Description:
        This function relies on the built-in `issubclass`, which inspects the whole
        inheritance chain of the provided class. It is used to ensure that models
        used in database operations inherit from the base DbModel class, enforcing
        a certain structure.
    """
    try:
        return issubclass(Model, DbModel)
    except TypeError:
        # issubclass() raises TypeError when its first argument is not a class
        # (an instance, None, a parametrized generic, ...). Such an object cannot
        # inherit from DbModel, so report False and let the caller decide how to
        # react instead of failing here with an unrelated error.
        return False
def js_regex_to_python(js_regex_str):
    """
    Converts a JavaScript-style regex string to a Python regex pattern.

    Args:
        js_regex_str (str): The JavaScript regex string to convert, written as
            ``/pattern/flags``.

    Returns:
        Pattern: A compiled Python regex pattern, or the original value if conversion is not possible.

    Description:
        This function attempts to parse a JavaScript regex string and convert it into a Python
        regex pattern. The JavaScript flags ``i``, ``m`` and ``s`` are converted to
        ``re.IGNORECASE``, ``re.MULTILINE`` and ``re.DOTALL``; any other flag letter is ignored.
        The input is returned unchanged when it is not a string, when it does not have the
        ``/pattern/flags`` shape, or when the pattern is not a valid Python regular expression.
    """
    # Non-string values (numbers, booleans, None, ...) are passed through as they are.
    if not isinstance(js_regex_str, str):
        return js_regex_str

    # The pattern part may not contain "/" and the flags part is lowercase letters only.
    match = re.match(r"/([^/]+)/([a-z]*)$", js_regex_str)
    if match is None:
        return js_regex_str

    pattern, js_flags = match.groups()
    flags = 0
    for js_flag, python_flag in (
        ("i", re.IGNORECASE),
        ("m", re.MULTILINE),
        ("s", re.DOTALL),
    ):
        if js_flag in js_flags:
            flags |= python_flag

    try:
        return re.compile(pattern, flags)
    except re.error:
        # The text looked like /pattern/flags but the pattern does not compile;
        # honour the documented fallback and hand back the original string.
        return js_regex_str
69def mount_query_filter(
70 Model: Type[DbModel],
71 items: dict,
72 query_operator: Literal["and", "or"] = "and",
73 initial_comparison_operators: list[QueryOperator] = [],
74) -> QueryOperator:
75 """
76 Constructs a MongoDB query filter from a dictionary of conditions and initializes
77 additional comparison operators based on the Model's field definitions.
79 Args:
80 Model (Type[DbModel]): The model class that fields are checked against.
81 items (dict): A dictionary containing field names and their corresponding filter values.
82 initial_comparison_operators (list[ComparisonOperator]): A list to which new comparison
83 operators are added.
85 Returns:
86 LogicalOperator, sort_operators: A logical operator combining all comparison operators,
87 and sorting operators if '$sort' is found in the keys.
89 Raises:
90 TypeError: If the Model is not a subclass of DbModel.
91 AttributeError: If a field specified does not exist in the Model.
93 Description:
94 This function interprets and converts query conditions specified in `items` into
95 MongoDB query operators. It supports conversion of ISO date strings, evaluation of
96 strings into Python expressions, and handling of JavaScript-style regex patterns.
97 It also processes sorting instructions if provided.
98 """
99 is_inheritance = is_inheritance_of_db_model(Model=Model)
100 if not is_inheritance:
101 raise TypeError("Model must be a DbModel")
102 sort_operators = None
103 for key, value in items.items():
104 key = key.strip()
105 value = value.strip()
106 if value == "":
107 continue
108 split_result = key.strip().rsplit(sep="_", maxsplit=1)
109 operator = f"{split_result[-1]}"
110 if operator not in [
111 "eq",
112 "gt",
113 "gte",
114 "in",
115 "lt",
116 "lte",
117 "ne",
118 "nin",
119 "and",
120 "or",
121 ]:
122 if operator in ["sort"]:
123 value = eval(value)
124 for v in value:
125 v[0] = getattr(Model, v[0])
126 sort_operators = sort(*value)
127 continue
128 if operator in ["and", "or"]:
129 value, _ = mount_query_filter(
130 Model=Model,
131 items=eval(value),
132 query_operator=operator,
133 initial_comparison_operators=[],
134 )
135 try:
136 value = datetime.fromisoformat(value)
137 except (TypeError, ValueError):
138 try:
139 if type(value) == str and (
140 value.capitalize() == "True" or value.capitalize() == "False"
141 ):
142 value = value.capitalize()
143 value = eval(value)
144 except (NameError, SyntaxError, TypeError):
145 value = value
146 field_name = split_result[0]
147 if type(value) is list:
148 for index, item in enumerate(value):
149 value[index] = js_regex_to_python(item)
150 if type(value) != LogicalOperator:
151 try:
152 db_field_info: DbField = eval(f"Model.{field_name}")
153 except AttributeError:
154 raise AttributeError(
155 f"There's no field '{field_name}' in {Model.__name__}"
156 )
157 initial_comparison_operators.append(
158 db_field_info.comparison_operator(operator="$" + operator, value=value)
159 )
160 else:
161 initial_comparison_operators.append(value)
162 pass
163 if len(initial_comparison_operators) == 0:
164 return None, sort_operators
165 if query_operator == "or":
166 return or_(*initial_comparison_operators), sort_operators
167 else:
168 return and_(*initial_comparison_operators), sort_operators