class CohereToolsReactAgentOutputParser(
    BaseOutputParser[Union[List[AgentAction], AgentFinish]]
):
    """Parses a message into agent actions/finish."""

    def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]:
        """Parse a model completion into either a final ``AgentFinish``
        (when the completion contains an answer) or a list of agent actions.

        Args:
            text: raw model completion.

        Returns:
            ``AgentFinish`` when an "Answer: " section is present, otherwise
            a list of ``AgentActionMessageLog`` parsed from the action JSON.

        Raises:
            ValueError: if the completion contains neither an answer nor any
                Plan/Reflection/Action markers.
        """
        # Parse the structured output of the final answer.
        if "Answer: " in text:
            prefix_map = {
                "answer": "Answer:",
                "grounded_answer": "Grounded answer:",
                "relevant_docs": "Relevant Documents:",
                "cited_docs": "Cited Documents:",
            }
            parsed_answer = parse_answer_with_prefixes(text, prefix_map)
            return AgentFinish(
                return_values={
                    OUTPUT_KEY: parsed_answer["answer"],
                    GROUNDED_ANSWER_KEY: parsed_answer["grounded_answer"],
                },
                log=text,
            )
        # Generator (not a list) is enough for any(); short-circuits too.
        elif any(x in text for x in ["Plan: ", "Reflection: ", "Action: "]):
            completion, plan, actions = parse_actions(text)
            agent_actions: List[AgentAction] = []
            for i, action in enumerate(actions):
                agent_action = AgentActionMessageLog(
                    tool=action["tool_name"],
                    tool_input=action["parameters"],
                    # Only the first action's log carries the plan text.
                    log=f"\n{action}\n" if i > 0 else f"\n{plan}\n{action}\n",
                    message_log=[AIMessage(content="\n" + completion)],
                )
                agent_actions.append(agent_action)
            return agent_actions
        else:
            # Fixed message: the previous concatenation was missing a space
            # and rendered as "Reflection,Action".
            raise ValueError(
                "\nCould not parse generation as it did not contain Plan, "
                f"Reflection, Action, or Answer. Input: {text}\n\n"
            )
def parse_jsonified_tool_use_generation(
    tool_use_generation: str, tool_use_prefix: str
) -> List[Dict]:
    """Parses model-generated jsonified actions.

    Expects input of the form
    "{tool_use_prefix}: ```json\n[{list of jsonified objects}]```"

    outputs parsed list of jsonified objects.

    Raises:
        ValueError: if the generation does not contain exactly one markdown
            code block, the block is not tagged ``json``, or the parsed JSON
            is not a list of dicts each carrying a "tool_name" key.
    """

    def _extract_codeblocks_from_md(text: str) -> List[str]:
        return re.findall(r"`{3}([^`]*)`{0,3}", text)

    # re.escape: the prefix is data, not a pattern — don't let regex
    # metacharacters in it change the match.
    raw_generation = re.sub(
        f"^{re.escape(tool_use_prefix)} ", "", tool_use_generation
    )
    code_block_sections = _extract_codeblocks_from_md(raw_generation)
    if len(code_block_sections) != 1:  # should have exactly 1 code block
        raise ValueError(f"Action Parsing Failed: {tool_use_generation}")
    # Only json allowed. Raise (rather than assert) so the check survives
    # `python -O` and callers see the same ValueError as other failure modes.
    if not code_block_sections[0].startswith("json\n"):
        raise ValueError(f"Action Parsing Failed: {tool_use_generation}")
    actions = json.loads(re.sub("^json\n", "", code_block_sections[0]))
    if not isinstance(actions, list):
        raise ValueError(f"Action Parsing Failed: {tool_use_generation}")
    if len(actions):
        if any(
            not isinstance(action, dict) or "tool_name" not in action
            for action in actions
        ):
            raise ValueError(f"Action Parsing Failed: {tool_use_generation}")
    return actions
def parse_answer_with_prefixes(
    completion: str, prefixes: Dict[str, str]
) -> Dict[str, str]:
    """parses string into key-value pairs,
       according to patterns supplied in prefixes. Also strips.

    if inputs are:
        completion = "\nhello: sam\ngoodbye then: paul.",
        prefixes = {"greeting": "hello:", "farewell": "goodbye then:"}

    the expected returned result is:
        {"greeting": "sam", "farewell": "paul."}

    Args:
        completion (str): text to split
        prefixes (Dict[str, str]): a key-value dict of keys and patterns.
            See example above

    Returns:
        Dict[str, str]: parsed result
    """
    # Split on any of the prefix markers, keeping the markers themselves
    # (capturing group) so we know which key each chunk belongs to.
    marker_pattern = "(" + "|".join(re.escape(m) for m in prefixes.values()) + ")"
    marker_to_key = {marker: key for key, marker in prefixes.items()}
    # Drop everything before the first marker.
    chunks = re.split(marker_pattern, completion)[1:]
    parsed: Dict[str, str] = {}
    # chunks alternates marker, value, marker, value, ... — pair them up.
    pair_iter = iter(chunks)
    for marker, value in zip(pair_iter, pair_iter):
        key = marker_to_key.get(marker)
        # First occurrence of a prefix is kept, later ones discarded.
        if key is not None and key not in parsed:
            parsed[key] = value.strip()
    return parsed
def parse_actions(generation: str) -> Tuple[str, str, List[Dict]]:
    """Parse action selections from model output."""
    generation = generation.strip()
    plan = ""
    actions = generation
    try:
        if "Plan: " in generation or "Reflection: " in generation:
            # Model is trained to output a Plan or Reflection followed by
            # an action. Use regex to extract the plan and action.
            preamble_pattern = (
                r"^(Plan|Reflection)\s*\d*\s*:(.*?)"
                r"(Action\s*\d*\s*:\s*\d*\s*```json\n.*?```)"
            )
            match = re.search(preamble_pattern, generation, re.DOTALL)
            if match is None:
                raise ValueError(
                    f"Failed to parse multihop completion for input: {generation}"
                )
            plan = match.group(2).strip()
            actions = match.group(3).strip()
        else:
            # Catch the case where model outputs only an action.
            action_only_pattern = r"^(Action\s*\d*\s*:\s*\d*\s*```json\n.*?```)"
            match = re.search(action_only_pattern, generation, re.DOTALL)
            if match is None:
                raise ValueError(
                    f"Failed to parse multihop completion for input: {generation}"
                )
            actions = match.group(1).strip()
    except Exception as e:
        # Log and fall through: `actions` still holds the raw generation,
        # which the jsonified-action parser below will reject if unusable.
        logging.error(f"Failed to parse multihop completion for input: {generation}")
        logging.error(f"Error: {e}")
    parsed_actions = parse_jsonified_tool_use_generation(actions, "Action:")
    return generation, plan, parsed_actions
def parse_citations(
    grounded_answer: str, documents: List[MutableMapping]
) -> Tuple[str, List[CohereCitation]]:
    """
    Parses a grounded_generation (from parse_actions) and documents (from
    convert_to_documents) into a (generation, CohereCitation list) tuple.
    """
    no_markup_answer, parsed_answer = _parse_answer_spans(grounded_answer)
    # Add an id field to each document. This may be useful for future
    # deduplication.
    for position, document in enumerate(documents):
        document["id"] = document.get("id") or f"doc_{position}"
    citations: List[CohereCitation] = []
    start = 0
    for span in parsed_answer:
        span_text = span.get("text", "")
        cited_indexes = span.get("cited_docs")
        if not cited_indexes:
            # There were no citations for this piece of text; just advance
            # the character cursor.
            start += len(span_text)
            continue
        end = start + len(span_text)
        # Look up each cited document by index, skipping indexes that
        # don't exist.
        matched_documents: List[Mapping] = []
        matched_document_ids: List[str] = []
        for index in set(cited_indexes):
            if index >= len(documents):
                continue
            matched_documents.append(documents[index])
            matched_document_ids.append(documents[index]["id"])
        citations.append(
            CohereCitation(
                start=start,
                end=end,
                text=span_text,
                documents=matched_documents,
                document_ids=set(matched_document_ids),
            )
        )
        start = end
    return no_markup_answer, citations
def_strip_spans(answer:str)->str:"""removes any <co> tags from a string, including trailing partial tags input: "hi my <co>name</co> is <co: 1> patrick</co:3> and <co" output: "hi my name is patrick and" Args: answer (str): string Returns: str: same string with co tags removed """answer=re.sub(r"<co(.*?)>|</co(.*?)>","",answer)idx=answer.find("<co")ifidx>-1:answer=answer[:idx]idx=answer.find("</")ifidx>-1:answer=answer[:idx]returnanswerdef_parse_answer_spans(grounded_answer:str)->Tuple[str,List[Dict[str,Any]]]:actual_cites=[]forcinre.findall(r"<co:(.*?)>",grounded_answer):actual_cites.append(c.strip().split(","))no_markup_answer=_strip_spans(grounded_answer)current_idx=0parsed_answer:List[Dict[str,Union[str,List[int]]]]=[]cited_docs_set=[]last_entry_is_open_cite=Falseparsed_current_cite_document_idxs:List[int]=[]whilecurrent_idx<len(grounded_answer):current_cite=re.search(r"<co: (.*?)>",grounded_answer[current_idx:])ifcurrent_cite:# previous partparsed_answer.append({"text":grounded_answer[current_idx:current_idx+current_cite.start()]})current_cite_document_idxs=current_cite.group(1).split(",")parsed_current_cite_document_idxs=[]forcited_idxincurrent_cite_document_idxs:ifcited_idx.isdigit():cited_idx=int(cited_idx.strip())parsed_current_cite_document_idxs.append(cited_idx)ifcited_idxnotincited_docs_set:cited_docs_set.append(cited_idx)current_idx+=current_cite.end()current_cite_close=re.search(r"</co: "+current_cite.group(1)+">",grounded_answer[current_idx:])ifcurrent_cite_close:# there might have been issues parsing the ids, so we need to check# that they are actually ints and availableiflen(parsed_current_cite_document_idxs)>0:pt=grounded_answer[current_idx:current_idx+current_cite_close.start()]parsed_answer.append({"text":pt,"cited_docs":parsed_current_cite_document_idxs})else:parsed_answer.append({"text":grounded_answer[current_idx:current_idx+current_cite_close.start()],})current_idx+=current_cite_close.end()else:last_entry_is_open_cite=Truebreakelse:break# 
don't forget about the last oneiflast_entry_is_open_cite:pt=_strip_spans(grounded_answer[current_idx:])parsed_answer.append({"text":pt,"cited_docs":parsed_current_cite_document_idxs})else:parsed_answer.append({"text":_strip_spans(grounded_answer[current_idx:])})returnno_markup_answer,parsed_answer