class TimescaleVectorTranslator(Visitor):
    """Translate the internal query language elements to valid filters."""

    allowed_operators = [Operator.AND, Operator.OR, Operator.NOT]
    """Subset of allowed logical operators."""

    # Comparators this translator accepts; each one has a COMPARATOR_MAP entry.
    allowed_comparators = [
        Comparator.EQ,
        Comparator.GT,
        Comparator.GTE,
        Comparator.LT,
        Comparator.LTE,
    ]

    # Comparator member -> operator string placed in the predicate tuple.
    COMPARATOR_MAP = {
        Comparator.EQ: "==",
        Comparator.GT: ">",
        Comparator.GTE: ">=",
        Comparator.LT: "<",
        Comparator.LTE: "<=",
    }

    # Operator member -> string passed as the ``operator`` keyword of Predicates.
    OPERATOR_MAP = {Operator.AND: "AND", Operator.OR: "OR", Operator.NOT: "NOT"}

    @staticmethod
    def _import_client():
        """Import and return the ``timescale_vector.client`` module.

        The import is deferred to call time so that this class can be defined
        without ``timescale-vector`` being installed.

        Raises:
            ImportError: If ``timescale_vector`` cannot be imported.
        """
        try:
            from timescale_vector import client
        except ImportError as e:
            raise ImportError(
                "Cannot import timescale-vector. Please install with `pip install "
                "timescale-vector`."
            ) from e
        return client

    def _format_func(self, func: Union[Operator, Comparator]) -> str:
        """Return the string form of an operator or comparator.

        ``self._validate_func`` (defined outside this block) is called first;
        the result is then looked up in ``OPERATOR_MAP`` or ``COMPARATOR_MAP``.

        Args:
            func: The operator or comparator to translate.

        Returns:
            The mapped string, e.g. ``"AND"`` or ``">="``.

        Raises:
            TypeError: If ``func`` is neither an ``Operator`` nor a
                ``Comparator``.
            KeyError: If ``func`` has no entry in the corresponding map.
        """
        self._validate_func(func)
        # Index with the enum member itself: the maps are keyed by members, so
        # this does not rely on a member hashing equal to its ``.value``.
        if isinstance(func, Operator):
            return f"{self.OPERATOR_MAP[func]}"
        if isinstance(func, Comparator):
            return f"{self.COMPARATOR_MAP[func]}"
        # Previously this path fell through to an unbound local variable.
        raise TypeError(
            f"Expected an Operator or Comparator, got {type(func).__name__}"
        )

    def visit_operation(self, operation: Operation) -> "client.Predicates":
        """Translate a logical operation into a ``client.Predicates`` object.

        Each argument of the operation is translated by calling its
        ``accept`` method with this visitor; the results are passed
        positionally to ``client.Predicates`` together with the mapped
        operator string.

        Args:
            operation: The operation whose ``arguments`` and ``operator`` are
                translated.

        Returns:
            The ``client.Predicates`` built from the translated arguments.

        Raises:
            ImportError: If ``timescale_vector`` cannot be imported.
        """
        client = self._import_client()
        args = [arg.accept(self) for arg in operation.arguments]
        return client.Predicates(*args, operator=self._format_func(operation.operator))

    def visit_comparison(self, comparison: Comparison) -> "client.Predicates":
        """Translate a single comparison into a ``client.Predicates`` object.

        Args:
            comparison: The comparison whose ``attribute``, ``comparator`` and
                ``value`` form the predicate.

        Returns:
            A ``client.Predicates`` constructed from one
            ``(attribute, operator_string, value)`` tuple.

        Raises:
            ImportError: If ``timescale_vector`` cannot be imported.
        """
        client = self._import_client()
        return client.Predicates(
            (
                comparison.attribute,
                self._format_func(comparison.comparator),
                comparison.value,
            )
        )