Source code for langchain.chains.combine_documents.map_reduce
"""Combining documents by mapping a chain over them first, then combining results."""from__future__importannotationsfromtypingimportAny,Dict,List,Optional,Tuple,Typefromlangchain_core._apiimportdeprecatedfromlangchain_core.callbacksimportCallbacksfromlangchain_core.documentsimportDocumentfromlangchain_core.runnables.configimportRunnableConfigfromlangchain_core.runnables.utilsimportcreate_modelfrompydanticimportBaseModel,ConfigDict,model_validatorfromlangchain.chains.combine_documents.baseimportBaseCombineDocumentsChainfromlangchain.chains.combine_documents.reduceimportReduceDocumentsChainfromlangchain.chains.llmimportLLMChain

@deprecated(
    since="0.3.1",
    removal="1.0",
    message=(
        "This class is deprecated. Please see the migration guide here for "
        "a recommended replacement: "
        "https://python.langchain.com/docs/versions/migrating_chains/map_reduce_chain/"
    ),
)
class MapReduceDocumentsChain(BaseCombineDocumentsChain):
    """Combining documents by mapping a chain over them, then combining results.

    We first call `llm_chain` on each document individually, passing in the
    `page_content` and any other kwargs. This is the `map` step.

    We then process the results of that `map` step in a `reduce` step. This should
    likely be a ReduceDocumentsChain.

    Example:
        .. code-block:: python

            from langchain.chains import (
                StuffDocumentsChain,
                LLMChain,
                ReduceDocumentsChain,
                MapReduceDocumentsChain,
            )
            from langchain_core.prompts import PromptTemplate
            from langchain_community.llms import OpenAI

            # This controls how each document will be formatted. Specifically,
            # it will be passed to `format_document` - see that function for more
            # details.
            document_prompt = PromptTemplate(
                input_variables=["page_content"],
                template="{page_content}"
            )
            document_variable_name = "context"
            llm = OpenAI()
            # The prompt here should take as an input variable the
            # `document_variable_name`
            prompt = PromptTemplate.from_template(
                "Summarize this content: {context}"
            )
            llm_chain = LLMChain(llm=llm, prompt=prompt)
            # We now define how to combine these summaries
            reduce_prompt = PromptTemplate.from_template(
                "Combine these summaries: {context}"
            )
            reduce_llm_chain = LLMChain(llm=llm, prompt=reduce_prompt)
            combine_documents_chain = StuffDocumentsChain(
                llm_chain=reduce_llm_chain,
                document_prompt=document_prompt,
                document_variable_name=document_variable_name
            )
            reduce_documents_chain = ReduceDocumentsChain(
                combine_documents_chain=combine_documents_chain,
            )
            chain = MapReduceDocumentsChain(
                llm_chain=llm_chain,
                reduce_documents_chain=reduce_documents_chain,
            )

            # If we wanted to, we could also pass in collapse_documents_chain
            # which is specifically aimed at collapsing documents BEFORE
            # the final call. (Note: we use fresh variable names here so the
            # map-step `llm_chain` defined above is not overwritten.)
            collapse_prompt = PromptTemplate.from_template(
                "Collapse this content: {context}"
            )
            collapse_llm_chain = LLMChain(llm=llm, prompt=collapse_prompt)
            collapse_documents_chain = StuffDocumentsChain(
                llm_chain=collapse_llm_chain,
                document_prompt=document_prompt,
                document_variable_name=document_variable_name
            )
            reduce_documents_chain = ReduceDocumentsChain(
                combine_documents_chain=combine_documents_chain,
                collapse_documents_chain=collapse_documents_chain,
            )
            chain = MapReduceDocumentsChain(
                llm_chain=llm_chain,
                reduce_documents_chain=reduce_documents_chain,
            )
    """

    llm_chain: LLMChain
    """Chain to apply to each document individually."""
    reduce_documents_chain: BaseCombineDocumentsChain
    """Chain to use to reduce the results of applying `llm_chain` to each doc.
    This is typically either a ReduceDocumentsChain or a StuffDocumentsChain."""
    document_variable_name: str
    """The variable name in the llm_chain to put the documents in.
    If there is only one variable in the llm_chain, this need not be provided."""
    return_intermediate_steps: bool = False
    """Return the results of the map steps in the output."""

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        if self.return_intermediate_steps:
            return create_model(
                "MapReduceDocumentsOutput",
                **{
                    self.output_key: (str, None),
                    "intermediate_steps": (List[str], None),
                },  # type: ignore[call-overload]
            )

        return super().get_output_schema(config)

    @property
    def output_keys(self) -> List[str]:
        """Expect input key.

        :meta private:
        """
        _output_keys = super().output_keys
        if self.return_intermediate_steps:
            _output_keys = _output_keys + ["intermediate_steps"]
        return _output_keys

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def get_reduce_chain(cls, values: Dict) -> Any:
        """For backwards compatibility."""
        if "combine_document_chain" in values:
            if "reduce_documents_chain" in values:
                raise ValueError(
                    "Both `reduce_documents_chain` and `combine_document_chain` "
                    "cannot be provided at the same time. `combine_document_chain` "
                    "is deprecated, please only provide `reduce_documents_chain`"
                )
            combine_chain = values["combine_document_chain"]
            collapse_chain = values.get("collapse_document_chain")
            reduce_chain = ReduceDocumentsChain(
                combine_documents_chain=combine_chain,
                collapse_documents_chain=collapse_chain,
            )
            values["reduce_documents_chain"] = reduce_chain
            del values["combine_document_chain"]
            if "collapse_document_chain" in values:
                del values["collapse_document_chain"]

        return values

    @model_validator(mode="before")
    @classmethod
    def get_return_intermediate_steps(cls, values: Dict) -> Any:
        """For backwards compatibility."""
        if "return_map_steps" in values:
            values["return_intermediate_steps"] = values["return_map_steps"]
            del values["return_map_steps"]
        return values

    @model_validator(mode="before")
    @classmethod
    def get_default_document_variable_name(cls, values: Dict) -> Any:
        """Get default document variable name, if not provided."""
        if "llm_chain" not in values:
            raise ValueError("llm_chain must be provided")

        llm_chain_variables = values["llm_chain"].prompt.input_variables
        if "document_variable_name" not in values:
            if len(llm_chain_variables) == 1:
                values["document_variable_name"] = llm_chain_variables[0]
            else:
                raise ValueError(
                    "document_variable_name must be provided if there are "
                    "multiple llm_chain input_variables"
                )
        else:
            if values["document_variable_name"] not in llm_chain_variables:
                raise ValueError(
                    f"document_variable_name {values['document_variable_name']} was "
                    f"not found in llm_chain input_variables: {llm_chain_variables}"
                )
        return values

    @property
    def collapse_document_chain(self) -> BaseCombineDocumentsChain:
        """Kept for backward compatibility."""
        if isinstance(self.reduce_documents_chain, ReduceDocumentsChain):
            if self.reduce_documents_chain.collapse_documents_chain:
                return self.reduce_documents_chain.collapse_documents_chain
            else:
                return self.reduce_documents_chain.combine_documents_chain
        else:
            raise ValueError(
                f"`reduce_documents_chain` is of type "
                f"{type(self.reduce_documents_chain)} so it does not have "
                f"this attribute."
            )

    @property
    def combine_document_chain(self) -> BaseCombineDocumentsChain:
        """Kept for backward compatibility."""
        if isinstance(self.reduce_documents_chain, ReduceDocumentsChain):
            return self.reduce_documents_chain.combine_documents_chain
        else:
            raise ValueError(
                f"`reduce_documents_chain` is of type "
                f"{type(self.reduce_documents_chain)} so it does not have "
                f"this attribute."
            )
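
    # Note on `get_reduce_chain` above: it rewrites the legacy constructor
    # kwargs before validation, so (with hypothetical `combine_chain` /
    # `collapse_chain` values, not defined in this module):
    #
    #     MapReduceDocumentsChain(
    #         llm_chain=llm_chain,
    #         combine_document_chain=combine_chain,    # deprecated kwarg
    #         collapse_document_chain=collapse_chain,  # deprecated kwarg
    #     )
    #
    # is equivalent to:
    #
    #     MapReduceDocumentsChain(
    #         llm_chain=llm_chain,
    #         reduce_documents_chain=ReduceDocumentsChain(
    #             combine_documents_chain=combine_chain,
    #             collapse_documents_chain=collapse_chain,
    #         ),
    #     )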

    def combine_docs(
        self,
        docs: List[Document],
        token_max: Optional[int] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Tuple[str, dict]:
        """Combine documents in a map reduce manner.

        Combine by first mapping `llm_chain` over all documents, then reducing
        the results. This reducing can be done recursively if needed (if there
        are many documents).
        """
        map_results = self.llm_chain.apply(
            # FYI - this is parallelized and so it is fast.
            [{self.document_variable_name: d.page_content, **kwargs} for d in docs],
            callbacks=callbacks,
        )
        question_result_key = self.llm_chain.output_key
        result_docs = [
            Document(page_content=r[question_result_key], metadata=docs[i].metadata)
            # This uses metadata from the docs, and the textual results from
            # `map_results`
            for i, r in enumerate(map_results)
        ]
        result, extra_return_dict = self.reduce_documents_chain.combine_docs(
            result_docs, token_max=token_max, callbacks=callbacks, **kwargs
        )
        if self.return_intermediate_steps:
            intermediate_steps = [r[question_result_key] for r in map_results]
            extra_return_dict["intermediate_steps"] = intermediate_steps
        return result, extra_return_dict
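
    # Worked illustration of the flow in `combine_docs` (values invented for
    # the sketch): with document_variable_name = "context" and
    # docs = [Document(page_content="a"), Document(page_content="b")], the map
    # step calls llm_chain.apply([{"context": "a"}, {"context": "b"}]); each
    # output text is wrapped in a new Document that keeps the source doc's
    # metadata, and the resulting list goes to
    # reduce_documents_chain.combine_docs, which may recursively collapse the
    # docs until they fit under token_max before the final combine call.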

    async def acombine_docs(
        self,
        docs: List[Document],
        token_max: Optional[int] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Tuple[str, dict]:
        """Async combine documents in a map reduce manner.

        Combine by first mapping `llm_chain` over all documents, then reducing
        the results. This reducing can be done recursively if needed (if there
        are many documents).
        """
        map_results = await self.llm_chain.aapply(
            # FYI - this is parallelized and so it is fast.
            [{self.document_variable_name: d.page_content, **kwargs} for d in docs],
            callbacks=callbacks,
        )
        question_result_key = self.llm_chain.output_key
        result_docs = [
            Document(page_content=r[question_result_key], metadata=docs[i].metadata)
            # This uses metadata from the docs, and the textual results from
            # `map_results`
            for i, r in enumerate(map_results)
        ]
        result, extra_return_dict = await self.reduce_documents_chain.acombine_docs(
            result_docs, token_max=token_max, callbacks=callbacks, **kwargs
        )
        if self.return_intermediate_steps:
            intermediate_steps = [r[question_result_key] for r in map_results]
            extra_return_dict["intermediate_steps"] = intermediate_steps
        return result, extra_return_dict
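

# A minimal usage sketch (not part of the original module): it condenses the
# class docstring's example and shows where `intermediate_steps` comes from
# when return_intermediate_steps=True. `FakeListLLM` stands in for a real
# model so the sketch runs offline; the documents and canned responses are
# invented for illustration.
def _usage_sketch() -> None:
    from langchain_community.llms.fake import FakeListLLM
    from langchain_core.prompts import PromptTemplate

    from langchain.chains.combine_documents.stuff import StuffDocumentsChain

    # Two map calls, then one reduce call -> three canned responses, in order.
    llm = FakeListLLM(responses=["summary A", "summary B", "combined summary"])
    map_chain = LLMChain(
        llm=llm, prompt=PromptTemplate.from_template("Summarize: {context}")
    )
    reduce_chain = ReduceDocumentsChain(
        combine_documents_chain=StuffDocumentsChain(
            llm_chain=LLMChain(
                llm=llm,
                prompt=PromptTemplate.from_template("Combine: {context}"),
            ),
            document_variable_name="context",
        ),
    )
    chain = MapReduceDocumentsChain(
        llm_chain=map_chain,  # document_variable_name inferred: "context"
        reduce_documents_chain=reduce_chain,
        return_intermediate_steps=True,
    )
    docs = [
        Document(page_content="first text"),
        Document(page_content="second text"),
    ]
    result, extra = chain.combine_docs(docs)
    # result == "combined summary"
    # extra["intermediate_steps"] == ["summary A", "summary B"]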