Source code for langchain_experimental.utilities.python

import functools
import logging
import multiprocessing
import re
import sys
from io import StringIO
from typing import Dict, Optional

from langchain.pydantic_v1 import BaseModel, Field

logger = logging.getLogger(__name__)


@functools.lru_cache(maxsize=None)
def warn_once() -> None:
    """Warn once about the dangers of PythonREPL."""
    logger.warning("Python REPL can execute arbitrary code. Use with caution.")


[docs]class PythonREPL(BaseModel): """Simulates a standalone Python REPL.""" globals: Optional[Dict] = Field(default_factory=dict, alias="_globals") locals: Optional[Dict] = Field(default_factory=dict, alias="_locals")
[docs] @staticmethod def sanitize_input(query: str) -> str: """Sanitize input to the python REPL. Remove whitespace, backtick & python (if llm mistakes python console as terminal) Args: query: The query to sanitize Returns: str: The sanitized query """ query = re.sub(r"^(\s|`)*(?i:python)?\s*", "", query) query = re.sub(r"(\s|`)*$", "", query) return query
[docs] @classmethod def worker( cls, command: str, globals: Optional[Dict], locals: Optional[Dict], queue: multiprocessing.Queue, ) -> None: old_stdout = sys.stdout sys.stdout = mystdout = StringIO() try: cleaned_command = cls.sanitize_input(command) exec(cleaned_command, globals, locals) sys.stdout = old_stdout queue.put(mystdout.getvalue()) except Exception as e: sys.stdout = old_stdout queue.put(repr(e))
[docs] def run(self, command: str, timeout: Optional[int] = None) -> str: """Run command with own globals/locals and returns anything printed. Timeout after the specified number of seconds.""" # Warn against dangers of PythonREPL warn_once() queue: multiprocessing.Queue = multiprocessing.Queue() # Only use multiprocessing if we are enforcing a timeout if timeout is not None: # create a Process p = multiprocessing.Process( target=self.worker, args=(command, self.globals, self.locals, queue) ) # start it p.start() # wait for the process to finish or kill it after timeout seconds p.join(timeout) if p.is_alive(): p.terminate() return "Execution timed out" else: self.worker(command, self.globals, self.locals, queue) # get the result from the worker function return queue.get()