Spaces:
Sleeping
Sleeping
| from smolagents import CodeAgent, HfApiModel,tool | |
| import datetime | |
| import requests | |
| import pytz | |
| import yaml | |
| from tools.final_answer import FinalAnswerTool | |
| import re | |
| import ast | |
| from typing import List | |
| from huggingface_hub import login | |
| import os | |
| from CustomGradioUI import CustomGradioUI | |
| from Gradio_UI import GradioUI | |
def get_open_pull_requests(github_url: str) -> str:
    """Fetches a list of open pull requests for a given GitHub repository.

    Args:
        github_url: The URL of the GitHub repository where the pull requests should be retrieved.
            (e.g., 'https://github.com/LukeMattingly/huggingface-agents-course',
            'https://github.com/upb-lea/reinforcement_learning_course_materials').
            A trailing slash, a '.git' suffix, extra path segments (such as '/pulls')
            and a bare 'owner/repo' string are accepted as well.

    Returns:
        A string containing the list of open pull requests with their titles and links
        (at most the first 100, i.e. one API page).
        If no pull requests are open, returns a message indicating no PRs were found.
        If the URL cannot be parsed or the request fails, returns an error message.
    """
    try:
        # Keep only the owner and repository segments so that trailing slashes,
        # '.git' suffixes or deeper paths do not corrupt the API URL.
        parsed = re.match(
            r"^(?:(?:https?://)?(?:www\.)?github\.com/)?"
            r"([A-Za-z0-9-]+)/([A-Za-z0-9._-]+?)(?:\.git)?(?:[/?#].*)?$",
            github_url.strip(),
            re.IGNORECASE,
        )
        if parsed is None:
            return f"Error: '{github_url}' is not a valid GitHub repository URL."
        owner, repo = parsed.group(1), parsed.group(2)

        api_url = f"https://api.github.com/repos/{owner}/{repo}/pulls"
        # A timeout keeps the agent from hanging forever on a stalled connection;
        # per_page=100 asks for the largest page instead of the default.
        response = requests.get(
            api_url,
            params={"state": "open", "per_page": 100},
            timeout=10,
        )
        if response.status_code != 200:
            return f"Error fetching PRs: {response.json().get('message', 'Unknown error')}"

        pull_requests = response.json()
        if not pull_requests:
            return "No open pull requests found."
        return "\n".join(
            f"PR #{pr['number']}: {pr['title']} - {pr['html_url']}" for pr in pull_requests
        )
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error retrieving pull requests: {str(e)}"
def find_todo_comments(code: str) -> str:
    """Finds TODO and FIXME comments in the provided code.

    Args:
        code: The source code in which to search for TODO and FIXME comments.

    Returns:
        A string listing all TODO and FIXME comments found in the code, one per line,
        formatted as '<tag>: <text>' (the tag keeps the casing used in the code).
        If no comments are found, returns a message indicating that no TODO or FIXME comments exist.
    """
    # [ \t]* (instead of \s*) keeps a match on a single line, so an empty
    # "# TODO:" no longer swallows the following line of code.
    # \b prevents words such as "todoist" from being reported as a TODO.
    pattern = re.compile(r"#[ \t]*(TODO|FIXME)\b:?[ \t]*(.*)", re.IGNORECASE)
    matches = pattern.findall(code)
    if not matches:
        return "No TODO or FIXME comments found."
    # rstrip() drops trailing blanks and the '\r' left over from CRLF files.
    return "\n".join(f"{tag}: {text.rstrip()}" for tag, text in matches)
def get_pr_diff(github_url: str, pr_number: int, start_line: int = None, end_line: int = None, total_lines: int = None) -> str:
    """Fetches the code diff of a specific pull request and returns a subset of lines as requested.

    Args:
        github_url: The URL of the GitHub repository where the pull request is located.
            (e.g., 'https://github.com/crewAIInc/crewAI'). A trailing slash, a '.git'
            suffix, extra path segments and a bare 'owner/repo' string are accepted as well.
        pr_number: The pull request number for which the code diff should be retrieved.
        start_line: Optional; the starting line number (1-indexed) of the diff to return.
        end_line: Optional; the ending line number (1-indexed, inclusive) of the diff to return.
        total_lines: Optional; if provided, returns the first 'total_lines' lines of the diff.
            This parameter is ignored if both start_line and end_line are provided.

    Returns:
        A string containing the requested portion of the code diff of the specified pull request.
        If the diff cannot be retrieved or if invalid parameters are provided, returns an error message.
    """
    try:
        # Validate the requested slice first: it is cheap, and it avoids a
        # pointless API call when the arguments cannot be honoured.
        if start_line is not None or end_line is not None:
            if start_line is None or end_line is None:
                return "Error: Both start_line and end_line must be provided if specifying a range."
            # start_line=0 or a negative value would silently slice from the end.
            if start_line < 1 or end_line < start_line:
                return "Error: start_line must be >= 1 and end_line must be >= start_line."
        elif total_lines is not None and total_lines < 0:
            # A negative count would drop lines from the end instead of limiting them.
            return "Error: total_lines must be a non-negative integer."

        # Keep only the owner and repository segments of the URL.
        parsed = re.match(
            r"^(?:(?:https?://)?(?:www\.)?github\.com/)?"
            r"([A-Za-z0-9-]+)/([A-Za-z0-9._-]+?)(?:\.git)?(?:[/?#].*)?$",
            github_url.strip(),
            re.IGNORECASE,
        )
        if parsed is None:
            return f"Error: '{github_url}' is not a valid GitHub repository URL."
        owner, repo = parsed.group(1), parsed.group(2)

        api_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
        # The diff media type makes the endpoint return the raw unified diff.
        response = requests.get(
            api_url,
            headers={"Accept": "application/vnd.github.v3.diff"},
            timeout=10,
        )
        if response.status_code != 200:
            return f"Error fetching PR diff: {response.json().get('message', 'Unknown error')}"

        diff_lines = response.text.splitlines()
        if start_line is not None:
            # Convert the 1-indexed inclusive range into a Python slice.
            diff_lines = diff_lines[start_line - 1:end_line]
        elif total_lines is not None:
            diff_lines = diff_lines[:total_lines]
        return "\n".join(diff_lines)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error retrieving PR diff: {str(e)}"
def get_pr_diff_for_file(github_url: str, pr_number: int, file_path: str) -> str:
    """Fetches the code diff for a specific file in a given pull request.

    Args:
        github_url: The URL of the GitHub repository where the pull request is located.
            (e.g., 'https://github.com/crewAIInc/crewAI'). A trailing slash, a '.git'
            suffix, extra path segments and a bare 'owner/repo' string are accepted as well.
        pr_number: The pull request number for which the diff should be retrieved.
        file_path: The relative path of the file within the repository to retrieve the diff for
            (e.g., 'src/module.py').

    Returns:
        A string containing the code diff (patch) for the specified file in the pull request.
        If the file is not found in the PR or if its diff is not available, returns an error message.
    """
    try:
        # Keep only the owner and repository segments of the URL.
        parsed = re.match(
            r"^(?:(?:https?://)?(?:www\.)?github\.com/)?"
            r"([A-Za-z0-9-]+)/([A-Za-z0-9._-]+?)(?:\.git)?(?:[/?#].*)?$",
            github_url.strip(),
            re.IGNORECASE,
        )
        if parsed is None:
            return f"Error: '{github_url}' is not a valid GitHub repository URL."
        owner, repo = parsed.group(1), parsed.group(2)

        # API endpoint listing the files changed in the PR (paginated).
        api_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/files"
        page_size = 100
        # Walk the pages so files beyond the first page are found too; the
        # bounded range is a safety limit against an endless loop.
        for page in range(1, 31):
            response = requests.get(
                api_url,
                params={"per_page": page_size, "page": page},
                timeout=10,
            )
            if response.status_code != 200:
                return f"Error fetching PR files: {response.json().get('message', 'Unknown error')}"
            files = response.json()
            for file_info in files:
                if file_info.get('filename') == file_path:
                    patch = file_info.get('patch')
                    if patch:
                        return patch
                    # The entry exists but carries no textual patch.
                    return f"No diff (patch) available for file: {file_path}"
            if len(files) < page_size:
                # A short page is the last page.
                break
        return f"File '{file_path}' not found in the pull request."
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error retrieving PR diff for file: {str(e)}"
def get_pr_files_changed(github_url: str, pr_number: int) -> List[str]:
    """Retrieves the list of files changed in a given pull request.

    Args:
        github_url: The URL of the GitHub repository where the pull request is located.
            A trailing slash, a '.git' suffix, extra path segments and a bare
            'owner/repo' string are accepted as well.
        pr_number: The pull request number for which the changed files should be retrieved.

    Returns:
        A list of strings, where each string is a file path that was modified in the specified pull request.
        If the URL is invalid or an error occurs, returns a list with an appropriate error message.
    """
    try:
        # Keep only the owner and repository segments of the URL.
        parsed = re.match(
            r"^(?:(?:https?://)?(?:www\.)?github\.com/)?"
            r"([A-Za-z0-9-]+)/([A-Za-z0-9._-]+?)(?:\.git)?(?:[/?#].*)?$",
            github_url.strip(),
            re.IGNORECASE,
        )
        if parsed is None:
            return [f"Error: '{github_url}' is not a valid GitHub repository URL."]
        owner, repo = parsed.group(1), parsed.group(2)

        api_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/files"
        page_size = 100
        files_changed = []
        # Collect every page so large PRs are not truncated to the first page;
        # the bounded range is a safety limit against an endless loop.
        for page in range(1, 31):
            response = requests.get(
                api_url,
                params={"per_page": page_size, "page": page},
                timeout=10,
            )
            if response.status_code != 200:
                return [f"Error fetching PR files: {response.json().get('message', 'Unknown error')}"]
            files = response.json()
            files_changed.extend(file_info['filename'] for file_info in files)
            if len(files) < page_size:
                # A short page is the last page.
                break
        return files_changed
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        return [f"Error retrieving files for PR #{pr_number}: {str(e)}"]
| #Helper Function | |
def diff_to_code(diff: str) -> str:
    """
    Converts a unified diff string into a regular code string by extracting
    added and context lines, while ignoring diff metadata and removed lines.

    Hunk headers ('@@ -a,b +c,d @@') are used to know how many lines belong to
    the hunk, so that an added line whose code starts with '++' is not mistaken
    for a '+++ b/file' header. Text outside any hunk (for example a diff that
    was cut in the middle of a hunk, or plain code) is handled by prefix only.

    Args:
        diff: A unified diff string representing code changes.

    Returns:
        A string containing the reconstructed code.
    """
    hunk_header = re.compile(r"^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@")
    # Metadata lines that appear between hunks. The patterns are deliberately
    # strict so plain code such as "index = 0" or "diff = a - b" is kept.
    file_header = re.compile(
        r"^(?:diff -{1,2}\w[\w-]* "
        r"|index [0-9a-fA-F,]+\.\.[0-9a-fA-F]+"
        r"|--- |\+\+\+ "
        r"|(?:new|deleted) file mode |(?:old|new) mode "
        r"|(?:dis)?similarity index "
        r"|(?:rename|copy) (?:from|to) "
        r"|Binary files |GIT binary patch)"
    )

    code_lines = []
    # Lines still expected on the old/new side of the current hunk.
    old_left = new_left = 0
    for line in diff.splitlines():
        if line.startswith("@@"):
            counts = hunk_header.match(line)
            if counts:
                # An omitted count means 1 in the unified diff format.
                old_left = int(counts.group(1)) if counts.group(1) is not None else 1
                new_left = int(counts.group(2)) if counts.group(2) is not None else 1
            else:
                old_left = new_left = 0
            continue
        # Marker emitted by diff tools; it is never part of the code.
        if line.startswith("\\ No newline"):
            continue

        in_hunk = old_left > 0 or new_left > 0
        if in_hunk and line and line[0] not in " +-":
            # Hunk content always carries a prefix; anything else means the
            # hunk ended early (inaccurate counts), so fall back to prefix mode.
            old_left = new_left = 0
            in_hunk = False
        if not in_hunk and file_header.match(line):
            continue

        if line.startswith("-"):
            # Removed line: not part of the new code.
            old_left -= 1
            continue
        if line.startswith("+"):
            new_left -= 1
            code_lines.append(line[1:])
            continue
        # Context line (or unprefixed text): present on both sides.
        old_left -= 1
        new_left -= 1
        code_lines.append(line[1:] if line.startswith(" ") else line)
    return "\n".join(code_lines)
# Disabled tool: the definition below lives inside a bare string literal, so it
# is never executed or registered. It analyzes raw source code; the active
# detect_code_smells_diff function performs the same checks on a diff.
'''
@tool
def detect_code_smells(code: str) -> str:
    """Detects common code smells such as long functions and deeply nested loops.
    Args:
        code: The source code to analyze for potential code smells.
    Returns:
        A string listing detected code smells, including long functions and deeply nested loops.
        If no code smells are found, returns a message indicating the code is clean.
    """
    try:
        tree = ast.parse(code)
        issues = []
        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef) and len(node.body) > 20:
                issues.append(f"Long function detected: {node.name} ({len(node.body)} lines)")
            if isinstance(node, ast.For) or isinstance(node, ast.While):
                nested_loops = sum(isinstance(n, (ast.For, ast.While)) for n in ast.walk(node))
                if nested_loops > 2:
                    issues.append(f"Deeply nested loop detected in function: {node.lineno}")
        return "\n".join(issues) if issues else "No code smells detected."
    except Exception as e:
        return f"Error analyzing code: {str(e)}"
'''
def detect_code_smells_diff(diff: str) -> str:
    """Detects common code smells such as long functions and deeply nested loops from a code diff.

    Args:
        diff: A unified diff string representing changes in code to analyze for potential code smells.

    Returns:
        A string listing detected code smells based on the added and context code lines.
        If no code smells are found, returns a message indicating the code is clean.
        If the reconstructed code cannot be parsed as Python, returns an error message.
    """
    loop_types = (ast.For, ast.AsyncFor, ast.While)
    function_types = (ast.FunctionDef, ast.AsyncFunctionDef)

    def loop_depth(node):
        """Return the longest chain of loops nested at or below ``node``."""
        deepest_child = max(
            (loop_depth(child) for child in ast.iter_child_nodes(node)),
            default=0,
        )
        return deepest_child + 1 if isinstance(node, loop_types) else deepest_child

    try:
        # Use the helper function to convert the diff into a code string.
        code = diff_to_code(diff)
        tree = ast.parse(code)
        issues = []
        for node in ast.walk(tree):
            # Detect long functions (more than 20 top-level statements).
            if isinstance(node, function_types) and len(node.body) > 20:
                issues.append(
                    f"Long function detected: {node.name} ({len(node.body)} statements)"
                )
            # Measure real nesting depth: sibling loops inside one loop are not
            # deeper nesting, so they must not be counted together.
            if isinstance(node, loop_types) and loop_depth(node) > 2:
                issues.append(f"Deeply nested loop detected at line {node.lineno}")
        return "\n".join(issues) if issues else "No code smells detected."
    except Exception as e:
        # Diff fragments are frequently not valid stand-alone Python; report
        # the parse failure as text instead of raising.
        return f"Error analyzing code diff: {str(e)}"
# Disabled tool: the definition below lives inside a bare string literal, so it
# is never executed or registered.
# NOTE(review): the raw-content URL it builds hardcodes the 'main' branch.
'''
@tool
def get_file_content(github_url: str, file_path: str) -> str:
    """Fetches the content of a specific file from the GitHub repository.
    Args:
        github_url: The URL of the GitHub repository (e.g., 'https://github.com/user/repo').
        file_path: The relative path of the file within the repository (e.g., 'src/module.py').
    Returns:
        A string containing the file's content or an error message if retrieval fails.
    """
    try:
        owner_repo = github_url.replace("https://github.com/", "")
        api_url = f"https://raw.githubusercontent.com/{owner_repo}/main/{file_path}"
        response = requests.get(api_url)
        if response.status_code != 200:
            return f"Error fetching file content: {response.status_code}"
        return response.text
    except Exception as e:
        return f"Error: {str(e)}"
'''
def security_check_code_diff(diff: str) -> str:
    """Analyzes the provided code diff for potential security vulnerabilities.

    Args:
        diff: A unified diff string representing changes in code. The added and context
            lines are analyzed for common security issues (e.g., hardcoded secrets, unsafe functions).

    Returns:
        A string listing detected potential security vulnerabilities based on common patterns (e.g., hardcoded credentials,
        risky usage of functions like eval or os.system, and simple SQL injection risks). If no issues are found, returns a message indicating the code is secure.
    """
    issues = []
    code = diff_to_code(diff)

    # Check for hardcoded credentials (case-insensitive search).
    # [^'"\n]+ stops at the closing quote, so one finding cannot swallow
    # several unrelated quoted strings on the same line.
    secret_patterns = [
        r'(?i)api[-_]?key\s*=\s*[\'"][^\'"\n]+[\'"]',
        r'(?i)secret\s*=\s*[\'"][^\'"\n]+[\'"]',
        r'(?i)password\s*=\s*[\'"][^\'"\n]+[\'"]',
        r'(?i)token\s*=\s*[\'"][^\'"\n]+[\'"]',
    ]
    for pattern in secret_patterns:
        matches = re.findall(pattern, code)
        if matches:
            issues.append("Potential hardcoded credential(s) found: " + ", ".join(matches))

    # Check for usage of the eval() builtin. The lookbehind skips method calls
    # such as model.eval() and longer names such as literal_eval().
    if re.search(r"(?<![\w.])eval\(", code):
        issues.append("Usage of eval() detected, which can lead to security vulnerabilities if misused.")

    # Check for potential command injection risks with os.system.
    if "os.system(" in code:
        issues.append("Usage of os.system() detected; consider using safer alternatives to avoid command injection risks.")

    # Check for simple SQL injection patterns (heuristic).
    sql_injection_patterns = [
        r"execute\(.+\+.+\)",
        r"format\(.+%\(.+\)s.+\)",
    ]
    for pattern in sql_injection_patterns:
        matches = re.findall(pattern, code)
        if matches:
            issues.append("Potential SQL injection risk found in statements: " + ", ".join(matches))

    if issues:
        return "\n".join(issues)
    return "No obvious security vulnerabilities detected based on heuristic analysis."
def check_documentation_updates(changed_files: str) -> str:
    """Checks whether documentation files have been updated alongside code changes.

    A path counts as documentation when one of its components is 'doc', 'docs'
    or 'documentation', when the file is named 'README' (any extension), or
    when it has a documentation extension ('.md', '.mdx', '.rst', '.adoc').

    Args:
        changed_files: A newline-separated string listing the file paths changed in a commit or pull request.
            A list or tuple of paths (as returned by get_pr_files_changed) is accepted as well.

    Returns:
        A string indicating whether documentation appears to have been updated or if it might be missing.
    """
    doc_dirs = {"doc", "docs", "documentation"}
    doc_suffixes = (".md", ".mdx", ".rst", ".adoc")

    def is_doc(path):
        # Compare whole path components, so that code files whose names merely
        # contain "docs" (e.g. docstring_utils.py) are not taken for docs.
        parts = [part for part in path.replace("\\", "/").lower().split("/") if part]
        if not parts:
            return False
        name = parts[-1]
        return (
            any(part in doc_dirs for part in parts)
            or name.split(".")[0] == "readme"
            or name.endswith(doc_suffixes)
        )

    # Accept the list form too, so the output of the file-listing tool can be
    # passed straight in without joining it first.
    if isinstance(changed_files, (list, tuple)):
        raw_paths = [str(item) for item in changed_files]
    else:
        raw_paths = changed_files.splitlines()

    files = [entry.strip() for entry in raw_paths if entry.strip()]
    if any(is_doc(entry) for entry in files):
        return "Documentation files were updated."
    return "No documentation updates detected. Consider reviewing the docs to ensure they reflect the new changes."
def lint_code(diff: str) -> str:
    """Analyzes the provided code diff for style and potential issues using simple lint heuristics.

    Args:
        diff: A unified diff string (plain source code is accepted too); its added and
            context lines are the code that gets analyzed.

    Returns:
        A string with linting warnings and suggestions for improvement, or a message indicating that no issues were found.
    """
    # This is a placeholder; you could integrate pylint or flake8 via subprocess or an API.
    # For demonstration, only a single heuristic is applied.
    issues = []
    code = diff_to_code(diff)
    # The lookbehind restricts the match to the print builtin, so names that
    # merely end in "print" (pprint(), blueprint()) are not reported.
    if re.search(r"(?<![\w.])print\(", code):
        issues.append("Consider removing debug print statements.")
    if not issues:
        return "No linting issues found."
    return "\n".join(issues)
# Tool that lets the agent return its final answer; it must stay in the tool list.
final_answer = FinalAnswerTool()

# If the agent does not answer, the model is overloaded, please use another model or the following Hugging Face Endpoint that also contains qwen2.5 coder:
# model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud'

# local dev only?
# hf_token = os.getenv("HF_TOKEN")
# if hf_token:
#     login(token=hf_token)

model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',  # it is possible that this model may be overloaded deepseek-ai/DeepSeek-R1-Distill-Qwen-32B || Qwen/Qwen2.5-Coder-32B-Instruct
    custom_role_conversions=None,
)

# Read the prompt templates with an explicit encoding so the result does not
# depend on the platform's default locale.
with open("prompts.yaml", 'r', encoding="utf-8") as stream:
    prompt_templates = yaml.safe_load(stream)

# NOTE(review): the functions listed below appear in this file as plain
# functions without a decorator - confirm they are in the form the agent
# framework expects for tools.
agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        get_open_pull_requests,
        find_todo_comments,
        get_pr_diff,
        get_pr_files_changed,
        detect_code_smells_diff,
        security_check_code_diff,
        check_documentation_updates,
        lint_code,
        get_pr_diff_for_file,
    ],  # add your tools here (don't remove final answer)
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

# Start the web UI for the agent.
CustomGradioUI(agent).launch()