broker¶
The data-access layer every CLI command and the WebUI share: all agent, fleet, and message operations against SQLite live here. Read this page to embed the broker from Python or to change any persisted behavior. Like every API page, it is for contributors changing cafleet and embedders driving it from Python; CLI users find the command surface in CLI options.
Package layout¶
cafleet.broker is a package split by domain:
| Submodule | Contents |
|---|---|
broker/fleets.py |
fleet CRUD (create_fleet, list_fleets, get_fleet, delete_fleet) |
broker/agents.py |
agent registry + placement (register_agent, deregister_agent, verify_agent_fleet, …) |
broker/members.py |
member roster + activity proxies (list_members, list_members_with_activity) |
broker/messaging.py |
send_message, broadcast_message, poll_tasks, ack_task, cancel_task + inline-preview notification |
broker/queries.py |
read-only task queries (list_inbox, list_sent, list_timeline, get_task) |
broker/_shared.py |
private cross-submodule helpers and the read_session / write_session context managers |
Re-export contract¶
broker/__init__.py re-exports the full public API with __all__, so
consumers import the package and use attribute access — from cafleet
import broker then broker.send_message(...) — never a submodule
directly. The package attribute is also the supported patch seam for
tests; the single DB seam is
cafleet.broker._shared.get_sync_sessionmaker.
broker ¶
Sync SQLAlchemy data-access layer shared by the CLI and WebUI.
deregister_agent ¶
Soft-delete the agent and drop its placement.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Agent id to deregister. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
|
Raises:
| Type | Description |
|---|---|
UsageError
|
If |
ClickException
|
If |
Source code in cafleet/src/cafleet/broker/agents.py
get_agent ¶
Return the active agent's detail (with placement) or None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Agent id to look up. |
required |
fleet_id
|
int
|
Fleet id the agent must belong to. |
required |
Returns:
| Type | Description |
|---|---|
dict | None
|
Dict with |
dict | None
|
|
dict | None
|
and |
dict | None
|
|
Source code in cafleet/src/cafleet/broker/agents.py
get_agent_names ¶
Batch agent_id → name lookup including deregistered agents.
Source code in cafleet/src/cafleet/broker/agents.py
list_agents ¶
Return all active agents in the fleet.
Source code in cafleet/src/cafleet/broker/agents.py
list_fleet_agents ¶
Return active agents plus deregistered agents that still own tasks.
kind is derived in SQL via json_extract so the card blob never
leaves SQLite — otherwise we would materialize every row's JSON just to
compute a one-token discriminator. coalesce handles cards without a
cafleet.kind path by substituting an empty string.
Source code in cafleet/src/cafleet/broker/agents.py
register_agent ¶
register_agent(fleet_id: int, name: str, description: str, skills: list[dict] | None = None, placement: dict | None = None) -> dict
Register a new agent in the fleet and optionally create its placement.
Rejects soft-deleted fleets with a message that differs from the
"not found" case so callers can surface the right recovery hint. When
placement is supplied, the named Director must be active in the
same fleet, must not be the Administrator, and must be the fleet's root
Director (fleets.director_agent_id). A fleet has exactly one Director:
nested teams are forbidden, so every member placement's
director_agent_id equals the fleet root and a non-root value is
rejected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id the new agent belongs to. |
required |
name
|
str
|
Short human-identifiable label. |
required |
description
|
str
|
One-sentence purpose statement. |
required |
skills
|
list[dict] | None
|
Optional list of skill dicts persisted into the agent's
|
None
|
placement
|
dict | None
|
Optional dict carrying |
None
|
Returns:
| Type | Description |
|---|---|
dict
|
Dict with |
Raises:
| Type | Description |
|---|---|
UsageError
|
If the fleet does not exist, is soft-deleted, the
named Director is not active in the same fleet, or the placement
|
ClickException
|
If the named Director is the built-in Administrator. |
Source code in cafleet/src/cafleet/broker/agents.py
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | |
update_placement_pane_id ¶
Patch the agent's placement with a freshly resolved tmux pane id.
Called after split_window returns the spawned pane's id so the
placement row reflects the live pane rather than the placeholder used
during the initial INSERT.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Agent id whose placement should be updated. |
required |
pane_id
|
str
|
New |
required |
Returns:
| Type | Description |
|---|---|
dict | None
|
The refreshed placement dict, or |
dict | None
|
affected. |
Source code in cafleet/src/cafleet/broker/agents.py
verify_agent_fleet ¶
Return True iff the agent belongs to the fleet (any status).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Agent id to verify. |
required |
fleet_id
|
int
|
Fleet id to check membership against. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
ignored — deregistered agents still pass. |
Source code in cafleet/src/cafleet/broker/agents.py
create_fleet ¶
create_fleet(label: str | None = None, *, director_context: MultiplexerContext, coding_agent: str) -> dict
Atomically bootstrap a fleet with its root Director and Administrator.
The fleet row is written first with director_agent_id=NULL and
back-filled once the Director's agent row exists, so the column is
DB-nullable even though the post-bootstrap invariant is NOT NULL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str | None
|
Optional human-readable label for the fleet. |
None
|
director_context
|
MultiplexerContext
|
Resolved tmux pane identity for the root Director,
obtained via |
required |
coding_agent
|
str
|
Operator-declared metadata that lands in the root
Director's |
required |
Returns:
| Type | Description |
|---|---|
dict
|
A dict carrying |
dict
|
|
dict
|
Director's identity and placement metadata. |
Source code in cafleet/src/cafleet/broker/fleets.py
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | |
delete_fleet ¶
Soft-delete a fleet and deregister its agents, in one transaction.
Tasks are left untouched so audit history survives. Idempotent: re-running
against an already-deleted row short-circuits on the deleted_at IS NULL
guard and returns deregistered_count=0.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id to soft-delete. |
required |
Returns:
| Type | Description |
|---|---|
dict
|
Dict with |
dict
|
|
Raises:
| Type | Description |
|---|---|
ClickException
|
If the fleet does not exist. |
Source code in cafleet/src/cafleet/broker/fleets.py
get_fleet ¶
Return the fleet row (including soft-deleted) or None.
The returned dict exposes deleted_at so callers can distinguish a
missing fleet from a soft-deleted one — register_agent relies on
this to reject soft-deleted fleets with a different error message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id to look up. |
required |
Returns:
| Type | Description |
|---|---|
dict | None
|
Dict with |
dict | None
|
and |
Source code in cafleet/src/cafleet/broker/fleets.py
list_fleets ¶
Return non-soft-deleted fleets with their active agent counts.
Source code in cafleet/src/cafleet/broker/fleets.py
list_members ¶
Return the fleet's active members, with placements.
A fleet has exactly one Director (the root), so every member placement's
director_agent_id equals the fleet root. Members are selected by
director_agent_id IS NOT NULL, which lists all members and excludes
the root Director's own placement (the only row with a NULL director).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id to scope the query to. |
required |
Returns:
| Type | Description |
|---|---|
list[dict]
|
List of dicts each carrying |
list[dict]
|
|
Source code in cafleet/src/cafleet/broker/members.py
list_members_with_activity ¶
list_members plus per-member activity proxies sourced from tasks.
Scoped by fleet_id only — the flat single-Director model means every
member placement's director_agent_id equals the fleet root, so the
director_agent_id IS NOT NULL member filter applies here too (and
excludes the root Director's own placement).
last_sent / last_recv / last_ack aggregate status_timestamp
over the tasks table per agent. All three filter Task.type !=
'broadcast_summary' (mirrors poll_tasks); broadcast_summary rows
land in the broadcaster's own context with status_state='completed'
and would otherwise pollute every proxy for the broadcaster.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id to scope the query to. |
required |
Returns:
| Type | Description |
|---|---|
list[dict]
|
List of dicts as in :func: |
list[dict]
|
|
list[dict]
|
|
list[dict]
|
and the most recent of |
list[dict]
|
when both are |
Source code in cafleet/src/cafleet/broker/members.py
ack_task ¶
Transition a task from input_required to completed for the recipient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Recipient agent id; must match |
required |
task_id
|
int
|
Task id to ack. |
required |
Returns:
| Type | Description |
|---|---|
dict
|
Dict with |
Raises:
| Type | Description |
|---|---|
ValueError
|
If the task does not exist or is not in
|
PermissionError
|
If |
Source code in cafleet/src/cafleet/broker/messaging.py
broadcast_message ¶
Fan out one delivery task per active non-admin peer plus a sender summary.
Administrators are excluded at the SQL layer via json_extract so the
card blob stays in the database; they are write-only identities. Every
delivery row shares the same origin_task_id (the summary's task id)
so receivers can thread back to the original broadcast.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id to scope the broadcast to. |
required |
agent_id
|
int
|
Broadcaster's agent id. |
required |
text
|
str
|
Message body delivered to every recipient. |
required |
Returns:
| Type | Description |
|---|---|
list[dict]
|
Single-element list containing a dict with |
list[dict]
|
owned by the broadcaster) and |
list[dict]
|
number of inline-preview keystrokes that landed successfully. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If the sender is not active in |
Source code in cafleet/src/cafleet/broker/messaging.py
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 | |
cancel_task ¶
Transition a task from input_required to canceled for the sender.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Sender agent id; must match |
required |
task_id
|
int
|
Task id to cancel. |
required |
Returns:
| Type | Description |
|---|---|
dict
|
Dict with |
Raises:
| Type | Description |
|---|---|
ValueError
|
If the task does not exist or is not in
|
PermissionError
|
If |
Source code in cafleet/src/cafleet/broker/messaging.py
poll_tasks ¶
Return un-acked deliveries addressed to agent_id, newest first.
Only input_required tasks are returned — once a delivery is ACKed
(completed) or canceled it no longer appears. broadcast_summary
rows are filtered out, as those belong to the broadcaster's own context
and are not deliveries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
int
|
Recipient agent id; matches |
required |
Returns:
| Type | Description |
|---|---|
list[dict]
|
List of flat task dicts (one per row) carrying every column from the |
list[dict]
|
|
Source code in cafleet/src/cafleet/broker/messaging.py
send_message ¶
Create a unicast task addressed to to and best-effort notify it.
Persists a new Task row with type='unicast' and
status_state='input_required', then calls
_try_notify_recipient to keystroke an inline preview into the
recipient's tmux pane. Notification failure does not roll back the
insert — the message remains available via :func:poll_tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id; sender and recipient must both belong to it. |
required |
agent_id
|
int
|
Sender's agent id. |
required |
to
|
int | str
|
Recipient's agent id. Accepts a string for non-CLI callers
(WebUI, tests); it is coerced with |
required |
text
|
str
|
Message body. Truncation is render-side; the persisted row holds the full string. |
required |
Returns:
| Type | Description |
|---|---|
dict
|
Dict with |
dict
|
(boolean indicating whether the inline-preview keystroke landed). |
Raises:
| Type | Description |
|---|---|
ValueError
|
If |
Source code in cafleet/src/cafleet/broker/messaging.py
claim_monitor_runtime ¶
Atomically claim the fleet's single monitor slot.
Inserts a fresh row, reclaims a stale one, or refuses a live one — all in one write transaction (SQLite's write lock serializes concurrent claims).
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
already holds it. |
Source code in cafleet/src/cafleet/broker/monitor.py
clear_monitor_runtime ¶
Ownership-checked clear — nulls pid/started_at/last_tick_at iff pid owns the slot.
A cleanly-stopped monitor leaves no residual started_at, so status/the
WebUI report a fully-stopped row. A non-owner clear matches zero rows and is
a no-op, so a self-terminating loser never wipes the winner's row on exit.
Source code in cafleet/src/cafleet/broker/monitor.py
enroll_agent ¶
Insert a monitor_config row for a pane-bound agent.
Called inside the same write transaction as the agent/placement insert (by
register_agent / create_fleet), so enrollment is atomic with
registration. Only agents with a tmux pane (the root Director and members)
are enrolled.
Source code in cafleet/src/cafleet/broker/monitor.py
get_monitor_config ¶
Return the agent's schedule, or None if not enrolled / not in fleet.
Source code in cafleet/src/cafleet/broker/monitor.py
heartbeat_monitor_runtime ¶
Ownership-checked heartbeat — rewrites last_tick_at iff pid owns the slot.
Returns False when the slot was reclaimed by another instance (the
WHERE pid=? matches zero rows), the signal the displaced monitor uses
to self-terminate without pinging.
Source code in cafleet/src/cafleet/broker/monitor.py
list_monitor_configs ¶
Return every enrolled agent's schedule in the fleet (enabled as bool).
Source code in cafleet/src/cafleet/broker/monitor.py
list_monitor_targets ¶
Per-tick scan: one row per active, enrolled agent in the fleet.
Each dict carries agent_id, name, is_director (derived from
fleets.director_agent_id), pane_id, interval_seconds,
last_ping_at, enabled (bool), and pending_count — the count of
the agent's input_required deliveries excluding broadcast_summary
rows, a correlated subquery mirroring members.py.
Source code in cafleet/src/cafleet/broker/monitor.py
monitor_is_live ¶
Return True iff the fleet currently has a live monitor holding the slot.
The advisory single-instance pre-check for monitor start (the atomic
claim_monitor_runtime is the authoritative guard). Reuses _is_live:
heartbeat freshness is authoritative, os.kill(pid, 0) corroborates.
Source code in cafleet/src/cafleet/broker/monitor.py
read_monitor_runtime ¶
Return the fleet's runtime row, or None when no monitor ever claimed it.
Source code in cafleet/src/cafleet/broker/monitor.py
record_ping ¶
Stamp last_ping_at for a single agent (thin wrapper over record_pings).
record_pings ¶
Stamp last_ping_at for every agent in one write transaction.
Lets a tick's dispatched pings be recorded with a single UPDATE … WHERE
agent_id IN (…) instead of one transaction per agent. when is an
ISO-8601 string stored verbatim in the TEXT column. An empty list is a
no-op (no transaction, no IN ()).
Source code in cafleet/src/cafleet/broker/monitor.py
update_monitor_config ¶
update_monitor_config(fleet_id: int, agent_id: int, *, interval_seconds: int | None = None, enabled: bool | None = None) -> dict
Update an enrolled agent's interval and/or enabled flag; return the new config.
A partial update leaves the unspecified field untouched. enabled is
accepted as a bool and written as 0/1.
Raises:
| Type | Description |
|---|---|
ClickException
|
If the agent is not in the fleet or not enrolled. |
Source code in cafleet/src/cafleet/broker/monitor.py
get_task ¶
Return the task iff at least one of its endpoints lives in the fleet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id used to gate visibility. |
required |
task_id
|
int
|
Task id to fetch. |
required |
Returns:
| Type | Description |
|---|---|
dict
|
Dict with |
Raises:
| Type | Description |
|---|---|
ValueError
|
If the task does not exist or neither endpoint belongs
to |
Source code in cafleet/src/cafleet/broker/queries.py
list_inbox ¶
Return raw task rows addressed to agent_id (no broadcast_summary).
list_sent ¶
Return raw task rows sent by agent_id (no broadcast_summary).
list_timeline ¶
Return the fleet's recent tasks in DESC status_timestamp order.
broadcast_summary rows are filtered out so the timeline shows only
delivery rows. Membership is tested via from_agent_id joined to
agents.fleet_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fleet_id
|
int
|
Fleet id to scope the query to. |
required |
limit
|
int
|
Maximum number of rows to return (default 200). |
200
|
Returns:
| Type | Description |
|---|---|
list[dict]
|
List of flat task dicts in DESC |