
Daemon API

Background daemon process and state management.


daemon

Async daemon implementation with IPC handler.

daemon

Axium Daemon - Async background service.

The daemon is the heart of Axium, running as a background process to:

- Maintain session state (active environment, uptime, etc.)
- Handle IPC requests from the CLI via UNIX socket
- Coordinate events across Spokes
- Apply prefix rules to commands

The daemon uses asyncio for concurrent request handling and communicates via JSON over a UNIX socket at /tmp/axiumd.sock.

State is persisted to ~/.config/axium/state.json and includes:

- active_env: Current environment name
- panes: Per-pane environment mapping
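
For reference, a persisted state.json contains only these two keys (the values below are illustrative, not taken from a real session):

```json
{
  "active_env": "prod",
  "panes": {
    "%1": "prod",
    "%2": "staging"
  }
}
```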

Runtime state (not persisted):

- started: Daemon start timestamp (ISO 8601)
- tmux_pane: TMUX_PANE value when daemon started
- hud_cache: Pre-rendered HUD strings for panes

IPC Protocol
Request:  {"cmd": "command_name", "arg1": "value", ...}
Response: {"ok": true, "result": ...} or {"ok": false, "error": "..."}
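
As a minimal sketch of using this protocol from Python (assuming requests are newline-terminated JSON, which is how the daemon reads them, and that the response comes back as a single JSON line), a client might ping the daemon like this:

```python
import json
import socket

SOCKET_PATH = "/tmp/axiumd.sock"

# Send one newline-terminated JSON request and read back one JSON response line.
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
    sock.connect(SOCKET_PATH)
    sock.sendall(json.dumps({"cmd": "ping"}).encode() + b"\n")
    reply = json.loads(sock.makefile("r").readline())
    print(reply)  # expect {'ok': True, 'pong': True} from a healthy daemon
```
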
Supported Commands
  • ping: Health check
  • get_state: Get full daemon state
  • set_env: Set active environment
  • reload: Reload state from disk
  • apply_prefixes: Apply prefix rules to a command
  • list_prefixed_commands: List commands with prefix rules
  • stop: Gracefully shut down daemon
Example
$ axium daemon start
$ axium daemon status
$ axium daemon stop

AxiumDaemon

Main daemon class managing state and IPC.

The daemon runs an asyncio event loop with a UNIX socket server accepting JSON IPC requests from the CLI.

Attributes:

- state: Runtime state dict with keys:
    - Persistent (saved to state.json):
        - active_env: Current environment name (str | None)
        - panes: Per-pane environment mapping (dict)
    - Runtime-only:
        - started: ISO 8601 timestamp of daemon start (str)
        - tmux_pane: TMUX_PANE when daemon started (str | None)
        - hud_cache: Pre-rendered HUD strings (dict)
- _start_time: Unix timestamp of daemon start (float, for uptime)
- server: asyncio.Server instance for UNIX socket
- _stop: asyncio.Event for graceful shutdown

Example
daemon = AxiumDaemon()
await daemon.run()
Source code in axium/core/daemon.py
class AxiumDaemon:
    """
    Main daemon class managing state and IPC.

    The daemon runs an asyncio event loop with a UNIX socket server
    accepting JSON IPC requests from the CLI.

    Attributes:
        state: Runtime state dict with keys:
            Persistent (saved to state.json):
                - active_env: Current environment name (str | None)
                - panes: Per-pane environment mapping (dict)
            Runtime-only:
                - started: ISO 8601 timestamp of daemon start (str)
                - tmux_pane: TMUX_PANE when daemon started (str | None)
                - hud_cache: Pre-rendered HUD strings (dict)
        _start_time: Unix timestamp of daemon start (float, for uptime)
        server: asyncio.Server instance for UNIX socket
        _stop: asyncio.Event for graceful shutdown

    Example:
        ```python
        daemon = AxiumDaemon()
        await daemon.run()
        ```
    """

    def __init__(self):
        """
        Initialize daemon with default state.

        Loads persistent state from state.json if it exists, otherwise
        initializes with defaults. Creates empty event registry.
        """
        # Runtime-only state (not persisted)
        self.state = {
            "active_env": None,
            "panes": {},  # Per-pane environment mapping: {"%1": "root", "%2": "builder"}
            "hud_cache": {},  # Pre-rendered HUD strings: {"%1": "[axium] pane:%1 ..."}
            "tmux_pane": os.getenv("TMUX_PANE"),
            "started": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        self._start_time = time.time()  # Unix timestamp for fast uptime calculations
        self.permissions = (
            {}
        )  # Effective permissions per spoke: {spoke_name: SpokePermissions}
        self.notification_queue = (
            []
        )  # Queued notifications: [{"title": ..., "body": ..., "spoke": ...}]
        self.hud_config = {}  # HUD layout configuration from hud.yaml
        self.server = None
        self._stop = asyncio.Event()

        # Initialize EventBus for spoke coordination
        from axium.core.spokes import get_event_bus

        self.bus = get_event_bus()

        # Set up completion cache event listeners
        self.bus.on("spoke_loaded", self._on_spoke_loaded)
        self.bus.on("spoke_reloaded", self._on_spoke_reloaded)
        self.bus.on("spoke_unloaded", self._on_spoke_unloaded)
        self.bus.on("gear_loaded", self._on_gear_loaded)
        self.bus.on("gear_unloaded", self._on_gear_unloaded)
        self.bus.on("daemon_reload", self._on_daemon_reload)

        self._load_state()
        self._load_hud_config()  # Load HUD layout from hud.yaml
        self._write_state_cache()
        self._refresh_all_hud_caches()  # Pre-render HUD for all panes

        # Load gears for daemon IPC operations
        # Note: Gears are also loaded in CLI for command registration
        # This loads them in daemon context for IPC permission enforcement
        self._load_gears_for_daemon()

        # Generate initial completion cache
        self._regenerate_completions("daemon_init")

    def _load_state(self) -> None:
        """
        Load persistent state from state.json.

        Reads active_env and panes mapping from disk if state.json exists.
        Other state (started, tmux_pane) is runtime-only and not loaded.

        Side Effects:
            Updates self.state["active_env"] and self.state["panes"] if file exists and is valid.

        Note:
            Logs warning if state.json is malformed but continues with defaults.
        """
        if STATE_PATH.exists():
            try:
                data = json.loads(STATE_PATH.read_text())
                self.state["active_env"] = data.get("active_env")
                self.state["panes"] = data.get("panes", {})
                logger.info("Loaded state from %s", STATE_PATH)
            except Exception as e:
                logger.warning("Failed to load state: %s", e)

    def _load_hud_config(self) -> None:
        """
        Load HUD layout configuration from hud.yaml.

        Reads HUD layout and style configuration from ~/.config/axium/hud.yaml.
        Falls back to hardcoded defaults if file doesn't exist or is invalid.

        Side Effects:
            Updates self.hud_config with layout and style settings

        Note:
            - layout: List of segment names to display (e.g., ["env", "uptime"])
            - style: Dict with color and padding settings
            - Logs warning if hud.yaml is malformed but continues with defaults
        """
        import yaml

        hud_path = CONF_DIR / "hud.yaml"

        # Default configuration
        default_config = {
            "layout": ["env", "uptime"],
            "style": {"color": "#00B7C7", "padding": 1},
        }

        if not hud_path.exists():
            self.hud_config = default_config
            logger.debug("Using default HUD config (hud.yaml not found)")
            return

        try:
            data = yaml.safe_load(hud_path.read_text())
            self.hud_config = data if data else default_config
            logger.info("Loaded HUD config from %s", hud_path)
        except Exception as e:
            logger.warning("Failed to load hud.yaml: %s (using defaults)", e)
            self.hud_config = default_config

    def _load_gears_for_daemon(self) -> None:
        """
        Load gears for daemon IPC operations.

        Loads gears to populate self.permissions dict for IPC permission enforcement.
        Gears are also loaded in CLI context for command registration.

        This is called during daemon initialization.
        """
        from . import gears

        discovered = gears.discover_gears()
        if not discovered:
            logger.debug("No gears found")
            return

        # Gears need app and events, but in daemon context we don't have the CLI app
        # We only need to load permissions for IPC operations
        # Commands are registered when gears load in CLI context
        for gear_name in discovered:
            try:
                # Load just the permissions without calling register()
                gear_path = gears.GEARS_DIR / gear_name
                gear_yaml = gear_path / "gear.yaml"

                if not gear_yaml.exists():
                    logger.warning("Gear %s missing gear.yaml", gear_name)
                    continue

                # Load and store permissions
                effective_perms = gears.get_effective_gear_permissions(
                    gear_name, gear_yaml
                )
                self.permissions[gear_name] = effective_perms

                logger.info(
                    "Loaded gear permissions: %s (exec=%s ipc=%d)",
                    gear_name,
                    effective_perms.exec,
                    len(effective_perms.ipc),
                )

            except Exception as e:
                logger.error("Failed to load gear %s permissions: %s", gear_name, e)

    def _save_state(self) -> None:
        """
        Persist state to state.json.

        Saves only persistent fields (active_env, panes) to disk.
        Runtime fields (started, tmux_pane) are not saved.

        Side Effects:
            Writes to ~/.config/axium/state.json

        Note:
            Creates parent directory if it doesn't exist.
            Logs error but doesn't raise if save fails.
        """
        try:
            CONF_DIR.mkdir(parents=True, exist_ok=True)
            data = {
                "active_env": self.state["active_env"],
                "panes": self.state.get("panes", {}),
            }
            STATE_PATH.write_text(json.dumps(data, indent=2))
            logger.debug("Saved state to %s", STATE_PATH)
        except Exception as e:
            logger.error("Failed to save state: %s", e)

    def _write_state_cache(self) -> None:
        """
        Write shell-optimized state cache to state_cache.json.

        Creates a JSON file with prefixed commands list for fast shell startup.
        This file is read by bash/init.sh to generate wrapper functions without
        requiring Python or daemon IPC during shell initialization.

        Cache Format:
            ```json
            {
                "prefixed_commands": ["aws", "terraform", "ansible"]
            }
            ```

        Side Effects:
            Writes to ~/.config/axium/state_cache.json

        Note:
            This should be called whenever prefix rules change:
            - On daemon startup (__init__)
            - When prefix.yaml is modified (reload command)
            - When Spokes are loaded/unloaded

            Errors are logged but not raised to avoid breaking daemon startup.
        """
        try:
            CONF_DIR.mkdir(parents=True, exist_ok=True)
            commands = prefix.get_prefixed_commands()
            cache_data = {"prefixed_commands": commands}
            STATE_CACHE_PATH.write_text(json.dumps(cache_data, indent=2))
            logger.debug(
                "Wrote state cache to %s with %d commands",
                STATE_CACHE_PATH,
                len(commands),
            )
        except Exception as e:
            logger.error("Failed to write state cache: %s", e)

    def _render_hud_for_pane(self, pane_id: str) -> str:
        """
        Pre-render HUD string for a specific pane.

        Runs in daemon process, generating the complete HUD string without
        requiring CLI-side computation. This enables instant HUD responses.

        Uses the new HudRegistry system for modular segment rendering.

        Args:
            pane_id: tmux pane ID (e.g., "%1")

        Returns:
            Rendered HUD string like "[axium] pane:%1  env:root  uptime:2h15m"

        Note:
            This method is called by the daemon to pre-compute HUD strings.
            Uses HudRegistry for consistent rendering with hud.main().
        """
        try:
            # Import hud module to ensure segments are registered
            from axium.core.hud import get_registry

            # Get pane environment
            env = self.state["panes"].get(pane_id) or "-"

            # Get wrapper and theme config from hud.yaml
            wrapper = self.hud_config.get("style", {}).get("wrapper", {})
            theme = self.hud_config.get("style", {}).get("theme", {})

            # Build context for segment rendering (same as hud.main())
            context = {
                "state": self.state,
                "pane_id": pane_id,
                "env": env,
                "started": self.state.get("started"),
                "wrapper": wrapper,
                "theme": theme,
            }

            # Render all segments via registry
            # Don't call hud.main(); it makes IPC calls, which would cause async issues here
            registry = get_registry()
            segments = registry.render_all(context)

            return "[axium] " + "  ".join(segments)
        except Exception as e:
            import traceback

            logger.error(
                "Error rendering HUD for pane %s: %s\n%s",
                pane_id,
                e,
                traceback.format_exc(),
            )
            return "[axium] inactive"

    def _update_hud_cache(self, pane_id: str) -> None:
        """
        Update cached HUD string for a specific pane.

        Re-renders the HUD and stores it in the cache for instant retrieval.

        Args:
            pane_id: tmux pane ID (e.g., "%1")

        Side Effects:
            Updates self.state["hud_cache"][pane_id]

        Note:
            Called automatically when pane environment changes or on reload.
            Errors are logged but don't raise to avoid disrupting daemon.
        """
        try:
            self.state["hud_cache"][pane_id] = self._render_hud_for_pane(pane_id)
            logger.debug("Updated HUD cache for pane %s", pane_id)
        except Exception as e:
            logger.error("Failed to render HUD for pane %s: %s", pane_id, e)
            # Don't add to cache if render fails - let caller handle fallback

    def _refresh_all_hud_caches(self) -> None:
        """
        Refresh HUD cache for all known panes.

        Iterates through all panes in state and re-renders their HUD strings.
        Called on daemon startup and after reload operations.

        Side Effects:
            Updates self.state["hud_cache"] for all panes

        Note:
            Safe to call even if panes dict is empty (no-op).
        """
        for pane_id in self.state["panes"].keys():
            self._update_hud_cache(pane_id)
        logger.debug("Refreshed HUD cache for %d panes", len(self.state["panes"]))

    def _regenerate_completions(self, event_name: str) -> None:
        """
        Regenerate completion cache after command structure changes.

        Called by event handlers when spokes/gears are loaded, reloaded,
        or unloaded, or when daemon config is reloaded.

        Args:
            event_name: Name of the event that triggered regeneration

        Side Effects:
            Writes to ~/.config/axium/completions.json

        Note:
            Errors are logged but not raised to avoid disrupting
            the main event flow.
        """
        try:
            from axium.core.completions import generate_completion_cache

            success = generate_completion_cache()
            if success:
                logger.debug("Regenerated completions after %s", event_name)
            else:
                logger.warning("Failed to regenerate completions after %s", event_name)
        except Exception as e:
            logger.error("Error regenerating completions after %s: %s", event_name, e)

    def _on_spoke_loaded(self, spoke_name: str) -> None:
        """Event handler: spoke_loaded."""
        self._regenerate_completions(f"spoke_loaded:{spoke_name}")
        # Refresh HUD caches since spoke may have registered new segments
        self._refresh_all_hud_caches()

    def _on_spoke_reloaded(self, spoke_name: str) -> None:
        """Event handler: spoke_reloaded."""
        self._regenerate_completions(f"spoke_reloaded:{spoke_name}")
        # Refresh HUD caches since spoke may have updated segments
        self._refresh_all_hud_caches()

    def _on_spoke_unloaded(self, spoke_name: str) -> None:
        """Event handler: spoke_unloaded."""
        self._regenerate_completions(f"spoke_unloaded:{spoke_name}")
        # Refresh HUD caches since spoke segments are removed
        self._refresh_all_hud_caches()

    def _on_gear_loaded(self, gear_name: str) -> None:
        """Event handler: gear_loaded."""
        self._regenerate_completions(f"gear_loaded:{gear_name}")
        # Refresh HUD caches since gear may have registered new segments
        self._refresh_all_hud_caches()

    def _on_gear_unloaded(self, gear_name: str) -> None:
        """Event handler: gear_unloaded."""
        self._regenerate_completions(f"gear_unloaded:{gear_name}")
        # Refresh HUD caches since gear segments are removed
        self._refresh_all_hud_caches()

    def _on_daemon_reload(self) -> None:
        """Event handler: daemon_reload."""
        self._regenerate_completions("daemon_reload")

    def _handle_set_pane_env(self, msg: dict) -> dict:
        """
        Handle set_pane_env IPC command.

        Sets environment for specific tmux pane and emits env_change event.

        Args:
            msg: IPC message with 'pane' and 'env' fields

        Returns:
            Response dict with 'ok' status
        """
        pane_id = msg.get("pane")
        new_env = msg.get("env")

        if not pane_id:
            return {"ok": False, "error": "pane ID required"}

        # Validate environment name before setting
        from . import env as env_module

        is_valid, error = env_module.validate_env_name(new_env)
        if not is_valid:
            return {"ok": False, "error": error}

        old_env = self.state["panes"].get(pane_id)
        self.state["panes"][pane_id] = new_env
        self._save_state()

        # Invalidate all config caches (env-aware configs need refresh)
        from . import config

        config.invalidate_cache()
        logger.debug("Invalidated all config caches due to pane env change")

        # Update HUD cache for this pane
        self._update_hud_cache(pane_id)

        logger.info("Pane %s environment set to: %s", pane_id, new_env)

        # Emit env_change event with pane context
        from . import spokes

        spokes.get_event_bus().emit("env_change", new_env, old_env, pane=pane_id)

        return {"ok": True}

    def _handle_get_pane_env(self, msg: dict) -> dict:
        """
        Handle get_pane_env IPC command.

        Gets environment for specific tmux pane.

        Args:
            msg: IPC message with 'pane' field

        Returns:
            Response dict with 'ok' status and 'env' value
        """
        pane_id = msg.get("pane")

        if not pane_id:
            return {"ok": False, "error": "pane ID required"}

        env_name = self.state["panes"].get(pane_id)
        return {"ok": True, "env": env_name}

    def _handle_clear_pane_env(self, msg: dict) -> dict:
        """
        Handle clear_pane_env IPC command.

        Clears environment mapping for specific pane and emits env_change event.

        Args:
            msg: IPC message with 'pane' field

        Returns:
            Response dict with 'ok' status
        """
        pane_id = msg.get("pane")

        if not pane_id:
            return {"ok": False, "error": "pane ID required"}

        old_env = self.state["panes"].get(pane_id)

        if pane_id in self.state["panes"]:
            del self.state["panes"][pane_id]
            self._save_state()
            logger.info("Cleared pane %s environment", pane_id)

            # Emit post-action event
            self.bus.emit("env_change", new_env=None, old_env=old_env, pane=pane_id)

        return {"ok": True}

    def _handle_notify(self, msg: dict) -> dict:
        """
        Handle notify IPC command.

        Checks notify permission for spoke, then queues notification.

        Args:
            msg: IPC message with 'spoke', 'title', 'body', 'level' fields

        Returns:
            Response dict with 'ok' status
        """
        from . import permissions

        spoke_name = msg.get("spoke")
        title = msg.get("title", "")
        body = msg.get("body", "")
        level = msg.get("level", "info")

        if not spoke_name:
            return {"ok": False, "error": "spoke name required"}

        # Check permission
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            permissions.log_security(spoke_name, "notify", False, title, False)
            return {"ok": False, "error": "spoke not loaded"}

        allowed = permissions.check_permission(
            spoke_name, "notify", spoke_perms, detail=title
        )

        if not allowed:
            return {"ok": False, "error": "permission denied: notify"}

        # Queue notification
        notification = {
            "spoke": spoke_name,
            "title": title,
            "body": body,
            "level": level,
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        self.notification_queue.append(notification)

        logger.info(
            "Notification queued: spoke=%s title='%s'",
            spoke_name,
            title[:50],
        )

        return {"ok": True}

    def _handle_daemon_exec(self, msg: dict) -> dict:
        """
        Handle daemon_exec IPC command.

        Checks exec permission for spoke, then runs command in background subprocess.

        Args:
            msg: IPC message with 'spoke', 'command', 'mode' fields

        Returns:
            Response dict with 'ok' status and 'pid' (if successful)
        """
        import subprocess

        from . import permissions

        spoke_name = msg.get("spoke")
        cmd = msg.get("command", "")
        mode = msg.get("mode", "background")

        if not spoke_name:
            return {"ok": False, "error": "spoke name required"}

        if not cmd:
            return {"ok": False, "error": "command required"}

        # Phase 1: only background mode supported
        if mode != "background":
            return {
                "ok": False,
                "error": f"unsupported mode: {mode} (only 'background' supported)",
            }

        # Check permission
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            permissions.log_security(spoke_name, "daemon_exec", False, cmd, False)
            return {"ok": False, "error": "spoke not loaded"}

        allowed = permissions.check_permission(
            spoke_name, "exec", spoke_perms, detail=cmd
        )

        if not allowed:
            return {"ok": False, "error": "permission denied: exec"}

        # Run command in background (no TTY, non-interactive)
        try:
            proc = subprocess.Popen(
                cmd,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.DEVNULL,
                start_new_session=True,  # Detach from daemon
            )

            logger.info(
                "Background command started: spoke=%s pid=%d cmd='%s'",
                spoke_name,
                proc.pid,
                cmd[:100],
            )

            return {"ok": True, "pid": proc.pid}

        except Exception as e:
            logger.error(
                "Failed to execute command: spoke=%s cmd='%s' error=%s",
                spoke_name,
                cmd[:100],
                e,
            )
            return {"ok": False, "error": str(e)}

    def _handle_notify_drain(self, msg: dict) -> dict:
        """
        Handle notify_drain IPC command.

        Returns and clears the notification queue.

        Args:
            msg: IPC message (no parameters needed)

        Returns:
            Response dict with 'ok' status and 'notifications' list
        """
        notifications = self.notification_queue.copy()
        self.notification_queue.clear()

        logger.debug("Drained %d notifications", len(notifications))

        return {"ok": True, "notifications": notifications}

    def _handle_get_permissions(self, msg: dict) -> dict:
        """
        Handle get_permissions IPC command.

        Returns effective permissions for a spoke.

        Args:
            msg: IPC message with 'spoke' field

        Returns:
            Response dict with 'ok' status and 'permissions' dict
        """
        spoke_name = msg.get("spoke")

        if not spoke_name:
            return {"ok": False, "error": "spoke name required"}

        spoke_perms = self.permissions.get(spoke_name)

        if not spoke_perms:
            return {"ok": False, "error": "spoke not loaded"}

        # Convert to dict with source info
        perms_dict = spoke_perms.to_dict()
        sources = {field: spoke_perms.get_source(field) for field in perms_dict.keys()}

        return {
            "ok": True,
            "permissions": perms_dict,
            "sources": sources,
        }

    def _handle_load_spoke_permissions(self, msg: dict) -> dict:
        """
        Handle load_spoke_permissions IPC command.

        Loads permissions for a spoke from its spoke.yaml and merges with user overrides.

        Args:
            msg: IPC message with 'spoke', 'spoke_yaml_path' fields

        Returns:
            Response dict with 'ok' status
        """
        from pathlib import Path

        from . import permissions

        spoke_name = msg.get("spoke")
        spoke_yaml_path_str = msg.get("spoke_yaml_path")

        if not spoke_name:
            return {"ok": False, "error": "spoke name required"}

        if not spoke_yaml_path_str:
            return {"ok": False, "error": "spoke_yaml_path required"}

        try:
            spoke_yaml_path = Path(spoke_yaml_path_str)
            effective_perms = permissions.get_effective_permissions(
                spoke_name, spoke_yaml_path
            )
            self.permissions[spoke_name] = effective_perms

            logger.info(
                "Loaded permissions: spoke=%s exec=%s notify=%s fs_read=%d",
                spoke_name,
                effective_perms.exec,
                effective_perms.notify,
                len(effective_perms.fs_read),
            )

            return {"ok": True}

        except Exception as e:
            logger.error("Failed to load permissions for spoke %s: %s", spoke_name, e)
            return {"ok": False, "error": str(e)}

    def _handle_get_config(self, msg: dict) -> dict:
        """
        Handle get_config IPC command.

        Loads config for a spoke and optionally returns a specific value by key path.
        Uses the centralized config system with caching.

        Args:
            msg: IPC message with:
                - 'spoke': Spoke name (required)
                - 'key': Optional dot-notation key path (e.g., "check.path")
                - 'default_filename': Optional config filename (defaults to <spoke>.yaml)

        Returns:
            Response dict with:
                - 'ok': True if successful
                - 'config': Full config dict OR specific value if key provided
                - 'source': 'cache' or 'loaded'
                - 'error': Error message if failed

        Example:
            >>> # Full config
            >>> _handle_get_config({"cmd": "get_config", "spoke": "creds"})
            {"ok": True, "config": {...}, "source": "cache"}

            >>> # Specific key
            >>> _handle_get_config({"cmd": "get_config", "spoke": "creds", "key": "check.path"})
            {"ok": True, "config": "~/.aws/credentials", "source": "cache"}
        """
        from . import config

        spoke_name = msg.get("spoke")
        key_path = msg.get("key")
        default_filename = msg.get("default_filename", f"{spoke_name}.yaml")

        if not spoke_name:
            return {"ok": False, "error": "spoke name required"}

        try:
            # Check if config is cached
            cache_keys = [k for k in config._config_cache.keys() if k[0] == spoke_name]
            was_cached = len(cache_keys) > 0

            # Load config (will use cache if available)
            spoke_config = config.load_spoke_config(
                spoke_name, default_filename, env_aware=True
            )

            logger.debug(
                "Config request: spoke=%s key=%s cached=%s",
                spoke_name,
                key_path or "(full)",
                was_cached,
            )

            # If key path provided, extract specific value
            if key_path:
                value = config.get_config_value_by_path(
                    spoke_config, key_path, default=None
                )

                if value is None:
                    logger.debug(
                        "Config key not found: spoke=%s key=%s", spoke_name, key_path
                    )
                    return {
                        "ok": False,
                        "error": f"key '{key_path}' not found in config",
                    }

                return {
                    "ok": True,
                    "config": value,
                    "source": "cache" if was_cached else "loaded",
                }

            # Return full config
            return {
                "ok": True,
                "config": spoke_config,
                "source": "cache" if was_cached else "loaded",
            }

        except FileNotFoundError as e:
            logger.debug("Config file not found: spoke=%s error=%s", spoke_name, e)
            return {"ok": False, "error": f"config file not found: {e}"}

        except Exception as e:
            logger.error("Failed to load config for spoke %s: %s", spoke_name, e)
            return {"ok": False, "error": f"config error: {str(e)}"}

    def _handle_tmux_split_run(self, msg: dict) -> dict:
        """
        Handle tmux_split_run IPC command.

        Creates a tmux split pane and runs a command inside it.
        Requires 'tmux_split_run' in spoke/gear's IPC permissions.

        Args:
            msg: IPC message with 'spoke', 'command', 'height' fields

        Returns:
            Response dict with 'ok' status and 'pane_id' if successful
        """
        import subprocess

        from . import permissions

        spoke_name = msg.get("spoke")
        command = msg.get("command", "")
        height = msg.get("height", 20)

        if not spoke_name or not command:
            return {"ok": False, "error": "spoke and command required"}

        if not (1 <= height <= 99):
            return {"ok": False, "error": "height must be 1-99"}

        # Permission check
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            permissions.log_security(
                spoke_name, "ipc:tmux_split_run", False, command, False
            )
            return {"ok": False, "error": "spoke not loaded"}

        if not permissions.check_ipc_permission(
            spoke_name, "tmux_split_run", spoke_perms, command
        ):
            return {"ok": False, "error": "permission denied: tmux_split_run"}

        # Execute tmux split
        try:
            result = subprocess.run(
                [
                    "tmux",
                    "split-window",
                    "-v",
                    "-l",
                    f"{height}%",
                    "-P",
                    "-F",
                    "#{pane_id}",
                    command,
                ],
                capture_output=True,
                text=True,
                timeout=5,
                check=True,
            )
            pane_id = result.stdout.strip()
            logger.info(
                "Tmux split created: spoke=%s pane=%s cmd='%s'",
                spoke_name,
                pane_id,
                command[:100],
            )
            return {"ok": True, "pane_id": pane_id}

        except subprocess.TimeoutExpired:
            logger.error("Tmux split timeout: spoke=%s", spoke_name)
            return {"ok": False, "error": "tmux command timeout"}
        except subprocess.CalledProcessError as e:
            logger.error(
                "Tmux split failed: spoke=%s stderr='%s'", spoke_name, e.stderr
            )
            return {"ok": False, "error": f"tmux error: {e.stderr}"}
        except Exception as e:
            logger.error("Tmux split exception: spoke=%s error=%s", spoke_name, e)
            return {"ok": False, "error": str(e)}

    def _handle_tmux_send_keys(self, msg: dict) -> dict:
        """Handle tmux_send_keys IPC command."""
        import subprocess

        from . import permissions

        spoke_name = msg.get("spoke")
        pane_id = msg.get("pane_id", "")
        keys = msg.get("keys", "")

        if not all([spoke_name, pane_id, keys]):
            return {"ok": False, "error": "spoke, pane_id, and keys required"}

        # Permission check
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            return {"ok": False, "error": "spoke not loaded"}

        if not permissions.check_ipc_permission(
            spoke_name, "tmux_send_keys", spoke_perms
        ):
            return {"ok": False, "error": "permission denied: tmux_send_keys"}

        try:
            subprocess.run(
                ["tmux", "send-keys", "-t", pane_id, keys],
                check=True,
                timeout=2,
            )
            logger.debug("Sent keys to pane: spoke=%s pane=%s", spoke_name, pane_id)
            return {"ok": True}
        except Exception as e:
            logger.error("tmux_send_keys failed: %s", e)
            return {"ok": False, "error": str(e)}

    def _handle_tmux_capture_pane(self, msg: dict) -> dict:
        """Handle tmux_capture_pane IPC command."""
        import subprocess

        from . import permissions

        spoke_name = msg.get("spoke")
        pane_id = msg.get("pane_id", "")

        if not spoke_name or not pane_id:
            return {"ok": False, "error": "spoke and pane_id required"}

        # Permission check
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            return {"ok": False, "error": "spoke not loaded"}

        if not permissions.check_ipc_permission(
            spoke_name, "tmux_capture_pane", spoke_perms
        ):
            return {"ok": False, "error": "permission denied: tmux_capture_pane"}

        try:
            result = subprocess.run(
                ["tmux", "capture-pane", "-t", pane_id, "-p"],
                capture_output=True,
                text=True,
                check=True,
                timeout=2,
            )
            logger.debug(
                "Captured pane: spoke=%s pane=%s size=%d",
                spoke_name,
                pane_id,
                len(result.stdout),
            )
            return {"ok": True, "content": result.stdout}
        except Exception as e:
            logger.error("tmux_capture_pane failed: %s", e)
            return {"ok": False, "error": str(e)}

    def _handle_read_file(self, msg: dict) -> dict:
        """
        Handle read_file IPC command.

        Reads file with permission checking against fs_read patterns.
        """
        from pathlib import Path

        from . import permissions

        spoke_name = msg.get("spoke")
        path_str = msg.get("path", "")

        if not spoke_name or not path_str:
            return {"ok": False, "error": "spoke and path required"}

        # Permission check
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            return {"ok": False, "error": "spoke not loaded"}

        if not permissions.check_permission(
            spoke_name, f"fs_read:{path_str}", spoke_perms
        ):
            return {"ok": False, "error": "permission denied: fs_read"}

        # Read file
        try:
            path = Path(path_str).expanduser().resolve()
            if not path.exists():
                return {"ok": False, "error": "file not found"}

            # Size check (max 10MB)
            if path.stat().st_size > 10 * 1024 * 1024:
                return {"ok": False, "error": "file too large (max 10MB)"}

            content = path.read_text()
            logger.debug(
                "File read: spoke=%s path=%s size=%d", spoke_name, path, len(content)
            )
            return {"ok": True, "content": content}

        except Exception as e:
            logger.error(
                "File read failed: spoke=%s path=%s error=%s", spoke_name, path_str, e
            )
            return {"ok": False, "error": str(e)}

    def _handle_write_file(self, msg: dict) -> dict:
        """
        Handle write_file IPC command.

        Writes file with permission checking against fs_write patterns.
        """
        from pathlib import Path

        from . import permissions

        spoke_name = msg.get("spoke")
        path_str = msg.get("path", "")
        content = msg.get("content", "")

        if not spoke_name or not path_str:
            return {"ok": False, "error": "spoke and path required"}

        # Permission check
        spoke_perms = self.permissions.get(spoke_name)
        if not spoke_perms:
            return {"ok": False, "error": "spoke not loaded"}

        if not permissions.check_permission(
            spoke_name, f"fs_write:{path_str}", spoke_perms
        ):
            return {"ok": False, "error": "permission denied: fs_write"}

        # Write file
        try:
            path = Path(path_str).expanduser().resolve()
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content)
            logger.info(
                "File written: spoke=%s path=%s size=%d", spoke_name, path, len(content)
            )
            return {"ok": True}

        except Exception as e:
            logger.error(
                "File write failed: spoke=%s path=%s error=%s", spoke_name, path_str, e
            )
            return {"ok": False, "error": str(e)}

    def _handle_write_log(self, msg: dict) -> dict:
        """Handle write_log IPC command."""
        from datetime import datetime, timezone
        from pathlib import Path

        spoke_name = msg.get("spoke")
        message = msg.get("message", "")
        level = msg.get("level", "info").upper()

        if not spoke_name or not message:
            return {"ok": False, "error": "spoke and message required"}

        try:
            log_dir = Path.home() / ".config" / "axium" / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)

            log_file = log_dir / f"{spoke_name}.log"
            timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
            log_line = f"[{timestamp}] [{level}] {message}\n"

            with log_file.open("a") as f:
                f.write(log_line)

            logger.debug("Log written: spoke=%s level=%s", spoke_name, level)
            return {"ok": True}

        except Exception as e:
            logger.error("write_log failed: spoke=%s error=%s", spoke_name, e)
            return {"ok": False, "error": str(e)}

    def _handle_register_prefix(self, msg: dict) -> dict:
        """
        Handle register_prefix IPC command.

        Registers prefix rule with conflict detection.
        """
        from . import prefix

        spoke_name = msg.get("spoke")
        command = msg.get("command")
        wrapper = msg.get("wrapper")

        if not all([spoke_name, command, wrapper]):
            return {"ok": False, "error": "spoke, command, and wrapper required"}

        # Check for existing owner
        existing_owner = prefix.get_rule_owner(command)
        if existing_owner and existing_owner != spoke_name:
            logger.warning(
                "Prefix conflict: %s tried to register '%s' but owned by %s",
                spoke_name,
                command,
                existing_owner,
            )
            return {
                "ok": False,
                "error": f"command already registered by {existing_owner}",
                "conflict": True,
            }

        # Register
        try:
            success = prefix.register_prefix_rule(command, wrapper, spoke_name)
            if success:
                # Regenerate state cache
                self._write_state_cache()
                logger.info(
                    "Prefix registered: spoke=%s command=%s wrapper=%s",
                    spoke_name,
                    command,
                    wrapper,
                )
                return {"ok": True}
            else:
                return {"ok": False, "error": "registration failed"}

        except Exception as e:
            logger.error("Prefix registration failed: spoke=%s error=%s", spoke_name, e)
            return {"ok": False, "error": str(e)}

    def _handle_get_hud_segments(self, msg: dict) -> dict:
        """
        Handle get_hud_segments IPC command.

        Returns list of all registered HUD segments with their metadata.

        Returns:
            {"ok": True, "segments": [{"name": "...", "priority": 10, "spoke": "..."}]}
        """
        try:
            from .hud import get_registry

            registry = get_registry()
            segments = []

            for segment in registry.segments:
                segments.append(
                    {
                        "name": segment.name,
                        "priority": segment.priority,
                        "spoke": getattr(segment, "spoke", None) or "core",
                    }
                )

            # Sort by priority
            segments.sort(key=lambda x: x["priority"])

            return {"ok": True, "segments": segments}

        except Exception as e:
            logger.error("get_hud_segments failed: error=%s", e)
            return {"ok": False, "error": str(e)}

    async def handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """
        Handle an IPC request from a client.

        Reads JSON request from socket, processes command, sends JSON response.
        Each command is handled synchronously but multiple clients can connect
        concurrently.

        Args:
            reader: asyncio StreamReader for reading request
            writer: asyncio StreamWriter for sending response

        IPC Request Format:
            ```json
            {"cmd": "command_name", "arg1": "value", ...}
            ```

        IPC Response Format:
            ```json
            {"ok": true, "result": ...}
            // or
            {"ok": false, "error": "message"}
            ```

        Supported Commands:
            ```json
            ping: {"cmd": "ping"} → {"ok": true, "pong": true}

            get_state: {"cmd": "get_state"} → {"ok": true, "state": {...}}

            set_env: {"cmd": "set_env", "value": "prod"}
                   → {"ok": true}
                   Side effect: Emits env_change event to Spokes

            set_pane_env: {"cmd": "set_pane_env", "pane": "%1", "env": "prod"}
                        → {"ok": true}
                        Side effect: Emits env_change event with pane context

            get_hud: {"cmd": "get_hud", "pane": "%1"}
                   → {"ok": true, "hud": "[axium] pane:%1  env:prod  uptime:2h15m"}
                   Fast path: Returns pre-rendered HUD string from cache

            get_pane_env: {"cmd": "get_pane_env", "pane": "%1"}
                        → {"ok": true, "env": "prod"}

            clear_pane_env: {"cmd": "clear_pane_env", "pane": "%1"}
                          → {"ok": true}

            reload: {"cmd": "reload"} → {"ok": true, "reloaded": true}
                   Side effect: Reloads state from disk

            apply_prefixes: {"cmd": "apply_prefixes", "command": "aws",
                            "args": ["s3", "ls"], "context": {...}}
                          → {"ok": true, "command": [...], "env_vars": {...}}

            list_prefixed_commands: {"cmd": "list_prefixed_commands"}
                                  → {"ok": true, "commands": ["aws", "terraform", ...]}

            stop: {"cmd": "stop"} → {"ok": true, "stopping": true}
                 Side effect: Triggers graceful shutdown
            ```

        Note:
            Errors are caught and returned as {"ok": false, "error": "message"}
            Connection is closed after each request (no persistent connections)
        """
        try:
            raw = await reader.readline()
            if not raw:
                writer.close()
                return
            msg = json.loads(raw.decode())
            cmd = msg.get("cmd")
            logger.debug("Received IPC command: %s", cmd)

            if cmd == "ping":
                resp = {"ok": True, "pong": True}
            elif cmd == "get_state":
                resp = {"ok": True, "state": self.state}
            elif cmd == "set_env":
                new_env = msg.get("value")

                # Validate environment name before setting
                from . import env as env_module

                is_valid, error = env_module.validate_env_name(new_env)
                if not is_valid:
                    resp = {"ok": False, "error": error}
                else:
                    old_env = self.state.get("active_env")
                    self.state["active_env"] = new_env
                    self._save_state()
                    logger.info("Environment set to: %s", new_env)

                    # Invalidate all config caches (env-aware configs need refresh)
                    from . import config

                    config.invalidate_cache()
                    logger.debug("Invalidated all config caches due to env change")

                    # Emit env_change event
                    from . import spokes

                    spokes.get_event_bus().emit("env_change", new_env, old_env)

                    resp = {"ok": True}
            elif cmd == "get_hud":
                # Render HUD fresh each time (includes dynamic uptime)
                pane_id = msg.get("pane")
                if not pane_id:
                    resp = {"ok": False, "error": "pane ID required"}
                else:
                    try:
                        # Always render fresh to get current uptime
                        logger.debug("Rendering HUD for pane %s", pane_id)
                        hud_str = self._render_hud_for_pane(pane_id)
                        logger.debug("HUD rendered: %s", hud_str)
                        resp = {"ok": True, "hud": hud_str}
                    except Exception as e:
                        import traceback

                        logger.error(
                            "Error rendering HUD for pane %s: %s\n%s",
                            pane_id,
                            e,
                            traceback.format_exc(),
                        )
                        resp = {"ok": True, "hud": "[axium] inactive"}
            elif cmd == "set_pane_env":
                resp = self._handle_set_pane_env(msg)
            elif cmd == "get_pane_env":
                resp = self._handle_get_pane_env(msg)
            elif cmd == "clear_pane_env":
                resp = self._handle_clear_pane_env(msg)
            elif cmd == "reload":
                logger.info("Reload command received")
                # Reload state from disk
                self._load_state()
                # Reload HUD configuration
                self._load_hud_config()
                # Reload prefix configuration
                prefix.reload_config()
                # Reload all spoke configurations
                from . import config

                config.reload_all_configs()
                # Regenerate state cache in case prefix rules changed
                self._write_state_cache()
                # Refresh HUD cache for all panes
                self._refresh_all_hud_caches()
                # Emit post-reload events
                self.bus.emit("hud_refresh")
                self.bus.emit("daemon_reload")
                self.bus.emit("config_reloaded")
                resp = {"ok": True, "reloaded": True}
            elif cmd == "apply_prefixes":
                # Apply prefix rules to a command
                command = msg.get("command")
                args = msg.get("args", [])
                context = msg.get("context", {})

                try:
                    final_cmd, env_vars = prefix.apply_prefixes(command, args, context)
                    resp = {
                        "ok": True,
                        "command": final_cmd,
                        "env_vars": env_vars,
                    }
                except Exception as e:
                    logger.error("Failed to apply prefixes: %s", e)
                    resp = {"ok": False, "error": str(e)}
            elif cmd == "list_prefixed_commands":
                # List all commands that have prefix rules
                try:
                    commands = prefix.get_prefixed_commands()
                    resp = {"ok": True, "commands": commands}
                except Exception as e:
                    logger.error("Failed to list prefixed commands: %s", e)
                    resp = {"ok": False, "error": str(e)}
            elif cmd == "notify":
                resp = self._handle_notify(msg)
            elif cmd == "daemon_exec":
                resp = self._handle_daemon_exec(msg)
            elif cmd == "notify_drain":
                resp = self._handle_notify_drain(msg)
            elif cmd == "get_permissions":
                resp = self._handle_get_permissions(msg)
            elif cmd == "load_spoke_permissions":
                resp = self._handle_load_spoke_permissions(msg)
            elif cmd == "get_config":
                resp = self._handle_get_config(msg)
            elif cmd == "tmux_split_run":
                resp = self._handle_tmux_split_run(msg)
            elif cmd == "tmux_send_keys":
                resp = self._handle_tmux_send_keys(msg)
            elif cmd == "tmux_capture_pane":
                resp = self._handle_tmux_capture_pane(msg)
            elif cmd == "read_file":
                resp = self._handle_read_file(msg)
            elif cmd == "write_file":
                resp = self._handle_write_file(msg)
            elif cmd == "write_log":
                resp = self._handle_write_log(msg)
            elif cmd == "register_prefix":
                resp = self._handle_register_prefix(msg)
            elif cmd == "get_hud_segments":
                resp = self._handle_get_hud_segments(msg)
            elif cmd == "daemon_status":
                # Return daemon status information
                uptime_seconds = int(time.time() - self._start_time)
                # Format uptime as "Xh Ym Zs"
                hours = uptime_seconds // 3600
                minutes = (uptime_seconds % 3600) // 60
                seconds = uptime_seconds % 60

                parts = []
                if hours > 0:
                    parts.append(f"{hours}h")
                if minutes > 0 or hours > 0:  # Include minutes when nonzero, or to pad when hours are shown
                    parts.append(f"{minutes}m")
                parts.append(f"{seconds}s")
                uptime_str = " ".join(parts)

                resp = {
                    "ok": True,
                    "status": {
                        "running": True,
                        "uptime": uptime_str,
                        "active_env": self.state.get("active_env"),
                        "panes": len(self.state.get("panes", {})),
                    },
                }
            elif cmd == "reload_spoke":
                # Reload a specific spoke - daemon performs the action
                spoke_name = msg.get("spoke")
                if not spoke_name:
                    resp = {"ok": False, "error": "spoke name required"}
                else:
                    try:
                        # PERFORM THE ACTION (don't just emit event)
                        from axium.core.spokes import reload_spokes

                        reloaded = reload_spokes(spoke_name)

                        if reloaded:
                            # THEN emit post-action notification
                            self.bus.emit("spoke_reloaded", spoke_name=spoke_name)
                            resp = {"ok": True, "spoke": spoke_name}
                        else:
                            resp = {
                                "ok": False,
                                "error": f"failed to reload spoke: {spoke_name}",
                            }
                    except Exception as e:
                        logger.error("Spoke reload failed: %s", e)
                        resp = {"ok": False, "error": str(e)}
            elif cmd == "reload_spokes":
                # Reload all spokes - daemon performs the action
                try:
                    from axium.core.spokes import reload_spokes

                    reloaded_list = reload_spokes()  # Reload all

                    # Emit post-action event for EACH reloaded spoke
                    for spoke_name in reloaded_list:
                        self.bus.emit("spoke_reloaded", spoke_name=spoke_name)

                    resp = {"ok": True, "spokes": reloaded_list}
                except Exception as e:
                    logger.error("Spokes reload failed: %s", e)
                    resp = {"ok": False, "error": str(e), "spokes": []}
            elif cmd == "stop":
                logger.info("Stop command received")
                resp = {"ok": True, "stopping": True}
                writer.write((json.dumps(resp) + "\n").encode())
                await writer.drain()
                writer.close()
                await asyncio.sleep(0.05)
                self._stop.set()
                return
            else:
                logger.warning("Unknown command received: %s", cmd)
                resp = {"ok": False, "error": "unknown command"}
            writer.write((json.dumps(resp) + "\n").encode())
            await writer.drain()
            writer.close()
        except Exception as e:
            logger.error("Error handling IPC request: %s", e, exc_info=True)
            try:
                writer.write(
                    (json.dumps({"ok": False, "error": str(e)}) + "\n").encode()
                )
                await writer.drain()
                writer.close()
            except Exception:
                pass

    async def _periodic_segment_update(self) -> None:
        """
        Periodically update cached HUD segments.

        Runs every 5 minutes to refresh expensive segments like credential checks.
        This ensures segments stay up-to-date even without explicit events.
        """
        while not self._stop.is_set():
            try:
                await asyncio.sleep(300)  # 5 minutes
                if self._stop.is_set():
                    break

                # Update all cached segments
                from .hud import get_registry

                registry = get_registry()
                env_name = self.state.get("active_env") or "-"
                context = {"env": env_name, "state": self.state}
                registry.update_cached_segments(context)
                logger.debug("Periodic cached segment update complete")

            except Exception as e:
                logger.error("Error in periodic segment update: %s", e)

    async def run(self) -> None:
        """
        Run the daemon event loop.

        Starts UNIX socket server and waits for shutdown signal.
        Cleans up socket file on exit.

        The server accepts connections on /tmp/axiumd.sock and handles
        each client in a separate task via handle_client().

        Blocks until _stop event is set (via stop command or signal).

        Side Effects:
            - Creates UNIX socket at /tmp/axiumd.sock
            - Removes existing socket if present
            - Cleans up socket on exit
            - Starts periodic cached segment updates every 5 minutes

        Note:
            This should be run via asyncio.run() in the main process.
        """
        if SOCKET_PATH.exists():
            try:
                SOCKET_PATH.unlink()
                logger.debug("Removed existing socket at %s", SOCKET_PATH)
            except FileNotFoundError:
                pass
        logger.info("Starting Unix socket server at %s", SOCKET_PATH)
        self.server = await asyncio.start_unix_server(
            self.handle_client, path=str(SOCKET_PATH)
        )

        # Start periodic segment update task
        update_task = asyncio.create_task(self._periodic_segment_update())

        try:
            async with self.server:
                logger.info("Daemon ready, waiting for connections")
                await self._stop.wait()
        finally:
            # Cancel periodic task
            update_task.cancel()
            try:
                await update_task
            except asyncio.CancelledError:
                pass

        logger.info("Cleaning up socket")
        try:
            SOCKET_PATH.unlink()
        except FileNotFoundError:
            pass

__init__()

Initialize daemon with default state.

Loads persistent state from state.json if it exists, otherwise initializes with defaults. Creates empty event registry.

Source code in axium/core/daemon.py
def __init__(self):
    """
    Initialize daemon with default state.

    Loads persistent state from state.json if it exists, otherwise
    initializes with defaults. Creates empty event registry.
    """
    # Default state: active_env and panes are persisted to state.json;
    # hud_cache, tmux_pane, and started are runtime-only
    self.state = {
        "active_env": None,
        "panes": {},  # Per-pane environment mapping: {"%1": "root", "%2": "builder"}
        "hud_cache": {},  # Pre-rendered HUD strings: {"%1": "[axium] pane:%1 ..."}
        "tmux_pane": os.getenv("TMUX_PANE"),
        "started": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
    self._start_time = time.time()  # Unix timestamp for fast uptime calculations
    # Effective permissions per spoke: {spoke_name: SpokePermissions}
    self.permissions = {}
    # Queued notifications: [{"title": ..., "body": ..., "spoke": ...}]
    self.notification_queue = []
    self.hud_config = {}  # HUD layout configuration from hud.yaml
    self.server = None
    self._stop = asyncio.Event()

    # Initialize EventBus for spoke coordination
    from axium.core.spokes import get_event_bus

    self.bus = get_event_bus()

    # Set up completion cache event listeners
    self.bus.on("spoke_loaded", self._on_spoke_loaded)
    self.bus.on("spoke_reloaded", self._on_spoke_reloaded)
    self.bus.on("spoke_unloaded", self._on_spoke_unloaded)
    self.bus.on("gear_loaded", self._on_gear_loaded)
    self.bus.on("gear_unloaded", self._on_gear_unloaded)
    self.bus.on("daemon_reload", self._on_daemon_reload)

    self._load_state()
    self._load_hud_config()  # Load HUD layout from hud.yaml
    self._write_state_cache()
    self._refresh_all_hud_caches()  # Pre-render HUD for all panes

    # Load gears for daemon IPC operations
    # Note: Gears are also loaded in CLI for command registration
    # This loads them in daemon context for IPC permission enforcement
    self._load_gears_for_daemon()

    # Generate initial completion cache
    self._regenerate_completions("daemon_init")

handle_client(reader, writer) async

Handle an IPC request from a client.

Reads JSON request from socket, processes command, sends JSON response. Each command is handled synchronously but multiple clients can connect concurrently.

Parameters:

  reader (StreamReader): asyncio StreamReader for reading the request. Required.
  writer (StreamWriter): asyncio StreamWriter for sending the response. Required.
IPC Request Format
{"cmd": "command_name", "arg1": "value", ...}
IPC Response Format
{"ok": true, "result": ...}
// or
{"ok": false, "error": "message"}
Supported Commands
ping: {"cmd": "ping"} → {"ok": true, "pong": true}

get_state: {"cmd": "get_state"} → {"ok": true, "state": {...}}

set_env: {"cmd": "set_env", "value": "prod"}
       → {"ok": true}
       Side effect: Emits env_change event to Spokes

set_pane_env: {"cmd": "set_pane_env", "pane": "%1", "value": "prod"}
            → {"ok": true}
            Side effect: Emits env_change event with pane context

get_hud: {"cmd": "get_hud", "pane": "%1"}
       → {"ok": true, "hud": "[axium] pane:%1  env:prod  uptime:2h15m"}
       Rendered fresh on each request so the uptime stays current

get_pane_env: {"cmd": "get_pane_env", "pane": "%1"}
            → {"ok": true, "env": "prod"}

clear_pane_env: {"cmd": "clear_pane_env", "pane": "%1"}
              → {"ok": true}

reload: {"cmd": "reload"} → {"ok": true, "reloaded": true}
       Side effect: Reloads state from disk

apply_prefixes: {"cmd": "apply_prefixes", "command": "aws",
                "args": ["s3", "ls"], "context": {...}}
              → {"ok": true, "command": [...], "env_vars": {...}}

list_prefixed_commands: {"cmd": "list_prefixed_commands"}
                      → {"ok": true, "commands": ["aws", "terraform", ...]}

stop: {"cmd": "stop"} → {"ok": true, "stopping": true}
     Side effect: Triggers graceful shutdown
Note

Errors are caught and returned as {"ok": false, "error": "message"}.
The connection is closed after each request (no persistent connections).
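
As a concrete illustration of the wire format, here is a minimal client sketch using only the standard library. The helper name send_ipc is hypothetical; only the socket path, the newline-delimited JSON framing, and the commands shown are taken from the documentation above.

```python
import json
import socket

def send_ipc(request: dict, sock_path: str = "/tmp/axiumd.sock") -> dict:
    """Send one JSON request to the daemon and return the parsed JSON response."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(sock_path)
        sock.sendall((json.dumps(request) + "\n").encode())
        # The daemon writes a single JSON line, then closes the connection.
        raw = sock.makefile("rb").readline()
    return json.loads(raw.decode())

# Health check, then switch the active environment
print(send_ipc({"cmd": "ping"}))                      # {'ok': True, 'pong': True}
print(send_ipc({"cmd": "set_env", "value": "prod"}))  # {'ok': True}
```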

Source code in axium/core/daemon.py
async def handle_client(
    self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """
    Handle an IPC request from a client.

    Reads JSON request from socket, processes command, sends JSON response.
    Each command is handled synchronously but multiple clients can connect
    concurrently.

    Args:
        reader: asyncio StreamReader for reading request
        writer: asyncio StreamWriter for sending response

    IPC Request Format:
        ```json
        {"cmd": "command_name", "arg1": "value", ...}
        ```

    IPC Response Format:
        ```json
        {"ok": true, "result": ...}
        // or
        {"ok": false, "error": "message"}
        ```

    Supported Commands:
        ```json
        ping: {"cmd": "ping"} → {"ok": true, "pong": true}

        get_state: {"cmd": "get_state"} → {"ok": true, "state": {...}}

        set_env: {"cmd": "set_env", "value": "prod"}
               → {"ok": true}
               Side effect: Emits env_change event to Spokes

        set_pane_env: {"cmd": "set_pane_env", "pane": "%1", "value": "prod"}
                    → {"ok": true}
                    Side effect: Emits env_change event with pane context

        get_hud: {"cmd": "get_hud", "pane": "%1"}
               → {"ok": true, "hud": "[axium] pane:%1  env:prod  uptime:2h15m"}
               Rendered fresh on each request so the uptime stays current

        get_pane_env: {"cmd": "get_pane_env", "pane": "%1"}
                    → {"ok": true, "env": "prod"}

        clear_pane_env: {"cmd": "clear_pane_env", "pane": "%1"}
                      → {"ok": true}

        reload: {"cmd": "reload"} → {"ok": true, "reloaded": true}
               Side effect: Reloads state from disk

        apply_prefixes: {"cmd": "apply_prefixes", "command": "aws",
                        "args": ["s3", "ls"], "context": {...}}
                      → {"ok": true, "command": [...], "env_vars": {...}}

        list_prefixed_commands: {"cmd": "list_prefixed_commands"}
                              → {"ok": true, "commands": ["aws", "terraform", ...]}

        stop: {"cmd": "stop"} → {"ok": true, "stopping": true}
             Side effect: Triggers graceful shutdown
        ```

    Note:
        Errors are caught and returned as {"ok": false, "error": "message"}
        Connection is closed after each request (no persistent connections)
    """
    try:
        raw = await reader.readline()
        if not raw:
            writer.close()
            return
        msg = json.loads(raw.decode())
        cmd = msg.get("cmd")
        logger.debug("Received IPC command: %s", cmd)

        if cmd == "ping":
            resp = {"ok": True, "pong": True}
        elif cmd == "get_state":
            resp = {"ok": True, "state": self.state}
        elif cmd == "set_env":
            new_env = msg.get("value")

            # Validate environment name before setting
            from . import env as env_module

            is_valid, error = env_module.validate_env_name(new_env)
            if not is_valid:
                resp = {"ok": False, "error": error}
            else:
                old_env = self.state.get("active_env")
                self.state["active_env"] = new_env
                self._save_state()
                logger.info("Environment set to: %s", new_env)

                # Invalidate all config caches (env-aware configs need refresh)
                from . import config

                config.invalidate_cache()
                logger.debug("Invalidated all config caches due to env change")

                # Emit env_change event
                from . import spokes

                spokes.get_event_bus().emit("env_change", new_env, old_env)

                resp = {"ok": True}
        elif cmd == "get_hud":
            # Render HUD fresh each time (includes dynamic uptime)
            pane_id = msg.get("pane")
            if not pane_id:
                resp = {"ok": False, "error": "pane ID required"}
            else:
                try:
                    # Always render fresh to get current uptime
                    logger.debug("Rendering HUD for pane %s", pane_id)
                    hud_str = self._render_hud_for_pane(pane_id)
                    logger.debug("HUD rendered: %s", hud_str)
                    resp = {"ok": True, "hud": hud_str}
                except Exception as e:
                    import traceback

                    logger.error(
                        "Error rendering HUD for pane %s: %s\n%s",
                        pane_id,
                        e,
                        traceback.format_exc(),
                    )
                    resp = {"ok": True, "hud": "[axium] inactive"}
        elif cmd == "set_pane_env":
            resp = self._handle_set_pane_env(msg)
        elif cmd == "get_pane_env":
            resp = self._handle_get_pane_env(msg)
        elif cmd == "clear_pane_env":
            resp = self._handle_clear_pane_env(msg)
        elif cmd == "reload":
            logger.info("Reload command received")
            # Reload state from disk
            self._load_state()
            # Reload HUD configuration
            self._load_hud_config()
            # Reload prefix configuration
            prefix.reload_config()
            # Reload all spoke configurations
            from . import config

            config.reload_all_configs()
            # Regenerate state cache in case prefix rules changed
            self._write_state_cache()
            # Refresh HUD cache for all panes
            self._refresh_all_hud_caches()
            # Emit post-reload events
            self.bus.emit("hud_refresh")
            self.bus.emit("daemon_reload")
            self.bus.emit("config_reloaded")
            resp = {"ok": True, "reloaded": True}
        elif cmd == "apply_prefixes":
            # Apply prefix rules to a command
            command = msg.get("command")
            args = msg.get("args", [])
            context = msg.get("context", {})

            try:
                final_cmd, env_vars = prefix.apply_prefixes(command, args, context)
                resp = {
                    "ok": True,
                    "command": final_cmd,
                    "env_vars": env_vars,
                }
            except Exception as e:
                logger.error("Failed to apply prefixes: %s", e)
                resp = {"ok": False, "error": str(e)}
        elif cmd == "list_prefixed_commands":
            # List all commands that have prefix rules
            try:
                commands = prefix.get_prefixed_commands()
                resp = {"ok": True, "commands": commands}
            except Exception as e:
                logger.error("Failed to list prefixed commands: %s", e)
                resp = {"ok": False, "error": str(e)}
        elif cmd == "notify":
            resp = self._handle_notify(msg)
        elif cmd == "daemon_exec":
            resp = self._handle_daemon_exec(msg)
        elif cmd == "notify_drain":
            resp = self._handle_notify_drain(msg)
        elif cmd == "get_permissions":
            resp = self._handle_get_permissions(msg)
        elif cmd == "load_spoke_permissions":
            resp = self._handle_load_spoke_permissions(msg)
        elif cmd == "get_config":
            resp = self._handle_get_config(msg)
        elif cmd == "tmux_split_run":
            resp = self._handle_tmux_split_run(msg)
        elif cmd == "tmux_send_keys":
            resp = self._handle_tmux_send_keys(msg)
        elif cmd == "tmux_capture_pane":
            resp = self._handle_tmux_capture_pane(msg)
        elif cmd == "read_file":
            resp = self._handle_read_file(msg)
        elif cmd == "write_file":
            resp = self._handle_write_file(msg)
        elif cmd == "write_log":
            resp = self._handle_write_log(msg)
        elif cmd == "register_prefix":
            resp = self._handle_register_prefix(msg)
        elif cmd == "get_hud_segments":
            resp = self._handle_get_hud_segments(msg)
        elif cmd == "daemon_status":
            # Return daemon status information
            uptime_seconds = int(time.time() - self._start_time)
            # Format uptime as "Xh Ym Zs"
            hours = uptime_seconds // 3600
            minutes = (uptime_seconds % 3600) // 60
            seconds = uptime_seconds % 60

            parts = []
            if hours > 0:
                parts.append(f"{hours}h")
            if minutes > 0 or hours > 0:  # Include minutes when nonzero, or to pad when hours are shown
                parts.append(f"{minutes}m")
            parts.append(f"{seconds}s")
            uptime_str = " ".join(parts)

            resp = {
                "ok": True,
                "status": {
                    "running": True,
                    "uptime": uptime_str,
                    "active_env": self.state.get("active_env"),
                    "panes": len(self.state.get("panes", {})),
                },
            }
        elif cmd == "reload_spoke":
            # Reload a specific spoke - daemon performs the action
            spoke_name = msg.get("spoke")
            if not spoke_name:
                resp = {"ok": False, "error": "spoke name required"}
            else:
                try:
                    # PERFORM THE ACTION (don't just emit event)
                    from axium.core.spokes import reload_spokes

                    reloaded = reload_spokes(spoke_name)

                    if reloaded:
                        # THEN emit post-action notification
                        self.bus.emit("spoke_reloaded", spoke_name=spoke_name)
                        resp = {"ok": True, "spoke": spoke_name}
                    else:
                        resp = {
                            "ok": False,
                            "error": f"failed to reload spoke: {spoke_name}",
                        }
                except Exception as e:
                    logger.error("Spoke reload failed: %s", e)
                    resp = {"ok": False, "error": str(e)}
        elif cmd == "reload_spokes":
            # Reload all spokes - daemon performs the action
            try:
                from axium.core.spokes import reload_spokes

                reloaded_list = reload_spokes()  # Reload all

                # Emit post-action event for EACH reloaded spoke
                for spoke_name in reloaded_list:
                    self.bus.emit("spoke_reloaded", spoke_name=spoke_name)

                resp = {"ok": True, "spokes": reloaded_list}
            except Exception as e:
                logger.error("Spokes reload failed: %s", e)
                resp = {"ok": False, "error": str(e), "spokes": []}
        elif cmd == "stop":
            logger.info("Stop command received")
            resp = {"ok": True, "stopping": True}
            writer.write((json.dumps(resp) + "\n").encode())
            await writer.drain()
            writer.close()
            await asyncio.sleep(0.05)
            self._stop.set()
            return
        else:
            logger.warning("Unknown command received: %s", cmd)
            resp = {"ok": False, "error": "unknown command"}
        writer.write((json.dumps(resp) + "\n").encode())
        await writer.drain()
        writer.close()
    except Exception as e:
        logger.error("Error handling IPC request: %s", e, exc_info=True)
        try:
            writer.write(
                (json.dumps({"ok": False, "error": str(e)}) + "\n").encode()
            )
            await writer.drain()
            writer.close()
        except Exception:
            pass

run() async

Run the daemon event loop.

Starts UNIX socket server and waits for shutdown signal. Cleans up socket file on exit.

The server accepts connections on /tmp/axiumd.sock and handles each client in a separate task via handle_client().

Blocks until _stop event is set (via stop command or signal).

Side Effects
  • Creates UNIX socket at /tmp/axiumd.sock
  • Removes existing socket if present
  • Cleans up socket on exit
  • Starts periodic cached segment updates every 5 minutes
Note

This should be run via asyncio.run() in the main process.
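
A minimal sketch of running the loop directly, assuming the import path axium/core/daemon.py shown above; in normal use start() takes care of forking, logging, and signal handling before calling this:

```python
import asyncio

from axium.core.daemon import AxiumDaemon

# Foreground run without daemonizing. No signal handlers are installed here,
# so shut the daemon down with the IPC "stop" command rather than Ctrl-C.
daemon = AxiumDaemon()
asyncio.run(daemon.run())
```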

Source code in axium/core/daemon.py
async def run(self) -> None:
    """
    Run the daemon event loop.

    Starts UNIX socket server and waits for shutdown signal.
    Cleans up socket file on exit.

    The server accepts connections on /tmp/axiumd.sock and handles
    each client in a separate task via handle_client().

    Blocks until _stop event is set (via stop command or signal).

    Side Effects:
        - Creates UNIX socket at /tmp/axiumd.sock
        - Removes existing socket if present
        - Cleans up socket on exit
        - Starts periodic cached segment updates every 5 minutes

    Note:
        This should be run via asyncio.run() in the main process.
    """
    if SOCKET_PATH.exists():
        try:
            SOCKET_PATH.unlink()
            logger.debug("Removed existing socket at %s", SOCKET_PATH)
        except FileNotFoundError:
            pass
    logger.info("Starting Unix socket server at %s", SOCKET_PATH)
    self.server = await asyncio.start_unix_server(
        self.handle_client, path=str(SOCKET_PATH)
    )

    # Start periodic segment update task
    update_task = asyncio.create_task(self._periodic_segment_update())

    try:
        async with self.server:
            logger.info("Daemon ready, waiting for connections")
            await self._stop.wait()
    finally:
        # Cancel periodic task
        update_task.cancel()
        try:
            await update_task
        except asyncio.CancelledError:
            pass

    logger.info("Cleaning up socket")
    try:
        SOCKET_PATH.unlink()
    except FileNotFoundError:
        pass

kill_daemon_pid()

Kill daemon process by PID from file.

Reads PID from axiumd.pid and sends SIGTERM. If process doesn't exist, removes stale PID file.

Returns:

  bool: True if process was killed or didn't exist, False on error

Side Effects
  • Sends SIGTERM to daemon process
  • Removes axiumd.pid file
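
A hedged usage sketch; it relies only on the documented return values and on status() from this module:

```python
from axium.core.daemon import kill_daemon_pid, status

if kill_daemon_pid():
    # SIGTERM was sent, or no daemon was running and the stale PID file was removed.
    print("daemon stopped:", status())
else:
    print("failed to signal the daemon; see the debug log")
```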
Source code in axium/core/daemon.py
def kill_daemon_pid() -> bool:
    """
    Kill daemon process by PID from file.

    Reads PID from axiumd.pid and sends SIGTERM. If process doesn't exist,
    removes stale PID file.

    Returns:
        True if process was killed or didn't exist, False on error

    Side Effects:
        - Sends SIGTERM to daemon process
        - Removes axiumd.pid file
    """
    pid = _read_pid()
    if not pid:
        return True

    try:
        os.kill(pid, signal.SIGTERM)
        logger.debug("Sent SIGTERM to daemon PID %d", pid)
        # Wait briefly for process to exit
        for _ in range(10):
            try:
                os.kill(pid, 0)  # Check if process exists
                time.sleep(0.1)
            except ProcessLookupError:
                break
        PID_PATH.unlink(missing_ok=True)
        return True
    except ProcessLookupError:
        # Process doesn't exist, clean up PID file
        PID_PATH.unlink(missing_ok=True)
        return True
    except Exception as e:
        logger.debug("Failed to kill daemon PID %d: %s", pid, e)
        return False

cleanup_zombie_daemons()

Clean up zombie axium daemon processes.

Finds all running "axium daemon start" processes and kills them, except for the current process and the legitimate daemon recorded in the PID file.

Returns:

  int: Number of processes killed

Note

Uses pgrep to find and terminate zombie daemon processes. Protects the legitimate daemon PID from the PID file. Skips cleanup if PYTEST_CURRENT_TEST environment variable is set.
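
A short usage sketch, assuming only the documented return value (the count of processes terminated):

```python
from axium.core.daemon import cleanup_zombie_daemons

killed = cleanup_zombie_daemons()
if killed:
    print(f"terminated {killed} orphaned daemon process(es)")
else:
    print("no zombie daemons found (or cleanup skipped under pytest)")
```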

Source code in axium/core/daemon.py
def cleanup_zombie_daemons() -> int:
    """
    Clean up zombie axium daemon processes.

    Finds all running "axium daemon start" processes and kills them,
    except for the current process and the legitimate daemon from PID file.

    Returns:
        Number of processes killed

    Note:
        Uses pgrep to find and terminate zombie daemon processes.
        Protects the legitimate daemon PID from the PID file.
        Skips cleanup if PYTEST_CURRENT_TEST environment variable is set.
    """
    import subprocess

    # Skip cleanup in test environment to avoid interfering with mocks
    if os.getenv("PYTEST_CURRENT_TEST"):
        return 0

    killed = 0

    try:
        # Get current process PID to avoid killing ourselves
        current_pid = os.getpid()

        # Read legitimate daemon PID from file - must protect this!
        daemon_pid = _read_pid()

        # Find all axium daemon processes
        result = subprocess.run(
            ["pgrep", "-f", "axium daemon start"], capture_output=True, text=True
        )

        if result.returncode == 0:
            pids = [int(p) for p in result.stdout.strip().split("\n") if p]
            for pid in pids:
                # Skip current process
                if pid == current_pid:
                    continue

                # Skip the legitimate daemon from PID file
                # This is the key fix - don't kill the running daemon!
                if daemon_pid and pid == daemon_pid:
                    logger.debug(
                        "Skipping legitimate daemon PID %d (from PID file)", pid
                    )
                    continue

                # This is an orphaned/zombie process - kill it
                try:
                    os.kill(pid, signal.SIGTERM)
                    killed += 1
                    logger.debug("Killed zombie daemon process PID %d", pid)
                except (ProcessLookupError, PermissionError):
                    pass

    except Exception as e:
        logger.debug("Failed to cleanup zombie daemons: %s", e)

    return killed

start(debug=False)

Start the Axium daemon process.

In normal mode (debug=False), performs double fork to daemonize:

  1. Fork once to detach from parent
  2. Create new session (setsid)
  3. Fork again to prevent zombie process
  4. Parent exits, child continues as daemon

In debug mode (debug=True), runs in foreground with console logging.

Parameters:

  debug (bool): If True, run in foreground with logs to stdout.
                If False, daemonize and log to ~/.config/axium/axiumd.log.
                Default: False.

Returns:

  bool: True if daemon started successfully (parent process in daemon mode).
        In debug mode, never returns (runs until stopped).

Side Effects
  • Calls bootstrap.ensure_axium_config() to create config directory
  • Writes PID to ~/.config/axium/axiumd.pid
  • Creates UNIX socket at /tmp/axiumd.sock
  • Sets up signal handlers (SIGTERM, SIGINT)
  • Runs until stop command or signal received
Example
# Background daemon
if start(debug=False):
    print("Daemon started successfully")

# Foreground (for development)
start(debug=True)  # Never returns
Note

Parent process returns True immediately in daemon mode. Child process runs until stopped (never returns).

Source code in axium/core/daemon.py
def start(debug: bool = False) -> bool:
    """
    Start the Axium daemon process.

    In normal mode (debug=False), performs double fork to daemonize:
    1. Fork once to detach from parent
    2. Create new session (setsid)
    3. Fork again to prevent zombie process
    4. Parent exits, child continues as daemon

    In debug mode (debug=True), runs in foreground with console logging.

    Args:
        debug: If True, run in foreground with logs to stdout.
               If False, daemonize and log to ~/.config/axium/axiumd.log.

    Returns:
        True if daemon started successfully (parent process in daemon mode).
        In debug mode, never returns (runs until stopped).

    Side Effects:
        - Calls bootstrap.ensure_axium_config() to create config directory
        - Writes PID to ~/.config/axium/axiumd.pid
        - Creates UNIX socket at /tmp/axiumd.sock
        - Sets up signal handlers (SIGTERM, SIGINT)
        - Runs until stop command or signal received

    Example:
        ```python
        # Background daemon
        if start(debug=False):
            print("Daemon started successfully")

        # Foreground (for development)
        start(debug=True)  # Never returns
        ```

    Note:
        Parent process returns True immediately in daemon mode.
        Child process runs until stopped (never returns).
    """
    # Clean up any zombie daemon processes first
    killed = cleanup_zombie_daemons()
    if killed > 0:
        logger.debug("Cleaned up %d zombie daemon process(es)", killed)

    # Ensure config directory and files exist
    from . import bootstrap

    bootstrap.ensure_axium_config()

    if not debug:
        pid = os.fork()
        if pid > 0:
            _write_pid(pid)
            return True
        os.setsid()
        pid2 = os.fork()
        if pid2 > 0:
            os._exit(0)

        # Redirect stdin, stdout, stderr to /dev/null in daemon mode
        devnull = os.open(os.devnull, os.O_RDWR)
        os.dup2(devnull, sys.stdin.fileno())
        os.dup2(devnull, sys.stdout.fileno())
        os.dup2(devnull, sys.stderr.fileno())
        if devnull > 2:
            os.close(devnull)

    _setup_logging(debug)
    logger.info("Axium daemon starting (debug=%s)", debug)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    daemon = AxiumDaemon()

    def handle_sig(_sig, _frm):
        logger.info("Received signal %s, shutting down", _sig)
        daemon._stop.set()

    signal.signal(signal.SIGTERM, handle_sig)
    signal.signal(signal.SIGINT, handle_sig)

    try:
        logger.info("Starting daemon event loop")
        loop.run_until_complete(daemon.run())
    finally:
        logger.info("Daemon stopped")
        loop.close()
        if debug and PID_PATH.exists():
            try:
                PID_PATH.unlink()
            except Exception:
                pass

stop()

Stop the Axium daemon.

Attempts to stop daemon via IPC stop command. If that fails, reads PID from axiumd.pid and sends SIGTERM. Also cleans up any zombie daemon processes.

Returns:

  dict: One of:
    {"ok": true, "stopping": true}      - IPC stop successful
    {"ok": true, "signal": "SIGTERM"}   - Sent SIGTERM to PID
    {"ok": true, "cleaned_zombies": N}  - No daemon reachable; N zombie processes cleaned up
    {"ok": false, "error": "..."}       - Failed to stop

Example
>>> stop()
{'ok': True, 'stopping': True}
Note

Does not wait for daemon to fully exit, just sends stop signal. Check daemon status after calling to verify shutdown. Cleans up zombie processes as a side effect.

Source code in axium/core/daemon.py
def stop() -> dict:
    """
    Stop the Axium daemon.

    Attempts to stop daemon via IPC stop command. If that fails,
    reads PID from axiumd.pid and sends SIGTERM. Also cleans up
    any zombie daemon processes.

    Returns:
        dict with structure:
            {"ok": true, "stopping": true} - IPC stop successful
            {"ok": true, "signal": "SIGTERM"} - Sent SIGTERM to PID
            {"ok": false, "error": "..."} - Failed to stop

    Example:
        ```python
        >>> stop()
        {'ok': True, 'stopping': True}
        ```

    Note:
        Does not wait for daemon to fully exit, just sends stop signal.
        Check daemon status after calling to verify shutdown.
        Cleans up zombie processes as a side effect.
    """
    try:
        from .ipc import send_request_sync

        result = send_request_sync({"cmd": "stop"})
        # Clean up zombies after successful stop
        cleanup_zombie_daemons()
        return result
    except Exception as e:
        pid = _read_pid()
        if pid:
            try:
                os.kill(pid, signal.SIGTERM)
                # Clean up zombies after signal
                cleanup_zombie_daemons()
                return {"ok": True, "signal": "SIGTERM"}
            except Exception as ex:
                return {"ok": False, "error": str(ex)}
        # Clean up zombies even if stop failed
        killed = cleanup_zombie_daemons()
        if killed > 0:
            return {"ok": True, "cleaned_zombies": killed}
        return {"ok": False, "error": f"no pid ({e})"}

status()

Get daemon status information.

Returns:

  dict: With structure:
    {
        "pid": 12345 or None,
        "socket": "/tmp/axiumd.sock",
        "socket_exists": True or False
    }

Example
>>> status()
{'pid': 12345, 'socket': '/tmp/axiumd.sock', 'socket_exists': True}
Note

This only checks PID file and socket file existence. Use send_request_sync({"cmd": "ping"}) to verify daemon is responsive.
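
Because status() only inspects files, here is a hedged sketch of pairing it with the ping check the Note recommends (send_request_sync is the IPC helper already used elsewhere in this module):

```python
from axium.core.daemon import status
from axium.core.ipc import send_request_sync

info = status()
responsive = False
if info["socket_exists"]:
    try:
        # A ping round-trip confirms the daemon is actually serving requests.
        resp = send_request_sync({"cmd": "ping"})
        responsive = bool(resp and resp.get("ok"))
    except Exception:
        responsive = False
print(f"pid={info['pid']} socket={info['socket']} responsive={responsive}")
```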

Source code in axium/core/daemon.py
def status() -> dict:
    """
    Get daemon status information.

    Returns:
        dict with structure:
            {
                "pid": 12345 or None,
                "socket": "/tmp/axiumd.sock",
                "socket_exists": True or False
            }

    Example:
        ```python
        >>> status()
        {'pid': 12345, 'socket': '/tmp/axiumd.sock', 'socket_exists': True}
        ```

    Note:
        This only checks PID file and socket file existence.
        Use send_request_sync({"cmd": "ping"}) to verify daemon is responsive.
    """
    pid = _read_pid()
    return {
        "pid": pid,
        "socket": str(SOCKET_PATH),
        "socket_exists": SOCKET_PATH.exists(),
    }

ensure_daemon_running()

Ensure daemon is running, starting it automatically if needed.

Checks if daemon is responsive via ping with retry logic. If not responsive or not running, verifies daemon is actually stopped before starting a new one.

This is called by the CLI on every command invocation to ensure the daemon is always available for IPC operations, event handling, and completion cache management.

Returns:

  bool: True if daemon is running (or successfully started), False on error

Example
>>> ensure_daemon_running()
True  # Daemon is now guaranteed to be running
Note

Uses 2-second timeout with 2 retry attempts for ping check to handle cases where daemon is busy loading spokes/gears. If ping fails after retries, checks PID file and process status before starting. This prevents accidental restarts when daemon is running but slow. If daemon start fails, logs warning but returns False (non-fatal).
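
A standalone sketch of the process-liveness check the Note describes, using the standard signal-0 idiom (the helper name pid_alive is hypothetical):

```python
import os

def pid_alive(pid: int) -> bool:
    """Return True if a process with this PID exists (signal 0 sends nothing)."""
    try:
        os.kill(pid, 0)
        return True
    except ProcessLookupError:
        return False
    except PermissionError:
        # Process exists but belongs to another user
        return True
```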

Source code in axium/core/daemon.py
def ensure_daemon_running() -> bool:
    """
    Ensure daemon is running, starting it automatically if needed.

    Checks if daemon is responsive via ping with retry logic. If not responsive
    or not running, verifies daemon is actually stopped before starting a new one.

    This is called by the CLI on every command invocation to ensure the daemon
    is always available for IPC operations, event handling, and completion cache
    management.

    Returns:
        True if daemon is running (or successfully started), False on error

    Example:
        ```python
        >>> ensure_daemon_running()
        True  # Daemon is now guaranteed to be running
        ```

    Note:
        Uses 2-second timeout with 2 retry attempts for ping check to handle
        cases where daemon is busy loading spokes/gears.
        If ping fails after retries, checks PID file and process status before starting.
        This prevents accidental restarts when daemon is running but slow.
        If daemon start fails, logs warning but returns False (non-fatal).
    """
    try:
        from axium.core.ipc import send_request_sync

        # Try ping with longer timeout and retries
        # Daemon might be busy loading spokes/gears or handling requests
        for attempt in range(2):
            try:
                resp = send_request_sync({"cmd": "ping"}, timeout=2.0)
                if resp and resp.get("ok"):
                    logger.debug(
                        "Daemon is responsive (pid: %s, attempt: %d)",
                        _read_pid(),
                        attempt + 1,
                    )
                    return True
            except Exception as e:
                if attempt == 0:
                    # First attempt failed, try once more after brief pause
                    logger.debug(
                        "Ping attempt %d failed: %s, retrying...", attempt + 1, e
                    )
                    import time

                    time.sleep(0.3)  # Brief pause before retry
                    continue
                # Second attempt also failed
                logger.debug("Daemon not responsive after %d attempts", attempt + 1)

        # Ping failed after retries - verify daemon is actually stopped before starting
        pid = _read_pid()
        if pid:
            # PID file exists - check if process is alive
            try:
                os.kill(pid, 0)  # Signal 0 checks process existence without killing
                logger.debug(
                    "Daemon process exists (pid: %s) but not responsive after retries - letting it continue",
                    pid,
                )
                # Process exists but isn't responding to pings even after retries
                # This can happen during heavy initialization (loading many spokes/gears)
                # Don't start a new daemon - return True to allow operation to continue
                return True
            except ProcessLookupError:
                # Process doesn't exist - safe to start new daemon
                logger.debug(
                    "PID file exists but process is dead (pid: %s) - starting new daemon",
                    pid,
                )
                pass
            except PermissionError:
                # Can't check process (shouldn't happen for our own process)
                logger.warning(
                    "Cannot check daemon process (pid: %s) - permission denied", pid
                )
                return False

        # Daemon is definitively not running, start it
        logger.debug("Starting daemon automatically")
        success = start(debug=False)

        if success:
            # Give daemon a moment to initialize socket
            import time

            time.sleep(0.2)  # 200ms for socket creation and initialization
            logger.info("Auto-started Axium daemon")
            return True
        else:
            logger.warning("Failed to auto-start daemon")
            return False

    except Exception as e:
        logger.error("Error ensuring daemon is running: %s", e)
        return False