Source code for langchain_core.output_parsers.transform
from __future__ import annotations
from collections.abc import AsyncIterator, Iterator
from typing import (
TYPE_CHECKING,
Any,
Optional,
Union,
)
from langchain_core.messages import BaseMessage, BaseMessageChunk
from langchain_core.output_parsers.base import BaseOutputParser, T
from langchain_core.outputs import (
ChatGeneration,
ChatGenerationChunk,
Generation,
GenerationChunk,
)
from langchain_core.runnables.config import run_in_executor
if TYPE_CHECKING:
from langchain_core.runnables import RunnableConfig
[docs]
class BaseTransformOutputParser(BaseOutputParser[T]):
"""Base class for an output parser that can handle streaming input."""
def _transform(self, input: Iterator[Union[str, BaseMessage]]) -> Iterator[T]:
for chunk in input:
if isinstance(chunk, BaseMessage):
yield self.parse_result([ChatGeneration(message=chunk)])
else:
yield self.parse_result([Generation(text=chunk)])
async def _atransform(
self, input: AsyncIterator[Union[str, BaseMessage]]
) -> AsyncIterator[T]:
async for chunk in input:
if isinstance(chunk, BaseMessage):
yield await run_in_executor(
None, self.parse_result, [ChatGeneration(message=chunk)]
)
else:
yield await run_in_executor(
None, self.parse_result, [Generation(text=chunk)]
)
def transform(
self,
input: Iterator[Union[str, BaseMessage]],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Iterator[T]:
"""Transform the input into the output format.
Args:
input: The input to transform.
config: The configuration to use for the transformation.
kwargs: Additional keyword arguments.
Yields:
The transformed output.
"""
yield from self._transform_stream_with_config(
input, self._transform, config, run_type="parser"
)
async def atransform(
self,
input: AsyncIterator[Union[str, BaseMessage]],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[T]:
"""Async transform the input into the output format.
Args:
input: The input to transform.
config: The configuration to use for the transformation.
kwargs: Additional keyword arguments.
Yields:
The transformed output.
"""
async for chunk in self._atransform_stream_with_config(
input, self._atransform, config, run_type="parser"
):
yield chunk
[docs]
class BaseCumulativeTransformOutputParser(BaseTransformOutputParser[T]):
"""Base class for an output parser that can handle streaming input."""
diff: bool = False
"""In streaming mode, whether to yield diffs between the previous and current
parsed output, or just the current parsed output.
"""
def _diff(self, prev: Optional[T], next: T) -> T:
"""Convert parsed outputs into a diff format. The semantics of this are
up to the output parser.
Args:
prev: The previous parsed output.
next: The current parsed output.
Returns:
The diff between the previous and current parsed output.
"""
raise NotImplementedError
def _transform(self, input: Iterator[Union[str, BaseMessage]]) -> Iterator[Any]:
prev_parsed = None
acc_gen: Union[GenerationChunk, ChatGenerationChunk, None] = None
for chunk in input:
chunk_gen: Union[GenerationChunk, ChatGenerationChunk]
if isinstance(chunk, BaseMessageChunk):
chunk_gen = ChatGenerationChunk(message=chunk)
elif isinstance(chunk, BaseMessage):
chunk_gen = ChatGenerationChunk(
message=BaseMessageChunk(**chunk.dict())
)
else:
chunk_gen = GenerationChunk(text=chunk)
acc_gen = chunk_gen if acc_gen is None else acc_gen + chunk_gen # type: ignore[operator]
parsed = self.parse_result([acc_gen], partial=True)
if parsed is not None and parsed != prev_parsed:
if self.diff:
yield self._diff(prev_parsed, parsed)
else:
yield parsed
prev_parsed = parsed
async def _atransform(
self, input: AsyncIterator[Union[str, BaseMessage]]
) -> AsyncIterator[T]:
prev_parsed = None
acc_gen: Union[GenerationChunk, ChatGenerationChunk, None] = None
async for chunk in input:
chunk_gen: Union[GenerationChunk, ChatGenerationChunk]
if isinstance(chunk, BaseMessageChunk):
chunk_gen = ChatGenerationChunk(message=chunk)
elif isinstance(chunk, BaseMessage):
chunk_gen = ChatGenerationChunk(
message=BaseMessageChunk(**chunk.dict())
)
else:
chunk_gen = GenerationChunk(text=chunk)
acc_gen = chunk_gen if acc_gen is None else acc_gen + chunk_gen # type: ignore[operator]
parsed = await self.aparse_result([acc_gen], partial=True)
if parsed is not None and parsed != prev_parsed:
if self.diff:
yield await run_in_executor(None, self._diff, prev_parsed, parsed)
else:
yield parsed
prev_parsed = parsed