class ISO8601Date(TypedDict):
    """A date in ISO 8601 format (YYYY-MM-DD)."""

    # The date text, e.g. "2024-01-31".
    date: str
    # Discriminator tag; always the literal string "date".
    type: Literal["date"]
class ISO8601DateTime(TypedDict):
    """A datetime in ISO 8601 format (YYYY-MM-DDTHH:MM:SS).

    A trailing UTC offset (e.g. ``+00:00``) may also be present.
    """

    # The datetime text, e.g. "2024-01-31T12:30:00".
    datetime: str
    # Discriminator tag; always the literal string "datetime".
    type: Literal["datetime"]
@v_args(inline=True)
class QueryTransformer(Transformer):
    """Transform a query string into an intermediate representation.

    Each method below is a Lark transformer callback; with
    ``v_args(inline=True)`` the children of a matched node are passed as
    positional arguments.

    Args:
        allowed_comparators: If not None, only these comparators may appear
            in a query.
        allowed_operators: If not None, only these operators may appear in a
            query.
        allowed_attributes: If non-empty, comparisons may only reference
            these attribute names.
    """

    def __init__(
        self,
        *args: Any,
        allowed_comparators: Optional[Sequence[Comparator]] = None,
        allowed_operators: Optional[Sequence[Operator]] = None,
        allowed_attributes: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ):
        super().__init__(*args, **kwargs)
        self.allowed_comparators = allowed_comparators
        self.allowed_operators = allowed_operators
        self.allowed_attributes = allowed_attributes

    def program(self, *items: Any) -> tuple:
        """Return the top-level items unchanged, as a tuple."""
        return items

    def func_call(self, func_name: Any, args: list) -> FilterDirective:
        """Build a ``Comparison`` or ``Operation`` from a function call.

        Args:
            func_name: Name of the comparator or operator being called.
            args: Already-transformed call arguments.

        Returns:
            A ``Comparison`` for comparators; for operators an ``Operation``,
            except that a single-argument AND/OR collapses to that argument.

        Raises:
            ValueError: If the function is unknown or disallowed, if the
                call has no arguments, if a comparator receives fewer than
                two arguments, or if the attribute is not allowed.
        """
        func = self._match_func_name(str(func_name))

        # NOTE(review): presumably ``args`` is None when the grammar's
        # optional argument list is absent (Lark placeholder behaviour) --
        # confirm against GRAMMAR. Fail with a clear message instead of a
        # TypeError from ``len(None)`` / ``None[0]``.
        if args is None:
            raise ValueError(f"Function {func_name} was called without arguments.")

        if isinstance(func, Comparator):
            if len(args) < 2:
                # Previously surfaced as a bare IndexError on ``args[1]``.
                raise ValueError(
                    f"Comparator {func_name} expects an attribute and a value "
                    f"but received {len(args)} argument(s)."
                )
            attribute = args[0]
            if self.allowed_attributes and attribute not in self.allowed_attributes:
                raise ValueError(
                    f"Received invalid attributes {attribute}. Allowed attributes are "
                    f"{self.allowed_attributes}"
                )
            return Comparison(comparator=func, attribute=attribute, value=args[1])

        # AND/OR over a single operand is just that operand.
        if len(args) == 1 and func in (Operator.AND, Operator.OR):
            return args[0]
        return Operation(operator=func, arguments=args)

    def _match_func_name(self, func_name: str) -> Union[Operator, Comparator]:
        """Resolve a function name to a ``Comparator`` or ``Operator`` member.

        Raises:
            ValueError: If the name is not a known comparator/operator, or
                is known but excluded by the configured allow-lists.
        """
        if func_name in set(Comparator):
            if (
                self.allowed_comparators is not None
                and func_name not in self.allowed_comparators
            ):
                raise ValueError(
                    f"Received disallowed comparator {func_name}. Allowed "
                    f"comparators are {self.allowed_comparators}"
                )
            return Comparator(func_name)

        if func_name in set(Operator):
            if (
                self.allowed_operators is not None
                and func_name not in self.allowed_operators
            ):
                raise ValueError(
                    f"Received disallowed operator {func_name}. Allowed operators"
                    f" are {self.allowed_operators}"
                )
            return Operator(func_name)

        raise ValueError(
            f"Received unrecognized function {func_name}. Valid functions are "
            f"{list(Operator) + list(Comparator)}"
        )

    def args(self, *items: Any) -> tuple:
        """Return the call arguments unchanged, as a tuple."""
        return items

    def false(self) -> bool:
        """Return the Python value ``False``."""
        return False

    def true(self) -> bool:
        """Return the Python value ``True``."""
        return True

    def list(self, item: Any) -> list:
        """Return ``item`` as a list; ``None`` becomes an empty list."""
        if item is None:
            return []
        return list(item)

    def int(self, item: Any) -> int:
        """Convert ``item`` with the builtin ``int``."""
        return int(item)

    def float(self, item: Any) -> float:
        """Convert ``item`` with the builtin ``float``."""
        return float(item)

    def date(self, item: Any) -> ISO8601Date:
        """Wrap a date literal in an ``ISO8601Date`` dict.

        Surrounding quote characters are stripped. A value that does not
        parse as YYYY-MM-DD only triggers a warning; it is still returned.
        """
        item = str(item).strip("\"'")
        try:
            datetime.datetime.strptime(item, "%Y-%m-%d")
        except ValueError:
            warnings.warn(
                "Dates are expected to be provided in ISO 8601 date format "
                "(YYYY-MM-DD)."
            )
        return {"date": item, "type": "date"}

    def datetime(self, item: Any) -> ISO8601DateTime:
        """Wrap a datetime literal in an ``ISO8601DateTime`` dict.

        Surrounding quote characters are stripped. The value must parse as
        YYYY-MM-DDTHH:MM:SS, with or without a trailing UTC offset.

        Raises:
            ValueError: If the value matches neither accepted format.
        """
        item = str(item).strip("\"'")
        # Try the offset-aware format first, then the naive one.
        for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S"):
            try:
                datetime.datetime.strptime(item, fmt)
            except ValueError:
                continue
            break
        else:
            raise ValueError("Datetime values are expected to be in ISO 8601 format.")
        return {"datetime": item, "type": "datetime"}

    def string(self, item: Any) -> str:
        """Return ``item`` as a string with surrounding quotes removed."""
        # Strips every leading/trailing single or double quote character.
        return str(item).strip("\"'")
def get_parser(
    allowed_comparators: Optional[Sequence[Comparator]] = None,
    allowed_operators: Optional[Sequence[Operator]] = None,
    allowed_attributes: Optional[Sequence[str]] = None,
) -> Lark:
    """Return a parser for the query language.

    Args:
        allowed_comparators: Comparators the transformer should accept;
            None means no restriction.
        allowed_operators: Operators the transformer should accept; None
            means no restriction.
        allowed_attributes: Attribute names comparisons may reference; None
            means no restriction.

    Returns:
        Lark parser for the query language, with a ``QueryTransformer``
        attached so parsing yields the transformed result.

    Raises:
        ImportError: If lark is not installed.
    """
    # QueryTransformer is None when Lark cannot be imported.
    if QueryTransformer is None:
        raise ImportError(
            "Cannot import lark, please install it with 'pip install lark'."
        )
    transformer = QueryTransformer(
        allowed_comparators=allowed_comparators,
        allowed_operators=allowed_operators,
        allowed_attributes=allowed_attributes,
    )
    return Lark(GRAMMAR, parser="lalr", transformer=transformer, start="program")