# Source code for langchain_google_vertexai.functions_utils

from __future__ import annotations

import json
import logging
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
    Sequence,
    Type,
    TypedDict,
    Union,
    cast,
)

import google.cloud.aiplatform_v1beta1.types as gapic
import vertexai.generative_models as vertexai  # type: ignore
from langchain_core.exceptions import OutputParserException
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.outputs import ChatGeneration, Generation
from langchain_core.tools import BaseTool
from langchain_core.tools import tool as callable_as_lc_tool
from langchain_core.utils.function_calling import (
    FunctionDescription,
    convert_to_openai_tool,
)
from langchain_core.utils.json_schema import dereference_refs
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# Any object that `_format_to_gapic_function_declaration` can turn into a
# single gapic `FunctionDeclaration`.
_FunctionDeclarationLike = Union[
    BaseTool,
    Type[BaseModel],
    FunctionDescription,
    Callable,
    vertexai.FunctionDeclaration,
    Dict[str, Any],
]
# A Google Search retrieval config: the gapic message itself, or a dict that
# is passed to the `gapic.GoogleSearchRetrieval` constructor.
_GoogleSearchRetrievalLike = Union[
    gapic.GoogleSearchRetrieval,
    Dict[str, Any],
]
# A retrieval config: the gapic message itself, or a dict that is passed to
# the `gapic.Retrieval` constructor.
_RetrievalLike = Union[gapic.Retrieval, Dict[str, Any]]


class _ToolDictLike(TypedDict):
    """Dict form of a tool, using the keys read by `_format_to_gapic_tool`."""

    # Function declarations in any supported form; each one is converted
    # individually with `_format_to_gapic_function_declaration`.
    function_declarations: Optional[List[_FunctionDeclarationLike]]
    # Google Search retrieval config (message or constructor dict).
    google_search_retrieval: Optional[_GoogleSearchRetrievalLike]
    # Retrieval config (message or constructor dict).
    retrieval: Optional[_RetrievalLike]


# One tool in any accepted form: a ready-made gapic/vertexai `Tool`, a
# `_ToolDictLike` dict, or anything convertible to a function declaration.
_ToolType = Union[gapic.Tool, vertexai.Tool, _ToolDictLike, _FunctionDeclarationLike]
# The sequence of tools accepted by `_format_to_gapic_tool`.
_ToolsType = Sequence[_ToolType]

# Every key accepted in a converted schema: the field names declared on the
# gapic `Schema` protobuf descriptor, followed by the keys that
# `Schema.to_dict` produces for an empty `Schema` when
# `preserving_proto_field_name=False`.
_ALLOWED_SCHEMA_FIELDS = [
    field.name for field in gapic.Schema()._pb.DESCRIPTOR.fields
] + list(gapic.Schema.to_dict(gapic.Schema(), preserving_proto_field_name=False))
# Set form of the list above, used for membership tests in the formatters.
_ALLOWED_SCHEMA_FIELDS_SET = set(_ALLOWED_SCHEMA_FIELDS)


def _format_json_schema_to_gapic_v1(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a Pydantic V1 JSON schema into a dict shaped for gapic ``Schema``.

    The schema is walked recursively:

    * ``definitions`` is dropped.
    * ``items`` and every entry of ``properties`` are converted recursively.
    * ``type`` / ``_type`` values are stringified and upper-cased.
    * ``allOf`` short-circuits the conversion: the result is the conversion
      of its first entry alone, and keys collected so far are discarded.
    * Any other key is kept only if it is a known gapic ``Schema`` field;
      otherwise a warning is logged and the key is ignored.

    Args:
        schema: The JSON-schema dict to convert.

    Returns:
        A new dict containing only the supported keys.
    """
    result: Dict[str, Any] = {}
    for name, node in schema.items():
        if name == "definitions":
            continue
        if name == "items":
            result["items"] = _format_json_schema_to_gapic_v1(node)
            continue
        if name == "properties":
            converted_props = result.setdefault("properties", {})
            for prop_name, prop_schema in node.items():
                converted_props[prop_name] = _format_json_schema_to_gapic_v1(
                    prop_schema
                )
            continue
        if name in ("type", "_type"):
            result["type"] = str(node).upper()
            continue
        if name == "allOf":
            if len(node) > 1:
                logger.warning(
                    "Only first value for 'allOf' key is supported. "
                    f"Got {len(node)}, ignoring other than first value!"
                )
            # Everything gathered so far is replaced by the first alternative.
            return _format_json_schema_to_gapic_v1(node[0])
        if name in _ALLOWED_SCHEMA_FIELDS_SET:
            result[name] = node
        else:
            logger.warning(f"Key '{name}' is not supported in schema, ignoring")
    return result


def _format_json_schema_to_gapic(
    schema: Dict[str, Any],
    parent_key: Optional[str] = None,
    required_fields: Optional[list] = None,
) -> Dict[str, Any]:
    """Format a JSON schema from a Pydantic V2 BaseModel to gapic.

    The schema is walked recursively:

    * ``definitions`` / ``$defs`` tables are dropped silently.
    * ``items`` and every entry of ``properties`` are converted recursively.
    * ``type`` / ``_type`` values are stringified and upper-cased.
    * ``allOf`` short-circuits the conversion: the result is the conversion
      of its first entry alone.
    * ``anyOf`` with exactly two entries, one of them ``{"type": "null"}``,
      is replaced by the conversion of the non-null entry, and the owning
      property is removed from ``required_fields``. Any other ``anyOf``
      shape is ignored with a warning.
    * Any other key is kept only if it is a known gapic ``Schema`` field;
      otherwise a warning is logged and the key is ignored.

    Args:
        schema: The JSON-schema dict to convert.
        parent_key: Name of the property whose schema is being converted,
            or ``None`` at the top level.
        required_fields: The ``required`` list of the object that owns
            ``parent_key``. It is mutated in place when a nullable
            ``anyOf`` property is found.

    Returns:
        A new dict containing only the supported keys.
    """
    converted_schema: Dict[str, Any] = {}
    for key, value in schema.items():
        if key in ("definitions", "$defs"):
            # Pydantic V2 names its definition table "$defs"; skipping it here
            # avoids a spurious "not supported" warning for nested models.
            continue
        elif key == "items":
            converted_schema["items"] = _format_json_schema_to_gapic(
                value, parent_key, required_fields
            )
        elif key == "properties":
            properties = converted_schema.setdefault("properties", {})
            # The same list object is also copied into the output under
            # "required", so removals made by nested calls show up there.
            owner_required = schema.get("required", [])
            for pkey, pvalue in value.items():
                properties[pkey] = _format_json_schema_to_gapic(
                    pvalue, pkey, owner_required
                )
        elif key in ("type", "_type"):
            converted_schema["type"] = str(value).upper()
        elif key == "allOf":
            if len(value) > 1:
                logger.warning(
                    "Only first value for 'allOf' key is supported. "
                    f"Got {len(value)}, ignoring other than first value!"
                )
            return _format_json_schema_to_gapic(value[0], parent_key, required_fields)
        elif key == "anyOf":
            if len(value) == 2 and any(v.get("type") == "null" for v in value):
                non_null_type = next(v for v in value if v.get("type") != "null")
                converted_schema.update(
                    _format_json_schema_to_gapic(
                        non_null_type, parent_key, required_fields
                    )
                )
                # Remove the field from required if it exists
                if required_fields and parent_key in required_fields:
                    required_fields.remove(parent_key)
            else:
                # Previously dropped without any trace; surface it like any
                # other unsupported construct.
                logger.warning(
                    "Only 'anyOf' with exactly one type and 'null' is supported. "
                    f"Got {len(value)} entries, ignoring 'anyOf'"
                )
        elif key not in _ALLOWED_SCHEMA_FIELDS_SET:
            logger.warning(f"Key '{key}' is not supported in schema, ignoring")
        else:
            converted_schema[key] = value
    return converted_schema


def _dict_to_gapic_schema(
    schema: Dict[str, Any], pydantic_version: str = "v1"
) -> gapic.Schema:
    """Build a gapic ``Schema`` message from a JSON-schema dict.

    The schema is first passed through ``dereference_refs``, then formatted
    with the V1 formatter when ``pydantic_version`` is ``"v1"`` and with the
    V2 formatter for any other value, and finally parsed via its JSON form.

    Args:
        schema: The JSON-schema dict to convert.
        pydantic_version: ``"v1"`` selects the V1 formatter; anything else
            selects the V2 formatter.

    Returns:
        The resulting ``gapic.Schema``.
    """
    resolved = dereference_refs(schema)
    formatter = (
        _format_json_schema_to_gapic_v1
        if pydantic_version == "v1"
        else _format_json_schema_to_gapic
    )
    return gapic.Schema.from_json(json.dumps(formatter(resolved)))


def _format_base_tool_to_function_declaration(
    tool: BaseTool,
) -> gapic.FunctionDeclaration:
    """Format a LangChain tool into a Vertex function declaration.

    A tool without an ``args_schema`` is declared with a single required
    string parameter named ``__arg1``. Otherwise the JSON schema of
    ``args_schema`` is converted; ``model_json_schema`` is used when the
    schema class provides it, and ``schema()`` otherwise.

    Args:
        tool: The tool to describe.

    Returns:
        The function declaration; the name and description fall back to the
        schema's ``title`` and ``description`` when the tool's own are falsy.
    """
    args_schema = tool.args_schema
    if not args_schema:
        single_string_arg = gapic.Schema(
            type=gapic.Type.OBJECT,
            properties={
                "__arg1": gapic.Schema(type=gapic.Type.STRING),
            },
            required=["__arg1"],
        )
        return gapic.FunctionDeclaration(
            name=tool.name,
            description=tool.description,
            parameters=single_string_arg,
        )

    # The presence of `model_json_schema` selects the V2 schema path.
    is_v2 = hasattr(args_schema, "model_json_schema")
    json_schema = args_schema.model_json_schema() if is_v2 else args_schema.schema()
    parameters = _dict_to_gapic_schema(
        json_schema, pydantic_version="v2" if is_v2 else "v1"
    )

    return gapic.FunctionDeclaration(
        name=tool.name or json_schema.get("title"),
        description=tool.description or json_schema.get("description"),
        parameters=parameters,
    )


def _format_pydantic_to_function_declaration(
    pydantic_model: Type[BaseModel],
) -> gapic.FunctionDeclaration:
    """Format a pydantic model class into a Vertex function declaration.

    The declaration is named after the schema's ``title``, described by its
    ``description`` (empty string if absent), and parameterised by the
    converted schema. ``model_json_schema`` is used when the class provides
    it, and ``schema()`` otherwise.

    Args:
        pydantic_model: The model class to describe.

    Returns:
        The resulting function declaration.
    """
    # The presence of `model_json_schema` selects the V2 schema path.
    is_v2 = hasattr(pydantic_model, "model_json_schema")
    if is_v2:
        json_schema = pydantic_model.model_json_schema()
    else:
        json_schema = pydantic_model.schema()
    version = "v2" if is_v2 else "v1"

    return gapic.FunctionDeclaration(
        name=json_schema["title"],
        description=json_schema.get("description", ""),
        parameters=_dict_to_gapic_schema(json_schema, pydantic_version=version),
    )


def _format_dict_to_function_declaration(
    tool: Union[FunctionDescription, Dict[str, Any]],
) -> gapic.FunctionDeclaration:
    """Format a function-description dict into a Vertex function declaration.

    The dict's ``name`` and ``description`` are copied as-is and its
    ``parameters`` JSON schema is converted to a gapic ``Schema``. If any
    property schema contains ``anyOf``, the conversion uses the Pydantic V2
    formatter; otherwise the default formatter is used.

    Args:
        tool: Mapping with optional ``name``, ``description`` and
            ``parameters`` keys.

    Returns:
        The resulting function declaration.
    """
    # A missing or explicit ``None`` "parameters" entry means "no parameters".
    parameters_schema = tool.get("parameters") or {}

    # Ensure we send "anyOf" parameters through pydantic v2 schema parsing
    uses_any_of = False
    if isinstance(tool, dict):
        property_schemas = (parameters_schema.get("properties") or {}).values()
        uses_any_of = any(
            isinstance(prop, dict) and "anyOf" in prop for prop in property_schemas
        )

    if uses_any_of:
        parameters = _dict_to_gapic_schema(parameters_schema, pydantic_version="v2")
    else:
        parameters = _dict_to_gapic_schema(parameters_schema)

    return gapic.FunctionDeclaration(
        name=tool.get("name"),
        description=tool.get("description"),
        parameters=parameters,
    )


def _format_vertex_to_function_declaration(
    tool: vertexai.FunctionDeclaration,
) -> gapic.FunctionDeclaration:
    """Format a vertexai ``FunctionDeclaration`` into its gapic counterpart.

    The declaration is serialised with ``to_dict`` and converted through the
    dict-based formatter.
    """
    return _format_dict_to_function_declaration(tool.to_dict())


def _format_to_gapic_function_declaration(
    tool: _FunctionDeclarationLike,
) -> gapic.FunctionDeclaration:
    """Format a tool-like object into a Vertex function declaration.

    The checks run in this order and the first match wins: ``BaseTool``
    instance, ``BaseModel`` subclass, any other callable, vertexai
    ``FunctionDeclaration``, dict.

    Args:
        tool: The object to convert.

    Returns:
        The resulting function declaration.

    Raises:
        ValueError: If ``tool`` matches none of the supported forms.
    """
    if isinstance(tool, BaseTool):
        return _format_base_tool_to_function_declaration(tool)
    if isinstance(tool, type) and issubclass(tool, BaseModel):
        return _format_pydantic_to_function_declaration(tool)
    if callable(tool):
        # Wrap the plain callable as a LangChain tool, then reuse that path.
        wrapped_tool = callable_as_lc_tool()(tool)
        return _format_base_tool_to_function_declaration(wrapped_tool)
    if isinstance(tool, vertexai.FunctionDeclaration):
        return _format_vertex_to_function_declaration(tool)
    if not isinstance(tool, dict):
        raise ValueError(f"Unsupported tool call type {tool}")

    # The dict could come from
    # 'langchain_core.utils.function_calling.convert_to_openai_tool'
    openai_function = convert_to_openai_tool(cast(dict, tool))["function"]
    return _format_dict_to_function_declaration(
        cast(FunctionDescription, openai_function)
    )


def _format_to_gapic_tool(tools: _ToolsType) -> gapic.Tool:
    """Merge a sequence of tool-like objects into a single gapic ``Tool``.

    Each element may be a gapic/vertexai ``Tool``, a dict using the
    ``_ToolDictLike`` keys, or anything accepted by
    ``_format_to_gapic_function_declaration``. Function declarations from
    all elements are accumulated; ``retrieval`` and
    ``google_search_retrieval`` are copied onto the result.

    Args:
        tools: The tools to merge.

    Returns:
        The combined ``gapic.Tool``.

    Raises:
        ValueError: If another tool follows one that set ``retrieval`` or
            ``google_search_retrieval``, or if a dict's
            ``function_declarations`` value is neither ``None`` nor a list.
    """
    gapic_tool = gapic.Tool()
    for tool in tools:
        # NOTE(review): this guard runs before each tool is handled, so it only
        # fires when something follows a tool that set a retrieval field;
        # function declarations listed before a retrieval tool are not
        # rejected. Confirm whether that asymmetry is intended.
        if any(f in gapic_tool for f in ["google_search_retrieval", "retrieval"]):
            raise ValueError(
                "Providing multiple retrieval, google_search_retrieval"
                " or mixing with function_declarations is not supported"
            )
        if isinstance(tool, (gapic.Tool, vertexai.Tool)):
            # vertexai.Tool wraps the gapic message in `_raw_tool`.
            rt: gapic.Tool = (
                tool if isinstance(tool, gapic.Tool) else tool._raw_tool  # type: ignore
            )
            if "retrieval" in rt:
                gapic_tool.retrieval = rt.retrieval
            if "google_search_retrieval" in rt:
                gapic_tool.google_search_retrieval = rt.google_search_retrieval
            if "function_declarations" in rt:
                gapic_tool.function_declarations.extend(rt.function_declarations)
        elif isinstance(tool, dict):
            # not _ToolDictLike: the dict itself describes one function
            if not any(
                f in tool
                for f in [
                    "function_declarations",
                    "google_search_retrieval",
                    "retrieval",
                ]
            ):
                fd = _format_to_gapic_function_declaration(tool)
                gapic_tool.function_declarations.append(fd)
                continue
            # _ToolDictLike
            tool = cast(_ToolDictLike, tool)
            if "function_declarations" in tool:
                function_declarations = tool["function_declarations"]
                # `None` is allowed by the `Optional[...]` annotation on
                # `_ToolDictLike` and means "no declarations".
                if function_declarations is not None and not isinstance(
                    function_declarations, list
                ):
                    raise ValueError(
                        "function_declarations should be a list, "
                        f"got '{type(function_declarations)}'"
                    )
                if function_declarations:
                    fds = [
                        _format_to_gapic_function_declaration(fd)
                        for fd in function_declarations
                    ]
                    gapic_tool.function_declarations.extend(fds)
            if "google_search_retrieval" in tool:
                gapic_tool.google_search_retrieval = gapic.GoogleSearchRetrieval(
                    tool["google_search_retrieval"]
                )
            if "retrieval" in tool:
                gapic_tool.retrieval = gapic.Retrieval(tool["retrieval"])
        else:
            fd = _format_to_gapic_function_declaration(tool)
            gapic_tool.function_declarations.append(fd)
    return gapic_tool


class PydanticFunctionsOutputParser(BaseOutputParser):
    """Parse an output as a pydantic object.

    This parser is used to parse the output of a ChatModel that uses
    Google Vertex function format to invoke functions.

    The parser extracts the function call invocation and matches
    them to the pydantic schema provided.

    An exception will be raised if the function call does not match
    the provided schema.

    Example:

        .. code-block:: python

            message = AIMessage(
                content="This is a test message",
                additional_kwargs={
                    "function_call": {
                        "name": "cookie",
                        "arguments": json.dumps({"name": "value", "age": 10}),
                    }
                },
            )
            chat_generation = ChatGeneration(message=message)

            class Cookie(BaseModel):
                name: str
                age: int

            class Dog(BaseModel):
                species: str

            # Full output
            parser = PydanticFunctionsOutputParser(
                pydantic_schema={"cookie": Cookie, "dog": Dog}
            )
            result = parser.parse_result([chat_generation])
    """

    # Either a single model used for every function call, or a mapping from
    # function name to the model used for that function.
    pydantic_schema: Union[Type[BaseModel], Dict[str, Type[BaseModel]]]

    def parse_result(
        self, result: List[Generation], *, partial: bool = False
    ) -> BaseModel:
        """Build a pydantic object from the function call of the first generation.

        Args:
            result: Generations to parse; only ``result[0]`` is inspected and
                it must be a ``ChatGeneration``.
            partial: Not used by this parser.

        Returns:
            An instance of the model selected by ``pydantic_schema``, built
            from the function call's arguments.

        Raises:
            ValueError: If ``result[0]`` is not a ``ChatGeneration``.
            OutputParserException: If the message carries no function call.
        """
        if not isinstance(result[0], ChatGeneration):
            raise ValueError("This output parser only works on ChatGeneration output")
        message = result[0].message
        function_call = message.additional_kwargs.get("function_call", {})
        if not function_call:
            raise OutputParserException(f"Could not parse function call: {message}")

        function_name = function_call["name"]
        if isinstance(self.pydantic_schema, dict):
            schema = self.pydantic_schema[function_name]
        else:
            schema = self.pydantic_schema

        # "arguments" is normally a JSON string. A missing entry used to be
        # replaced by `{}` and then handed to `json.loads`, which raises
        # TypeError; decode only real JSON text and use mappings as they are.
        tool_input = function_call.get("arguments")
        if tool_input is None:
            arguments = {}
        elif isinstance(tool_input, (str, bytes, bytearray)):
            arguments = json.loads(tool_input)
        else:
            arguments = tool_input
        return schema(**arguments)

    def parse(self, text: str) -> BaseModel:
        """Always raise: this parser works on chat messages, not plain text."""
        raise ValueError("Can only parse messages")
class _FunctionCallingConfigDict(TypedDict):
    """Dict form of a function-calling config."""

    # Function-calling mode, as the gapic enum or its integer value.
    mode: Union[gapic.FunctionCallingConfig.Mode, int]
    # Function names the model may call, or None for no restriction list.
    allowed_function_names: Optional[List[str]]


class _ToolConfigDict(TypedDict):
    """Dict form of a tool config wrapping a function-calling config."""

    function_calling_config: _FunctionCallingConfigDict


# Accepted ways to express a tool choice; see `_tool_choice_to_tool_config`.
_ToolChoiceType = Union[
    dict, List[str], str, Literal["auto", "none", "any"], Literal[True]
]


def _format_tool_config(tool_config: _ToolConfigDict) -> Union[gapic.ToolConfig, None]:
    """Convert a tool-config dict into a gapic ``ToolConfig``.

    Args:
        tool_config: Dict with a ``function_calling_config`` entry whose
            items are passed as keyword arguments to
            ``gapic.FunctionCallingConfig``.

    Returns:
        The resulting ``gapic.ToolConfig``.

    Raises:
        ValueError: If ``function_calling_config`` is missing.
    """
    if "function_calling_config" not in tool_config:
        raise ValueError(
            "Invalid ToolConfig, missing 'function_calling_config' key. Received:\n\n"
            f"{tool_config=}"
        )
    return gapic.ToolConfig(
        function_calling_config=gapic.FunctionCallingConfig(
            **tool_config["function_calling_config"]
        )
    )


def _tool_choice_to_tool_config(
    tool_choice: _ToolChoiceType,
    all_names: List[str],
) -> _ToolConfigDict:
    """Translate a tool choice into a tool-config dict.

    Supported forms of ``tool_choice``:

    * ``True`` or ``"any"``: mode ANY, restricted to ``all_names``.
    * ``"auto"`` / ``"none"``: mode AUTO / NONE, no name restriction.
    * any other string: mode ANY, restricted to that single name.
    * a list: mode ANY, restricted to the listed names.
    * a dict with ``mode`` (function-calling-config shape) or with
      ``function_calling_config`` (tool-config shape): its ``mode`` and
      optional ``allowed_function_names`` are used as given.

    Args:
        tool_choice: The tool choice to translate.
        all_names: Names of all available functions.

    Returns:
        The resulting tool-config dict.

    Raises:
        ValueError: If ``tool_choice`` matches none of the supported forms.
    """
    allowed_function_names: Optional[List[str]] = None
    if tool_choice is True or tool_choice == "any":
        mode = gapic.FunctionCallingConfig.Mode.ANY
        allowed_function_names = all_names
    elif tool_choice == "auto":
        mode = gapic.FunctionCallingConfig.Mode.AUTO
    elif tool_choice == "none":
        mode = gapic.FunctionCallingConfig.Mode.NONE
    elif isinstance(tool_choice, str):
        mode = gapic.FunctionCallingConfig.Mode.ANY
        allowed_function_names = [tool_choice]
    elif isinstance(tool_choice, list):
        mode = gapic.FunctionCallingConfig.Mode.ANY
        allowed_function_names = tool_choice
    elif isinstance(tool_choice, dict):
        if "mode" in tool_choice:
            mode = tool_choice["mode"]
            allowed_function_names = tool_choice.get("allowed_function_names")
        elif "function_calling_config" in tool_choice:
            mode = tool_choice["function_calling_config"]["mode"]
            allowed_function_names = tool_choice["function_calling_config"].get(
                "allowed_function_names"
            )
        else:
            raise ValueError(
                f"Unrecognized tool choice format:\n\n{tool_choice=}\n\nShould match "
                f"VertexAI ToolConfig or FunctionCallingConfig format."
            )
    else:
        raise ValueError(f"Unrecognized tool choice format:\n\n{tool_choice=}")
    return _ToolConfigDict(
        function_calling_config=_FunctionCallingConfigDict(
            mode=mode,
            allowed_function_names=allowed_function_names,
        )
    )