Source code for langchain_community.utilities.infobip
"""Util that sends messages via Infobip."""fromtypingimportAny,Dict,List,Optionalimportrequestsfromlangchain_core.utilsimportget_from_dict_or_envfrompydanticimportBaseModel,ConfigDict,model_validatorfromrequests.adaptersimportHTTPAdapterfromurllib3.utilimportRetry
class InfobipAPIWrapper(BaseModel):
    """Wrapper for Infobip API for messaging.

    Sends SMS and email messages through the Infobip HTTP API.

    Configuration is taken from the constructor arguments or, when those are
    missing, from the ``INFOBIP_API_KEY`` and ``INFOBIP_BASE_URL`` environment
    variables. The API key is required; the base URL falls back to the
    default declared on the ``infobip_base_url`` field.
    """

    # API key sent in the ``Authorization: App <key>`` header.
    infobip_api_key: Optional[str] = None
    # Base URL that the SMS and email endpoint paths are appended to.
    infobip_base_url: Optional[str] = "https://api.infobip.com"

    model_config = ConfigDict(
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict) -> Any:
        """Validate that api key exists in environment.

        Args:
            values: Raw keyword arguments passed to the model constructor.

        Returns:
            ``values`` with ``infobip_api_key`` and ``infobip_base_url``
            filled in from the arguments, the environment, or (base URL
            only) the declared field default.

        Raises:
            ValueError: Raised by ``get_from_dict_or_env`` when no API key is
                supplied and ``INFOBIP_API_KEY`` is not set.
        """
        values["infobip_api_key"] = get_from_dict_or_env(
            values, "infobip_api_key", "INFOBIP_API_KEY"
        )
        # A "before" validator runs ahead of field defaults, so the declared
        # default must be passed explicitly; otherwise a missing base URL
        # raises and the field default can never take effect.
        values["infobip_base_url"] = get_from_dict_or_env(
            values,
            "infobip_base_url",
            "INFOBIP_BASE_URL",
            default=cls.model_fields["infobip_base_url"].default,
        )
        return values

    def _get_requests_session(self) -> requests.Session:
        """Get a requests session with the correct headers.

        Returns:
            A new session carrying the ``Authorization`` and ``User-Agent``
            headers, with a retrying adapter mounted for ``https://`` URLs.
            The caller is responsible for closing it.
        """
        retry_strategy: Retry = Retry(
            total=4,  # Maximum number of retries
            backoff_factor=2,  # Exponential backoff factor
            status_forcelist=[429, 500, 502, 503, 504],  # HTTP status codes to retry on
        )
        adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy)

        session = requests.Session()
        session.mount("https://", adapter)
        session.headers.update(
            {
                "Authorization": f"App {self.infobip_api_key}",
                "User-Agent": "infobip-langchain-community",
            }
        )
        return session

    @staticmethod
    def _parse_response(response: requests.Response) -> str:
        """Turn an Infobip send response into the string returned to callers.

        Args:
            response: The HTTP response from an SMS or email send request.

        Returns:
            The ID of the first message on success; the service error text
            when the status is not 200 and the body carries one; otherwise
            one of the fixed fallback messages.
        """
        try:
            payload: Any = response.json()
        except ValueError:
            # Body is not JSON (e.g. an HTML error page); the lookups below
            # then fall through to the fallback messages instead of raising.
            payload = None

        if response.status_code != 200:
            try:
                return payload["requestError"]["serviceException"]["text"]
            except (KeyError, TypeError):
                return "Failed to send message"

        try:
            return payload["messages"][0]["messageId"]
        except (KeyError, IndexError, TypeError):
            # IndexError covers an empty "messages" list; TypeError covers a
            # body whose JSON shape is not the expected nested mapping.
            return (
                "Could not get message ID from response, message was sent successfully"
            )

    def _send_sms(
        self, sender: str, destination_phone_numbers: List[str], text: str
    ) -> str:
        """Send an SMS message.

        Args:
            sender: Value placed in the message's ``from`` field.
            destination_phone_numbers: Recipients; each becomes one
                ``{"to": ...}`` destination entry.
            text: Message text.

        Returns:
            The message ID, or an error description (see ``_parse_response``).
        """
        request_body: Dict = {
            "messages": [
                {
                    "destinations": [
                        {"to": destination}
                        for destination in destination_phone_numbers
                    ],
                    "from": sender,
                    "text": text,
                }
            ]
        }

        # The context manager closes the session (and its connection pool)
        # once the request has completed.
        with self._get_requests_session() as session:
            session.headers.update(
                {
                    "Content-Type": "application/json",
                }
            )
            response: requests.Response = session.post(
                f"{self.infobip_base_url}/sms/2/text/advanced",
                json=request_body,
            )
            return self._parse_response(response)

    def _send_email(
        self, from_email: str, to_email: str, subject: str, body: str
    ) -> str:
        """Send an email message.

        Args:
            from_email: Sender address.
            to_email: Recipient address.
            subject: Email subject.
            body: Plain-text body, sent as the ``text`` form field.

        Returns:
            The message ID, or an error description (see ``_parse_response``).

        Raises:
            ImportError: If ``requests_toolbelt`` is not installed.
        """
        try:
            from requests_toolbelt import MultipartEncoder
        except ImportError as e:
            raise ImportError(
                "Unable to import requests_toolbelt, please install it with "
                "`pip install -U requests-toolbelt`."
            ) from e

        form_data: Dict = {
            "from": from_email,
            "to": to_email,
            "subject": subject,
            "text": body,
        }
        data = MultipartEncoder(fields=form_data)

        with self._get_requests_session() as session:
            # The encoder's content type carries the multipart boundary.
            session.headers.update(
                {
                    "Content-Type": data.content_type,
                }
            )
            response: requests.Response = session.post(
                f"{self.infobip_base_url}/email/3/send",
                data=data,
            )
            return self._parse_response(response)

    def run(
        self,
        body: str = "",
        to: str = "",
        sender: str = "",
        subject: str = "",
        channel: str = "sms",
    ) -> str:
        """Send a message over the requested channel.

        Args:
            body: Message text. Required for both channels.
            to: Destination phone number (SMS) or email address (email).
            sender: Sender identifier. Required for both channels.
            subject: Email subject. Required for the ``"email"`` channel only.
            channel: Either ``"sms"`` or ``"email"``.

        Returns:
            The string produced by the channel-specific send method: a
            message ID on success, otherwise an error description.

        Raises:
            ValueError: If a required argument is an empty string, or if
                ``channel`` is neither ``"sms"`` nor ``"email"``.
        """
        if channel == "sms":
            if sender == "":
                raise ValueError("Sender must be specified for SMS messages")
            if to == "":
                raise ValueError("Destination must be specified for SMS messages")
            if body == "":
                raise ValueError("Body must be specified for SMS messages")

            return self._send_sms(
                sender=sender,
                destination_phone_numbers=[to],
                text=body,
            )

        if channel == "email":
            if sender == "":
                raise ValueError("Sender must be specified for email messages")
            if to == "":
                raise ValueError("Destination must be specified for email messages")
            if subject == "":
                raise ValueError("Subject must be specified for email messages")
            if body == "":
                raise ValueError("Body must be specified for email messages")

            return self._send_email(
                from_email=sender,
                to_email=to,
                subject=subject,
                body=body,
            )

        raise ValueError(f"Channel {channel} is not supported")