Source code for langchain_community.llms.self_hosted

import importlib.util
import logging
import pickle
from typing import Any, Callable, List, Mapping, Optional

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from pydantic import ConfigDict

from langchain_community.llms.utils import enforce_stop_tokens

logger = logging.getLogger(__name__)


def _generate_text(
    pipeline: Any,
    prompt: str,
    *args: Any,
    stop: Optional[List[str]] = None,
    **kwargs: Any,
) -> str:
    """Inference function to send to the remote hardware.

    Accepts a pipeline callable (or, more likely,
    a key pointing to the model on the cluster's object store)
    and returns text predictions for each document
    in the batch.
    """
    text = pipeline(prompt, *args, **kwargs)
    if stop is not None:
        text = enforce_stop_tokens(text, stop)
    return text


def _send_pipeline_to_device(pipeline: Any, device: int) -> Any:
    """Send a pipeline to a device on the cluster."""
    if isinstance(pipeline, str):
        with open(pipeline, "rb") as f:
            # This code path can only be triggered if the user
            # passed allow_dangerous_deserialization=True
            pipeline = pickle.load(f)  # ignore[pickle]: explicit-opt-in

    if importlib.util.find_spec("torch") is not None:
        import torch

        cuda_device_count = torch.cuda.device_count()
        if device < -1 or (device >= cuda_device_count):
            raise ValueError(
                f"Got device=={device}, "
                f"device is required to be within [-1, {cuda_device_count})"
            )
        if device < 0 and cuda_device_count > 0:
            logger.warning(
                "Device has %d GPUs available. "
                "Provide device={deviceId} to `from_model_id` to use available"
                "GPUs for execution. deviceId is -1 for CPU and "
                "can be a positive integer associated with CUDA device id.",
                cuda_device_count,
            )

        pipeline.device = torch.device(device)
        pipeline.model = pipeline.model.to(pipeline.device)
    return pipeline


[docs] class SelfHostedPipeline(LLM): """Model inference on self-hosted remote hardware. Supported hardware includes auto-launched instances on AWS, GCP, Azure, and Lambda, as well as servers specified by IP address and SSH credentials (such as on-prem, or another cloud like Paperspace, Coreweave, etc.). To use, you should have the ``runhouse`` python package installed. Example for custom pipeline and inference functions: .. code-block:: python from langchain_community.llms import SelfHostedPipeline from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline import runhouse as rh def load_pipeline(): tokenizer = AutoTokenizer.from_pretrained("gpt2") model = AutoModelForCausalLM.from_pretrained("gpt2") return pipeline( "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10 ) def inference_fn(pipeline, prompt, stop = None): return pipeline(prompt)[0]["generated_text"] gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") llm = SelfHostedPipeline( model_load_fn=load_pipeline, hardware=gpu, model_reqs=model_reqs, inference_fn=inference_fn ) Example for <2GB model (can be serialized and sent directly to the server): .. code-block:: python from langchain_community.llms import SelfHostedPipeline import runhouse as rh gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") my_model = ... llm = SelfHostedPipeline.from_pipeline( pipeline=my_model, hardware=gpu, model_reqs=["./", "torch", "transformers"], ) Example passing model path for larger models: .. code-block:: python from langchain_community.llms import SelfHostedPipeline import runhouse as rh import pickle from transformers import pipeline generator = pipeline(model="gpt2") rh.blob(pickle.dumps(generator), path="models/pipeline.pkl" ).save().to(gpu, path="models") llm = SelfHostedPipeline.from_pipeline( pipeline="models/pipeline.pkl", hardware=gpu, model_reqs=["./", "torch", "transformers"], ) """ pipeline_ref: Any = None #: :meta private: client: Any = None #: :meta private: inference_fn: Callable = _generate_text #: :meta private: """Inference function to send to the remote hardware.""" hardware: Any = None """Remote hardware to send the inference function to.""" model_load_fn: Callable """Function to load the model remotely on the server.""" load_fn_kwargs: Optional[dict] = None """Keyword arguments to pass to the model load function.""" model_reqs: List[str] = ["./", "torch"] """Requirements to install on hardware to inference the model.""" allow_dangerous_deserialization: bool = False """Allow deserialization using pickle which can be dangerous if loading compromised data. """ model_config = ConfigDict( extra="forbid", ) def __init__(self, **kwargs: Any): """Init the pipeline with an auxiliary function. The load function must be in global scope to be imported and run on the server, i.e. in a module and not a REPL or closure. Then, initialize the remote inference function. """ if not kwargs.get("allow_dangerous_deserialization"): raise ValueError( "SelfHostedPipeline relies on the pickle module. " "You will need to set allow_dangerous_deserialization=True " "if you want to opt-in to allow deserialization of data using pickle." "Data can be compromised by a malicious actor if " "not handled properly to include " "a malicious payload that when deserialized with " "pickle can execute arbitrary code. " ) super().__init__(**kwargs) try: import runhouse as rh except ImportError: raise ImportError( "Could not import runhouse python package. " "Please install it with `pip install runhouse`." ) remote_load_fn = rh.function(fn=self.model_load_fn).to( self.hardware, reqs=self.model_reqs ) _load_fn_kwargs = self.load_fn_kwargs or {} self.pipeline_ref = remote_load_fn.remote(**_load_fn_kwargs) self.client = rh.function(fn=self.inference_fn).to( self.hardware, reqs=self.model_reqs )
[docs] @classmethod def from_pipeline( cls, pipeline: Any, hardware: Any, model_reqs: Optional[List[str]] = None, device: int = 0, **kwargs: Any, ) -> LLM: """Init the SelfHostedPipeline from a pipeline object or string.""" if not isinstance(pipeline, str): logger.warning( "Serializing pipeline to send to remote hardware. " "Note, it can be quite slow" "to serialize and send large models with each execution. " "Consider sending the pipeline" "to the cluster and passing the path to the pipeline instead." ) load_fn_kwargs = {"pipeline": pipeline, "device": device} return cls( load_fn_kwargs=load_fn_kwargs, model_load_fn=_send_pipeline_to_device, hardware=hardware, model_reqs=["transformers", "torch"] + (model_reqs or []), **kwargs, )
@property def _identifying_params(self) -> Mapping[str, Any]: """Get the identifying parameters.""" return { **{"hardware": self.hardware}, } @property def _llm_type(self) -> str: return "self_hosted_llm" def _call( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: return self.client( pipeline=self.pipeline_ref, prompt=prompt, stop=stop, **kwargs )