Source code for langchain_nvidia_ai_endpoints.callbacks
"""Callback Handler that prints to std out."""from__future__importannotationsimportloggingimportthreadingfromcollectionsimportdefaultdictfromcontextlibimportcontextmanagerfromcontextvarsimportContextVarfromtypingimportAny,Dict,Generator,List,Optionalfromlangchain_core.callbacksimportBaseCallbackHandlerfromlangchain_core.outputsimportLLMResultfromlangchain_core.tracers.contextimportregister_configure_hooklogger=logging.getLogger(__name__)## This module contains output parsers for OpenAI tools. Set here for version control"""### **Usage/Cost Tracking**For tracking model usage and , you can use the `get_usage_callback` context manager totrack token information similar to `get_openai_callback`. Additionally, you can specifycustom price mappings as necessary (`price_map` argument), or provide a custom callbackmanager for advanced use-cases (`callback` argument).**NOTE:** This feature is currently not supported in streaming modes, but works finefor non-streaming `invoke/ainvoke` queries.```from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddingsfrom langchain_nvidia_ai_endpoints.callbacks import get_usage_callback## Assume a price map per 1K tokens for a particular deployment planprice_map = { "mixtral_8x7b": 0.00060, "gemma_7b": 0.0002, "nvolveqa_40k": 0.000016,}llm_large = ChatNVIDIA(model="mixtral_8x7b", temperature=0.1)llm_small = ChatNVIDIA(model="gemma_2b", temperature=0.1)embedding = NVIDIAEmbeddings(model="nvolveqa_40k")models = [llm_large, llm_small, embedding]with get_usage_callback(price_map=price_map) as cb: ## Reset either at beginning or end. Statistics will run until cleared cb.reset() llm_large.invoke("Tell me a joke") print(cb, end="\n\n") # llm_large.invoke("Tell me a short joke") # print(cb, end="\n\n") # ## Tracking through streaming coming soon # [_ for _ in llm_small.stream("Tell me a joke")] # print(cb, end="\n[Should not change yet]\n\n") ## Tracking for streaming supported embedding.embed_query("What a nice day :D") print(cb, end="\n\n") # ## Sanity check. Should still be tracked fine # llm_small.invoke("Tell me a long joke") # print(cb, end="\n\n")## Out of scope. Will not be trackedllm_small.invoke("Tell me a short joke")print(cb, end="\n[Should not change ever]\n\n")cb.model_usage```"""DEFAULT_MODEL_COST_PER_1K_TOKENS:Dict[str,float]={}
[docs]
def standardize_model_name(
    model_name: str,
    price_map: dict = {},
    is_completion: bool = False,
) -> str:
    """
    Standardize the model name to a format that can be used in the OpenAI API.

    Args:
        model_name: Model name to standardize.
        price_map: Map of model names to cost per 1000 tokens.
        is_completion: Whether the model is used for completion or not.
            Defaults to False.

    Returns:
        Standardized model name.
    """
    model_name = model_name.lower()
    if ".ft-" in model_name:
        model_name = model_name.split(".ft-")[0] + "-azure-finetuned"
    if ":ft-" in model_name:
        model_name = model_name.split(":")[0] + "-finetuned-legacy"
    if "ft:" in model_name:
        model_name = model_name.split(":")[1] + "-finetuned"
    if model_name.startswith("playground_"):
        model_name = model_name.replace("playground_", "")
    if (
        is_completion
        and model_name + "-completion" in price_map
        and (
            model_name.startswith("gpt-4")
            or model_name.startswith("gpt-3.5")
            or model_name.startswith("gpt-35")
            or ("finetuned" in model_name and "legacy" not in model_name)
        )
    ):
        return model_name + "-completion"
    else:
        return model_name
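A few quick sanity checks of the normalization rules above; the model names and `price_map` here are made-up placeholders for illustration:

```
from langchain_nvidia_ai_endpoints.callbacks import standardize_model_name

## "playground_" prefixes are stripped
assert standardize_model_name("playground_mixtral_8x7b") == "mixtral_8x7b"

## Legacy ":ft-" fine-tune markers collapse to "<base>-finetuned-legacy"
assert standardize_model_name("ada:ft-acme-2023-01-01") == "ada-finetuned-legacy"

## A "-completion" variant is used only when the price map prices it separately
price_map = {"gpt-4": 0.03, "gpt-4-completion": 0.06}
assert (
    standardize_model_name("gpt-4", price_map, is_completion=True)
    == "gpt-4-completion"
)
assert standardize_model_name("gpt-4", {}, is_completion=True) == "gpt-4"
```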
[docs]
def get_token_cost_for_model(
    model_name: str, num_tokens: int, price_map: dict, is_completion: bool = False
) -> float:
    """
    Get the cost in USD for a given model and number of tokens.

    Args:
        model_name: Name of the model.
        num_tokens: Number of tokens.
        price_map: Map of model names to cost per 1000 tokens. Defaults to
            AI Foundation Endpoint pricing per https://www.together.ai/pricing.
        is_completion: Whether the model is used for completion or not.
            Defaults to False.

    Returns:
        Cost in USD.
    """
    model_name = standardize_model_name(
        model_name,
        price_map,
        is_completion=is_completion,
    )
    if model_name not in price_map:
        raise ValueError(
            f"Unknown model: {model_name}. Please provide a valid model name. "
            "Known models are: " + ", ".join(price_map.keys())
        )
    return price_map[model_name] * (num_tokens / 1000)
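To make the arithmetic concrete, a minimal sketch (the price is a made-up figure, not actual endpoint pricing):

```
from langchain_nvidia_ai_endpoints.callbacks import get_token_cost_for_model

price_map = {"mixtral_8x7b": 0.00060}  ## hypothetical USD per 1K tokens

## 2000 tokens at $0.0006 per 1K tokens -> $0.0012
cost = get_token_cost_for_model("mixtral_8x7b", 2000, price_map)
assert abs(cost - 0.0012) < 1e-12

## Models missing from the price map raise rather than silently costing $0
try:
    get_token_cost_for_model("unlisted_model", 100, price_map)
except ValueError as err:
    print(err)
```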
[docs]
class UsageCallbackHandler(BaseCallbackHandler):
    """Callback Handler that tracks OpenAI-compatible usage info."""

    ## Per-model statistics
    _model_usage: defaultdict = defaultdict(
        lambda: {
            "total_tokens": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "successful_requests": 0,
            "total_cost": 0.0,
        }
    )
    llm_output: dict = {}
    price_map: dict = {k: v for k, v in DEFAULT_MODEL_COST_PER_1K_TOKENS.items()}

    ## Guards updates to the shared statistics
    _lock: threading.Lock = threading.Lock()

    @property
    def model_usage(self) -> dict:
        """Per-model usage statistics."""
        return dict(self._model_usage)

    def reset(self) -> None:
        """Clear all accumulated statistics."""
        with self._lock:
            self._model_usage.clear()

    ## Aggregate statistics, compatible with OpenAICallbackHandler
    @property
    def total_tokens(self) -> int:
        """Total tokens used."""
        return self._model_usage["total"]["total_tokens"]

    @property
    def prompt_tokens(self) -> int:
        """Prompt tokens used."""
        return self._model_usage["total"]["prompt_tokens"]

    @property
    def completion_tokens(self) -> int:
        """Completion tokens used."""
        return self._model_usage["total"]["completion_tokens"]

    @property
    def successful_requests(self) -> int:
        """Total successful requests."""
        return self._model_usage["total"]["successful_requests"]

    @property
    def total_cost(self) -> float:
        """Total cost in USD."""
        return self._model_usage["total"]["total_cost"]
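The handler keeps one bucket per model plus a synthetic `"total"` bucket; the OpenAI-compatible properties all read from `"total"`. A small sketch of inspecting a fresh handler:

```
from langchain_nvidia_ai_endpoints.callbacks import UsageCallbackHandler

cb = UsageCallbackHandler()
cb.reset()  ## statistics are shared class state, so clear before measuring

print(cb.total_tokens, cb.prompt_tokens, cb.completion_tokens)  # 0 0 0
print(cb.successful_requests, cb.total_cost)  # 0 0.0
print(cb.model_usage)  # {'total': {...}} once the properties have touched it
```

Note that `_model_usage` is a class-level `defaultdict`, so separate handler instances share the same running totals until they are reset.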
    [docs]
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        ## Merge any buffered llm_output into the response, then consume the buffer
        if not response.llm_output:
            response.llm_output = {}
        if not self.llm_output:
            self.llm_output = {}
        response.llm_output = {**self.llm_output, **response.llm_output}
        self.llm_output = {}
        if not response.llm_output:
            return None
        # compute tokens and cost for this request
        token_usage = response.llm_output.get(
            "token_usage", response.llm_output.get("usage", {})
        )
        completion_tokens = token_usage.get("completion_tokens", 0)
        prompt_tokens = token_usage.get("prompt_tokens", 0)
        model_name = response.llm_output.get("model_name", "")
        if model_name in self.price_map:
            completion_cost = get_token_cost_for_model(
                model_name, completion_tokens, self.price_map, is_completion=True
            )
            prompt_cost = get_token_cost_for_model(
                model_name, prompt_tokens, self.price_map
            )
        else:
            completion_cost = 0
            prompt_cost = 0
        # update shared state behind lock
        with self._lock:
            for base in (self._model_usage["total"], self._model_usage[model_name]):
                base["total_tokens"] += token_usage.get("total_tokens", 0)
                base["prompt_tokens"] += prompt_tokens
                base["completion_tokens"] += completion_tokens
                base["total_cost"] += prompt_cost + completion_cost
                base["successful_requests"] += 1
                for key in base.keys():
                    base[key] = round(base[key], 10)
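A hand-rolled smoke test of the accounting above, driving the handler directly with a synthetic `LLMResult` (token counts and price are made up):

```
from langchain_core.outputs import LLMResult

from langchain_nvidia_ai_endpoints.callbacks import UsageCallbackHandler

cb = UsageCallbackHandler()
cb.reset()
cb.price_map.update({"mixtral_8x7b": 0.00060})  ## hypothetical price per 1K tokens

cb.on_llm_end(
    LLMResult(
        generations=[],
        llm_output={
            "model_name": "mixtral_8x7b",
            "token_usage": {
                "prompt_tokens": 10,
                "completion_tokens": 20,
                "total_tokens": 30,
            },
        },
    )
)

assert cb.total_tokens == 30 and cb.successful_requests == 1
assert abs(cb.total_cost - 30 * 0.00060 / 1000) < 1e-9  ## $0.000018
```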
    def __copy__(self) -> "UsageCallbackHandler":
        """Return a copy of the callback handler."""
        return self

    def __deepcopy__(self, memo: Any) -> "UsageCallbackHandler":
        """Return a deep copy of the callback handler."""
        return self
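Returning `self` from both hooks means copying never forks the statistics: when LangChain clones configs and handlers internally, every "copy" still points at the same live tally.

```
import copy

from langchain_nvidia_ai_endpoints.callbacks import UsageCallbackHandler

cb = UsageCallbackHandler()
assert copy.copy(cb) is cb
assert copy.deepcopy(cb) is cb  ## same object, same running totals
```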
## Context variable so the handler is picked up automatically by LangChain's
## tracer configuration
usage_callback_var: ContextVar[Optional[UsageCallbackHandler]] = ContextVar(
    "usage_callback", default=None
)
register_configure_hook(usage_callback_var, True)


[docs]
@contextmanager
def get_usage_callback(
    price_map: dict = {},
    callback: Optional[UsageCallbackHandler] = None,
) -> Generator[UsageCallbackHandler, None, None]:
    """Get the usage callback handler in a context manager,
    which conveniently exposes token and cost information.

    Returns:
        UsageCallbackHandler: The usage callback handler.

    Example:
        >>> with get_usage_callback() as cb:
        ...     # Use the usage callback handler
    """
    if not callback:
        callback = UsageCallbackHandler()
    if hasattr(callback, "price_map"):
        if hasattr(callback, "_lock"):
            with callback._lock:
                callback.price_map.update(price_map)
        else:
            callback.price_map.update(price_map)
    usage_callback_var.set(callback)
    yield callback
    usage_callback_var.set(None)
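One pattern the `callback` argument enables, sketched here with placeholder model names and pricing: reuse a single handler across several context blocks so usage keeps accumulating between them.

```
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_nvidia_ai_endpoints.callbacks import (
    UsageCallbackHandler,
    get_usage_callback,
)

shared_cb = UsageCallbackHandler()
llm = ChatNVIDIA(model="mixtral_8x7b", temperature=0.1)

with get_usage_callback(price_map={"mixtral_8x7b": 0.00060}, callback=shared_cb) as cb:
    llm.invoke("Tell me a joke")

with get_usage_callback(callback=shared_cb) as cb:
    llm.invoke("Tell me another joke")  ## totals continue from the first block

print(shared_cb.total_tokens, shared_cb.total_cost)
```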