Source code for langchain_community.callbacks.human
fromtypingimportAny,Awaitable,Callable,Dict,OptionalfromuuidimportUUIDfromlangchain_core.callbacksimportAsyncCallbackHandler,BaseCallbackHandlerdef_default_approve(_input:str)->bool:msg=("Do you approve of the following input? ""Anything except 'Y'/'Yes' (case-insensitive) will be treated as a no.")msg+="\n\n"+_input+"\n"resp=input(msg)returnresp.lower()in("yes","y")asyncdef_adefault_approve(_input:str)->bool:msg=("Do you approve of the following input? ""Anything except 'Y'/'Yes' (case-insensitive) will be treated as a no.")msg+="\n\n"+_input+"\n"resp=input(msg)returnresp.lower()in("yes","y")def_default_true(_:Dict[str,Any])->bool:returnTrue
[docs]classHumanRejectedException(Exception):"""Exception to raise when a person manually review and rejects a value."""
[docs]classHumanApprovalCallbackHandler(BaseCallbackHandler):"""Callback for manually validating values."""raise_error:bool=True
[docs]defon_tool_start(self,serialized:Dict[str,Any],input_str:str,*,run_id:UUID,parent_run_id:Optional[UUID]=None,**kwargs:Any,)->Any:ifself._should_check(serialized)andnotself._approve(input_str):raiseHumanRejectedException(f"Inputs {input_str} to tool {serialized} were rejected.")
[docs]classAsyncHumanApprovalCallbackHandler(AsyncCallbackHandler):"""Asynchronous callback for manually validating values."""raise_error:bool=True
[docs]asyncdefon_tool_start(self,serialized:Dict[str,Any],input_str:str,*,run_id:UUID,parent_run_id:Optional[UUID]=None,**kwargs:Any,)->Any:ifself._should_check(serialized)andnotawaitself._approve(input_str):raiseHumanRejectedException(f"Inputs {input_str} to tool {serialized} were rejected.")