Source code for langchain_community.tools.databricks.tool
import json
from datetime import date, datetime
from decimal import Decimal
from hashlib import md5
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, Union

from langchain_core._api import deprecated
from langchain_core.tools import BaseTool, StructuredTool
from langchain_core.tools.base import BaseToolkit
from pydantic import BaseModel, Field, create_model
from typing_extensions import Self

if TYPE_CHECKING:
    from databricks.sdk.service.catalog import FunctionInfo

from pydantic import ConfigDict

from langchain_community.tools.databricks._execution import execute_function


def _uc_type_to_pydantic_type(uc_type_json: Union[str, Dict[str, Any]]) -> Type:
    mapping = {
        "long": int,
        "binary": bytes,
        "boolean": bool,
        "date": date,
        "double": float,
        "float": float,
        "integer": int,
        "short": int,
        "string": str,
        "timestamp": datetime,
        "timestamp_ntz": datetime,
        "byte": int,
    }
    if isinstance(uc_type_json, str):
        if uc_type_json in mapping:
            return mapping[uc_type_json]
        else:
            if uc_type_json.startswith("decimal"):
                return Decimal
            elif uc_type_json == "void" or uc_type_json.startswith("interval"):
                raise TypeError(f"Type {uc_type_json} is not supported.")
            else:
                raise TypeError(
                    f"Unknown type {uc_type_json}. Try upgrading this package."
                )
    else:
        assert isinstance(uc_type_json, dict)
        tpe = uc_type_json["type"]
        if tpe == "array":
            element_type = _uc_type_to_pydantic_type(uc_type_json["elementType"])
            if uc_type_json["containsNull"]:
                element_type = Optional[element_type]  # type: ignore
            return List[element_type]  # type: ignore
        elif tpe == "map":
            key_type = uc_type_json["keyType"]
            assert key_type == "string", TypeError(
                f"Only support STRING key type for MAP but got {key_type}."
            )
            value_type = _uc_type_to_pydantic_type(uc_type_json["valueType"])
            if uc_type_json["valueContainsNull"]:
                value_type: Type = Optional[value_type]  # type: ignore
            return Dict[str, value_type]  # type: ignore
        elif tpe == "struct":
            fields = {}
            for field in uc_type_json["fields"]:
                field_type = _uc_type_to_pydantic_type(field["type"])
                if field.get("nullable"):
                    field_type = Optional[field_type]  # type: ignore
                comment = (
                    uc_type_json["metadata"].get("comment")
                    if "metadata" in uc_type_json
                    else None
                )
                fields[field["name"]] = (field_type, Field(..., description=comment))
            uc_type_json_str = json.dumps(uc_type_json, sort_keys=True)
            type_hash = md5(uc_type_json_str.encode()).hexdigest()[:8]
            return create_model(f"Struct_{type_hash}", **fields)  # type: ignore
        else:
            raise TypeError(
                f"Unknown type {uc_type_json}. Try upgrading this package."
            )


def _generate_args_schema(function: "FunctionInfo") -> Type[BaseModel]:
    if function.input_params is None:
        return BaseModel
    params = function.input_params.parameters
    assert params is not None
    fields = {}
    for p in params:
        assert p.type_json is not None
        type_json = json.loads(p.type_json)["type"]
        pydantic_type = _uc_type_to_pydantic_type(type_json)
        description = p.comment
        default: Any = ...
        if p.parameter_default:
            pydantic_type = Optional[pydantic_type]  # type: ignore
            default = None
            # TODO: Convert default value string to the correct type.
            # We might need to use statement execution API
            # to get the JSON representation of the value.
            default_description = f"(Default: {p.parameter_default})"
            if description:
                description += f" {default_description}"
            else:
                description = default_description
        fields[p.name] = (
            pydantic_type,
            Field(default=default, description=description),
        )
    return create_model(
        f"{function.catalog_name}__{function.schema_name}__{function.name}__params",
        **fields,  # type: ignore
    )


def _get_tool_name(function: "FunctionInfo") -> str:
    tool_name = f"{function.catalog_name}__{function.schema_name}__{function.name}"[
        -64:
    ]
    return tool_name


def _get_default_workspace_client() -> Any:
    try:
        from databricks.sdk import WorkspaceClient
    except ImportError as e:
        raise ImportError(
            "Could not import databricks-sdk python package. "
            "Please install it with `pip install databricks-sdk`."
        ) from e
    return WorkspaceClient()
@deprecated(
    since="0.3.18",
    removal="1.0",
    alternative_import="databricks_langchain.uc_ai.UCFunctionToolkit",
)
class UCFunctionToolkit(BaseToolkit):
    warehouse_id: str = Field(
        description="The ID of a Databricks SQL Warehouse to execute functions."
    )

    workspace_client: Any = Field(
        default_factory=_get_default_workspace_client,
        description="Databricks workspace client.",
    )

    tools: Dict[str, BaseTool] = Field(default_factory=dict)

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )
    def include(self, *function_names: str, **kwargs: Any) -> Self:
        """
        Includes UC functions in the toolkit.

        Args:
            function_names: UC function names in the format
                "catalog_name.schema_name.function_name" or
                "catalog_name.schema_name.*".
                If a function name ends with ".*",
                all functions in that schema will be added.
            kwargs: Extra arguments to pass to StructuredTool, e.g., `return_direct`.
        """
        for name in function_names:
            if name.endswith(".*"):
                catalog_name, schema_name = name[:-2].split(".")
                # TODO: handle pagination, warn and truncate if too many
                functions = self.workspace_client.functions.list(
                    catalog_name=catalog_name, schema_name=schema_name
                )
                for f in functions:
                    assert f.full_name is not None
                    self.include(f.full_name, **kwargs)
            else:
                if name not in self.tools:
                    self.tools[name] = self._make_tool(name, **kwargs)
        return self
    def _make_tool(self, function_name: str, **kwargs: Any) -> BaseTool:
        function = self.workspace_client.functions.get(function_name)
        name = _get_tool_name(function)
        description = function.comment or ""
        args_schema = _generate_args_schema(function)

        def func(*args: Any, **kwargs: Any) -> str:
            # TODO: We expect all named args and ignore args.
            # Non-empty args show up when the function has no parameters.
            args_json = json.loads(json.dumps(kwargs, default=str))
            result = execute_function(
                ws=self.workspace_client,
                warehouse_id=self.warehouse_id,
                function=function,
                parameters=args_json,
            )
            return result.to_json()

        return StructuredTool(
            name=name,
            description=description,
            args_schema=args_schema,
            func=func,
            **kwargs,
        )
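
A minimal usage sketch, assuming an existing Databricks workspace, SQL warehouse ID, and UC function; the catalog, schema, and function names below are placeholders. Note that this class is deprecated in favor of databricks_langchain.uc_ai.UCFunctionToolkit.

from langchain_community.tools.databricks.tool import UCFunctionToolkit

# Placeholder identifiers; replace with a real warehouse ID and UC function.
toolkit = UCFunctionToolkit(warehouse_id="<warehouse-id>")
toolkit.include("my_catalog.my_schema.my_func")  # or "my_catalog.my_schema.*"

# The included functions are exposed as StructuredTool instances for an agent.
tools = list(toolkit.tools.values())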