broker

The data-access layer every CLI command and the WebUI share: all agent, fleet, and message operations against SQLite live here. Read this page to embed the broker from Python or to change any persisted behavior. Like every API page, it is for contributors changing cafleet and embedders driving it from Python; CLI users find the command surface in CLI options.

Package layout

cafleet.broker is a package split by domain:

Submodule Contents
broker/fleets.py fleet CRUD (create_fleet, list_fleets, get_fleet, delete_fleet)
broker/agents.py agent registry + placement (register_agent, deregister_agent, verify_agent_fleet, …)
broker/members.py member roster + activity proxies (list_members, list_members_with_activity)
broker/messaging.py send_message, broadcast_message, poll_tasks, ack_task, cancel_task + inline-preview notification
broker/queries.py read-only task queries (list_inbox, list_sent, list_timeline, get_task)
broker/_shared.py private cross-submodule helpers and the read_session / write_session context managers

Re-export contract

broker/__init__.py re-exports the full public API with __all__, so consumers import the package and use attribute access — from cafleet import broker then broker.send_message(...) — never a submodule directly. The package attribute is also the supported patch seam for tests; the single DB seam is cafleet.broker._shared.get_sync_sessionmaker.
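
Why attribute access matters for the patch seam can be shown without cafleet installed. The sketch below is only an illustration: fake_broker and fake_messaging are hypothetical stand-ins built with types.ModuleType, not part of cafleet, and the stub send_message has an invented signature.

```python
import types
from unittest import mock

# Hypothetical stand-ins for cafleet.broker and one of its submodules.
fake_messaging = types.ModuleType("fake_broker.messaging")


def _send_message(text: str) -> str:
    return f"sent: {text}"


fake_messaging.send_message = _send_message

fake_broker = types.ModuleType("fake_broker")
# The package re-exports the submodule function and lists it in __all__.
fake_broker.send_message = fake_messaging.send_message
fake_broker.__all__ = ["send_message"]


def consumer(text: str) -> str:
    # Attribute access at call time: resolves whatever the package holds now.
    return fake_broker.send_message(text)


# Patching the package attribute is visible to every attribute-access caller.
with mock.patch.object(fake_broker, "send_message", return_value="stubbed"):
    patched_result = consumer("hello")

# Outside the context manager the original function is restored.
unpatched_result = consumer("hello")
```

A consumer that had bound the function by name from the submodule would keep calling the original, which is the reason the package attribute is the seam to patch.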

broker

Sync SQLAlchemy data-access layer shared by the CLI and WebUI.

deregister_agent

deregister_agent(agent_id: int) -> bool

Soft-delete the agent and drop its placement.

Parameters:

Name Type Description Default
agent_id int Agent id to deregister. required

Returns:

Type Description
bool True if a row was flipped from active to deregistered; False if no matching active agent existed.

Raises:

Type Description
UsageError If agent_id is the root Director of any fleet. A root Director is torn down via cafleet fleet delete instead.
ClickException If agent_id is the built-in Administrator.

Source code in cafleet/src/cafleet/broker/agents.py
def deregister_agent(agent_id: int) -> bool:
    """Soft-delete the agent and drop its placement.

    Args:
        agent_id: Agent id to deregister.

    Returns:
        ``True`` if a row was flipped from ``active`` to ``deregistered``;
        ``False`` if no matching active agent existed.

    Raises:
        click.UsageError: If ``agent_id`` is the root Director of any
            fleet — torn down via ``cafleet fleet delete`` instead.
        click.ClickException: If ``agent_id`` is the built-in Administrator.
    """
    with _shared.write_session() as session:
        is_root_director = session.execute(
            select(exists().where(Fleet.director_agent_id == agent_id))
        ).scalar_one()
        if is_root_director:
            raise click.UsageError(
                "cannot deregister the root Director; "
                "use 'cafleet fleet delete' instead"
            )

        card_json = session.execute(
            select(Agent.agent_card_json).where(Agent.agent_id == agent_id)
        ).scalar_one_or_none()
        if card_json is not None and _shared.is_administrator(card_json):
            raise click.ClickException("Administrator cannot be deregistered")
        deregistered = session.execute(
            update(Agent)
            .where(
                Agent.agent_id == agent_id,
                Agent.status == "active",
            )
            .values(
                status="deregistered",
                deregistered_at=_shared.now_iso(),
            )
            .returning(Agent.agent_id)
        ).all()
        if deregistered:
            session.execute(
                delete(AgentPlacement).where(AgentPlacement.agent_id == agent_id)
            )
            # Runtime config has no historical value; drop it on the same
            # lifecycle as the placement.
            monitor.delete_agent_monitor_row(session, agent_id)
    return bool(deregistered)
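
The boolean contract (True only when this call flipped a row) can be sketched with the standard-library sqlite3 module. This is a minimal sketch under assumptions: the two-column agents table is a simplified, hypothetical schema, soft_delete_agent is an invented name, and cursor.rowcount stands in for the RETURNING clause the source uses. The root-Director and Administrator guards are omitted.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agents ("
    "agent_id INTEGER PRIMARY KEY, status TEXT NOT NULL, deregistered_at TEXT)"
)
conn.execute("INSERT INTO agents (agent_id, status) VALUES (1, 'active')")


def soft_delete_agent(agent_id: int, now: str) -> bool:
    """Flip an active row to deregistered; report whether a row changed."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "UPDATE agents SET status = 'deregistered', deregistered_at = ? "
            "WHERE agent_id = ? AND status = 'active'",
            (now, agent_id),
        )
    # rowcount stands in for the RETURNING clause used in the source.
    return cur.rowcount > 0


first = soft_delete_agent(1, "2024-01-01T00:00:00+00:00")
second = soft_delete_agent(1, "2024-01-02T00:00:00+00:00")
missing = soft_delete_agent(99, "2024-01-02T00:00:00+00:00")
```

Because the status filter is part of the UPDATE predicate, a repeated call neither changes the row nor overwrites the original deregistered_at value.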

get_agent

get_agent(agent_id: int, fleet_id: int) -> dict | None

Return the active agent's detail (with placement) or None.

Parameters:

Name Type Description Default
agent_id int Agent id to look up. required
fleet_id int Fleet id the agent must belong to. required

Returns:

Type Description
dict | None Dict with agent_id, name, description, status, registered_at, kind ("user" or the Administrator kind), and placement (the placement sub-dict or None). Returns None if no active agent matches.

Source code in cafleet/src/cafleet/broker/agents.py
def get_agent(agent_id: int, fleet_id: int) -> dict | None:
    """Return the active agent's detail (with placement) or None.

    Args:
        agent_id: Agent id to look up.
        fleet_id: Fleet id the agent must belong to.

    Returns:
        Dict with ``agent_id``, ``name``, ``description``, ``status``,
        ``registered_at``, ``kind`` (``"user"`` or the Administrator kind),
        and ``placement`` (the placement sub-dict or ``None``). Returns
        ``None`` if no active agent matches.
    """
    with _shared.read_session() as session:
        agent = session.execute(
            select(Agent).where(
                Agent.agent_id == agent_id,
                Agent.fleet_id == fleet_id,
                Agent.status == "active",
            )
        ).scalar_one_or_none()

        if agent is None:
            return None

        placement_row = session.execute(
            select(AgentPlacement).where(AgentPlacement.agent_id == agent_id)
        ).scalar_one_or_none()

    result: dict = {
        "agent_id": agent.agent_id,
        "name": agent.name,
        "description": agent.description,
        "status": agent.status,
        "registered_at": agent.registered_at,
        "kind": (
            _shared.ADMINISTRATOR_KIND
            if _shared.is_administrator(agent.agent_card_json)
            else "user"
        ),
        "placement": None,
    }
    if placement_row is not None:
        result["placement"] = _shared.placement_dict(placement_row)
    return result

get_agent_names

get_agent_names(agent_ids: list[int]) -> dict[int, str]

Batch agent_id → name lookup including deregistered agents.

Source code in cafleet/src/cafleet/broker/agents.py
def get_agent_names(agent_ids: list[int]) -> dict[int, str]:
    """Batch ``agent_id → name`` lookup including deregistered agents."""
    if not agent_ids:
        return {}
    with _shared.read_session() as session:
        rows = session.execute(
            select(Agent.agent_id, Agent.name).where(Agent.agent_id.in_(agent_ids))
        ).all()
    return {row.agent_id: row.name for row in rows}

list_agents

list_agents(fleet_id: int) -> list[dict]

Return all active agents in the fleet.

Source code in cafleet/src/cafleet/broker/agents.py
def list_agents(fleet_id: int) -> list[dict]:
    """Return all active agents in the fleet."""
    stmt = select(
        Agent.agent_id,
        Agent.name,
        Agent.description,
        Agent.registered_at,
    ).where(
        Agent.fleet_id == fleet_id,
        Agent.status == "active",
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [
        {
            "agent_id": row.agent_id,
            "name": row.name,
            "description": row.description,
            "status": "active",
            "registered_at": row.registered_at,
        }
        for row in rows
    ]

list_fleet_agents

list_fleet_agents(fleet_id: int) -> list[dict]

Return active agents plus deregistered agents that still own tasks.

kind is derived in SQL via json_extract so the card blob never leaves SQLite — otherwise we would materialize every row's JSON just to compute a one-token discriminator. coalesce handles cards without a cafleet.kind path by substituting an empty string.

Source code in cafleet/src/cafleet/broker/agents.py
def list_fleet_agents(fleet_id: int) -> list[dict]:
    """Return active agents plus deregistered agents that still own tasks.

    ``kind`` is derived in SQL via ``json_extract`` so the card blob never
    leaves SQLite — otherwise we would materialize every row's JSON just to
    compute a one-token discriminator. ``coalesce`` handles cards without a
    ``cafleet.kind`` path by substituting an empty string.
    """
    has_tasks = exists().where(
        or_(
            Task.context_id == Agent.agent_id,
            Task.from_agent_id == Agent.agent_id,
        )
    )
    kind_expr = func.coalesce(
        func.json_extract(Agent.agent_card_json, "$.cafleet.kind"), ""
    )
    stmt = select(
        Agent.agent_id,
        Agent.name,
        Agent.description,
        Agent.status,
        Agent.registered_at,
        kind_expr.label("kind_raw"),
    ).where(
        Agent.fleet_id == fleet_id,
        or_(
            Agent.status == "active",
            and_(Agent.status == "deregistered", has_tasks),
        ),
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [
        {
            "agent_id": row.agent_id,
            "name": row.name,
            "description": row.description,
            "status": row.status,
            "registered_at": row.registered_at,
            "kind": (
                _shared.ADMINISTRATOR_KIND
                if row.kind_raw == _shared.ADMINISTRATOR_KIND
                else "user"
            ),
        }
        for row in rows
    ]
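
The json_extract plus coalesce pattern described above can be tried directly against SQLite. The sketch below assumes a SQLite build with the JSON functions available (the usual case for Python's bundled sqlite3) and uses a hypothetical one-table schema. The value assigned to ADMINISTRATOR_KIND is a placeholder, since the real constant in _shared is not shown on this page.

```python
import json
import sqlite3

# Hypothetical placeholder; the real value of _shared.ADMINISTRATOR_KIND is
# not shown in this page.
ADMINISTRATOR_KIND = "example-administrator-kind"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agents (agent_id INTEGER PRIMARY KEY, agent_card_json TEXT)"
)
cards = [
    {"name": "Administrator", "cafleet": {"kind": ADMINISTRATOR_KIND}},
    {"name": "worker", "skills": []},  # no cafleet.kind path
]
conn.executemany(
    "INSERT INTO agents (agent_card_json) VALUES (?)",
    [(json.dumps(card),) for card in cards],
)

# Only the extracted token crosses into Python; the JSON blob stays in SQLite.
rows = conn.execute(
    "SELECT agent_id, "
    "COALESCE(json_extract(agent_card_json, '$.cafleet.kind'), '') AS kind_raw "
    "FROM agents ORDER BY agent_id"
).fetchall()

kinds = [
    ADMINISTRATOR_KIND if kind_raw == ADMINISTRATOR_KIND else "user"
    for _agent_id, kind_raw in rows
]
```

The card without a cafleet.kind path yields an empty string rather than NULL, so the Python-side comparison never has to handle None.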

register_agent

register_agent(fleet_id: int, name: str, description: str, skills: list[dict] | None = None, placement: dict | None = None) -> dict

Register a new agent in the fleet and optionally create its placement.

Rejects soft-deleted fleets with a message that differs from the "not found" case so callers can surface the right recovery hint. When placement is supplied, the named Director must be active in the same fleet, must not be the Administrator, and must be the fleet's root Director (fleets.director_agent_id). A fleet has exactly one Director: nested teams are forbidden, so every member placement's director_agent_id equals the fleet root and a non-root value is rejected.

Parameters:

Name Type Description Default
fleet_id int Fleet id the new agent belongs to. required
name str Short human-identifiable label. required
description str One-sentence purpose statement. required
skills list[dict] | None Optional list of skill dicts persisted into the agent's agent_card_json blob. None
placement dict | None Optional dict carrying director_agent_id, tmux_session, tmux_window_id, tmux_pane_id, and coding_agent. When present, an AgentPlacement row is created alongside the agent. None

Returns:

Type Description
dict Dict with agent_id, name, and registered_at.

Raises:

Type Description
UsageError If the fleet does not exist or is soft-deleted, if the named Director is not active in the same fleet, or if the placement director_agent_id is not the fleet's root Director.
ClickException If the named Director is the built-in Administrator.

Source code in cafleet/src/cafleet/broker/agents.py
def register_agent(
    fleet_id: int,
    name: str,
    description: str,
    skills: list[dict] | None = None,
    placement: dict | None = None,
) -> dict:
    """Register a new agent in the fleet and optionally create its placement.

    Rejects soft-deleted fleets with a message that differs from the
    "not found" case so callers can surface the right recovery hint. When
    ``placement`` is supplied, the named Director must be active in the
    same fleet, must not be the Administrator, and must be the fleet's root
    Director (``fleets.director_agent_id``). A fleet has exactly one Director:
    nested teams are forbidden, so every member placement's
    ``director_agent_id`` equals the fleet root and a non-root value is
    rejected.

    Args:
        fleet_id: Fleet id the new agent belongs to.
        name: Short human-identifiable label.
        description: One-sentence purpose statement.
        skills: Optional list of skill dicts persisted into the agent's
            ``agent_card_json`` blob.
        placement: Optional dict carrying ``director_agent_id``,
            ``tmux_session``, ``tmux_window_id``, ``tmux_pane_id``, and
            ``coding_agent``. When present, an ``AgentPlacement`` row is
            created alongside the agent.

    Returns:
        Dict with ``agent_id``, ``name``, and ``registered_at``.

    Raises:
        click.UsageError: If the fleet does not exist, is soft-deleted, the
            named Director is not active in the same fleet, or the placement
            ``director_agent_id`` is not the fleet's root Director.
        click.ClickException: If the named Director is the built-in
            Administrator.
    """
    sess = get_fleet(fleet_id)
    if sess is None:
        raise click.UsageError(f"Fleet '{fleet_id}' not found.")
    if sess["deleted_at"] is not None:
        raise click.UsageError(f"fleet {fleet_id} is deleted")

    registered_at = _shared.now_iso()
    agent_card = {
        "name": name,
        "description": description,
        "skills": skills or [],
    }

    with _shared.write_session() as session:
        if placement is not None:
            director_id = placement["director_agent_id"]
            director_card = session.execute(
                select(Agent.agent_card_json).where(
                    Agent.agent_id == director_id,
                    Agent.fleet_id == fleet_id,
                    Agent.status == "active",
                )
            ).scalar_one_or_none()
            if director_card is None:
                raise click.UsageError(
                    f"Director agent '{director_id}' not found or not active "
                    f"in fleet '{fleet_id}'."
                )
            if _shared.is_administrator(director_card):
                raise click.ClickException("Administrator cannot be a director")
            root_director_id = sess["director_agent_id"]
            if director_id != root_director_id:
                raise click.UsageError(
                    f"nested teams are not supported; placement director_agent_id "
                    f"{director_id} must equal the fleet root "
                    f"Director {root_director_id}."
                )

        agent = Agent(
            fleet_id=fleet_id,
            name=name,
            description=description,
            status="active",
            registered_at=registered_at,
            agent_card_json=json.dumps(agent_card),
        )
        session.add(agent)
        session.flush()
        agent_id = agent.agent_id
        if placement is not None:
            session.add(
                AgentPlacement(
                    agent_id=agent_id,
                    director_agent_id=placement["director_agent_id"],
                    tmux_session=placement["tmux_session"],
                    tmux_window_id=placement["tmux_window_id"],
                    tmux_pane_id=placement["tmux_pane_id"],
                    coding_agent=placement["coding_agent"],
                    created_at=registered_at,
                )
            )
            # Enroll the pane-bound agent in monitoring, atomically with its
            # placement insert (only agents with a pane can be pinged).
            monitor.enroll_agent(session, agent_id)

    return {
        "agent_id": agent_id,
        "name": name,
        "registered_at": registered_at,
    }
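
The three placement checks and the order in which the source applies them can be sketched without a database. Everything named here is hypothetical: PlacementError stands in for the click exceptions, AgentRow for the ORM row, and check_placement_director for the inline validation. Only the error wording for the last two checks is borrowed from the listing above.

```python
from dataclasses import dataclass


class PlacementError(Exception):
    """Hypothetical stand-in for the click errors raised by the source."""


@dataclass(frozen=True)
class AgentRow:
    agent_id: int
    fleet_id: int
    status: str
    is_administrator: bool = False


def check_placement_director(
    agents: dict[int, AgentRow],
    fleet_id: int,
    root_director_id: int,
    director_id: int,
) -> None:
    """Apply the three placement checks in the order the source applies them."""
    director = agents.get(director_id)
    if director is None or director.fleet_id != fleet_id or director.status != "active":
        raise PlacementError("director not found or not active in fleet")
    if director.is_administrator:
        raise PlacementError("Administrator cannot be a director")
    if director_id != root_director_id:
        raise PlacementError("nested teams are not supported")


agents = {
    1: AgentRow(1, fleet_id=10, status="active"),  # root Director
    2: AgentRow(2, fleet_id=10, status="active", is_administrator=True),
    3: AgentRow(3, fleet_id=10, status="active"),  # ordinary member
}


def outcome(director_id: int) -> str:
    try:
        check_placement_director(agents, 10, root_director_id=1, director_id=director_id)
    except PlacementError as exc:
        return str(exc)
    return "ok"
```

Calling outcome with ids 1 through 4 exercises the accepting path and each of the three rejections in turn.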

update_placement_pane_id

update_placement_pane_id(agent_id: int, pane_id: str) -> dict | None

Patch the agent's placement with a freshly resolved tmux pane id.

Called after split_window returns the spawned pane's id so the placement row reflects the live pane rather than the placeholder used during the initial INSERT.

Parameters:

Name Type Description Default
agent_id int Agent id whose placement should be updated. required
pane_id str New tmux_pane_id value. required

Returns:

Type Description
dict | None The refreshed placement dict, or None if no placement row was affected.

Source code in cafleet/src/cafleet/broker/agents.py
def update_placement_pane_id(agent_id: int, pane_id: str) -> dict | None:
    """Patch the agent's placement with a freshly resolved tmux pane id.

    Called after ``split_window`` returns the spawned pane's id so the
    placement row reflects the live pane rather than the placeholder used
    during the initial INSERT.

    Args:
        agent_id: Agent id whose placement should be updated.
        pane_id: New ``tmux_pane_id`` value.

    Returns:
        The refreshed placement dict, or ``None`` if no placement row was
        affected.
    """
    with _shared.write_session() as session:
        updated = session.execute(
            update(AgentPlacement)
            .where(AgentPlacement.agent_id == agent_id)
            .values(tmux_pane_id=pane_id)
            .returning(AgentPlacement.agent_id)
        ).first()
        if updated is None:
            return None
        row = session.execute(
            select(AgentPlacement).where(AgentPlacement.agent_id == agent_id)
        ).scalar_one()
    return _shared.placement_dict(row)

verify_agent_fleet

verify_agent_fleet(agent_id: int, fleet_id: int) -> bool

Return True iff the agent belongs to the fleet (any status).

Parameters:

Name Type Description Default
agent_id int Agent id to verify. required
fleet_id int Fleet id to check membership against. required

Returns:

Type Description
bool True if a matching row exists; False otherwise. Status is ignored: deregistered agents still pass.

Source code in cafleet/src/cafleet/broker/agents.py
def verify_agent_fleet(agent_id: int, fleet_id: int) -> bool:
    """Return True iff the agent belongs to the fleet (any status).

    Args:
        agent_id: Agent id to verify.
        fleet_id: Fleet id to check membership against.

    Returns:
        ``True`` if a matching row exists; ``False`` otherwise. Status is
        ignored — deregistered agents still pass.
    """
    with _shared.read_session() as session:
        return session.execute(
            select(
                exists().where(
                    Agent.agent_id == agent_id,
                    Agent.fleet_id == fleet_id,
                )
            )
        ).scalar_one()
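
The difference between this membership check and the active-only lookup in get_agent comes down to one predicate. The sketch below illustrates that with the standard-library sqlite3 module. The three-column agents table and both function names are hypothetical simplifications, not cafleet's schema or API.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agents (agent_id INTEGER PRIMARY KEY, fleet_id INTEGER, status TEXT)"
)
conn.executemany(
    "INSERT INTO agents VALUES (?, ?, ?)",
    [(1, 10, "active"), (2, 10, "deregistered")],
)


def belongs_to_fleet(agent_id: int, fleet_id: int) -> bool:
    # Membership only: status is deliberately not part of the predicate.
    (found,) = conn.execute(
        "SELECT EXISTS(SELECT 1 FROM agents WHERE agent_id = ? AND fleet_id = ?)",
        (agent_id, fleet_id),
    ).fetchone()
    return bool(found)


def is_active_in_fleet(agent_id: int, fleet_id: int) -> bool:
    # Same lookup with the extra status filter that get_agent applies.
    (found,) = conn.execute(
        "SELECT EXISTS(SELECT 1 FROM agents "
        "WHERE agent_id = ? AND fleet_id = ? AND status = 'active')",
        (agent_id, fleet_id),
    ).fetchone()
    return bool(found)
```

For the deregistered agent 2, the first function answers True and the second False, which mirrors the "any status" wording above.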

create_fleet

create_fleet(label: str | None = None, *, director_context: MultiplexerContext, coding_agent: str) -> dict

Atomically bootstrap a fleet with its root Director and Administrator.

The fleet row is written first with director_agent_id=NULL and back-filled once the Director's agent row exists, so the column is DB-nullable even though the post-bootstrap invariant is NOT NULL.

Parameters:

Name Type Description Default
label str | None Optional human-readable label for the fleet. None
director_context MultiplexerContext Resolved tmux pane identity for the root Director, obtained via Multiplexer.context_discovery. required
coding_agent str Operator-declared metadata that lands in the root Director's placement.coding_agent column. The CLI is the only caller and always supplies it (default 'claude' lives at the Click layer). required

Returns:

Type Description
dict A dict carrying fleet_id, label, created_at, administrator_agent_id, and a director sub-dict with the Director's identity and placement metadata.

Source code in cafleet/src/cafleet/broker/fleets.py
def create_fleet(
    label: str | None = None,
    *,
    director_context: MultiplexerContext,
    coding_agent: str,
) -> dict:
    """Atomically bootstrap a fleet with its root Director and Administrator.

    The fleet row is written first with ``director_agent_id=NULL`` and
    back-filled once the Director's agent row exists, so the column is
    DB-nullable even though the post-bootstrap invariant is NOT NULL.

    Args:
        label: Optional human-readable label for the fleet.
        director_context: Resolved tmux pane identity for the root Director,
            obtained via ``Multiplexer.context_discovery``.
        coding_agent: Operator-declared metadata that lands in the root
            Director's ``placement.coding_agent`` column. The CLI is the only
            caller and always supplies it (default ``'claude'`` lives at the
            Click layer).

    Returns:
        A dict carrying ``fleet_id``, ``label``, ``created_at``,
        ``administrator_agent_id``, and a ``director`` sub-dict with the
        Director's identity and placement metadata.
    """
    created_at = _shared.now_iso()
    director_card = {
        "name": _DIRECTOR_NAME,
        "description": _DIRECTOR_DESCRIPTION,
        "skills": [],
    }
    director_placement = {
        "director_agent_id": None,
        "tmux_session": director_context.session,
        "tmux_window_id": director_context.window_id,
        "tmux_pane_id": director_context.pane_id,
        "coding_agent": coding_agent,
        "created_at": created_at,
    }

    with _shared.write_session() as session:
        fleet = Fleet(
            label=label,
            created_at=created_at,
            deleted_at=None,
            director_agent_id=None,
        )
        session.add(fleet)
        session.flush()
        fleet_id = fleet.fleet_id
        director = Agent(
            fleet_id=fleet_id,
            name=_DIRECTOR_NAME,
            description=_DIRECTOR_DESCRIPTION,
            status="active",
            registered_at=created_at,
            deregistered_at=None,
            agent_card_json=json.dumps(director_card),
        )
        session.add(director)
        session.flush()
        director_agent_id = director.agent_id
        session.add(AgentPlacement(agent_id=director_agent_id, **director_placement))
        session.flush()
        # Enroll the root Director (pane-bound) in monitoring; the Administrator
        # below has no placement and is intentionally not enrolled.
        monitor.enroll_agent(session, director_agent_id)
        session.execute(
            update(Fleet)
            .where(Fleet.fleet_id == fleet_id)
            .values(director_agent_id=director_agent_id)
        )
        administrator_card = {
            "name": "Administrator",
            "description": f"Built-in administrator agent for fleet {fleet_id}",
            "skills": [],
            "cafleet": {"kind": _shared.ADMINISTRATOR_KIND},
        }
        administrator = Agent(
            fleet_id=fleet_id,
            name=administrator_card["name"],
            description=administrator_card["description"],
            status="active",
            registered_at=created_at,
            deregistered_at=None,
            agent_card_json=json.dumps(administrator_card),
        )
        session.add(administrator)
        session.flush()
        administrator_agent_id = administrator.agent_id

    return {
        "fleet_id": fleet_id,
        "label": label,
        "created_at": created_at,
        "administrator_agent_id": administrator_agent_id,
        "director": {
            "agent_id": director_agent_id,
            "name": _DIRECTOR_NAME,
            "description": _DIRECTOR_DESCRIPTION,
            "registered_at": created_at,
            "placement": director_placement,
        },
    }
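
The insert-then-back-fill sequence is the standard way to resolve a circular reference between two tables. The following is a minimal sketch with the standard-library sqlite3 module, assuming a stripped-down, hypothetical schema. The literal name 'Director' is a placeholder for the constant the source uses, and the placement, monitoring, and Administrator steps are left out.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE fleets (
        fleet_id INTEGER PRIMARY KEY,
        label TEXT,
        director_agent_id INTEGER  -- nullable so the row can exist first
    );
    CREATE TABLE agents (
        agent_id INTEGER PRIMARY KEY,
        fleet_id INTEGER NOT NULL REFERENCES fleets(fleet_id),
        name TEXT NOT NULL
    );
    """
)


def bootstrap_fleet(label: str) -> dict:
    with conn:  # one transaction: all rows land, or none do
        fleet_id = conn.execute(
            "INSERT INTO fleets (label, director_agent_id) VALUES (?, NULL)",
            (label,),
        ).lastrowid
        director_id = conn.execute(
            "INSERT INTO agents (fleet_id, name) VALUES (?, 'Director')",
            (fleet_id,),
        ).lastrowid
        # Back-fill the pointer now that the Director's id is known.
        conn.execute(
            "UPDATE fleets SET director_agent_id = ? WHERE fleet_id = ?",
            (director_id, fleet_id),
        )
    return {"fleet_id": fleet_id, "director_agent_id": director_id}


created = bootstrap_fleet("demo")
stored = conn.execute(
    "SELECT director_agent_id FROM fleets WHERE fleet_id = ?",
    (created["fleet_id"],),
).fetchone()[0]
```

Since the agent row needs the fleet id and the fleet row needs the agent id, one of the two columns has to accept NULL for the brief window inside the transaction.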

delete_fleet

delete_fleet(fleet_id: int) -> dict

Soft-delete a fleet and deregister its agents, in one transaction.

Tasks are left untouched so audit history survives. Idempotent: re-running against an already-deleted row short-circuits on the deleted_at IS NULL guard and returns deregistered_count=0.

Parameters:

Name Type Description Default
fleet_id int Fleet id to soft-delete. required

Returns:

Type Description
dict Dict with deregistered_count, the number of agents flipped from active to deregistered by this call.

Raises:

Type Description
ClickException If the fleet does not exist.

Source code in cafleet/src/cafleet/broker/fleets.py
def delete_fleet(fleet_id: int) -> dict:
    """Soft-delete a fleet and deregister its agents, in one transaction.

    Tasks are left untouched so audit history survives. Idempotent: re-running
    against an already-deleted row short-circuits on the ``deleted_at IS NULL``
    guard and returns ``deregistered_count=0``.

    Args:
        fleet_id: Fleet id to soft-delete.

    Returns:
        Dict with ``deregistered_count`` — the number of agents flipped from
        ``active`` to ``deregistered`` by this call.

    Raises:
        click.ClickException: If the fleet does not exist.
    """
    now = _shared.now_iso()
    with _shared.write_session() as session:
        fleet_exists = session.execute(
            select(exists().where(Fleet.fleet_id == fleet_id))
        ).scalar_one()
        if not fleet_exists:
            # ClickException exits 1 (matching ``fleet show``); UsageError
            # would print a Usage: banner + exit 2, wrong for a runtime miss.
            raise click.ClickException(f"fleet '{fleet_id}' not found.")

        soft_deleted = session.execute(
            update(Fleet)
            .where(
                Fleet.fleet_id == fleet_id,
                Fleet.deleted_at.is_(None),
            )
            .values(deleted_at=now)
            .returning(Fleet.fleet_id)
        ).all()
        if not soft_deleted:
            return {"deregistered_count": 0}

        deregistered = session.execute(
            update(Agent)
            .where(
                Agent.fleet_id == fleet_id,
                Agent.status == "active",
            )
            .values(status="deregistered", deregistered_at=now)
            .returning(Agent.agent_id)
        ).all()
        deregistered_count = len(deregistered)
        agents_in_fleet = select(Agent.agent_id).where(Agent.fleet_id == fleet_id)
        session.execute(
            delete(AgentPlacement).where(AgentPlacement.agent_id.in_(agents_in_fleet))
        )
        monitor.delete_fleet_monitor_rows(session, fleet_id)

    return {"deregistered_count": deregistered_count}
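
The idempotency guard can be illustrated in a few lines of sqlite3. This is a sketch under assumptions: the schema is a hypothetical reduction, soft_delete_fleet is an invented name that returns the count directly rather than a dict, cursor.rowcount replaces RETURNING, and a missing fleet simply returns 0 here instead of raising as the source does.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE fleets (fleet_id INTEGER PRIMARY KEY, deleted_at TEXT);
    CREATE TABLE agents (
        agent_id INTEGER PRIMARY KEY, fleet_id INTEGER, status TEXT
    );
    INSERT INTO fleets (fleet_id) VALUES (1);
    INSERT INTO agents (fleet_id, status) VALUES
        (1, 'active'), (1, 'active'), (1, 'deregistered');
    """
)


def soft_delete_fleet(fleet_id: int, now: str) -> int:
    """Return how many agents this call flipped to deregistered."""
    with conn:
        guard = conn.execute(
            "UPDATE fleets SET deleted_at = ? "
            "WHERE fleet_id = ? AND deleted_at IS NULL",
            (now, fleet_id),
        )
        if guard.rowcount == 0:
            return 0  # already deleted (or missing): nothing else to do
        flipped = conn.execute(
            "UPDATE agents SET status = 'deregistered' "
            "WHERE fleet_id = ? AND status = 'active'",
            (fleet_id,),
        )
        return flipped.rowcount


first_count = soft_delete_fleet(1, "2024-01-01T00:00:00+00:00")
second_count = soft_delete_fleet(1, "2024-01-02T00:00:00+00:00")
```

The second call matches no fleet row because deleted_at is already set, so it stops before touching agents and the original deletion timestamp is preserved.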

get_fleet

get_fleet(fleet_id: int) -> dict | None

Return the fleet row (including soft-deleted) or None.

The returned dict exposes deleted_at so callers can distinguish a missing fleet from a soft-deleted one — register_agent relies on this to reject soft-deleted fleets with a different error message.

Parameters:

Name Type Description Default
fleet_id int Fleet id to look up. required

Returns:

Type Description
dict | None Dict with fleet_id, label, created_at, deleted_at, and director_agent_id, or None if no row exists.

Source code in cafleet/src/cafleet/broker/fleets.py
def get_fleet(fleet_id: int) -> dict | None:
    """Return the fleet row (including soft-deleted) or None.

    The returned dict exposes ``deleted_at`` so callers can distinguish a
    missing fleet from a soft-deleted one — ``register_agent`` relies on
    this to reject soft-deleted fleets with a different error message.

    Args:
        fleet_id: Fleet id to look up.

    Returns:
        Dict with ``fleet_id``, ``label``, ``created_at``, ``deleted_at``,
        and ``director_agent_id``, or ``None`` if no row exists.
    """
    with _shared.read_session() as session:
        result = session.execute(select(Fleet).where(Fleet.fleet_id == fleet_id))
        row = result.scalar_one_or_none()
    if row is None:
        return None
    return {
        "fleet_id": row.fleet_id,
        "label": row.label,
        "created_at": row.created_at,
        "deleted_at": row.deleted_at,
        "director_agent_id": row.director_agent_id,
    }

list_fleets

list_fleets() -> list[dict]

Return non-soft-deleted fleets with their active agent counts.

Source code in cafleet/src/cafleet/broker/fleets.py
def list_fleets() -> list[dict]:
    """Return non-soft-deleted fleets with their active agent counts."""
    stmt = (
        select(
            Fleet.fleet_id,
            Fleet.director_agent_id,
            Fleet.label,
            Fleet.created_at,
            func.count(Agent.agent_id).label("agent_count"),
        )
        .select_from(Fleet)
        .outerjoin(
            Agent,
            and_(
                Agent.fleet_id == Fleet.fleet_id,
                Agent.status == "active",
            ),
        )
        .where(Fleet.deleted_at.is_(None))
        .group_by(
            Fleet.fleet_id,
            Fleet.director_agent_id,
            Fleet.label,
            Fleet.created_at,
        )
        .order_by(Fleet.created_at.desc(), Fleet.fleet_id.asc())
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [
        {
            "fleet_id": row.fleet_id,
            "director_agent_id": row.director_agent_id,
            "label": row.label,
            "created_at": row.created_at,
            "agent_count": row.agent_count,
        }
        for row in rows
    ]
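
Placing the status condition in the join's ON clause rather than in WHERE is what keeps fleets with no active agents in the result with a count of zero. The sketch below reproduces that query shape in plain SQL through sqlite3. The tables are a hypothetical reduction of the real schema and the date strings are made-up sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE fleets (
        fleet_id INTEGER PRIMARY KEY, created_at TEXT, deleted_at TEXT
    );
    CREATE TABLE agents (
        agent_id INTEGER PRIMARY KEY, fleet_id INTEGER, status TEXT
    );
    INSERT INTO fleets VALUES
        (1, '2024-01-01', NULL),
        (2, '2024-01-02', NULL),
        (3, '2024-01-03', '2024-01-04');  -- soft-deleted
    INSERT INTO agents (fleet_id, status) VALUES
        (1, 'active'), (1, 'deregistered'), (2, 'deregistered');
    """
)

# COUNT(a.agent_id) skips the NULLs produced by unmatched outer-join rows.
counts = conn.execute(
    """
    SELECT f.fleet_id, COUNT(a.agent_id) AS agent_count
    FROM fleets AS f
    LEFT OUTER JOIN agents AS a
        ON a.fleet_id = f.fleet_id AND a.status = 'active'
    WHERE f.deleted_at IS NULL
    GROUP BY f.fleet_id
    ORDER BY f.created_at DESC, f.fleet_id ASC
    """
).fetchall()
```

Fleet 2 has only a deregistered agent and still appears, newest first, with a count of 0. Moving the status test into WHERE would drop it from the listing.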

list_members

list_members(fleet_id: int) -> list[dict]

Return the fleet's active members, with placements.

A fleet has exactly one Director (the root), so every member placement's director_agent_id equals the fleet root. Members are selected by director_agent_id IS NOT NULL, which lists all members and excludes the root Director's own placement (the only row with a NULL director).

Parameters:

Name Type Description Default
fleet_id int Fleet id to scope the query to. required

Returns:

Type Description
list[dict] List of dicts each carrying agent_id, name, description, status, registered_at, and placement.

Source code in cafleet/src/cafleet/broker/members.py
def list_members(fleet_id: int) -> list[dict]:
    """Return the fleet's active members, with placements.

    A fleet has exactly one Director (the root), so every member placement's
    ``director_agent_id`` equals the fleet root. Members are selected by
    ``director_agent_id IS NOT NULL``, which lists all members and excludes
    the root Director's own placement (the only row with a ``NULL`` director).

    Args:
        fleet_id: Fleet id to scope the query to.

    Returns:
        List of dicts each carrying ``agent_id``, ``name``, ``description``,
        ``status``, ``registered_at``, and ``placement``.
    """
    stmt = _base_members_select(fleet_id)
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [
        {
            "agent_id": row.agent_id,
            "name": row.name,
            "description": row.description,
            "status": row.status,
            "registered_at": row.registered_at,
            "placement": _shared.placement_dict(row),
        }
        for row in rows
    ]
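
The member filter described above can be pictured with a tiny placement table. The body of _base_members_select is not shown on this page, so this sketch only illustrates the documented predicate. The agent_placements table name and its two columns are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE agent_placements (
        agent_id INTEGER PRIMARY KEY,
        director_agent_id INTEGER  -- NULL only for the root Director
    );
    INSERT INTO agent_placements VALUES (1, NULL), (2, 1), (3, 1);
    """
)

# The root Director (agent 1) owns the only placement with a NULL director.
member_ids = [
    agent_id
    for (agent_id,) in conn.execute(
        "SELECT agent_id FROM agent_placements "
        "WHERE director_agent_id IS NOT NULL ORDER BY agent_id"
    )
]
```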

list_members_with_activity

list_members_with_activity(fleet_id: int) -> list[dict]

list_members plus per-member activity proxies sourced from tasks.

Scoped by fleet_id only — the flat single-Director model means every member placement's director_agent_id equals the fleet root, so the director_agent_id IS NOT NULL member filter applies here too (and excludes the root Director's own placement).

last_sent / last_recv / last_ack aggregate status_timestamp over the tasks table per agent. All three filter Task.type != 'broadcast_summary' (mirrors poll_tasks); broadcast_summary rows land in the broadcaster's own context with status_state='completed' and would otherwise pollute every proxy for the broadcaster.

Parameters:

Name Type Description Default
fleet_id int Fleet id to scope the query to. required

Returns:

Type Description
list[dict] List of dicts as in list_members, additionally carrying last_sent, last_recv, last_ack (ISO timestamps or None), and idle: the integer-second delta between now and the most recent of last_sent / last_recv, or None when both are None.
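
The idle value can be sketched as a small pure function. The name idle_seconds is hypothetical, and two details are assumptions rather than facts from the source: that the timestamps are ISO 8601 strings with an explicit UTC offset, and that the delta is truncated (not rounded) to whole seconds.

```python
from datetime import datetime, timezone
from typing import Optional


def idle_seconds(
    last_sent: Optional[str],
    last_recv: Optional[str],
    now: datetime,
) -> Optional[int]:
    """Whole seconds since the most recent send or receive, else None."""
    # Assumption: timestamps parse with datetime.fromisoformat.
    stamps = [datetime.fromisoformat(s) for s in (last_sent, last_recv) if s]
    if not stamps:
        return None
    # Assumption: truncate toward zero; the source only says "integer-second".
    return int((now - max(stamps)).total_seconds())


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
both = idle_seconds("2024-01-01T11:59:00+00:00", "2024-01-01T11:59:30+00:00", now)
sent_only = idle_seconds("2024-01-01T11:00:00+00:00", None, now)
neither = idle_seconds(None, None, now)
```

Note that last_ack plays no part in the calculation, matching the description above.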

Source code in cafleet/src/cafleet/broker/members.py
def list_members_with_activity(fleet_id: int) -> list[dict]:
    """``list_members`` plus per-member activity proxies sourced from ``tasks``.

    Scoped by ``fleet_id`` only — the flat single-Director model means every
    member placement's ``director_agent_id`` equals the fleet root, so the
    ``director_agent_id IS NOT NULL`` member filter applies here too (and
    excludes the root Director's own placement).

    ``last_sent`` / ``last_recv`` / ``last_ack`` aggregate ``status_timestamp``
    over the ``tasks`` table per agent. All three filter ``Task.type !=
    'broadcast_summary'`` (mirrors ``poll_tasks``); broadcast_summary rows
    land in the broadcaster's own context with ``status_state='completed'``
    and would otherwise pollute every proxy for the broadcaster.

    Args:
        fleet_id: Fleet id to scope the query to.

    Returns:
        List of dicts as in :func:`list_members`, additionally carrying
        ``last_sent``, ``last_recv``, ``last_ack`` (ISO timestamps or
        ``None``), and ``idle`` — the integer-second delta between ``now``
        and the most recent of ``last_sent`` / ``last_recv``, or ``None``
        when both are ``None``.
    """
    last_sent_sq = (
        select(func.max(Task.status_timestamp))
        .where(
            Task.from_agent_id == Agent.agent_id,
            _shared.NOT_BROADCAST_SUMMARY,
        )
        .correlate(Agent)
        .scalar_subquery()
    )
    last_recv_sq = (
        select(func.max(Task.status_timestamp))
        .where(
            Task.context_id == Agent.agent_id,
            _shared.NOT_BROADCAST_SUMMARY,
        )
        .correlate(Agent)
        .scalar_subquery()
    )
    last_ack_sq = (
        select(func.max(Task.status_timestamp))
        .where(
            Task.context_id == Agent.agent_id,
            Task.status_state == "completed",
            _shared.NOT_BROADCAST_SUMMARY,
        )
        .correlate(Agent)
        .scalar_subquery()
    )
    stmt = _base_members_select(fleet_id).add_columns(
        last_sent_sq.label("last_sent"),
        last_recv_sq.label("last_recv"),
        last_ack_sq.label("last_ack"),
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()

    now = datetime.now(UTC)
    return [
        {
            "agent_id": row.agent_id,
            "name": row.name,
            "description": row.description,
            "status": row.status,
            "registered_at": row.registered_at,
            "placement": _shared.placement_dict(row),
            "last_sent": row.last_sent,
            "last_recv": row.last_recv,
            "last_ack": row.last_ack,
            "idle": _idle_seconds(now, row.last_sent, row.last_recv),
        }
        for row in rows
    ]
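
The idle value is computed by the private _idle_seconds helper, whose body is not shown on this page. The following is a minimal sketch of the behavior the Returns section describes, assuming the timestamps are ISO-8601 strings with an explicit UTC offset; idle_seconds is a hypothetical stand-in, not the real helper.

```python
from datetime import datetime, timezone


def idle_seconds(now, last_sent, last_recv):
    """Hypothetical stand-in for the private _idle_seconds helper."""
    # Only last_sent and last_recv count; last_ack is not part of idle.
    stamps = [
        datetime.fromisoformat(stamp)
        for stamp in (last_sent, last_recv)
        if stamp is not None
    ]
    if not stamps:
        return None  # no activity recorded in either direction
    return int((now - max(stamps)).total_seconds())


now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
both = idle_seconds(now, "2025-01-01T11:59:00+00:00", "2025-01-01T11:58:30+00:00")
never = idle_seconds(now, None, None)
print(both, never)
```

A member that has never sent or received anything reports idle as None rather than 0, so callers can tell "no activity yet" apart from "active just now".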

ack_task

ack_task(agent_id: int, task_id: int) -> dict

Transition a task from input_required to completed for the recipient.

Parameters:

Name Type Description Default
agent_id int

Recipient agent id; must match Task.context_id.

required
task_id int

Task id to ack.

required

Returns:

Type Description
dict

Dict with task — the updated task dict.

Raises:

Type Description
ValueError

If the task does not exist or is not in input_required state.

PermissionError

If agent_id is not the recipient.

Source code in cafleet/src/cafleet/broker/messaging.py
def ack_task(agent_id: int, task_id: int) -> dict:
    """Transition a task from ``input_required`` to ``completed`` for the recipient.

    Args:
        agent_id: Recipient agent id; must match ``Task.context_id``.
        task_id: Task id to ack.

    Returns:
        Dict with ``task`` — the updated task dict.

    Raises:
        ValueError: If the task does not exist or is not in
            ``input_required`` state.
        PermissionError: If ``agent_id`` is not the recipient.
    """
    return _transition_task_state(
        agent_id,
        task_id,
        expected_agent_field="context_id",
        new_state="completed",
        action_verb="ACK",
        permission_error_msg="Only the recipient can ACK a task",
    )

broadcast_message

broadcast_message(fleet_id: int, agent_id: int, text: str) -> list[dict]

Fan out one delivery task per active non-admin peer plus a sender summary.

Administrators are write-only identities, so they are excluded at the SQL layer via json_extract; filtering there means the card blob never has to leave the database. Every delivery row shares the same origin_task_id (the summary's task id) so receivers can thread back to the original broadcast.

Parameters:

Name Type Description Default
fleet_id int

Fleet id to scope the broadcast to.

required
agent_id int

Broadcaster's agent id.

required
text str

Message body delivered to every recipient.

required

Returns:

Type Description
list[dict]

Single-element list containing a dict with task (the summary row owned by the broadcaster) and notifications_sent_count, the number of inline-preview keystrokes that landed successfully.

Raises:

Type Description
ValueError

If the sender is not active in fleet_id.

Source code in cafleet/src/cafleet/broker/messaging.py
def broadcast_message(fleet_id: int, agent_id: int, text: str) -> list[dict]:
    """Fan out one delivery task per active non-admin peer plus a sender summary.

    Administrators are excluded at the SQL layer via ``json_extract`` so the
    card blob stays in the database; they are write-only identities. Every
    delivery row shares the same ``origin_task_id`` (the summary's task id)
    so receivers can thread back to the original broadcast.

    Args:
        fleet_id: Fleet id to scope the broadcast to.
        agent_id: Broadcaster's agent id.
        text: Message body delivered to every recipient.

    Returns:
        Single-element list containing a dict with ``task`` (the summary row
        owned by the broadcaster) and ``notifications_sent_count`` — the
        number of inline-preview keystrokes that landed successfully.

    Raises:
        ValueError: If the sender is not active in ``fleet_id``.
    """
    with _shared.write_session() as session:
        if not _shared.agent_is_active_in_fleet(session, agent_id, fleet_id):
            raise ValueError(
                f"Sender agent not found or not active in fleet: {agent_id}"
            )

        recipient_ids = list(
            session.execute(
                select(Agent.agent_id).where(
                    Agent.fleet_id == fleet_id,
                    Agent.status == "active",
                    Agent.agent_id != agent_id,
                    func.coalesce(
                        func.json_extract(Agent.agent_card_json, "$.cafleet.kind"),
                        "",
                    )
                    != _shared.ADMINISTRATOR_KIND,
                )
            ).scalars()
        )

        now = _shared.now_iso()
        summary_dict = {
            "context_id": agent_id,
            "from_agent_id": agent_id,
            "to_agent_id": 0,
            "type": "broadcast_summary",
            "created_at": now,
            "status_state": "completed",
            "status_timestamp": now,
            "origin_task_id": None,
            "text": f"Broadcast sent to {len(recipient_ids)} recipients",
        }
        summary_task_id = _insert_task(session, summary_dict)
        summary_dict["task_id"] = summary_task_id
        summary_dict["origin_task_id"] = summary_task_id
        _save_task(session, summary_dict)

        deliveries: list[tuple[int, dict]] = []
        for recipient_id in recipient_ids:
            delivery_dict = _unicast_task_dict(
                recipient_id=recipient_id,
                sender_id=agent_id,
                text=text,
                now=now,
                origin_task_id=summary_task_id,
            )
            delivery_dict["task_id"] = _insert_task(session, delivery_dict)
            deliveries.append((recipient_id, delivery_dict))

        notifications_sent_count = sum(
            _try_notify_recipient(
                session,
                recipient_id=recipient_id,
                sender_id=agent_id,
                task_dict=delivery_dict,
            )
            for recipient_id, delivery_dict in deliveries
        )

    return [
        {"task": summary_dict, "notifications_sent_count": notifications_sent_count}
    ]
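
The recipient query wraps json_extract in coalesce because an agent card without a cafleet.kind key yields SQL NULL, and NULL != 'x' is not true in SQL. The sketch below reproduces that filter with the standard sqlite3 module. It assumes a simplified agents table, a Python build whose SQLite includes the JSON functions, and the value "administrator" for ADMINISTRATOR_KIND; the real constant lives in _shared and is not shown here.

```python
import json
import sqlite3

ADMINISTRATOR_KIND = "administrator"  # assumed value, not taken from _shared

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agents (agent_id INTEGER PRIMARY KEY, fleet_id INTEGER, "
    "status TEXT, agent_card_json TEXT)"
)
conn.executemany(
    "INSERT INTO agents VALUES (?, ?, ?, ?)",
    [
        (1, 1, "active", json.dumps({"name": "director"})),
        (2, 1, "active", json.dumps({"name": "member"})),
        (3, 1, "active", json.dumps({"cafleet": {"kind": ADMINISTRATOR_KIND}})),
        (4, 1, "deregistered", json.dumps({"name": "gone"})),
        (5, 2, "active", json.dumps({"name": "other fleet"})),
    ],
)


def recipients(fleet_id, sender_id, use_coalesce=True):
    """Select active non-admin peers of sender_id inside fleet_id."""
    kind = "json_extract(agent_card_json, '$.cafleet.kind')"
    if use_coalesce:
        kind = f"coalesce({kind}, '')"
    sql = (
        "SELECT agent_id FROM agents "
        "WHERE fleet_id = ? AND status = 'active' AND agent_id != ? "
        f"AND {kind} != ? ORDER BY agent_id"
    )
    return [row[0] for row in conn.execute(sql, (fleet_id, sender_id, ADMINISTRATOR_KIND))]


with_coalesce = recipients(1, 1)
without_coalesce = recipients(1, 1, use_coalesce=False)
print(with_coalesce, without_coalesce)
```

Without coalesce the ordinary member (agent 2) silently drops out of the recipient list, because its kind is NULL and the comparison never evaluates to true.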

cancel_task

cancel_task(agent_id: int, task_id: int) -> dict

Transition a task from input_required to canceled for the sender.

Parameters:

Name Type Description Default
agent_id int

Sender agent id; must match Task.from_agent_id.

required
task_id int

Task id to cancel.

required

Returns:

Type Description
dict

Dict with task — the updated task dict.

Raises:

Type Description
ValueError

If the task does not exist or is not in input_required state.

PermissionError

If agent_id is not the sender.

Source code in cafleet/src/cafleet/broker/messaging.py
def cancel_task(agent_id: int, task_id: int) -> dict:
    """Transition a task from ``input_required`` to ``canceled`` for the sender.

    Args:
        agent_id: Sender agent id; must match ``Task.from_agent_id``.
        task_id: Task id to cancel.

    Returns:
        Dict with ``task`` — the updated task dict.

    Raises:
        ValueError: If the task does not exist or is not in
            ``input_required`` state.
        PermissionError: If ``agent_id`` is not the sender.
    """
    return _transition_task_state(
        agent_id,
        task_id,
        expected_agent_field="from_agent_id",
        new_state="canceled",
        action_verb="cancel",
        permission_error_msg="Only the sender can cancel a task",
    )
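
ack_task and cancel_task differ only in which column must match agent_id and which state they write; both delegate to _transition_task_state, whose body is not shown on this page. The following in-memory sketch illustrates the documented contract. The order of the checks and the error messages are assumptions, and transition is a hypothetical stand-in.

```python
def transition(tasks, agent_id, task_id, *, expected_agent_field, new_state):
    """Hypothetical in-memory stand-in for _transition_task_state."""
    task = tasks.get(task_id)
    if task is None:
        raise ValueError(f"Task {task_id} not found")
    if task[expected_agent_field] != agent_id:
        raise PermissionError("agent may not change this task")
    if task["status_state"] != "input_required":
        raise ValueError(f"Task {task_id} is not in input_required state")
    task["status_state"] = new_state
    return {"task": task}


tasks = {
    1: {"task_id": 1, "from_agent_id": 10, "context_id": 20, "status_state": "input_required"},
}

# The recipient (context_id) acks, as ack_task would.
acked = transition(tasks, 20, 1, expected_agent_field="context_id", new_state="completed")

# The sender (from_agent_id) can no longer cancel: the task left input_required.
try:
    transition(tasks, 10, 1, expected_agent_field="from_agent_id", new_state="canceled")
    cancel_error = None
except ValueError as exc:
    cancel_error = str(exc)

print(acked["task"]["status_state"], cancel_error)
```

Because both transitions start from input_required only, completed and canceled behave as terminal states: whichever call lands first wins.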

poll_tasks

poll_tasks(agent_id: int) -> list[dict]

Return un-acked deliveries addressed to agent_id, newest first.

Only input_required tasks are returned — once a delivery is ACKed (completed) or canceled it no longer appears. broadcast_summary rows are filtered out, as those belong to the broadcaster's own context and are not deliveries.

Parameters:

Name Type Description Default
agent_id int

Recipient agent id; matches Task.context_id.

required

Returns:

Type Description
list[dict]

List of flat task dicts (one per row) carrying every column from the tasks table, in DESC status_timestamp order.

Source code in cafleet/src/cafleet/broker/messaging.py
def poll_tasks(agent_id: int) -> list[dict]:
    """Return un-acked deliveries addressed to ``agent_id``, newest first.

    Only ``input_required`` tasks are returned — once a delivery is ACKed
    (``completed``) or canceled it no longer appears. ``broadcast_summary``
    rows are filtered out, as those belong to the broadcaster's own context
    and are not deliveries.

    Args:
        agent_id: Recipient agent id; matches ``Task.context_id``.

    Returns:
        List of flat task dicts (one per row) carrying every column from the
        ``tasks`` table, in DESC ``status_timestamp`` order.
    """
    return _shared.list_tasks_where(
        Task.context_id == agent_id,
        status="input_required",
    )
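
The selection rules described for poll_tasks can be sketched as a single SQL query. This example uses the standard sqlite3 module against a cut-down tasks table; the column subset is an assumption, and the real query is built by _shared.list_tasks_where, which is not shown on this page.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (task_id INTEGER PRIMARY KEY, context_id INTEGER, "
    "type TEXT, status_state TEXT, status_timestamp TEXT)"
)
conn.executemany(
    "INSERT INTO tasks VALUES (?, ?, ?, ?, ?)",
    [
        (1, 7, "unicast", "input_required", "2025-01-01T10:00:00+00:00"),
        (2, 7, "unicast", "completed", "2025-01-01T10:05:00+00:00"),
        (3, 7, "broadcast_summary", "completed", "2025-01-01T10:06:00+00:00"),
        (4, 7, "unicast", "input_required", "2025-01-01T10:07:00+00:00"),
        (5, 8, "unicast", "input_required", "2025-01-01T10:08:00+00:00"),
    ],
)


def poll(agent_id):
    """Un-acked deliveries for agent_id, newest first."""
    sql = (
        "SELECT task_id FROM tasks "
        "WHERE context_id = ? AND status_state = 'input_required' "
        "AND type != 'broadcast_summary' "
        "ORDER BY status_timestamp DESC"
    )
    return [row[0] for row in conn.execute(sql, (agent_id,))]


pending = poll(7)
print(pending)
```

Sorting the TEXT column works here because ISO-8601 timestamps in one consistent format and offset compare in chronological order as plain strings.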

send_message

send_message(fleet_id: int, agent_id: int, to: int | str, text: str) -> dict

Create a unicast task addressed to to and best-effort notify it.

Persists a new Task row with type='unicast' and status_state='input_required', then calls _try_notify_recipient to keystroke an inline preview into the recipient's tmux pane. Notification failure does not roll back the insert; the message remains available via poll_tasks.

Parameters:

Name Type Description Default
fleet_id int

Fleet id; sender and recipient must both belong to it.

required
agent_id int

Sender's agent id.

required
to int | str

Recipient's agent id. Accepts a string for non-CLI callers (WebUI, tests); it is coerced with int(...).

required
text str

Message body. Truncation is render-side; the persisted row holds the full string.

required

Returns:

Type Description
dict

Dict with task (the persisted task dict) and notification_sent (boolean indicating whether the inline-preview keystroke landed).

Raises:

Type Description
ValueError

If to is not a valid integer, the sender is not active in fleet_id, or the recipient is missing or lives in a different fleet.

Source code in cafleet/src/cafleet/broker/messaging.py
def send_message(fleet_id: int, agent_id: int, to: int | str, text: str) -> dict:
    """Create a unicast task addressed to ``to`` and best-effort notify it.

    Persists a new ``Task`` row with ``type='unicast'`` and
    ``status_state='input_required'``, then calls
    ``_try_notify_recipient`` to keystroke an inline preview into the
    recipient's tmux pane. Notification failure does not roll back the
    insert — the message remains available via :func:`poll_tasks`.

    Args:
        fleet_id: Fleet id; sender and recipient must both belong to it.
        agent_id: Sender's agent id.
        to: Recipient's agent id. Accepts a string for non-CLI callers
            (WebUI, tests); it is coerced with ``int(...)``.
        text: Message body. Truncation is render-side; the persisted row
            holds the full string.

    Returns:
        Dict with ``task`` (the persisted task dict) and ``notification_sent``
        (boolean indicating whether the inline-preview keystroke landed).

    Raises:
        ValueError: If ``to`` is not a valid integer, the sender is not
            active in ``fleet_id``, or the recipient is missing or lives in a
            different fleet.
    """
    try:
        to_id = int(to)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Invalid destination format: {to}") from exc

    with _shared.write_session() as session:
        if not _shared.agent_is_active_in_fleet(session, agent_id, fleet_id):
            raise ValueError(
                f"Sender agent not found or not active in fleet: {agent_id}"
            )

        dest_fleet = session.execute(
            select(Agent.fleet_id).where(
                Agent.agent_id == to_id,
                Agent.status == "active",
            )
        ).scalar_one_or_none()
        if dest_fleet is None:
            raise ValueError(f"Destination agent not found: {to_id}")
        if dest_fleet != fleet_id:
            raise ValueError(f"Destination agent not in fleet: {to_id}")

        task_dict = _unicast_task_dict(
            recipient_id=to_id,
            sender_id=agent_id,
            text=text,
            now=_shared.now_iso(),
        )
        task_dict["task_id"] = _insert_task(session, task_dict)
        notification_sent = _try_notify_recipient(
            session,
            recipient_id=to_id,
            sender_id=agent_id,
            task_dict=task_dict,
        )

    return {"task": task_dict, "notification_sent": notification_sent}
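
The int(...) coercion of to accepts more than digit strings, which matters for embedders passing values through from a web form or JSON. The sketch below isolates that guard in a standalone function (coerce_destination is a hypothetical name) to show what standard Python int() lets through.

```python
def coerce_destination(to):
    """Standalone copy of the int(...) guard at the top of send_message."""
    try:
        return int(to)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Invalid destination format: {to}") from exc


def rejects(value):
    """Return True when the guard raises ValueError for value."""
    try:
        coerce_destination(value)
    except ValueError:
        return True
    return False


from_string = coerce_destination("42")
from_padded = coerce_destination(" 42 ")  # int() ignores surrounding whitespace
from_float = coerce_destination(4.9)      # a float is truncated, not rejected
print(from_string, from_padded, from_float)
print(rejects("4.9"), rejects(None), rejects("agent-7"))
```

Callers that might hold a float should validate before calling, since 4.9 would address agent 4 rather than raise.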

claim_monitor_runtime

claim_monitor_runtime(fleet_id: int, pid: int, tick_seconds: int, when: str) -> bool

Atomically claim the fleet's single monitor slot.

Inserts a fresh row, reclaims a stale one, or refuses a live one — all in one write transaction (SQLite's write lock serializes concurrent claims).

Returns:

Type Description
bool

True if this pid now owns the slot; False if a live monitor already holds it.

Source code in cafleet/src/cafleet/broker/monitor.py
def claim_monitor_runtime(
    fleet_id: int, pid: int, tick_seconds: int, when: str
) -> bool:
    """Atomically claim the fleet's single monitor slot.

    Inserts a fresh row, reclaims a stale one, or refuses a live one — all in
    one write transaction (SQLite's write lock serializes concurrent claims).

    Returns:
        ``True`` if this ``pid`` now owns the slot; ``False`` if a live monitor
        already holds it.
    """
    now = datetime.fromisoformat(when)
    with _shared.write_session() as session:
        row = session.get(MonitorRuntime, fleet_id)
        if row is None:
            session.add(
                MonitorRuntime(
                    fleet_id=fleet_id,
                    pid=pid,
                    started_at=when,
                    last_tick_at=when,
                    tick_seconds=tick_seconds,
                )
            )
            return True
        if _is_live(row, now):
            return False
        row.pid = pid
        row.started_at = when
        row.last_tick_at = when
        row.tick_seconds = tick_seconds
        return True
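
The three outcomes of a claim (insert, reclaim, refuse) can be traced with an in-memory sketch. The liveness rule below is an assumption: the real _is_live is not shown on this page, its staleness threshold is unknown, and it additionally probes the pid. STALE_AFTER_TICKS, is_live, and claim are hypothetical names.

```python
from datetime import datetime, timedelta

STALE_AFTER_TICKS = 3  # assumed threshold; the real rule is not documented here


def is_live(row, now):
    """Hypothetical freshness-only check standing in for _is_live."""
    if row["pid"] is None or row["last_tick_at"] is None:
        return False
    age = now - datetime.fromisoformat(row["last_tick_at"])
    return age <= timedelta(seconds=STALE_AFTER_TICKS * row["tick_seconds"])


def claim(slots, fleet_id, pid, tick_seconds, when):
    """Insert a fresh slot, reclaim a stale one, or refuse a live one."""
    now = datetime.fromisoformat(when)
    row = slots.get(fleet_id)
    if row is not None and is_live(row, now):
        return False
    slots[fleet_id] = {
        "pid": pid,
        "started_at": when,
        "last_tick_at": when,
        "tick_seconds": tick_seconds,
    }
    return True


slots = {}
first = claim(slots, 1, 100, 5, "2025-01-01T00:00:00+00:00")   # empty slot
second = claim(slots, 1, 200, 5, "2025-01-01T00:00:10+00:00")  # holder still fresh
third = claim(slots, 1, 200, 5, "2025-01-01T00:01:00+00:00")   # holder went stale
print(first, second, third, slots[1]["pid"])
```

This sketch is single-threaded. In the broker the same read-then-write sequence is safe under concurrency only because it runs inside one write transaction.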

clear_monitor_runtime

clear_monitor_runtime(fleet_id: int, pid: int) -> None

Ownership-checked clear — nulls pid/started_at/last_tick_at iff pid owns the slot.

A cleanly stopped monitor leaves no residual started_at, so status and the WebUI both report a fully stopped row. A non-owner clear matches zero rows and is a no-op, so a self-terminating loser never wipes the winner's row on exit.

Source code in cafleet/src/cafleet/broker/monitor.py
def clear_monitor_runtime(fleet_id: int, pid: int) -> None:
    """Ownership-checked clear — nulls ``pid``/``started_at``/``last_tick_at`` iff ``pid`` owns the slot.

    A cleanly-stopped monitor leaves no residual ``started_at``, so `status`/the
    WebUI report a fully-stopped row. A non-owner clear matches zero rows and is
    a no-op, so a self-terminating loser never wipes the winner's row on exit.
    """
    with _shared.write_session() as session:
        session.execute(
            update(MonitorRuntime)
            .where(MonitorRuntime.fleet_id == fleet_id, MonitorRuntime.pid == pid)
            .values(pid=None, started_at=None, last_tick_at=None)
        )

enroll_agent

enroll_agent(session, agent_id: int, interval: int = DEFAULT_PING_INTERVAL_SECONDS) -> None

Insert a monitor_config row for a pane-bound agent.

Called inside the same write transaction as the agent/placement insert (by register_agent / create_fleet), so enrollment is atomic with registration. Only agents with a tmux pane (the root Director and members) are enrolled.

Source code in cafleet/src/cafleet/broker/monitor.py
def enroll_agent(
    session, agent_id: int, interval: int = DEFAULT_PING_INTERVAL_SECONDS
) -> None:
    """Insert a ``monitor_config`` row for a pane-bound agent.

    Called inside the same write transaction as the agent/placement insert (by
    ``register_agent`` / ``create_fleet``), so enrollment is atomic with
    registration. Only agents with a tmux pane (the root Director and members)
    are enrolled.
    """
    session.add(MonitorConfig(agent_id=agent_id, interval_seconds=interval, enabled=1))

get_monitor_config

get_monitor_config(fleet_id: int, agent_id: int) -> dict | None

Return the agent's schedule, or None if not enrolled / not in fleet.

Source code in cafleet/src/cafleet/broker/monitor.py
def get_monitor_config(fleet_id: int, agent_id: int) -> dict | None:
    """Return the agent's schedule, or ``None`` if not enrolled / not in fleet."""
    stmt = (
        select(
            MonitorConfig.agent_id,
            MonitorConfig.interval_seconds,
            MonitorConfig.last_ping_at,
            MonitorConfig.enabled,
        )
        .join(Agent, Agent.agent_id == MonitorConfig.agent_id)
        .where(MonitorConfig.agent_id == agent_id, Agent.fleet_id == fleet_id)
    )
    with _shared.read_session() as session:
        row = session.execute(stmt).first()
    return _config_dict(row) if row is not None else None

heartbeat_monitor_runtime

heartbeat_monitor_runtime(fleet_id: int, pid: int, when: str) -> bool

Ownership-checked heartbeat — rewrites last_tick_at iff pid owns the slot.

Returns False when the slot was reclaimed by another instance (the WHERE pid=? matches zero rows). The displaced monitor treats that result as its signal to self-terminate without pinging.

Source code in cafleet/src/cafleet/broker/monitor.py
def heartbeat_monitor_runtime(fleet_id: int, pid: int, when: str) -> bool:
    """Ownership-checked heartbeat — rewrites ``last_tick_at`` iff ``pid`` owns the slot.

    Returns ``False`` when the slot was reclaimed by another instance (the
    ``WHERE pid=?`` matches zero rows), the signal the displaced monitor uses
    to self-terminate without pinging.
    """
    with _shared.write_session() as session:
        result = session.execute(
            update(MonitorRuntime)
            .where(MonitorRuntime.fleet_id == fleet_id, MonitorRuntime.pid == pid)
            .values(pid=pid, last_tick_at=when)
        )
        return result.rowcount == 1
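
Both heartbeat_monitor_runtime and clear_monitor_runtime rely on the same pattern: put the ownership test in the WHERE clause and let the affected-row count report the outcome. The sketch below shows that pattern with the standard sqlite3 module; the monitor_runtime table name and its column subset are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE monitor_runtime (fleet_id INTEGER PRIMARY KEY, pid INTEGER, "
    "started_at TEXT, last_tick_at TEXT)"
)
# pid 200 currently owns fleet 1's slot; pid 100 was displaced earlier.
conn.execute(
    "INSERT INTO monitor_runtime VALUES "
    "(1, 200, '2025-01-01T00:00:00+00:00', '2025-01-01T00:00:00+00:00')"
)


def heartbeat(fleet_id, pid, when):
    """Refresh last_tick_at only if pid owns the slot; report ownership."""
    cur = conn.execute(
        "UPDATE monitor_runtime SET last_tick_at = ? WHERE fleet_id = ? AND pid = ?",
        (when, fleet_id, pid),
    )
    conn.commit()
    return cur.rowcount == 1


def clear(fleet_id, pid):
    """Null the slot only if pid owns it; a non-owner matches zero rows."""
    conn.execute(
        "UPDATE monitor_runtime SET pid = NULL, started_at = NULL, last_tick_at = NULL "
        "WHERE fleet_id = ? AND pid = ?",
        (fleet_id, pid),
    )
    conn.commit()


loser_beat = heartbeat(1, 100, "2025-01-01T00:00:05+00:00")
clear(1, 100)  # the displaced monitor exits; the winner's row is untouched
winner_beat = heartbeat(1, 200, "2025-01-01T00:00:05+00:00")
row = conn.execute(
    "SELECT pid, last_tick_at FROM monitor_runtime WHERE fleet_id = 1"
).fetchone()
print(loser_beat, winner_beat, row)
```

Folding the check into the UPDATE avoids a separate read, so there is no window between checking ownership and writing.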

list_monitor_configs

list_monitor_configs(fleet_id: int) -> list[dict]

Return every enrolled agent's schedule in the fleet (enabled as bool).

Source code in cafleet/src/cafleet/broker/monitor.py
def list_monitor_configs(fleet_id: int) -> list[dict]:
    """Return every enrolled agent's schedule in the fleet (``enabled`` as bool)."""
    stmt = (
        select(
            MonitorConfig.agent_id,
            MonitorConfig.interval_seconds,
            MonitorConfig.last_ping_at,
            MonitorConfig.enabled,
        )
        .join(Agent, Agent.agent_id == MonitorConfig.agent_id)
        .where(Agent.fleet_id == fleet_id)
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [_config_dict(row) for row in rows]

list_monitor_targets

list_monitor_targets(fleet_id: int) -> list[dict]

Per-tick scan: one row per active, enrolled agent in the fleet.

Each dict carries agent_id, name, is_director (derived from fleets.director_agent_id), pane_id, interval_seconds, last_ping_at, enabled (bool), and pending_count — the count of the agent's input_required deliveries excluding broadcast_summary rows, a correlated subquery mirroring members.py.

Source code in cafleet/src/cafleet/broker/monitor.py
def list_monitor_targets(fleet_id: int) -> list[dict]:
    """Per-tick scan: one row per active, enrolled agent in the fleet.

    Each dict carries ``agent_id``, ``name``, ``is_director`` (derived from
    ``fleets.director_agent_id``), ``pane_id``, ``interval_seconds``,
    ``last_ping_at``, ``enabled`` (bool), and ``pending_count`` — the count of
    the agent's ``input_required`` deliveries excluding ``broadcast_summary``
    rows, a correlated subquery mirroring ``members.py``.
    """
    pending_sq = (
        select(func.count(Task.task_id))
        .where(
            Task.context_id == Agent.agent_id,
            Task.status_state == "input_required",
            _shared.NOT_BROADCAST_SUMMARY,
        )
        .correlate(Agent)
        .scalar_subquery()
    )
    stmt = (
        select(
            Agent.agent_id,
            Agent.name,
            AgentPlacement.tmux_pane_id,
            Fleet.director_agent_id,
            MonitorConfig.interval_seconds,
            MonitorConfig.last_ping_at,
            MonitorConfig.enabled,
            pending_sq.label("pending_count"),
        )
        .join(MonitorConfig, MonitorConfig.agent_id == Agent.agent_id)
        .join(AgentPlacement, AgentPlacement.agent_id == Agent.agent_id)
        .join(Fleet, Fleet.fleet_id == Agent.fleet_id)
        .where(Agent.fleet_id == fleet_id, Agent.status == "active")
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [
        {
            "agent_id": row.agent_id,
            "name": row.name,
            "is_director": row.agent_id == row.director_agent_id,
            "pane_id": row.tmux_pane_id,
            "interval_seconds": row.interval_seconds,
            "last_ping_at": row.last_ping_at,
            "enabled": bool(row.enabled),
            "pending_count": row.pending_count,
        }
        for row in rows
    ]

monitor_is_live

monitor_is_live(fleet_id: int, now: datetime) -> bool

Return True iff the fleet currently has a live monitor holding the slot.

This is the advisory single-instance pre-check for monitor start; the atomic claim_monitor_runtime is the authoritative guard. It reuses _is_live: heartbeat freshness is authoritative, and os.kill(pid, 0) corroborates.

Source code in cafleet/src/cafleet/broker/monitor.py
def monitor_is_live(fleet_id: int, now: datetime) -> bool:
    """Return True iff the fleet currently has a live monitor holding the slot.

    The advisory single-instance pre-check for ``monitor start`` (the atomic
    ``claim_monitor_runtime`` is the authoritative guard). Reuses ``_is_live``:
    heartbeat freshness is authoritative, ``os.kill(pid, 0)`` corroborates.
    """
    with _shared.read_session() as session:
        row = session.get(MonitorRuntime, fleet_id)
        if row is None:
            return False
        return _is_live(row, now)

read_monitor_runtime

read_monitor_runtime(fleet_id: int) -> dict | None

Return the fleet's runtime row, or None when no monitor ever claimed it.

Source code in cafleet/src/cafleet/broker/monitor.py
def read_monitor_runtime(fleet_id: int) -> dict | None:
    """Return the fleet's runtime row, or ``None`` when no monitor ever claimed it."""
    with _shared.read_session() as session:
        row = session.get(MonitorRuntime, fleet_id)
        if row is None:
            return None
        return {
            "fleet_id": row.fleet_id,
            "pid": row.pid,
            "started_at": row.started_at,
            "last_tick_at": row.last_tick_at,
            "tick_seconds": row.tick_seconds,
        }

record_ping

record_ping(agent_id: int, when: str) -> None

Stamp last_ping_at for a single agent (thin wrapper over record_pings).

Source code in cafleet/src/cafleet/broker/monitor.py
def record_ping(agent_id: int, when: str) -> None:
    """Stamp ``last_ping_at`` for a single agent (thin wrapper over ``record_pings``)."""
    record_pings([agent_id], when)

record_pings

record_pings(agent_ids: list[int], when: str) -> None

Stamp last_ping_at for every agent in one write transaction.

Lets a tick's dispatched pings be recorded with a single UPDATE … WHERE agent_id IN (…) instead of one transaction per agent. when is an ISO-8601 string stored verbatim in the TEXT column. An empty list is a no-op (no transaction, no IN ()).

Source code in cafleet/src/cafleet/broker/monitor.py
def record_pings(agent_ids: list[int], when: str) -> None:
    """Stamp ``last_ping_at`` for every agent in one write transaction.

    Lets a tick's dispatched pings be recorded with a single ``UPDATE … WHERE
    agent_id IN (…)`` instead of one transaction per agent. ``when`` is an
    ISO-8601 string stored verbatim in the TEXT column. An empty list is a
    no-op (no transaction, no ``IN ()``).
    """
    if not agent_ids:
        return
    with _shared.write_session() as session:
        session.execute(
            update(MonitorConfig)
            .where(MonitorConfig.agent_id.in_(agent_ids))
            .values(last_ping_at=when)
        )
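
The batched stamp can be sketched without SQLAlchemy to make the empty-list guard concrete. This example uses the standard sqlite3 module and a cut-down monitor_config table (an assumption). stamp_pings is a hypothetical stand-in that returns the affected-row count for illustration, whereas record_pings returns None.

```python
import sqlite3


def stamp_pings(conn, agent_ids, when):
    """Stamp last_ping_at for all agent_ids with one UPDATE."""
    if not agent_ids:
        return 0  # nothing to do; never emit an empty IN () clause
    placeholders = ", ".join("?" for _ in agent_ids)
    cur = conn.execute(
        f"UPDATE monitor_config SET last_ping_at = ? WHERE agent_id IN ({placeholders})",
        (when, *agent_ids),
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE monitor_config (agent_id INTEGER PRIMARY KEY, last_ping_at TEXT)")
conn.executemany("INSERT INTO monitor_config (agent_id) VALUES (?)", [(1,), (2,), (3,)])

stamped = stamp_pings(conn, [1, 3], "2025-01-01T00:00:00+00:00")
skipped = stamp_pings(conn, [], "2025-01-01T00:00:00+00:00")
rows = conn.execute(
    "SELECT agent_id, last_ping_at FROM monitor_config ORDER BY agent_id"
).fetchall()
print(stamped, skipped, rows)
```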

update_monitor_config

update_monitor_config(fleet_id: int, agent_id: int, *, interval_seconds: int | None = None, enabled: bool | None = None) -> dict

Update an enrolled agent's interval and/or enabled flag; return the new config.

A partial update leaves the unspecified field untouched. enabled is accepted as a bool and written as 0/1.

Raises:

Type Description
ClickException

If the agent is not in the fleet or not enrolled.

Source code in cafleet/src/cafleet/broker/monitor.py
def update_monitor_config(
    fleet_id: int,
    agent_id: int,
    *,
    interval_seconds: int | None = None,
    enabled: bool | None = None,
) -> dict:
    """Update an enrolled agent's interval and/or enabled flag; return the new config.

    A partial update leaves the unspecified field untouched. ``enabled`` is
    accepted as a bool and written as 0/1.

    Raises:
        click.ClickException: If the agent is not in the fleet or not enrolled.
    """
    with _shared.write_session() as session:
        enrolled = session.execute(
            select(MonitorConfig.agent_id)
            .join(Agent, Agent.agent_id == MonitorConfig.agent_id)
            .where(MonitorConfig.agent_id == agent_id, Agent.fleet_id == fleet_id)
        ).scalar_one_or_none()
        if enrolled is None:
            raise click.ClickException(
                f"agent {agent_id} is not enrolled in monitoring for fleet {fleet_id}."
            )

        values: dict = {}
        if interval_seconds is not None:
            values["interval_seconds"] = interval_seconds
        if enabled is not None:
            values["enabled"] = 1 if enabled else 0
        if values:
            session.execute(
                update(MonitorConfig)
                .where(MonitorConfig.agent_id == agent_id)
                .values(**values)
            )

        row = session.execute(
            select(
                MonitorConfig.agent_id,
                MonitorConfig.interval_seconds,
                MonitorConfig.last_ping_at,
                MonitorConfig.enabled,
            ).where(MonitorConfig.agent_id == agent_id)
        ).first()
        return _config_dict(row)
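
The partial-update rule hinges on testing each argument against None rather than for truthiness, which is what lets enabled=False reach the database. The sketch below isolates that step; build_update_values is a hypothetical name for logic that is inline in update_monitor_config.

```python
def build_update_values(interval_seconds=None, enabled=None):
    """Collect only the fields the caller actually supplied."""
    values = {}
    if interval_seconds is not None:
        values["interval_seconds"] = interval_seconds
    if enabled is not None:
        values["enabled"] = 1 if enabled else 0  # stored as 0/1
    return values


disable_only = build_update_values(enabled=False)
interval_only = build_update_values(interval_seconds=30)
nothing = build_update_values()
print(disable_only, interval_only, nothing)
```

An empty result corresponds to the case where the broker skips the UPDATE and simply returns the current config.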

get_task

get_task(fleet_id: int, task_id: int) -> dict

Return the task iff at least one of its endpoints lives in the fleet.

Parameters:

Name Type Description Default
fleet_id int

Fleet id used to gate visibility.

required
task_id int

Task id to fetch.

required

Returns:

Type Description
dict

Dict with task — the flat typed-column task dict.

Raises:

Type Description
ValueError

If the task does not exist or neither endpoint belongs to fleet_id.

Source code in cafleet/src/cafleet/broker/queries.py
def get_task(fleet_id: int, task_id: int) -> dict:
    """Return the task iff at least one of its endpoints lives in the fleet.

    Args:
        fleet_id: Fleet id used to gate visibility.
        task_id: Task id to fetch.

    Returns:
        Dict with ``task`` — the flat typed-column task dict.

    Raises:
        ValueError: If the task does not exist or neither endpoint belongs
            to ``fleet_id``.
    """
    with _shared.read_session() as session:
        task_dict = _shared.read_task(session, task_id)
        if task_dict is None:
            raise ValueError(f"Task {task_id} not found")

        endpoint_ids = [task_dict["from_agent_id"]]
        to_id = task_dict["to_agent_id"]
        if to_id:
            endpoint_ids.append(to_id)
        in_fleet = session.execute(
            select(
                exists().where(
                    Agent.agent_id.in_(endpoint_ids),
                    Agent.fleet_id == fleet_id,
                )
            )
        ).scalar_one()
        if not in_fleet:
            raise ValueError(f"Task {task_id} not found")

    return {"task": task_dict}
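
The visibility gate can be traced with plain dictionaries. In this sketch, tasks maps task ids to task dicts and agent_fleets maps agent ids to fleet ids; both structures and the visible_task name are hypothetical stand-ins for the database lookups in get_task.

```python
def visible_task(tasks, agent_fleets, fleet_id, task_id):
    """Return the task only if one of its endpoints belongs to fleet_id."""
    task = tasks.get(task_id)
    if task is None:
        raise ValueError(f"Task {task_id} not found")
    endpoint_ids = [task["from_agent_id"]]
    if task["to_agent_id"]:  # a falsy recipient (such as 0) is not an endpoint
        endpoint_ids.append(task["to_agent_id"])
    if not any(agent_fleets.get(agent) == fleet_id for agent in endpoint_ids):
        # Same message as a missing task, so existence is not revealed.
        raise ValueError(f"Task {task_id} not found")
    return {"task": task}


agent_fleets = {10: 1, 11: 1, 20: 2}
tasks = {
    1: {"task_id": 1, "from_agent_id": 10, "to_agent_id": 11},
    2: {"task_id": 2, "from_agent_id": 20, "to_agent_id": 0},
}

own = visible_task(tasks, agent_fleets, 1, 1)
try:
    visible_task(tasks, agent_fleets, 1, 2)
    foreign_error = None
except ValueError as exc:
    foreign_error = str(exc)
print(own["task"]["task_id"], foreign_error)
```

A task that exists in another fleet and a task id that does not exist at all produce the same error, so callers cannot probe other fleets by id.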

list_inbox

list_inbox(agent_id: int) -> list[dict]

Return raw task rows addressed to agent_id, excluding broadcast_summary rows.

Source code in cafleet/src/cafleet/broker/queries.py
def list_inbox(agent_id: int) -> list[dict]:
    """Return raw task rows addressed to ``agent_id`` (no broadcast_summary)."""
    return _shared.list_tasks_where(Task.context_id == agent_id)

list_sent

list_sent(agent_id: int) -> list[dict]

Return raw task rows sent by agent_id, excluding broadcast_summary rows.

Source code in cafleet/src/cafleet/broker/queries.py
def list_sent(agent_id: int) -> list[dict]:
    """Return raw task rows sent by ``agent_id`` (no broadcast_summary)."""
    return _shared.list_tasks_where(Task.from_agent_id == agent_id)

list_timeline

list_timeline(fleet_id: int, limit: int = 200) -> list[dict]

Return the fleet's recent tasks in DESC status_timestamp order.

broadcast_summary rows are filtered out so the timeline shows only delivery rows. Membership is tested via from_agent_id joined to agents.fleet_id.

Parameters:

Name Type Description Default
fleet_id int

Fleet id to scope the query to.

required
limit int

Maximum number of rows to return (default 200).

200

Returns:

Type Description
list[dict]

List of flat task dicts in DESC status_timestamp order.

Source code in cafleet/src/cafleet/broker/queries.py
def list_timeline(fleet_id: int, limit: int = 200) -> list[dict]:
    """Return the fleet's recent tasks in DESC ``status_timestamp`` order.

    ``broadcast_summary`` rows are filtered out so the timeline shows only
    delivery rows. Membership is tested via ``from_agent_id`` joined to
    ``agents.fleet_id``.

    Args:
        fleet_id: Fleet id to scope the query to.
        limit: Maximum number of rows to return (default 200).

    Returns:
        List of flat task dicts in DESC ``status_timestamp`` order.
    """
    stmt = (
        select(*(getattr(Task, col) for col in _shared.TASK_COLUMNS))
        .join(Agent, Task.from_agent_id == Agent.agent_id)
        .where(
            Agent.fleet_id == fleet_id,
            _shared.NOT_BROADCAST_SUMMARY,
        )
        .order_by(Task.status_timestamp.desc())
        .limit(limit)
    )
    with _shared.read_session() as session:
        rows = session.execute(stmt).all()
    return [_shared.row_to_task_dict(row) for row in rows]