
Control Plane Guide

Enterprise orchestration system for managing heterogeneous AI agents in distributed environments.

Overview

The Aragora Control Plane provides centralized coordination for AI agents with:

  • Service Discovery: Automatic agent registration and heartbeat-based liveness tracking
  • Task Scheduling: Priority-based task distribution with capability matching
  • Health Monitoring: Liveness probes and circuit breaker integration
  • Load Balancing: Pluggable agent selection strategies (least loaded, round robin, random)

┌───────────────────────────────────────────────────────┐
│                     Control Plane                     │
│     AgentRegistry │ TaskScheduler │ HealthMonitor     │
├───────────────────────────────────────────────────────┤
│                     Redis Backend                     │
│     Service Discovery │ Job Queue │ Health Probes     │
└───────────────────────────────────────────────────────┘

Quick Start

Demo Script

python scripts/demo_control_plane.py

Use --quick for a 2-minute walkthrough and --simulate-load to stress the queue. See docs/CONTROL_PLANE.md for full options.

Python API

import asyncio

from aragora.control_plane import create_control_plane


async def main():
    # Create the coordinator and connect to the backend
    coordinator = await create_control_plane()

    try:
        # Register an agent
        await coordinator.register_agent(
            agent_id="claude-3",
            capabilities=["debate", "code", "analysis"],
            model="claude-3-opus",
            provider="anthropic",
            metadata={"version": "3.5"},
        )

        # Submit a task
        task_id = await coordinator.submit_task(
            task_type="debate",
            payload={"question": "Should we use microservices?"},
            required_capabilities=["debate"],
        )

        # Wait up to 60 seconds for the result
        result = await coordinator.wait_for_result(task_id, timeout=60.0)
        print(result.result)
    finally:
        # Shut down even if a step above raised
        await coordinator.shutdown()


asyncio.run(main())

REST API

All endpoints are available under /api/v1/control-plane with legacy aliases under /api/control-plane. Examples below use the legacy path for brevity.

Terminology note: in the API and worker identifiers, vetted decisionmaking sessions are called "deliberations".

Authentication is required. Task and vetted decisionmaking operations (deliberations in the API) require the controlplane:tasks permission; agent registration and management require controlplane:agents.

# Register an agent
curl -X POST http://localhost:8080/api/control-plane/agents \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "claude-3",
    "capabilities": ["debate", "code"],
    "model": "claude-3-opus",
    "provider": "anthropic"
  }'

# Submit a task
curl -X POST http://localhost:8080/api/control-plane/tasks \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "debate",
    "payload": {"question": "What is the best design pattern?"},
    "required_capabilities": ["debate"],
    "priority": "normal"
  }'

# Check task status (TASK_ID holds the task_id returned when the task was submitted)
curl http://localhost:8080/api/control-plane/tasks/$TASK_ID \
  -H "Authorization: Bearer $TOKEN"

Configuration

Environment Variables

| Variable | Default | Description |
|---|---|---|
| REDIS_URL | redis://localhost:6379 | Redis connection URL |
| CONTROL_PLANE_PREFIX | aragora:cp: | Key prefix for Redis keys |
| HEARTBEAT_INTERVAL | 10 | Agent heartbeat interval (seconds) |
| HEARTBEAT_TIMEOUT | 30 | Seconds without a heartbeat before an agent is marked offline |
| PROBE_INTERVAL | 30 | Health probe interval (seconds) |
| PROBE_TIMEOUT | 10 | Health probe timeout (seconds) |
| TASK_TIMEOUT | 300 | Default task timeout (seconds) |
| MAX_TASK_RETRIES | 3 | Maximum task retry attempts |
| CLEANUP_INTERVAL | 60 | Stale agent cleanup interval (seconds) |
| ARAGORA_CONTROL_PLANE_POLICY_SOURCE | auto | Policy source: auto, compliance, or inprocess |
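The table above maps naturally onto a small typed settings object. The sketch below is hypothetical (`Settings` and `load_settings` are illustrative names, not Aragora's `ControlPlaneConfig`); it reads a subset of the documented variables with their documented defaults and enforces the heartbeat rule from Best Practices.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings holder mirroring part of the table above."""

    redis_url: str = "redis://localhost:6379"
    key_prefix: str = "aragora:cp:"
    heartbeat_interval: float = 10.0
    heartbeat_timeout: float = 30.0
    task_timeout: float = 300.0
    max_task_retries: int = 3

    def __post_init__(self) -> None:
        # Best practice from this guide: heartbeat more often than HEARTBEAT_TIMEOUT / 2
        if self.heartbeat_interval >= self.heartbeat_timeout / 2:
            raise ValueError("HEARTBEAT_INTERVAL must be < HEARTBEAT_TIMEOUT / 2")


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the documented variables, falling back to the documented defaults."""
    d = Settings()
    return Settings(
        redis_url=env.get("REDIS_URL", d.redis_url),
        key_prefix=env.get("CONTROL_PLANE_PREFIX", d.key_prefix),
        heartbeat_interval=float(env.get("HEARTBEAT_INTERVAL", d.heartbeat_interval)),
        heartbeat_timeout=float(env.get("HEARTBEAT_TIMEOUT", d.heartbeat_timeout)),
        task_timeout=float(env.get("TASK_TIMEOUT", d.task_timeout)),
        max_task_retries=int(env.get("MAX_TASK_RETRIES", d.max_task_retries)),
    )
```

Passing an explicit mapping (rather than always reading `os.environ`) keeps the loader easy to test.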

Programmatic Configuration

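import asyncio

from aragora.control_plane import ControlPlaneConfig, ControlPlaneCoordinator

config = ControlPlaneConfig(
    redis_url="redis://localhost:6379",
    key_prefix="myapp:cp:",
    heartbeat_timeout=60.0,  # seconds
    task_timeout=600.0,  # seconds
)

async def main():
    coordinator = await ControlPlaneCoordinator.create(config)
    try:
        ...  # register agents, submit tasks
    finally:
        await coordinator.shutdown()

asyncio.run(main())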

Components

Agent Registry

The AgentRegistry manages service discovery for AI agents.

Agent Status

| Status | Description |
|---|---|
| STARTING | Agent is initializing |
| READY | Agent is available for tasks |
| BUSY | Agent is processing a task |
| DRAINING | Completing current task, no new tasks |
| OFFLINE | Agent is not responding |
| FAILED | Agent has failed |

Agent Capabilities

Standard capabilities supported:

| Capability | Description |
|---|---|
| debate | Can participate in debates |
| code | Can write/analyze code |
| analysis | Can perform analysis tasks |
| critique | Can critique other agents' work |
| judge | Can serve as a debate judge |
| implement | Can implement code changes |
| design | Can create designs/architectures |
| research | Can perform research tasks |
| audit | Can perform audits |
| summarize | Can summarize content |

Custom capabilities can be added as strings.
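Capability matching can be pictured as a subset test: an agent qualifies when it offers every required capability. This sketch assumes that rule and uses a hypothetical `Capability` enum (not `AgentCapability`) which, like the mix of enum members and plain strings in the registration example below, normalizes both forms to strings.

```python
from enum import Enum
from typing import FrozenSet, Iterable, Union


class Capability(str, Enum):
    # Hypothetical subset of the standard capabilities listed above
    DEBATE = "debate"
    CODE = "code"
    CRITIQUE = "critique"


CapabilityLike = Union[Capability, str]


def normalize(caps: Iterable[CapabilityLike]) -> FrozenSet[str]:
    # Enum members and plain (custom) strings collapse to the same string values
    return frozenset(c.value if isinstance(c, Capability) else c for c in caps)


def can_handle(agent_caps: Iterable[CapabilityLike], required: Iterable[CapabilityLike]) -> bool:
    """Assumed rule: the agent must offer every required capability."""
    return normalize(required) <= normalize(agent_caps)
```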

Registration Example

from aragora.control_plane import AgentCapability

agent = await coordinator.register_agent(
    agent_id="my-agent",
    capabilities=[
        AgentCapability.DEBATE,
        AgentCapability.CODE,
        "custom-capability",  # Custom string capability
    ],
    model="gpt-4",
    provider="openai",
    metadata={
        "version": "1.0",
        "region": "us-east-1",
    },
)

print(f"Registered: {agent.agent_id}")
print(f"Status: {agent.status.value}")

Heartbeats

Agents must send periodic heartbeats to remain in the active pool; an agent that sends none within HEARTBEAT_TIMEOUT (30 seconds by default) is marked offline:

# Send heartbeat
await coordinator.heartbeat("my-agent")

# Send heartbeat with status update
from aragora.control_plane import AgentStatus
await coordinator.heartbeat("my-agent", status=AgentStatus.BUSY)

REST API:

curl -X POST http://localhost:8080/api/control-plane/agents/my-agent/heartbeat \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"status": "busy"}'
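Heartbeat-based liveness boils down to comparing each agent's last heartbeat with HEARTBEAT_TIMEOUT. A minimal in-memory sketch, assuming a monotonic clock; `LivenessTracker` is a hypothetical name, and the diagram above places the real service-discovery state in Redis.

```python
import time
from typing import Callable, Dict, List


class LivenessTracker:
    """Hypothetical tracker: an agent is alive if it heartbeated within `timeout` seconds."""

    def __init__(self, timeout: float = 30.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout = timeout  # mirrors HEARTBEAT_TIMEOUT
        self._clock = clock
        self._last_seen: Dict[str, float] = {}

    def heartbeat(self, agent_id: str) -> None:
        self._last_seen[agent_id] = self._clock()

    def is_alive(self, agent_id: str) -> bool:
        last = self._last_seen.get(agent_id)
        return last is not None and self._clock() - last <= self.timeout

    def offline_agents(self) -> List[str]:
        # Agents that have heartbeated at least once but have since gone quiet
        return sorted(a for a in self._last_seen if not self.is_alive(a))
```

Injecting the clock makes the timeout behavior testable without real waiting.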

Task Scheduler

The TaskScheduler handles task distribution with priority-based queuing.

Task Priority

| Priority | Value | Description |
|---|---|---|
| CRITICAL | 0 | Immediate execution |
| HIGH | 1 | Urgent tasks |
| NORMAL | 2 | Default priority |
| LOW | 3 | Background tasks |
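Lower values run first. One way to model this ordering is a binary heap keyed on `(priority, sequence)`, where the sequence counter keeps first-in, first-out order among tasks of equal priority. This is an illustrative sketch under that assumption (`PriorityTaskQueue` is hypothetical), not the Redis-backed scheduler itself.

```python
import heapq
import itertools
from enum import IntEnum
from typing import List, Optional, Tuple


class Priority(IntEnum):
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3


class PriorityTaskQueue:
    """Lowest priority value pops first; FIFO within the same priority."""

    def __init__(self) -> None:
        self._heap: List[Tuple[Priority, int, str]] = []
        self._counter = itertools.count()

    def push(self, task_id: str, priority: Priority = Priority.NORMAL) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), task_id))

    def pop(self) -> Optional[str]:
        # Return None on an empty queue instead of raising IndexError
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```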

Task Status

| Status | Description |
|---|---|
| PENDING | Waiting in queue |
| CLAIMED | Claimed by an agent |
| RUNNING | Being executed |
| COMPLETED | Successfully completed |
| FAILED | Failed (may be retried) |
| CANCELLED | Manually cancelled |
| TIMEOUT | Exceeded timeout |
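The statuses suggest a small state machine. The transition table below is an assumption inferred from the descriptions (for example, that a FAILED task returns to PENDING when retried, that a claimed task can be completed directly as in the lifecycle example, and that COMPLETED, CANCELLED, and TIMEOUT are terminal); the scheduler's own rules are authoritative.

```python
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    CLAIMED = "claimed"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    TIMEOUT = "timeout"


S = TaskStatus
# Assumed transitions, inferred from the status descriptions
ALLOWED = {
    S.PENDING: {S.CLAIMED, S.CANCELLED},
    S.CLAIMED: {S.RUNNING, S.COMPLETED, S.FAILED, S.CANCELLED, S.TIMEOUT},
    S.RUNNING: {S.COMPLETED, S.FAILED, S.CANCELLED, S.TIMEOUT},
    S.FAILED: {S.PENDING},  # "may be retried": requeued for another agent
    S.COMPLETED: set(),
    S.CANCELLED: set(),
    S.TIMEOUT: set(),
}


def transition(current: TaskStatus, new: TaskStatus) -> TaskStatus:
    """Return `new` if the move is allowed, otherwise raise ValueError."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {new.name}")
    return new
```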

Task Lifecycle

from aragora.control_plane import TaskPriority

# 1. Submit task
task_id = await coordinator.submit_task(
    task_type="code_review",
    payload={"file": "main.py", "diff": "..."},
    required_capabilities=["code", "critique"],
    priority=TaskPriority.HIGH,
    timeout_seconds=120,
    metadata={"pr_id": "123"},
)

# 2. Claim task (from agent side)
task = await coordinator.claim_task(
    agent_id="my-agent",
    capabilities=["code", "critique"],
    block_ms=5000,  # Wait up to 5s for a task
)

# No task is returned if nothing matching arrives within block_ms
if task:
    # 3. Process task (process_task is your own handler)
    try:
        result = process_task(task.payload)

        # 4a. Complete task
        await coordinator.complete_task(
            task_id=task.id,
            result={"review": result},
            agent_id="my-agent",
            latency_ms=1500.0,
        )
    except Exception as e:
        # 4b. Fail task
        await coordinator.fail_task(
            task_id=task.id,
            error=str(e),
            agent_id="my-agent",
            requeue=True,  # Retry with another agent
        )

Notifications & Task Events

Task lifecycle events emit notifications through the NotificationDispatcher:

  • Task events (submitted, claimed, completed, failed) are emitted via aragora/control_plane/task_events.py.
  • Vetted decisionmaking consensus events are emitted by aragora/control_plane/deliberation.py and routed through the control plane integration callback.

Configure notification channels as described in docs/CONTROL_PLANE.md, and make sure the notification worker is enabled (ARAGORA_NOTIFICATION_WORKER=1).
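Fan-out of lifecycle events to notification channels can be pictured as a simple publish/subscribe registry. `TaskEventBus` is a hypothetical stand-in, not the `NotificationDispatcher` API; it isolates handler failures so one broken channel does not block the others.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class TaskEventBus:
    """Hypothetical fan-out of task lifecycle events to channel handlers."""

    EVENTS = ("submitted", "claimed", "completed", "failed")

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event: str, handler: Handler) -> None:
        if event not in self.EVENTS:
            raise ValueError(f"unknown event: {event}")
        self._handlers[event].append(handler)

    def emit(self, event: str, payload: Dict[str, Any]) -> int:
        """Deliver to every handler; return how many succeeded."""
        delivered = 0
        for handler in self._handlers[event]:
            try:
                handler(payload)
                delivered += 1
            except Exception:
                # One failing channel should not block the others
                continue
        return delivered
```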

Health Monitor

The HealthMonitor tracks agent health and integrates with circuit breakers.

Health Status

| Status | Description |
|---|---|
| HEALTHY | All checks passing |
| DEGRADED | Some issues detected |
| UNHEALTHY | Critical failures |
| UNKNOWN | No health data |
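How consecutive probe failures map to these statuses is not specified in this guide. A hypothetical mapping: no data is UNKNOWN, zero failures is HEALTHY, fewer than a threshold is DEGRADED, and at or above it is UNHEALTHY. The threshold of 3 and the `HealthRecord` name are illustrative only.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthRecord:
    consecutive_failures: int = 0
    last_result: Optional[bool] = None  # None until the first probe runs

    def record(self, ok: bool) -> None:
        # A success resets the failure streak; a failure extends it
        self.last_result = ok
        self.consecutive_failures = 0 if ok else self.consecutive_failures + 1

    def status(self, unhealthy_after: int = 3) -> HealthStatus:
        if self.last_result is None:
            return HealthStatus.UNKNOWN
        if self.consecutive_failures == 0:
            return HealthStatus.HEALTHY
        if self.consecutive_failures < unhealthy_after:
            return HealthStatus.DEGRADED
        return HealthStatus.UNHEALTHY
```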

Health Probes

Register custom health probes for agents:

def my_health_probe() -> bool:
    # Return True if healthy; check_model_connection() stands in for your own check
    return check_model_connection()

await coordinator.register_agent(
    agent_id="my-agent",
    capabilities=["debate"],
    health_probe=my_health_probe,
)
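A synchronous probe such as `my_health_probe` can block, so a monitor would typically run it off the event loop and bound it by PROBE_TIMEOUT. This sketch assumes timeouts and exceptions both count as failures; `run_probe` is a hypothetical helper, and `asyncio.to_thread` requires Python 3.9+.

```python
import asyncio
from typing import Callable


async def run_probe(probe: Callable[[], bool], timeout: float = 10.0) -> bool:
    """Run a blocking probe in a worker thread, bounded by `timeout` seconds.

    Exceptions and timeouts are reported as failures (False). A timed-out
    probe's thread cannot be killed; it finishes in the background.
    """
    try:
        return bool(await asyncio.wait_for(asyncio.to_thread(probe), timeout))
    except Exception:
        return False
```

With the default PROBE_TIMEOUT, a call would look like `await run_probe(my_health_probe, timeout=10.0)`.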

Querying Health

# Get specific agent health
health = coordinator.get_agent_health("my-agent")
print(f"Status: {health.status.value}")
print(f"Last check: {health.last_check}")
print(f"Consecutive failures: {health.consecutive_failures}")

# Get system health
system_health = coordinator.get_system_health()
print(f"System status: {system_health.value}")

# Check if agent is available
if coordinator.is_agent_available("my-agent"):
    # Safe to assign tasks
    pass

REST API Reference

Agents

| Method | Endpoint | Description |
|---|---|---|
| GET | /api/control-plane/agents | List registered agents |
| POST | /api/control-plane/agents | Register an agent |
| GET | /api/control-plane/agents/:id | Get agent info |
| DELETE | /api/control-plane/agents/:id | Unregister agent |
| POST | /api/control-plane/agents/:id/heartbeat | Send heartbeat |

List Agents

# List all available agents
curl "http://localhost:8080/api/control-plane/agents" \
  -H "Authorization: Bearer $TOKEN"

# Filter by capability
curl "http://localhost:8080/api/control-plane/agents?capability=debate" \
  -H "Authorization: Bearer $TOKEN"

# Include offline agents
curl "http://localhost:8080/api/control-plane/agents?available=false" \
  -H "Authorization: Bearer $TOKEN"

Response:

{
  "agents": [
    {
      "agent_id": "claude-3",
      "capabilities": ["debate", "code"],
      "status": "ready",
      "model": "claude-3-opus",
      "provider": "anthropic",
      "tasks_completed": 42,
      "avg_latency_ms": 1234.5
    }
  ],
  "total": 1
}

Tasks

| Method | Endpoint | Description |
|---|---|---|
| POST | /api/control-plane/tasks | Submit a task |
| GET | /api/control-plane/tasks/:id | Get task status |
| POST | /api/control-plane/tasks/:id/complete | Complete task |
| POST | /api/control-plane/tasks/:id/fail | Fail task |
| POST | /api/control-plane/tasks/:id/cancel | Cancel task |
| POST | /api/control-plane/tasks/claim | Claim next task |

Submit Task

curl -X POST http://localhost:8080/api/control-plane/tasks \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "debate",
    "payload": {
      "question": "What is the best programming language?",
      "context": "For web development"
    },
    "required_capabilities": ["debate"],
    "priority": "normal",
    "timeout_seconds": 300,
    "metadata": {
      "user_id": "123"
    }
  }'

Response:

{
  "task_id": "task_abc123"
}
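For clients that do not use the Python SDK, the same request can be made with the standard library alone. `build_submit_task_request` and `submit_task` are hypothetical helpers; the endpoint, headers, and `task_id` response field come from the examples above.

```python
import json
import urllib.request


def build_submit_task_request(base_url: str, token: str, task: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to the legacy /api/control-plane/tasks path."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/control-plane/tasks",
        data=json.dumps(task).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def submit_task(base_url: str, token: str, task: dict, timeout: float = 30.0) -> str:
    """Send the request and return the task_id from the JSON response."""
    req = build_submit_task_request(base_url, token, task)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)["task_id"]
```

Separating request construction from sending keeps the URL and header logic testable without a running server.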

Vetted Decisionmaking Sessions (Deliberations)

| Method | Endpoint | Description |
|---|---|---|
| POST | /api/control-plane/deliberations | Run or queue a vetted decisionmaking session |
| GET | /api/control-plane/deliberations/:id | Get vetted decisionmaking result |
| GET | /api/control-plane/deliberations/:id/status | Get vetted decisionmaking status |

Submit Vetted Decisionmaking (Sync)

curl -X POST http://localhost:8080/api/control-plane/deliberations \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "content": "Assess the security risk of this API design",
    "decision_type": "debate",
    "priority": "high",
    "response_channels": [{"platform": "http_api"}]
  }'

Submit Vetted Decisionmaking (Async)

curl -X POST http://localhost:8080/api/control-plane/deliberations \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "content": "Review this architecture for scalability risks",
    "decision_type": "debate",
    "async": true,
    "priority": "high",
    "required_capabilities": ["deliberation"]
  }'

Response:

{
  "task_id": "task_abc123",
  "request_id": "req_xyz789",
  "status": "queued"
}

Run a Vetted Decisionmaking Worker (Deliberation Worker)

python scripts/control_plane_deliberation_worker.py --agent-id deliberation-worker-1

Poll Status

curl http://localhost:8080/api/control-plane/deliberations/req_xyz789/status \
  -H "Authorization: Bearer $TOKEN"
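Clients usually poll with backoff rather than in a tight fixed loop. In this sketch, `poll_until_done` is a hypothetical helper, `fetch_status` stands for a GET to the status endpoint above, and the terminal statuses `"completed"` and `"failed"` are assumptions (only `"queued"` appears in this guide).

```python
import time
from typing import Callable, Dict, Iterable


def poll_until_done(
    fetch_status: Callable[[], Dict],
    terminal: Iterable[str] = ("completed", "failed"),  # assumed terminal statuses
    interval: float = 1.0,
    max_interval: float = 10.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict:
    """Call fetch_status() until its "status" is terminal, doubling the wait each time."""
    done = set(terminal)
    deadline = clock() + timeout
    while True:
        body = fetch_status()
        if body.get("status") in done:
            return body
        if clock() >= deadline:
            raise TimeoutError(f"still {body.get('status')!r} after {timeout}s")
        sleep(interval)
        interval = min(interval * 2, max_interval)
```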

Health

| Method | Endpoint | Description |
|---|---|---|
| GET | /api/control-plane/health | System health |
| GET | /api/control-plane/health/:agent_id | Agent health |
| GET | /api/control-plane/stats | Statistics |

System Health

curl http://localhost:8080/api/control-plane/health

Response:

{
  "status": "healthy",
  "agents": {
    "claude-3": {
      "status": "healthy",
      "last_check": "2024-01-15T10:30:00Z",
      "consecutive_failures": 0
    }
  }
}

Agent Selection Strategies

When selecting an agent for a task, the control plane supports multiple strategies:

Least Loaded (Default)

Selects the agent with the fewest completed tasks (proxy for current load):

agent = await coordinator.select_agent(
    capabilities=["debate"],
    strategy="least_loaded",
)

Round Robin

Cycles through agents based on last heartbeat:

agent = await coordinator.select_agent(
    capabilities=["debate"],
    strategy="round_robin",
)

Random

Random selection from available agents:

agent = await coordinator.select_agent(
    capabilities=["debate"],
    strategy="random",
)

Excluding Agents

agent = await coordinator.select_agent(
    capabilities=["debate"],
    strategy="least_loaded",
    exclude=["problematic-agent-1", "problematic-agent-2"],
)
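The three strategies and the `exclude` filter can be sketched over a plain list of agents. `AgentSelector` and `AgentInfo` are hypothetical; least loaded follows the description above (fewest completed tasks), and ordering the round-robin cycle by last heartbeat is one assumed reading of "cycles through agents based on last heartbeat".

```python
import itertools
import random
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class AgentInfo:
    agent_id: str
    tasks_completed: int
    last_heartbeat: float  # e.g. a Unix timestamp


class AgentSelector:
    """Hypothetical selector implementing the three documented strategies."""

    def __init__(self, seed: Optional[int] = None) -> None:
        self._rr = itertools.count()  # round-robin cursor shared across calls
        self._rng = random.Random(seed)

    def select(
        self,
        agents: List[AgentInfo],
        strategy: str = "least_loaded",
        exclude: Iterable[str] = (),
    ) -> Optional[AgentInfo]:
        excluded = set(exclude)
        pool = [a for a in agents if a.agent_id not in excluded]
        if not pool:
            return None
        if strategy == "least_loaded":
            # Fewest completed tasks; ties broken by agent_id for determinism
            return min(pool, key=lambda a: (a.tasks_completed, a.agent_id))
        if strategy == "round_robin":
            # Stable order by last heartbeat, stepped through with the cursor
            ordered = sorted(pool, key=lambda a: (a.last_heartbeat, a.agent_id))
            return ordered[next(self._rr) % len(ordered)]
        if strategy == "random":
            return self._rng.choice(pool)
        raise ValueError(f"unknown strategy: {strategy}")
```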

Error Handling

Task Retries

Failed tasks are automatically requeued up to max_task_retries times (MAX_TASK_RETRIES, default 3):

task_id = await coordinator.submit_task(
    task_type="analysis",
    payload={...},  # placeholder for your task payload
    required_capabilities=["analysis"],
)

# If the task fails, it will be requeued up to max_task_retries times
# unless requeue=False is passed to fail_task()
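The retry rule can be expressed as a small function. This sketch assumes `max_retries` counts requeues after the first attempt; whether Aragora counts retries or total attempts is not stated here, and `TaskRecord` and `handle_failure` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class TaskRecord:
    task_id: str
    retries: int = 0
    status: str = "pending"
    error: str = ""


def handle_failure(task: TaskRecord, error: str, max_retries: int = 3, requeue: bool = True) -> TaskRecord:
    """Requeue a failed task until it has been retried max_retries times."""
    task.error = error
    if requeue and task.retries < max_retries:
        task.retries += 1
        task.status = "pending"  # back in the queue for another agent
    else:
        task.status = "failed"  # retries exhausted or requeue disabled
    return task
```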

Circuit Breakers

The control plane integrates with Aragora's circuit breaker system. Agents that fail repeatedly are temporarily excluded from task assignment.
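The thresholds and API of Aragora's circuit breaker are not documented in this guide. As a generic illustration of the pattern, the hypothetical `AgentCircuitBreaker` below excludes an agent after a number of consecutive failures, then allows a trial assignment once a cooldown has passed.

```python
import time
from typing import Callable, Dict


class AgentCircuitBreaker:
    """Opens after `threshold` consecutive failures; reopens if a trial fails."""

    def __init__(self, threshold: int = 3, cooldown: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.cooldown = cooldown
        self._clock = clock
        self._failures: Dict[str, int] = {}
        self._opened_at: Dict[str, float] = {}

    def record_success(self, agent_id: str) -> None:
        # A success closes the circuit and clears the failure streak
        self._failures.pop(agent_id, None)
        self._opened_at.pop(agent_id, None)

    def record_failure(self, agent_id: str) -> None:
        count = self._failures.get(agent_id, 0) + 1
        self._failures[agent_id] = count
        if count >= self.threshold:
            self._opened_at[agent_id] = self._clock()

    def is_available(self, agent_id: str) -> bool:
        opened = self._opened_at.get(agent_id)
        if opened is None:
            return True
        # After the cooldown, allow a trial assignment (half-open state)
        return self._clock() - opened >= self.cooldown
```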

Graceful Degradation

When Redis is unavailable, the control plane falls back to an in-memory store, allowing local development without Redis.
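The fallback amounts to trying the backend connection and substituting a process-local store when it fails. A sketch with hypothetical names (`InMemoryStore`, `connect_with_fallback`). Note that redis-py raises its own `redis.exceptions.ConnectionError`, which is not a subclass of the built-in `OSError`, so a real caller would pass that type in `errors`.

```python
from typing import Callable, Dict, Optional, Protocol, Tuple, Type


class KeyValueStore(Protocol):
    def get(self, key: str) -> Optional[str]: ...
    def set(self, key: str, value: str) -> None: ...


class InMemoryStore:
    """Process-local stand-in: not shared between processes, lost on restart."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


def connect_with_fallback(
    connect: Callable[[], KeyValueStore],
    errors: Tuple[Type[BaseException], ...] = (OSError,),
) -> KeyValueStore:
    """Try the real backend first; fall back to memory if connecting fails."""
    try:
        return connect()
    except errors:
        return InMemoryStore()
```

Because the in-memory store is not shared, this fallback suits local development rather than multi-process deployments.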

Best Practices

Agent Implementation

  1. Send Regular Heartbeats: Send heartbeats more frequently than HEARTBEAT_TIMEOUT / 2
  2. Report Status Changes: Update status when transitioning between states
  3. Handle Graceful Shutdown: Set status to DRAINING before shutdown
  4. Implement Health Probes: Provide meaningful health checks
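
import asyncio

from aragora.control_plane import AgentStatus


async def agent_loop(coordinator, agent_id, stop_event: asyncio.Event):
    # Register
    await coordinator.register_agent(
        agent_id=agent_id,
        capabilities=["debate"],
    )

    try:
        # Call stop_event.set() elsewhere to end the loop
        while not stop_event.is_set():
            # Claim task (no task is returned when nothing matching is queued)
            task = await coordinator.claim_task(
                agent_id=agent_id,
                capabilities=["debate"],
            )

            if task:
                # Process (process_task is your own async handler)
                result = await process_task(task)
                await coordinator.complete_task(
                    task_id=task.id,
                    result=result,
                    agent_id=agent_id,
                )

            # Heartbeat
            await coordinator.heartbeat(agent_id)
            await asyncio.sleep(5)
    finally:
        # Graceful shutdown: stop taking new tasks, then unregister
        await coordinator.heartbeat(agent_id, status=AgentStatus.DRAINING)
        await coordinator.unregister_agent(agent_id)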

Task Design

  1. Idempotent Tasks: Design tasks to be safely retried
  2. Reasonable Timeouts: Set timeouts appropriate for task complexity
  3. Granular Capabilities: Use specific capabilities for better routing
  4. Include Metadata: Add context for debugging and monitoring
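Idempotency matters because a requeued task can run more than once. A minimal sketch that caches results by task ID so a duplicate delivery skips the handler's side effects; `IdempotentExecutor` is hypothetical, and a production version would need a shared, durable store rather than a per-process dict.

```python
from typing import Any, Callable, Dict


class IdempotentExecutor:
    """Runs each task_id at most once successfully; repeats return the cached result."""

    def __init__(self, handler: Callable[[dict], Any]) -> None:
        self._handler = handler
        self._results: Dict[str, Any] = {}

    def run(self, task_id: str, payload: dict) -> Any:
        if task_id in self._results:
            return self._results[task_id]  # duplicate delivery: skip side effects
        result = self._handler(payload)  # may raise; nothing is cached on failure
        self._results[task_id] = result
        return result
```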

Monitoring

Query statistics regularly:

stats = await coordinator.get_stats()

# Registry stats
print(f"Total agents: {stats['registry']['total_agents']}")
print(f"Available: {stats['registry']['available_agents']}")

# Scheduler stats
print(f"Pending tasks: {stats['scheduler']['pending_tasks']}")
print(f"Running tasks: {stats['scheduler']['running_tasks']}")

# Health stats
print(f"Healthy agents: {stats['health']['healthy_count']}")
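Statistics become more useful when turned into alerts. This hypothetical `check_stats` reads the keys shown above from a `get_stats()`-style dict; the thresholds are illustrative and should be tuned to your workload.

```python
from typing import Any, Dict, List


def check_stats(stats: Dict[str, Any], max_pending: int = 100, min_available: int = 1) -> List[str]:
    """Turn a stats dict into human-readable warnings (empty list means all clear)."""
    warnings: List[str] = []
    registry, scheduler = stats["registry"], stats["scheduler"]
    if registry["available_agents"] < min_available:
        warnings.append(f"only {registry['available_agents']} agent(s) available")
    if scheduler["pending_tasks"] > max_pending:
        warnings.append(f"{scheduler['pending_tasks']} tasks pending (limit {max_pending})")
    healthy = stats["health"]["healthy_count"]
    if healthy < registry["total_agents"]:
        warnings.append(f"{registry['total_agents'] - healthy} agent(s) not healthy")
    return warnings
```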

Troubleshooting

Agent Shows Offline

  1. Check if heartbeats are being sent
  2. Verify Redis connectivity
  3. Check HEARTBEAT_TIMEOUT configuration
  4. Look for network issues between agent and Redis

Tasks Not Being Processed

  1. Verify agents have required capabilities
  2. Check if agents are in READY status
  3. Look for circuit breaker activations
  4. Check task queue depth in statistics

High Latency

  1. Monitor Redis performance
  2. Check for task accumulation
  3. Consider adding more agents
  4. Review task timeout settings

Architecture Decisions

See ADR-002: Control Plane Architecture for detailed architectural decisions.