Source code for langchain.callbacks.streaming_aiter
from__future__importannotationsimportasynciofromtypingimportAny,AsyncIterator,Dict,List,Literal,Union,castfromlangchain_core.callbacksimportAsyncCallbackHandlerfromlangchain_core.outputsimportLLMResult# TODO If used by two LLM runs in parallel this won't work as expected
[docs]classAsyncIteratorCallbackHandler(AsyncCallbackHandler):"""Callback handler that returns an async iterator."""queue:asyncio.Queue[str]done:asyncio.Event@propertydefalways_verbose(self)->bool:returnTrue
[docs]asyncdefon_llm_start(self,serialized:Dict[str,Any],prompts:List[str],**kwargs:Any)->None:# If two calls are made in a row, this resets the stateself.done.clear()
[docs]asyncdefaiter(self)->AsyncIterator[str]:whilenotself.queue.empty()ornotself.done.is_set():# Wait for the next token in the queue,# but stop waiting if the done event is setdone,other=awaitasyncio.wait([# NOTE: If you add other tasks here, update the code below,# which assumes each set has exactly one task eachasyncio.ensure_future(self.queue.get()),asyncio.ensure_future(self.done.wait()),],return_when=asyncio.FIRST_COMPLETED,)# Cancel the other taskifother:other.pop().cancel()# Extract the value of the first completed tasktoken_or_done=cast(Union[str,Literal[True]],done.pop().result())# If the extracted value is the boolean True, the done event was setiftoken_or_doneisTrue:break# Otherwise, the extracted value is a token, which we yieldyieldtoken_or_done