Source code for langchain_community.document_loaders.stripe
import json
import urllib.request
from typing import List, Optional
from langchain_core.documents import Document
from langchain_core.utils import get_from_env, stringify_dict
from langchain_community.document_loaders.base import BaseLoader
STRIPE_ENDPOINTS = {
"balance_transactions": "https://api.stripe.com/v1/balance_transactions",
"charges": "https://api.stripe.com/v1/charges",
"customers": "https://api.stripe.com/v1/customers",
"events": "https://api.stripe.com/v1/events",
"refunds": "https://api.stripe.com/v1/refunds",
"disputes": "https://api.stripe.com/v1/disputes",
}
[docs]class StripeLoader(BaseLoader):
"""Load from `Stripe` API."""
[docs] def __init__(self, resource: str, access_token: Optional[str] = None) -> None:
"""Initialize with a resource and an access token.
Args:
resource: The resource.
access_token: The access token.
"""
self.resource = resource
access_token = access_token or get_from_env(
"access_token", "STRIPE_ACCESS_TOKEN"
)
self.headers = {"Authorization": f"Bearer {access_token}"}
def _make_request(self, url: str) -> List[Document]:
request = urllib.request.Request(url, headers=self.headers)
with urllib.request.urlopen(request) as response:
json_data = json.loads(response.read().decode())
text = stringify_dict(json_data)
metadata = {"source": url}
return [Document(page_content=text, metadata=metadata)]
def _get_resource(self) -> List[Document]:
endpoint = STRIPE_ENDPOINTS.get(self.resource)
if endpoint is None:
return []
return self._make_request(endpoint)
[docs] def load(self) -> List[Document]:
return self._get_resource()