def generate_from_stream(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
    """Generate from a stream.

    Args:
        stream: Iterator of ChatGenerationChunk.

    Returns:
        ChatResult: Chat result.
    """
    generation = next(stream, None)
    if generation:
        generation += list(stream)
    if generation is None:
        msg = "No generations found in stream."
        raise ValueError(msg)
    return ChatResult(
        generations=[
            ChatGeneration(
                message=message_chunk_to_message(generation.message),
                generation_info=generation.generation_info,
            )
        ]
    )

async def agenerate_from_stream(
    stream: AsyncIterator[ChatGenerationChunk],
) -> ChatResult:
    """Async generate from a stream.

    Args:
        stream: AsyncIterator of ChatGenerationChunk.

    Returns:
        ChatResult: Chat result.
    """
    chunks = [chunk async for chunk in stream]
    return await run_in_executor(None, generate_from_stream, iter(chunks))

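# A minimal usage sketch (not part of the original module): collapsing an
# already-collected stream of ChatGenerationChunk objects into a single
# ChatResult with generate_from_stream(). The helper name
# `_example_generate_from_stream` and the chunk contents are hypothetical.
def _example_generate_from_stream() -> ChatResult:
    from langchain_core.messages import AIMessageChunk
    from langchain_core.outputs import ChatGenerationChunk

    chunks = [
        ChatGenerationChunk(message=AIMessageChunk(content="Hello, ")),
        ChatGenerationChunk(message=AIMessageChunk(content="world!")),
    ]
    # The chunks are concatenated and converted into a single (non-chunk) message.
    return generate_from_stream(iter(chunks))
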
class BaseChatModel(BaseLanguageModel[BaseMessage], ABC):
    """Base class for chat models.

    Key imperative methods:
        Methods that actually call the underlying model.

        .. list-table::
           :header-rows: 1

           * - Method
             - Input
             - Output
             - Description
           * - `invoke`
             - str | List[dict | tuple | BaseMessage] | PromptValue
             - BaseMessage
             - A single chat model call.
           * - `ainvoke`
             - '''
             - BaseMessage
             - Defaults to running invoke in an async executor.
           * - `stream`
             - '''
             - Iterator[BaseMessageChunk]
             - Defaults to yielding output of invoke.
           * - `astream`
             - '''
             - AsyncIterator[BaseMessageChunk]
             - Defaults to yielding output of ainvoke.
           * - `astream_events`
             - '''
             - AsyncIterator[StreamEvent]
             - Event types: 'on_chat_model_start', 'on_chat_model_stream', 'on_chat_model_end'.
           * - `batch`
             - List[''']
             - List[BaseMessage]
             - Defaults to running invoke in concurrent threads.
           * - `abatch`
             - List[''']
             - List[BaseMessage]
             - Defaults to running ainvoke in concurrent threads.
           * - `batch_as_completed`
             - List[''']
             - Iterator[Tuple[int, Union[BaseMessage, Exception]]]
             - Defaults to running invoke in concurrent threads.
           * - `abatch_as_completed`
             - List[''']
             - AsyncIterator[Tuple[int, Union[BaseMessage, Exception]]]
             - Defaults to running ainvoke in concurrent threads.

        This table provides a brief overview of the main imperative methods.
        Please see the base Runnable reference for full documentation.

    Key declarative methods:
        Methods for creating another Runnable using the ChatModel.

        .. list-table::
           :header-rows: 1

           * - Method
             - Description
           * - `bind_tools`
             - Create ChatModel that can call tools.
           * - `with_structured_output`
             - Create wrapper that structures model output using schema.
           * - `with_retry`
             - Create wrapper that retries model calls on failure.
           * - `with_fallbacks`
             - Create wrapper that falls back to other models on failure.
           * - `configurable_fields`
             - Specify init args of the model that can be configured at runtime via the RunnableConfig.
           * - `configurable_alternatives`
             - Specify alternative models which can be swapped in at runtime via the RunnableConfig.

        This table provides a brief overview of the main declarative methods.
        Please see the reference for each method for full documentation.

    Creating custom chat model:
        Custom chat model implementations should inherit from this class.
        Please reference the table below for information about which methods and
        properties are required or optional for implementations.
        .. list-table::
           :header-rows: 1

           * - Method/Property
             - Description
             - Required/Optional
           * - `_generate`
             - Use to generate a chat result from a prompt
             - Required
           * - `_llm_type` (property)
             - Used to uniquely identify the type of the model. Used for logging.
             - Required
           * - `_identifying_params` (property)
             - Represent model parameterization for tracing purposes.
             - Optional
           * - `_stream`
             - Use to implement streaming
             - Optional
           * - `_agenerate`
             - Use to implement a native async method
             - Optional
           * - `_astream`
             - Use to implement async version of `_stream`
             - Optional

        Follow the guide for more information on how to implement a custom Chat Model:
        [Guide](https://python.langchain.com/docs/how_to/custom_chat_model/).

    """  # noqa: E501

    callback_manager: Optional[BaseCallbackManager] = deprecated(
        name="callback_manager", since="0.1.7", removal="1.0", alternative="callbacks"
    )(
        Field(
            default=None,
            exclude=True,
            description="Callback manager to add to the run trace.",
        )
    )

    rate_limiter: Optional[BaseRateLimiter] = Field(default=None, exclude=True)
    "An optional rate limiter to use for limiting the number of requests."

    disable_streaming: Union[bool, Literal["tool_calling"]] = False
    """Whether to disable streaming for this model.

    If streaming is bypassed, then ``stream()``/``astream()``/``astream_events()``
    will defer to ``invoke()``/``ainvoke()``.

    - If True, will always bypass streaming case.
    - If "tool_calling", will bypass streaming case only when the model is called
      with a ``tools`` keyword argument.
    - If False (default), will always use streaming case if available.
    """

    @model_validator(mode="before")
    @classmethod
    def raise_deprecation(cls, values: dict) -> Any:
        """Raise deprecation warning if callback_manager is used.

        Args:
            values (Dict): Values to validate.

        Returns:
            Dict: Validated values.

        Raises:
            DeprecationWarning: If callback_manager is used.
        """
        if values.get("callback_manager") is not None:
            warnings.warn(
                "callback_manager is deprecated. Please use callbacks instead.",
                DeprecationWarning,
                stacklevel=5,
            )
            values["callbacks"] = values.pop("callback_manager", None)
        return values

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @cached_property
    def _serialized(self) -> dict[str, Any]:
        return dumpd(self)

    # --- Runnable methods ---

    @property
    @override
    def OutputType(self) -> Any:
        """Get the output type for this runnable."""
        return AnyMessage

    def _convert_input(self, input: LanguageModelInput) -> PromptValue:
        if isinstance(input, PromptValue):
            return input
        elif isinstance(input, str):
            return StringPromptValue(text=input)
        elif isinstance(input, Sequence):
            return ChatPromptValue(messages=convert_to_messages(input))
        else:
            msg = (
                f"Invalid input type {type(input)}. "
                "Must be a PromptValue, str, or list of BaseMessages."
            )
            raise ValueError(msg)  # noqa: TRY004
    def _should_stream(
        self,
        *,
        async_api: bool,
        run_manager: Optional[
            Union[CallbackManagerForLLMRun, AsyncCallbackManagerForLLMRun]
        ] = None,
        **kwargs: Any,
    ) -> bool:
        """Determine if a given model call should hit the streaming API."""
        sync_not_implemented = type(self)._stream == BaseChatModel._stream
        async_not_implemented = type(self)._astream == BaseChatModel._astream

        # Check if streaming is implemented.
        if (not async_api) and sync_not_implemented:
            return False
        # Note, since async falls back to sync we check both here.
        if async_api and async_not_implemented and sync_not_implemented:
            return False

        # Check if streaming has been disabled on this instance.
        if self.disable_streaming is True:
            return False
        # We assume tools are passed in via "tools" kwarg in all models.
        if self.disable_streaming == "tool_calling" and kwargs.get("tools"):
            return False

        # Check if a runtime streaming flag has been passed in.
        if "stream" in kwargs:
            return kwargs["stream"]

        # Check if any streaming callback handlers have been passed in.
        handlers = run_manager.handlers if run_manager else []
        return any(isinstance(h, _StreamingCallbackHandler) for h in handlers)
    def stream(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
        *,
        stop: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> Iterator[BaseMessageChunk]:
        if not self._should_stream(async_api=False, **{**kwargs, "stream": True}):
            # model doesn't implement streaming, so use default implementation
            yield cast(
                "BaseMessageChunk",
                self.invoke(input, config=config, stop=stop, **kwargs),
            )
        else:
            config = ensure_config(config)
            messages = self._convert_input(input).to_messages()
            ls_structured_output_format = kwargs.pop(
                "ls_structured_output_format", None
            ) or kwargs.pop("structured_output_format", None)
            ls_structured_output_format_dict = _format_ls_structured_output(
                ls_structured_output_format
            )
            params = self._get_invocation_params(stop=stop, **kwargs)
            options = {"stop": stop, **kwargs, **ls_structured_output_format_dict}
            inheritable_metadata = {
                **(config.get("metadata") or {}),
                **self._get_ls_params(stop=stop, **kwargs),
            }
            callback_manager = CallbackManager.configure(
                config.get("callbacks"),
                self.callbacks,
                self.verbose,
                config.get("tags"),
                self.tags,
                inheritable_metadata,
                self.metadata,
            )
            (run_manager,) = callback_manager.on_chat_model_start(
                self._serialized,
                [messages],
                invocation_params=params,
                options=options,
                name=config.get("run_name"),
                run_id=config.pop("run_id", None),
                batch_size=1,
            )
            generation: Optional[ChatGenerationChunk] = None

            if self.rate_limiter:
                self.rate_limiter.acquire(blocking=True)

            try:
                for chunk in self._stream(messages, stop=stop, **kwargs):
                    if chunk.message.id is None:
                        chunk.message.id = f"run-{run_manager.run_id}"
                    chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                    run_manager.on_llm_new_token(
                        cast("str", chunk.message.content), chunk=chunk
                    )
                    yield chunk.message
                    if generation is None:
                        generation = chunk
                    else:
                        generation += chunk
            except BaseException as e:
                run_manager.on_llm_error(
                    e,
                    response=LLMResult(
                        generations=[[generation]] if generation else []
                    ),
                )
                raise

            if generation is None:
                err = ValueError("No generation chunks were returned")
                run_manager.on_llm_error(err, response=LLMResult(generations=[]))
                raise err

            run_manager.on_llm_end(LLMResult(generations=[[generation]]))
    async def astream(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
        *,
        stop: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> AsyncIterator[BaseMessageChunk]:
        if not self._should_stream(async_api=True, **{**kwargs, "stream": True}):
            # No async or sync stream is implemented, so fall back to ainvoke
            yield cast(
                "BaseMessageChunk",
                await self.ainvoke(input, config=config, stop=stop, **kwargs),
            )
            return

        config = ensure_config(config)
        messages = self._convert_input(input).to_messages()
        ls_structured_output_format = kwargs.pop(
            "ls_structured_output_format", None
        ) or kwargs.pop("structured_output_format", None)
        ls_structured_output_format_dict = _format_ls_structured_output(
            ls_structured_output_format
        )
        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop, **kwargs, **ls_structured_output_format_dict}
        inheritable_metadata = {
            **(config.get("metadata") or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }
        callback_manager = AsyncCallbackManager.configure(
            config.get("callbacks"),
            self.callbacks,
            self.verbose,
            config.get("tags"),
            self.tags,
            inheritable_metadata,
            self.metadata,
        )
        (run_manager,) = await callback_manager.on_chat_model_start(
            self._serialized,
            [messages],
            invocation_params=params,
            options=options,
            name=config.get("run_name"),
            run_id=config.pop("run_id", None),
            batch_size=1,
        )

        if self.rate_limiter:
            await self.rate_limiter.aacquire(blocking=True)

        generation: Optional[ChatGenerationChunk] = None
        try:
            async for chunk in self._astream(
                messages,
                stop=stop,
                **kwargs,
            ):
                if chunk.message.id is None:
                    chunk.message.id = f"run-{run_manager.run_id}"
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                await run_manager.on_llm_new_token(
                    cast("str", chunk.message.content), chunk=chunk
                )
                yield chunk.message
                if generation is None:
                    generation = chunk
                else:
                    generation += chunk
        except BaseException as e:
            await run_manager.on_llm_error(
                e,
                response=LLMResult(generations=[[generation]] if generation else []),
            )
            raise

        if generation is None:
            err = ValueError("No generation chunks were returned")
            await run_manager.on_llm_error(err, response=LLMResult(generations=[]))
            raise err

        await run_manager.on_llm_end(
            LLMResult(generations=[[generation]]),
        )
    # --- Custom methods ---

    def _combine_llm_outputs(self, llm_outputs: list[Optional[dict]]) -> dict:
        return {}

    def _get_invocation_params(
        self,
        stop: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> dict:
        params = self.dict()
        params["stop"] = stop
        return {**params, **kwargs}

    def _get_ls_params(
        self,
        stop: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> LangSmithParams:
        """Get standard params for tracing."""
        # get default provider from class name
        default_provider = self.__class__.__name__
        if default_provider.startswith("Chat"):
            default_provider = default_provider[4:].lower()
        elif default_provider.endswith("Chat"):
            default_provider = default_provider[:-4]
        default_provider = default_provider.lower()

        ls_params = LangSmithParams(ls_provider=default_provider, ls_model_type="chat")
        if stop:
            ls_params["ls_stop"] = stop

        # model
        if hasattr(self, "model") and isinstance(self.model, str):
            ls_params["ls_model_name"] = self.model
        elif hasattr(self, "model_name") and isinstance(self.model_name, str):
            ls_params["ls_model_name"] = self.model_name

        # temperature
        if "temperature" in kwargs and isinstance(kwargs["temperature"], float):
            ls_params["ls_temperature"] = kwargs["temperature"]
        elif hasattr(self, "temperature") and isinstance(self.temperature, float):
            ls_params["ls_temperature"] = self.temperature

        # max_tokens
        if "max_tokens" in kwargs and isinstance(kwargs["max_tokens"], int):
            ls_params["ls_max_tokens"] = kwargs["max_tokens"]
        elif hasattr(self, "max_tokens") and isinstance(self.max_tokens, int):
            ls_params["ls_max_tokens"] = self.max_tokens

        return ls_params

    def _get_llm_string(self, stop: Optional[list[str]] = None, **kwargs: Any) -> str:
        if self.is_lc_serializable():
            params = {**kwargs, "stop": stop}
            param_string = str(sorted(params.items()))
            # This code is not super efficient as it goes back and forth between
            # json and dict.
            serialized_repr = self._serialized
            _cleanup_llm_representation(serialized_repr, 1)
            llm_string = json.dumps(serialized_repr, sort_keys=True)
            return llm_string + "---" + param_string
        else:
            params = self._get_invocation_params(stop=stop, **kwargs)
            params = {**params, **kwargs}
            return str(sorted(params.items()))

    def generate(
        self,
        messages: list[list[BaseMessage]],
        stop: Optional[list[str]] = None,
        callbacks: Callbacks = None,
        *,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        run_name: Optional[str] = None,
        run_id: Optional[uuid.UUID] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Pass a sequence of prompts to the model and return model generations.

        This method should make use of batched calls for models that expose a
        batched API.

        Use this method when you want to:

            1. take advantage of batched calls,
            2. need more output from the model than just the top generated value,
            3. are building chains that are agnostic to the underlying language model
               type (e.g., pure text completion models vs chat models).

        Args:
            messages: List of list of messages.
            stop: Stop words to use when generating. Model output is cut off at the
                first occurrence of any of these substrings.
            callbacks: Callbacks to pass through. Used for executing additional
                functionality, such as logging or streaming, throughout generation.
            **kwargs: Arbitrary additional keyword arguments. These are usually passed
                to the model provider API call.

        Returns:
            An LLMResult, which contains a list of candidate Generations for each
            input prompt and additional model provider-specific output.
        """
        ls_structured_output_format = kwargs.pop(
            "ls_structured_output_format", None
        ) or kwargs.pop("structured_output_format", None)
        ls_structured_output_format_dict = _format_ls_structured_output(
            ls_structured_output_format
        )
        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop, **ls_structured_output_format_dict}
        inheritable_metadata = {
            **(metadata or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }

        callback_manager = CallbackManager.configure(
            callbacks,
            self.callbacks,
            self.verbose,
            tags,
            self.tags,
            inheritable_metadata,
            self.metadata,
        )
        run_managers = callback_manager.on_chat_model_start(
            self._serialized,
            messages,
            invocation_params=params,
            options=options,
            name=run_name,
            run_id=run_id,
            batch_size=len(messages),
        )
        results = []
        for i, m in enumerate(messages):
            try:
                results.append(
                    self._generate_with_cache(
                        m,
                        stop=stop,
                        run_manager=run_managers[i] if run_managers else None,
                        **kwargs,
                    )
                )
            except BaseException as e:
                if run_managers:
                    run_managers[i].on_llm_error(
                        e, response=LLMResult(generations=[])
                    )
                raise
        flattened_outputs = [
            LLMResult(generations=[res.generations], llm_output=res.llm_output)  # type: ignore[list-item]
            for res in results
        ]
        llm_output = self._combine_llm_outputs([res.llm_output for res in results])
        generations = [res.generations for res in results]
        output = LLMResult(generations=generations, llm_output=llm_output)  # type: ignore[arg-type]
        if run_managers:
            run_infos = []
            for manager, flattened_output in zip(run_managers, flattened_outputs):
                manager.on_llm_end(flattened_output)
                run_infos.append(RunInfo(run_id=manager.run_id))
            output.run = run_infos
        return output

    async def agenerate(
        self,
        messages: list[list[BaseMessage]],
        stop: Optional[list[str]] = None,
        callbacks: Callbacks = None,
        *,
        tags: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        run_name: Optional[str] = None,
        run_id: Optional[uuid.UUID] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Asynchronously pass a sequence of prompts to a model and return generations.

        This method should make use of batched calls for models that expose a
        batched API.

        Use this method when you want to:

            1. take advantage of batched calls,
            2. need more output from the model than just the top generated value,
            3. are building chains that are agnostic to the underlying language model
               type (e.g., pure text completion models vs chat models).

        Args:
            messages: List of list of messages.
            stop: Stop words to use when generating. Model output is cut off at the
                first occurrence of any of these substrings.
            callbacks: Callbacks to pass through. Used for executing additional
                functionality, such as logging or streaming, throughout generation.
            **kwargs: Arbitrary additional keyword arguments. These are usually passed
                to the model provider API call.

        Returns:
            An LLMResult, which contains a list of candidate Generations for each
            input prompt and additional model provider-specific output.
        """
        ls_structured_output_format = kwargs.pop(
            "ls_structured_output_format", None
        ) or kwargs.pop("structured_output_format", None)
        ls_structured_output_format_dict = _format_ls_structured_output(
            ls_structured_output_format
        )
        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop, **ls_structured_output_format_dict}
        inheritable_metadata = {
            **(metadata or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }

        callback_manager = AsyncCallbackManager.configure(
            callbacks,
            self.callbacks,
            self.verbose,
            tags,
            self.tags,
            inheritable_metadata,
            self.metadata,
        )

        run_managers = await callback_manager.on_chat_model_start(
            self._serialized,
            messages,
            invocation_params=params,
            options=options,
            name=run_name,
            batch_size=len(messages),
            run_id=run_id,
        )

        results = await asyncio.gather(
            *[
                self._agenerate_with_cache(
                    m,
                    stop=stop,
                    run_manager=run_managers[i] if run_managers else None,
                    **kwargs,
                )
                for i, m in enumerate(messages)
            ],
            return_exceptions=True,
        )
        exceptions = []
        for i, res in enumerate(results):
            if isinstance(res, BaseException):
                if run_managers:
                    await run_managers[i].on_llm_error(
                        res, response=LLMResult(generations=[])
                    )
                exceptions.append(res)
        if exceptions:
            if run_managers:
                await asyncio.gather(
                    *[
                        run_manager.on_llm_end(
                            LLMResult(
                                generations=[res.generations],  # type: ignore[list-item, union-attr]
                                llm_output=res.llm_output,  # type: ignore[list-item, union-attr]
                            )
                        )
                        for run_manager, res in zip(run_managers, results)
                        if not isinstance(res, Exception)
                    ]
                )
            raise exceptions[0]
        flattened_outputs = [
            LLMResult(generations=[res.generations], llm_output=res.llm_output)  # type: ignore[list-item, union-attr]
            for res in results
        ]
        llm_output = self._combine_llm_outputs([res.llm_output for res in results])  # type: ignore[union-attr]
        generations = [res.generations for res in results]  # type: ignore[union-attr]
        output = LLMResult(generations=generations, llm_output=llm_output)  # type: ignore[arg-type]
        await asyncio.gather(
            *[
                run_manager.on_llm_end(flattened_output)
                for run_manager, flattened_output in zip(
                    run_managers, flattened_outputs
                )
            ]
        )
        if run_managers:
            output.run = [
                RunInfo(run_id=run_manager.run_id) for run_manager in run_managers
            ]
        return output

    def generate_prompt(
        self,
        prompts: list[PromptValue],
        stop: Optional[list[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> LLMResult:
        prompt_messages = [p.to_messages() for p in prompts]
        return self.generate(prompt_messages, stop=stop, callbacks=callbacks, **kwargs)

    async def agenerate_prompt(
        self,
        prompts: list[PromptValue],
        stop: Optional[list[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> LLMResult:
        prompt_messages = [p.to_messages() for p in prompts]
        return await self.agenerate(
            prompt_messages, stop=stop, callbacks=callbacks, **kwargs
        )

    def _generate_with_cache(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        llm_cache = self.cache if isinstance(self.cache, BaseCache) else get_llm_cache()
        # We should check the cache unless it's explicitly set to False
        # A None cache means we should use the default global cache
        # if it's configured.
        check_cache = self.cache or self.cache is None
        if check_cache:
            if llm_cache:
                llm_string = self._get_llm_string(stop=stop, **kwargs)
                prompt = dumps(messages)
                cache_val = llm_cache.lookup(prompt, llm_string)
                if isinstance(cache_val, list):
                    return ChatResult(generations=cache_val)
            elif self.cache is None:
                pass
            else:
                msg = "Asked to cache, but no cache found at `langchain.cache`."
                raise ValueError(msg)

        # Apply the rate limiter after checking the cache, since
        # we usually don't want to rate limit cache lookups, but
        # we do want to rate limit API requests.
        if self.rate_limiter:
            self.rate_limiter.acquire(blocking=True)

        # If stream is not explicitly set, check if implicitly requested by
        # astream_events() or astream_log(). Bail out if _stream not implemented
        if self._should_stream(
            async_api=False,
            run_manager=run_manager,
            **kwargs,
        ):
            chunks: list[ChatGenerationChunk] = []
            for chunk in self._stream(messages, stop=stop, **kwargs):
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                if run_manager:
                    if chunk.message.id is None:
                        chunk.message.id = f"run-{run_manager.run_id}"
                    run_manager.on_llm_new_token(
                        cast("str", chunk.message.content), chunk=chunk
                    )
                chunks.append(chunk)
            result = generate_from_stream(iter(chunks))
        else:
            if inspect.signature(self._generate).parameters.get("run_manager"):
                result = self._generate(
                    messages, stop=stop, run_manager=run_manager, **kwargs
                )
            else:
                result = self._generate(messages, stop=stop, **kwargs)

        # Add response metadata to each generation
        for idx, generation in enumerate(result.generations):
            if run_manager and generation.message.id is None:
                generation.message.id = f"run-{run_manager.run_id}-{idx}"
            generation.message.response_metadata = _gen_info_and_msg_metadata(
                generation
            )
        if len(result.generations) == 1 and result.llm_output is not None:
            result.generations[0].message.response_metadata = {
                **result.llm_output,
                **result.generations[0].message.response_metadata,
            }
        if check_cache and llm_cache:
            llm_cache.update(prompt, llm_string, result.generations)
        return result

    async def _agenerate_with_cache(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        llm_cache = self.cache if isinstance(self.cache, BaseCache) else get_llm_cache()
        # We should check the cache unless it's explicitly set to False
        # A None cache means we should use the default global cache
        # if it's configured.
        check_cache = self.cache or self.cache is None
        if check_cache:
            if llm_cache:
                llm_string = self._get_llm_string(stop=stop, **kwargs)
                prompt = dumps(messages)
                cache_val = await llm_cache.alookup(prompt, llm_string)
                if isinstance(cache_val, list):
                    return ChatResult(generations=cache_val)
            elif self.cache is None:
                pass
            else:
                msg = "Asked to cache, but no cache found at `langchain.cache`."
                raise ValueError(msg)

        # Apply the rate limiter after checking the cache, since
        # we usually don't want to rate limit cache lookups, but
        # we do want to rate limit API requests.
        if self.rate_limiter:
            await self.rate_limiter.aacquire(blocking=True)

        # If stream is not explicitly set, check if implicitly requested by
        # astream_events() or astream_log(). Bail out if _astream not implemented
        if self._should_stream(
            async_api=True,
            run_manager=run_manager,
            **kwargs,
        ):
            chunks: list[ChatGenerationChunk] = []
            async for chunk in self._astream(messages, stop=stop, **kwargs):
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                if run_manager:
                    if chunk.message.id is None:
                        chunk.message.id = f"run-{run_manager.run_id}"
                    await run_manager.on_llm_new_token(
                        cast("str", chunk.message.content), chunk=chunk
                    )
                chunks.append(chunk)
            result = generate_from_stream(iter(chunks))
        else:
            if inspect.signature(self._agenerate).parameters.get("run_manager"):
                result = await self._agenerate(
                    messages, stop=stop, run_manager=run_manager, **kwargs
                )
            else:
                result = await self._agenerate(messages, stop=stop, **kwargs)

        # Add response metadata to each generation
        for idx, generation in enumerate(result.generations):
            if run_manager and generation.message.id is None:
                generation.message.id = f"run-{run_manager.run_id}-{idx}"
            generation.message.response_metadata = _gen_info_and_msg_metadata(
                generation
            )
        if len(result.generations) == 1 and result.llm_output is not None:
            result.generations[0].message.response_metadata = {
                **result.llm_output,
                **result.generations[0].message.response_metadata,
            }
        if check_cache and llm_cache:
            await llm_cache.aupdate(prompt, llm_string, result.generations)
        return result

    @abstractmethod
    def _generate(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Top Level call."""

    async def _agenerate(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Top Level call."""
        return await run_in_executor(
            None,
            self._generate,
            messages,
            stop,
            run_manager.get_sync() if run_manager else None,
            **kwargs,
        )

    def _stream(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        raise NotImplementedError

    async def _astream(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        iterator = await run_in_executor(
            None,
            self._stream,
            messages,
            stop,
            run_manager.get_sync() if run_manager else None,
            **kwargs,
        )
        done = object()
        while True:
            item = await run_in_executor(
                None,
                next,
                iterator,
                done,  # type: ignore[call-arg, arg-type]
            )
            if item is done:
                break
            yield item  # type: ignore[misc]
    async def _call_async(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> BaseMessage:
        result = await self.agenerate(
            [messages], stop=stop, callbacks=callbacks, **kwargs
        )
        generation = result.generations[0][0]
        if isinstance(generation, ChatGeneration):
            return generation.message
        else:
            msg = "Unexpected generation type"
            raise ValueError(msg)  # noqa: TRY004

    @deprecated("0.1.7", alternative="invoke", removal="1.0")
    def call_as_llm(
        self, message: str, stop: Optional[list[str]] = None, **kwargs: Any
    ) -> str:
        return self.predict(message, stop=stop, **kwargs)

    @deprecated("0.1.7", alternative="invoke", removal="1.0")
    def predict(
        self, text: str, *, stop: Optional[Sequence[str]] = None, **kwargs: Any
    ) -> str:
        _stop = None if stop is None else list(stop)
        result = self([HumanMessage(content=text)], stop=_stop, **kwargs)
        if isinstance(result.content, str):
            return result.content
        else:
            msg = "Cannot use predict when output is not a string."
            raise ValueError(msg)  # noqa: TRY004

    @deprecated("0.1.7", alternative="invoke", removal="1.0")
    def predict_messages(
        self,
        messages: list[BaseMessage],
        *,
        stop: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        _stop = None if stop is None else list(stop)
        return self(messages, stop=_stop, **kwargs)

    @deprecated("0.1.7", alternative="ainvoke", removal="1.0")
    async def apredict(
        self, text: str, *, stop: Optional[Sequence[str]] = None, **kwargs: Any
    ) -> str:
        _stop = None if stop is None else list(stop)
        result = await self._call_async(
            [HumanMessage(content=text)], stop=_stop, **kwargs
        )
        if isinstance(result.content, str):
            return result.content
        else:
            msg = "Cannot use predict when output is not a string."
            raise ValueError(msg)  # noqa: TRY004

    @deprecated("0.1.7", alternative="ainvoke", removal="1.0")
    async def apredict_messages(
        self,
        messages: list[BaseMessage],
        *,
        stop: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        _stop = None if stop is None else list(stop)
        return await self._call_async(messages, stop=_stop, **kwargs)

    @property
    @abstractmethod
    def _llm_type(self) -> str:
        """Return type of chat model."""

    def dict(self, **kwargs: Any) -> dict:
        """Return a dictionary of the LLM."""
        starter_dict = dict(self._identifying_params)
        starter_dict["_type"] = self._llm_type
        return starter_dict
    def with_structured_output(
        self,
        schema: Union[typing.Dict, type],  # noqa: UP006
        *,
        include_raw: bool = False,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, Union[typing.Dict, BaseModel]]:  # noqa: UP006
        """Model wrapper that returns outputs formatted to match the given schema.

        Args:
            schema: The output schema. Can be passed in as:

                - an OpenAI function/tool schema,
                - a JSON Schema,
                - a TypedDict class,
                - or a Pydantic class.

                If ``schema`` is a Pydantic class then the model output will be a
                Pydantic instance of that class, and the model-generated fields will
                be validated by the Pydantic class. Otherwise the model output will be
                a dict and will not be validated. See
                :meth:`langchain_core.utils.function_calling.convert_to_openai_tool`
                for more on how to properly specify types and descriptions of schema
                fields when specifying a Pydantic or TypedDict class.
            include_raw: If False then only the parsed structured output is returned.
                If an error occurs during model output parsing it will be raised. If
                True then both the raw model response (a BaseMessage) and the parsed
                model response will be returned. If an error occurs during output
                parsing it will be caught and returned as well. The final output is
                always a dict with keys "raw", "parsed", and "parsing_error".

        Returns:
            A Runnable that takes same inputs as a
            :class:`langchain_core.language_models.chat.BaseChatModel`.

            If ``include_raw`` is False and ``schema`` is a Pydantic class, Runnable
            outputs an instance of ``schema`` (i.e., a Pydantic object). Otherwise,
            if ``include_raw`` is False then Runnable outputs a dict.

            If ``include_raw`` is True, then Runnable outputs a dict with keys:

            - ``"raw"``: BaseMessage
            - ``"parsed"``: None if there was a parsing error, otherwise the type
              depends on the ``schema`` as described above.
            - ``"parsing_error"``: Optional[BaseException]

        Example: Pydantic schema (include_raw=False):
            .. code-block:: python

                from pydantic import BaseModel


                class AnswerWithJustification(BaseModel):
                    '''An answer to the user question along with justification for the answer.'''

                    answer: str
                    justification: str


                llm = ChatModel(model="model-name", temperature=0)
                structured_llm = llm.with_structured_output(AnswerWithJustification)

                structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers")

                # -> AnswerWithJustification(
                #     answer='They weigh the same',
                #     justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
                # )

        Example: Pydantic schema (include_raw=True):
            .. code-block:: python

                from pydantic import BaseModel


                class AnswerWithJustification(BaseModel):
                    '''An answer to the user question along with justification for the answer.'''

                    answer: str
                    justification: str


                llm = ChatModel(model="model-name", temperature=0)
                structured_llm = llm.with_structured_output(AnswerWithJustification, include_raw=True)

                structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers")
                # -> {
                #     'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
                #     'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
                #     'parsing_error': None
                # }

        Example: Dict schema (include_raw=False):
            .. code-block:: python

                from pydantic import BaseModel
                from langchain_core.utils.function_calling import convert_to_openai_tool


                class AnswerWithJustification(BaseModel):
                    '''An answer to the user question along with justification for the answer.'''

                    answer: str
                    justification: str


                dict_schema = convert_to_openai_tool(AnswerWithJustification)
                llm = ChatModel(model="model-name", temperature=0)
                structured_llm = llm.with_structured_output(dict_schema)

                structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers")
                # -> {
                #     'answer': 'They weigh the same',
                #     'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
                # }

        .. versionchanged:: 0.2.26

            Added support for TypedDict class.

        """  # noqa: E501
        _ = kwargs.pop("method", None)
        _ = kwargs.pop("strict", None)
        if kwargs:
            msg = f"Received unsupported arguments {kwargs}"
            raise ValueError(msg)

        from langchain_core.output_parsers.openai_tools import (
            JsonOutputKeyToolsParser,
            PydanticToolsParser,
        )

        if self.bind_tools is BaseChatModel.bind_tools:
            msg = "with_structured_output is not implemented for this model."
            raise NotImplementedError(msg)
        llm = self.bind_tools(
            [schema],
            tool_choice="any",
            ls_structured_output_format={
                "kwargs": {"method": "function_calling"},
                "schema": schema,
            },
        )
        if isinstance(schema, type) and is_basemodel_subclass(schema):
            output_parser: OutputParserLike = PydanticToolsParser(
                tools=[cast("TypeBaseModel", schema)], first_tool_only=True
            )
        else:
            key_name = convert_to_openai_tool(schema)["function"]["name"]
            output_parser = JsonOutputKeyToolsParser(
                key_name=key_name, first_tool_only=True
            )
        if include_raw:
            parser_assign = RunnablePassthrough.assign(
                parsed=itemgetter("raw") | output_parser, parsing_error=lambda _: None
            )
            parser_none = RunnablePassthrough.assign(parsed=lambda _: None)
            parser_with_fallback = parser_assign.with_fallbacks(
                [parser_none], exception_key="parsing_error"
            )
            return RunnableMap(raw=llm) | parser_with_fallback
        else:
            return llm | output_parser

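# A minimal sketch of a custom chat model, following the required/optional table
# in the BaseChatModel docstring above. Only `_generate` and `_llm_type` are
# implemented; the class name `EchoChatModel` is hypothetical and not part of
# langchain_core.
class EchoChatModel(BaseChatModel):
    """Toy chat model that echoes the last input message back as an AIMessage."""

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        # Echo the content of the last input message.
        message = AIMessage(content=messages[-1].content)
        return ChatResult(generations=[ChatGeneration(message=message)])

    @property
    def _llm_type(self) -> str:
        return "echo-chat-model"


# Usage: EchoChatModel().invoke("hello") returns an AIMessage whose content is
# "hello"; stream()/astream() fall back to invoke()/ainvoke() because _stream
# and _astream are not implemented here.
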
class SimpleChatModel(BaseChatModel):
    """Simplified implementation for a chat model to inherit from.

    **Note** This implementation is primarily here for backwards compatibility.
    For new implementations, please use `BaseChatModel` directly.
    """

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        output_str = self._call(messages, stop=stop, run_manager=run_manager, **kwargs)
        message = AIMessage(content=output_str)
        generation = ChatGeneration(message=message)
        return ChatResult(generations=[generation])

    @abstractmethod
    def _call(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Simpler interface."""

    async def _agenerate(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        return await run_in_executor(
            None,
            self._generate,
            messages,
            stop=stop,
            run_manager=run_manager.get_sync() if run_manager else None,
            **kwargs,
        )

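# A minimal sketch of the legacy SimpleChatModel interface described above:
# subclasses only implement `_call`, which returns the response text. The class
# name `UppercaseChatModel` is hypothetical and not part of langchain_core.
class UppercaseChatModel(SimpleChatModel):
    """Toy model that shouts the last input message back in upper case."""

    def _call(
        self,
        messages: list[BaseMessage],
        stop: Optional[list[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        return str(messages[-1].content).upper()

    @property
    def _llm_type(self) -> str:
        return "uppercase-chat-model"
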
def _gen_info_and_msg_metadata(
    generation: Union[ChatGeneration, ChatGenerationChunk],
) -> dict:
    return {
        **(generation.generation_info or {}),
        **generation.message.response_metadata,
    }


def _cleanup_llm_representation(serialized: Any, depth: int) -> None:
    """Remove non-serializable objects from a serialized object."""
    if depth > 100:  # Don't cooperate for pathological cases
        return

    if not isinstance(serialized, dict):
        return

    if (
        "type" in serialized
        and serialized["type"] == "not_implemented"
        and "repr" in serialized
    ):
        del serialized["repr"]

    if "graph" in serialized:
        del serialized["graph"]

    if "kwargs" in serialized:
        kwargs = serialized["kwargs"]
        for value in kwargs.values():
            _cleanup_llm_representation(value, depth + 1)

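# Illustration (not part of the original module): how _cleanup_llm_representation
# prunes a serialized representation in place before it is hashed into a cache
# key by _get_llm_string. The dictionary below is a made-up example, and the
# helper name `_example_cleanup_llm_representation` is hypothetical.
def _example_cleanup_llm_representation() -> dict:
    serialized = {
        "type": "not_implemented",
        "repr": "<non-serializable object>",
        "graph": {"nodes": [], "edges": []},
        "kwargs": {
            "client": {"type": "not_implemented", "repr": "<http client>"},
            "temperature": 0.0,
        },
    }
    _cleanup_llm_representation(serialized, 1)
    # "repr" and "graph" are removed at the top level, and the nested "client"
    # entry under "kwargs" loses its "repr" as well.
    return serialized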