Source code for langchain_community.document_loaders.youtube

"""Loads YouTube transcript."""

from __future__ import annotations

import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse
from xml.etree.ElementTree import ParseError  # OK: trusted-source

from langchain_core.documents import Document
from pydantic import model_validator
from pydantic.dataclasses import dataclass

from langchain_community.document_loaders.base import BaseLoader

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# OAuth scopes handed to the Google credential helpers in this module when
# loading or creating user credentials (read-only YouTube access).
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]


@dataclass
class GoogleApiClient:
    """Generic Google API Client.

    To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google``
    python package installed.
    As the google api expects credentials you need to set up a google account and
    register your Service. "https://developers.google.com/docs/api/quickstart/python"

    *Security Note*: Note that parsing of the transcripts relies on the standard
        xml library but the input is viewed as trusted in this case.

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )

    """

    # OAuth client-secrets file used to start the interactive login flow.
    credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
    # Service-account key file; when it exists it takes precedence over the
    # interactive flow. NOTE(review): shares its default with
    # ``credentials_path`` -- confirm that this is intended.
    service_account_path: Path = Path.home() / ".credentials" / "credentials.json"
    # Cache file for the user token obtained/refreshed by the OAuth flow.
    token_path: Path = Path.home() / ".credentials" / "token.json"

    def __post_init__(self) -> None:
        """Resolve credentials eagerly and expose them as ``self.creds``."""
        self.creds = self._load_credentials()

    @model_validator(mode="before")
    @classmethod
    def validate_channel_or_videoIds_is_set(cls, values: Dict[str, Any]) -> Any:
        """Validate that ``credentials_path`` or ``service_account_path`` is given.

        The method name is historical (kept for backward compatibility); the
        check is about the credential paths of this client.

        Raises:
            ValueError: If neither path was supplied.
        """
        if not values.get("credentials_path") and not values.get(
            "service_account_path"
        ):
            raise ValueError(
                "Must specify either credentials_path or service_account_path"
            )
        return values

    def _load_credentials(self) -> Any:
        """Load credentials.

        Resolution order:

        1. the service-account file, if it exists;
        2. the cached token file, if it exists and is valid (an expired token
           with a refresh token is refreshed);
        3. otherwise an interactive local-server OAuth flow based on
           ``credentials_path``.

        Tokens obtained in steps 2 (refresh) and 3 are written to
        ``token_path``.

        Returns:
            The credentials object produced by the Google auth libraries.

        Raises:
            ImportError: If the required Google / transcript packages are
                missing.
        """
        # Adapted from https://developers.google.com/drive/api/v3/quickstart/python
        try:
            from google.auth.transport.requests import Request
            from google.oauth2 import service_account
            from google.oauth2.credentials import Credentials
            from google_auth_oauthlib.flow import InstalledAppFlow
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the YouTube loader"
            ) from e

        if self.service_account_path.exists():
            return service_account.Credentials.from_service_account_file(
                str(self.service_account_path)
            )

        creds = None
        if self.token_path.exists():
            creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    str(self.credentials_path), SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Persist the new/refreshed token; make sure the target directory
            # exists so a finished login is not lost to FileNotFoundError.
            self.token_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.token_path, "w", encoding="utf-8") as token:
                token.write(creds.to_json())

        return creds
ALLOWED_SCHEMES = {"http", "https"} ALLOWED_NETLOCS = { "youtu.be", "m.youtube.com", "youtube.com", "www.youtube.com", "www.youtube-nocookie.com", "vid.plus", } def _parse_video_id(url: str) -> Optional[str]: """Parse a YouTube URL and return the video ID if valid, otherwise None.""" parsed_url = urlparse(url) if parsed_url.scheme not in ALLOWED_SCHEMES: return None if parsed_url.netloc not in ALLOWED_NETLOCS: return None path = parsed_url.path if path.endswith("/watch"): query = parsed_url.query parsed_query = parse_qs(query) if "v" in parsed_query: ids = parsed_query["v"] video_id = ids if isinstance(ids, str) else ids[0] else: return None else: path = parsed_url.path.lstrip("/") video_id = path.split("/")[-1] if len(video_id) != 11: # Video IDs are 11 characters long return None return video_id
class TranscriptFormat(Enum):
    """Shapes in which `YoutubeLoader` can return a transcript."""

    # The whole transcript joined into a single document.
    TEXT = "text"
    # One document per transcript piece.
    LINES = "lines"
    # Transcript pieces grouped into fixed-length time windows.
    CHUNKS = "chunks"
class YoutubeLoader(BaseLoader):
    """Load `YouTube` video transcripts."""

    def __init__(
        self,
        video_id: str,
        add_video_info: bool = False,
        language: Union[str, Sequence[str]] = "en",
        translation: Optional[str] = None,
        transcript_format: TranscriptFormat = TranscriptFormat.TEXT,
        continue_on_failure: bool = False,
        chunk_size_seconds: int = 120,
    ):
        """Initialize with YouTube video ID.

        Args:
            video_id: ID of the video whose transcript is loaded.
            add_video_info: Whether to enrich the metadata through ``pytube``.
            language: Preferred transcript language code, or a sequence of
                codes.
            translation: Optional language code to translate the transcript to.
            transcript_format: Shape of the returned documents.
            continue_on_failure: Stored on the instance; not consulted by the
                methods of this class.
            chunk_size_seconds: Window length used by
                ``TranscriptFormat.CHUNKS``.
        """
        self.video_id = video_id
        self._metadata: Dict[str, Any] = {"source": video_id}
        self.add_video_info = add_video_info
        # A single language code is normalised to a one-element list.
        self.language = [language] if isinstance(language, str) else language
        self.translation = translation
        self.transcript_format = transcript_format
        self.continue_on_failure = continue_on_failure
        self.chunk_size_seconds = chunk_size_seconds

    @staticmethod
    def extract_video_id(youtube_url: str) -> str:
        """Extract video ID from common YouTube URLs.

        Raises:
            ValueError: If no video ID can be determined from the URL.
        """
        video_id = _parse_video_id(youtube_url)
        if not video_id:
            raise ValueError(
                f'Could not determine the video ID for the URL "{youtube_url}".'
            )
        return video_id

    @classmethod
    def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
        """Given a YouTube URL, construct a loader.
        See `YoutubeLoader()` constructor for a list of keyword arguments.
        """
        video_id = cls.extract_video_id(youtube_url)
        return cls(video_id, **kwargs)

    def _make_chunk_document(
        self, chunk_pieces: List[Dict], chunk_start_seconds: int
    ) -> Document:
        """Create Document from chunk of transcript pieces.

        The document text is the space-joined piece texts; the metadata
        carries the chunk start (seconds and ``HH:MM:SS``) and a ``source``
        URL that points at that start time.
        """
        m, s = divmod(chunk_start_seconds, 60)
        h, m = divmod(m, 60)
        return Document(
            page_content=" ".join(
                chunk_piece["text"].strip(" ") for chunk_piece in chunk_pieces
            ),
            metadata={
                **self._metadata,
                "start_seconds": chunk_start_seconds,
                "start_timestamp": f"{h:02d}:{m:02d}:{s:02d}",
                "source":
                # replace video ID with URL to start time
                f"https://www.youtube.com/watch?v={self.video_id}"
                f"&t={chunk_start_seconds}s",
            },
        )

    def _get_transcript_chunks(
        self, transcript_pieces: List[Dict]
    ) -> Generator[Document, None, None]:
        """Group transcript pieces into windows of ``chunk_size_seconds``.

        A piece belongs to the window in which it ends. Windows that contain
        no piece (gaps in the transcript) are skipped, so the reported chunk
        start always matches the window the pieces actually fall into.
        """
        step = self.chunk_size_seconds
        chunk_pieces: List[Dict[str, Any]] = []
        chunk_start_seconds = 0
        chunk_time_limit = step
        for transcript_piece in transcript_pieces:
            piece_end = transcript_piece["start"] + transcript_piece["duration"]
            if piece_end > chunk_time_limit:
                if chunk_pieces:
                    yield self._make_chunk_document(chunk_pieces, chunk_start_seconds)
                chunk_pieces = []
                chunk_start_seconds = chunk_time_limit
                chunk_time_limit += step
                # Skip empty windows after a gap; without this the chunk start
                # lags behind the real position. A non-positive step keeps the
                # single-advance behaviour instead of looping forever.
                while step > 0 and piece_end > chunk_time_limit:
                    chunk_start_seconds = chunk_time_limit
                    chunk_time_limit += step

            chunk_pieces.append(transcript_piece)

        if chunk_pieces:
            yield self._make_chunk_document(chunk_pieces, chunk_start_seconds)

    def load(self) -> List[Document]:
        """Load YouTube transcripts into `Document` objects.

        Returns:
            A list of documents shaped according to ``transcript_format``;
            an empty list when transcripts are disabled for the video.

        Raises:
            ImportError: If ``youtube_transcript_api`` is not installed.
            ValueError: If ``transcript_format`` is not a known format.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
                YouTubeTranscriptApi,
            )
        except ImportError as e:
            raise ImportError(
                'Could not import "youtube_transcript_api" Python package. '
                "Please install it with `pip install youtube-transcript-api`."
            ) from e

        if self.add_video_info:
            # Get more video meta info
            # Such as title, description, thumbnail url, publish_date
            video_info = self._get_video_info()
            self._metadata.update(video_info)

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
        except TranscriptsDisabled:
            return []

        try:
            transcript = transcript_list.find_transcript(self.language)
        except NoTranscriptFound:
            # Fall back to English when none of the requested languages exist.
            transcript = transcript_list.find_transcript(["en"])

        if self.translation is not None:
            transcript = transcript.translate(self.translation)

        transcript_pieces: List[Dict[str, Any]] = transcript.fetch()

        if self.transcript_format == TranscriptFormat.TEXT:
            full_text = " ".join(
                transcript_piece["text"].strip(" ")
                for transcript_piece in transcript_pieces
            )
            return [Document(page_content=full_text, metadata=self._metadata)]
        elif self.transcript_format == TranscriptFormat.LINES:
            # Per-line metadata is every field of the piece except its text.
            return [
                Document(
                    page_content=transcript_piece["text"].strip(" "),
                    metadata={
                        key: value
                        for key, value in transcript_piece.items()
                        if key != "text"
                    },
                )
                for transcript_piece in transcript_pieces
            ]
        elif self.transcript_format == TranscriptFormat.CHUNKS:
            return list(self._get_transcript_chunks(transcript_pieces))
        else:
            raise ValueError("Unknown transcript format.")

    def _get_video_info(self) -> Dict:
        """Get important video information.

        Components include:
            - title
            - description
            - thumbnail URL,
            - publish_date
            - channel author
            - and more.

        Missing values are replaced by ``"Unknown"`` (or ``0`` for counters).

        Raises:
            ImportError: If ``pytube`` is not installed.
        """
        try:
            from pytube import YouTube
        except ImportError as e:
            raise ImportError(
                'Could not import "pytube" Python package. '
                "Please install it with `pip install pytube`."
            ) from e

        yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
        video_info = {
            "title": yt.title or "Unknown",
            "description": yt.description or "Unknown",
            "view_count": yt.views or 0,
            "thumbnail_url": yt.thumbnail_url or "Unknown",
            "publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
            if yt.publish_date
            else "Unknown",
            "length": yt.length or 0,
            "author": yt.author or "Unknown",
        }
        return video_info
@dataclass
class GoogleApiYoutubeLoader(BaseLoader):
    """Load all Videos from a `YouTube` Channel.

    To use, you should have the ``googleapiclient,youtube_transcript_api``
    python package installed.
    As the service needs a google_api_client, you first have to initialize
    the GoogleApiClient.

    Additionally you have to either provide a channel name or a list of videoids
    "https://developers.google.com/docs/api/quickstart/python"



    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            from langchain_community.document_loaders import GoogleApiYoutubeLoader
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )
            loader = GoogleApiYoutubeLoader(
                google_api_client=google_api_client,
                channel_name = "CodeAesthetic"
            )
            loader.load()

    """

    # Client whose ``creds`` are used to build the YouTube API client.
    google_api_client: GoogleApiClient
    # Channel to search for; takes precedence over ``video_ids`` in ``load``.
    channel_name: Optional[str] = None
    # Explicit video IDs to load when no channel name is given.
    video_ids: Optional[List[str]] = None
    # Whether channel documents carry the playlist item's snippet as metadata.
    add_video_info: bool = True
    # Language code of the captions to fetch (or translate to).
    captions_language: str = "en"
    # When loading a channel: log transcript failures instead of raising.
    continue_on_failure: bool = False

    def __post_init__(self) -> None:
        """Build the YouTube API client from the Google client's credentials."""
        self.youtube_client = self._build_youtube_client(self.google_api_client.creds)

    def _build_youtube_client(self, creds: Any) -> Any:
        """Return a YouTube Data API v3 client built with ``creds``.

        Raises:
            ImportError: If the required packages are not installed.
        """
        try:
            from googleapiclient.discovery import build
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the YouTube loader"
            ) from e

        return build("youtube", "v3", credentials=creds)

    @model_validator(mode="before")
    @classmethod
    def validate_channel_or_videoIds_is_set(cls, values: Dict[str, Any]) -> Any:
        """Validate that at least one of channel_name or video_ids is set.

        Raises:
            ValueError: If neither value was supplied.
        """
        if not values.get("channel_name") and not values.get("video_ids"):
            raise ValueError("Must specify either channel_name or video_ids")
        return values

    def _get_transcripe_for_video_id(self, video_id: str) -> str:
        """Return the captions of ``video_id`` as one space-joined string.

        Uses a transcript in ``captions_language`` when available; otherwise
        a translation of an available transcript into that language.

        Raises:
            NoTranscriptFound: If the video has no transcript at all.
        """
        from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript([self.captions_language])
        except NoTranscriptFound:
            # Fall back to a translation; as before, the last listed
            # transcript wins.
            transcript = None
            for available_transcript in transcript_list:
                transcript = available_transcript.translate(self.captions_language)
            if transcript is None:
                # Nothing to translate: surface the original error instead of
                # failing later on an unbound variable.
                raise

        transcript_pieces = transcript.fetch()
        return " ".join([t["text"].strip(" ") for t in transcript_pieces])

    def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document:
        """Build a document from a video's captions and its API resource.

        The first item of the ``videos().list`` response becomes the metadata.
        """
        captions = self._get_transcripe_for_video_id(video_id)
        video_response = (
            self.youtube_client.videos()
            .list(
                part="id,snippet",
                id=video_id,
            )
            .execute()
        )
        return Document(
            page_content=captions,
            metadata=video_response.get("items")[0],
        )

    def _get_channel_id(self, channel_name: str) -> str:
        """Return the channel ID of the first search hit for ``channel_name``."""
        request = self.youtube_client.search().list(
            part="id",
            q=channel_name,
            type="channel",
            maxResults=1,  # we only need one result since channel names are unique
        )
        response = request.execute()
        channel_id = response["items"][0]["id"]["channelId"]
        return channel_id

    def _get_uploads_playlist_id(self, channel_id: str) -> str:
        """Return the ID of the channel's "uploads" playlist."""
        request = self.youtube_client.channels().list(
            part="contentDetails",
            id=channel_id,
        )
        response = request.execute()
        return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

    def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]:
        """Load one document per video in the channel's uploads playlist.

        Pages through the playlist 50 items at a time. Transcript failures are
        logged and skipped when ``continue_on_failure`` is set, otherwise
        re-raised.

        Raises:
            ImportError: If ``youtube_transcript_api`` is not installed.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
            )
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "youtube-transcript-api` "
                "to use the youtube loader"
            ) from e

        channel_id = self._get_channel_id(channel)
        uploads_playlist_id = self._get_uploads_playlist_id(channel_id)
        request = self.youtube_client.playlistItems().list(
            part="id,snippet",
            playlistId=uploads_playlist_id,
            maxResults=50,
        )
        documents: List[Document] = []
        while request is not None:
            response = request.execute()

            for item in response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                meta_data = {"videoId": video_id}
                if self.add_video_info:
                    # Thumbnails are dropped from the metadata; tolerate
                    # snippets that do not carry them.
                    item["snippet"].pop("thumbnails", None)
                    meta_data.update(item["snippet"])
                try:
                    page_content = self._get_transcripe_for_video_id(video_id)
                    documents.append(
                        Document(
                            page_content=page_content,
                            metadata=meta_data,
                        )
                    )
                except (TranscriptsDisabled, NoTranscriptFound, ParseError) as e:
                    if not self.continue_on_failure:
                        raise
                    # Log the ID already extracted from the snippet rather
                    # than indexing into item["id"] again.
                    logger.error(
                        "Error fetching transcript for video %s, exception: %s",
                        video_id,
                        e,
                    )
            # Page with the same resource that created the request.
            request = self.youtube_client.playlistItems().list_next(request, response)

        return documents

    def load(self) -> List[Document]:
        """Load documents.

        Loads the whole channel when ``channel_name`` is set, otherwise the
        listed ``video_ids``.

        Raises:
            ValueError: If neither ``channel_name`` nor ``video_ids`` is set.
        """
        document_list = []
        if self.channel_name:
            document_list.extend(self._get_document_for_channel(self.channel_name))
        elif self.video_ids:
            document_list.extend(
                [
                    self._get_document_for_video_id(video_id)
                    for video_id in self.video_ids
                ]
            )
        else:
            raise ValueError("Must specify either channel_name or video_ids")

        return document_list