import re
import xml
import xml.etree.ElementTree as ET
from typing import Any, AsyncIterator, Dict, Iterator, List, Literal, Optional, Union
from xml.etree.ElementTree import TreeBuilder

from langchain_core.exceptions import OutputParserException
from langchain_core.messages import BaseMessage
from langchain_core.output_parsers.transform import BaseTransformOutputParser
from langchain_core.runnables.utils import AddableDict

# Prompt fragment handed to the model; ``{tags}`` is filled in by
# ``XMLOutputParser.get_format_instructions``.
XML_FORMAT_INSTRUCTIONS = """The output should be formatted as a XML file.
1. Output should conform to the tags below. 
2. If tags are not given, make them on your own.
3. Remember to always open and close all the tags.

As an example, for the tags ["foo", "bar", "baz"]:
1. String "<foo>\n <bar>\n <baz></baz>\n </bar>\n</foo>" is a well-formatted instance of the schema. 
2. String "<foo>\n <bar>\n </foo>" is a badly-formatted instance.
3. String "<foo>\n <tag>\n </tag>\n</foo>" is a badly-formatted instance.

Here are the output tags:
```
{tags}
```"""  # noqa: E501


class _StreamingParser:
    """Streaming parser for XML.

    This implementation is pulled into a class to avoid implementation
    drift between transform and atransform of the XMLOutputParser.
    """

    def __init__(self, parser: Literal["defusedxml", "xml"]) -> None:
        """Initialize the streaming parser.

        Args:
            parser: Parser to use for XML parsing. Can be either 'defusedxml'
                or 'xml'. See documentation in XMLOutputParser for more
                information.

        Raises:
            ImportError: If defusedxml is not installed and the defusedxml
                parser is requested.
        """
        if parser == "defusedxml":
            try:
                from defusedxml import ElementTree as DET  # type: ignore
            except ImportError as e:
                # Each fragment ends with a space so the implicitly
                # concatenated sentences do not run together.
                raise ImportError(
                    "defusedxml is not installed. "
                    "Please install it to use the defusedxml parser. "
                    "You can install it with `pip install defusedxml`."
                ) from e
            _parser = DET.DefusedXMLParser(target=TreeBuilder())
        else:
            # None lets XMLPullParser fall back to its own default parser.
            _parser = None
        self.pull_parser = ET.XMLPullParser(["start", "end"], _parser=_parser)
        # Matches the opening of the first element-like tag; text before it
        # is discarded by ``parse``.
        self.xml_start_re = re.compile(r"<[a-zA-Z:_]")
        # Tags of the elements that are currently open, outermost first.
        self.current_path: List[str] = []
        # True once a child of the innermost open element has been closed,
        # which suppresses yielding the parent itself.
        self.current_path_has_children = False
        # Text received so far that has not yet been fed to the pull parser.
        self.buffer = ""
        self.xml_started = False

    def parse(self, chunk: Union[str, BaseMessage]) -> Iterator[AddableDict]:
        """Parse a chunk of text.

        Args:
            chunk: A chunk of text to parse. This can be a string or a
                BaseMessage. Messages whose content is not a string are
                ignored.

        Yields:
            AddableDict: A dictionary representing the parsed XML element,
                nested under the tags of its currently open ancestors.

        Raises:
            xml.etree.ElementTree.ParseError: If the XML is not well-formed
                while at least one element is still open.
        """
        if isinstance(chunk, BaseMessage):
            # extract text
            chunk_content = chunk.content
            if not isinstance(chunk_content, str):
                # ignore non-string messages (e.g., function calls)
                return
            chunk = chunk_content
        # add chunk to buffer of unprocessed text
        self.buffer += chunk
        # if xml string hasn't started yet, continue to next chunk
        if not self.xml_started:
            if match := self.xml_start_re.search(self.buffer):
                # if xml string has started, remove all text before it
                self.buffer = self.buffer[match.start() :]
                self.xml_started = True
            else:
                return
        # feed buffer to parser
        self.pull_parser.feed(self.buffer)
        self.buffer = ""
        # yield all events
        try:
            for event, elem in self.pull_parser.read_events():
                if event == "start":
                    # update current path
                    self.current_path.append(elem.tag)
                    self.current_path_has_children = False
                elif event == "end":
                    # remove last element from current path, so that
                    # current_path now holds only the ancestors of ``elem``
                    self.current_path.pop()
                    # yield element
                    if not self.current_path_has_children:
                        yield nested_element(self.current_path, elem)
                    # prevent yielding of parent element
                    if self.current_path:
                        self.current_path_has_children = True
                    else:
                        # The root element was closed: wait for the next
                        # tag opening before feeding the parser again.
                        self.xml_started = False
        except xml.etree.ElementTree.ParseError:
            # This might be junk at the end of the XML input.
            # Let's check whether the current path is empty.
            if not self.current_path:
                # If it is empty, we can ignore this error.
                return
            else:
                raise

    def close(self) -> None:
        """Close the parser.

        This should be called after all chunks have been parsed. A
        ``ParseError`` raised while closing is swallowed on purpose.
        """
        try:
            self.pull_parser.close()
        except xml.etree.ElementTree.ParseError:
            # Ignore. This will ignore any incomplete XML at the end of the input
            pass
class XMLOutputParser(BaseTransformOutputParser):
    """Parse an output using xml format."""

    tags: Optional[List[str]] = None
    # Captures a leading tag that mentions ``encoding`` (group 1) and
    # everything after the newline that follows it (group 2).
    encoding_matcher: re.Pattern = re.compile(
        r"<([^>]*encoding[^>]*)>\n(.*)", re.MULTILINE | re.DOTALL
    )
    parser: Literal["defusedxml", "xml"] = "defusedxml"
    """Parser to use for XML parsing. Can be either 'defusedxml' or 'xml'.

    * 'defusedxml' is the default parser and is used to prevent XML vulnerabilities
       present in some distributions of Python's standard library xml.
       `defusedxml` is a wrapper around the standard library parser that
       sets up the parser with secure defaults.
    * 'xml' is the standard library parser.

    Use `xml` only if you are sure that your distribution of the standard library
    is not vulnerable to XML vulnerabilities.

    Please review the following resources for more information:

    * https://docs.python.org/3/library/xml.html#xml-vulnerabilities
    * https://github.com/tiran/defusedxml

    The standard library relies on libexpat for parsing XML:
    https://github.com/libexpat/libexpat
    """

    def get_format_instructions(self) -> str:
        """Return the format instructions for the XML output."""
        return XML_FORMAT_INSTRUCTIONS.format(tags=self.tags)

    def parse(self, text: str) -> Dict[str, Union[str, List[Any]]]:
        """Parse the output of an LLM call.

        Args:
            text: The output of an LLM call.

        Returns:
            A dictionary representing the parsed XML.

        Raises:
            OutputParserException: If the XML is not well-formed.
            ImportError: If defusedxml is not installed and the defusedxml
                parser is requested.

        Note:
            With the 'defusedxml' parser, defusedxml's own exceptions for
            forbidden constructs are not converted and propagate unchanged.
        """
        # Imports are temporarily placed here to avoid issue with caching on CI
        # likely if you're reading this you can move them to the top of the file
        if self.parser == "defusedxml":
            try:
                from defusedxml import ElementTree as DET  # type: ignore
            except ImportError as e:
                # Each fragment ends with a space so the implicitly
                # concatenated sentences do not run together.
                raise ImportError(
                    "defusedxml is not installed. "
                    "Please install it to use the defusedxml parser. "
                    "You can install it with `pip install defusedxml`. "
                    "See https://github.com/tiran/defusedxml for more details"
                ) from e
            _ET = DET  # Use the defusedxml parser
        else:
            _ET = ET  # Use the standard library parser

        # Try to find XML string within triple backticks
        match = re.search(r"```(xml)?(.*)```", text, re.DOTALL)
        if match is not None:
            # If match found, use the content within the backticks
            text = match.group(2)
        # Drop a leading tag that mentions ``encoding``; only what follows
        # it is handed to the parser.
        encoding_match = self.encoding_matcher.search(text)
        if encoding_match:
            text = encoding_match.group(2)

        text = text.strip()
        try:
            # Parse with the *selected* module. Calling the stdlib ``ET``
            # here would silently bypass the defusedxml protections.
            root = _ET.fromstring(text)
        except _ET.ParseError as e:
            msg = f"Failed to parse XML format from completion {text}. Got: {e}"
            raise OutputParserException(msg, llm_output=text) from e
        return self._root_to_dict(root)

    def _transform(
        self, input: Iterator[Union[str, BaseMessage]]
    ) -> Iterator[AddableDict]:
        """Stream parsed elements from a synchronous iterator of chunks."""
        streaming_parser = _StreamingParser(self.parser)
        for chunk in input:
            yield from streaming_parser.parse(chunk)
        streaming_parser.close()

    async def _atransform(
        self, input: AsyncIterator[Union[str, BaseMessage]]
    ) -> AsyncIterator[AddableDict]:
        """Stream parsed elements from an asynchronous iterator of chunks."""
        streaming_parser = _StreamingParser(self.parser)
        async for chunk in input:
            for output in streaming_parser.parse(chunk):
                yield output
        streaming_parser.close()

    def _root_to_dict(self, root: ET.Element) -> Dict[str, Union[str, List[Any]]]:
        """Converts xml tree to python dictionary."""
        if root.text and bool(re.search(r"\S", root.text)):
            # If root text contains any non-whitespace character it
            # returns {root.tag: root.text}
            return {root.tag: root.text}
        result: Dict = {root.tag: []}
        for child in root:
            # ``len(child)`` counts sub-elements; an explicit comparison is
            # used rather than relying on Element truthiness.
            if len(child) == 0:
                result[root.tag].append({child.tag: child.text})
            else:
                result[root.tag].append(self._root_to_dict(child))
        return result

    @property
    def _type(self) -> str:
        """Identifier string for this parser type."""
        return "xml"
def nested_element(path: List[str], elem: ET.Element) -> Any:
    """Wrap an element in one single-item list per ancestor tag.

    Args:
        path: Tags of the enclosing elements, outermost first.
        elem: The element whose tag and text form the innermost mapping.

    Returns:
        An AddableDict ``{elem.tag: elem.text}`` when ``path`` is empty;
        otherwise that mapping wrapped as ``{tag: [inner]}`` once per entry
        of ``path``, with ``path[0]`` as the outermost key.
    """
    wrapped = AddableDict({elem.tag: elem.text})
    # Build from the inside out: the last path entry is the closest ancestor.
    for ancestor_tag in reversed(path):
        wrapped = AddableDict({ancestor_tag: [wrapped]})
    return wrapped