# Source code for langchain_community.tools.e2b_data_analysis.unparse
# mypy: disable-error-code=no-untyped-def
# Because Python <3.9 doesn't support ast.unparse,
# we copied the unparse functionality from here:
# https://github.com/python/cpython/blob/3.8/Tools/parser/unparse.py
"Usage: unparse.py <path to source file>"

import ast
import io
import sys
import tokenize

# Large float and imaginary literals get turned into infinities in the AST.
# We unparse those infinities to INFSTR: a decimal literal whose exponent is
# one past the largest representable power of ten, so it overflows to
# infinity again when the generated source is parsed.
INFSTR = "1e" + repr(sys.float_info.max_10_exp + 1)
def interleave(inter, f, seq):
    """Call f on each item in seq, calling inter() in between.

    Args:
        inter: Zero-argument callable invoked between consecutive items
            (never before the first item or after the last one).
        f: One-argument callable invoked with each item, in order.
        seq: Any iterable; an empty iterable results in no calls at all.
    """
    is_first = True
    for item in seq:
        # A plain ``for`` loop only consumes the iterator's own
        # StopIteration, so an exception raised inside ``f`` is never
        # mistaken for "the sequence is exhausted" and silently dropped.
        if is_first:
            is_first = False
        else:
            inter()
        f(item)
class Unparser:
    """Traverse an AST and output source code for the abstract syntax;
    original formatting is disregarded.

    Instantiating the class performs the whole conversion: the constructor
    walks ``tree`` and writes the generated source to ``file``. Each AST node
    class ``T`` is handled by the method named ``_T``; ``dispatch`` looks the
    method up by name, so a node class without a matching method raises
    ``AttributeError``.
    """

    def __init__(self, tree, file=sys.stdout):
        """Unparser(tree, file=sys.stdout) -> None.

        Print the source for tree to file.

        Args:
            tree: An AST node, or a list of AST nodes, to unparse.
            file: Writable text stream that receives the source; it is
                flushed once the traversal is finished.
        """
        self.f = file
        self._indent = 0
        self.dispatch(tree)
        self.f.flush()

    def fill(self, text=""):
        """Indent a piece of text, according to the current indentation level.

        Always starts a new output line first, so every statement's output
        begins with a newline.
        """
        # NOTE(review): the reviewed copy of this file had its whitespace
        # collapsed, so the width of the indent unit could not be read from
        # it; four spaces per level is assumed here - confirm against upstream.
        self.f.write("\n" + "    " * self._indent + text)

    def write(self, text):
        """Append a piece of text to the current line."""
        self.f.write(text)

    def enter(self):
        """Print ':', and increase the indentation."""
        self.write(":")
        self._indent += 1

    def leave(self):
        """Decrease the indentation level."""
        self._indent -= 1

    def dispatch(self, tree):
        """Dispatcher function, dispatching tree type T to method _T.

        A list is unparsed element by element, in order.
        """
        if isinstance(tree, list):
            for t in tree:
                self.dispatch(t)
            return
        meth = getattr(self, "_" + tree.__class__.__name__)
        meth(tree)

    ############### Unparsing methods ######################
    # There should be one method per concrete grammar type #
    # Constructors should be grouped by sum type. Ideally, #
    # this would follow the order in the grammar, but      #
    # currently doesn't.                                   #
    ########################################################

    def _Module(self, tree):
        """Unparse every statement of the module body."""
        for stmt in tree.body:
            self.dispatch(stmt)

    # stmt
    def _Expr(self, tree):
        """Expression statement: the expression on a fresh line."""
        self.fill()
        self.dispatch(tree.value)

    def _NamedExpr(self, tree):
        """Assignment expression, always parenthesized: ``(target := value)``."""
        self.write("(")
        self.dispatch(tree.target)
        self.write(" := ")
        self.dispatch(tree.value)
        self.write(")")

    def _Import(self, t):
        """``import a, b as c``."""
        self.fill("import ")
        interleave(lambda: self.write(", "), self.dispatch, t.names)

    def _ImportFrom(self, t):
        """``from ..module import a, b``; one leading dot per relative level."""
        self.fill("from ")
        self.write("." * t.level)
        if t.module:
            self.write(t.module)
        self.write(" import ")
        interleave(lambda: self.write(", "), self.dispatch, t.names)

    def _Assign(self, t):
        """Assignment; every target is followed by `` = `` (chained targets)."""
        self.fill()
        for target in t.targets:
            self.dispatch(target)
            self.write(" = ")
        self.dispatch(t.value)

    def _AugAssign(self, t):
        """Augmented assignment such as ``x += 1``."""
        self.fill()
        self.dispatch(t.target)
        self.write(" " + self.binop[t.op.__class__.__name__] + "= ")
        self.dispatch(t.value)

    def _AnnAssign(self, t):
        """Annotated assignment ``target: annotation [= value]``."""
        self.fill()
        # A non-simple annotated Name was written as "(x): int" in the
        # original source; the parentheses are restored to keep it non-simple.
        parenthesize = not t.simple and isinstance(t.target, ast.Name)
        if parenthesize:
            self.write("(")
        self.dispatch(t.target)
        if parenthesize:
            self.write(")")
        self.write(": ")
        self.dispatch(t.annotation)
        if t.value:
            self.write(" = ")
            self.dispatch(t.value)

    def _Return(self, t):
        """``return`` with an optional value."""
        self.fill("return")
        if t.value:
            self.write(" ")
            self.dispatch(t.value)

    def _Pass(self, t):
        """``pass``."""
        self.fill("pass")

    def _Break(self, t):
        """``break``."""
        self.fill("break")

    def _Continue(self, t):
        """``continue``."""
        self.fill("continue")

    def _Delete(self, t):
        """``del a, b``."""
        self.fill("del ")
        interleave(lambda: self.write(", "), self.dispatch, t.targets)

    def _Assert(self, t):
        """``assert test[, msg]``."""
        self.fill("assert ")
        self.dispatch(t.test)
        if t.msg:
            self.write(", ")
            self.dispatch(t.msg)

    def _Global(self, t):
        """``global a, b`` (names are plain strings, written directly)."""
        self.fill("global ")
        interleave(lambda: self.write(", "), self.write, t.names)

    def _Nonlocal(self, t):
        """``nonlocal a, b`` (names are plain strings, written directly)."""
        self.fill("nonlocal ")
        interleave(lambda: self.write(", "), self.write, t.names)

    def _Await(self, t):
        """Parenthesized ``(await value)``."""
        self.write("(")
        self.write("await")
        if t.value:
            self.write(" ")
            self.dispatch(t.value)
        self.write(")")

    def _Yield(self, t):
        """Parenthesized ``(yield [value])``."""
        self.write("(")
        self.write("yield")
        if t.value:
            self.write(" ")
            self.dispatch(t.value)
        self.write(")")

    def _YieldFrom(self, t):
        """Parenthesized ``(yield from value)``."""
        self.write("(")
        self.write("yield from")
        if t.value:
            self.write(" ")
            self.dispatch(t.value)
        self.write(")")

    def _Raise(self, t):
        """``raise [exc [from cause]]``."""
        self.fill("raise")
        if not t.exc:
            # A bare ``raise`` cannot carry a ``from`` clause.
            assert not t.cause
            return
        self.write(" ")
        self.dispatch(t.exc)
        if t.cause:
            self.write(" from ")
            self.dispatch(t.cause)

    def _Try(self, t):
        """``try`` with its handlers and optional ``else`` / ``finally``."""
        self.fill("try")
        self.enter()
        self.dispatch(t.body)
        self.leave()
        for ex in t.handlers:
            self.dispatch(ex)
        if t.orelse:
            self.fill("else")
            self.enter()
            self.dispatch(t.orelse)
            self.leave()
        if t.finalbody:
            self.fill("finally")
            self.enter()
            self.dispatch(t.finalbody)
            self.leave()

    def _ExceptHandler(self, t):
        """``except [type [as name]]:`` followed by the handler body."""
        self.fill("except")
        if t.type:
            self.write(" ")
            self.dispatch(t.type)
        if t.name:
            self.write(" as ")
            self.write(t.name)
        self.enter()
        self.dispatch(t.body)
        self.leave()

    def _ClassDef(self, t):
        """Class definition: decorators, bases and keywords, then the body.

        The parentheses after the class name are written even when there are
        no bases or keywords.
        """
        self.write("\n")
        for deco in t.decorator_list:
            self.fill("@")
            self.dispatch(deco)
        self.fill("class " + t.name)
        self.write("(")
        comma = False
        for e in t.bases:
            if comma:
                self.write(", ")
            else:
                comma = True
            self.dispatch(e)
        for e in t.keywords:
            if comma:
                self.write(", ")
            else:
                comma = True
            self.dispatch(e)
        self.write(")")

        self.enter()
        self.dispatch(t.body)
        self.leave()

    def _FunctionDef(self, t):
        """``def`` function definition."""
        self.__FunctionDef_helper(t, "def")

    def _AsyncFunctionDef(self, t):
        """``async def`` function definition."""
        self.__FunctionDef_helper(t, "async def")

    def __FunctionDef_helper(self, t, fill_suffix):
        """Shared body of the two function-definition methods.

        Args:
            t: The function definition node.
            fill_suffix: The keyword(s) to open with, ``"def"`` or
                ``"async def"``.
        """
        self.write("\n")
        for deco in t.decorator_list:
            self.fill("@")
            self.dispatch(deco)
        def_str = fill_suffix + " " + t.name + "("
        self.fill(def_str)
        self.dispatch(t.args)
        self.write(")")
        if t.returns:
            self.write(" -> ")
            self.dispatch(t.returns)
        self.enter()
        self.dispatch(t.body)
        self.leave()

    def _For(self, t):
        """``for`` loop."""
        self.__For_helper("for ", t)

    def _AsyncFor(self, t):
        """``async for`` loop."""
        self.__For_helper("async for ", t)

    def __For_helper(self, fill, t):
        """Shared body of the two loop methods; ``fill`` is the opening keyword."""
        self.fill(fill)
        self.dispatch(t.target)
        self.write(" in ")
        self.dispatch(t.iter)
        self.enter()
        self.dispatch(t.body)
        self.leave()
        if t.orelse:
            self.fill("else")
            self.enter()
            self.dispatch(t.orelse)
            self.leave()

    def _If(self, t):
        """``if`` statement, with ``elif`` chains and a final ``else``."""
        self.fill("if ")
        self.dispatch(t.test)
        self.enter()
        self.dispatch(t.body)
        self.leave()
        # collapse nested ifs into equivalent elifs.
        while t.orelse and len(t.orelse) == 1 and isinstance(t.orelse[0], ast.If):
            t = t.orelse[0]
            self.fill("elif ")
            self.dispatch(t.test)
            self.enter()
            self.dispatch(t.body)
            self.leave()
        # final else
        if t.orelse:
            self.fill("else")
            self.enter()
            self.dispatch(t.orelse)
            self.leave()

    def _While(self, t):
        """``while`` loop with optional ``else``."""
        self.fill("while ")
        self.dispatch(t.test)
        self.enter()
        self.dispatch(t.body)
        self.leave()
        if t.orelse:
            self.fill("else")
            self.enter()
            self.dispatch(t.orelse)
            self.leave()

    def _With(self, t):
        """``with a as x, b as y:`` block."""
        self.fill("with ")
        interleave(lambda: self.write(", "), self.dispatch, t.items)
        self.enter()
        self.dispatch(t.body)
        self.leave()

    def _AsyncWith(self, t):
        """``async with`` block."""
        self.fill("async with ")
        interleave(lambda: self.write(", "), self.dispatch, t.items)
        self.enter()
        self.dispatch(t.body)
        self.leave()

    # expr
    def _JoinedStr(self, t):
        """f-string: the body is collected first, then written as ``f`` + repr."""
        self.write("f")
        string = io.StringIO()
        self._fstring_JoinedStr(t, string.write)
        self.write(repr(string.getvalue()))

    def _FormattedValue(self, t):
        """A lone formatted value, written as an f-string of its own."""
        self.write("f")
        string = io.StringIO()
        self._fstring_FormattedValue(t, string.write)
        self.write(repr(string.getvalue()))

    def _fstring_JoinedStr(self, t, write):
        """Emit each part of an f-string through its ``_fstring_*`` method."""
        for value in t.values:
            meth = getattr(self, "_fstring_" + type(value).__name__)
            meth(value, write)

    def _fstring_Constant(self, t, write):
        """Literal text inside an f-string; braces are doubled to escape them."""
        assert isinstance(t.value, str)
        value = t.value.replace("{", "{{").replace("}", "}}")
        write(value)

    def _fstring_FormattedValue(self, t, write):
        """A ``{expr!conv:spec}`` replacement field inside an f-string."""
        write("{")
        # The embedded expression is unparsed by a nested Unparser into its
        # own buffer so that it can be inspected before being written.
        expr = io.StringIO()
        Unparser(t.value, expr)
        expr = expr.getvalue().rstrip("\n")
        if expr.startswith("{"):
            write(" ")  # Separate pair of opening brackets as "{ {"
        write(expr)
        if t.conversion != -1:
            conversion = chr(t.conversion)
            assert conversion in "sra"
            write(f"!{conversion}")
        if t.format_spec:
            write(":")
            meth = getattr(self, "_fstring_" + type(t.format_spec).__name__)
            meth(t.format_spec, write)
        write("}")

    def _Name(self, t):
        """An identifier."""
        self.write(t.id)

    def _write_constant(self, value):
        """Write the repr of a constant value."""
        if isinstance(value, (float, complex)):
            # Substitute overflowing decimal literal for AST infinities.
            self.write(repr(value).replace("inf", INFSTR))
        else:
            self.write(repr(value))

    def _Constant(self, t):
        """A literal constant, including tuple constants and ``...``."""
        value = t.value
        if isinstance(value, tuple):
            self.write("(")
            if len(value) == 1:
                # One-element tuples need the trailing comma.
                self._write_constant(value[0])
                self.write(",")
            else:
                interleave(lambda: self.write(", "), self._write_constant, value)
            self.write(")")
        elif value is ...:
            self.write("...")
        else:
            if t.kind == "u":
                self.write("u")
            self._write_constant(t.value)

    def _List(self, t):
        """List display ``[a, b]``."""
        self.write("[")
        interleave(lambda: self.write(", "), self.dispatch, t.elts)
        self.write("]")

    def _ListComp(self, t):
        """List comprehension."""
        self.write("[")
        self.dispatch(t.elt)
        for gen in t.generators:
            self.dispatch(gen)
        self.write("]")

    def _GeneratorExp(self, t):
        """Generator expression."""
        self.write("(")
        self.dispatch(t.elt)
        for gen in t.generators:
            self.dispatch(gen)
        self.write(")")

    def _SetComp(self, t):
        """Set comprehension."""
        self.write("{")
        self.dispatch(t.elt)
        for gen in t.generators:
            self.dispatch(gen)
        self.write("}")

    def _DictComp(self, t):
        """Dict comprehension."""
        self.write("{")
        self.dispatch(t.key)
        self.write(": ")
        self.dispatch(t.value)
        for gen in t.generators:
            self.dispatch(gen)
        self.write("}")

    def _comprehension(self, t):
        """One ``for target in iter [if cond]...`` clause of a comprehension."""
        if t.is_async:
            self.write(" async for ")
        else:
            self.write(" for ")
        self.dispatch(t.target)
        self.write(" in ")
        self.dispatch(t.iter)
        for if_clause in t.ifs:
            self.write(" if ")
            self.dispatch(if_clause)

    def _IfExp(self, t):
        """Parenthesized conditional expression ``(a if test else b)``."""
        self.write("(")
        self.dispatch(t.body)
        self.write(" if ")
        self.dispatch(t.test)
        self.write(" else ")
        self.dispatch(t.orelse)
        self.write(")")

    def _Set(self, t):
        """Set display ``{a, b}``."""
        assert t.elts  # should be at least one element
        self.write("{")
        interleave(lambda: self.write(", "), self.dispatch, t.elts)
        self.write("}")

    def _Dict(self, t):
        """Dict display, including ``**mapping`` unpacking entries."""
        self.write("{")

        def write_key_value_pair(k, v):
            self.dispatch(k)
            self.write(": ")
            self.dispatch(v)

        def write_item(item):
            k, v = item
            if k is None:
                # for dictionary unpacking operator in dicts {**{'y': 2}}
                # see PEP 448 for details
                self.write("**")
                self.dispatch(v)
            else:
                write_key_value_pair(k, v)

        interleave(lambda: self.write(", "), write_item, zip(t.keys, t.values))
        self.write("}")

    def _Tuple(self, t):
        """Parenthesized tuple; a single element gets a trailing comma."""
        self.write("(")
        if len(t.elts) == 1:
            elt = t.elts[0]
            self.dispatch(elt)
            self.write(",")
        else:
            interleave(lambda: self.write(", "), self.dispatch, t.elts)
        self.write(")")

    # Operator node class name -> source token.
    unop = {"Invert": "~", "Not": "not", "UAdd": "+", "USub": "-"}

    def _UnaryOp(self, t):
        """Parenthesized unary operation; a space follows the operator."""
        self.write("(")
        self.write(self.unop[t.op.__class__.__name__])
        self.write(" ")
        self.dispatch(t.operand)
        self.write(")")

    # Operator node class name -> source token (also used by _AugAssign).
    binop = {
        "Add": "+",
        "Sub": "-",
        "Mult": "*",
        "MatMult": "@",
        "Div": "/",
        "Mod": "%",
        "LShift": "<<",
        "RShift": ">>",
        "BitOr": "|",
        "BitXor": "^",
        "BitAnd": "&",
        "FloorDiv": "//",
        "Pow": "**",
    }

    def _BinOp(self, t):
        """Parenthesized binary operation."""
        self.write("(")
        self.dispatch(t.left)
        self.write(" " + self.binop[t.op.__class__.__name__] + " ")
        self.dispatch(t.right)
        self.write(")")

    # Comparison node class name -> source token.
    cmpops = {
        "Eq": "==",
        "NotEq": "!=",
        "Lt": "<",
        "LtE": "<=",
        "Gt": ">",
        "GtE": ">=",
        "Is": "is",
        "IsNot": "is not",
        "In": "in",
        "NotIn": "not in",
    }

    def _Compare(self, t):
        """Parenthesized (possibly chained) comparison."""
        self.write("(")
        self.dispatch(t.left)
        for o, e in zip(t.ops, t.comparators):
            self.write(" " + self.cmpops[o.__class__.__name__] + " ")
            self.dispatch(e)
        self.write(")")

    # Boolean operator node class -> source token.
    boolops = {ast.And: "and", ast.Or: "or"}

    def _BoolOp(self, t):
        """Parenthesized ``and`` / ``or`` over all operands."""
        self.write("(")
        s = " %s " % self.boolops[t.op.__class__]
        interleave(lambda: self.write(s), self.dispatch, t.values)
        self.write(")")

    def _Attribute(self, t):
        """Attribute access ``value.attr``."""
        self.dispatch(t.value)
        # Special case: 3.__abs__() is a syntax error, so if t.value
        # is an integer literal then we need to either parenthesize
        # it or add an extra space to get 3 .__abs__().
        if isinstance(t.value, ast.Constant) and isinstance(t.value.value, int):
            self.write(" ")
        self.write(".")
        self.write(t.attr)

    def _Call(self, t):
        """Call: positional arguments first, then keyword arguments."""
        self.dispatch(t.func)
        self.write("(")
        comma = False
        for e in t.args:
            if comma:
                self.write(", ")
            else:
                comma = True
            self.dispatch(e)
        for e in t.keywords:
            if comma:
                self.write(", ")
            else:
                comma = True
            self.dispatch(e)
        self.write(")")

    def _Subscript(self, t):
        """Subscription ``value[index]``.

        A non-empty tuple index is written without its parentheses. This
        matters when the tuple contains slices: ``a[1:2, 3]`` is valid while
        ``a[(1:2, 3)]`` is a syntax error.
        """
        self.dispatch(t.value)
        self.write("[")
        index = t.slice
        # Python < 3.9 wraps a plain index in ast.Index; from 3.9 on the
        # expression is stored directly, so a tuple index arrives as a bare
        # ast.Tuple. Unwrap when needed so both layouts share one code path.
        # getattr keeps this working should the deprecated class be removed.
        if isinstance(index, getattr(ast, "Index", ())):
            index = index.value
        if isinstance(index, ast.Tuple) and index.elts:
            if len(index.elts) == 1:
                elt = index.elts[0]
                self.dispatch(elt)
                self.write(",")
            else:
                interleave(lambda: self.write(", "), self.dispatch, index.elts)
        else:
            self.dispatch(index)
        self.write("]")

    def _Starred(self, t):
        """``*value``."""
        self.write("*")
        self.dispatch(t.value)

    # slice
    def _Ellipsis(self, t):
        """``...``."""
        self.write("...")

    def _Index(self, t):
        """Index wrapper node: just the wrapped expression."""
        self.dispatch(t.value)

    def _Slice(self, t):
        """``lower:upper[:step]``; missing bounds are left empty."""
        if t.lower:
            self.dispatch(t.lower)
        self.write(":")
        if t.upper:
            self.dispatch(t.upper)
        if t.step:
            self.write(":")
            self.dispatch(t.step)

    def _ExtSlice(self, t):
        """Comma-separated dimensions; a single one gets a trailing comma."""
        if len(t.dims) == 1:
            elt = t.dims[0]
            self.dispatch(elt)
            self.write(",")
        else:
            interleave(lambda: self.write(", "), self.dispatch, t.dims)

    # argument
    def _arg(self, t):
        """A single parameter with optional annotation."""
        self.write(t.arg)
        if t.annotation:
            self.write(": ")
            self.dispatch(t.annotation)

    # others
    def _arguments(self, t):
        """A full parameter list (without the surrounding parentheses)."""
        first = True
        # normal arguments
        all_args = t.posonlyargs + t.args
        # Defaults apply to the trailing parameters; pad on the left with
        # None so that zip() pairs every parameter with its default.
        defaults = [None] * (len(all_args) - len(t.defaults)) + t.defaults
        for index, elements in enumerate(zip(all_args, defaults), 1):
            a, d = elements
            if first:
                first = False
            else:
                self.write(", ")
            self.dispatch(a)
            if d:
                self.write("=")
                self.dispatch(d)
            if index == len(t.posonlyargs):
                self.write(", /")

        # varargs, or bare '*' if no varargs but keyword-only arguments present
        if t.vararg or t.kwonlyargs:
            if first:
                first = False
            else:
                self.write(", ")
            self.write("*")
            if t.vararg:
                self.write(t.vararg.arg)
                if t.vararg.annotation:
                    self.write(": ")
                    self.dispatch(t.vararg.annotation)

        # keyword-only arguments
        if t.kwonlyargs:
            for a, d in zip(t.kwonlyargs, t.kw_defaults):
                if first:
                    first = False
                else:
                    self.write(", ")
                self.dispatch(a)
                if d:
                    self.write("=")
                    self.dispatch(d)

        # kwargs
        if t.kwarg:
            if first:
                first = False
            else:
                self.write(", ")
            self.write("**" + t.kwarg.arg)
            if t.kwarg.annotation:
                self.write(": ")
                self.dispatch(t.kwarg.annotation)

    def _keyword(self, t):
        """Keyword argument ``name=value``, or ``**value`` when unnamed."""
        if t.arg is None:
            self.write("**")
        else:
            self.write(t.arg)
            self.write("=")
        self.dispatch(t.value)

    def _Lambda(self, t):
        """Parenthesized ``(lambda args: body)``."""
        self.write("(")
        self.write("lambda ")
        self.dispatch(t.args)
        self.write(": ")
        self.dispatch(t.body)
        self.write(")")

    def _alias(self, t):
        """Imported name with optional ``as`` alias."""
        self.write(t.name)
        if t.asname:
            self.write(" as " + t.asname)

    def _withitem(self, t):
        """One ``context_expr [as target]`` item of a ``with`` statement."""
        self.dispatch(t.context_expr)
        if t.optional_vars:
            self.write(" as ")
            self.dispatch(t.optional_vars)
def roundtrip(filename, output=sys.stdout):
    """Parse a file and pretty-print it to output.

    The output is formatted as valid Python source code.

    Args:
        filename: The name of the file to parse.
        output: The output stream to write to.
    """
    # tokenize.open() detects the source encoding (coding cookie / BOM) and
    # opens the file in text mode with it, replacing the manual
    # detect_encoding() pass followed by a second open().
    with tokenize.open(filename) as pyfile:
        source = pyfile.read()
    tree = compile(source, filename, "exec", ast.PyCF_ONLY_AST)
    Unparser(tree, output)