Source code for langchain_community.utilities.github
"""Util that calls GitHub."""from__future__importannotationsimportjsonfromtypingimportTYPE_CHECKING,Any,Dict,List,Optionalimportrequestsfromlangchain_core.utilsimportget_from_dict_or_envfrompydanticimportBaseModel,ConfigDict,model_validatorifTYPE_CHECKING:fromgithub.IssueimportIssuefromgithub.PullRequestimportPullRequestdef_import_tiktoken()->Any:"""Import tiktoken."""try:importtiktokenexceptImportError:raiseImportError("tiktoken is not installed. Please install it with `pip install tiktoken`")returntiktoken
[docs]classGitHubAPIWrapper(BaseModel):"""Wrapper for GitHub API."""github:Any=None#: :meta private:github_repo_instance:Any=None#: :meta private:github_repository:Optional[str]=Nonegithub_app_id:Optional[str]=Nonegithub_app_private_key:Optional[str]=Noneactive_branch:Optional[str]=Nonegithub_base_branch:Optional[str]=Nonemodel_config=ConfigDict(extra="forbid",)@model_validator(mode="before")@classmethoddefvalidate_environment(cls,values:Dict)->Any:"""Validate that api key and python package exists in environment."""github_repository=get_from_dict_or_env(values,"github_repository","GITHUB_REPOSITORY")github_app_id=get_from_dict_or_env(values,"github_app_id","GITHUB_APP_ID")github_app_private_key=get_from_dict_or_env(values,"github_app_private_key","GITHUB_APP_PRIVATE_KEY")try:fromgithubimportAuth,GithubIntegrationexceptImportError:raiseImportError("PyGithub is not installed. ""Please install it with `pip install PyGithub`")try:# interpret the key as a file path# fallback to interpreting as the key itselfwithopen(github_app_private_key,"r")asf:private_key=f.read()exceptException:private_key=github_app_private_keyauth=Auth.AppAuth(github_app_id,private_key,)gi=GithubIntegration(auth=auth)installation=gi.get_installations()ifnotinstallation:raiseValueError(f"Please make sure to install the created github app with id "f"{github_app_id} on the repo: {github_repository}""More instructions can be found at ""https://docs.github.com/en/apps/using-""github-apps/installing-your-own-github-app")try:installation=installation[0]exceptValueErrorase:raiseValueError(f"Please make sure to give correct github parameters Error message: {e}")# create a GitHub instance:g=installation.get_github_for_installation()repo=g.get_repo(github_repository)github_base_branch=get_from_dict_or_env(values,"github_base_branch","GITHUB_BASE_BRANCH",default=repo.default_branch,)active_branch=get_from_dict_or_env(values,"active_branch","ACTIVE_BRANCH",default=repo.default_branch,)values["github"]=gvalues["github_repo_instance"]=repovalues["github_repository"]=github_repositoryvalues["github_app_id"]=github_app_idvalues["github_app_private_key"]=github_app_private_keyvalues["active_branch"]=active_branchvalues["github_base_branch"]=github_base_branchreturnvalues
[docs]defparse_issues(self,issues:List[Issue])->List[dict]:""" Extracts title and number from each Issue and puts them in a dictionary Parameters: issues(List[Issue]): A list of Github Issue objects Returns: List[dict]: A dictionary of issue titles and numbers """parsed=[]forissueinissues:title=issue.titlenumber=issue.numberopened_by=issue.user.loginifissue.userelseNoneissue_dict={"title":title,"number":number}ifopened_byisnotNone:issue_dict["opened_by"]=opened_byparsed.append(issue_dict)returnparsed
[docs]defparse_pull_requests(self,pull_requests:List[PullRequest])->List[dict]:""" Extracts title and number from each Issue and puts them in a dictionary Parameters: issues(List[Issue]): A list of Github Issue objects Returns: List[dict]: A dictionary of issue titles and numbers """parsed=[]forprinpull_requests:parsed.append({"title":pr.title,"number":pr.number,"commits":str(pr.commits),"comments":str(pr.comments),})returnparsed
[docs]defget_issues(self)->str:""" Fetches all open issues from the repo excluding pull requests Returns: str: A plaintext report containing the number of issues and each issue's title and number. """issues=self.github_repo_instance.get_issues(state="open")# Filter out pull requests (part of GH issues object)issues=[issueforissueinissuesifnotissue.pull_request]ifissues:parsed_issues=self.parse_issues(issues)parsed_issues_str=("Found "+str(len(parsed_issues))+" issues:\n"+str(parsed_issues))returnparsed_issues_strelse:return"No open issues available"
[docs]deflist_open_pull_requests(self)->str:""" Fetches all open PRs from the repo Returns: str: A plaintext report containing the number of PRs and each PR's title and number. """# issues = self.github_repo_instance.get_issues(state="open")pull_requests=self.github_repo_instance.get_pulls(state="open")ifpull_requests.totalCount>0:parsed_prs=self.parse_pull_requests(pull_requests)parsed_prs_str=("Found "+str(len(parsed_prs))+" pull requests:\n"+str(parsed_prs))returnparsed_prs_strelse:return"No open pull requests available"
[docs]deflist_files_in_main_branch(self)->str:""" Fetches all files in the main branch of the repo. Returns: str: A plaintext report containing the paths and names of the files. """files:List[str]=[]try:contents=self.github_repo_instance.get_contents("",ref=self.github_base_branch)forcontentincontents:ifcontent.type=="dir":files.extend(self._list_files(content.path))else:files.append(content.path)iffiles:files_str="\n".join(files)returnf"Found {len(files)} files in the main branch:\n{files_str}"else:return"No files found in the main branch"exceptExceptionase:returnstr(e)
[docs]defset_active_branch(self,branch_name:str)->str:"""Equivalent to `git checkout branch_name` for this Agent. Clones formatting from Github. Returns an Error (as a string) if branch doesn't exist. """curr_branches=[branch.nameforbranchinself.github_repo_instance.get_branches()]ifbranch_nameincurr_branches:self.active_branch=branch_namereturnf"Switched to branch `{branch_name}`"else:return(f"Error {branch_name} does not exist,"f"in repo with current branches: {str(curr_branches)}")
[docs]deflist_branches_in_repo(self)->str:""" Fetches a list of all branches in the repository. Returns: str: A plaintext report containing the names of the branches. """try:branches=[branch.nameforbranchinself.github_repo_instance.get_branches()]ifbranches:branches_str="\n".join(branches)return(f"Found {len(branches)} branches in the repository:\n{branches_str}")else:return"No branches found in the repository"exceptExceptionase:returnstr(e)
[docs]defcreate_branch(self,proposed_branch_name:str)->str:""" Create a new branch, and set it as the active bot branch. Equivalent to `git switch -c proposed_branch_name` If the proposed branch already exists, we append _v1 then _v2... until a unique name is found. Returns: str: A plaintext success message. """fromgithubimportGithubExceptioni=0new_branch_name=proposed_branch_namebase_branch=self.github_repo_instance.get_branch(self.github_repo_instance.default_branch)foriinrange(1000):try:self.github_repo_instance.create_git_ref(ref=f"refs/heads/{new_branch_name}",sha=base_branch.commit.sha)self.active_branch=new_branch_namereturn(f"Branch '{new_branch_name}' ""created successfully, and set as current active branch.")exceptGithubExceptionase:ife.status==422and"Reference already exists"ine.data["message"]:i+=1new_branch_name=f"{proposed_branch_name}_v{i}"else:# Handle any other exceptionsprint(f"Failed to create branch. Error: {e}")# noqa: T201raiseException("Unable to create branch name from proposed_branch_name: "f"{proposed_branch_name}")return("Unable to create branch. ""At least 1000 branches exist with named derived from "f"proposed_branch_name: `{proposed_branch_name}`")
[docs]deflist_files_in_bot_branch(self)->str:""" Fetches all files in the active branch of the repo, the branch the bot uses to make changes. Returns: str: A plaintext list containing the filepaths in the branch. """files:List[str]=[]try:contents=self.github_repo_instance.get_contents("",ref=self.active_branch)forcontentincontents:ifcontent.type=="dir":files.extend(self._list_files(content.path))else:files.append(content.path)iffiles:files_str="\n".join(files)return(f"Found {len(files)} files in branch `{self.active_branch}`:\n"f"{files_str}")else:returnf"No files found in branch: `{self.active_branch}`"exceptExceptionase:returnf"Error: {e}"
[docs]defget_files_from_directory(self,directory_path:str)->str:""" Recursively fetches files from a directory in the repo. Parameters: directory_path (str): Path to the directory Returns: str: List of file paths, or an error message. """fromgithubimportGithubExceptiontry:returnstr(self._list_files(directory_path))exceptGithubExceptionase:returnf"Error: status code {e.status}, {e.message}"
[docs]defget_issue(self,issue_number:int)->Dict[str,Any]:""" Fetches a specific issue and its first 10 comments Parameters: issue_number(int): The number for the github issue Returns: dict: A dictionary containing the issue's title, body, comments as a string, and the username of the user who opened the issue """issue=self.github_repo_instance.get_issue(number=issue_number)page=0comments:List[dict]=[]whilelen(comments)<=10:comments_page=issue.get_comments().get_page(page)iflen(comments_page)==0:breakforcommentincomments_page:comments.append({"body":comment.body,"user":comment.user.login})page+=1opened_by=Noneifissue.userandissue.user.login:opened_by=issue.user.loginreturn{"number":issue_number,"title":issue.title,"body":issue.body,"comments":str(comments),"opened_by":str(opened_by),}
[docs]deflist_pull_request_files(self,pr_number:int)->List[Dict[str,Any]]:"""Fetches the full text of all files in a PR. Truncates after first 3k tokens. # TODO: Enhancement to summarize files with ctags if they're getting long. Args: pr_number(int): The number of the pull request on Github Returns: dict: A dictionary containing the issue's title, body, and comments as a string """tiktoken=_import_tiktoken()MAX_TOKENS_FOR_FILES=3_000pr_files=[]pr=self.github_repo_instance.get_pull(number=int(pr_number))total_tokens=0page=0whileTrue:# or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES:files_page=pr.get_files().get_page(page)iflen(files_page)==0:breakforfileinfiles_page:try:file_metadata_response=requests.get(file.contents_url)iffile_metadata_response.status_code==200:download_url=json.loads(file_metadata_response.text)["download_url"]else:print(f"Failed to download file: {file.contents_url}, skipping")# noqa: T201continuefile_content_response=requests.get(download_url)iffile_content_response.status_code==200:# Save the content as a UTF-8 stringfile_content=file_content_response.textelse:print(# noqa: T201"Failed downloading file content "f"(Error {file_content_response.status_code}). Skipping")continuefile_tokens=len(tiktoken.get_encoding("cl100k_base").encode(file_content+file.filename+"file_name file_contents"))if(total_tokens+file_tokens)<MAX_TOKENS_FOR_FILES:pr_files.append({"filename":file.filename,"contents":file_content,"additions":file.additions,"deletions":file.deletions,})total_tokens+=file_tokensexceptExceptionase:print(f"Error when reading files from a PR on github. {e}")# noqa: T201page+=1returnpr_files
[docs]defget_pull_request(self,pr_number:int)->Dict[str,Any]:""" Fetches a specific pull request and its first 10 comments, limited by max_tokens. Parameters: pr_number(int): The number for the Github pull max_tokens(int): The maximum number of tokens in the response Returns: dict: A dictionary containing the pull's title, body, and comments as a string """max_tokens=2_000pull=self.github_repo_instance.get_pull(number=pr_number)total_tokens=0defget_tokens(text:str)->int:tiktoken=_import_tiktoken()returnlen(tiktoken.get_encoding("cl100k_base").encode(text))defadd_to_dict(data_dict:Dict[str,Any],key:str,value:str)->None:nonlocaltotal_tokens# Declare total_tokens as nonlocaltokens=get_tokens(value)iftotal_tokens+tokens<=max_tokens:data_dict[key]=valuetotal_tokens+=tokens# Now this will modify the outer variableresponse_dict:Dict[str,str]={}add_to_dict(response_dict,"title",pull.title)add_to_dict(response_dict,"number",str(pr_number))add_to_dict(response_dict,"body",pull.bodyifpull.bodyelse"")comments:List[str]=[]page=0whilelen(comments)<=10:comments_page=pull.get_issue_comments().get_page(page)iflen(comments_page)==0:breakforcommentincomments_page:comment_str=str({"body":comment.body,"user":comment.user.login})iftotal_tokens+get_tokens(comment_str)>max_tokens:breakcomments.append(comment_str)total_tokens+=get_tokens(comment_str)page+=1add_to_dict(response_dict,"comments",str(comments))commits:List[str]=[]page=0whilelen(commits)<=10:commits_page=pull.get_commits().get_page(page)iflen(commits_page)==0:breakforcommitincommits_page:commit_str=str({"message":commit.commit.message})iftotal_tokens+get_tokens(commit_str)>max_tokens:breakcommits.append(commit_str)total_tokens+=get_tokens(commit_str)page+=1add_to_dict(response_dict,"commits",str(commits))returnresponse_dict
[docs]defcreate_pull_request(self,pr_query:str)->str:""" Makes a pull request from the bot's branch to the base branch Parameters: pr_query(str): a string which contains the PR title and the PR body. The title is the first line in the string, and the body are the rest of the string. For example, "Updated README\nmade changes to add info" Returns: str: A success or failure message """ifself.github_base_branch==self.active_branch:return"""Cannot make a pull request because commits are already in the main or master branch."""else:try:title=pr_query.split("\n")[0]body=pr_query[len(title)+2:]pr=self.github_repo_instance.create_pull(title=title,body=body,head=self.active_branch,base=self.github_base_branch,)returnf"Successfully created PR number {pr.number}"exceptExceptionase:return"Unable to make pull request due to error:\n"+str(e)
[docs]defcomment_on_issue(self,comment_query:str)->str:""" Adds a comment to a github issue Parameters: comment_query(str): a string which contains the issue number, two newlines, and the comment. for example: "1\n\nWorking on it now" adds the comment "working on it now" to issue 1 Returns: str: A success or failure message """issue_number=int(comment_query.split("\n\n")[0])comment=comment_query[len(str(issue_number))+2:]try:issue=self.github_repo_instance.get_issue(number=issue_number)issue.create_comment(comment)return"Commented on issue "+str(issue_number)exceptExceptionase:return"Unable to make comment due to error:\n"+str(e)
[docs]defcreate_file(self,file_query:str)->str:""" Creates a new file on the Github repo Parameters: file_query(str): a string which contains the file path and the file contents. The file path is the first line in the string, and the contents are the rest of the string. For example, "hello_world.md\n# Hello World!" Returns: str: A success or failure message """ifself.active_branch==self.github_base_branch:return("You're attempting to commit to the directly to the"f"{self.github_base_branch} branch, which is protected. ""Please create a new branch and try again.")file_path=file_query.split("\n")[0]file_contents=file_query[len(file_path)+2:]try:try:file=self.github_repo_instance.get_contents(file_path,ref=self.active_branch)iffile:return(f"File already exists at `{file_path}` "f"on branch `{self.active_branch}`. You must use ""`update_file` to modify it.")exceptException:# expected behavior, file shouldn't exist yetpassself.github_repo_instance.create_file(path=file_path,message="Create "+file_path,content=file_contents,branch=self.active_branch,)return"Created file "+file_pathexceptExceptionase:return"Unable to make file due to error:\n"+str(e)
[docs]defread_file(self,file_path:str)->str:""" Read a file from this agent's branch, defined by self.active_branch, which supports PR branches. Parameters: file_path(str): the file path Returns: str: The file decoded as a string, or an error message if not found """try:file=self.github_repo_instance.get_contents(file_path,ref=self.active_branch)returnfile.decoded_content.decode("utf-8")exceptExceptionase:return(f"File not found `{file_path}` on branch"f"`{self.active_branch}`. Error: {str(e)}")
[docs]defupdate_file(self,file_query:str)->str:""" Updates a file with new content. Parameters: file_query(str): Contains the file path and the file contents. The old file contents is wrapped in OLD <<<< and >>>> OLD The new file contents is wrapped in NEW <<<< and >>>> NEW For example: /test/hello.txt OLD <<<< Hello Earth! >>>> OLD NEW <<<< Hello Mars! >>>> NEW Returns: A success or failure message """ifself.active_branch==self.github_base_branch:return("You're attempting to commit to the directly"f"to the {self.github_base_branch} branch, which is protected. ""Please create a new branch and try again.")try:file_path:str=file_query.split("\n")[0]old_file_contents=(file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip())new_file_contents=(file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip())file_content=self.read_file(file_path)updated_file_content=file_content.replace(old_file_contents,new_file_contents)iffile_content==updated_file_content:return("File content was not updated because old content was not found.""It may be helpful to use the read_file action to get ""the current file contents.")self.github_repo_instance.update_file(path=file_path,message="Update "+str(file_path),content=updated_file_content,branch=self.active_branch,sha=self.github_repo_instance.get_contents(file_path,ref=self.active_branch).sha,)return"Updated file "+str(file_path)exceptExceptionase:return"Unable to update file due to error:\n"+str(e)
[docs]defdelete_file(self,file_path:str)->str:""" Deletes a file from the repo Parameters: file_path(str): Where the file is Returns: str: Success or failure message """ifself.active_branch==self.github_base_branch:return("You're attempting to commit to the directly"f"to the {self.github_base_branch} branch, which is protected. ""Please create a new branch and try again.")try:self.github_repo_instance.delete_file(path=file_path,message="Delete "+file_path,branch=self.active_branch,sha=self.github_repo_instance.get_contents(file_path,ref=self.active_branch).sha,)return"Deleted file "+file_pathexceptExceptionase:return"Unable to delete file due to error:\n"+str(e)
[docs]defsearch_issues_and_prs(self,query:str)->str:""" Searches issues and pull requests in the repository. Parameters: query(str): The search query Returns: str: A string containing the first 5 issues and pull requests """search_result=self.github.search_issues(query,repo=self.github_repository)max_items=min(5,search_result.totalCount)results=[f"Top {max_items} results:"]forissueinsearch_result[:max_items]:results.append(f"Title: {issue.title}, Number: {issue.number}, State: {issue.state}")return"\n".join(results)
[docs]defsearch_code(self,query:str)->str:""" Searches code in the repository. # Todo: limit total tokens returned... Parameters: query(str): The search query Returns: str: A string containing, at most, the top 5 search results """search_result=self.github.search_code(query=query,repo=self.github_repository)ifsearch_result.totalCount==0:return"0 results found."max_results=min(5,search_result.totalCount)results=[f"Showing top {max_results} of {search_result.totalCount} results:"]count=0forcodeinsearch_result:ifcount>=max_results:break# Get the file content using the PyGithub get_contents methodfile_content=self.github_repo_instance.get_contents(code.path,ref=self.active_branch).decoded_content.decode()results.append(f"Filepath: `{code.path}`\nFile contents: {file_content}\n<END OF FILE>")count+=1return"\n".join(results)
[docs]defcreate_review_request(self,reviewer_username:str)->str:""" Creates a review request on *THE* open pull request that matches the current active_branch. Parameters: reviewer_username(str): The username of the person who is being requested Returns: str: A message confirming the creation of the review request """pull_requests=self.github_repo_instance.get_pulls(state="open",sort="created")# find PR against active_branchpr=next((prforprinpull_requestsifpr.head.ref==self.active_branch),None)ifprisNone:return("No open pull request found for the "f"current branch `{self.active_branch}`")try:pr.create_review_request(reviewers=[reviewer_username])return(f"Review request created for user {reviewer_username} "f"on PR #{pr.number}")exceptExceptionase:returnf"Failed to create a review request with error {e}"
[docs]defget_latest_release(self)->str:""" Fetches the latest release of the repository. Returns: str: The latest release """release=self.github_repo_instance.get_latest_release()return(f"Latest title: {release.title} "f"tag: {release.tag_name} "f"body: {release.body}")
[docs]defget_releases(self)->str:""" Fetches all releases of the repository. Returns: str: The releases """releases=self.github_repo_instance.get_releases()max_results=min(5,releases.totalCount)results=[f"Top {max_results} results:"]forreleaseinreleases[:max_results]:results.append(f"Title: {release.title}, Tag: {release.tag_name}, Body: {release.body}")return"\n".join(results)
[docs]defget_release(self,tag_name:str)->str:""" Fetches a specific release of the repository. Parameters: tag_name(str): The tag name of the release Returns: str: The release """release=self.github_repo_instance.get_release(tag_name)returnf"Release: {release.title} tag: {release.tag_name} body: {release.body}"