Source code for langchain_experimental.autonomous_agents.hugginggpt.task_executor
import copy
import uuid
from typing import Dict, List
import numpy as np
from langchain_core.tools import BaseTool
from langchain_experimental.autonomous_agents.hugginggpt.task_planner import Plan
class Task:
    """Task to be executed.

    A task bundles one tool call: the tool itself, the keyword arguments it is
    called with, the ids of the tasks it depends on, and the outcome of the call.

    ``status`` is one of ``"pending"`` (initial), ``"completed"`` or ``"failed"``.
    ``message`` holds the error text of a failed run. ``result`` holds either the
    tool's return value or, for generator tasks, the name of the file the output
    was written to. ``product`` holds the raw tool output of the
    ``video_generator``, ``image_generator`` and ``text_reader`` tasks.
    """

    def __init__(self, task: str, id: int, dep: List[int], args: Dict, tool: BaseTool):
        """Create a pending task.

        Args:
            task: Name of the task; ``run`` and ``save_product`` branch on it.
            id: Identifier of the task.
            dep: Ids of the tasks this task depends on.
            args: Keyword arguments the tool is called with.
            tool: Callable invoked as ``tool(**args)``.
        """
        self.task = task
        self.id = id
        self.dep = dep
        self.args = args
        self.tool = tool
        self.status = "pending"
        self.message = ""
        self.result = ""
        # Raw tool output of generator tasks; defined up front so the attribute
        # always exists, even before the task has run.
        self.product = None

    def __str__(self) -> str:
        """Return the task rendered as ``name(args)``."""
        return f"{self.task}({self.args})"

    def save_product(self) -> None:
        """Write ``self.product`` to a file and store the file name in ``self.result``.

        Only the ``video_generator`` and ``image_generator`` tasks are handled;
        for every other task name this method does nothing. Files are written
        under a random six-hex-character name relative to the working directory.

        NOTE(review): ``run`` also stores the ``text_reader`` output in
        ``self.product``, but nothing here converts it into a result — confirm
        whether that is intended.
        """
        if self.task == "video_generator":
            # Imported lazily: only video tasks need OpenCV, so the other tasks
            # keep working when it is not installed.
            import cv2

            # ndarray to video. The product must convert to a 4-D array whose
            # first three axes are read as frame count, height and width.
            frames = np.array(self.product)
            _, height, width, _ = frames.shape
            video_filename = uuid.uuid4().hex[:6] + ".mp4"
            fps = 30  # Frames per second
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # type: ignore
            video_out = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
            try:
                for frame in self.product:
                    video_out.write(frame)
            finally:
                # Release the writer even when a frame cannot be written.
                video_out.release()
            self.result = video_filename
        elif self.task == "image_generator":
            # The product only needs a ``save(filename)`` method
            # (presumably a PIL.Image — confirm against the tool).
            filename = uuid.uuid4().hex[:6] + ".png"
            self.product.save(filename)  # type: ignore
            self.result = filename

    def completed(self) -> bool:
        """Return True if the task ran successfully."""
        return self.status == "completed"

    def failed(self) -> bool:
        """Return True if running the task raised an error."""
        return self.status == "failed"

    def pending(self) -> bool:
        """Return True if the task has not been run yet."""
        return self.status == "pending"

    def run(self) -> str:
        """Call the tool with the task arguments and record the outcome.

        An ``"image"`` argument is passed through ``diffusers.utils.load_image``
        before the call. The output of the ``video_generator``,
        ``image_generator`` and ``text_reader`` tasks is kept in
        ``self.product`` and handed to ``save_product``; the output of every
        other task is stored directly in ``self.result``.

        Any ``Exception`` raised while preparing the arguments, calling the
        tool or saving the product marks the task as failed instead of
        propagating.

        Returns:
            ``self.result`` on success, or the error message on failure.
        """
        try:
            # Work on a copy so that ``self.args`` itself is left untouched.
            call_args = copy.deepcopy(self.args)
            if "image" in call_args:
                # Imported lazily: tasks without an image argument must not
                # require diffusers to be installed.
                from diffusers.utils import load_image

                call_args["image"] = load_image(call_args["image"])
            if self.task in ["video_generator", "image_generator", "text_reader"]:
                self.product = self.tool(**call_args)
            else:
                self.result = self.tool(**call_args)
            # Saving is part of the guarded section so that a task whose output
            # could not be written is reported as failed, not as completed.
            self.save_product()
        except Exception as e:
            self.status = "failed"
            self.message = str(e)
            return self.message
        self.status = "completed"
        return self.result
class TaskExecutor:
    """Load tools and execute tasks.

    The executor turns every step of a plan into a ``Task``, runs the tasks
    whose dependencies have completed, and substitutes ``<resource-N>``
    placeholders in task arguments with the result of task ``N``.
    """

    def __init__(self, plan: Plan):
        """Create one pending task per plan step.

        Args:
            plan: Plan whose ``steps`` each provide ``task``, ``id``, ``dep``,
                ``args`` and ``tool``.
        """
        self.plan = plan
        self.tasks: List[Task] = []
        self.id_task_map: Dict[int, Task] = {}
        self.status = "pending"
        for step in self.plan.steps:
            task = Task(step.task, step.id, step.dep, step.args, step.tool)
            self.tasks.append(task)
            self.id_task_map[step.id] = task

    def completed(self) -> bool:
        """Return True if every task has completed."""
        return all(task.completed() for task in self.tasks)

    def failed(self) -> bool:
        """Return True if at least one task has failed."""
        return any(task.failed() for task in self.tasks)

    def pending(self) -> bool:
        """Return True if at least one task is still pending."""
        return any(task.pending() for task in self.tasks)

    def check_dependency(self, task: Task) -> bool:
        """Return True if every dependency of ``task`` has completed.

        A dependency id of ``-1`` is skipped. Any other id must be a key of
        ``self.id_task_map``; an unknown id raises ``KeyError``.
        """
        for dep_id in task.dep:
            if dep_id == -1:
                continue
            dep_task = self.id_task_map[dep_id]
            if dep_task.failed() or dep_task.pending():
                return False
        return True

    def update_args(self, task: Task) -> None:
        """Replace ``<resource-N>`` placeholders in the arguments of ``task``.

        For every dependency id ``N`` of the task (``-1`` is skipped), each
        string argument containing ``<resource-N>`` has that placeholder
        replaced by the result of task ``N``. ``task.args`` is modified in
        place.
        """
        for dep_id in task.dep:
            if dep_id == -1:
                continue
            placeholder = f"<resource-{dep_id}>"
            # A tool may return a non-string value; ``str.replace`` needs text.
            replacement = str(self.id_task_map[dep_id].result)
            for key, value in task.args.items():
                # Only string arguments can carry a placeholder; other values
                # (numbers, lists, ...) are left as they are.
                if isinstance(value, str) and placeholder in value:
                    task.args[key] = value.replace(placeholder, replacement)

    def run(self) -> str:
        """Run every pending task whose dependencies have completed.

        The task list is scanned repeatedly until a full pass starts no new
        task, so a task listed before one of its dependencies is still
        executed once that dependency has completed. Tasks whose dependencies
        failed are never run and stay pending.

        Returns:
            The executor status: ``"completed"`` if all tasks completed,
            ``"failed"`` if any task failed, otherwise ``"pending"``.
        """
        progressed = True
        while progressed:
            progressed = False
            for task in self.tasks:
                if not (task.pending() and self.check_dependency(task)):
                    continue
                print(f"running {task}")  # noqa: T201
                self.update_args(task)
                task.run()
                # Count the pass as progress only if the task left the pending
                # state; otherwise the loop could spin forever.
                if not task.pending():
                    progressed = True
        if self.completed():
            self.status = "completed"
        elif self.failed():
            self.status = "failed"
        else:
            self.status = "pending"
        return self.status

    def __str__(self) -> str:
        """Return a report with the status of every task.

        Each task contributes its string form and status, followed by its
        error message if it failed or its result if it completed.
        """
        lines = []
        for task in self.tasks:
            lines.append(f"{task}\n")
            lines.append(f"status: {task.status}\n")
            if task.failed():
                lines.append(f"message: {task.message}\n")
            if task.completed():
                lines.append(f"result: {task.result}\n")
        return "".join(lines)

    def __repr__(self) -> str:
        """Return the same report as ``__str__``."""
        return self.__str__()

    def describe(self) -> str:
        """Return the same report as ``__str__``."""
        return self.__str__()