"""Adapted fromhttps://github.com/maxfischer2781/asyncstdlib/blob/master/asyncstdlib/itertools.pyMIT License"""fromcollectionsimportdequefromcontextlibimportAbstractAsyncContextManagerfromtypesimportTracebackTypefromtypingimport(Any,AsyncContextManager,AsyncGenerator,AsyncIterable,AsyncIterator,Awaitable,Callable,Deque,Generic,Iterator,List,Optional,Tuple,Type,TypeVar,Union,cast,overload,)T=TypeVar("T")_no_default=object()# https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54# before 3.10, the builtin anext() was not available
def py_anext(
    iterator: AsyncIterator[T], default: Union[T, Any] = _no_default
) -> Awaitable[Union[T, None, Any]]:
    """Pure-Python implementation of anext() for testing purposes.

    Closely matches the builtin anext() C implementation.
    Can be used to compare the built-in implementation of the inner
    coroutines machinery to C-implementation of __anext__() and send()
    or throw() on the returned generator.

    Args:
        iterator: The async iterator to advance.
        default: The value to return if the iterator is exhausted.
            If not provided, a StopAsyncIteration exception is raised.

    Returns:
        The next value from the iterator, or the default value
        if the iterator is exhausted.

    Raises:
        TypeError: If the iterator is not an async iterator.
    """
    try:
        # Look up __anext__ on the type, mirroring how the C slot lookup works.
        __anext__ = cast(
            Callable[[AsyncIterator[T]], Awaitable[T]], type(iterator).__anext__
        )
    except AttributeError as e:
        raise TypeError(f"{iterator!r} is not an async iterator") from e

    if default is _no_default:
        # No default: hand back the raw awaitable so StopAsyncIteration
        # propagates to the caller unchanged.
        return __anext__(iterator)

    async def anext_impl() -> Union[T, Any]:
        try:
            # The C code is way more low-level than this, as it implements
            # all methods of the iterator protocol. In this implementation
            # we're relying on higher-level coroutine concepts, but that's
            # exactly what we want -- crosstest pure-Python high-level
            # implementation and low-level C anext() iterators.
            return await __anext__(iterator)
        except StopAsyncIteration:
            return default

    return anext_impl()
[docs]classNoLock:"""Dummy lock that provides the proper interface but no protection."""asyncdef__aenter__(self)->None:passasyncdef__aexit__(self,exc_type:Any,exc_val:Any,exc_tb:Any)->bool:returnFalse
async def tee_peer(
    iterator: AsyncIterator[T],
    # the buffer specific to this peer
    buffer: Deque[T],
    # the buffers of all peers, including our own
    peers: List[Deque[T]],
    lock: AsyncContextManager[Any],
) -> AsyncGenerator[T, None]:
    """An individual iterator of a :py:func:`~.tee`.

    This function is a generator that yields items from the shared
    iterator ``iterator``. It buffers items until the least advanced
    iterator has yielded them as well. The buffer is shared with all
    other peers.

    Args:
        iterator: The shared iterator.
        buffer: The buffer for this peer.
        peers: The buffers of all peers.
        lock: The lock to synchronise access to the shared buffers.

    Yields:
        The next item from the shared iterator.
    """
    try:
        while True:
            if not buffer:
                async with lock:
                    # Another peer produced an item while we were waiting
                    # for the lock. Proceed with the next loop iteration
                    # to yield the item.
                    if buffer:
                        continue
                    try:
                        item = await iterator.__anext__()
                    except StopAsyncIteration:
                        break
                    else:
                        # Append to all buffers, including our own. We'll
                        # fetch our item from the buffer again, instead of
                        # yielding it directly. This ensures the proper item
                        # ordering if any of our peers are fetching items
                        # concurrently. They may have buffered their item
                        # already.
                        for peer_buffer in peers:
                            peer_buffer.append(item)
            yield buffer.popleft()
    finally:
        async with lock:
            # this peer is done - remove its buffer
            for idx, peer_buffer in enumerate(peers):  # pragma: no branch
                if peer_buffer is buffer:
                    peers.pop(idx)
                    break
            # if we are the last peer, try and close the iterator
            if not peers and hasattr(iterator, "aclose"):
                await iterator.aclose()
class Tee(Generic[T]):
    """
    Create ``n`` separate asynchronous iterators over ``iterable``.

    This splits a single ``iterable`` into multiple iterators, each providing
    the same items in the same order. All child iterators may advance
    separately but share the same items from ``iterable`` -- when the most
    advanced iterator retrieves an item, it is buffered until the least
    advanced iterator has yielded it as well. A ``tee`` works lazily and can
    handle an infinite ``iterable``, provided that all iterators advance.

    .. code-block:: python3

        async def derivative(sensor_data):
            previous, current = a.tee(sensor_data, n=2)
            await a.anext(previous)  # advance one iterator
            return a.map(operator.sub, previous, current)

    Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type
    instead of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated
    and unpacked to get the child iterators. In addition, its
    :py:meth:`~.tee.aclose` method immediately closes all children, and it can
    be used in an ``async with`` context for the same effect.

    If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not*
    provide these items. Also, ``tee`` must internally buffer each item until
    the last iterator has yielded it; if the most and least advanced iterator
    differ by most data, using a :py:class:`list` is more efficient (but not
    lazy).

    If the underlying iterable is concurrency safe (``anext`` may be awaited
    concurrently) the resulting iterators are concurrency safe as well.
    Otherwise, the iterators are safe if there is only ever one single "most
    advanced" iterator. To enforce sequential use of ``anext``, provide a
    ``lock`` - e.g. an :py:class:`asyncio.Lock` instance in an
    :py:mod:`asyncio` application - and access is automatically synchronised.
    """

    def __init__(
        self,
        iterable: AsyncIterator[T],
        n: int = 2,
        *,
        lock: Optional[AsyncContextManager[Any]] = None,
    ):
        """Create a new ``tee``.

        Args:
            iterable: The iterable to split.
            n: The number of child iterators to create.
            lock: Optional lock used to synchronise concurrent access to
                the shared iterator; defaults to a no-op lock.
        """
        self._iterator = iterable.__aiter__()  # before 3.10 aiter() doesn't exist
        self._buffers: List[Deque[T]] = [deque() for _ in range(n)]
        self._children = tuple(
            tee_peer(
                iterator=self._iterator,
                buffer=buffer,
                peers=self._buffers,
                lock=lock if lock is not None else NoLock(),
            )
            for buffer in self._buffers
        )

    def __len__(self) -> int:
        return len(self._children)

    @overload
    def __getitem__(self, item: int) -> AsyncIterator[T]:
        ...

    @overload
    def __getitem__(self, item: slice) -> Tuple[AsyncIterator[T], ...]:
        ...

    def __getitem__(
        self, item: Union[int, slice]
    ) -> Union[AsyncIterator[T], Tuple[AsyncIterator[T], ...]]:
        return self._children[item]

    def __iter__(self) -> Iterator[AsyncIterator[T]]:
        # Allow tuple-style unpacking of the children.
        yield from self._children

    async def __aenter__(self) -> "Tee[T]":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        # Closing on context exit; never suppress exceptions.
        await self.aclose()
        return False

    async def aclose(self) -> None:
        """Async close all child iterators."""
        for child in self._children:
            await child.aclose()
# Lowercase alias mirroring itertools.tee naming.
atee = Tee
class aclosing(AbstractAsyncContextManager):
    """Async context manager for safely finalizing an asynchronously
    cleaned-up resource such as an async generator, calling its
    ``aclose()`` method.

    Code like this:

        async with aclosing(<module>.fetch(<arguments>)) as agen:
            <block>

    is equivalent to this:

        agen = <module>.fetch(<arguments>)
        try:
            <block>
        finally:
            await agen.aclose()
    """

    def __init__(
        self, thing: Union[AsyncGenerator[Any, Any], AsyncIterator[Any]]
    ) -> None:
        self.thing = thing

    async def __aenter__(self) -> Union[AsyncGenerator[Any, Any], AsyncIterator[Any]]:
        return self.thing

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        # Always finalize, even if the block raised.
        await self.thing.aclose()
async def abatch_iterate(
    size: int, iterable: AsyncIterable[T]
) -> AsyncIterator[List[T]]:
    """Utility batching function for async iterables.

    Args:
        size: The size of the batch.
        iterable: The async iterable to batch.

    Returns:
        An async iterator over the batches.
    """
    batch: List[T] = []
    async for element in iterable:
        if len(batch) < size:
            batch.append(element)

        if len(batch) >= size:
            yield batch
            batch = []

    # Emit the final partial batch, if any.
    if batch:
        yield batch