first commit

This commit is contained in:
w4nn4d13
2026-04-06 13:37:26 +05:30
commit 065eae9134
52 changed files with 1918 additions and 0 deletions
+5
View File
@@ -0,0 +1,5 @@
"""ExecuTrace package."""
from .core.models import Action, Workflow
__all__ = ["Action", "Workflow"]
+5
View File
@@ -0,0 +1,5 @@
from exectrace.cli import main
if __name__ == "__main__":
raise SystemExit(main())
Binary file not shown.
Binary file not shown.
Binary file not shown.
+341
View File
@@ -0,0 +1,341 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from exectrace.core.replayer import Replayer
from exectrace.recorder.session import RecorderSession
from exectrace.storage.factory import get_storage
from exectrace.storage.json_storage import JsonStorage
from exectrace.storage.xml_storage import XmlStorage
from exectrace.utils.interactive import (
prompt_confirmation,
prompt_file_format,
prompt_storage_location,
)
from exectrace.utils.logger import get_logger
logger = get_logger(__name__)
def build_parser() -> argparse.ArgumentParser:
"""Build the argument parser for the CLI."""
parser = argparse.ArgumentParser(
prog="exectrace",
description="Record and replay developer workflows.",
)
sub = parser.add_subparsers(dest="command")
# Record command
record_parser = sub.add_parser("record", help="Start recording a workflow")
record_parser.add_argument("name", help="Workflow name")
record_parser.add_argument(
"--root",
default=".",
help="Root directory to track for file system changes (default: current dir)",
)
record_parser.add_argument(
"--format",
choices=["json", "xml"],
help="Storage format (json or xml). If not specified, will prompt.",
)
record_parser.add_argument(
"--path",
help="Custom storage path. If not specified, will prompt.",
)
# Stop command
sub.add_parser("stop", help="Stop current recording and save workflow")
# Replay command
replay_parser = sub.add_parser("replay", help="Replay a workflow")
replay_parser.add_argument("name", help="Workflow name")
replay_parser.add_argument("--dry-run", action="store_true", help="Simulate steps without executing")
replay_parser.add_argument("--explain", action="store_true", help="Describe each step before running")
replay_parser.add_argument("--smart", action="store_true", help="Skip actions previously completed")
# List command
list_parser = sub.add_parser("list", help="List saved workflows")
list_parser.add_argument("--json", action="store_true", help="Print list as JSON")
# Edit command
edit_parser = sub.add_parser("edit", help="Edit a saved workflow")
edit_parser.add_argument("name", help="Workflow name to edit")
# Delete command
delete_parser = sub.add_parser("delete", help="Delete a saved workflow")
delete_parser.add_argument("name", help="Workflow name to delete")
delete_parser.add_argument(
"--force",
action="store_true",
help="Delete without confirmation",
)
return parser
def cmd_record(args: argparse.Namespace) -> int:
"""Handle the 'record' command with interactive prompts."""
# Prompt for storage path if not provided
storage_path = args.path if args.path else prompt_storage_location()
# Prompt for file format if not provided
file_format = args.format if args.format else prompt_file_format()
# Create RecorderSession with chosen format and path
recorder = RecorderSession(storage_format=file_format, storage_path=storage_path)
state = recorder.start(name=args.name, root_dir=args.root)
print(f"Recording started for workflow '{args.name}'.")
print(f"Root directory: {state['root_dir']}")
print(f"Storage format: {file_format.upper()}")
print(f"Storage path: {storage_path}")
print("Run your commands, then execute: exectrace stop")
return 0
def cmd_stop(_: argparse.Namespace) -> int:
"""Handle the 'stop' command."""
recorder = RecorderSession()
workflow = recorder.stop()
print(f"Recording stopped. Saved workflow '{workflow.name}' with {len(workflow.actions)} actions.")
return 0
def cmd_replay(args: argparse.Namespace) -> int:
"""Handle the 'replay' command."""
# Try both storage backends to find the workflow
storage = None
workflow = None
# Try JSON first
try:
storage = JsonStorage()
workflow = storage.load_workflow(args.name)
except FileNotFoundError:
# Try XML
try:
storage = XmlStorage()
workflow = storage.load_workflow(args.name)
except FileNotFoundError as exc:
raise FileNotFoundError(f"Workflow '{args.name}' not found in JSON or XML format") from exc
replayer = Replayer(storage=storage)
total = replayer.replay(workflow, dry_run=args.dry_run, explain=args.explain, smart=args.smart)
print(f"Replay complete. Processed {total} action(s).")
return 0
def cmd_list(args: argparse.Namespace) -> int:
"""Handle the 'list' command."""
# Collect workflows from both storage formats
json_storage = JsonStorage()
xml_storage = XmlStorage()
json_workflows = json_storage.list_workflows()
xml_workflows = xml_storage.list_workflows()
# Combine and deduplicate
all_workflows = sorted(set(json_workflows + xml_workflows))
if args.json:
print(json.dumps(all_workflows, indent=2))
else:
if not all_workflows:
print("No workflows found.")
return 0
print("Available workflows:")
for workflow_name in all_workflows:
# Check which formats are available
is_json = workflow_name in json_workflows
is_xml = workflow_name in xml_workflows
formats = []
if is_json:
formats.append("json")
if is_xml:
formats.append("xml")
format_str = f" [{', '.join(formats)}]" if formats else ""
print(f" - {workflow_name}{format_str}")
return 0
def cmd_edit(args: argparse.Namespace) -> int:
"""Handle the 'edit' command."""
# Try to find the workflow in either format
storage = None
workflow = None
try:
storage = JsonStorage()
workflow = storage.load_workflow(args.name)
current_format = "json"
except FileNotFoundError:
try:
storage = XmlStorage()
workflow = storage.load_workflow(args.name)
current_format = "xml"
except FileNotFoundError as exc:
raise FileNotFoundError(f"Workflow '{args.name}' not found") from exc
# Display workflow info
print(f"\nWorkflow: {workflow.name}")
print(f"Format: {current_format}")
print(f"Created: {workflow.created_at}")
print(f"Actions: {len(workflow.actions)}")
print("\nActions:")
for idx, action in enumerate(workflow.actions, 1):
print(f" {idx}. {action.action_type} @ {action.timestamp}")
# Interactive editing menu
while True:
print("\nEdit options:")
print(" 1) Add action")
print(" 2) Remove action")
print(" 3) View action details")
print(" 4) Save and exit")
print(" 5) Exit without saving")
choice = input("Choose option (1-5): ").strip()
if choice == "1":
action_type = input("Enter action type (command, file_create, file_modify, file_delete): ").strip()
if action_type not in ("command", "file_create", "file_modify", "file_delete"):
print("Invalid action type.")
continue
payload_input = input("Enter payload as JSON: ").strip()
try:
payload = json.loads(payload_input)
except json.JSONDecodeError:
print("Invalid JSON format.")
continue
workflow.add_action(action_type, payload)
print(f"Action added. Total actions: {len(workflow.actions)}")
elif choice == "2":
try:
idx = int(input("Enter action number to remove: ").strip())
if 1 <= idx <= len(workflow.actions):
removed = workflow.actions.pop(idx - 1)
print(f"Removed: {removed.action_type}")
else:
print("Invalid action number.")
except ValueError:
print("Please enter a valid number.")
elif choice == "3":
try:
idx = int(input("Enter action number to view: ").strip())
if 1 <= idx <= len(workflow.actions):
action = workflow.actions[idx - 1]
print(f"\nAction {idx}:")
print(f" Type: {action.action_type}")
print(f" Timestamp: {action.timestamp}")
print(f" Payload: {json.dumps(action.payload, indent=2)}")
else:
print("Invalid action number.")
except ValueError:
print("Please enter a valid number.")
elif choice == "4":
storage.save_workflow(workflow)
print(f"Workflow saved as '{workflow.name}' in {current_format} format.")
return 0
elif choice == "5":
print("Exiting without saving.")
return 0
else:
print("Invalid choice.")
def cmd_delete(args: argparse.Namespace) -> int:
"""Handle the 'delete' command."""
# Try to find the workflow in either format
found_formats = []
paths_to_delete = []
try:
storage = JsonStorage()
path = storage.workflow_path(args.name)
if path.exists():
found_formats.append("json")
paths_to_delete.append(path)
except Exception:
pass
try:
storage = XmlStorage()
path = storage.workflow_path(args.name)
if path.exists():
found_formats.append("xml")
paths_to_delete.append(path)
except Exception:
pass
if not found_formats:
print(f"Workflow '{args.name}' not found in any format.")
return 1
# Confirm deletion unless --force is used
if not args.force:
format_str = ", ".join(found_formats)
if not prompt_confirmation(f"Delete workflow '{args.name}' ({format_str})?"):
print("Deletion cancelled.")
return 0
# Delete the files
for path in paths_to_delete:
try:
path.unlink()
print(f"Deleted: {path}")
except Exception as exc:
print(f"Error deleting {path}: {exc}", file=sys.stderr)
return 2
print(f"Workflow '{args.name}' deleted successfully.")
return 0
def main(argv: list[str] | None = None) -> int:
"""Main entry point for the CLI."""
parser = build_parser()
args = parser.parse_args(argv)
try:
if args.command == "record":
return cmd_record(args)
if args.command == "stop":
return cmd_stop(args)
if args.command == "replay":
return cmd_replay(args)
if args.command == "list":
return cmd_list(args)
if args.command == "edit":
return cmd_edit(args)
if args.command == "delete":
return cmd_delete(args)
parser.print_help()
return 1
except FileNotFoundError as exc:
print(f"Error: {exc}", file=sys.stderr)
return 2
except RuntimeError as exc:
print(f"Execution error: {exc}", file=sys.stderr)
return 3
except KeyboardInterrupt:
print("\nOperation cancelled by user.", file=sys.stderr)
return 130
except Exception as exc: # Basic fallback handling for starter project robustness.
print(f"Unexpected error: {exc}", file=sys.stderr)
logger.exception("Unexpected error")
return 99
if __name__ == "__main__":
raise SystemExit(main())
+1
View File
@@ -0,0 +1 @@
"""Core models and replay logic for ExecuTrace."""
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+26
View File
@@ -0,0 +1,26 @@
from __future__ import annotations
from typing import Union
from exectrace.core.models import Workflow
from exectrace.storage.factory import get_storage
from exectrace.storage.json_storage import JsonStorage
from exectrace.storage.xml_storage import XmlStorage
class WorkflowEditor:
"""Load, modify, and save workflows programmatically."""
def __init__(self, storage: Union[JsonStorage, XmlStorage] | None = None) -> None:
self.storage = storage or get_storage("json")
def load(self, name: str) -> Workflow:
return self.storage.load_workflow(name)
def save(self, workflow: Workflow) -> None:
self.storage.save_workflow(workflow)
def remove_action(self, workflow: Workflow, index: int) -> None:
if index < 0 or index >= len(workflow.actions):
raise IndexError("Action index out of range")
workflow.actions.pop(index)
+60
View File
@@ -0,0 +1,60 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List
from exectrace.utils.time_utils import utc_now_iso
@dataclass
class Action:
action_type: str
timestamp: str
payload: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"action_type": self.action_type,
"timestamp": self.timestamp,
"payload": self.payload,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Action":
return cls(
action_type=data["action_type"],
timestamp=data["timestamp"],
payload=data.get("payload", {}),
)
@dataclass
class Workflow:
name: str
version: str = "1.0"
created_at: str = field(default_factory=utc_now_iso)
updated_at: str = field(default_factory=utc_now_iso)
actions: List[Action] = field(default_factory=list)
def add_action(self, action_type: str, payload: Dict[str, Any]) -> None:
self.actions.append(Action(action_type=action_type, timestamp=utc_now_iso(), payload=payload))
self.updated_at = utc_now_iso()
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"version": self.version,
"created_at": self.created_at,
"updated_at": self.updated_at,
"actions": [action.to_dict() for action in self.actions],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Workflow":
return cls(
name=data["name"],
version=data.get("version", "1.0"),
created_at=data.get("created_at", utc_now_iso()),
updated_at=data.get("updated_at", utc_now_iso()),
actions=[Action.from_dict(item) for item in data.get("actions", [])],
)
+103
View File
@@ -0,0 +1,103 @@
from __future__ import annotations
import base64
import hashlib
import os
import shutil
import subprocess
from pathlib import Path
from exectrace.core.models import Action, Workflow
from exectrace.storage.json_storage import JsonStorage
from exectrace.utils.logger import get_logger
logger = get_logger(__name__)
class Replayer:
def __init__(self, storage: JsonStorage | None = None) -> None:
self.storage = storage or JsonStorage()
def _signature(self, action: Action) -> str:
serialized = f"{action.action_type}|{action.payload}"
return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
def replay(
self,
workflow: Workflow,
dry_run: bool = False,
explain: bool = False,
smart: bool = False,
) -> int:
completed = self.storage.load_replay_state(workflow.name) if smart else set()
newly_completed = set(completed)
for idx, action in enumerate(workflow.actions, start=1):
signature = self._signature(action)
if smart and signature in completed:
print(f"[{idx}] SKIP (smart): {action.action_type}")
continue
if explain:
print(self._explain_action(idx, action))
if dry_run:
print(f"[{idx}] DRY-RUN: {action.action_type}")
newly_completed.add(signature)
continue
self._execute_action(idx, action)
newly_completed.add(signature)
if smart:
self.storage.save_replay_state(workflow.name, newly_completed)
return len(workflow.actions)
def _explain_action(self, idx: int, action: Action) -> str:
if action.action_type == "command":
cmd = action.payload.get("command", "")
cwd = action.payload.get("cwd", ".")
return f"[{idx}] Execute command in {cwd}: {cmd}"
path = action.payload.get("path", "")
if action.action_type == "file_create":
return f"[{idx}] Create file: {path}"
if action.action_type == "file_modify":
return f"[{idx}] Modify file: {path}"
if action.action_type == "file_delete":
return f"[{idx}] Delete file: {path}"
return f"[{idx}] Unknown action: {action.action_type}"
def _execute_action(self, idx: int, action: Action) -> None:
if action.action_type == "command":
cmd = str(action.payload.get("command", "")).strip()
cwd = str(action.payload.get("cwd", "."))
if not cmd:
logger.warning("[%d] Empty command skipped", idx)
return
print(f"[{idx}] RUN: {cmd}")
result = subprocess.run(cmd, cwd=cwd, shell=True, check=False)
if result.returncode != 0:
raise RuntimeError(f"Command failed with code {result.returncode}: {cmd}")
return
if action.action_type in {"file_create", "file_modify"}:
path = Path(str(action.payload["path"]))
path.parent.mkdir(parents=True, exist_ok=True)
content_b64 = str(action.payload.get("content_b64", ""))
raw = base64.b64decode(content_b64.encode("ascii"))
path.write_bytes(raw)
print(f"[{idx}] WRITE: {path}")
return
if action.action_type == "file_delete":
path = Path(str(action.payload["path"]))
if path.is_dir():
shutil.rmtree(path, ignore_errors=True)
elif path.exists():
os.remove(path)
print(f"[{idx}] DELETE: {path}")
return
logger.warning("[%d] Unknown action type: %s", idx, action.action_type)
+1
View File
@@ -0,0 +1 @@
"""Recording modules for ExecuTrace."""
+35
View File
@@ -0,0 +1,35 @@
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple
from exectrace.utils.sensitive_filter import redact_text
def detect_history_file() -> Path | None:
candidates = [Path.home() / ".bash_history", Path.home() / ".zsh_history"]
for path in candidates:
if path.exists() and path.is_file():
return path
return None
def history_line_count(history_file: Path) -> int:
with open(history_file, "r", encoding="utf-8", errors="ignore") as f:
return sum(1 for _ in f)
def capture_commands_since(history_file: Path, start_line: int) -> Tuple[List[str], int]:
with open(history_file, "r", encoding="utf-8", errors="ignore") as f:
lines = [line.rstrip("\n") for line in f]
new_lines = lines[start_line:]
commands = []
for line in new_lines:
cmd = line
# zsh can store lines as ": <epoch>:<duration>;<command>"
if line.startswith(": ") and ";" in line:
cmd = line.split(";", 1)[1]
commands.append(redact_text(cmd))
return commands, len(lines)
+91
View File
@@ -0,0 +1,91 @@
from __future__ import annotations
import base64
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple
from exectrace.utils.hash_utils import sha256_bytes
@dataclass
class FileSnapshotEntry:
sha256: str
size: int
def snapshot_directory(root_dir: Path) -> Dict[str, FileSnapshotEntry]:
"""Capture a file snapshot for deterministic change detection."""
snapshot: Dict[str, FileSnapshotEntry] = {}
for path in root_dir.rglob("*"):
if path.is_file() and ".git" not in path.parts:
rel = str(path.relative_to(root_dir))
content = path.read_bytes()
snapshot[rel] = FileSnapshotEntry(sha256=sha256_bytes(content), size=len(content))
return snapshot
def encode_file_content(path: Path) -> Tuple[str, bool]:
content = path.read_bytes()
try:
content.decode("utf-8")
is_binary = False
except UnicodeDecodeError:
is_binary = True
return base64.b64encode(content).decode("ascii"), is_binary
def diff_snapshots(
root_dir: Path,
before: Dict[str, FileSnapshotEntry],
after: Dict[str, FileSnapshotEntry],
) -> List[dict]:
actions: List[dict] = []
before_paths = set(before.keys())
after_paths = set(after.keys())
created = sorted(after_paths - before_paths)
deleted = sorted(before_paths - after_paths)
possibly_modified = sorted(before_paths & after_paths)
for rel_path in created:
file_path = root_dir / rel_path
encoded, is_binary = encode_file_content(file_path)
actions.append(
{
"action_type": "file_create",
"payload": {
"path": rel_path,
"content_b64": encoded,
"is_binary": is_binary,
},
}
)
for rel_path in possibly_modified:
if before[rel_path].sha256 != after[rel_path].sha256:
file_path = root_dir / rel_path
encoded, is_binary = encode_file_content(file_path)
actions.append(
{
"action_type": "file_modify",
"payload": {
"path": rel_path,
"content_b64": encoded,
"is_binary": is_binary,
},
}
)
for rel_path in deleted:
actions.append(
{
"action_type": "file_delete",
"payload": {
"path": rel_path,
},
}
)
return actions
+80
View File
@@ -0,0 +1,80 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Literal, Union
from exectrace.core.models import Workflow
from exectrace.recorder.command_capture import (
capture_commands_since,
detect_history_file,
history_line_count,
)
from exectrace.recorder.fs_tracker import FileSnapshotEntry, diff_snapshots, snapshot_directory
from exectrace.storage.factory import get_storage
from exectrace.storage.json_storage import JsonStorage
from exectrace.storage.xml_storage import XmlStorage
from exectrace.utils.logger import get_logger
from exectrace.utils.time_utils import utc_now_iso
logger = get_logger(__name__)
class RecorderSession:
def __init__(
self,
storage: Union[JsonStorage, XmlStorage] | None = None,
storage_format: Literal["json", "xml"] = "json",
storage_path: str | None = None,
) -> None:
if storage:
self.storage = storage
else:
self.storage = get_storage(storage_format, storage_path)
def start(self, name: str, root_dir: str | None = None) -> Dict[str, Any]:
root = Path(root_dir or ".").resolve()
history_file = detect_history_file()
history_start_line = history_line_count(history_file) if history_file else 0
snapshot = snapshot_directory(root)
state = {
"name": name,
"started_at": utc_now_iso(),
"root_dir": str(root),
"history_file": str(history_file) if history_file else None,
"history_start_line": history_start_line,
"snapshot": {k: {"sha256": v.sha256, "size": v.size} for k, v in snapshot.items()},
}
self.storage.save_active_recording(state)
logger.info("Recording started: %s", name)
return state
def stop(self) -> Workflow:
state = self.storage.load_active_recording()
workflow = Workflow(name=str(state["name"]))
workflow.created_at = str(state["started_at"])
root_dir = Path(str(state["root_dir"]))
before_snapshot = {
path: FileSnapshotEntry(sha256=entry["sha256"], size=entry["size"])
for path, entry in dict(state["snapshot"]).items()
}
after_snapshot = snapshot_directory(root_dir)
history_file_value = state.get("history_file")
if history_file_value:
commands, _ = capture_commands_since(Path(str(history_file_value)), int(state["history_start_line"]))
for command in commands:
if command.strip():
workflow.add_action("command", {"command": command, "cwd": str(root_dir)})
file_actions = diff_snapshots(root_dir, before_snapshot, after_snapshot)
for item in file_actions:
workflow.add_action(item["action_type"], item["payload"])
self.storage.save_workflow(workflow)
self.storage.clear_active_recording()
logger.info("Recording stopped: %s (%d actions)", workflow.name, len(workflow.actions))
return workflow
+1
View File
@@ -0,0 +1 @@
"""Storage backends for ExecuTrace."""
+18
View File
@@ -0,0 +1,18 @@
"""Storage factory for selecting appropriate backend."""
from __future__ import annotations
from typing import Literal, Union
from exectrace.storage.json_storage import JsonStorage
from exectrace.storage.xml_storage import XmlStorage
StorageBackend = Union[JsonStorage, XmlStorage]
def get_storage(format_type: Literal["json", "xml"] = "json", base_dir: str | None = None) -> StorageBackend:
"""Get storage backend by format type."""
if format_type == "xml":
return XmlStorage(base_dir)
else: # default to json
return JsonStorage(base_dir)
+74
View File
@@ -0,0 +1,74 @@
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Dict, List, Set
from exectrace.core.models import Workflow
class JsonStorage:
def __init__(self, base_dir: str | None = None) -> None:
home = os.environ.get("EXECTRACE_HOME") or str(Path.home() / ".exectrace")
self.base_dir = Path(base_dir or home)
self.workflows_dir = self.base_dir / "workflows"
self.state_dir = self.base_dir / "state"
self.workflows_dir.mkdir(parents=True, exist_ok=True)
self.state_dir.mkdir(parents=True, exist_ok=True)
def workflow_path(self, name: str) -> Path:
return self.workflows_dir / f"{name}.json"
def save_workflow(self, workflow: Workflow) -> Path:
path = self.workflow_path(workflow.name)
with open(path, "w", encoding="utf-8") as f:
json.dump(workflow.to_dict(), f, indent=2)
return path
def load_workflow(self, name: str) -> Workflow:
path = self.workflow_path(name)
if not path.exists():
raise FileNotFoundError(f"Workflow '{name}' was not found")
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return Workflow.from_dict(data)
def list_workflows(self) -> List[str]:
return sorted(file.stem for file in self.workflows_dir.glob("*.json"))
def active_recording_path(self) -> Path:
return self.state_dir / "active_recording.json"
def save_active_recording(self, data: Dict[str, object]) -> None:
with open(self.active_recording_path(), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def load_active_recording(self) -> Dict[str, object]:
path = self.active_recording_path()
if not path.exists():
raise FileNotFoundError("No active recording found. Start one with 'exectrace record <name>'.")
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def clear_active_recording(self) -> None:
path = self.active_recording_path()
if path.exists():
path.unlink()
def replay_state_path(self, workflow_name: str) -> Path:
return self.state_dir / f"replay_state_{workflow_name}.json"
def load_replay_state(self, workflow_name: str) -> Set[str]:
path = self.replay_state_path(workflow_name)
if not path.exists():
return set()
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return set(data.get("completed_signatures", []))
def save_replay_state(self, workflow_name: str, signatures: Set[str]) -> None:
path = self.replay_state_path(workflow_name)
data = {"completed_signatures": sorted(signatures)}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
+144
View File
@@ -0,0 +1,144 @@
"""XML-based workflow storage backend for ExecuTrace."""
from __future__ import annotations
import os
from pathlib import Path
from typing import Dict, List, Set
from xml.etree import ElementTree as ET
from exectrace.core.models import Action, Workflow
class XmlStorage:
"""Store and retrieve workflows in XML format."""
def __init__(self, base_dir: str | None = None) -> None:
home = os.environ.get("EXECTRACE_HOME") or str(Path.home() / ".exectrace")
self.base_dir = Path(base_dir or home)
self.workflows_dir = self.base_dir / "workflows"
self.state_dir = self.base_dir / "state"
self.workflows_dir.mkdir(parents=True, exist_ok=True)
self.state_dir.mkdir(parents=True, exist_ok=True)
def workflow_path(self, name: str) -> Path:
"""Get the file path for a workflow."""
return self.workflows_dir / f"{name}.xml"
def save_workflow(self, workflow: Workflow) -> Path:
"""Save a workflow to XML file."""
path = self.workflow_path(workflow.name)
# Create root element
root = ET.Element("workflow")
root.set("name", workflow.name)
root.set("version", workflow.version)
root.set("created_at", workflow.created_at)
root.set("updated_at", workflow.updated_at)
# Add actions
actions_elem = ET.SubElement(root, "actions")
for action in workflow.actions:
action_elem = ET.SubElement(actions_elem, "action")
action_elem.set("type", action.action_type)
action_elem.set("timestamp", action.timestamp)
# Add payload as child elements
for key, value in action.payload.items():
payload_elem = ET.SubElement(action_elem, "payload")
payload_elem.set("key", key)
payload_elem.text = str(value)
# Write to file with pretty printing
tree = ET.ElementTree(root)
ET.indent(tree, space=" ")
with open(path, "wb") as f:
tree.write(f, encoding="utf-8", xml_declaration=True)
return path
def load_workflow(self, name: str) -> Workflow:
"""Load a workflow from XML file."""
path = self.workflow_path(name)
if not path.exists():
raise FileNotFoundError(f"Workflow '{name}' was not found")
tree = ET.parse(path)
root = tree.getroot()
workflow = Workflow(
name=root.get("name", name),
version=root.get("version", "1.0"),
created_at=root.get("created_at"),
updated_at=root.get("updated_at"),
)
# Load actions
actions_elem = root.find("actions")
if actions_elem is not None:
for action_elem in actions_elem.findall("action"):
action_type = action_elem.get("type", "")
timestamp = action_elem.get("timestamp", "")
# Load payload
payload: Dict[str, object] = {}
for payload_elem in action_elem.findall("payload"):
key = payload_elem.get("key", "")
value = payload_elem.text or ""
payload[key] = value
action = Action(action_type=action_type, timestamp=timestamp, payload=payload)
workflow.actions.append(action)
return workflow
def list_workflows(self) -> List[str]:
"""List all available workflow names (XML format)."""
return sorted(file.stem for file in self.workflows_dir.glob("*.xml"))
def active_recording_path(self) -> Path:
"""Get path for active recording state file."""
return self.state_dir / "active_recording.json"
def save_active_recording(self, data: Dict[str, object]) -> None:
"""Save active recording state (uses JSON for simplicity)."""
import json
with open(self.active_recording_path(), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def load_active_recording(self) -> Dict[str, object]:
"""Load active recording state (uses JSON for simplicity)."""
import json
path = self.active_recording_path()
if not path.exists():
raise FileNotFoundError("No active recording found. Start one with 'exectrace record <name>'.")
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def clear_active_recording(self) -> None:
"""Clear active recording state."""
path = self.active_recording_path()
if path.exists():
path.unlink()
def replay_state_path(self, workflow_name: str) -> Path:
"""Get path for replay state file."""
return self.state_dir / f"replay_state_{workflow_name}.json"
def load_replay_state(self, workflow_name: str) -> Set[str]:
"""Load replay state for smart replays (uses JSON for simplicity)."""
import json
path = self.replay_state_path(workflow_name)
if not path.exists():
return set()
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return set(data.get("completed_signatures", []))
def save_replay_state(self, workflow_name: str, signatures: Set[str]) -> None:
"""Save replay state for smart replays (uses JSON for simplicity)."""
import json
path = self.replay_state_path(workflow_name)
data = {"completed_signatures": sorted(signatures)}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
+1
View File
@@ -0,0 +1 @@
"""Utility helpers for ExecuTrace."""
Binary file not shown.
Binary file not shown.
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
import hashlib
BLOCK_SIZE = 1024 * 64
def sha256_bytes(data: bytes) -> str:
digest = hashlib.sha256()
digest.update(data)
return digest.hexdigest()
def sha256_file(path: str) -> str:
digest = hashlib.sha256()
with open(path, "rb") as f:
while True:
block = f.read(BLOCK_SIZE)
if not block:
break
digest.update(block)
return digest.hexdigest()
+69
View File
@@ -0,0 +1,69 @@
"""Interactive CLI utilities for user input and selection."""
from __future__ import annotations
import os
from pathlib import Path
from typing import Literal
def prompt_storage_location(default: str | None = None) -> str:
"""Prompt user for storage location with optional default."""
if default is None:
default = str(Path.home() / ".exectrace" / "workflows")
prompt_text = f"Enter storage path (press Enter for default: {default}): "
user_input = input(prompt_text).strip()
if not user_input:
return default
# Expand home directory
path = Path(user_input).expanduser()
# Create directory if it doesn't exist
path.mkdir(parents=True, exist_ok=True)
return str(path)
def prompt_file_format() -> Literal["json", "xml"]:
"""Prompt user to choose file format (JSON or XML)."""
while True:
print("Choose file format:")
print(" 1) JSON (default)")
print(" 2) XML")
choice = input("Enter choice (1 or 2, default: 1): ").strip().lower()
if not choice or choice == "1":
return "json"
elif choice == "2":
return "xml"
else:
print("Invalid choice. Please enter 1 or 2.")
def prompt_confirmation(message: str) -> bool:
"""Prompt user for yes/no confirmation."""
while True:
response = input(f"{message} (y/n): ").strip().lower()
if response in ("y", "yes"):
return True
elif response in ("n", "no"):
return False
else:
print("Please enter 'y' or 'n'.")
def list_directories(base_path: str | None = None) -> list[str]:
"""List subdirectories in a path for selection (future feature)."""
if base_path is None:
base_path = str(Path.home())
try:
base = Path(base_path)
dirs = sorted([d.name for d in base.iterdir() if d.is_dir()])
return dirs
except (OSError, PermissionError):
return []
+14
View File
@@ -0,0 +1,14 @@
from __future__ import annotations
import logging
def get_logger(name: str = "exectrace", level: int = logging.INFO) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(level)
return logger
+21
View File
@@ -0,0 +1,21 @@
from __future__ import annotations
import re
from typing import Iterable
DEFAULT_PATTERNS = [
re.compile(r"(?i)(api[_-]?key\s*[=:]\s*)([^\s]+)"),
re.compile(r"(?i)(token\s*[=:]\s*)([^\s]+)"),
re.compile(r"(?i)(password\s*[=:]\s*)([^\s]+)"),
re.compile(r"(?i)(secret\s*[=:]\s*)([^\s]+)"),
re.compile(r"(?i)(Authorization:\s*Bearer\s+)([^\s]+)"),
]
def redact_text(text: str, patterns: Iterable[re.Pattern] | None = None) -> str:
"""Redact common secrets from captured command text."""
active_patterns = list(patterns) if patterns is not None else DEFAULT_PATTERNS
result = text
for pattern in active_patterns:
result = pattern.sub(r"\1<REDACTED>", result)
return result
+11
View File
@@ -0,0 +1,11 @@
from __future__ import annotations
from datetime import datetime, timezone
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
def utc_now_iso() -> str:
"""Return a UTC timestamp in ISO-8601 format."""
return datetime.now(tz=timezone.utc).strftime(ISO_FORMAT)