mirror of
https://github.com/0x5t4l1n/AURHub.git
synced 2026-05-26 19:26:35 +00:00
247 lines
7.1 KiB
Python
247 lines
7.1 KiB
Python
"""
|
|
Pacman service for ArchStore.
|
|
Handles all interactions with the pacman package manager via subprocess.
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
from typing import Optional
|
|
from utils.sanitize import sanitize_package_name
|
|
|
|
|
|
async def _run_command(cmd: list[str], timeout: int = 30) -> tuple[str, str, int]:
|
|
"""
|
|
Run a shell command safely using argument list (no shell=True).
|
|
Returns (stdout, stderr, returncode).
|
|
"""
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await asyncio.wait_for(
|
|
proc.communicate(), timeout=timeout
|
|
)
|
|
return (
|
|
stdout.decode("utf-8", errors="replace"),
|
|
stderr.decode("utf-8", errors="replace"),
|
|
proc.returncode,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
return "", "Command timed out", -1
|
|
except Exception as e:
|
|
return "", str(e), -1
|
|
|
|
|
|
def _parse_search_results(output: str) -> list[dict]:
|
|
"""Parse pacman -Ss output into structured results."""
|
|
packages = []
|
|
lines = output.strip().split("\n")
|
|
i = 0
|
|
while i < len(lines):
|
|
line = lines[i]
|
|
# Match: repo/name version [installed] or repo/name version
|
|
match = re.match(
|
|
r'^(\S+)/(\S+)\s+(\S+)(?:\s+\[installed(?::?\s*(\S+))?\])?\s*$',
|
|
line
|
|
)
|
|
if match:
|
|
repo = match.group(1)
|
|
name = match.group(2)
|
|
version = match.group(3)
|
|
installed = match.group(4) is not None or "[installed" in line
|
|
description = ""
|
|
if i + 1 < len(lines) and lines[i + 1].startswith(" "):
|
|
description = lines[i + 1].strip()
|
|
i += 1
|
|
packages.append({
|
|
"name": name,
|
|
"version": version,
|
|
"description": description,
|
|
"repository": repo,
|
|
"source": "pacman",
|
|
"installed": installed,
|
|
})
|
|
i += 1
|
|
return packages
|
|
|
|
|
|
def _parse_package_info(output: str) -> dict:
|
|
"""Parse pacman -Si or -Qi output into a dict."""
|
|
info = {}
|
|
current_key = None
|
|
current_value = []
|
|
|
|
for line in output.split("\n"):
|
|
if ":" in line and not line.startswith(" "):
|
|
if current_key:
|
|
info[current_key] = " ".join(current_value).strip()
|
|
parts = line.split(":", 1)
|
|
current_key = parts[0].strip().lower().replace(" ", "_")
|
|
current_value = [parts[1].strip()] if len(parts) > 1 else []
|
|
elif line.startswith(" ") and current_key:
|
|
current_value.append(line.strip())
|
|
|
|
if current_key:
|
|
info[current_key] = " ".join(current_value).strip()
|
|
|
|
return info
|
|
|
|
|
|
async def search_packages(query: str) -> list[dict]:
|
|
"""Search pacman repositories."""
|
|
stdout, stderr, code = await _run_command(
|
|
["pacman", "-Ss", query], timeout=15
|
|
)
|
|
if code != 0:
|
|
return []
|
|
return _parse_search_results(stdout)
|
|
|
|
|
|
async def get_package_info(name: str) -> Optional[dict]:
|
|
"""Get detailed info about a package from sync db."""
|
|
name = sanitize_package_name(name)
|
|
|
|
# Try sync database first
|
|
stdout, stderr, code = await _run_command(
|
|
["pacman", "-Si", name], timeout=10
|
|
)
|
|
if code == 0:
|
|
info = _parse_package_info(stdout)
|
|
info["source"] = "pacman"
|
|
info["installed"] = await is_installed(name)
|
|
return info
|
|
|
|
return None
|
|
|
|
|
|
async def get_installed_info(name: str) -> Optional[dict]:
|
|
"""Get info about an installed package."""
|
|
name = sanitize_package_name(name)
|
|
stdout, stderr, code = await _run_command(
|
|
["pacman", "-Qi", name], timeout=10
|
|
)
|
|
if code == 0:
|
|
info = _parse_package_info(stdout)
|
|
info["source"] = "pacman"
|
|
info["installed"] = True
|
|
return info
|
|
return None
|
|
|
|
|
|
async def is_installed(name: str) -> bool:
|
|
"""Check if a package is installed."""
|
|
name = sanitize_package_name(name)
|
|
_, _, code = await _run_command(["pacman", "-Q", name], timeout=5)
|
|
return code == 0
|
|
|
|
|
|
async def list_installed() -> list[dict]:
|
|
"""List all explicitly installed packages."""
|
|
stdout, _, code = await _run_command(
|
|
["pacman", "-Qe"], timeout=15
|
|
)
|
|
if code != 0:
|
|
return []
|
|
|
|
packages = []
|
|
for line in stdout.strip().split("\n"):
|
|
if line.strip():
|
|
parts = line.split()
|
|
if len(parts) >= 2:
|
|
packages.append({
|
|
"name": parts[0],
|
|
"version": parts[1],
|
|
"source": "pacman",
|
|
"installed": True,
|
|
})
|
|
return packages
|
|
|
|
|
|
async def check_updates() -> list[dict]:
|
|
"""Check for available updates using checkupdates."""
|
|
stdout, _, code = await _run_command(
|
|
["checkupdates"], timeout=60
|
|
)
|
|
# checkupdates returns 2 if no updates, 0 if updates available
|
|
if code not in (0,):
|
|
return []
|
|
|
|
updates = []
|
|
for line in stdout.strip().split("\n"):
|
|
if line.strip():
|
|
parts = line.split()
|
|
if len(parts) >= 4:
|
|
updates.append({
|
|
"name": parts[0],
|
|
"current_version": parts[1],
|
|
"new_version": parts[3],
|
|
"source": "pacman",
|
|
})
|
|
return updates
|
|
|
|
|
|
async def install_package(name: str) -> dict:
|
|
"""Install a package using pacman (requires pkexec)."""
|
|
name = sanitize_package_name(name)
|
|
stdout, stderr, code = await _run_command(
|
|
["pkexec", "pacman", "-S", "--noconfirm", name], timeout=300
|
|
)
|
|
return {
|
|
"success": code == 0,
|
|
"message": stdout if code == 0 else stderr,
|
|
"package": name,
|
|
}
|
|
|
|
|
|
async def remove_package(name: str) -> dict:
|
|
"""Remove a package using pacman (requires pkexec)."""
|
|
name = sanitize_package_name(name)
|
|
stdout, stderr, code = await _run_command(
|
|
["pkexec", "pacman", "-R", "--noconfirm", name], timeout=120
|
|
)
|
|
return {
|
|
"success": code == 0,
|
|
"message": stdout if code == 0 else stderr,
|
|
"package": name,
|
|
}
|
|
|
|
|
|
async def get_package_groups() -> list[str]:
|
|
"""Get list of all package groups."""
|
|
stdout, _, code = await _run_command(
|
|
["pacman", "-Sg"], timeout=10
|
|
)
|
|
if code != 0:
|
|
return []
|
|
|
|
groups = set()
|
|
for line in stdout.strip().split("\n"):
|
|
if line.strip():
|
|
groups.add(line.strip().split()[0])
|
|
return sorted(groups)
|
|
|
|
|
|
async def get_group_packages(group: str) -> list[dict]:
|
|
"""Get packages in a specific group."""
|
|
stdout, _, code = await _run_command(
|
|
["pacman", "-Sg", group], timeout=10
|
|
)
|
|
if code != 0:
|
|
return []
|
|
|
|
packages = []
|
|
for line in stdout.strip().split("\n"):
|
|
parts = line.strip().split()
|
|
if len(parts) >= 2:
|
|
is_inst = await is_installed(parts[1])
|
|
packages.append({
|
|
"name": parts[1],
|
|
"source": "pacman",
|
|
"installed": is_inst,
|
|
"group": parts[0],
|
|
})
|
|
return packages
|