"""Loads YouTube transcript."""
from __future__ import annotations
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse
from xml.etree.ElementTree import ParseError # OK: trusted-source
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.pydantic_v1.dataclasses import dataclass
from langchain_community.document_loaders.base import BaseLoader
logger = logging.getLogger(__name__)
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]
[docs]@dataclass
class GoogleApiClient:
"""Generic Google API Client.
To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google``
python package installed.
As the google api expects credentials you need to set up a google account and
register your Service. "https://developers.google.com/docs/api/quickstart/python"
*Security Note*: Note that parsing of the transcripts relies on the standard
xml library but the input is viewed as trusted in this case.
Example:
.. code-block:: python
from langchain_community.document_loaders import GoogleApiClient
google_api_client = GoogleApiClient(
service_account_path=Path("path_to_your_sec_file.json")
)
"""
credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
service_account_path: Path = Path.home() / ".credentials" / "credentials.json"
token_path: Path = Path.home() / ".credentials" / "token.json"
def __post_init__(self) -> None:
self.creds = self._load_credentials()
[docs] @root_validator(pre=True)
def validate_channel_or_videoIds_is_set(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate that either folder_id or document_ids is set, but not both."""
if not values.get("credentials_path") and not values.get(
"service_account_path"
):
raise ValueError("Must specify either channel_name or video_ids")
return values
def _load_credentials(self) -> Any:
"""Load credentials."""
# Adapted from https://developers.google.com/drive/api/v3/quickstart/python
try:
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from youtube_transcript_api import YouTubeTranscriptApi # noqa: F401
except ImportError:
raise ImportError(
"You must run"
"`pip install --upgrade "
"google-api-python-client google-auth-httplib2 "
"google-auth-oauthlib "
"youtube-transcript-api` "
"to use the Google Drive loader"
)
creds = None
if self.service_account_path.exists():
return service_account.Credentials.from_service_account_file(
str(self.service_account_path)
)
if self.token_path.exists():
creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
str(self.credentials_path), SCOPES
)
creds = flow.run_local_server(port=0)
with open(self.token_path, "w") as token:
token.write(creds.to_json())
return creds
ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_NETLOCS = {
"youtu.be",
"m.youtube.com",
"youtube.com",
"www.youtube.com",
"www.youtube-nocookie.com",
"vid.plus",
}
def _parse_video_id(url: str) -> Optional[str]:
"""Parse a YouTube URL and return the video ID if valid, otherwise None."""
parsed_url = urlparse(url)
if parsed_url.scheme not in ALLOWED_SCHEMES:
return None
if parsed_url.netloc not in ALLOWED_NETLOCS:
return None
path = parsed_url.path
if path.endswith("/watch"):
query = parsed_url.query
parsed_query = parse_qs(query)
if "v" in parsed_query:
ids = parsed_query["v"]
video_id = ids if isinstance(ids, str) else ids[0]
else:
return None
else:
path = parsed_url.path.lstrip("/")
video_id = path.split("/")[-1]
if len(video_id) != 11: # Video IDs are 11 characters long
return None
return video_id
[docs]class YoutubeLoader(BaseLoader):
"""Load `YouTube` video transcripts."""
[docs] def __init__(
self,
video_id: str,
add_video_info: bool = False,
language: Union[str, Sequence[str]] = "en",
translation: Optional[str] = None,
transcript_format: TranscriptFormat = TranscriptFormat.TEXT,
continue_on_failure: bool = False,
chunk_size_seconds: int = 120,
):
"""Initialize with YouTube video ID."""
self.video_id = video_id
self._metadata = {"source": video_id}
self.add_video_info = add_video_info
self.language = language
if isinstance(language, str):
self.language = [language]
else:
self.language = language
self.translation = translation
self.transcript_format = transcript_format
self.continue_on_failure = continue_on_failure
self.chunk_size_seconds = chunk_size_seconds
[docs] @classmethod
def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
"""Given a YouTube URL, construct a loader.
See `YoutubeLoader()` constructor for a list of keyword arguments.
"""
video_id = cls.extract_video_id(youtube_url)
return cls(video_id, **kwargs)
def _make_chunk_document(
self, chunk_pieces: List[Dict], chunk_start_seconds: int
) -> Document:
"""Create Document from chunk of transcript pieces."""
m, s = divmod(chunk_start_seconds, 60)
h, m = divmod(m, 60)
return Document(
page_content=" ".join(
map(lambda chunk_piece: chunk_piece["text"].strip(" "), chunk_pieces)
),
metadata={
**self._metadata,
"start_seconds": chunk_start_seconds,
"start_timestamp": f"{h:02d}:{m:02d}:{s:02d}",
"source":
# replace video ID with URL to start time
f"https://www.youtube.com/watch?v={self.video_id}"
f"&t={chunk_start_seconds}s",
},
)
def _get_transcript_chunks(
self, transcript_pieces: List[Dict]
) -> Generator[Document, None, None]:
chunk_pieces: List[Dict[str, Any]] = []
chunk_start_seconds = 0
chunk_time_limit = self.chunk_size_seconds
for transcript_piece in transcript_pieces:
piece_end = transcript_piece["start"] + transcript_piece["duration"]
if piece_end > chunk_time_limit:
if chunk_pieces:
yield self._make_chunk_document(chunk_pieces, chunk_start_seconds)
chunk_pieces = []
chunk_start_seconds = chunk_time_limit
chunk_time_limit += self.chunk_size_seconds
chunk_pieces.append(transcript_piece)
if len(chunk_pieces) > 0:
yield self._make_chunk_document(chunk_pieces, chunk_start_seconds)
[docs] def load(self) -> List[Document]:
"""Load YouTube transcripts into `Document` objects."""
try:
from youtube_transcript_api import (
NoTranscriptFound,
TranscriptsDisabled,
YouTubeTranscriptApi,
)
except ImportError:
raise ImportError(
'Could not import "youtube_transcript_api" Python package. '
"Please install it with `pip install youtube-transcript-api`."
)
if self.add_video_info:
# Get more video meta info
# Such as title, description, thumbnail url, publish_date
video_info = self._get_video_info()
self._metadata.update(video_info)
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
except TranscriptsDisabled:
return []
try:
transcript = transcript_list.find_transcript(self.language)
except NoTranscriptFound:
transcript = transcript_list.find_transcript(["en"])
if self.translation is not None:
transcript = transcript.translate(self.translation)
transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
if self.transcript_format == TranscriptFormat.TEXT:
transcript = " ".join(
map(
lambda transcript_piece: transcript_piece["text"].strip(" "),
transcript_pieces,
)
)
return [Document(page_content=transcript, metadata=self._metadata)]
elif self.transcript_format == TranscriptFormat.LINES:
return list(
map(
lambda transcript_piece: Document(
page_content=transcript_piece["text"].strip(" "),
metadata=dict(
filter(
lambda item: item[0] != "text", transcript_piece.items()
)
),
),
transcript_pieces,
)
)
elif self.transcript_format == TranscriptFormat.CHUNKS:
return list(self._get_transcript_chunks(transcript_pieces))
else:
raise ValueError("Unknown transcript format.")
def _get_video_info(self) -> Dict:
"""Get important video information.
Components include:
- title
- description
- thumbnail URL,
- publish_date
- channel author
- and more.
"""
try:
from pytube import YouTube
except ImportError:
raise ImportError(
'Could not import "pytube" Python package. '
"Please install it with `pip install pytube`."
)
yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
video_info = {
"title": yt.title or "Unknown",
"description": yt.description or "Unknown",
"view_count": yt.views or 0,
"thumbnail_url": yt.thumbnail_url or "Unknown",
"publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
if yt.publish_date
else "Unknown",
"length": yt.length or 0,
"author": yt.author or "Unknown",
}
return video_info
[docs]@dataclass
class GoogleApiYoutubeLoader(BaseLoader):
"""Load all Videos from a `YouTube` Channel.
To use, you should have the ``googleapiclient,youtube_transcript_api``
python package installed.
As the service needs a google_api_client, you first have to initialize
the GoogleApiClient.
Additionally you have to either provide a channel name or a list of videoids
"https://developers.google.com/docs/api/quickstart/python"
Example:
.. code-block:: python
from langchain_community.document_loaders import GoogleApiClient
from langchain_community.document_loaders import GoogleApiYoutubeLoader
google_api_client = GoogleApiClient(
service_account_path=Path("path_to_your_sec_file.json")
)
loader = GoogleApiYoutubeLoader(
google_api_client=google_api_client,
channel_name = "CodeAesthetic"
)
load.load()
"""
google_api_client: GoogleApiClient
channel_name: Optional[str] = None
video_ids: Optional[List[str]] = None
add_video_info: bool = True
captions_language: str = "en"
continue_on_failure: bool = False
def __post_init__(self) -> None:
self.youtube_client = self._build_youtube_client(self.google_api_client.creds)
def _build_youtube_client(self, creds: Any) -> Any:
try:
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi # noqa: F401
except ImportError:
raise ImportError(
"You must run"
"`pip install --upgrade "
"google-api-python-client google-auth-httplib2 "
"google-auth-oauthlib "
"youtube-transcript-api` "
"to use the Google Drive loader"
)
return build("youtube", "v3", credentials=creds)
[docs] @root_validator(pre=True)
def validate_channel_or_videoIds_is_set(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate that either folder_id or document_ids is set, but not both."""
if not values.get("channel_name") and not values.get("video_ids"):
raise ValueError("Must specify either channel_name or video_ids")
return values
def _get_transcripe_for_video_id(self, video_id: str) -> str:
from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
try:
transcript = transcript_list.find_transcript([self.captions_language])
except NoTranscriptFound:
for available_transcript in transcript_list:
transcript = available_transcript.translate(self.captions_language)
continue
transcript_pieces = transcript.fetch()
return " ".join([t["text"].strip(" ") for t in transcript_pieces])
def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document:
captions = self._get_transcripe_for_video_id(video_id)
video_response = (
self.youtube_client.videos()
.list(
part="id,snippet",
id=video_id,
)
.execute()
)
return Document(
page_content=captions,
metadata=video_response.get("items")[0],
)
def _get_channel_id(self, channel_name: str) -> str:
request = self.youtube_client.search().list(
part="id",
q=channel_name,
type="channel",
maxResults=1, # we only need one result since channel names are unique
)
response = request.execute()
channel_id = response["items"][0]["id"]["channelId"]
return channel_id
def _get_uploads_playlist_id(self, channel_id: str) -> str:
request = self.youtube_client.channels().list(
part="contentDetails",
id=channel_id,
)
response = request.execute()
return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]:
try:
from youtube_transcript_api import (
NoTranscriptFound,
TranscriptsDisabled,
)
except ImportError:
raise ImportError(
"You must run"
"`pip install --upgrade "
"youtube-transcript-api` "
"to use the youtube loader"
)
channel_id = self._get_channel_id(channel)
uploads_playlist_id = self._get_uploads_playlist_id(channel_id)
request = self.youtube_client.playlistItems().list(
part="id,snippet",
playlistId=uploads_playlist_id,
maxResults=50,
)
video_ids = []
while request is not None:
response = request.execute()
# Add each video ID to the list
for item in response["items"]:
video_id = item["snippet"]["resourceId"]["videoId"]
meta_data = {"videoId": video_id}
if self.add_video_info:
item["snippet"].pop("thumbnails")
meta_data.update(item["snippet"])
try:
page_content = self._get_transcripe_for_video_id(video_id)
video_ids.append(
Document(
page_content=page_content,
metadata=meta_data,
)
)
except (TranscriptsDisabled, NoTranscriptFound, ParseError) as e:
if self.continue_on_failure:
logger.error(
"Error fetching transscript "
+ f" {item['id']['videoId']}, exception: {e}"
)
else:
raise e
pass
request = self.youtube_client.search().list_next(request, response)
return video_ids
[docs] def load(self) -> List[Document]:
"""Load documents."""
document_list = []
if self.channel_name:
document_list.extend(self._get_document_for_channel(self.channel_name))
elif self.video_ids:
document_list.extend(
[
self._get_document_for_video_id(video_id)
for video_id in self.video_ids
]
)
else:
raise ValueError("Must specify either channel_name or video_ids")
return document_list