"""Util that can interact with Zapier NLA.
Full docs here: https://nla.zapier.com/start/
Note: this wrapper currently only implemented the `api_key` auth method for testing
and server-side production use cases (using the developer's connected accounts on
Zapier.com)
For use-cases where LangChain + Zapier NLA is powering a user-facing application, and
LangChain needs access to the end-user's connected accounts on Zapier.com, you'll need
to use oauth. Review the full docs above and reach out to nla@zapier.com for
developer support.
"""
import json
from typing import Any, Dict, List, Optional
import aiohttp
import requests
from langchain_core.utils import get_from_dict_or_env
from pydantic import BaseModel, ConfigDict, model_validator
from requests import Request, Session
[docs]
class ZapierNLAWrapper(BaseModel):
"""Wrapper for Zapier NLA.
Full docs here: https://nla.zapier.com/start/
This wrapper supports both API Key and OAuth Credential auth methods. API Key
is the fastest way to get started using this wrapper.
Call this wrapper with either `zapier_nla_api_key` or
`zapier_nla_oauth_access_token` arguments, or set the `ZAPIER_NLA_API_KEY`
environment variable. If both arguments are set, the Access Token will take
precedence.
For use-cases where LangChain + Zapier NLA is powering a user-facing application,
and LangChain needs access to the end-user's connected accounts on Zapier.com,
you'll need to use OAuth. Review the full docs above to learn how to create
your own provider and generate credentials.
"""
zapier_nla_api_key: str
zapier_nla_oauth_access_token: str
zapier_nla_api_base: str = "https://nla.zapier.com/api/v1/"
model_config = ConfigDict(
extra="forbid",
)
def _format_headers(self) -> Dict[str, str]:
"""Format headers for requests."""
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if self.zapier_nla_oauth_access_token:
headers.update(
{"Authorization": f"Bearer {self.zapier_nla_oauth_access_token}"}
)
else:
headers.update({"X-API-Key": self.zapier_nla_api_key})
return headers
def _get_session(self) -> Session:
session = requests.Session()
session.headers.update(self._format_headers())
return session
async def _arequest(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
"""Make an async request."""
async with aiohttp.ClientSession(headers=self._format_headers()) as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
return await response.json()
def _create_action_payload( # type: ignore[no-untyped-def]
self, instructions: str, params: Optional[Dict] = None, preview_only=False
) -> Dict:
"""Create a payload for an action."""
data = params if params else {}
data.update(
{
"instructions": instructions,
}
)
if preview_only:
data.update({"preview_only": True})
return data
def _create_action_url(self, action_id: str) -> str:
"""Create a url for an action."""
return self.zapier_nla_api_base + f"exposed/{action_id}/execute/"
def _create_action_request( # type: ignore[no-untyped-def]
self,
action_id: str,
instructions: str,
params: Optional[Dict] = None,
preview_only=False,
) -> Request:
data = self._create_action_payload(instructions, params, preview_only)
return Request(
"POST",
self._create_action_url(action_id),
json=data,
)
@model_validator(mode="before")
@classmethod
def validate_environment(cls, values: Dict) -> Any:
"""Validate that api key exists in environment."""
zapier_nla_api_key_default = None
# If there is a oauth_access_key passed in the values
# we don't need a nla_api_key it can be blank
if "zapier_nla_oauth_access_token" in values:
zapier_nla_api_key_default = ""
else:
values["zapier_nla_oauth_access_token"] = ""
# we require at least one API Key
zapier_nla_api_key = get_from_dict_or_env(
values,
"zapier_nla_api_key",
"ZAPIER_NLA_API_KEY",
zapier_nla_api_key_default,
)
values["zapier_nla_api_key"] = zapier_nla_api_key
return values
[docs]
async def alist(self) -> List[Dict]:
"""Returns a list of all exposed (enabled) actions associated with
current user (associated with the set api_key). Change your exposed
actions here: https://nla.zapier.com/demo/start/
The return list can be empty if no actions exposed. Else will contain
a list of action objects:
[{
"id": str,
"description": str,
"params": Dict[str, str]
}]
`params` will always contain an `instructions` key, the only required
param. All others optional and if provided will override any AI guesses
(see "understanding the AI guessing flow" here:
https://nla.zapier.com/api/v1/docs)
"""
response = await self._arequest("GET", self.zapier_nla_api_base + "exposed/")
return response["results"]
[docs]
def list(self) -> List[Dict]:
"""Returns a list of all exposed (enabled) actions associated with
current user (associated with the set api_key). Change your exposed
actions here: https://nla.zapier.com/demo/start/
The return list can be empty if no actions exposed. Else will contain
a list of action objects:
[{
"id": str,
"description": str,
"params": Dict[str, str]
}]
`params` will always contain an `instructions` key, the only required
param. All others optional and if provided will override any AI guesses
(see "understanding the AI guessing flow" here:
https://nla.zapier.com/docs/using-the-api#ai-guessing)
"""
session = self._get_session()
try:
response = session.get(self.zapier_nla_api_base + "exposed/")
response.raise_for_status()
except requests.HTTPError as http_err:
if response.status_code == 401:
if self.zapier_nla_oauth_access_token:
raise requests.HTTPError(
f"An unauthorized response occurred. Check that your "
f"access token is correct and doesn't need to be "
f"refreshed. Err: {http_err}",
response=response,
)
raise requests.HTTPError(
f"An unauthorized response occurred. Check that your api "
f"key is correct. Err: {http_err}",
response=response,
)
raise http_err
return response.json()["results"]
[docs]
def run(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Executes an action that is identified by action_id, must be exposed
(enabled) by the current user (associated with the set api_key). Change
your exposed actions here: https://nla.zapier.com/demo/start/
The return JSON is guaranteed to be less than ~500 words (350
tokens) making it safe to inject into the prompt of another LLM
call.
"""
session = self._get_session()
request = self._create_action_request(action_id, instructions, params)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["result"]
[docs]
async def arun(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Executes an action that is identified by action_id, must be exposed
(enabled) by the current user (associated with the set api_key). Change
your exposed actions here: https://nla.zapier.com/demo/start/
The return JSON is guaranteed to be less than ~500 words (350
tokens) making it safe to inject into the prompt of another LLM
call.
"""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params),
)
return response["result"]
[docs]
def preview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Same as run, but instead of actually executing the action, will
instead return a preview of params that have been guessed by the AI in
case you need to explicitly review before executing."""
session = self._get_session()
params = params if params else {}
params.update({"preview_only": True})
request = self._create_action_request(action_id, instructions, params, True)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["input_params"]
[docs]
async def apreview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Same as run, but instead of actually executing the action, will
instead return a preview of params that have been guessed by the AI in
case you need to explicitly review before executing."""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params, preview_only=True),
)
return response["result"]
[docs]
def run_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as run, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = self.run(*args, **kwargs)
return json.dumps(data)
[docs]
async def arun_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as run, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = await self.arun(*args, **kwargs)
return json.dumps(data)
[docs]
def preview_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as preview, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = self.preview(*args, **kwargs)
return json.dumps(data)
[docs]
async def apreview_as_str( # type: ignore[no-untyped-def]
self, *args, **kwargs
) -> str:
"""Same as preview, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = await self.apreview(*args, **kwargs)
return json.dumps(data)
[docs]
def list_as_str(self) -> str: # type: ignore[no-untyped-def]
"""Same as list, but returns a stringified version of the JSON for
insertting back into an LLM."""
actions = self.list()
return json.dumps(actions)
[docs]
async def alist_as_str(self) -> str: # type: ignore[no-untyped-def]
"""Same as list, but returns a stringified version of the JSON for
insertting back into an LLM."""
actions = await self.alist()
return json.dumps(actions)