Source code for langchain_experimental.autonomous_agents.hugginggpt.task_executor

import copy
import uuid
from typing import Dict, List

import numpy as np
from langchain_core.tools import BaseTool

from langchain_experimental.autonomous_agents.hugginggpt.task_planner import Plan


[docs]class Task: """Task to be executed."""
[docs] def __init__(self, task: str, id: int, dep: List[int], args: Dict, tool: BaseTool): self.task = task self.id = id self.dep = dep self.args = args self.tool = tool self.status = "pending" self.message = "" self.result = ""
def __str__(self) -> str: return f"{self.task}({self.args})"
[docs] def save_product(self) -> None: import cv2 if self.task == "video_generator": # ndarray to video product = np.array(self.product) nframe, height, width, _ = product.shape video_filename = uuid.uuid4().hex[:6] + ".mp4" fps = 30 # Frames per second fourcc = cv2.VideoWriter_fourcc(*"mp4v") # type: ignore video_out = cv2.VideoWriter(video_filename, fourcc, fps, (width, height)) for frame in self.product: video_out.write(frame) video_out.release() self.result = video_filename elif self.task == "image_generator": # PIL.Image to image filename = uuid.uuid4().hex[:6] + ".png" self.product.save(filename) # type: ignore self.result = filename
[docs] def completed(self) -> bool: return self.status == "completed"
[docs] def failed(self) -> bool: return self.status == "failed"
[docs] def pending(self) -> bool: return self.status == "pending"
[docs] def run(self) -> str: from diffusers.utils import load_image try: new_args = copy.deepcopy(self.args) for k, v in new_args.items(): if k == "image": new_args["image"] = load_image(v) if self.task in ["video_generator", "image_generator", "text_reader"]: self.product = self.tool(**new_args) else: self.result = self.tool(**new_args) except Exception as e: self.status = "failed" self.message = str(e) return self.message self.status = "completed" self.save_product() return self.result
[docs]class TaskExecutor: """Load tools and execute tasks."""
[docs] def __init__(self, plan: Plan): self.plan = plan self.tasks = [] self.id_task_map = {} self.status = "pending" for step in self.plan.steps: task = Task(step.task, step.id, step.dep, step.args, step.tool) self.tasks.append(task) self.id_task_map[step.id] = task
[docs] def completed(self) -> bool: return all(task.completed() for task in self.tasks)
[docs] def failed(self) -> bool: return any(task.failed() for task in self.tasks)
[docs] def pending(self) -> bool: return any(task.pending() for task in self.tasks)
[docs] def check_dependency(self, task: Task) -> bool: for dep_id in task.dep: if dep_id == -1: continue dep_task = self.id_task_map[dep_id] if dep_task.failed() or dep_task.pending(): return False return True
[docs] def update_args(self, task: Task) -> None: for dep_id in task.dep: if dep_id == -1: continue dep_task = self.id_task_map[dep_id] for k, v in task.args.items(): if f"<resource-{dep_id}>" in v: task.args[k] = task.args[k].replace( f"<resource-{dep_id}>", dep_task.result )
[docs] def run(self) -> str: for task in self.tasks: print(f"running {task}") # noqa: T201 if task.pending() and self.check_dependency(task): self.update_args(task) task.run() if self.completed(): self.status = "completed" elif self.failed(): self.status = "failed" else: self.status = "pending" return self.status
def __str__(self) -> str: result = "" for task in self.tasks: result += f"{task}\n" result += f"status: {task.status}\n" if task.failed(): result += f"message: {task.message}\n" if task.completed(): result += f"result: {task.result}\n" return result def __repr__(self) -> str: return self.__str__()
[docs] def describe(self) -> str: return self.__str__()