class AxiumDaemon:
"""
Main daemon class managing state and IPC.
The daemon runs an asyncio event loop with a UNIX socket server
accepting JSON IPC requests from the CLI.
Attributes:
state: Runtime state dict with keys:
Persistent (saved to state.json):
- active_env: Current environment name (str | None)
- panes: Per-pane environment mapping (dict)
Runtime-only:
- started: ISO 8601 timestamp of daemon start (str)
- tmux_pane: TMUX_PANE when daemon started (str | None)
- hud_cache: Pre-rendered HUD strings (dict)
_start_time: Unix timestamp of daemon start (float, for uptime)
server: asyncio.Server instance for UNIX socket
_stop: asyncio.Event for graceful shutdown
Example:
```python
daemon = AxiumDaemon()
await daemon.run()
```
"""
def __init__(self):
    """
    Build the daemon's in-memory state and warm every derived cache.

    Runs, in order: seed the state dict with defaults, create the
    bookkeeping containers, fetch the event bus and subscribe the
    cache-maintenance handlers, overlay persisted values from state.json,
    load the HUD layout, write the shell state cache, pre-render the HUD
    for all known panes, load gear permissions, and generate the initial
    completion cache.
    """
    # Defaults. "active_env" and "panes" are overwritten by _load_state()
    # when state.json exists; _save_state() never writes the other keys.
    self.state = {
        "active_env": None,
        # Per-pane environment mapping: {"%1": "root", "%2": "builder"}
        "panes": {},
        # Pre-rendered HUD strings: {"%1": "[axium] pane:%1 ..."}
        "hud_cache": {},
        "tmux_pane": os.getenv("TMUX_PANE"),
        "started": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
    # Unix timestamp kept next to the ISO string for cheap uptime maths.
    self._start_time = time.time()

    # Effective permissions per spoke/gear: {name: permissions object}
    self.permissions = {}
    # Queued notifications: [{"title": ..., "body": ..., "spoke": ...}]
    self.notification_queue = []
    # HUD layout configuration, filled in by _load_hud_config()
    self.hud_config = {}
    self.server = None
    self._stop = asyncio.Event()

    # EventBus used for spoke coordination
    from axium.core.spokes import get_event_bus

    self.bus = get_event_bus()

    # Subscribe the handlers that keep the completion cache current.
    subscriptions = (
        ("spoke_loaded", self._on_spoke_loaded),
        ("spoke_reloaded", self._on_spoke_reloaded),
        ("spoke_unloaded", self._on_spoke_unloaded),
        ("gear_loaded", self._on_gear_loaded),
        ("gear_unloaded", self._on_gear_unloaded),
        ("daemon_reload", self._on_daemon_reload),
    )
    for event_name, handler in subscriptions:
        self.bus.on(event_name, handler)

    self._load_state()
    self._load_hud_config()  # HUD layout from hud.yaml
    self._write_state_cache()
    self._refresh_all_hud_caches()  # Pre-render HUD for all panes

    # Gears are also loaded in the CLI for command registration; loading
    # them here populates permissions for IPC permission enforcement.
    self._load_gears_for_daemon()

    # Initial completion cache
    self._regenerate_completions("daemon_init")
def _load_state(self) -> None:
    """
    Load persistent state from state.json.

    Reads active_env and the panes mapping from disk if state.json exists.
    Other state (started, tmux_pane, hud_cache) is never loaded from disk.

    Side Effects:
        Updates self.state["active_env"] and self.state["panes"] if the
        file exists and holds a JSON object.

    Note:
        A malformed file is logged as a warning and the current values are
        kept. A "panes" value that is not a JSON object (for example
        ``null``) is replaced with an empty mapping so later lookups and
        iteration over self.state["panes"] cannot fail.
    """
    if not STATE_PATH.exists():
        return

    try:
        data = json.loads(STATE_PATH.read_text())
        if not isinstance(data, dict):
            raise ValueError(
                f"expected a JSON object, got {type(data).__name__}"
            )

        panes = data.get("panes")
        if not isinstance(panes, dict):
            if panes is not None:
                logger.warning(
                    "Ignoring invalid 'panes' value in %s (expected object)",
                    STATE_PATH,
                )
            panes = {}

        self.state["active_env"] = data.get("active_env")
        self.state["panes"] = panes
        logger.info("Loaded state from %s", STATE_PATH)
    except Exception as e:
        logger.warning("Failed to load state: %s", e)
def _load_hud_config(self) -> None:
    """
    Load HUD layout configuration from hud.yaml.

    Reads CONF_DIR / "hud.yaml". Falls back to the hardcoded defaults when
    the file is missing, empty, unparsable, or does not have the expected
    shape (top level must be a mapping; "style", when present, must be a
    mapping too).

    Side Effects:
        Updates self.hud_config with layout and style settings.

    Note:
        - layout: List of segment names to display (e.g., ["env", "uptime"])
        - style: Dict with color and padding settings
        - A malformed hud.yaml is logged as a warning and defaults are used,
          so code that chains ``hud_config.get("style", {}).get(...)`` can
          rely on getting mappings.
    """
    import yaml

    hud_path = CONF_DIR / "hud.yaml"

    # Default configuration
    default_config = {
        "layout": ["env", "uptime"],
        "style": {"color": "#00B7C7", "padding": 1},
    }

    if not hud_path.exists():
        self.hud_config = default_config
        logger.debug("Using default HUD config (hud.yaml not found)")
        return

    try:
        data = yaml.safe_load(hud_path.read_text())
        # safe_load can return any YAML value (list, str, int, ...); only a
        # mapping is usable here.
        if data and not isinstance(data, dict):
            raise ValueError(
                f"top level must be a mapping, got {type(data).__name__}"
            )
        if data and "style" in data and not isinstance(data["style"], dict):
            raise ValueError("'style' must be a mapping")
        self.hud_config = data if data else default_config
        logger.info("Loaded HUD config from %s", hud_path)
    except Exception as e:
        logger.warning("Failed to load hud.yaml: %s (using defaults)", e)
        self.hud_config = default_config
def _load_gears_for_daemon(self) -> None:
    """
    Populate self.permissions with the effective permissions of each gear.

    Only permissions are read here; a gear's register() hook is not called
    because the daemon has no CLI app to attach commands to (commands are
    registered when gears load in CLI context).

    Side Effects:
        Adds one self.permissions entry per gear that was processed
        successfully.

    Note:
        A gear without gear.yaml is skipped with a warning. An exception
        while processing one gear is logged and the loop continues with
        the next gear.
    """
    from . import gears

    gear_names = gears.discover_gears()
    if not gear_names:
        logger.debug("No gears found")
        return

    for gear_name in gear_names:
        try:
            manifest = gears.GEARS_DIR / gear_name / "gear.yaml"
            if not manifest.exists():
                logger.warning("Gear %s missing gear.yaml", gear_name)
                continue

            perms = gears.get_effective_gear_permissions(gear_name, manifest)
            self.permissions[gear_name] = perms
            logger.info(
                "Loaded gear permissions: %s (exec=%s ipc=%d)",
                gear_name,
                perms.exec,
                len(perms.ipc),
            )
        except Exception as e:
            logger.error("Failed to load gear %s permissions: %s", gear_name, e)
def _save_state(self) -> None:
    """
    Write the persistent part of the state to state.json.

    Only "active_env" and "panes" are serialized; every other key of
    self.state is left out of the file.

    Side Effects:
        Creates CONF_DIR (including parents) when missing and overwrites
        STATE_PATH with indented JSON.

    Note:
        Any failure is logged as an error and swallowed.
    """
    try:
        CONF_DIR.mkdir(parents=True, exist_ok=True)
        persistent = {
            "active_env": self.state["active_env"],
            "panes": self.state.get("panes", {}),
        }
        serialized = json.dumps(persistent, indent=2)
        STATE_PATH.write_text(serialized)
        logger.debug("Saved state to %s", STATE_PATH)
    except Exception as e:
        logger.error("Failed to save state: %s", e)
def _write_state_cache(self) -> None:
    """
    Write the list of prefixed commands to the state cache file.

    The file holds a single JSON object of the form::

        {"prefixed_commands": ["aws", "terraform", "ansible"]}

    where the list is whatever prefix.get_prefixed_commands() returns.

    Side Effects:
        Creates CONF_DIR (including parents) when missing and overwrites
        STATE_CACHE_PATH.

    Note:
        Called from __init__, from the reload command, and after a
        successful prefix registration. Any failure is logged as an error
        and swallowed so it cannot break the caller.
    """
    try:
        CONF_DIR.mkdir(parents=True, exist_ok=True)
        prefixed = prefix.get_prefixed_commands()
        payload = json.dumps({"prefixed_commands": prefixed}, indent=2)
        STATE_CACHE_PATH.write_text(payload)
        logger.debug(
            "Wrote state cache to %s with %d commands",
            STATE_CACHE_PATH,
            len(prefixed),
        )
    except Exception as e:
        logger.error("Failed to write state cache: %s", e)
def _render_hud_for_pane(self, pane_id: str) -> str:
    """
    Render the HUD string for one pane inside the daemon process.

    Builds a rendering context from the daemon state and the "style"
    section of the HUD config, asks the HUD registry to render all
    segments, and joins them behind the "[axium] " prefix.

    Args:
        pane_id: tmux pane ID (e.g., "%1")

    Returns:
        "[axium] " followed by the space-joined segments, or
        "[axium] inactive" when anything goes wrong while rendering.

    Note:
        hud.main() is deliberately not used here because it performs IPC
        calls, which causes async issues inside the daemon.
    """
    try:
        # Imported lazily; importing the hud module registers the segments.
        from axium.core.hud import get_registry

        style_cfg = self.hud_config.get("style", {})
        # Context handed to every segment (same as hud.main()).
        render_context = {
            "state": self.state,
            "pane_id": pane_id,
            # Panes without an environment are shown as "-".
            "env": self.state["panes"].get(pane_id) or "-",
            "started": self.state.get("started"),
            "wrapper": style_cfg.get("wrapper", {}),
            "theme": style_cfg.get("theme", {}),
        }
        rendered = get_registry().render_all(render_context)
        return "[axium] " + " ".join(rendered)
    except Exception as e:
        import traceback

        logger.error(
            "Error rendering HUD for pane %s: %s\n%s",
            pane_id,
            e,
            traceback.format_exc(),
        )
        return "[axium] inactive"
def _update_hud_cache(self, pane_id: str) -> None:
    """
    Re-render one pane's HUD and store the result in the HUD cache.

    Args:
        pane_id: tmux pane ID (e.g., "%1")

    Side Effects:
        Sets self.state["hud_cache"][pane_id] to the freshly rendered
        string.

    Note:
        Any exception is logged and swallowed; in that case the cache
        entry for the pane is left as it was.
    """
    try:
        hud_text = self._render_hud_for_pane(pane_id)
        self.state["hud_cache"][pane_id] = hud_text
        logger.debug("Updated HUD cache for pane %s", pane_id)
    except Exception as e:
        # Leave the cache untouched on failure; the caller handles fallback.
        logger.error("Failed to render HUD for pane %s: %s", pane_id, e)
def _refresh_all_hud_caches(self) -> None:
    """
    Re-render the cached HUD string of every pane known to the state.

    Side Effects:
        Calls _update_hud_cache() once per key of self.state["panes"].

    Note:
        With an empty panes mapping only the debug log line is emitted.
    """
    known_panes = self.state["panes"]
    for pane_id in known_panes:
        self._update_hud_cache(pane_id)
    logger.debug("Refreshed HUD cache for %d panes", len(known_panes))
def _regenerate_completions(self, event_name: str) -> None:
    """
    Rebuild the completion cache and log the outcome.

    Args:
        event_name: Label of the trigger, used only in log messages.

    Note:
        Delegates to axium.core.completions.generate_completion_cache().
        A falsy result is logged as a warning; an exception (including a
        failed import) is logged as an error. Nothing is raised, so the
        calling event flow is never interrupted.
    """
    try:
        from axium.core.completions import generate_completion_cache

        if generate_completion_cache():
            logger.debug("Regenerated completions after %s", event_name)
        else:
            logger.warning("Failed to regenerate completions after %s", event_name)
    except Exception as e:
        logger.error("Error regenerating completions after %s: %s", event_name, e)
def _on_spoke_loaded(self, spoke_name: str) -> None:
"""Event handler: spoke_loaded."""
self._regenerate_completions(f"spoke_loaded:{spoke_name}")
# Refresh HUD caches since spoke may have registered new segments
self._refresh_all_hud_caches()
def _on_spoke_reloaded(self, spoke_name: str) -> None:
"""Event handler: spoke_reloaded."""
self._regenerate_completions(f"spoke_reloaded:{spoke_name}")
# Refresh HUD caches since spoke may have updated segments
self._refresh_all_hud_caches()
def _on_spoke_unloaded(self, spoke_name: str) -> None:
"""Event handler: spoke_unloaded."""
self._regenerate_completions(f"spoke_unloaded:{spoke_name}")
# Refresh HUD caches since spoke segments are removed
self._refresh_all_hud_caches()
def _on_gear_loaded(self, gear_name: str) -> None:
"""Event handler: gear_loaded."""
self._regenerate_completions(f"gear_loaded:{gear_name}")
# Refresh HUD caches since gear may have registered new segments
self._refresh_all_hud_caches()
def _on_gear_unloaded(self, gear_name: str) -> None:
"""Event handler: gear_unloaded."""
self._regenerate_completions(f"gear_unloaded:{gear_name}")
# Refresh HUD caches since gear segments are removed
self._refresh_all_hud_caches()
def _on_daemon_reload(self) -> None:
"""Event handler: daemon_reload."""
self._regenerate_completions("daemon_reload")
def _handle_set_pane_env(self, msg: dict) -> dict:
"""
Handle set_pane_env IPC command.
Sets environment for specific tmux pane and emits env_change event.
Args:
msg: IPC message with 'pane' and 'value' fields
Returns:
Response dict with 'ok' status
"""
pane_id = msg.get("pane")
new_env = msg.get("env")
if not pane_id:
return {"ok": False, "error": "pane ID required"}
# Validate environment name before setting
from . import env as env_module
is_valid, error = env_module.validate_env_name(new_env)
if not is_valid:
return {"ok": False, "error": error}
old_env = self.state["panes"].get(pane_id)
self.state["panes"][pane_id] = new_env
self._save_state()
# Invalidate all config caches (env-aware configs need refresh)
from . import config
config.invalidate_cache()
logger.debug("Invalidated all config caches due to pane env change")
# Update HUD cache for this pane
self._update_hud_cache(pane_id)
logger.info("Pane %s environment set to: %s", pane_id, new_env)
# Emit env_change event with pane context
from . import spokes
spokes.get_event_bus().emit("env_change", new_env, old_env, pane=pane_id)
return {"ok": True}
def _handle_get_pane_env(self, msg: dict) -> dict:
"""
Handle get_pane_env IPC command.
Gets environment for specific tmux pane.
Args:
msg: IPC message with 'pane' field
Returns:
Response dict with 'ok' status and 'env' value
"""
pane_id = msg.get("pane")
if not pane_id:
return {"ok": False, "error": "pane ID required"}
env_name = self.state["panes"].get(pane_id)
return {"ok": True, "env": env_name}
def _handle_clear_pane_env(self, msg: dict) -> dict:
"""
Handle clear_pane_env IPC command.
Clears environment mapping for specific pane and emits env_change event.
Args:
msg: IPC message with 'pane' field
Returns:
Response dict with 'ok' status
"""
pane_id = msg.get("pane")
if not pane_id:
return {"ok": False, "error": "pane ID required"}
old_env = self.state["panes"].get(pane_id)
if pane_id in self.state["panes"]:
del self.state["panes"][pane_id]
self._save_state()
logger.info("Cleared pane %s environment", pane_id)
# Emit post-action event
self.bus.emit("env_change", new_env=None, old_env=old_env, pane=pane_id)
return {"ok": True}
def _handle_notify(self, msg: dict) -> dict:
    """
    Handle notify IPC command.

    Verifies that the spoke holds the notify permission and, if so,
    appends a timestamped notification to the queue.

    Args:
        msg: IPC message with 'spoke', 'title', 'body', 'level' fields.
            'title' and 'body' default to "", 'level' to "info".

    Returns:
        {"ok": True} when queued, otherwise {"ok": False, "error": ...}
        for a missing spoke name, a spoke without loaded permissions, or
        a denied permission.
    """
    from . import permissions

    spoke_name = msg.get("spoke")
    title = msg.get("title", "")
    body = msg.get("body", "")
    level = msg.get("level", "info")

    if not spoke_name:
        return {"ok": False, "error": "spoke name required"}

    perms = self.permissions.get(spoke_name)
    if not perms:
        # No permissions on record for this spoke: record the denial.
        permissions.log_security(spoke_name, "notify", False, title, False)
        return {"ok": False, "error": "spoke not loaded"}

    granted = permissions.check_permission(
        spoke_name, "notify", perms, detail=title
    )
    if not granted:
        return {"ok": False, "error": "permission denied: notify"}

    queued_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    self.notification_queue.append(
        {
            "spoke": spoke_name,
            "title": title,
            "body": body,
            "level": level,
            "timestamp": queued_at,
        }
    )
    logger.info(
        "Notification queued: spoke=%s title='%s'",
        spoke_name,
        title[:50],
    )
    return {"ok": True}
def _handle_daemon_exec(self, msg: dict) -> dict:
"""
Handle daemon_exec IPC command.
Checks exec permission for spoke, then runs command in background subprocess.
Args:
msg: IPC message with 'spoke', 'command', 'mode' fields
Returns:
Response dict with 'ok' status and 'pid' (if successful)
"""
import subprocess
from . import permissions
spoke_name = msg.get("spoke")
cmd = msg.get("command", "")
mode = msg.get("mode", "background")
if not spoke_name:
return {"ok": False, "error": "spoke name required"}
if not cmd:
return {"ok": False, "error": "command required"}
# Phase 1: only background mode supported
if mode != "background":
return {
"ok": False,
"error": f"unsupported mode: {mode} (only 'background' supported)",
}
# Check permission
spoke_perms = self.permissions.get(spoke_name)
if not spoke_perms:
permissions.log_security(spoke_name, "daemon_exec", False, cmd, False)
return {"ok": False, "error": "spoke not loaded"}
allowed = permissions.check_permission(
spoke_name, "exec", spoke_perms, detail=cmd
)
if not allowed:
return {"ok": False, "error": "permission denied: exec"}
# Run command in background (no TTY, non-interactive)
try:
proc = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
start_new_session=True, # Detach from daemon
)
logger.info(
"Background command started: spoke=%s pid=%d cmd='%s'",
spoke_name,
proc.pid,
cmd[:100],
)
return {"ok": True, "pid": proc.pid}
except Exception as e:
logger.error(
"Failed to execute command: spoke=%s cmd='%s' error=%s",
spoke_name,
cmd[:100],
e,
)
return {"ok": False, "error": str(e)}
def _handle_notify_drain(self, msg: dict) -> dict:
    """
    Handle notify_drain IPC command.

    Hands out every queued notification and empties the queue in place.

    Args:
        msg: IPC message (not inspected)

    Returns:
        {"ok": True, "notifications": [...]} with the notifications that
        were queued at the time of the call, oldest first.
    """
    # Snapshot first; clear() then empties the same list object.
    drained = list(self.notification_queue)
    self.notification_queue.clear()
    logger.debug("Drained %d notifications", len(drained))
    return {"ok": True, "notifications": drained}
def _handle_get_permissions(self, msg: dict) -> dict:
"""
Handle get_permissions IPC command.
Returns effective permissions for a spoke.
Args:
msg: IPC message with 'spoke' field
Returns:
Response dict with 'ok' status and 'permissions' dict
"""
spoke_name = msg.get("spoke")
if not spoke_name:
return {"ok": False, "error": "spoke name required"}
spoke_perms = self.permissions.get(spoke_name)
if not spoke_perms:
return {"ok": False, "error": "spoke not loaded"}
# Convert to dict with source info
perms_dict = spoke_perms.to_dict()
sources = {field: spoke_perms.get_source(field) for field in perms_dict.keys()}
return {
"ok": True,
"permissions": perms_dict,
"sources": sources,
}
def _handle_load_spoke_permissions(self, msg: dict) -> dict:
    """
    Handle load_spoke_permissions IPC command.

    Resolves the effective permissions of a spoke from the given
    spoke.yaml path via permissions.get_effective_permissions() and
    stores them in self.permissions.

    Args:
        msg: IPC message with 'spoke', 'spoke_yaml_path' fields

    Returns:
        {"ok": True} on success, otherwise {"ok": False, "error": ...}
        for a missing field or any exception raised while loading.

    Note:
        NOTE(review): the YAML path comes straight from the IPC message;
        confirm that callers are trusted to pick which file defines a
        spoke's permissions.
    """
    from pathlib import Path

    from . import permissions

    spoke_name = msg.get("spoke")
    yaml_path_arg = msg.get("spoke_yaml_path")

    if not spoke_name:
        return {"ok": False, "error": "spoke name required"}
    if not yaml_path_arg:
        return {"ok": False, "error": "spoke_yaml_path required"}

    try:
        perms = permissions.get_effective_permissions(
            spoke_name, Path(yaml_path_arg)
        )
        self.permissions[spoke_name] = perms
        logger.info(
            "Loaded permissions: spoke=%s exec=%s notify=%s fs_read=%d",
            spoke_name,
            perms.exec,
            perms.notify,
            len(perms.fs_read),
        )
        return {"ok": True}
    except Exception as e:
        logger.error("Failed to load permissions for spoke %s: %s", spoke_name, e)
        return {"ok": False, "error": str(e)}
def _handle_get_config(self, msg: dict) -> dict:
    """
    Handle get_config IPC command.

    Loads a spoke's config through the centralized config module and
    returns either the whole config or the value at a dotted key path.

    Args:
        msg: IPC message with:
            - 'spoke': Spoke name (required)
            - 'key': Optional dot-notation key path (e.g., "check.path")
            - 'default_filename': Optional config filename
              (defaults to <spoke>.yaml)

    Returns:
        Response dict with:
            - 'ok': True if successful
            - 'config': Full config dict OR the value at 'key'
            - 'source': 'cache' if the config cache already held an entry
              for this spoke before loading, else 'loaded'
            - 'error': Error message if failed

    Note:
        A key whose lookup yields None is reported as not found.

    Example:
        >>> # Full config
        >>> _handle_get_config({"cmd": "get_config", "spoke": "creds"})
        {"ok": True, "config": {...}, "source": "cache"}
        >>> # Specific key
        >>> _handle_get_config({"cmd": "get_config", "spoke": "creds", "key": "check.path"})
        {"ok": True, "config": "~/.aws/credentials", "source": "cache"}
    """
    from . import config

    spoke_name = msg.get("spoke")
    key_path = msg.get("key")
    default_filename = msg.get("default_filename", f"{spoke_name}.yaml")

    if not spoke_name:
        return {"ok": False, "error": "spoke name required"}

    try:
        # Cache keys are indexable; element 0 is compared to the spoke name.
        matching_keys = [
            cache_key
            for cache_key in config._config_cache.keys()
            if cache_key[0] == spoke_name
        ]
        was_cached = len(matching_keys) > 0
        origin = "cache" if was_cached else "loaded"

        # Load config (will use cache if available)
        spoke_config = config.load_spoke_config(
            spoke_name, default_filename, env_aware=True
        )
        logger.debug(
            "Config request: spoke=%s key=%s cached=%s",
            spoke_name,
            key_path or "(full)",
            was_cached,
        )

        if not key_path:
            return {"ok": True, "config": spoke_config, "source": origin}

        value = config.get_config_value_by_path(
            spoke_config, key_path, default=None
        )
        if value is not None:
            return {"ok": True, "config": value, "source": origin}

        logger.debug(
            "Config key not found: spoke=%s key=%s", spoke_name, key_path
        )
        return {
            "ok": False,
            "error": f"key '{key_path}' not found in config",
        }
    except FileNotFoundError as e:
        logger.debug("Config file not found: spoke=%s error=%s", spoke_name, e)
        return {"ok": False, "error": f"config file not found: {e}"}
    except Exception as e:
        logger.error("Failed to load config for spoke %s: %s", spoke_name, e)
        return {"ok": False, "error": f"config error: {str(e)}"}
def _handle_tmux_split_run(self, msg: dict) -> dict:
"""
Handle tmux_split_run IPC command.
Creates a tmux split pane and runs a command inside it.
Requires 'tmux_split_run' in spoke/gear's IPC permissions.
Args:
msg: IPC message with 'spoke', 'command', 'height' fields
Returns:
Response dict with 'ok' status and 'pane_id' if successful
"""
import subprocess
from . import permissions
spoke_name = msg.get("spoke")
command = msg.get("command", "")
height = msg.get("height", 20)
if not spoke_name or not command:
return {"ok": False, "error": "spoke and command required"}
if not (1 <= height <= 99):
return {"ok": False, "error": "height must be 1-99"}
# Permission check
spoke_perms = self.permissions.get(spoke_name)
if not spoke_perms:
permissions.log_security(
spoke_name, "ipc:tmux_split_run", False, command, False
)
return {"ok": False, "error": "spoke not loaded"}
if not permissions.check_ipc_permission(
spoke_name, "tmux_split_run", spoke_perms, command
):
return {"ok": False, "error": "permission denied: tmux_split_run"}
# Execute tmux split
try:
result = subprocess.run(
[
"tmux",
"split-window",
"-v",
"-l",
f"{height}%",
"-P",
"-F",
"#{pane_id}",
command,
],
capture_output=True,
text=True,
timeout=5,
check=True,
)
pane_id = result.stdout.strip()
logger.info(
"Tmux split created: spoke=%s pane=%s cmd='%s'",
spoke_name,
pane_id,
command[:100],
)
return {"ok": True, "pane_id": pane_id}
except subprocess.TimeoutExpired:
logger.error("Tmux split timeout: spoke=%s", spoke_name)
return {"ok": False, "error": "tmux command timeout"}
except subprocess.CalledProcessError as e:
logger.error(
"Tmux split failed: spoke=%s stderr='%s'", spoke_name, e.stderr
)
return {"ok": False, "error": f"tmux error: {e.stderr}"}
except Exception as e:
logger.error("Tmux split exception: spoke=%s error=%s", spoke_name, e)
return {"ok": False, "error": str(e)}
def _handle_tmux_send_keys(self, msg: dict) -> dict:
    """
    Handle tmux_send_keys IPC command.

    Sends keys to a tmux pane after checking the spoke's
    'tmux_send_keys' IPC permission.

    Args:
        msg: IPC message with 'spoke', 'pane_id', 'keys' fields (all
            required and non-empty)

    Returns:
        {"ok": True} on success, otherwise {"ok": False, "error": ...}
        for missing fields, an unknown spoke, a denied permission, or a
        failing/timed-out tmux call (2 second limit).
    """
    import subprocess

    from . import permissions

    spoke_name = msg.get("spoke")
    pane_id = msg.get("pane_id", "")
    keys = msg.get("keys", "")

    if not (spoke_name and pane_id and keys):
        return {"ok": False, "error": "spoke, pane_id, and keys required"}

    perms = self.permissions.get(spoke_name)
    if not perms:
        return {"ok": False, "error": "spoke not loaded"}

    granted = permissions.check_ipc_permission(
        spoke_name, "tmux_send_keys", perms
    )
    if not granted:
        return {"ok": False, "error": "permission denied: tmux_send_keys"}

    tmux_argv = ["tmux", "send-keys", "-t", pane_id, keys]
    try:
        subprocess.run(tmux_argv, check=True, timeout=2)
        logger.debug("Sent keys to pane: spoke=%s pane=%s", spoke_name, pane_id)
        return {"ok": True}
    except Exception as e:
        logger.error("tmux_send_keys failed: %s", e)
        return {"ok": False, "error": str(e)}
def _handle_tmux_capture_pane(self, msg: dict) -> dict:
    """
    Handle tmux_capture_pane IPC command.

    Captures the contents of a tmux pane after checking the spoke's
    'tmux_capture_pane' IPC permission.

    Args:
        msg: IPC message with 'spoke' and 'pane_id' fields (both required)

    Returns:
        {"ok": True, "content": <stdout of tmux capture-pane -p>} on
        success, otherwise {"ok": False, "error": ...} for missing
        fields, an unknown spoke, a denied permission, or a
        failing/timed-out tmux call (2 second limit).
    """
    import subprocess

    from . import permissions

    spoke_name = msg.get("spoke")
    pane_id = msg.get("pane_id", "")

    if not (spoke_name and pane_id):
        return {"ok": False, "error": "spoke and pane_id required"}

    perms = self.permissions.get(spoke_name)
    if not perms:
        return {"ok": False, "error": "spoke not loaded"}

    granted = permissions.check_ipc_permission(
        spoke_name, "tmux_capture_pane", perms
    )
    if not granted:
        return {"ok": False, "error": "permission denied: tmux_capture_pane"}

    tmux_argv = ["tmux", "capture-pane", "-t", pane_id, "-p"]
    try:
        completed = subprocess.run(
            tmux_argv,
            capture_output=True,
            text=True,
            check=True,
            timeout=2,
        )
        captured = completed.stdout
        logger.debug(
            "Captured pane: spoke=%s pane=%s size=%d",
            spoke_name,
            pane_id,
            len(captured),
        )
        return {"ok": True, "content": captured}
    except Exception as e:
        logger.error("tmux_capture_pane failed: %s", e)
        return {"ok": False, "error": str(e)}
def _handle_read_file(self, msg: dict) -> dict:
    """
    Handle read_file IPC command.

    Reads a text file on behalf of a spoke after an "fs_read:<path>"
    permission check.

    Args:
        msg: IPC message with 'spoke' and 'path' fields (both required)

    Returns:
        {"ok": True, "content": <file text>} on success, otherwise
        {"ok": False, "error": ...} for missing fields, an unknown spoke,
        a denied permission, a missing file, a file larger than 10MB, or
        any error while reading.

    Note:
        NOTE(review): the permission check receives the path string as
        sent by the client, while the read uses the expanded and resolved
        path; confirm check_permission normalizes paths itself.
    """
    from pathlib import Path

    from . import permissions

    spoke_name = msg.get("spoke")
    path_str = msg.get("path", "")

    if not (spoke_name and path_str):
        return {"ok": False, "error": "spoke and path required"}

    perms = self.permissions.get(spoke_name)
    if not perms:
        return {"ok": False, "error": "spoke not loaded"}

    granted = permissions.check_permission(
        spoke_name, f"fs_read:{path_str}", perms
    )
    if not granted:
        return {"ok": False, "error": "permission denied: fs_read"}

    size_limit = 10 * 1024 * 1024  # 10MB
    try:
        target = Path(path_str).expanduser().resolve()
        if not target.exists():
            return {"ok": False, "error": "file not found"}
        if target.stat().st_size > size_limit:
            return {"ok": False, "error": "file too large (max 10MB)"}

        text = target.read_text()
        logger.debug(
            "File read: spoke=%s path=%s size=%d", spoke_name, target, len(text)
        )
        return {"ok": True, "content": text}
    except Exception as e:
        logger.error(
            "File read failed: spoke=%s path=%s error=%s", spoke_name, path_str, e
        )
        return {"ok": False, "error": str(e)}
def _handle_write_file(self, msg: dict) -> dict:
    """
    Handle write_file IPC command.

    Writes text to a file on behalf of a spoke after an
    "fs_write:<path>" permission check, creating missing parent
    directories.

    Args:
        msg: IPC message with 'spoke' and 'path' fields (both required)
            and an optional 'content' field (defaults to "")

    Returns:
        {"ok": True} on success, otherwise {"ok": False, "error": ...}
        for missing fields, an unknown spoke, a denied permission, or any
        error while writing.

    Note:
        NOTE(review): the permission check receives the path string as
        sent by the client, while the write uses the expanded and resolved
        path; confirm check_permission normalizes paths itself.
    """
    from pathlib import Path

    from . import permissions

    spoke_name = msg.get("spoke")
    path_str = msg.get("path", "")
    content = msg.get("content", "")

    if not (spoke_name and path_str):
        return {"ok": False, "error": "spoke and path required"}

    perms = self.permissions.get(spoke_name)
    if not perms:
        return {"ok": False, "error": "spoke not loaded"}

    granted = permissions.check_permission(
        spoke_name, f"fs_write:{path_str}", perms
    )
    if not granted:
        return {"ok": False, "error": "permission denied: fs_write"}

    try:
        target = Path(path_str).expanduser().resolve()
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
        logger.info(
            "File written: spoke=%s path=%s size=%d", spoke_name, target, len(content)
        )
        return {"ok": True}
    except Exception as e:
        logger.error(
            "File write failed: spoke=%s path=%s error=%s", spoke_name, path_str, e
        )
        return {"ok": False, "error": str(e)}
def _handle_write_log(self, msg: dict) -> dict:
"""Handle write_log IPC command."""
from datetime import datetime, timezone
from pathlib import Path
spoke_name = msg.get("spoke")
message = msg.get("message", "")
level = msg.get("level", "info").upper()
if not spoke_name or not message:
return {"ok": False, "error": "spoke and message required"}
try:
log_dir = Path.home() / ".config" / "axium" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"{spoke_name}.log"
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
log_line = f"[{timestamp}] [{level}] {message}\n"
with log_file.open("a") as f:
f.write(log_line)
logger.debug("Log written: spoke=%s level=%s", spoke_name, level)
return {"ok": True}
except Exception as e:
logger.error("write_log failed: spoke=%s error=%s", spoke_name, e)
return {"ok": False, "error": str(e)}
def _handle_register_prefix(self, msg: dict) -> dict:
    """
    Handle register_prefix IPC command.

    Registers a prefix rule for a command unless another spoke already
    owns that command, and rewrites the state cache on success.

    Args:
        msg: IPC message with 'spoke', 'command', 'wrapper' fields (all
            required)

    Returns:
        {"ok": True} when registered;
        {"ok": False, "error": ..., "conflict": True} when the command is
        owned by a different spoke; {"ok": False, "error": ...} for
        missing fields, a failed registration, or an exception.
    """
    from . import prefix

    spoke_name = msg.get("spoke")
    command = msg.get("command")
    wrapper = msg.get("wrapper")

    if not (spoke_name and command and wrapper):
        return {"ok": False, "error": "spoke, command, and wrapper required"}

    # Re-registration by the current owner is allowed; only a different
    # owner counts as a conflict.
    owner = prefix.get_rule_owner(command)
    if owner and owner != spoke_name:
        logger.warning(
            "Prefix conflict: %s tried to register '%s' but owned by %s",
            spoke_name,
            command,
            owner,
        )
        return {
            "ok": False,
            "error": f"command already registered by {owner}",
            "conflict": True,
        }

    try:
        registered = prefix.register_prefix_rule(command, wrapper, spoke_name)
        if not registered:
            return {"ok": False, "error": "registration failed"}

        # Keep the shell-facing command list in sync with the new rule.
        self._write_state_cache()
        logger.info(
            "Prefix registered: spoke=%s command=%s wrapper=%s",
            spoke_name,
            command,
            wrapper,
        )
        return {"ok": True}
    except Exception as e:
        logger.error("Prefix registration failed: spoke=%s error=%s", spoke_name, e)
        return {"ok": False, "error": str(e)}
def _handle_get_hud_segments(self, msg: dict) -> dict:
    """
    Handle get_hud_segments IPC command.

    Lists every segment in the HUD registry with its name, priority and
    owning spoke, ordered by ascending priority.

    Args:
        msg: IPC message (not inspected)

    Returns:
        {"ok": True, "segments": [{"name": "...", "priority": 10, "spoke": "..."}]}
        where "spoke" is "core" for segments with no (or a falsy) spoke
        attribute, or {"ok": False, "error": ...} on any exception.
    """
    try:
        from .hud import get_registry

        described = [
            {
                "name": segment.name,
                "priority": segment.priority,
                "spoke": getattr(segment, "spoke", None) or "core",
            }
            for segment in get_registry().segments
        ]
        described.sort(key=lambda entry: entry["priority"])
        return {"ok": True, "segments": described}
    except Exception as e:
        logger.error("get_hud_segments failed: error=%s", e)
        return {"ok": False, "error": str(e)}
async def handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""
Handle an IPC request from a client.
Reads JSON request from socket, processes command, sends JSON response.
Each command is handled synchronously but multiple clients can connect
concurrently.
Args:
reader: asyncio StreamReader for reading request
writer: asyncio StreamWriter for sending response
IPC Request Format:
```json
{"cmd": "command_name", "arg1": "value", ...}
```
IPC Response Format:
```json
{"ok": true, "result": ...}
// or
{"ok": false, "error": "message"}
```
Supported Commands:
```json
ping: {"cmd": "ping"} → {"ok": true, "pong": true}
get_state: {"cmd": "get_state"} → {"ok": true, "state": {...}}
set_env: {"cmd": "set_env", "value": "prod"}
→ {"ok": true}
Side effect: Emits env_change event to Spokes
set_pane_env: {"cmd": "set_pane_env", "pane": "%1", "value": "prod"}
→ {"ok": true}
Side effect: Emits env_change event with pane context
get_hud: {"cmd": "get_hud", "pane": "%1"}
→ {"ok": true, "hud": "[axium] pane:%1 env:prod uptime:2h15m"}
Fast path: Returns pre-rendered HUD string from cache
get_pane_env: {"cmd": "get_pane_env", "pane": "%1"}
→ {"ok": true, "env": "prod"}
clear_pane_env: {"cmd": "clear_pane_env", "pane": "%1"}
→ {"ok": true}
reload: {"cmd": "reload"} → {"ok": true, "reloaded": true}
Side effect: Reloads state from disk
apply_prefixes: {"cmd": "apply_prefixes", "command": "aws",
"args": ["s3", "ls"], "context": {...}}
→ {"ok": true, "command": [...], "env_vars": {...}}
list_prefixed_commands: {"cmd": "list_prefixed_commands"}
→ {"ok": true, "commands": ["aws", "terraform", ...]}
stop: {"cmd": "stop"} → {"ok": true, "stopping": True}
Side effect: Triggers graceful shutdown
```
Note:
Errors are caught and returned as {"ok": false, "error": "message"}
Connection is closed after each request (no persistent connections)
"""
try:
raw = await reader.readline()
if not raw:
writer.close()
return
msg = json.loads(raw.decode())
cmd = msg.get("cmd")
logger.debug("Received IPC command: %s", cmd)
if cmd == "ping":
resp = {"ok": True, "pong": True}
elif cmd == "get_state":
resp = {"ok": True, "state": self.state}
elif cmd == "set_env":
new_env = msg.get("value")
# Validate environment name before setting
from . import env as env_module
is_valid, error = env_module.validate_env_name(new_env)
if not is_valid:
resp = {"ok": False, "error": error}
else:
old_env = self.state.get("active_env")
self.state["active_env"] = new_env
self._save_state()
logger.info("Environment set to: %s", new_env)
# Invalidate all config caches (env-aware configs need refresh)
from . import config
config.invalidate_cache()
logger.debug("Invalidated all config caches due to env change")
# Emit env_change event
from . import spokes
spokes.get_event_bus().emit("env_change", new_env, old_env)
resp = {"ok": True}
elif cmd == "get_hud":
# Render HUD fresh each time (includes dynamic uptime)
pane_id = msg.get("pane")
if not pane_id:
resp = {"ok": False, "error": "pane ID required"}
else:
try:
# Always render fresh to get current uptime
logger.debug("Rendering HUD for pane %s", pane_id)
hud_str = self._render_hud_for_pane(pane_id)
logger.debug("HUD rendered: %s", hud_str)
resp = {"ok": True, "hud": hud_str}
except Exception as e:
import traceback
logger.error(
"Error rendering HUD for pane %s: %s\n%s",
pane_id,
e,
traceback.format_exc(),
)
resp = {"ok": True, "hud": "[axium] inactive"}
elif cmd == "set_pane_env":
resp = self._handle_set_pane_env(msg)
elif cmd == "get_pane_env":
resp = self._handle_get_pane_env(msg)
elif cmd == "clear_pane_env":
resp = self._handle_clear_pane_env(msg)
elif cmd == "reload":
logger.info("Reload command received")
# Reload state from disk
self._load_state()
# Reload HUD configuration
self._load_hud_config()
# Reload prefix configuration
prefix.reload_config()
# Reload all spoke configurations
from . import config
config.reload_all_configs()
# Regenerate state cache in case prefix rules changed
self._write_state_cache()
# Refresh HUD cache for all panes
self._refresh_all_hud_caches()
# Emit post-reload events
self.bus.emit("hud_refresh")
self.bus.emit("daemon_reload")
self.bus.emit("config_reloaded")
resp = {"ok": True, "reloaded": True}
elif cmd == "apply_prefixes":
# Apply prefix rules to a command
command = msg.get("command")
args = msg.get("args", [])
context = msg.get("context", {})
try:
final_cmd, env_vars = prefix.apply_prefixes(command, args, context)
resp = {
"ok": True,
"command": final_cmd,
"env_vars": env_vars,
}
except Exception as e:
logger.error("Failed to apply prefixes: %s", e)
resp = {"ok": False, "error": str(e)}
elif cmd == "list_prefixed_commands":
# List all commands that have prefix rules
try:
commands = prefix.get_prefixed_commands()
resp = {"ok": True, "commands": commands}
except Exception as e:
logger.error("Failed to list prefixed commands: %s", e)
resp = {"ok": False, "error": str(e)}
elif cmd == "notify":
resp = self._handle_notify(msg)
elif cmd == "daemon_exec":
resp = self._handle_daemon_exec(msg)
elif cmd == "notify_drain":
resp = self._handle_notify_drain(msg)
elif cmd == "get_permissions":
resp = self._handle_get_permissions(msg)
elif cmd == "load_spoke_permissions":
resp = self._handle_load_spoke_permissions(msg)
elif cmd == "get_config":
resp = self._handle_get_config(msg)
elif cmd == "tmux_split_run":
resp = self._handle_tmux_split_run(msg)
elif cmd == "tmux_send_keys":
resp = self._handle_tmux_send_keys(msg)
elif cmd == "tmux_capture_pane":
resp = self._handle_tmux_capture_pane(msg)
elif cmd == "read_file":
resp = self._handle_read_file(msg)
elif cmd == "write_file":
resp = self._handle_write_file(msg)
elif cmd == "write_log":
resp = self._handle_write_log(msg)
elif cmd == "register_prefix":
resp = self._handle_register_prefix(msg)
elif cmd == "get_hud_segments":
resp = self._handle_get_hud_segments(msg)
elif cmd == "daemon_status":
# Return daemon status information
uptime_seconds = int(time.time() - self._start_time)
# Format uptime as "Xh Ym Zs"
hours = uptime_seconds // 3600
minutes = (uptime_seconds % 3600) // 60
seconds = uptime_seconds % 60
parts = []
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0 or hours > 0: # Show minutes if we have hours
parts.append(f"{minutes}m")
parts.append(f"{seconds}s")
uptime_str = " ".join(parts)
resp = {
"ok": True,
"status": {
"running": True,
"uptime": uptime_str,
"active_env": self.state.get("active_env"),
"panes": len(self.state.get("panes", {})),
},
}
elif cmd == "reload_spoke":
# Reload a specific spoke - daemon performs the action
spoke_name = msg.get("spoke")
if not spoke_name:
resp = {"ok": False, "error": "spoke name required"}
else:
try:
# PERFORM THE ACTION (don't just emit event)
from axium.core.spokes import reload_spokes
reloaded = reload_spokes(spoke_name)
if reloaded:
# THEN emit post-action notification
self.bus.emit("spoke_reloaded", spoke_name=spoke_name)
resp = {"ok": True, "spoke": spoke_name}
else:
resp = {
"ok": False,
"error": f"failed to reload spoke: {spoke_name}",
}
except Exception as e:
logger.error("Spoke reload failed: %s", e)
resp = {"ok": False, "error": str(e)}
elif cmd == "reload_spokes":
# Reload all spokes - daemon performs the action
try:
from axium.core.spokes import reload_spokes
reloaded_list = reload_spokes() # Reload all
# Emit post-action event for EACH reloaded spoke
for spoke_name in reloaded_list:
self.bus.emit("spoke_reloaded", spoke_name=spoke_name)
resp = {"ok": True, "spokes": reloaded_list}
except Exception as e:
logger.error("Spokes reload failed: %s", e)
resp = {"ok": False, "error": str(e), "spokes": []}
elif cmd == "stop":
logger.info("Stop command received")
resp = {"ok": True, "stopping": True}
writer.write((json.dumps(resp) + "\n").encode())
await writer.drain()
writer.close()
await asyncio.sleep(0.05)
self._stop.set()
return
else:
logger.warning("Unknown command received: %s", cmd)
resp = {"ok": False, "error": "unknown command"}
writer.write((json.dumps(resp) + "\n").encode())
await writer.drain()
writer.close()
except Exception as e:
logger.error("Error handling IPC request: %s", e, exc_info=True)
try:
writer.write(
(json.dumps({"ok": False, "error": str(e)}) + "\n").encode()
)
await writer.drain()
writer.close()
except Exception:
pass
async def _periodic_segment_update(self) -> None:
    """
    Periodically update cached HUD segments.

    Sleeps for 5 minutes between passes, then asks the HUD segment
    registry to update its cached segments. The loop ends once the
    ``_stop`` event is set.

    The context passed to the registry contains:
        - env: the active environment name taken from
          ``state["active_env"]``, or "-" when no environment is set
        - state: the daemon's runtime state dict

    Note:
        Any exception raised during a pass is logged and the loop
        continues with the next interval.
    """
    while not self._stop.is_set():
        try:
            await asyncio.sleep(300)  # 5 minutes

            # A stop may have been requested while we were sleeping.
            if self._stop.is_set():
                break

            # Update all cached segments
            from .hud import get_registry

            registry = get_registry()

            # The environment is stored under "active_env" (the key written
            # by set_env and read by daemon_status); it may be None, in
            # which case the "-" placeholder is used.
            env_name = self.state.get("active_env") or "-"
            context = {"env": env_name, "state": self.state}
            registry.update_cached_segments(context)
            logger.debug("Periodic cached segment update complete")
        except Exception as e:
            logger.error("Error in periodic segment update: %s", e)
async def run(self) -> None:
    """
    Serve IPC requests until shutdown is requested.

    Binds a UNIX socket server at SOCKET_PATH with handle_client() as the
    connection callback, starts the periodic HUD segment updater, and then
    blocks until the _stop event is set.

    Side Effects:
        - Removes any existing file at SOCKET_PATH before binding
        - Creates the UNIX socket at SOCKET_PATH
        - Stores the asyncio server on self.server
        - Runs _periodic_segment_update() as a background task
        - On exit, cancels the background task and removes the socket file

    Note:
        This should be run via asyncio.run() in the main process.
    """
    # Clear out whatever currently sits at the socket path.
    if SOCKET_PATH.exists():
        try:
            SOCKET_PATH.unlink()
        except FileNotFoundError:
            # Vanished between the existence check and the unlink.
            pass
        else:
            logger.debug("Removed existing socket at %s", SOCKET_PATH)

    logger.info("Starting Unix socket server at %s", SOCKET_PATH)
    socket_address = str(SOCKET_PATH)
    self.server = await asyncio.start_unix_server(
        self.handle_client, path=socket_address
    )

    # Background refresher for cached HUD segments.
    segment_updater = asyncio.create_task(self._periodic_segment_update())

    try:
        async with self.server:
            logger.info("Daemon ready, waiting for connections")
            await self._stop.wait()
    finally:
        # Stop the refresher and wait for it to unwind.
        segment_updater.cancel()
        try:
            await segment_updater
        except asyncio.CancelledError:
            pass

        logger.info("Cleaning up socket")
        try:
            SOCKET_PATH.unlink()
        except FileNotFoundError:
            pass