Source code for langchain.smith.evaluation.runner_utils
"""Utilities for running language models or Chains over datasets."""from__future__importannotationsimportconcurrent.futuresimportdataclassesimportfunctoolsimportinspectimportloggingimportuuidfromdatetimeimportdatetime,timezonefromtypingimport(TYPE_CHECKING,Any,Callable,Dict,List,Optional,Tuple,Union,cast,)fromlangchain_core._apiimportwarn_deprecatedfromlangchain_core.callbacks.managerimportCallbacksfromlangchain_core.language_modelsimportBaseLanguageModelfromlangchain_core.messagesimportBaseMessage,messages_from_dictfromlangchain_core.outputsimportChatResult,LLMResultfromlangchain_core.runnablesimportRunnable,RunnableConfig,RunnableLambdafromlangchain_core.runnablesimportconfigasrunnable_configfromlangchain_core.runnablesimportutilsasrunnable_utilsfromlangchain_core.tracers.evaluationimport(EvaluatorCallbackHandler,wait_for_all_evaluators,)fromlangchain_core.tracers.langchainimportLangChainTracerfromlangsmith.clientimportClientfromlangsmith.envimportget_git_info,get_langchain_env_var_metadatafromlangsmith.evaluationimport(EvaluationResult,RunEvaluator,)fromlangsmith.evaluationimport(run_evaluatorasrun_evaluator_dec,)fromlangsmith.run_helpersimportas_runnable,is_traceable_functionfromlangsmith.schemasimportDataset,DataType,Example,Run,TracerSessionfromlangsmith.utilsimportLangSmithErrorfromrequestsimportHTTPErrorfromtyping_extensionsimportTypedDictfromlangchain.chains.baseimportChainfromlangchain.evaluation.loadingimportload_evaluatorfromlangchain.evaluation.schemaimport(EvaluatorType,PairwiseStringEvaluator,StringEvaluator,)fromlangchain.smithimportevaluationassmith_evalfromlangchain.smith.evaluationimportconfigassmith_eval_configfromlangchain.smith.evaluationimportname_generation,progressifTYPE_CHECKING:importpandasaspdlogger=logging.getLogger(__name__)MODEL_OR_CHAIN_FACTORY=Union[Callable[[],Union[Chain,Runnable]],BaseLanguageModel,Callable[[dict],Any],Runnable,Chain,]MCF=Union[Callable[[],Union[Chain,Runnable]],BaseLanguageModel]
class InputFormatError(Exception):
    """Raised when the input format is invalid."""

## Shared Utilities
class TestResult(dict):
    """A dictionary of the results of a single test run."""

    def get_aggregate_feedback(
        self,
    ) -> pd.DataFrame:
        """Return quantiles for the feedback scores.

        This method calculates the quantiles for the feedback scores
        across all feedback keys.

        Returns:
            A DataFrame containing the quantiles for each feedback key.
        """
        df = self.to_dataframe()
        # Drop all things starting with inputs., outputs., and reference
        to_drop = [
            col
            for col in df.columns
            if col.startswith("inputs.")
            or col.startswith("outputs.")
            or col in {"input", "output"}
            or col.startswith("reference")
        ]
        return df.describe(include="all").drop(to_drop, axis=1)

    def to_dataframe(self) -> pd.DataFrame:
        """Convert the results to a dataframe."""
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError(
                "Pandas is required to convert the results to a dataframe."
                " To install pandas, run `pip install pandas`."
            ) from e
        indices = []
        records = []
        for example_id, result in self["results"].items():
            feedback = result["feedback"]
            output_ = result.get("output")
            if isinstance(output_, dict):
                output = {f"outputs.{k}": v for k, v in output_.items()}
            elif output_ is None:
                output = {}
            else:
                output = {"output": output_}
            r = {
                **{f"inputs.{k}": v for k, v in result["input"].items()},
                **output,
            }
            if "reference" in result:
                if isinstance(result["reference"], dict):
                    r.update(
                        {f"reference.{k}": v for k, v in result["reference"].items()}
                    )
                else:
                    r["reference"] = result["reference"]
            r.update(
                {
                    **{f"feedback.{f.key}": f.score for f in feedback},
                    "error": result.get("Error"),
                    "execution_time": result["execution_time"],
                    "run_id": result.get("run_id"),
                }
            )
            records.append(r)
            indices.append(example_id)

        return pd.DataFrame(records, index=indices)

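A minimal sketch of how ``TestResult`` is typically inspected. The example id, inputs, outputs, feedback key, and timings below are made up for illustration; the shape mirrors what ``run_on_dataset`` returns, and pandas must be installed.

.. code-block:: python

    from langsmith.evaluation import EvaluationResult

    from langchain.smith.evaluation.runner_utils import TestResult

    # Hand-built result in the shape produced by run_on_dataset (illustrative data).
    result = TestResult(
        project_name="demo-project",
        results={
            "example-1": {
                "input": {"question": "What is 2 + 2?"},
                "output": {"answer": "4"},
                "reference": {"answer": "4"},
                "feedback": [EvaluationResult(key="correctness", score=1)],
                "execution_time": 0.42,
                "run_id": "run-1",
            }
        },
    )
    df = result.to_dataframe()                # one row per example id
    print(df.columns.tolist())                # inputs.*, outputs.*, feedback.*, ...
    print(result.get_aggregate_feedback())    # describe() over the feedback/error/time columns
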
class EvalError(dict):
    """Your architecture raised an error."""

    def __getattr__(self, name: str) -> Any:
        try:
            return self[name]
        except KeyError:
            raise AttributeError(f"'EvalError' object has no attribute '{name}'")

def _wrap_in_chain_factory(
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    dataset_name: str = "<my_dataset>",
) -> MCF:
    """Forgive the user if they pass in a chain without memory instead of a chain
    factory. It's a common mistake. Raise a more helpful error message as well."""
    if isinstance(llm_or_chain_factory, Chain):
        chain = llm_or_chain_factory
        chain_class = chain.__class__.__name__
        if llm_or_chain_factory.memory is not None:
            memory_class = chain.memory.__class__.__name__
            raise ValueError(
                "Cannot directly evaluate a chain with stateful memory."
                " To evaluate this chain, pass in a chain constructor"
                " that initializes fresh memory each time it is called."
                " This will safeguard against information"
                " leakage between dataset examples."
                "\nFor example:\n\n"
                "def chain_constructor():\n"
                f"    new_memory = {memory_class}(...)\n"
                f"    return {chain_class}"
                "(memory=new_memory, ...)\n\n"
                f'run_on_dataset("{dataset_name}", chain_constructor, ...)'
            )
        return lambda: chain
    elif isinstance(llm_or_chain_factory, BaseLanguageModel):
        return llm_or_chain_factory
    elif isinstance(llm_or_chain_factory, Runnable):
        # Memory may exist here, but it's not elegant to check all those cases.
        lcf = llm_or_chain_factory
        return lambda: lcf
    elif callable(llm_or_chain_factory):
        if is_traceable_function(llm_or_chain_factory):
            runnable_ = as_runnable(cast(Callable, llm_or_chain_factory))
            return lambda: runnable_
        try:
            _model = llm_or_chain_factory()  # type: ignore[call-arg]
        except TypeError:
            # It's an arbitrary function, wrap it in a RunnableLambda
            user_func = cast(Callable, llm_or_chain_factory)
            sig = inspect.signature(user_func)
            logger.info(f"Wrapping function {sig} as RunnableLambda.")
            wrapped = RunnableLambda(user_func)
            return lambda: wrapped
        constructor = cast(Callable, llm_or_chain_factory)
        if isinstance(_model, BaseLanguageModel):
            # It's not uncommon to do an LLM constructor instead of raw LLM,
            # so we'll unpack it for the user.
            return _model
        elif is_traceable_function(cast(Callable, _model)):
            runnable_ = as_runnable(cast(Callable, _model))
            return lambda: runnable_
        elif not isinstance(_model, Runnable):
            # This is unlikely to happen - a constructor for a model function
            return lambda: RunnableLambda(constructor)
        else:
            # Typical correct case
            return constructor
    return llm_or_chain_factory


def _get_prompt(inputs: Dict[str, Any]) -> str:
    """Get prompt from inputs.

    Args:
        inputs: The input dictionary.

    Returns:
        A string prompt.

    Raises:
        InputFormatError: If the input format is invalid.
    """
    if not inputs:
        raise InputFormatError("Inputs should not be empty.")

    prompts = []
    if "prompt" in inputs:
        if not isinstance(inputs["prompt"], str):
            raise InputFormatError(
                f"Expected string for 'prompt', got {type(inputs['prompt']).__name__}"
            )
        prompts = [inputs["prompt"]]
    elif "prompts" in inputs:
        if not isinstance(inputs["prompts"], list) or not all(
            isinstance(i, str) for i in inputs["prompts"]
        ):
            raise InputFormatError(
                "Expected list of strings for 'prompts',"
                f" got {type(inputs['prompts']).__name__}"
            )
        prompts = inputs["prompts"]
    elif len(inputs) == 1:
        prompt_ = next(iter(inputs.values()))
        if isinstance(prompt_, str):
            prompts = [prompt_]
        elif isinstance(prompt_, list) and all(isinstance(i, str) for i in prompt_):
            prompts = prompt_
        else:
            raise InputFormatError(f"LLM Run expects string prompt input. Got {inputs}")
    else:
        raise InputFormatError(
            f"LLM Run expects 'prompt' or 'prompts' in inputs. Got {inputs}"
        )
    if len(prompts) == 1:
        return prompts[0]
    else:
        raise InputFormatError(
            f"LLM Run expects single prompt input. Got {len(prompts)} prompts."
        )

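For reference, a small sketch of the dataset-input shapes that ``_get_prompt`` accepts. ``_get_prompt`` is a private helper, imported here only to illustrate its behavior; the dictionaries are made up.

.. code-block:: python

    from langchain.smith.evaluation.runner_utils import (
        InputFormatError,
        _get_prompt,
    )

    # Any of these single-prompt shapes resolve to the same string.
    assert _get_prompt({"prompt": "Hello"}) == "Hello"
    assert _get_prompt({"prompts": ["Hello"]}) == "Hello"
    assert _get_prompt({"my_only_key": "Hello"}) == "Hello"

    # Multiple prompts (or multiple unrecognized keys) are rejected.
    try:
        _get_prompt({"prompts": ["a", "b"]})
    except InputFormatError as err:
        print(err)
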
class ChatModelInput(TypedDict):
    """Input for a chat model.

    Parameters:
        messages: List of chat messages.
    """

    messages: List[BaseMessage]

def_get_messages(inputs:Dict[str,Any])->dict:"""Get Chat Messages from inputs. Args: inputs: The input dictionary. Returns: A list of chat messages. Raises: InputFormatError: If the input format is invalid. """ifnotinputs:raiseInputFormatError("Inputs should not be empty.")input_copy=inputs.copy()if"messages"ininputs:input_copy["input"]=input_copy.pop("messages")eliflen(inputs)==1:input_copy["input"]=next(iter(inputs.values()))if"input"ininput_copy:raw_messages=input_copy["input"]ifisinstance(raw_messages,list)andall(isinstance(i,dict)foriinraw_messages):raw_messages=[raw_messages]iflen(raw_messages)==1:input_copy["input"]=messages_from_dict(raw_messages[0])else:raiseInputFormatError("Batch messages not supported. Please provide a"" single list of messages.")returninput_copyelse:raiseInputFormatError(f"Chat Run expects single List[dict] or List[List[dict]] 'messages'"f" input. Got {inputs}")## Shared data validation utilitiesdef_validate_example_inputs_for_language_model(first_example:Example,input_mapper:Optional[Callable[[Dict],Any]],)->None:ifinput_mapper:prompt_input=input_mapper(first_example.inputs)ifnotisinstance(prompt_input,str)andnot(isinstance(prompt_input,list)andall(isinstance(msg,BaseMessage)formsginprompt_input)):raiseInputFormatError("When using an input_mapper to prepare dataset example inputs"" for an LLM or chat model, the output must a single string or"" a list of chat messages."f"\nGot: {prompt_input} of type {type(prompt_input)}.")else:try:_get_prompt(first_example.inputs)exceptInputFormatError:try:_get_messages(first_example.inputs)exceptInputFormatError:raiseInputFormatError("Example inputs do not match language model input format. ""Expected a dictionary with messages or a single prompt."f" Got: {first_example.inputs}"" Please update your dataset OR provide an input_mapper"" to convert the example.inputs to a compatible format"" for the llm or chat model you wish to evaluate.")def_validate_example_inputs_for_chain(first_example:Example,chain:Chain,input_mapper:Optional[Callable[[Dict],Any]],)->None:"""Validate that the example inputs match the chain input keys."""ifinput_mapper:first_inputs=input_mapper(first_example.inputs)missing_keys=set(chain.input_keys).difference(first_inputs)ifnotisinstance(first_inputs,dict):raiseInputFormatError("When using an input_mapper to prepare dataset example"" inputs for a chain, the mapped value must be a dictionary."f"\nGot: {first_inputs} of type {type(first_inputs)}.")ifmissing_keys:raiseInputFormatError("Missing keys after loading example using input_mapper."f"\nExpected: {chain.input_keys}. Got: {first_inputs.keys()}")else:first_inputs=first_example.inputsmissing_keys=set(chain.input_keys).difference(first_inputs)iflen(first_inputs)==1andlen(chain.input_keys)==1:# We can pass this through the run method.# Refrain from calling to validate.passelifmissing_keys:raiseInputFormatError("Example inputs missing expected chain input keys."" Please provide an input_mapper to convert the example.inputs"" to a compatible format for the chain you wish to evaluate."f"Expected: {chain.input_keys}. 
"f"Got: {first_inputs.keys()}")def_validate_example_inputs(example:Example,llm_or_chain_factory:MCF,input_mapper:Optional[Callable[[Dict],Any]],)->None:"""Validate that the example inputs are valid for the model."""ifisinstance(llm_or_chain_factory,BaseLanguageModel):_validate_example_inputs_for_language_model(example,input_mapper)else:chain=llm_or_chain_factory()ifisinstance(chain,Chain):# Otherwise it's a runnable_validate_example_inputs_for_chain(example,chain,input_mapper)elifisinstance(chain,Runnable):logger.debug(f"Skipping input validation for {chain}")## Shared Evaluator Setup Utilitiesdef_setup_evaluation(llm_or_chain_factory:MCF,examples:List[Example],evaluation:Optional[smith_eval.RunEvalConfig],data_type:DataType,)->Optional[List[RunEvaluator]]:"""Configure the evaluators to run on the results of the chain."""ifevaluation:ifisinstance(llm_or_chain_factory,BaseLanguageModel):run_inputs,run_outputs=None,Nonerun_type="llm"else:run_type="chain"chain=llm_or_chain_factory()run_inputs=chain.input_keysifisinstance(chain,Chain)elseNonerun_outputs=chain.output_keysifisinstance(chain,Chain)elseNonerun_evaluators=_load_run_evaluators(evaluation,run_type,data_type,list(examples[0].outputs)ifexamples[0].outputselseNone,run_inputs,run_outputs,)else:# TODO: Create a default helpfulness evaluatorrun_evaluators=Nonereturnrun_evaluatorsdef_determine_input_key(config:smith_eval.RunEvalConfig,run_inputs:Optional[List[str]],)->Optional[str]:input_key=Noneifconfig.input_key:input_key=config.input_keyifrun_inputsandinput_keynotinrun_inputs:logger.warning(f"Input key {input_key} not in chain's specified"f" input keys {run_inputs}. Evaluation behavior may be undefined.")elifrun_inputsandlen(run_inputs)==1:input_key=run_inputs[0]elifrun_inputsisnotNoneandlen(run_inputs)>1:logger.warning(f"Chain expects multiple input keys: {run_inputs},"f" Evaluator is likely to fail. Evaluation behavior may be undefined."" Specify an input_key in the RunEvalConfig to avoid this warning.")returninput_keydef_determine_prediction_key(config:smith_eval.RunEvalConfig,run_outputs:Optional[List[str]],)->Optional[str]:prediction_key=Noneifconfig.prediction_key:prediction_key=config.prediction_keyifrun_outputsandprediction_keynotinrun_outputs:logger.warning(f"Prediction key {prediction_key} not in chain's specified"f" output keys {run_outputs}. Evaluation behavior may be undefined.")elifrun_outputsandlen(run_outputs)==1:prediction_key=run_outputs[0]elifrun_outputsisnotNoneandlen(run_outputs)>1:logger.warning(f"Chain expects multiple output keys: {run_outputs},"f" Evaluation behavior may be undefined. 
Specify a prediction_key"" in the RunEvalConfig to avoid this warning.")returnprediction_keydef_determine_reference_key(config:smith_eval.RunEvalConfig,example_outputs:Optional[List[str]],)->Optional[str]:ifconfig.reference_key:reference_key=config.reference_keyifexample_outputsandreference_keynotinexample_outputs:raiseValueError(f"Reference key {reference_key} not in Dataset"f" example outputs: {example_outputs}")elifexample_outputsandlen(example_outputs)==1:reference_key=list(example_outputs)[0]else:reference_key=Nonereturnreference_keydef_construct_run_evaluator(eval_config:Union[smith_eval_config.SINGLE_EVAL_CONFIG_TYPE,smith_eval_config.CUSTOM_EVALUATOR_TYPE,],eval_llm:Optional[BaseLanguageModel],run_type:str,data_type:DataType,example_outputs:Optional[List[str]],reference_key:Optional[str],input_key:Optional[str],prediction_key:Optional[str],)->RunEvaluator:ifisinstance(eval_config,RunEvaluator):returneval_configifisinstance(eval_config,(EvaluatorType,str)):ifnotisinstance(eval_config,EvaluatorType):eval_config=EvaluatorType(eval_config)evaluator_=load_evaluator(eval_config,llm=eval_llm)eval_type_tag=eval_config.valueelifisinstance(eval_config,smith_eval_config.EvalConfig):kwargs={"llm":eval_llm,**eval_config.get_kwargs()}evaluator_=load_evaluator(eval_config.evaluator_type,**kwargs)eval_type_tag=eval_config.evaluator_type.value# Override keys if specified in the configifisinstance(eval_config,smith_eval_config.SingleKeyEvalConfig):input_key=eval_config.input_keyorinput_keyprediction_key=eval_config.prediction_keyorprediction_keyreference_key=eval_config.reference_keyorreference_keyelifcallable(eval_config):# Assume we can decoratereturnrun_evaluator_dec(eval_config)else:raiseValueError(f"Unknown evaluator type: {type(eval_config)}")ifisinstance(evaluator_,StringEvaluator):ifevaluator_.requires_referenceandreference_keyisNone:raiseValueError(f"Must specify reference_key in smith_eval.RunEvalConfig to use"f" evaluator of type {eval_type_tag} with"f" dataset with multiple output keys: {example_outputs}.")run_evaluator=smith_eval.StringRunEvaluatorChain.from_run_and_data_type(evaluator_,run_type,data_type,input_key=input_key,prediction_key=prediction_key,reference_key=reference_key,tags=[eval_type_tag],)elifisinstance(evaluator_,PairwiseStringEvaluator):raiseNotImplementedError(f"Run evaluator for {eval_type_tag} is not implemented."" PairwiseStringEvaluators compare the outputs of two different models"" rather than the output of a single model."" Did you mean to use a StringEvaluator instead?""\nSee: https://python.langchain.com/docs/guides/evaluation/string/")else:raiseNotImplementedError(f"Run evaluator for {eval_type_tag} is not implemented")returnrun_evaluatordef_get_keys(config:smith_eval.RunEvalConfig,run_inputs:Optional[List[str]],run_outputs:Optional[List[str]],example_outputs:Optional[List[str]],)->Tuple[Optional[str],Optional[str],Optional[str]]:input_key=_determine_input_key(config,run_inputs)prediction_key=_determine_prediction_key(config,run_outputs)reference_key=_determine_reference_key(config,example_outputs)returninput_key,prediction_key,reference_keydef_load_run_evaluators(config:smith_eval.RunEvalConfig,run_type:str,data_type:DataType,example_outputs:Optional[List[str]],run_inputs:Optional[List[str]],run_outputs:Optional[List[str]],)->List[RunEvaluator]:""" Load run evaluators from a configuration. Args: config: Configuration for the run evaluators. Returns: A list of run evaluators. 
"""run_evaluators=[]input_key,prediction_key,reference_key=None,None,Noneifconfig.evaluatorsor(config.custom_evaluatorsandany([isinstance(e,StringEvaluator)foreinconfig.custom_evaluators])):input_key,prediction_key,reference_key=_get_keys(config,run_inputs,run_outputs,example_outputs)foreval_configinconfig.evaluators:run_evaluator=_construct_run_evaluator(eval_config,config.eval_llm,run_type,data_type,example_outputs,reference_key,input_key,prediction_key,)run_evaluators.append(run_evaluator)custom_evaluators=config.custom_evaluatorsor[]forcustom_evaluatorincustom_evaluators:ifisinstance(custom_evaluator,RunEvaluator):run_evaluators.append(custom_evaluator)elifisinstance(custom_evaluator,StringEvaluator):run_evaluators.append(smith_eval.StringRunEvaluatorChain.from_run_and_data_type(custom_evaluator,run_type,data_type,input_key=input_key,prediction_key=prediction_key,reference_key=reference_key,))elifcallable(custom_evaluator):run_evaluators.append(run_evaluator_dec(custom_evaluator))else:raiseValueError(f"Unsupported custom evaluator: {custom_evaluator}."f" Expected RunEvaluator or StringEvaluator.")returnrun_evaluators### Async Helpersasyncdef_arun_llm(llm:BaseLanguageModel,inputs:Dict[str,Any],*,tags:Optional[List[str]]=None,callbacks:Callbacks=None,input_mapper:Optional[Callable[[Dict],Any]]=None,metadata:Optional[Dict[str,Any]]=None,)->Union[str,BaseMessage]:"""Asynchronously run the language model. Args: llm: The language model to run. inputs: The input dictionary. tags: Optional tags to add to the run. callbacks: Optional callbacks to use during the run. input_mapper: Optional function to map inputs to the expected format. Returns: The LLMResult or ChatResult. Raises: ValueError: If the LLM type is unsupported. InputFormatError: If the input format is invalid. 
"""ifinput_mapperisnotNone:prompt_or_messages=input_mapper(inputs)if(isinstance(prompt_or_messages,str)orisinstance(prompt_or_messages,list)andall(isinstance(msg,BaseMessage)formsginprompt_or_messages)):returnawaitllm.ainvoke(prompt_or_messages,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)else:raiseInputFormatError("Input mapper returned invalid format"f" {prompt_or_messages}""\nExpected a single string or list of chat messages.")else:try:prompt=_get_prompt(inputs)llm_output:Union[str,BaseMessage]=awaitllm.ainvoke(prompt,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)exceptInputFormatError:llm_inputs=_get_messages(inputs)llm_output=awaitllm.ainvoke(**llm_inputs,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)returnllm_outputasyncdef_arun_chain(chain:Union[Chain,Runnable],inputs:Dict[str,Any],callbacks:Callbacks,*,tags:Optional[List[str]]=None,input_mapper:Optional[Callable[[Dict],Any]]=None,metadata:Optional[Dict[str,Any]]=None,)->Union[dict,str]:"""Run a chain asynchronously on inputs."""inputs_=inputsifinput_mapperisNoneelseinput_mapper(inputs)if(isinstance(chain,Chain)andisinstance(inputs_,dict)andlen(inputs_)==1andchain.input_keys):val=next(iter(inputs_.values()))output=awaitchain.ainvoke(val,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)else:runnable_config=RunnableConfig(tags=tagsor[],callbacks=callbacks,metadata=metadataor{})output=awaitchain.ainvoke(inputs_,config=runnable_config)returnoutputasyncdef_arun_llm_or_chain(example:Example,config:RunnableConfig,*,llm_or_chain_factory:MCF,input_mapper:Optional[Callable[[Dict],Any]]=None,)->Union[dict,str,LLMResult,ChatResult]:"""Asynchronously run the Chain or language model. Args: example: The example to run. llm_or_chain_factory: The Chain or language model constructor to run. tags: Optional tags to add to the run. callbacks: Optional callbacks to use during the run. input_mapper: Optional function to map the input to the expected format. Returns: A list of outputs. """chain_or_llm=("LLM"ifisinstance(llm_or_chain_factory,BaseLanguageModel)else"Chain")result=Nonetry:ifisinstance(llm_or_chain_factory,BaseLanguageModel):output:Any=await_arun_llm(llm_or_chain_factory,example.inputs,tags=config["tags"],callbacks=config["callbacks"],input_mapper=input_mapper,metadata=config.get("metadata"),)else:chain=llm_or_chain_factory()output=await_arun_chain(chain,example.inputs,tags=config["tags"],callbacks=config["callbacks"],input_mapper=input_mapper,metadata=config.get("metadata"),)result=outputexceptExceptionase:logger.warning(f"{chain_or_llm} failed for example {example.id} "f"with inputs {example.inputs}"f"\n{repr(e)}")result=EvalError(Error=e)returnresult## Sync Utilitiesdef_run_llm(llm:BaseLanguageModel,inputs:Dict[str,Any],callbacks:Callbacks,*,tags:Optional[List[str]]=None,input_mapper:Optional[Callable[[Dict],Any]]=None,metadata:Optional[Dict[str,Any]]=None,)->Union[str,BaseMessage]:""" Run the language model on the example. Args: llm: The language model to run. inputs: The input dictionary. callbacks: The callbacks to use during the run. tags: Optional tags to add to the run. input_mapper: function to map to the inputs dictionary from an Example Returns: The LLMResult or ChatResult. Raises: ValueError: If the LLM type is unsupported. InputFormatError: If the input format is invalid. 
"""# Most of this is legacy code; we could probably remove a lot of it.ifinput_mapperisnotNone:prompt_or_messages=input_mapper(inputs)if(isinstance(prompt_or_messages,str)orisinstance(prompt_or_messages,list)andall(isinstance(msg,BaseMessage)formsginprompt_or_messages)):llm_output:Union[str,BaseMessage]=llm.invoke(prompt_or_messages,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)else:raiseInputFormatError("Input mapper returned invalid format: "f" {prompt_or_messages}""\nExpected a single string or list of chat messages.")else:try:llm_prompts=_get_prompt(inputs)llm_output=llm.invoke(llm_prompts,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)exceptInputFormatError:llm_inputs=_get_messages(inputs)llm_output=llm.invoke(**llm_inputs,config=RunnableConfig(callbacks=callbacks,metadata=metadataor{}),)returnllm_outputdef_run_chain(chain:Union[Chain,Runnable],inputs:Dict[str,Any],callbacks:Callbacks,*,tags:Optional[List[str]]=None,input_mapper:Optional[Callable[[Dict],Any]]=None,metadata:Optional[Dict[str,Any]]=None,)->Union[Dict,str]:"""Run a chain on inputs."""inputs_=inputsifinput_mapperisNoneelseinput_mapper(inputs)if(isinstance(chain,Chain)andisinstance(inputs_,dict)andlen(inputs_)==1andchain.input_keys):val=next(iter(inputs_.values()))output=chain.invoke(val,config=RunnableConfig(callbacks=callbacks,tags=tagsor[],metadata=metadataor{}),)else:runnable_config=RunnableConfig(tags=tagsor[],callbacks=callbacks,metadata=metadataor{})output=chain.invoke(inputs_,config=runnable_config)returnoutputdef_run_llm_or_chain(example:Example,config:RunnableConfig,*,llm_or_chain_factory:MCF,input_mapper:Optional[Callable[[Dict],Any]]=None,)->Union[dict,str,LLMResult,ChatResult]:""" Run the Chain or language model synchronously. Args: example: The example to run. llm_or_chain_factory: The Chain or language model constructor to run. tags: Optional tags to add to the run. callbacks: Optional callbacks to use during the run. Returns: Union[List[dict], List[str], List[LLMResult], List[ChatResult]]: The outputs of the model or chain. 
"""chain_or_llm=("LLM"ifisinstance(llm_or_chain_factory,BaseLanguageModel)else"Chain")result=Nonetry:ifisinstance(llm_or_chain_factory,BaseLanguageModel):output:Any=_run_llm(llm_or_chain_factory,example.inputs,config["callbacks"],tags=config["tags"],input_mapper=input_mapper,metadata=config.get("metadata"),)else:chain=llm_or_chain_factory()output=_run_chain(chain,example.inputs,config["callbacks"],tags=config["tags"],input_mapper=input_mapper,metadata=config.get("metadata"),)result=outputexceptExceptionase:error_type=type(e).__name__logger.warning(f"{chain_or_llm} failed for example {example.id} "f"with inputs {example.inputs}"f"\nError Type: {error_type}, Message: {e}")result=EvalError(Error=e)returnresultdef_prepare_eval_run(client:Client,dataset_name:str,llm_or_chain_factory:MODEL_OR_CHAIN_FACTORY,project_name:str,project_metadata:Optional[Dict[str,Any]]=None,tags:Optional[List[str]]=None,dataset_version:Optional[Union[str,datetime]]=None,)->Tuple[MCF,TracerSession,Dataset,List[Example]]:wrapped_model=_wrap_in_chain_factory(llm_or_chain_factory,dataset_name)dataset=client.read_dataset(dataset_name=dataset_name)examples=list(client.list_examples(dataset_id=dataset.id,as_of=dataset_version))ifnotexamples:raiseValueError(f"Dataset {dataset_name} has no example rows.")modified_at=[ex.modified_atforexinexamplesifex.modified_at]# Should always be defined in practice when fetched,# but the typing permits Nonemax_modified_at=max(modified_at)ifmodified_atelseNoneinferred_version=max_modified_at.isoformat()ifmax_modified_atelseNonetry:project_metadata=project_metadataor{}git_info=get_git_info()ifgit_info:project_metadata={**project_metadata,"git":git_info,}project_metadata["dataset_version"]=inferred_versionproject=client.create_project(project_name,reference_dataset_id=dataset.id,project_extra={"tags":tags}iftagselse{},metadata=project_metadata,)except(HTTPError,ValueError,LangSmithError)ase:if"already exists "notinstr(e):raiseeuid=uuid.uuid4()example_msg=f"""run_on_dataset( ... project_name="{project_name} - {uid}", # Update since {project_name} already exists)"""raiseValueError(f"Test project {project_name} already exists. 
Please use a different name:"f"\n\n{example_msg}")comparison_url=dataset.url+f"/compare?selectedSessions={project.id}"print(# noqa: T201f"View the evaluation results for project '{project_name}'"f" at:\n{comparison_url}\n\n"f"View all tests for Dataset {dataset_name} at:\n{dataset.url}",flush=True,)returnwrapped_model,project,dataset,examplesclass_RowResult(TypedDict,total=False):"""A dictionary of the results for a single example row."""feedback:Optional[List[EvaluationResult]]execution_time:Optional[float]run_id:Optional[str]@dataclasses.dataclassclass_DatasetRunContainer:"""A container to help manage the state of a eval run."""client:Clientproject:TracerSessionwrapped_model:MCFexamples:List[Example]configs:List[RunnableConfig]batch_evaluators:Optional[List[smith_eval_config.BATCH_EVALUATOR_LIKE]]=Nonedef_merge_test_outputs(self,batch_results:list,all_eval_results:Dict[str,_RowResult],)->dict:results:dict={}forexample,outputinzip(self.examples,batch_results):row_result=cast(_RowResult,all_eval_results.get(str(example.id),{}))results[str(example.id)]={"input":example.inputs,"feedback":row_result.get("feedback",[]),"execution_time":row_result.get("execution_time"),"run_id":row_result.get("run_id"),}ifisinstance(output,EvalError):results[str(example.id)]["Error"]=output.Errorelse:results[str(example.id)]["output"]=outputifexample.outputs:results[str(example.id)]["reference"]=example.outputsreturnresultsdef_run_batch_evaluators(self,runs:Dict[str,Run])->List[dict]:evaluators=self.batch_evaluatorsifnotevaluators:return[]runs_list=[runs[str(example.id)]forexampleinself.examples]aggregate_feedback=[]withconcurrent.futures.ThreadPoolExecutor()asexecutor:forevaluatorinevaluators:try:result=evaluator(runs_list,self.examples)ifisinstance(result,EvaluationResult):result=result.dict()aggregate_feedback.append(cast(dict,result))executor.submit(self.client.create_feedback,**result,run_id=None,project_id=self.project.id,)exceptExceptionase:logger.error(f"Error running batch evaluator {repr(evaluator)}: {e}")returnaggregate_feedbackdef_collect_metrics(self)->Tuple[Dict[str,_RowResult],Dict[str,Run]]:all_eval_results:dict={}all_runs:dict={}forcinself.configs:forcallbackincast(list,c["callbacks"]):ifisinstance(callback,EvaluatorCallbackHandler):eval_results=callback.logged_eval_resultsfor(_,example_id),vineval_results.items():all_eval_results.setdefault(str(example_id),{}).update({"feedback":v})elifisinstance(callback,LangChainTracer):run=callback.latest_runexecution_time=((run.end_time-run.start_time).total_seconds()ifrunandrun.end_timeelseNone)run_id=str(run.id)ifrunelseNoneall_eval_results.setdefault(str(callback.example_id),{}).update({"execution_time":execution_time,"run_id":run_id,"run":run,})all_runs[str(callback.example_id)]=runreturncast(Dict[str,_RowResult],all_eval_results),all_runsdef_collect_test_results(self,batch_results:List[Union[dict,str,LLMResult,ChatResult]],)->TestResult:logger.info("Waiting for evaluators to complete.")wait_for_all_evaluators()all_eval_results,all_runs=self._collect_metrics()aggregate_feedback=Noneifself.batch_evaluators:logger.info("Running session 
evaluators.")aggregate_feedback=self._run_batch_evaluators(all_runs)results=self._merge_test_outputs(batch_results,all_eval_results)returnTestResult(project_name=self.project.name,results=results,aggregate_metrics=aggregate_feedback,)deffinish(self,batch_results:list,verbose:bool=False)->TestResult:results=self._collect_test_results(batch_results)ifverbose:try:agg_feedback=results.get_aggregate_feedback()_display_aggregate_results(agg_feedback)exceptExceptionase:logger.debug(f"Failed to print aggregate feedback: {repr(e)}")try:# Closing the project permits name changing and metric optimizationsself.client.update_project(self.project.id,end_time=datetime.now(timezone.utc))exceptExceptionase:logger.debug(f"Failed to close project: {repr(e)}")returnresults@classmethoddefprepare(cls,client:Client,dataset_name:str,llm_or_chain_factory:MODEL_OR_CHAIN_FACTORY,project_name:Optional[str],evaluation:Optional[smith_eval.RunEvalConfig]=None,tags:Optional[List[str]]=None,input_mapper:Optional[Callable[[Dict],Any]]=None,concurrency_level:int=5,project_metadata:Optional[Dict[str,Any]]=None,revision_id:Optional[str]=None,dataset_version:Optional[Union[datetime,str]]=None,)->_DatasetRunContainer:project_name=project_nameorname_generation.random_name()ifrevision_id:ifnotproject_metadata:project_metadata={}project_metadata.update({"revision_id":revision_id})wrapped_model,project,dataset,examples=_prepare_eval_run(client,dataset_name,llm_or_chain_factory,project_name,project_metadata=project_metadata,tags=tags,dataset_version=dataset_version,)tags=tagsor[]fork,vin(project.metadata.get("git")or{}).items():tags.append(f"git:{k}={v}")run_metadata={"dataset_version":project.metadata["dataset_version"]}ifrevision_id:run_metadata["revision_id"]=revision_idwrapped_model=_wrap_in_chain_factory(llm_or_chain_factory)run_evaluators=_setup_evaluation(wrapped_model,examples,evaluation,dataset.data_typeorDataType.kv)_validate_example_inputs(examples[0],wrapped_model,input_mapper)progress_bar=progress.ProgressBarCallback(len(examples))configs=[RunnableConfig(callbacks=[LangChainTracer(project_name=project.name,client=client,example_id=example.id,),EvaluatorCallbackHandler(evaluators=run_evaluatorsor[],client=client,example_id=example.id,max_concurrency=0,),progress_bar,],tags=tags,max_concurrency=concurrency_level,metadata=run_metadata,)forexampleinexamples]returncls(client=client,project=project,wrapped_model=wrapped_model,examples=examples,configs=configs,batch_evaluators=evaluation.batch_evaluatorsifevaluationelseNone,)def_is_jupyter_environment()->bool:try:fromIPythonimportget_ipythonres=get_ipython()returnget_ipython()isnotNoneand"zmqshell"instr(type(res))exceptImportError:returnFalsedef_display_aggregate_results(aggregate_results:pd.DataFrame)->None:if_is_jupyter_environment():fromIPython.displayimportHTML,displaydisplay(HTML("<h3>Experiment Results:</h3>"))display(aggregate_results)else:formatted_string=aggregate_results.to_string(float_format=lambdax:f"{x:.2f}",justify="right")print("\n Experiment Results:")# noqa: T201print(formatted_string)# noqa: T201_INPUT_MAPPER_DEP_WARNING=("The input_mapper argument is deprecated and ""will be removed in a future release. Please add a "" RunnableLambda to your chain to map inputs to the expected format"" instead. 
Example:\n""def construct_chain():\n"" my_chain = ...\n"" input_mapper = {'other_key': 'MyOtherInput', 'my_input_key': x}\n"" return input_mapper | my_chain\n""run_on_dataset(..., llm_or_chain_factory=construct_chain)\n""(See https://api.python.langchain.com/en/latest/schema/""langchain.schema.runnable.base.RunnableLambda.html)")## Public API
async def arun_on_dataset(
    client: Optional[Client],
    dataset_name: str,
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    *,
    evaluation: Optional[smith_eval.RunEvalConfig] = None,
    dataset_version: Optional[Union[datetime, str]] = None,
    concurrency_level: int = 5,
    project_name: Optional[str] = None,
    project_metadata: Optional[Dict[str, Any]] = None,
    verbose: bool = False,
    revision_id: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    input_mapper = kwargs.pop("input_mapper", None)
    if input_mapper:
        warn_deprecated("0.0.305", message=_INPUT_MAPPER_DEP_WARNING, pending=True)
    if revision_id is None:
        revision_id = get_langchain_env_var_metadata().get("revision_id")
    tags = kwargs.pop("tags", None)
    if tags:
        warn_deprecated(
            "0.1.9",
            message="The tags argument is deprecated and will be"
            " removed in a future release. Please specify project_metadata instead.",
            pending=True,
        )

    if kwargs:
        warn_deprecated(
            "0.0.305",
            message="The following arguments are deprecated and "
            "will be removed in a future release: "
            f"{kwargs.keys()}.",
            removal="0.0.305",
        )
    client = client or Client()
    container = _DatasetRunContainer.prepare(
        client,
        dataset_name,
        llm_or_chain_factory,
        project_name,
        evaluation,
        tags,
        input_mapper,
        concurrency_level,
        project_metadata=project_metadata,
        revision_id=revision_id,
        dataset_version=dataset_version,
    )
    batch_results = await runnable_utils.gather_with_concurrency(
        container.configs[0].get("max_concurrency"),
        *map(
            functools.partial(
                _arun_llm_or_chain,
                llm_or_chain_factory=container.wrapped_model,
                input_mapper=input_mapper,
            ),
            container.examples,
            container.configs,
        ),
    )
    return container.finish(batch_results, verbose=verbose)

def run_on_dataset(
    client: Optional[Client],
    dataset_name: str,
    llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY,
    *,
    evaluation: Optional[smith_eval.RunEvalConfig] = None,
    dataset_version: Optional[Union[datetime, str]] = None,
    concurrency_level: int = 5,
    project_name: Optional[str] = None,
    project_metadata: Optional[Dict[str, Any]] = None,
    verbose: bool = False,
    revision_id: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    input_mapper = kwargs.pop("input_mapper", None)
    if input_mapper:
        warn_deprecated("0.0.305", message=_INPUT_MAPPER_DEP_WARNING, pending=True)
    tags = kwargs.pop("tags", None)
    if tags:
        warn_deprecated(
            "0.1.9",
            message="The tags argument is deprecated and will be"
            " removed in a future release. Please specify project_metadata instead.",
            pending=True,
        )
    if revision_id is None:
        revision_id = get_langchain_env_var_metadata().get("revision_id")

    if kwargs:
        warn_deprecated(
            "0.0.305",
            message="The following arguments are deprecated and "
            "will be removed in a future release: "
            f"{kwargs.keys()}.",
            removal="0.0.305",
        )
    client = client or Client()
    container = _DatasetRunContainer.prepare(
        client,
        dataset_name,
        llm_or_chain_factory,
        project_name,
        evaluation,
        tags,
        input_mapper,
        concurrency_level,
        project_metadata=project_metadata,
        revision_id=revision_id,
        dataset_version=dataset_version,
    )
    if concurrency_level == 0:
        batch_results = [
            _run_llm_or_chain(
                example,
                config,
                llm_or_chain_factory=container.wrapped_model,
                input_mapper=input_mapper,
            )
            for example, config in zip(container.examples, container.configs)
        ]
    else:
        with runnable_config.get_executor_for_config(container.configs[0]) as executor:
            batch_results = list(
                executor.map(
                    functools.partial(
                        _run_llm_or_chain,
                        llm_or_chain_factory=container.wrapped_model,
                        input_mapper=input_mapper,
                    ),
                    container.examples,
                    container.configs,
                )
            )

    return container.finish(batch_results, verbose=verbose)

_RUN_ON_DATASET_DOCSTRING = """Run the Chain or language model on a dataset and store traces
to the specified project name.

Args:
    dataset_name: Name of the dataset to run the chain on.
    llm_or_chain_factory: Language model or Chain constructor to run
        over the dataset. The Chain constructor is used to permit
        independent calls on each example without carrying over state.
    evaluation: Configuration for evaluators to run on the
        results of the chain.
    concurrency_level: The number of async tasks to run concurrently.
    project_name: Name of the project to store the traces in.
        Defaults to {dataset_name}-{chain class name}-{datetime}.
    project_metadata: Optional metadata to add to the project.
        Useful for storing information about the test variant
        (prompt version, model version, etc.)
    client: LangSmith client to use to access the dataset and to
        log feedback and run traces.
    verbose: Whether to print progress.
    tags: Tags to add to each run in the project.
    revision_id: Optional revision identifier to assign this test run to
        track the performance of different versions of your system.

Returns:
    A dictionary containing the run's project name and the resulting model outputs.

For the (usually faster) async version of this function, see :func:`arun_on_dataset`.

Examples
--------

.. code-block:: python

    from langsmith import Client
    from langchain_openai import ChatOpenAI
    from langchain.chains import LLMChain
    from langchain.smith import evaluation as smith_eval
    from langchain.smith import run_on_dataset

    # Chains may have memory. Passing in a constructor function lets the
    # evaluation framework avoid cross-contamination between runs.
    def construct_chain():
        llm = ChatOpenAI(temperature=0)
        chain = LLMChain.from_string(
            llm,
            "What's the answer to {your_input_key}"
        )
        return chain

    # Load off-the-shelf evaluators via config or the EvaluatorType (string or enum)
    evaluation_config = smith_eval.RunEvalConfig(
        evaluators=[
            "qa",  # "Correctness" against a reference answer
            "embedding_distance",
            smith_eval.RunEvalConfig.Criteria("helpfulness"),
            smith_eval.RunEvalConfig.Criteria({
                "fifth-grader-score": "Do you have to be smarter than a fifth grader to answer this question?"
            }),
        ]
    )

    client = Client()
    run_on_dataset(
        client,
        dataset_name="<my_dataset_name>",
        llm_or_chain_factory=construct_chain,
        evaluation=evaluation_config,
    )

You can also create custom evaluators by subclassing the
:class:`StringEvaluator <langchain.evaluation.schema.StringEvaluator>`
or LangSmith's `RunEvaluator` classes.

.. code-block:: python

    from typing import Optional
    from langchain.evaluation import StringEvaluator

    class MyStringEvaluator(StringEvaluator):

        @property
        def requires_input(self) -> bool:
            return False

        @property
        def requires_reference(self) -> bool:
            return True

        @property
        def evaluation_name(self) -> str:
            return "exact_match"

        def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> dict:
            return {"score": prediction == reference}

    evaluation_config = smith_eval.RunEvalConfig(
        custom_evaluators = [MyStringEvaluator()],
    )

    run_on_dataset(
        client,
        dataset_name="<my_dataset_name>",
        llm_or_chain_factory=construct_chain,
        evaluation=evaluation_config,
    )
"""  # noqa: E501

run_on_dataset.__doc__ = _RUN_ON_DATASET_DOCSTRING
arun_on_dataset.__doc__ = _RUN_ON_DATASET_DOCSTRING.replace(
    "run_on_dataset(", "await arun_on_dataset("
)
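Since ``arun_on_dataset`` shares this signature, the async variant is simply the awaited form of the example above. A minimal sketch, assuming a valid LangSmith setup; the dataset name is a placeholder and the simple ``construct_chain`` below (a constructor that just returns a chat model) stands in for the chain from the docstring example.

.. code-block:: python

    import asyncio

    from langsmith import Client
    from langchain_openai import ChatOpenAI

    from langchain.smith import arun_on_dataset
    from langchain.smith import evaluation as smith_eval


    def construct_chain():
        # Constructors may also return a bare model; the runner unwraps it.
        return ChatOpenAI(temperature=0)


    async def main() -> None:
        client = Client()
        evaluation_config = smith_eval.RunEvalConfig(evaluators=["qa"])
        results = await arun_on_dataset(
            client,
            dataset_name="<my_dataset_name>",      # placeholder dataset
            llm_or_chain_factory=construct_chain,
            evaluation=evaluation_config,
            concurrency_level=5,
        )
        print(results["project_name"])


    asyncio.run(main())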