def process_content_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Process a single content item.

    Normalizes a LangChain-style image entry in place: when the item is an
    ``image_url`` entry whose value is a dict of the form ``{"url": ...}``,
    the dict is replaced by the bare URL value. The (possibly mutated) item
    is returned for convenience.
    """
    if item["type"] != "image_url":
        return item
    url_value = item["image_url"]
    # LangChain wraps the URL in a {"url": ...} dict; unwrap to the raw value.
    if isinstance(url_value, dict) and "url" in url_value:
        item["image_url"] = url_value["url"]
    return item
def process_content(content: ContentType) -> List[Dict[str, Any]]:
    """Process content to handle both text and media inputs,
    returning a list of content items."""
    # A bare string becomes a single text item.
    if isinstance(content, str):
        return [{"type": "text", "text": content}]
    if not isinstance(content, list):
        raise ValueError("Invalid content format")
    items: List[Dict[str, Any]] = []
    for entry in content:
        if isinstance(entry, str):
            items.append({"type": "text", "text": entry})
        elif isinstance(entry, dict):
            # Dict entries are normalized (e.g. image_url unwrapping).
            items.append(process_content_item(entry))
        else:
            raise ValueError(f"Invalid content item format: {entry}")
    return items
def convert_to_reka_messages(messages: List[BaseMessage]) -> List[Dict[str, Any]]:
    """Convert LangChain messages to Reka message format.

    Reka has no standalone system role here: a single SystemMessage is
    buffered and folded into the next HumanMessage. AIMessage tool calls
    (OpenAI-style dicts in ``additional_kwargs``) are reshaped into Reka's
    ``{"id", "name", "parameters"}`` form, and ToolMessages become
    ``role="tool_output"`` entries.

    Raises:
        TypeError: If a SystemMessage's content is not a string.
        ValueError: If more than one SystemMessage is present, or a message
            type other than System/Human/AI/Tool is encountered.
    """
    reka_messages: List[Dict[str, Any]] = []
    # Holds the system prompt until the next user turn consumes it.
    system_message: Optional[str] = None
    for message in messages:
        if isinstance(message, SystemMessage):
            if system_message is None:
                if isinstance(message.content, str):
                    system_message = message.content
                else:
                    raise TypeError("SystemMessage content must be a string.")
            else:
                # Only one system message is accepted per conversation.
                raise ValueError("Multiple system messages are not supported.")
        elif isinstance(message, HumanMessage):
            processed_content = process_content(message.content)
            if system_message:
                # Merge the pending system prompt into this user turn:
                # prefix the first text item if there is one, otherwise
                # prepend a new text item.
                if (
                    processed_content
                    and isinstance(processed_content[0], dict)
                    and processed_content[0].get("type") == "text"
                    and "text" in processed_content[0]
                ):
                    processed_content[0]["text"] = (
                        f"{system_message}\n{processed_content[0]['text']}"
                    )
                else:
                    processed_content.insert(
                        0, {"type": "text", "text": system_message}
                    )
                system_message = None
            reka_messages.append({"role": "user", "content": processed_content})
        elif isinstance(message, AIMessage):
            reka_message: Dict[str, Any] = {"role": "assistant"}
            if message.content:
                processed_content = process_content(message.content)
                reka_message["content"] = processed_content
            if "tool_calls" in message.additional_kwargs:
                tool_calls = message.additional_kwargs["tool_calls"]
                formatted_tool_calls = []
                for tool_call in tool_calls:
                    # OpenAI-style tool call -> Reka's flat representation;
                    # arguments arrive JSON-encoded and are decoded here.
                    formatted_tool_call = {
                        "id": tool_call["id"],
                        "name": tool_call["function"]["name"],
                        "parameters": json.loads(tool_call["function"]["arguments"]),
                    }
                    formatted_tool_calls.append(formatted_tool_call)
                reka_message["tool_calls"] = formatted_tool_calls
            reka_messages.append(reka_message)
        elif isinstance(message, ToolMessage):
            content_list: List[Dict[str, Any]] = []
            # Tool output is wrapped as a JSON status object keyed by the
            # originating tool_call_id.
            content_list.append(
                {
                    "tool_call_id": message.tool_call_id,
                    "output": json.dumps({"status": message.content}),
                }
            )
            reka_messages.append(
                {
                    "role": "tool_output",
                    "content": content_list,
                }
            )
        else:
            raise ValueError(f"Unsupported message type: {type(message)}")
    return reka_messages
class ChatReka(BaseChatModel):
    """Reka chat large language models."""

    client: Any = None  #: :meta private:
    async_client: Any = None  #: :meta private:
    # Model name sent to the Reka API.
    model: str = Field(default=DEFAULT_REKA_MODEL)
    # Maximum number of tokens to generate per response.
    max_tokens: int = Field(default=256)
    # Sampling temperature; only forwarded to the API when set
    # (see _default_params).
    temperature: Optional[float] = None
    # When True, _generate/_agenerate delegate to the streaming path.
    streaming: bool = False
    # NOTE(review): declared but not referenced anywhere in this class
    # body — presumably consumed by the Reka client or callers; confirm.
    default_request_timeout: Optional[float] = None
    # NOTE(review): also declared but not referenced in this class body.
    max_retries: int = 2
    reka_api_key: Optional[str] = None
    # Extra keyword arguments merged into every API call
    # (override the defaults in _default_params).
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    model_config = ConfigDict(extra="forbid")
    # Optional custom token counter; when set, get_num_tokens uses it
    # instead of tiktoken.
    token_counter: Optional[
        Callable[[Union[str, BaseMessage, List[BaseMessage]]], int]
    ] = None

    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that API key and Python package exist in the environment."""
        reka_api_key = values.get("reka_api_key")
        # Falls back to the REKA_API_KEY environment variable.
        reka_api_key = get_from_dict_or_env(
            {"reka_api_key": reka_api_key}, "reka_api_key", "REKA_API_KEY"
        )
        values["reka_api_key"] = reka_api_key
        try:
            # Import reka libraries here
            from reka.client import AsyncReka, Reka

            values["client"] = Reka(
                api_key=reka_api_key,
            )
            values["async_client"] = AsyncReka(
                api_key=reka_api_key,
            )
        except ImportError:
            raise ImportError(
                "Could not import Reka Python package. "
                "Please install it with `pip install reka-api`."
            )
        return values

    @property
    def _default_params(self) -> Mapping[str, Any]:
        """Get the default parameters for calling Reka API."""
        params = {
            "model": self.model,
            "max_tokens": self.max_tokens,
        }
        if self.temperature is not None:
            params["temperature"] = self.temperature
        # model_kwargs take precedence over the defaults above.
        return {**params, **self.model_kwargs}

    @property
    def _llm_type(self) -> str:
        """Return type of chat model."""
        return "reka-chat"

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Stream chat completion chunks synchronously from the Reka API."""
        reka_messages = convert_to_reka_messages(messages)
        params = {**self._default_params, **kwargs}
        if stop:
            params["stop"] = stop
        stream = self.client.chat.create_stream(messages=reka_messages, **params)
        for chunk in stream:
            content = chunk.responses[0].chunk.content
            chat_chunk = ChatGenerationChunk(message=AIMessageChunk(content=content))
            if run_manager:
                # Notify callbacks of each new token before yielding it.
                run_manager.on_llm_new_token(content, chunk=chat_chunk)
            yield chat_chunk

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        """Async counterpart of _stream using the AsyncReka client."""
        reka_messages = convert_to_reka_messages(messages)
        params = {**self._default_params, **kwargs}
        if stop:
            params["stop"] = stop
        stream = self.async_client.chat.create_stream(
            messages=reka_messages, **params
        )
        async for chunk in stream:
            content = chunk.responses[0].chunk.content
            chat_chunk = ChatGenerationChunk(message=AIMessageChunk(content=content))
            if run_manager:
                await run_manager.on_llm_new_token(content, chunk=chat_chunk)
            yield chat_chunk

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a chat completion; delegates to _stream when streaming."""
        if self.streaming:
            return generate_from_stream(
                self._stream(messages, stop=stop, run_manager=run_manager, **kwargs)
            )
        reka_messages = convert_to_reka_messages(messages)
        params = {**self._default_params, **kwargs}
        if stop:
            params["stop"] = stop
        response = self.client.chat.create(messages=reka_messages, **params)
        if response.responses[0].message.tool_calls:
            # Tool-call responses are surfaced via additional_kwargs in
            # OpenAI-compatible shape, with parameters re-encoded as JSON.
            tool_calls = response.responses[0].message.tool_calls
            message = AIMessage(
                content="",  # Empty string instead of None
                additional_kwargs={
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.name,
                                "arguments": json.dumps(tc.parameters),
                            },
                        }
                        for tc in tool_calls
                    ]
                },
            )
        else:
            content = response.responses[0].message.content
            # Ensure content is never None
            message = AIMessage(content=content if content is not None else "")
        return ChatResult(generations=[ChatGeneration(message=message)])

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Async counterpart of _generate."""
        if self.streaming:
            return await agenerate_from_stream(
                self._astream(messages, stop=stop, run_manager=run_manager, **kwargs)
            )
        reka_messages = convert_to_reka_messages(messages)
        params = {**self._default_params, **kwargs}
        if stop:
            params["stop"] = stop
        response = await self.async_client.chat.create(
            messages=reka_messages, **params
        )
        if response.responses[0].message.tool_calls:
            tool_calls = response.responses[0].message.tool_calls
            message = AIMessage(
                content="",  # Empty string instead of None
                additional_kwargs={
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.name,
                                "arguments": json.dumps(tc.parameters),
                            },
                        }
                        for tc in tool_calls
                    ]
                },
            )
        else:
            content = response.responses[0].message.content
            # Ensure content is never None
            message = AIMessage(content=content if content is not None else "")
        return ChatResult(generations=[ChatGeneration(message=message)])

    def get_num_tokens(
        self, input: Union[str, BaseMessage, List[BaseMessage]]
    ) -> int:
        """Calculate number of tokens.

        Args:
            input: Either a string, a single BaseMessage, or a list of
                BaseMessages.

        Returns:
            int: Number of tokens in the input.

        Raises:
            ImportError: If tiktoken is not installed.
            ValueError: If message content is not a string.
            TypeError: If the input is none of the supported types.
        """
        # A user-supplied counter takes precedence over tiktoken.
        if self.token_counter is not None:
            return self.token_counter(input)
        try:
            import tiktoken
        except ImportError:
            raise ImportError(
                "Could not import tiktoken python package. "
                "Please install it with `pip install tiktoken`."
            )
        encoding = tiktoken.get_encoding("cl100k_base")
        if isinstance(input, str):
            return len(encoding.encode(input))
        elif isinstance(input, BaseMessage):
            content = input.content
            if not isinstance(content, str):
                raise ValueError(
                    f"Message content must be a string, got {type(content)}"
                )
            return len(encoding.encode(content))
        elif isinstance(input, list):
            # Sum token counts across all messages in the list.
            total = 0
            for msg in input:
                content = msg.content
                if not isinstance(content, str):
                    raise ValueError(
                        f"Message content must be a string, got {type(content)}"
                    )
                total += len(encoding.encode(content))
            return total
        else:
            raise TypeError(f"Unsupported input type: {type(input)}")

    def bind_tools(
        self,
        tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
        *,
        tool_choice: str = "auto",
        strict: Optional[bool] = None,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, BaseMessage]:
        """Bind tool-like objects to this chat model.

        The `tool_choice` parameter controls how the model uses the tools
        you pass. There are three available options:

        - `"auto"`: Lets the model decide whether or not to invoke a tool.
          This is the recommended way to do function calling with our models.
        - `"none"`: Disables tool calling. In this case, even if you pass
          tools to the model, the model will not invoke any tools.
        - `"tool"`: Forces the model to invoke one or more of the tools it
          has been passed.

        Args:
            tools: A list of tool definitions to bind to this chat model.
                Supports any tool definition handled by
                :meth:`langchain_core.utils.function_calling.convert_to_openai_tool`.
            tool_choice: Controls how the model uses the tools you pass.
                Options are "auto", "none", or "tool". Defaults to "auto".
            strict: If True, model output is guaranteed to exactly match the
                JSON Schema provided in the tool definition. If False, input
                schema will not be validated and model output will not be
                validated. If None, ``strict`` argument will not be passed to
                the model.
            kwargs: Any additional parameters are passed directly to the model.

        Returns:
            Runnable: An executable chain or component.
        """
        formatted_tools = [
            convert_to_openai_tool(tool, strict=strict) for tool in tools
        ]
        # Ensure tool_choice is one of the allowed options
        if tool_choice not in ("auto", "none", "tool"):
            raise ValueError(
                f"Invalid tool_choice '{tool_choice}' provided. "
                "Tool choice must be one of: 'auto', 'none', or 'tool'."
            )
        # Map tool_choice to the parameter expected by the Reka API
        kwargs["tool_choice"] = tool_choice
        # Pass the tools and updated kwargs to the model
        # (Reka expects the bare "function" payload, not the OpenAI wrapper).
        formatted_tools = [tool["function"] for tool in formatted_tools]
        return super().bind(tools=formatted_tools, **kwargs)