Source code for langchain_community.tools.ainetwork.base
from __future__ import annotations
import asyncio
import threading
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional
from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import Field
from langchain_core.tools import BaseTool
from langchain_community.tools.ainetwork.utils import authenticate
if TYPE_CHECKING:
from ain.ain import Ain
[docs]class OperationType(str, Enum):
"""Type of operation as enumerator."""
SET = "SET"
GET = "GET"
[docs]class AINBaseTool(BaseTool):
"""Base class for the AINetwork tools."""
interface: Ain = Field(default_factory=authenticate)
"""The interface object for the AINetwork Blockchain."""
def _run(
self,
*args: Any,
run_manager: Optional[CallbackManagerForToolRun] = None,
**kwargs: Any,
) -> str:
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if loop.is_running():
result_container = []
def thread_target() -> None:
nonlocal result_container
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
result_container.append(
new_loop.run_until_complete(self._arun(*args, **kwargs))
)
except Exception as e:
result_container.append(e)
finally:
new_loop.close()
thread = threading.Thread(target=thread_target)
thread.start()
thread.join()
result = result_container[0]
if isinstance(result, Exception):
raise result
return result
else:
result = loop.run_until_complete(self._arun(*args, **kwargs))
loop.close()
return result