class CacheUsageMetadata(UsageMetadata):
    cache_creation_input_tokens: Optional[int]
    """The number of input tokens used to create the cache entry."""

    cache_read_input_tokens: Optional[int]
    """The number of input tokens read from the cache."""
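# A small sketch of constructing this structure directly; because it extends
# the UsageMetadata TypedDict, instances behave like plain dicts. The token
# counts below are illustrative only.
_example_cache_usage = CacheUsageMetadata(
    input_tokens=1200,
    output_tokens=300,
    total_tokens=1500,
    cache_creation_input_tokens=1000,
    cache_read_input_tokens=0,
)
# _example_cache_usage["cache_creation_input_tokens"] -> 1000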
def _create_retry_decorator(
    *,
    max_retries: int = 3,
    run_manager: Optional[
        Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun]
    ] = None,
    wait_exponential_kwargs: Optional[dict[str, float]] = None,
) -> Callable[[Any], Any]:
    """Creates a retry decorator for Anthropic Vertex LLMs with proper tracing."""
    from anthropic import (  # type: ignore[unused-ignore, import-not-found]
        APIError,
        APITimeoutError,
        RateLimitError,
    )

    errors = [
        APIError,
        APITimeoutError,
        RateLimitError,
    ]
    return create_base_retry_decorator(
        error_types=errors,
        max_retries=max_retries,
        run_manager=run_manager,
        wait_exponential_kwargs=wait_exponential_kwargs,
    )
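# A minimal sketch of the call pattern this decorator enables; it mirrors the
# _completion_with_retry_inner helpers defined in ChatAnthropicVertex below.
# The project and region values are placeholders for your own setup.
def _example_retrying_completion(**params: Any) -> Any:
    from anthropic import AnthropicVertex

    # Disable the SDK's own retries, as validate_environment() does below, so
    # that retrying is handled solely by the decorator.
    client = AnthropicVertex(project_id="my-project", region="us-east5", max_retries=0)
    retry_decorator = _create_retry_decorator(max_retries=3)

    @retry_decorator
    def _completion_with_retry(**inner_params: Any) -> Any:
        # Retried on APIError, APITimeoutError, and RateLimitError with
        # exponential backoff.
        return client.messages.create(**inner_params)

    return _completion_with_retry(**params)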
class VertexAIModelGarden(_BaseVertexAIModelGarden, BaseLLM):
    """Large language models served from Vertex AI Model Garden."""

    model_config = ConfigDict(
        populate_by_name=True,
        protected_namespaces=(),
    )

    # Needed so that mypy doesn't flag missing aliased init args.
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Run the LLM on the given prompt and input."""
        instances = self._prepare_request(prompts, **kwargs)

        if self.single_example_per_request and len(instances) > 1:
            results = []
            for instance in instances:
                response = self.client.predict(
                    endpoint=self.endpoint_path, instances=[instance]
                )
                results.append(self._parse_prediction(response.predictions[0]))
            return LLMResult(
                generations=[[Generation(text=result)] for result in results]
            )

        response = self.client.predict(
            endpoint=self.endpoint_path, instances=instances
        )
        return self._parse_response(response)

    async def _agenerate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Run the LLM on the given prompt and input."""
        instances = self._prepare_request(prompts, **kwargs)
        if self.single_example_per_request and len(instances) > 1:
            responses = []
            for instance in instances:
                responses.append(
                    self.async_client.predict(
                        endpoint=self.endpoint_path, instances=[instance]
                    )
                )
            responses = await asyncio.gather(*responses)
            return LLMResult(
                generations=[
                    [Generation(text=self._parse_prediction(response.predictions[0]))]
                    for response in responses
                ]
            )

        response = await self.async_client.predict(
            endpoint=self.endpoint_path, instances=instances
        )
        return self._parse_response(response)
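# A minimal usage sketch for a Model Garden deployment; the project, location,
# and endpoint ID are placeholders for an endpoint you have deployed yourself.
def _example_model_garden_call() -> None:
    llm = VertexAIModelGarden(
        project="my-project",
        location="us-central1",
        endpoint_id="1234567890",
    )
    print(llm.invoke("What is the capital of France?"))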
class ChatAnthropicVertex(_VertexAICommon, BaseChatModel):
    async_client: Any = Field(default=None, exclude=True)  #: :meta private:
    max_output_tokens: int = Field(default=1024, alias="max_tokens")
    access_token: Optional[str] = None
    stream_usage: bool = True  # Whether to include usage metadata in streaming output
    credentials: Optional[Credentials] = None
    max_retries: int = Field(
        default=3, description="Number of retries for error handling."
    )
    wait_exponential_kwargs: Optional[dict[str, float]] = Field(
        default=None,
        description="Optional dictionary with parameters for wait_exponential: "
        "- multiplier: Initial wait time multiplier (default: 1.0) "
        "- min: Minimum wait time in seconds (default: 4.0) "
        "- max: Maximum wait time in seconds (default: 10.0) "
        "- exp_base: Exponent base to use (default: 2.0) ",
    )
    timeout: Optional[Union[float, httpx.Timeout]] = Field(
        default=None,
        description="Timeout for API requests.",
    )

    model_config = ConfigDict(
        populate_by_name=True,
    )

    model_kwargs: dict[str, Any] = Field(default_factory=dict)

    # Needed so that mypy doesn't flag missing aliased init args.
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    @model_validator(mode="after")
    def validate_environment(self) -> Self:
        from anthropic import (  # type: ignore[unused-ignore, import-not-found]
            AnthropicVertex,
            AsyncAnthropicVertex,
        )

        if self.project is None:
            raise ValueError("project is required for ChatAnthropicVertex")
        project_id: str = self.project

        # Always disable Anthropic's retries; we handle them using the retry decorator.
        self.client = AnthropicVertex(
            project_id=project_id,
            region=self.location,
            max_retries=0,
            access_token=self.access_token,
            credentials=self.credentials,
            timeout=self.timeout,
        )
        self.async_client = AsyncAnthropicVertex(
            project_id=project_id,
            region=self.location,
            max_retries=0,
            access_token=self.access_token,
            credentials=self.credentials,
            timeout=self.timeout,
        )
        return self

    @property
    def _default_params(self):
        default_parameters = {
            "model": self.model_name,
            "max_tokens": self.max_output_tokens,
            "temperature": self.temperature,
            "top_k": self.top_k,
            "top_p": self.top_p,
        }
        return {**default_parameters, **self.model_kwargs}

    def _format_params(
        self,
        *,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        system_message, formatted_messages = _format_messages_anthropic(
            messages, self.project
        )
        params = self._default_params
        params.update(kwargs)
        if kwargs.get("model_name"):
            params["model"] = params["model_name"]
        if kwargs.get("model"):
            params["model"] = kwargs["model"]
        params.pop("model_name", None)
        params.update(
            {
                "system": system_message,
                "messages": formatted_messages,
                "stop_sequences": stop,
            }
        )
        return {k: v for k, v in params.items() if v is not None}

    def _format_output(self, data: Any, **kwargs: Any) -> ChatResult:
        data_dict = data.model_dump()
        content = data_dict["content"]
        llm_output = {
            k: v for k, v in data_dict.items() if k not in ("content", "role", "type")
        }
        if len(content) == 1 and content[0]["type"] == "text":
            msg = AIMessage(content=content[0]["text"])
        elif any(block["type"] == "tool_use" for block in content):
            tool_calls = _extract_tool_calls(content)
            msg = AIMessage(
                content=content,
                tool_calls=tool_calls,
            )
        else:
            msg = AIMessage(content=content)
        # Collect token usage
        msg.usage_metadata = CacheUsageMetadata(
            input_tokens=data.usage.input_tokens,
            output_tokens=data.usage.output_tokens,
            total_tokens=data.usage.input_tokens + data.usage.output_tokens,
            cache_creation_input_tokens=data.usage.cache_creation_input_tokens,
            cache_read_input_tokens=data.usage.cache_read_input_tokens,
        )
        return ChatResult(
            generations=[ChatGeneration(message=msg)],
            llm_output=llm_output,
        )

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Run the LLM on the given prompt and input."""
        params = self._format_params(messages=messages, stop=stop, **kwargs)
        if self.streaming:
            stream_iter = self._stream(
                messages, stop=stop, run_manager=run_manager, **kwargs
            )
            return generate_from_stream(stream_iter)

        retry_decorator = _create_retry_decorator(
            max_retries=self.max_retries,
            run_manager=run_manager,
            wait_exponential_kwargs=self.wait_exponential_kwargs,
        )

        @retry_decorator
        def _completion_with_retry_inner(**params: Any) -> Any:
            return self.client.messages.create(**params)

        data = _completion_with_retry_inner(**params)
        return self._format_output(data, **kwargs)

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Run the LLM on the given prompt and input."""
        params = self._format_params(messages=messages, stop=stop, **kwargs)
        if self.streaming:
            stream_iter = self._astream(
                messages, stop=stop, run_manager=run_manager, **kwargs
            )
            return await agenerate_from_stream(stream_iter)

        retry_decorator = _create_retry_decorator(
            max_retries=self.max_retries,
            run_manager=run_manager,
            wait_exponential_kwargs=self.wait_exponential_kwargs,
        )

        @retry_decorator
        async def _acompletion_with_retry_inner(**params: Any) -> Any:
            return await self.async_client.messages.create(**params)

        data = await _acompletion_with_retry_inner(**params)
        return self._format_output(data, **kwargs)

    @property
    def _llm_type(self) -> str:
        """Return type of chat model."""
model."""return"anthropic-chat-vertexai"def_stream(self,messages:List[BaseMessage],stop:Optional[List[str]]=None,run_manager:Optional[CallbackManagerForLLMRun]=None,*,stream_usage:Optional[bool]=None,**kwargs:Any,)->Iterator[ChatGenerationChunk]:ifstream_usageisNone:stream_usage=self.stream_usageparams=self._format_params(messages=messages,stop=stop,**kwargs)retry_decorator=_create_retry_decorator(max_retries=self.max_retries,run_manager=run_manager,wait_exponential_kwargs=self.wait_exponential_kwargs,)@retry_decoratordef_stream_with_retry(**params:Any)->Any:returnself.client.messages.create(**params,stream=True)stream=_stream_with_retry(**params)coerce_content_to_string=not_tools_in_params(params)foreventinstream:msg=_make_message_chunk_from_anthropic_event(event,stream_usage=stream_usage,coerce_content_to_string=coerce_content_to_string,)ifmsgisnotNone:chunk=ChatGenerationChunk(message=msg)ifrun_managerandisinstance(msg.content,str):run_manager.on_llm_new_token(msg.content,chunk=chunk)yieldchunkasyncdef_astream(self,messages:List[BaseMessage],stop:Optional[List[str]]=None,run_manager:Optional[AsyncCallbackManagerForLLMRun]=None,*,stream_usage:Optional[bool]=None,**kwargs:Any,)->AsyncIterator[ChatGenerationChunk]:ifstream_usageisNone:stream_usage=self.stream_usageparams=self._format_params(messages=messages,stop=stop,**kwargs)retry_decorator=_create_retry_decorator(max_retries=self.max_retries,run_manager=run_manager,wait_exponential_kwargs=self.wait_exponential_kwargs,)@retry_decoratorasyncdef_astream_with_retry(**params:Any)->Any:returnawaitself.async_client.messages.create(**params,stream=True)stream=await_astream_with_retry(**params)coerce_content_to_string=not_tools_in_params(params)asyncforeventinstream:msg=_make_message_chunk_from_anthropic_event(event,stream_usage=stream_usage,coerce_content_to_string=coerce_content_to_string,)ifmsgisnotNone:chunk=ChatGenerationChunk(message=msg)ifrun_managerandisinstance(msg.content,str):awaitrun_manager.on_llm_new_token(msg.content,chunk=chunk)yieldchunk
    def bind_tools(
        self,
        tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
        *,
        tool_choice: Optional[
            Union[Dict[str, str], Literal["any", "auto"], str]
        ] = None,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, BaseMessage]:
        """Bind tool-like objects to this chat model."""
        formatted_tools = [convert_to_anthropic_tool(tool) for tool in tools]
        if not tool_choice:
            pass
        elif isinstance(tool_choice, dict):
            kwargs["tool_choice"] = tool_choice
        elif isinstance(tool_choice, str) and tool_choice in ("any", "auto"):
            kwargs["tool_choice"] = {"type": tool_choice}
        elif isinstance(tool_choice, str):
            kwargs["tool_choice"] = {"type": "tool", "name": tool_choice}
        else:
            raise ValueError(
                f"Unrecognized 'tool_choice' type {tool_choice=}. Expected dict, "
                f"str, or None."
            )
        return self.bind(tools=formatted_tools, **kwargs)
    def with_structured_output(
        self,
        schema: Union[Dict, Type[BaseModel]],
        *,
        include_raw: bool = False,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, Union[Dict, BaseModel]]:
        """Model wrapper that returns outputs formatted to match the given schema."""
        tool_name = convert_to_anthropic_tool(schema)["name"]
        llm = self.bind_tools([schema], tool_choice=tool_name)
        if isinstance(schema, type) and issubclass(schema, BaseModel):
            output_parser = ToolsOutputParser(
                first_tool_only=True, pydantic_schemas=[schema]
            )
        else:
            output_parser = ToolsOutputParser(first_tool_only=True, args_only=True)

        if include_raw:
            parser_assign = RunnablePassthrough.assign(
                parsed=itemgetter("raw") | output_parser,
                parsing_error=lambda _: None,
            )
            parser_none = RunnablePassthrough.assign(parsed=lambda _: None)
            parser_with_fallback = parser_assign.with_fallbacks(
                [parser_none], exception_key="parsing_error"
            )
            return RunnableMap(raw=llm) | parser_with_fallback
        else:
            return llm | output_parser
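# The sketch below illustrates typical use of ChatAnthropicVertex, including
# bind_tools and with_structured_output. The model name, project, and region
# are placeholders, and GetWeather / Joke are hypothetical example schemas.
def _example_chat_anthropic_vertex_usage() -> None:
    from pydantic import BaseModel, Field

    chat = ChatAnthropicVertex(
        model_name="claude-3-5-sonnet-v2@20241022",
        project="my-project",
        location="us-east5",
    )

    # Plain invocation; usage_metadata carries the CacheUsageMetadata counts
    # populated in _format_output above.
    response = chat.invoke("Tell me a short joke about bears.")
    print(response.content)
    print(response.usage_metadata)

    # Tool calling via bind_tools; passing the tool name as tool_choice forces
    # that tool, i.e. {"type": "tool", "name": "GetWeather"}.
    class GetWeather(BaseModel):
        """Get the current weather in a given location."""

        location: str = Field(..., description="City name, e.g. Boston")

    chat_with_tools = chat.bind_tools([GetWeather], tool_choice="GetWeather")
    ai_msg = chat_with_tools.invoke("What is the weather like in Boston?")
    print(ai_msg.tool_calls)

    # Structured output via with_structured_output; with a Pydantic schema the
    # tool call is parsed back into a Joke instance.
    class Joke(BaseModel):
        setup: str
        punchline: str

    structured_chat = chat.with_structured_output(Joke)
    joke = structured_chat.invoke("Tell me a joke about cats.")
    print(joke)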