[docs]classEtherscanLoader(BaseLoader):"""Load transactions from `Ethereum` mainnet. The Loader use Etherscan API to interact with Ethereum mainnet. ETHERSCAN_API_KEY environment variable must be set use this loader. """
[docs]def__init__(self,account_address:str,api_key:str="docs-demo",filter:str="normal_transaction",page:int=1,offset:int=10,start_block:int=0,end_block:int=99999999,sort:str="desc",):self.account_address=account_addressself.api_key=os.environ.get("ETHERSCAN_API_KEY")orapi_keyself.filter=filterself.page=pageself.offset=offsetself.start_block=start_blockself.end_block=end_blockself.sort=sortifnotself.api_key:raiseValueError("Etherscan API key not provided")ifnotre.match(r"^0x[a-fA-F0-9]{40}$",self.account_address):raiseValueError(f"Invalid contract address {self.account_address}")iffilternotin["normal_transaction","internal_transaction","erc20_transaction","eth_balance","erc721_transaction","erc1155_transaction",]:raiseValueError(f"Invalid filter {filter}")
[docs]deflazy_load(self)->Iterator[Document]:"""Lazy load Documents from table."""result=[]ifself.filter=="normal_transaction":result=self.getNormTx()elifself.filter=="internal_transaction":result=self.getInternalTx()elifself.filter=="erc20_transaction":result=self.getERC20Tx()elifself.filter=="eth_balance":result=self.getEthBalance()elifself.filter=="erc721_transaction":result=self.getERC721Tx()elifself.filter=="erc1155_transaction":result=self.getERC1155Tx()else:raiseValueError(f"Invalid filter {filter}")fordocinresult:yielddoc
[docs]defgetNormTx(self)->List[Document]:url=(f"https://api.etherscan.io/api?module=account&action=txlist&address={self.account_address}"f"&startblock={self.start_block}&endblock={self.end_block}&page={self.page}"f"&offset={self.offset}&sort={self.sort}&apikey={self.api_key}")try:response=requests.get(url)response.raise_for_status()exceptrequests.exceptions.RequestExceptionase:print("Error occurred while making the request:",e)# noqa: T201items=response.json()["result"]result=[]iflen(items)==0:return[Document(page_content="")]foriteminitems:content=str(item)metadata={"from":item["from"],"tx_hash":item["hash"],"to":item["to"]}result.append(Document(page_content=content,metadata=metadata))print(len(result))# noqa: T201returnresult
[docs]defgetEthBalance(self)->List[Document]:url=(f"https://api.etherscan.io/api?module=account&action=balance"f"&address={self.account_address}&tag=latest&apikey={self.api_key}")try:response=requests.get(url)response.raise_for_status()exceptrequests.exceptions.RequestExceptionase:print("Error occurred while making the request:",e)# noqa: T201return[Document(page_content=response.json()["result"])]
[docs]defgetInternalTx(self)->List[Document]:url=(f"https://api.etherscan.io/api?module=account&action=txlistinternal"f"&address={self.account_address}&startblock={self.start_block}"f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"f"&sort={self.sort}&apikey={self.api_key}")try:response=requests.get(url)response.raise_for_status()exceptrequests.exceptions.RequestExceptionase:print("Error occurred while making the request:",e)# noqa: T201items=response.json()["result"]result=[]iflen(items)==0:return[Document(page_content="")]foriteminitems:content=str(item)metadata={"from":item["from"],"tx_hash":item["hash"],"to":item["to"]}result.append(Document(page_content=content,metadata=metadata))returnresult
[docs]defgetERC20Tx(self)->List[Document]:url=(f"https://api.etherscan.io/api?module=account&action=tokentx"f"&address={self.account_address}&startblock={self.start_block}"f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"f"&sort={self.sort}&apikey={self.api_key}")try:response=requests.get(url)response.raise_for_status()exceptrequests.exceptions.RequestExceptionase:print("Error occurred while making the request:",e)# noqa: T201items=response.json()["result"]result=[]iflen(items)==0:return[Document(page_content="")]foriteminitems:content=str(item)metadata={"from":item["from"],"tx_hash":item["hash"],"to":item["to"]}result.append(Document(page_content=content,metadata=metadata))returnresult
[docs]defgetERC721Tx(self)->List[Document]:url=(f"https://api.etherscan.io/api?module=account&action=tokennfttx"f"&address={self.account_address}&startblock={self.start_block}"f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"f"&sort={self.sort}&apikey={self.api_key}")try:response=requests.get(url)response.raise_for_status()exceptrequests.exceptions.RequestExceptionase:print("Error occurred while making the request:",e)# noqa: T201items=response.json()["result"]result=[]iflen(items)==0:return[Document(page_content="")]foriteminitems:content=str(item)metadata={"from":item["from"],"tx_hash":item["hash"],"to":item["to"]}result.append(Document(page_content=content,metadata=metadata))returnresult
[docs]defgetERC1155Tx(self)->List[Document]:url=(f"https://api.etherscan.io/api?module=account&action=token1155tx"f"&address={self.account_address}&startblock={self.start_block}"f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"f"&sort={self.sort}&apikey={self.api_key}")try:response=requests.get(url)response.raise_for_status()exceptrequests.exceptions.RequestExceptionase:print("Error occurred while making the request:",e)# noqa: T201items=response.json()["result"]result=[]iflen(items)==0:return[Document(page_content="")]foriteminitems:content=str(item)metadata={"from":item["from"],"tx_hash":item["hash"],"to":item["to"]}result.append(Document(page_content=content,metadata=metadata))returnresult