class Beam(LLM):  # type: ignore[override]
    """Beam API for the gpt2 large language model.

    To use, you should have the ``beam-sdk`` python package installed,
    and the environment variables ``BEAM_CLIENT_ID`` set with your client id
    and ``BEAM_CLIENT_SECRET`` set with your client secret. Information on how
    to get these is available here: https://docs.beam.cloud/account/api-keys.

    The wrapper can then be called as follows, where the name, cpu, memory, gpu,
    python version, and python packages can be updated accordingly. Once deployed,
    the instance can be called.

    Example:
        .. code-block:: python

            llm = Beam(
                model_name="gpt2",
                name="langchain-gpt2",
                cpu=8,
                memory="32Gi",
                gpu="A10G",
                python_version="python3.8",
                python_packages=[
                    "diffusers[torch]>=0.10",
                    "transformers",
                    "torch",
                    "pillow",
                    "accelerate",
                    "safetensors",
                    "xformers",
                ],
                max_length=50,
            )
            llm._deploy()
            call_result = llm._call(input)
    """

    model_name: str = ""
    name: str = ""
    cpu: str = ""
    memory: str = ""
    gpu: str = ""
    python_version: str = ""
    python_packages: List[str] = []
    max_length: str = ""
    url: str = ""
    """model endpoint to use"""

    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for `create` call not
    explicitly specified."""

    beam_client_id: str = ""
    beam_client_secret: str = ""
    app_id: Optional[str] = None

    model_config = ConfigDict(
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def build_extra(cls, values: Dict[str, Any]) -> Any:
        """Build extra kwargs from additional params that were passed in."""
        all_required_field_names = {field.alias for field in get_fields(cls).values()}
        extra = values.get("model_kwargs", {})
        for field_name in list(values):
            if field_name not in all_required_field_names:
                if field_name in extra:
                    raise ValueError(f"Found {field_name} supplied twice.")
                logger.warning(
                    f"{field_name} was transferred to model_kwargs. "
                    f"Please confirm that {field_name} is what you intended."
                )
                extra[field_name] = values.pop(field_name)
        values["model_kwargs"] = extra
        return values
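
    # A sketch of what ``build_extra`` does with an undeclared keyword, using
    # a hypothetical ``temperature`` parameter purely for illustration:
    #
    #     llm = Beam(name="langchain-gpt2", temperature=0.7)
    #     assert llm.model_kwargs == {"temperature": 0.7}  # moved, with a warning
    #
    # Supplying the same key both directly and inside ``model_kwargs`` raises
    # ``ValueError("Found temperature supplied twice.")``.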
    @pre_init
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exist in environment."""
        beam_client_id = get_from_dict_or_env(
            values, "beam_client_id", "BEAM_CLIENT_ID"
        )
        beam_client_secret = get_from_dict_or_env(
            values, "beam_client_secret", "BEAM_CLIENT_SECRET"
        )
        values["beam_client_id"] = beam_client_id
        values["beam_client_secret"] = beam_client_secret
        return values
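
    # ``validate_environment`` accepts credentials either as constructor
    # kwargs or from the environment; a minimal setup sketch, with placeholder
    # values:
    #
    #     export BEAM_CLIENT_ID="<your-client-id>"
    #     export BEAM_CLIENT_SECRET="<your-client-secret>"
    #
    # or, equivalently, pass them in Python:
    #
    #     llm = Beam(beam_client_id="<id>", beam_client_secret="<secret>")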
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_name": self.model_name,
            "name": self.name,
            "cpu": self.cpu,
            "memory": self.memory,
            "gpu": self.gpu,
            "python_version": self.python_version,
            "python_packages": self.python_packages,
            "max_length": self.max_length,
            "model_kwargs": self.model_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "beam"
    def app_creation(self) -> None:
        """Creates a Python file which will contain your Beam app definition."""
        script = textwrap.dedent(
            """\
        import beam

        # The environment your code will run on
        app = beam.App(
            name="{name}",
            cpu={cpu},
            memory="{memory}",
            gpu="{gpu}",
            python_version="{python_version}",
            python_packages={python_packages},
        )

        app.Trigger.RestAPI(
            inputs={{"prompt": beam.Types.String(), "max_length": beam.Types.String()}},
            outputs={{"text": beam.Types.String()}},
            handler="run.py:beam_langchain",
        )
        """
        )

        script_name = "app.py"
        with open(script_name, "w") as file:
            file.write(
                script.format(
                    name=self.name,
                    cpu=self.cpu,
                    memory=self.memory,
                    gpu=self.gpu,
                    python_version=self.python_version,
                    python_packages=self.python_packages,
                )
            )
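
    # With the example values from the class docstring, ``app_creation``
    # renders an ``app.py`` along these lines (abridged; note that the doubled
    # braces in the template resolve to literal dicts):
    #
    #     import beam
    #
    #     app = beam.App(
    #         name="langchain-gpt2",
    #         cpu=8,
    #         memory="32Gi",
    #         gpu="A10G",
    #         python_version="python3.8",
    #         python_packages=['transformers', 'torch'],
    #     )
    #
    #     app.Trigger.RestAPI(
    #         inputs={"prompt": beam.Types.String(), "max_length": beam.Types.String()},
    #         outputs={"text": beam.Types.String()},
    #         handler="run.py:beam_langchain",
    #     )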
    def run_creation(self) -> None:
        """Creates a Python file which will be deployed on Beam."""
        script = textwrap.dedent(
            """
        import os
        import transformers
        from transformers import GPT2LMHeadModel, GPT2Tokenizer

        model_name = "{model_name}"

        def beam_langchain(**inputs):
            prompt = inputs["prompt"]
            length = inputs["max_length"]

            tokenizer = GPT2Tokenizer.from_pretrained(model_name)
            model = GPT2LMHeadModel.from_pretrained(model_name)
            encodedPrompt = tokenizer.encode(prompt, return_tensors='pt')
            outputs = model.generate(encodedPrompt, max_length=int(length),
              do_sample=True, pad_token_id=tokenizer.eos_token_id)
            output = tokenizer.decode(outputs[0], skip_special_tokens=True)
            print(output)  # noqa: T201
            return {{"text": output}}
        """
        )

        script_name = "run.py"
        with open(script_name, "w") as file:
            file.write(script.format(model_name=self.model_name))
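
    # After ``app_creation()`` and ``run_creation()`` run, the working
    # directory contains ``app.py`` (the Beam app definition) and ``run.py``
    # (the GPT-2 handler). The deployed REST endpoint maps a request body of
    # {"prompt": ..., "max_length": ...} to a response of {"text": ...},
    # which is exactly the shape ``_call`` below sends and reads. Note that
    # the generated handler loads the tokenizer and model on every request.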
    def _deploy(self) -> str:
        """Call to Beam."""
        try:
            import beam  # type: ignore

            if beam.__path__ == "":
                raise ImportError
        except ImportError:
            raise ImportError(
                "Could not import beam python package. "
                "Please install it with `curl "
                "https://raw.githubusercontent.com/slai-labs"
                "/get-beam/main/get-beam.sh -sSfL | sh`."
            )
        self.app_creation()
        self.run_creation()

        process = subprocess.run(
            "beam deploy app.py", shell=True, capture_output=True, text=True
        )

        if process.returncode == 0:
            output = process.stdout
            logger.info(output)
            lines = output.split("\n")

            for line in lines:
                if line.startswith(" i Send requests to: https://apps.beam.cloud/"):
                    self.app_id = line.split("/")[-1]
                    self.url = line.split(":")[1].strip()
                    return self.app_id

            raise ValueError(
                f"""Failed to retrieve the appID from the deployment output.
                Deployment output: {output}"""
            )
        else:
            raise ValueError(f"Deployment failed. Error: {process.stderr}")

    @property
    def authorization(self) -> str:
        if self.beam_client_id:
            credential_str = self.beam_client_id + ":" + self.beam_client_secret
        else:
            credential_str = self.beam_client_secret
        return base64.b64encode(credential_str.encode()).decode()

    def _call(
        self,
        prompt: str,
        stop: Optional[list] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call to Beam."""
        url = "https://apps.beam.cloud/" + self.app_id if self.app_id else self.url
        payload = {"prompt": prompt, "max_length": self.max_length}
        payload.update(kwargs)
        headers = {
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate",
            "Authorization": "Basic " + self.authorization,
            "Connection": "keep-alive",
            "Content-Type": "application/json",
        }

        for _ in range(DEFAULT_NUM_TRIES):
            request = requests.post(url, headers=headers, data=json.dumps(payload))
            if request.status_code == 200:
                return request.json()["text"]
            time.sleep(DEFAULT_SLEEP_TIME)
        logger.warning("Unable to successfully call model.")
        return ""
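

# A minimal end-to-end usage sketch, guarded so it does not run on import. It
# assumes the Beam CLI is installed and that BEAM_CLIENT_ID / BEAM_CLIENT_SECRET
# are exported; every field value below is a placeholder, not a recommendation.
if __name__ == "__main__":
    llm = Beam(
        model_name="gpt2",
        name="langchain-gpt2",
        cpu="8",
        memory="32Gi",
        gpu="A10G",
        python_version="python3.8",
        python_packages=["transformers", "torch"],
        max_length="50",
    )
    llm._deploy()  # writes app.py / run.py, then shells out to `beam deploy`
    print(llm._call("The meaning of life is"))  # POSTs to the deployed endpoint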