import asyncio
import logging
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
cast,
)
import aiohttp
import requests
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utils.user_agent import get_user_agent
logger = logging.getLogger(__name__)
default_header_template = {
"User-Agent": get_user_agent(),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*"
";q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
def _build_metadata(soup: Any, url: str) -> dict:
"""Build metadata from BeautifulSoup output."""
metadata = {"source": url}
if title := soup.find("title"):
metadata["title"] = title.get_text()
if description := soup.find("meta", attrs={"name": "description"}):
metadata["description"] = description.get("content", "No description found.")
if html := soup.find("html"):
metadata["language"] = html.get("lang", "No language found.")
return metadata
[docs]
class AsyncHtmlLoader(BaseLoader):
"""Load `HTML` asynchronously."""
[docs]
def __init__(
self,
web_path: Union[str, List[str]],
header_template: Optional[dict] = None,
verify_ssl: Optional[bool] = True,
proxies: Optional[dict] = None,
autoset_encoding: bool = True,
encoding: Optional[str] = None,
default_parser: str = "html.parser",
requests_per_second: int = 2,
requests_kwargs: Optional[Dict[str, Any]] = None,
raise_for_status: bool = False,
ignore_load_errors: bool = False,
*,
preserve_order: bool = True,
trust_env: bool = False,
):
"""Initialize with a webpage path."""
# TODO: Deprecate web_path in favor of web_paths, and remove this
# left like this because there are a number of loaders that expect single
# urls
if isinstance(web_path, str):
self.web_paths = [web_path]
elif isinstance(web_path, List):
self.web_paths = web_path
headers = header_template or default_header_template
if not headers.get("User-Agent"):
try:
from fake_useragent import UserAgent
headers["User-Agent"] = UserAgent().random
except ImportError:
logger.info(
"fake_useragent not found, using default user agent."
"To get a realistic header for requests, "
"`pip install fake_useragent`."
)
self.session = requests.Session()
self.session.headers = dict(headers)
self.session.verify = verify_ssl
if proxies:
self.session.proxies.update(proxies)
self.requests_per_second = requests_per_second
self.default_parser = default_parser
self.requests_kwargs = requests_kwargs or {}
self.raise_for_status = raise_for_status
self.autoset_encoding = autoset_encoding
self.encoding = encoding
self.ignore_load_errors = ignore_load_errors
self.preserve_order = preserve_order
self.trust_env = trust_env
def _fetch_valid_connection_docs(self, url: str) -> Any:
if self.ignore_load_errors:
try:
return self.session.get(url, **self.requests_kwargs)
except Exception as e:
warnings.warn(str(e))
return None
return self.session.get(url, **self.requests_kwargs)
@staticmethod
def _check_parser(parser: str) -> None:
"""Check that parser is valid for bs4."""
valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"]
if parser not in valid_parsers:
raise ValueError(
"`parser` must be one of " + ", ".join(valid_parsers) + "."
)
async def _fetch(
self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
) -> str:
async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
for i in range(retries):
try:
kwargs: Dict = dict(
headers=self.session.headers,
cookies=self.session.cookies.get_dict(),
**self.requests_kwargs,
)
if not self.session.verify:
kwargs["ssl"] = False
async with session.get(
url,
**kwargs,
) as response:
try:
text = await response.text()
except UnicodeDecodeError:
logger.error(f"Failed to decode content from {url}")
text = ""
return text
except (aiohttp.ClientConnectionError, TimeoutError) as e:
if i == retries - 1 and self.ignore_load_errors:
logger.warning(f"Error fetching {url} after {retries} retries.")
return ""
elif i == retries - 1:
raise
else:
logger.warning(
f"Error fetching {url} with attempt "
f"{i + 1}/{retries}: {e}. Retrying..."
)
await asyncio.sleep(cooldown * backoff**i)
raise ValueError("retry count exceeded")
async def _fetch_with_rate_limit(
self, url: str, semaphore: asyncio.Semaphore
) -> Tuple[str, str]:
async with semaphore:
return url, await self._fetch(url)
async def _lazy_fetch_all(
self, urls: List[str], preserve_order: bool
) -> AsyncIterator[Tuple[str, str]]:
semaphore = asyncio.Semaphore(self.requests_per_second)
tasks = [
asyncio.create_task(self._fetch_with_rate_limit(url, semaphore))
for url in urls
]
try:
from tqdm.asyncio import tqdm_asyncio
if preserve_order:
for task in tqdm_asyncio(
tasks, desc="Fetching pages", ascii=True, mininterval=1
):
yield await task
else:
for task in tqdm_asyncio.as_completed(
tasks, desc="Fetching pages", ascii=True, mininterval=1
):
yield await task
except ImportError:
warnings.warn("For better logging of progress, `pip install tqdm`")
if preserve_order:
for result in await asyncio.gather(*tasks):
yield result
else:
for task in asyncio.as_completed(tasks):
yield await task
[docs]
async def fetch_all(self, urls: List[str]) -> List[str]:
"""Fetch all urls concurrently with rate limiting."""
return [doc async for _, doc in self._lazy_fetch_all(urls, True)]
def _to_document(self, url: str, text: str) -> Document:
from bs4 import BeautifulSoup
if url.endswith(".xml"):
parser = "xml"
else:
parser = self.default_parser
self._check_parser(parser)
soup = BeautifulSoup(text, parser)
metadata = _build_metadata(soup, url)
return Document(page_content=text, metadata=metadata)
[docs]
def lazy_load(self) -> Iterator[Document]:
"""Lazy load text from the url(s) in web_path."""
results: List[str]
try:
# Raises RuntimeError if there is no current event loop.
asyncio.get_running_loop()
# If there is a current event loop, we need to run the async code
# in a separate loop, in a separate thread.
with ThreadPoolExecutor(max_workers=1) as executor:
future: Future[List[str]] = executor.submit(
asyncio.run, # type: ignore[arg-type]
self.fetch_all(self.web_paths), # type: ignore[arg-type]
)
results = future.result()
except RuntimeError:
results = asyncio.run(self.fetch_all(self.web_paths))
for i, text in enumerate(cast(List[str], results)):
yield self._to_document(self.web_paths[i], text)
[docs]
async def alazy_load(self) -> AsyncIterator[Document]:
"""Lazy load text from the url(s) in web_path."""
async for url, text in self._lazy_fetch_all(
self.web_paths, self.preserve_order
):
yield self._to_document(url, text)