Data Models
All system data models. They are the source of truth for communication between components.
Configuration models (config/schema.py)
All use Pydantic v2 with extra = "forbid" (unknown keys → validation error).
LLMConfig
class LLMConfig(BaseModel):
provider: str = "litellm" # only supported provider
mode: str = "direct" # "direct" | "proxy"
model: str = "gpt-4o" # any LiteLLM model
api_base: str | None = None # custom base URL (LiteLLM Proxy, Ollama, etc.)
api_key_env: str = "LITELLM_API_KEY" # name of the env var containing the API key
timeout: int = 60 # seconds per LLM call
retries: int = 2 # retries on transient errors
stream: bool = True # streaming enabled by default
prompt_caching: bool = False # F14: mark system with cache_control (Anthropic/OpenAI)
AgentConfig
class AgentConfig(BaseModel):
system_prompt: str # injected as the first message
allowed_tools: list[str] = [] # [] = all available tools
confirm_mode: str = "confirm-sensitive" # "yolo"|"confirm-all"|"confirm-sensitive"
max_steps: int = 20 # Pydantic default=20; varies in DEFAULT_AGENTS:
# plan=20, build=50, resume=15, review=20
LoggingConfig
class LoggingConfig(BaseModel):
# v3: "human" = agent traceability level (HUMAN=25)
level: str = "human" # "debug"|"info"|"human"|"warn"|"error"
file: Path|None = None # path to the .jsonl file (optional)
verbose: int = 0 # 0=warn, 1=info, 2=debug, 3+=all
WorkspaceConfig
class WorkspaceConfig(BaseModel):
root: Path = Path(".") # workspace root; all ops confined here
allow_delete: bool = False # gate for delete_file tool
MCPServerConfig / MCPConfig
class MCPServerConfig(BaseModel):
name: str # identifier; used in prefix: mcp_{name}_{tool}
url: str # HTTP base URL of the MCP server
token_env: str | None = None # env var with the Bearer token
token: str | None = None # inline token (not recommended in production)
class MCPConfig(BaseModel):
servers: list[MCPServerConfig] = []
IndexerConfig (F10)
class IndexerConfig(BaseModel):
enabled: bool = True # if False, no indexing and no tree in the prompt
max_file_size: int = 1_000_000 # bytes; larger files are skipped
exclude_dirs: list[str] = [] # additional dirs (besides .git, node_modules, etc.)
exclude_patterns: list[str] = [] # additional patterns (besides *.pyc, *.min.js, etc.)
use_cache: bool = True # disk cache with 5-minute TTL
The indexer always excludes by default: .git, node_modules, __pycache__, .venv, venv, dist, build, .tox, .pytest_cache, .mypy_cache.
ContextConfig (F11)
class ContextConfig(BaseModel):
max_tool_result_tokens: int = 2000 # Level 1: truncate long tool results (~4 chars/token)
summarize_after_steps: int = 8 # Level 2: compress old messages after N steps
keep_recent_steps: int = 4 # Level 2: recent steps to preserve intact
max_context_tokens: int = 80000 # Level 3: total hard limit (~4 chars/token)
parallel_tools: bool = True # parallelize independent tool calls
A value of 0 disables the corresponding mechanism:
max_tool_result_tokens=0→ no truncation of tool results.summarize_after_steps=0→ no LLM compression.max_context_tokens=0→ no sliding window (dangerous for long tasks).
HookItemConfig (v4-A1)
class HookItemConfig(BaseModel):
name: str = "" # hook identifier (e.g.: "python-lint")
command: str # shell command to execute; supports {file} placeholder
matcher: str = "*" # regex/glob to filter tools
file_patterns: list[str] = [] # glob patterns (e.g.: ["*.py", "*.ts"])
timeout: int = 10 # ge=1, le=300 — maximum seconds
async_: bool = False # alias="async" — run in background
enabled: bool = True # if False, the hook is ignored
Backward-compat alias: HookConfig = HookItemConfig.
HooksConfig (v4-A1)
class HooksConfig(BaseModel):
# 10 lifecycle events
pre_tool_use: list[HookItemConfig] = []
post_tool_use: list[HookItemConfig] = []
pre_llm_call: list[HookItemConfig] = []
post_llm_call: list[HookItemConfig] = []
session_start: list[HookItemConfig] = []
session_end: list[HookItemConfig] = []
on_error: list[HookItemConfig] = []
agent_complete: list[HookItemConfig] = []
budget_warning: list[HookItemConfig] = []
context_compress: list[HookItemConfig] = []
# Backward-compat v3-M4: internally mapped to post_tool_use
post_edit: list[HookItemConfig] = []
GuardrailsConfig (v4-A2)
class GuardrailsConfig(BaseModel):
enabled: bool = False
protected_files: list[str] = [] # glob patterns
blocked_commands: list[str] = [] # regex patterns
max_files_modified: int | None = None
max_lines_changed: int | None = None
max_commands_executed: int | None = None
require_test_after_edit: bool = False
quality_gates: list[QualityGateConfig] = []
code_rules: list[CodeRuleConfig] = []
QualityGateConfig (v4-A2)
class QualityGateConfig(BaseModel):
name: str # gate name (e.g.: "lint", "tests")
command: str # shell command to execute
required: bool = True # if False, informational only
timeout: int = 60 # ge=1, le=600 — seconds
CodeRuleConfig (v4-A2)
class CodeRuleConfig(BaseModel):
pattern: str # regex to search in written code
message: str # message for the agent
severity: Literal["warn", "block"] = "warn"
SkillsConfig (v4-A3)
class SkillsConfig(BaseModel):
auto_discover: bool = True # discover skills in .architect/skills/
inject_by_glob: bool = True # inject skills based on active files
MemoryConfig (v4-A4)
class MemoryConfig(BaseModel):
enabled: bool = False # enable procedural memory
auto_detect_corrections: bool = True # detect corrections automatically
EvaluationConfig (F12)
class EvaluationConfig(BaseModel):
mode: Literal["off", "basic", "full"] = "off"
max_retries: int = 2 # ge=1, le=5 — retries in "full" mode
confidence_threshold: float = 0.8 # ge=0.0, le=1.0 — threshold to accept result
mode="off": no evaluation (default, does not consume extra tokens).mode="basic": one extra LLM call after execution. If it fails, status →"partial".mode="full": up tomax_retriesevaluation + correction cycles with a new prompt.
CommandsConfig (F13)
class CommandsConfig(BaseModel):
enabled: bool = True # if False, run_command is not registered
default_timeout: int = 30 # default seconds (ge=1, le=600)
max_output_lines: int = 200 # lines before truncation (ge=10, le=5000)
blocked_patterns: list[str] = [] # extra regexes to block
safe_commands: list[str] = [] # additional commands classified as 'safe'
allowed_only: bool = False # if True, dangerous commands rejected in execute()
CLI override: --allow-commands (enabled=True) / --no-commands (enabled=False).
CostsConfig (F14)
class CostsConfig(BaseModel):
enabled: bool = True # if False, CostTracker is not instantiated
prices_file: Path | None = None # custom prices; if None, uses default_prices.json
budget_usd: float | None = None # USD limit; BudgetExceededError if exceeded
warn_at_usd: float | None = None # warning threshold (log warning, does not stop)
CLI override: --budget FLOAT (equivalent to budget_usd).
LLMCacheConfig (F14)
class LLMCacheConfig(BaseModel):
enabled: bool = False # if True, enables LocalLLMCache
dir: Path = Path("~/.architect/cache") # directory on disk
ttl_hours: int = 24 # ge=1, le=8760 — hours of validity
CLI override: --cache (enabled=True), --no-cache (enabled=False), --cache-clear (clears before running).
AppConfig (root)
class AppConfig(BaseModel):
llm: LLMConfig = LLMConfig()
agents: dict[str, AgentConfig] = {} # custom agents from YAML
logging: LoggingConfig = LoggingConfig()
workspace: WorkspaceConfig = WorkspaceConfig()
mcp: MCPConfig = MCPConfig()
indexer: IndexerConfig = IndexerConfig() # F10
context: ContextConfig = ContextConfig() # F11
evaluation: EvaluationConfig = EvaluationConfig() # F12
commands: CommandsConfig = CommandsConfig() # F13
costs: CostsConfig = CostsConfig() # F14
llm_cache: LLMCacheConfig = LLMCacheConfig() # F14
hooks: HooksConfig = HooksConfig() # v4-A1 (backward-compat v3-M4)
guardrails: GuardrailsConfig = GuardrailsConfig() # v4-A2
skills: SkillsConfig = SkillsConfig() # v4-A3
memory: MemoryConfig = MemoryConfig() # v4-A4
sessions: SessionsConfig = SessionsConfig() # v4-B1
ralph: RalphLoopConfig = RalphLoopConfig() # v4-C1
parallel: ParallelRunsConfig = ParallelRunsConfig() # v4-C2
checkpoints: CheckpointsConfig = CheckpointsConfig() # v4-C4
auto_review: AutoReviewConfig = AutoReviewConfig() # v4-C5
telemetry: TelemetryConfig = TelemetryConfig() # v1.0.0 (D4)
health: HealthConfig = HealthConfig() # v1.0.0 (D2)
LLM Models (llm/adapter.py)
ToolCall
Represents a tool call that the LLM requests to execute.
class ToolCall(BaseModel):
id: str # unique ID assigned by the LLM (e.g.: "call_abc123")
name: str # tool name (e.g.: "edit_file")
arguments: dict[str, Any] # already-parsed arguments (adapter handles JSON string → dict)
LLMResponse
Normalized LLM response, regardless of the provider.
class LLMResponse(BaseModel):
content: str | None # response text (None if there are tool_calls)
tool_calls: list[ToolCall] # list of requested tool calls ([] if none)
finish_reason: str # "stop" | "tool_calls" | "length" | ...
usage: dict | None # {"prompt_tokens": N, "completion_tokens": N,
# "total_tokens": N, "cache_read_input_tokens": N}
cache_read_input_tokens is available when the provider uses prompt caching (Anthropic). The CostTracker uses it to calculate the reduced cost of cached tokens.
The most important finish_reason values:
"stop"+tool_calls=[]: the agent finished.contentis the final response."tool_calls"or"stop"+tool_calls != []: there are tools to execute."length": the LLM ran out of tokens; the loop can continue.
StreamChunk
Text streaming chunk.
class StreamChunk(BaseModel):
type: str # always "content" (for future extension)
data: str # text fragment from the LLM
Agent state (core/state.py)
StopReason (enum, v3)
class StopReason(Enum):
"""Reason why the agent stopped."""
LLM_DONE = "llm_done" # Natural: the LLM did not request more tools
MAX_STEPS = "max_steps" # Watchdog: step limit reached
BUDGET_EXCEEDED = "budget_exceeded" # Watchdog: cost limit exceeded
CONTEXT_FULL = "context_full" # Watchdog: context window full
TIMEOUT = "timeout" # Watchdog: total time exceeded
USER_INTERRUPT = "user_interrupt" # The user pressed Ctrl+C / SIGTERM
LLM_ERROR = "llm_error" # Unrecoverable LLM error
Distinguishes natural termination (LLM_DONE) from forced stops by safety nets. Stored in AgentState.stop_reason and included in the JSON output.
ToolCallResult (frozen dataclass)
Immutable result of a tool execution.
@dataclass(frozen=True)
class ToolCallResult:
tool_name: str
args: dict[str, Any]
result: ToolResult # from tools/base.py
was_confirmed: bool = True
was_dry_run: bool = False
timestamp: float = field(default_factory=time.time)
StepResult (frozen dataclass)
Immutable result of a complete loop iteration.
@dataclass(frozen=True)
class StepResult:
step_number: int
llm_response: LLMResponse
tool_calls_made: list[ToolCallResult]
timestamp: float = field(default_factory=time.time)
AgentState (mutable dataclass)
Accumulated state throughout the agent’s execution.
@dataclass
class AgentState:
messages: list[dict] # OpenAI history (grows each step)
steps: list[StepResult] # step history (append-only)
status: str = "running" # "running" | "success" | "partial" | "failed"
stop_reason: StopReason | None = None # v3: stop reason (None while running)
final_output: str | None = None # final response when status != "running"
start_time: float = field(...)
model: str | None = None # model used (for output)
cost_tracker: CostTracker | None = None # F14: cost tracker (injected by CLI)
# Computed properties
current_step: int # len(steps)
total_tool_calls: int # sum of all tool calls across all steps
is_finished: bool # status != "running"
def to_output_dict(self) -> dict:
# Serialization for --json
result = {
"status": self.status,
"stop_reason": self.stop_reason.value if self.stop_reason else None,
"output": self.final_output or "",
"steps": len(self.steps),
"tools_used": [...], # list of {name, partial args, success}
"duration_seconds": time.time() - self.start_time,
"model": self.model,
}
# F14: include costs if data is available
if self.cost_tracker and self.cost_tracker.has_data():
result["costs"] = self.cost_tracker.summary()
return result
The status field can be modified externally by the SelfEvaluator (F12) or by BudgetExceededError (F14).
Costs module (costs/) — F14
ModelPricing (dataclass)
@dataclass
class ModelPricing:
input_per_million: float # USD per million input tokens
output_per_million: float # USD per million output tokens
cached_input_per_million: float | None # USD/M for cached tokens (None = use input_per_million)
PriceLoader
Loads prices from costs/default_prices.json (or a custom file via CostsConfig.prices_file).
class PriceLoader:
def __init__(self, custom_prices_file: Path | None = None): ...
def get_prices(self, model: str) -> ModelPricing:
# 1. Exact match (e.g.: "gpt-4o" → prices["gpt-4o"])
# 2. Prefix match (e.g.: "claude-sonnet-4-6-20250514" → prices["claude-sonnet-4-6"])
# 3. Generic fallback: input=3.0, output=15.0, cached=None
# NEVER raises exceptions
Models embedded in default_prices.json: gpt-4o, gpt-4o-mini, gpt-4.1, gpt-4.1-mini, claude-sonnet-4-6, claude-opus-4-6, claude-haiku-4-5, gemini/gemini-2.0-flash, deepseek/deepseek-chat, ollama (zero cost).
StepCost (dataclass)
@dataclass
class StepCost:
step: int # agent step number
model: str # model used (e.g.: "gpt-4o")
input_tokens: int # total input tokens (includes cached)
output_tokens: int # output tokens
cached_tokens: int # tokens served from provider cache
cost_usd: float # step cost in USD
source: str # "agent" | "eval" | "summary"
CostTracker
class CostTracker:
def __init__(
self,
price_loader: PriceLoader,
budget_usd: float | None = None, # limit; BudgetExceededError if exceeded
warn_at_usd: float | None = None, # warning threshold (log warning, no exception)
): ...
def record(self, step: int, model: str, usage: dict, source: str = "agent") -> None:
# Extracts prompt_tokens, completion_tokens, cache_read_input_tokens
# Calculates differentiated cost: cached_tokens x cached_rate + non_cached x input_rate + output x output_rate
# Raises BudgetExceededError if total_cost_usd > budget_usd
# NEVER raises other exceptions
# Aggregation properties
total_input_tokens: int # sum of all input_tokens
total_output_tokens: int # sum of all output_tokens
total_cached_tokens: int # sum of all cached_tokens
total_cost_usd: float # total cost in USD
step_count: int # number of recorded steps
def has_data(self) -> bool: ... # True if step_count > 0
def summary(self) -> dict: ... # totals + breakdown by_source
def format_summary_line(self) -> str: # "$0.0042 (12,450 in / 3,200 out / 500 cached)"
summary() returns:
{
"total_input_tokens": 12450,
"total_output_tokens": 3200,
"total_cached_tokens": 500,
"total_tokens": 15650,
"total_cost_usd": 0.004213,
"by_source": {"agent": 0.003800, "eval": 0.000413},
}
BudgetExceededError
Raised by CostTracker.record() when total_cost_usd > budget_usd. The AgentLoop catches it, sets state.status = "partial", and terminates the loop.
class BudgetExceededError(Exception):
pass
Lifecycle Hooks (core/hooks.py) — v4-A1
HookEvent (enum)
class HookEvent(Enum):
PRE_TOOL_USE = "pre_tool_use"
POST_TOOL_USE = "post_tool_use"
PRE_LLM_CALL = "pre_llm_call"
POST_LLM_CALL = "post_llm_call"
SESSION_START = "session_start"
SESSION_END = "session_end"
ON_ERROR = "on_error"
BUDGET_WARNING = "budget_warning"
CONTEXT_COMPRESS = "context_compress"
AGENT_COMPLETE = "agent_complete"
HookDecision (enum)
class HookDecision(Enum):
ALLOW = "allow" # Allow the action
BLOCK = "block" # Block the action (pre-hooks only)
MODIFY = "modify" # Modify input and allow
HookResult (dataclass)
@dataclass
class HookResult:
decision: HookDecision = HookDecision.ALLOW
reason: str | None = None # reason for block/error
additional_context: str | None = None # extra context for the LLM
updated_input: dict[str, Any] | None = None # modified input (MODIFY)
duration_ms: float = 0.0
HooksRegistry
class HooksRegistry:
hooks: dict[HookEvent, list[HookConfig]]
def get_hooks(self, event: HookEvent) -> list[HookConfig]: ...
def has_hooks(self) -> bool: ...
HookExecutor
class HookExecutor:
def __init__(self, registry: HooksRegistry, workspace_root: str): ...
def execute_hook(self, hook, event, context, stdin_data) -> HookResult: ...
def run_event(self, event, context, stdin_data) -> list[HookResult]: ...
def run_post_edit(self, tool_name, args) -> str | None: ... # backward-compat v3
Exit code protocol: 0=ALLOW, 2=BLOCK, other=Error (warning, does not break loop).
Env vars: ARCHITECT_EVENT, ARCHITECT_WORKSPACE, ARCHITECT_TOOL, ARCHITECT_FILE.
HookRunResult (legacy, v3-M4)
@dataclass
class HookRunResult:
hook_name: str
success: bool
output: str # truncated to 1000 chars
exit_code: int
PostEditHooks (legacy) remains available for backward compatibility.
GuardrailsEngine (core/guardrails.py) — v4-A2
Deterministic security engine evaluated BEFORE hooks.
class GuardrailsEngine:
def __init__(self, config: GuardrailsConfig, workspace_root: str): ...
def check_file_access(self, file_path: str, action: str) -> tuple[bool, str]: ...
def check_command(self, command: str) -> tuple[bool, str]: ...
def check_edit_limits(self, file_path: str, lines_added: int, lines_removed: int) -> tuple[bool, str]: ...
def check_code_rules(self, content: str, file_path: str) -> list[tuple[str, str]]: ...
def should_force_test(self) -> bool: ...
def run_quality_gates(self) -> list[dict]: ...
Internal tracking: _files_modified, _lines_changed, _commands_executed, _edits_since_last_test.
Skills (skills/loader.py) — v4-A3
SkillInfo (dataclass)
@dataclass
class SkillInfo:
name: str
description: str = ""
globs: list[str] = field(default_factory=list)
content: str = ""
source: str = "" # "local" | "installed" | "project"
SkillsLoader
class SkillsLoader:
def __init__(self, workspace_root: str): ...
def load_project_context(self) -> str | None: ... # .architect.md / AGENTS.md / CLAUDE.md
def discover_skills(self) -> list[SkillInfo]: ... # .architect/skills/ + installed-skills/
def get_relevant_skills(self, file_paths: list[str]) -> list[SkillInfo]: ...
def build_system_context(self, active_files: list[str] | None) -> str: ...
SkillInstaller
class SkillInstaller:
def __init__(self, workspace_root: str): ...
def install_from_github(self, repo_spec: str) -> bool: ... # sparse checkout
def create_local(self, name: str) -> Path: ... # SKILL.md template
def list_installed(self) -> list[dict[str, str]]: ... # {name, source, path}
def uninstall(self, name: str) -> bool: ...
Procedural Memory (skills/memory.py) — v4-A4
class ProceduralMemory:
CORRECTION_PATTERNS = [
(r"no[,.]?\s+(usa|utiliza|haz|pon|cambia|es)\b", "direct_correction"),
(r"(eso no|eso está mal|no es correcto|está mal)", "negation"),
(r"(en realidad|realmente|de hecho)\b", "clarification"),
(r"(debería ser|el correcto es|el comando es)\b", "should_be"),
(r"(no funciona así|así no)\b", "wrong_approach"),
(r"(siempre|nunca)\s+(usa|hagas|pongas)\b", "absolute_rule"),
]
def __init__(self, workspace_root: str): ...
def detect_correction(self, user_msg: str, prev_agent_action: str | None) -> str | None: ...
def add_correction(self, correction: str) -> None: ... # dedup + persist
def add_pattern(self, pattern: str) -> None: ...
def get_context(self) -> str: ... # for injection into system prompt
def analyze_session_learnings(self, conversation: list[dict]) -> list[str]: ...
Persists in .architect/memory.md with the format: - [YYYY-MM-DD] Correction: {text}.
Local LLM Cache (llm/cache.py) — F14
LocalLLMCache
class LocalLLMCache:
def __init__(self, cache_dir: Path, ttl_hours: int = 24): ...
def get(
self,
messages: list[dict],
tools: list[dict] | None,
) -> LLMResponse | None:
# Returns LLMResponse if there is a valid hit (not expired)
# Returns None on miss, expiration, or error — NEVER raises
def set(
self,
messages: list[dict],
tools: list[dict] | None,
response: LLMResponse,
) -> None:
# Saves response to disk — fails silently on error
def clear(self) -> int: ... # deletes all .json files; returns count
def stats(self) -> dict: ... # {entries, expired, total_size_bytes, dir}
def _make_key(self, messages, tools) -> str:
# SHA-256[:24] of json.dumps({"messages":..., "tools":...}, sort_keys=True)
# Deterministic regardless of key order
One .json file per entry in cache_dir. TTL based on the file’s mtime. The LLMAdapter queries it before calling LiteLLM and saves the response on a miss.
Evaluator (core/evaluator.py) — F12
EvalResult (dataclass)
Result of an agent evaluation by the SelfEvaluator.
@dataclass
class EvalResult:
completed: bool # Was the task completed correctly?
confidence: float # confidence level [0.0, 1.0] (clamped)
issues: list[str] = [] # list of detected issues
suggestion: str = "" # suggestion to improve the result
raw_response: str = "" # raw LLM response (debugging)
EvalResult example with issues:
EvalResult(
completed=False,
confidence=0.35,
issues=["The file tests/test_utils.py was not created", "The imports were not updated"],
suggestion="Create the file tests/test_utils.py with pytest and update the imports in src/",
raw_response='{"completed": false, "confidence": 0.35, ...}'
)
Tool result (tools/base.py)
ToolResult
The only possible return type from any tool. Exceptions are never raised.
class ToolResult(BaseModel):
success: bool
output: str # always present; on failure contains error description
error: str | None # technical error message (None on success)
Tool argument models (tools/schemas.py)
All with extra = "forbid".
Filesystem tools
class ReadFileArgs(BaseModel):
path: str # relative to workspace root
class WriteFileArgs(BaseModel):
path: str
content: str
mode: str = "overwrite" # "overwrite" | "append"
class DeleteFileArgs(BaseModel):
path: str
class ListFilesArgs(BaseModel):
path: str = "."
pattern: str|None = None # glob (e.g.: "*.py", "**/*.md")
recursive: bool = False
Editing tools (F9)
class EditFileArgs(BaseModel):
path: str # file to modify
old_str: str # exact text to replace (must be unique in the file)
new_str: str # replacement text
class ApplyPatchArgs(BaseModel):
path: str # file to modify
patch: str # unified diff (format --- +++ @@ ...)
Command execution tool (F13)
class RunCommandArgs(BaseModel):
command: str # command to execute (shell string)
cwd: str | None = None # working directory (relative to workspace)
timeout: int = 30 # seconds (ge=1, le=600)
env: dict[str, str] | None = None # additional environment variables
Search tools (F10)
class SearchCodeArgs(BaseModel):
pattern: str # Python regular expression
path: str = "." # search directory
file_pattern: str = "*.py" # glob to filter files
context_lines: int = 2 # context lines per match
max_results: int = 50
class GrepArgs(BaseModel):
pattern: str # literal text
path: str = "."
file_pattern: str = "*"
recursive: bool = True
case_sensitive: bool = True
max_results: int = 100
class FindFilesArgs(BaseModel):
pattern: str # filename glob (e.g.: "*.yaml")
path: str = "."
recursive: bool = True
Indexer models (indexer/tree.py) — F10
@dataclass
class FileInfo:
path: Path # path relative to workspace root
size: int # bytes
ext: str # extension (e.g.: ".py", ".ts", ".yaml")
language: str # language name (e.g.: "Python", "TypeScript")
lines: int # number of lines (0 if unreadable)
@dataclass
class RepoIndex:
root: Path
files: list[FileInfo]
total_files: int
total_lines: int
languages: dict[str, int] # {language: number of files}
build_time_ms: float
def format_tree(self) -> str:
# Returns the workspace tree as a string for the system prompt
# <=300 files → detailed tree with Unicode connectors
# >300 files → compact view grouped by root directory
The RepoIndexer builds the RepoIndex by traversing the workspace with os.walk(), filtering excluded directories and files. The IndexCache serializes/deserializes the index in JSON with a 5-minute TTL.
Sessions (features/sessions.py) — v4-B1
SessionsConfig
class SessionsConfig(BaseModel):
auto_save: bool = True # save state after each step
cleanup_after_days: int = 7 # days after which `cleanup` deletes sessions
SessionState (dataclass)
@dataclass
class SessionState:
session_id: str # format: YYYYMMDD-HHMMSS-hexhex
task: str # original user prompt
agent: str # agent name (build, plan, etc.)
model: str # LLM model used
status: str # running, success, partial, failed
steps_completed: int # steps executed
messages: list[dict] # LLM message history
files_modified: list[str] # files touched during the session
total_cost: float # accumulated cost in USD
started_at: str # ISO 8601 timestamp
updated_at: str # ISO 8601 timestamp (updated on each save)
stop_reason: str | None # stop reason (llm_done, timeout, etc.)
metadata: dict # arbitrary additional data
Methods: to_dict() / from_dict() for JSON serialization.
Sessions with more than 50 messages are automatically truncated: the last 30 messages are preserved and truncated: true is set in metadata.
SessionManager
class SessionManager:
def __init__(self, workspace_root: str): ...
def save(self, state: SessionState) -> None: ...
def load(self, session_id: str) -> SessionState | None: ... # None if not found or corrupt JSON
def list_sessions(self) -> list[dict]: ... # summarized metadata, newest first
def cleanup(self, older_than_days: int = 7) -> int: ... # returns count of deleted sessions
def delete(self, session_id: str) -> bool: ...
generate_session_id
def generate_session_id() -> str:
# Format: YYYYMMDD-HHMMSS-hexhex
# Example: 20260223-143022-a1b2c3
Reports (features/report.py) — v4-B2
ExecutionReport (dataclass)
@dataclass
class ExecutionReport:
task: str
agent: str
model: str
status: str # success, partial, failed
duration_seconds: float
steps: int
total_cost: float
stop_reason: str | None = None
files_modified: list[dict] = field(default_factory=list)
quality_gates: list[dict] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
git_diff: str | None = None
timeline: list[dict] = field(default_factory=list)
ReportGenerator
class ReportGenerator:
def __init__(self, report: ExecutionReport): ...
def to_json(self) -> str: ... # full JSON, parseable by jq
def to_markdown(self) -> str: ... # Markdown with tables and sections
def to_github_pr_comment(self) -> str: ... # GitHub with collapsible <details>
collect_git_diff
def collect_git_diff(workspace_root: str) -> str | None:
# Runs `git diff HEAD`, truncates to 50KB
# Returns None if not a git repo or no changes
Status icons: success → OK, partial → WARN, failed → FAIL.
Dry Run (features/dryrun.py) — v4-B4
PlannedAction (dataclass)
@dataclass
class PlannedAction:
tool_name: str
description: str
tool_input: dict
DryRunTracker
class DryRunTracker:
actions: list[PlannedAction]
def record_action(self, tool_name: str, tool_input: dict) -> None: ...
def get_plan_summary(self) -> str: ... # formatted summary of all actions
@property
def action_count(self) -> int: ...
Constants: WRITE_TOOLS (frozenset) and READ_TOOLS (frozenset), disjoint. Only actions from WRITE_TOOLS are recorded in the tracker.
_summarize_action(tool_name, tool_input) generates readable descriptions with 5 code paths (path, command, long command, fallback keys, empty dict).
Ralph Loop (features/ralph.py) — v4-C1
RalphConfig (dataclass)
@dataclass
class RalphConfig:
task: str # task/prompt for the agent
checks: list[str] # shell commands that must pass (exit 0)
max_iterations: int = 25 # iteration limit
max_cost: float | None = None # maximum total cost in USD
max_time: int | None = None # maximum total time in seconds
completion_tag: str = "COMPLETE" # tag the agent emits when declaring completion
agent: str = "build" # agent to use in each iteration
model: str | None = None # LLM model (None = config default)
use_worktree: bool = False # run in an isolated git worktree
LoopIteration (dataclass)
@dataclass
class LoopIteration:
number: int # iteration number (1-based)
status: str # "success", "partial", "failed"
checks_passed: list[str] # checks that passed
checks_failed: list[str] # checks that failed
cost: float # USD cost of this iteration
duration: float # seconds
RalphLoopResult (dataclass)
@dataclass
class RalphLoopResult:
success: bool # True if all checks passed
iterations: list[LoopIteration] # iteration history
total_cost: float # total accumulated cost in USD
total_duration: float # total duration in seconds
stop_reason: str # "checks_passed", "max_iterations", "max_cost", "max_time", "agent_failed"
RalphLoop
class RalphLoop:
def __init__(
self,
agent_factory: Callable[..., Any],
config: RalphConfig,
) -> None: ...
def run(self) -> RalphLoopResult: ...
def _run_checks(self, checks: list[str]) -> tuple[list[str], list[str]]: ...
def _build_iteration_prompt(self, iteration: int, failed: list[str], outputs: dict) -> str: ...
RalphLoopConfig (Pydantic — config/schema.py)
class RalphLoopConfig(BaseModel):
max_iterations: int = 25 # 1-100
max_cost: float | None = None # USD, None = no limit
max_time: int | None = None # seconds, None = no limit
completion_tag: str = "COMPLETE"
agent: str = "build"
Pipeline Mode (features/pipelines.py) — v4-C3
PipelineStep (dataclass)
@dataclass
class PipelineStep:
name: str # unique step name
prompt: str # prompt (supports {{variables}})
agent: str = "build" # agent to use
model: str | None = None # LLM model (None = default)
max_steps: int = 50 # maximum agent steps
condition: str | None = None # shell condition (exit 0 = execute)
output_var: str | None = None # capture output in variable {{name}}
checks: list[str] = field(default_factory=list) # post-step checks
checkpoint: bool = False # create git checkpoint after completion
PipelineConfig (dataclass)
@dataclass
class PipelineConfig:
name: str # pipeline name
steps: list[PipelineStep] # list of steps to execute
variables: dict[str, str] = field(default_factory=dict) # initial variables
PipelineStepResult (dataclass)
@dataclass
class PipelineStepResult:
step_name: str # step name
status: str # "success", "partial", "failed", "skipped"
output: str # agent output
cost: float # step cost in USD
duration: float # seconds
PipelineRunner
class PipelineRunner:
def __init__(
self,
agent_factory: Callable[..., Any],
config: PipelineConfig,
) -> None: ...
def run(self, from_step: str | None = None, dry_run: bool = False) -> list[PipelineStepResult]: ...
def _substitute_variables(self, text: str, variables: dict) -> str: ...
def _check_condition(self, condition: str) -> bool: ...
def _run_checks(self, checks: list[str]) -> tuple[list[str], list[str]]: ...
def _create_checkpoint(self, step_name: str) -> None: ...
Parallel Runs (features/parallel.py) — v4-C2
ParallelConfig (dataclass)
@dataclass
class ParallelConfig:
tasks: list[str] # tasks to execute
workers: int = 3 # number of parallel workers
models: list[str] = field(default_factory=list) # round-robin models
agent: str = "build"
budget_per_worker: float | None = None # USD per worker
timeout_per_worker: int | None = None # seconds per worker
WorkerResult (dataclass)
@dataclass
class WorkerResult:
worker_id: int # 1-based
branch: str # "architect/parallel-1"
model: str # model used
status: str # "success", "partial", "failed", "timeout"
steps: int # agent steps
cost: float # cost in USD
duration: float # seconds
files_modified: list[str] # changed files
worktree_path: str # worktree path
ParallelRunner
class ParallelRunner:
WORKTREE_PREFIX = ".architect-parallel"
def __init__(
self,
config: ParallelConfig,
workspace_root: str,
) -> None: ...
def run(self) -> list[WorkerResult]: ...
def cleanup_worktrees(self) -> None: ...
def _create_worktrees(self) -> None: ...
def _run_worker(self, worker_id: int, task: str, model: str) -> WorkerResult: ...
ParallelRunsConfig (Pydantic — config/schema.py)
class ParallelRunsConfig(BaseModel):
workers: int = 3 # 1-10
agent: str = "build"
max_steps: int = 50
budget_per_worker: float | None = None
timeout_per_worker: int | None = None
Checkpoints (features/checkpoints.py) — v4-C4
Checkpoint (dataclass)
@dataclass(frozen=True)
class Checkpoint:
step: int # step number
commit_hash: str # full git hash
message: str # descriptive message
timestamp: float # Unix timestamp
files_changed: list[str] # modified files
def short_hash(self) -> str:
return self.commit_hash[:7]
CheckpointManager
CHECKPOINT_PREFIX = "architect:checkpoint"
class CheckpointManager:
def __init__(self, workspace_root: str) -> None: ...
def create(self, step: int, message: str = "") -> Checkpoint | None: ... # None if no changes
def list_checkpoints(self) -> list[Checkpoint]: ... # most recent first
def rollback(self, step: int | None = None, commit: str | None = None) -> bool: ...
def get_latest(self) -> Checkpoint | None: ...
def has_changes_since(self, commit_hash: str) -> bool: ...
CheckpointsConfig (Pydantic — config/schema.py)
class CheckpointsConfig(BaseModel):
enabled: bool = False # True = enable automatic checkpoints
every_n_steps: int = 5 # 1-50
Auto-Review (agents/reviewer.py) — v4-C5
ReviewResult (dataclass)
@dataclass
class ReviewResult:
has_issues: bool # True if issues were found
review_text: str # full review text
cost: float # review cost in USD
AutoReviewer
class AutoReviewer:
def __init__(
self,
agent_factory: Callable[..., Any],
review_model: str | None = None,
) -> None: ...
def review_changes(self, task: str, git_diff: str) -> ReviewResult: ...
@staticmethod
def build_fix_prompt(review_text: str) -> str: ...
@staticmethod
def get_recent_diff(workspace_root: str, commits_back: int = 1) -> str: ...
AutoReviewConfig (Pydantic — config/schema.py)
class AutoReviewConfig(BaseModel):
enabled: bool = False # True = enable auto-review
review_model: str | None = None # model for the reviewer (None = same as builder)
max_fix_passes: int = 1 # 0-3 (0 = report only)
TelemetryConfig (Pydantic — config/schema.py, v1.0.0)
class TelemetryConfig(BaseModel):
enabled: bool = False # True = enable OpenTelemetry traces
exporter: str = "console" # "otlp" | "console" | "json-file"
endpoint: str = "http://localhost:4317" # gRPC endpoint for OTLP
trace_file: str | None = None # file path for json-file
HealthConfig (Pydantic — config/schema.py, v1.0.0)
class HealthConfig(BaseModel):
enabled: bool = False # True = automatic analysis
include_patterns: list[str] = ["**/*.py"] # file patterns to analyze
exclude_dirs: list[str] = [".git", "venv", "__pycache__", "node_modules"]
HealthSnapshot (dataclass — core/health.py, v1.0.0)
@dataclass
class HealthSnapshot:
files_analyzed: int
total_functions: int
avg_complexity: float
max_complexity: int
long_functions: int # > 50 lines
duplicate_blocks: int # duplicate blocks
functions: list[FunctionMetric]
HealthDelta (dataclass — core/health.py, v1.0.0)
@dataclass
class HealthDelta:
before: HealthSnapshot
after: HealthSnapshot
def to_report(self) -> str: ... # markdown table
FunctionMetric (frozen dataclass — core/health.py, v1.0.0)
@dataclass(frozen=True)
class FunctionMetric:
file: str
name: str
lines: int
complexity: int
CompetitiveConfig (dataclass — features/competitive.py, v1.0.0)
@dataclass
class CompetitiveConfig:
task: str
models: list[str]
checks: list[str]
agent: str = "build"
max_steps: int = 50
budget_per_model: float | None = None
timeout_per_model: int | None = None
config_path: str | None = None
api_base: str | None = None
CompetitiveResult (dataclass — features/competitive.py, v1.0.0)
@dataclass
class CompetitiveResult:
model: str
status: str # success | partial | failed | timeout
steps: int
cost: float
duration: float
files_modified: list[str]
checks_passed: int
checks_total: int
worktree_path: str
score: float # 0-100
DispatchSubagentArgs (Pydantic — tools/dispatch.py, v1.0.0)
class DispatchSubagentArgs(BaseModel):
agent_type: str # "explore" | "test" | "review"
task: str # sub-task description
context: str = "" # additional context
Error hierarchy
Exception
├── MCPError mcp/client.py
│ ├── MCPConnectionError HTTP connection error to the MCP server
│ └── MCPToolCallError error in remote tool execution
│
├── PathTraversalError execution/validators.py
│ # Attempt to access outside the workspace (../../etc/passwd)
│
├── ValidationError execution/validators.py
│ # File or directory not found during validation
│
├── PatchError tools/patch.py
│ # Error parsing or applying a unified diff in apply_patch
│
├── NoTTYError execution/policies.py
│ # Interactive confirmation needed but no TTY available (CI/headless)
│
├── ToolNotFoundError tools/registry.py
│ # Requested tool not registered in the registry
│
├── DuplicateToolError tools/registry.py
│ # Attempt to register a tool with an already existing name (without allow_override=True)
│
├── AgentNotFoundError agents/registry.py
│ # Agent name not found in DEFAULT_AGENTS or in YAML
│
├── StepTimeoutError(TimeoutError) core/timeout.py
│ # Agent step exceeded the configured maximum time
│ # .seconds: int — time in seconds that was exceeded
│
├── BudgetExceededError costs/tracker.py
│ # Total session cost exceeded the configured budget_usd
│ # Raised by CostTracker.record() → caught by AgentLoop → state.status="partial"
│
├── GuardrailViolation core/guardrails.py # v4-A2
│ # Deterministic guardrail violation (file access, command block, edit limits)
│ # Caught by ExecutionEngine → ToolResult(success=False)
│
└── BlockedCommandError tools/commands.py
# Command in the static blocklist (always blocked)
These exceptions are for internal signaling — most are caught in ExecutionEngine or AgentLoop and converted into a ToolResult(success=False) or an agent status change, respectively. None of them should propagate to the end user.