Files

247 lines
7.1 KiB
Python

"""
Pacman service for ArchStore.
Handles all interactions with the pacman package manager via subprocess.
"""
import asyncio
import re
from typing import Optional
from utils.sanitize import sanitize_package_name
# Upper bound (seconds) on how long to wait for a killed child to be reaped.
_REAP_TIMEOUT = 5


async def _kill_and_reap(proc: asyncio.subprocess.Process) -> None:
    """Kill *proc* and wait briefly for it to exit so it is not left a zombie."""
    try:
        proc.kill()
    except (ProcessLookupError, PermissionError):
        # Either the process is already gone, or we are not allowed to signal
        # it (possible when it was elevated, e.g. via pkexec). Waiting on a
        # process we could not kill might block indefinitely, so give up here.
        return
    try:
        await asyncio.wait_for(proc.wait(), timeout=_REAP_TIMEOUT)
    except asyncio.TimeoutError:
        # Best effort only: never let cleanup hang the caller.
        pass


async def _run_command(cmd: list[str], timeout: int = 30) -> tuple[str, str, int]:
    """
    Run a command safely using an argument list (no shell is involved).

    Args:
        cmd: Program and arguments, passed verbatim to the subprocess.
        timeout: Maximum number of seconds to wait for the command to finish.

    Returns:
        (stdout, stderr, returncode). Output is decoded as UTF-8 with
        undecodable bytes replaced. On timeout the child is killed and
        ("", "Command timed out", -1) is returned; if the command cannot be
        started or another error occurs, ("", str(error), -1) is returned.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except Exception as e:
        # Deliberately broad: callers rely on this helper never raising and
        # instead reporting failures through the returned tuple.
        return "", str(e), -1

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Kill AND reap; a bare kill() would leave a zombie behind and could
        # itself raise out of this handler.
        await _kill_and_reap(proc)
        return "", "Command timed out", -1
    except Exception as e:
        return "", str(e), -1

    return (
        stdout.decode("utf-8", errors="replace"),
        stderr.decode("utf-8", errors="replace"),
        proc.returncode,
    )
# Header line of a `pacman -Ss` entry:
#   repo/name version [(group ...)] [[installed] | [installed: version]]
# Compiled once at import time instead of on every parsed line.
_SEARCH_HEADER_RE = re.compile(
    r"^(?P<repo>\S+)/(?P<name>\S+)\s+(?P<version>\S+)"
    r"(?:\s+\((?P<groups>[^)]*)\))?"
    r"(?P<installed>\s+\[installed(?::\s*[^\]]+)?\])?"
    r"\s*$"
)


def _parse_search_results(output: str) -> list[dict]:
    """
    Parse pacman -Ss output into structured results.

    Each entry is a header line (``repo/name version``, optionally followed
    by a parenthesised group list and an ``[installed]`` or
    ``[installed: version]`` marker) and, usually, an indented description
    line. Lines that do not look like a header are skipped.

    Args:
        output: Raw stdout of ``pacman -Ss``.

    Returns:
        One dict per package with the keys ``name``, ``version``,
        ``description``, ``repository``, ``source`` and ``installed``.
    """
    packages: list[dict] = []
    lines = output.strip().split("\n")
    index = 0
    while index < len(lines):
        header = _SEARCH_HEADER_RE.match(lines[index])
        index += 1
        if header is None:
            continue

        # The description, when present, is the indented line right after
        # the header; consume it so it is not re-examined as a header.
        description = ""
        if index < len(lines) and lines[index].startswith(" "):
            description = lines[index].strip()
            index += 1

        packages.append({
            "name": header.group("name"),
            "version": header.group("version"),
            "description": description,
            "repository": header.group("repo"),
            "source": "pacman",
            "installed": header.group("installed") is not None,
        })
    return packages
def _parse_package_info(output: str) -> dict:
    """
    Parse pacman -Si or -Qi output into a dict.

    A non-indented line containing a colon starts a new field; the text
    before the first colon becomes the key (lower-cased, spaces replaced by
    underscores). Indented lines continue the current field and are joined
    to its value with single spaces.
    """
    # Collected as (key, value fragments) pairs in order of appearance.
    entries: list[tuple[str, list[str]]] = []
    for raw in output.split("\n"):
        indented = raw.startswith(" ")
        if not indented and ":" in raw:
            label, _, rest = raw.partition(":")
            key = label.strip().lower().replace(" ", "_")
            entries.append((key, [rest.strip()]))
        elif indented and entries and entries[-1][0]:
            entries[-1][1].append(raw.strip())
    # Fields whose key is empty are dropped; a repeated key keeps the last value.
    return {
        key: " ".join(fragments).strip()
        for key, fragments in entries
        if key
    }
async def search_packages(query: str) -> list[dict]:
    """
    Search pacman repositories.

    Args:
        query: Search expression handed to ``pacman -Ss`` as a single argument.

    Returns:
        Parsed search results, or an empty list when pacman exits with a
        non-zero status.
    """
    # `query` is not sanitized, so "--" marks the end of options: a query
    # that starts with "-" is then searched for, not parsed as a pacman flag.
    stdout, _, code = await _run_command(
        ["pacman", "-Ss", "--", query], timeout=15
    )
    if code != 0:
        return []
    return _parse_search_results(stdout)
async def get_package_info(name: str) -> Optional[dict]:
    """
    Look up a package in the sync database via ``pacman -Si``.

    Returns the parsed fields plus ``source`` and ``installed`` entries, or
    None when pacman exits with a non-zero status.
    """
    name = sanitize_package_name(name)
    output, _, status = await _run_command(["pacman", "-Si", name], timeout=10)
    if status != 0:
        return None

    details = _parse_package_info(output)
    details["source"] = "pacman"
    details["installed"] = await is_installed(name)
    return details
async def get_installed_info(name: str) -> Optional[dict]:
    """
    Look up an installed package via ``pacman -Qi``.

    Returns the parsed fields with ``source`` set to "pacman" and
    ``installed`` set to True, or None when pacman exits with a non-zero
    status.
    """
    name = sanitize_package_name(name)
    output, _, status = await _run_command(["pacman", "-Qi", name], timeout=10)
    if status != 0:
        return None

    details = _parse_package_info(output)
    details["source"] = "pacman"
    details["installed"] = True
    return details
async def is_installed(name: str) -> bool:
    """Return True when ``pacman -Q`` exits successfully for the package."""
    safe_name = sanitize_package_name(name)
    result = await _run_command(["pacman", "-Q", safe_name], timeout=5)
    # Only the exit status (third element) matters here.
    return result[2] == 0
async def list_installed() -> list[dict]:
    """
    List all explicitly installed packages (``pacman -Qe``).

    Returns one dict per ``name version`` output line, or an empty list
    when pacman exits with a non-zero status.
    """
    output, _, status = await _run_command(["pacman", "-Qe"], timeout=15)
    if status != 0:
        return []

    # Lines with fewer than two whitespace-separated fields are ignored.
    rows = (entry.split() for entry in output.strip().split("\n"))
    return [
        {
            "name": fields[0],
            "version": fields[1],
            "source": "pacman",
            "installed": True,
        }
        for fields in rows
        if len(fields) >= 2
    ]
async def check_updates() -> list[dict]:
    """
    Check for available updates using checkupdates.

    Returns one dict per update line, taking the first, second and fourth
    whitespace-separated fields as name, current version and new version.
    Any non-zero exit status yields an empty list.
    """
    output, _, status = await _run_command(["checkupdates"], timeout=60)
    # checkupdates returns 2 if no updates, 0 if updates available
    if status != 0:
        return []

    # Lines with fewer than four fields are ignored.
    rows = (entry.split() for entry in output.strip().split("\n"))
    return [
        {
            "name": fields[0],
            "current_version": fields[1],
            "new_version": fields[3],
            "source": "pacman",
        }
        for fields in rows
        if len(fields) >= 4
    ]
async def install_package(name: str) -> dict:
    """
    Install a package with ``pacman -S --noconfirm`` run through pkexec.

    Returns a dict with ``success`` (exit status was zero), ``message``
    (stdout on success, stderr otherwise) and ``package`` (the sanitized
    name).
    """
    name = sanitize_package_name(name)
    command = ["pkexec", "pacman", "-S", "--noconfirm", name]
    out, err, status = await _run_command(command, timeout=300)
    succeeded = status == 0
    if succeeded:
        message = out
    else:
        message = err
    return {"success": succeeded, "message": message, "package": name}
async def remove_package(name: str) -> dict:
    """
    Remove a package with ``pacman -R --noconfirm`` run through pkexec.

    Returns a dict with ``success`` (exit status was zero), ``message``
    (stdout on success, stderr otherwise) and ``package`` (the sanitized
    name).
    """
    name = sanitize_package_name(name)
    command = ["pkexec", "pacman", "-R", "--noconfirm", name]
    out, err, status = await _run_command(command, timeout=120)
    succeeded = status == 0
    if succeeded:
        message = out
    else:
        message = err
    return {"success": succeeded, "message": message, "package": name}
async def get_package_groups() -> list[str]:
    """
    Get the sorted list of all package groups (``pacman -Sg``).

    Returns an empty list when pacman exits with a non-zero status.
    """
    output, _, status = await _run_command(["pacman", "-Sg"], timeout=10)
    if status != 0:
        return []

    # First whitespace-separated token of each non-blank line, de-duplicated.
    unique_groups = {
        entry.split()[0]
        for entry in output.strip().split("\n")
        if entry.strip()
    }
    return sorted(unique_groups)
async def get_group_packages(group: str) -> list[dict]:
    """
    Get packages in a specific group.

    Args:
        group: Group name handed to ``pacman -Sg``.

    Returns:
        One dict per ``group package`` output line with the keys ``name``,
        ``source``, ``installed`` and ``group``; an empty list when pacman
        exits with a non-zero status.
    """
    # `group` is not sanitized, so "--" marks the end of options: a value
    # that starts with "-" is then looked up, not parsed as a pacman flag.
    stdout, _, code = await _run_command(
        ["pacman", "-Sg", "--", group], timeout=10
    )
    if code != 0:
        return []

    packages = []
    for line in stdout.strip().split("\n"):
        parts = line.split()
        if len(parts) < 2:
            continue
        group_name, package_name = parts[0], parts[1]
        packages.append({
            "name": package_name,
            "source": "pacman",
            # One `pacman -Q` call per member, run sequentially.
            "installed": await is_installed(package_name),
            "group": group_name,
        })
    return packages