Arquitectura
Este documento describe la estructura interna de vigil, el flujo del motor de analisis, y el protocolo de analyzers.
Estructura del proyecto
vigil-cli/
src/vigil/
__init__.py # __version__
cli.py # Comandos Click (scan, deps, tests, init, rules)
config/
__init__.py
schema.py # Modelos Pydantic v2 para configuracion
loader.py # Carga y merge de config (YAML + CLI)
rules.py # Catalogo de 26 reglas (RULES_V0)
core/
__init__.py
finding.py # Severity, Category, Location, Finding
engine.py # ScanEngine, ScanResult
file_collector.py # Descubrimiento de archivos
rule_registry.py # RuleRegistry para acceso a reglas
analyzers/
__init__.py
base.py # BaseAnalyzer Protocol
deps/ # CAT-01: Dependency Analyzer
__init__.py
analyzer.py # DependencyAnalyzer (DEP-001..007)
parsers.py # Parsers para requirements.txt, pyproject.toml, package.json
registry_client.py # Cliente HTTP para PyPI/npm con cache local
similarity.py # Levenshtein + corpus de paquetes populares
auth/ # CAT-02: Auth Analyzer
__init__.py
analyzer.py # AuthAnalyzer (AUTH-001..007)
endpoint_detector.py # Deteccion de endpoints HTTP (FastAPI/Flask/Express)
middleware_checker.py # Verificacion de auth middleware
patterns.py # Regex para JWT, CORS, cookies, passwords
secrets/ # CAT-03: Secrets Analyzer
__init__.py
analyzer.py # SecretsAnalyzer (SEC-001..006)
placeholder_detector.py # Deteccion de placeholders y assignments
entropy.py # Calculo de Shannon entropy
env_tracer.py # Tracing de valores desde .env.example
tests/ # CAT-06: Test Quality Analyzer
__init__.py
analyzer.py # TestQualityAnalyzer (TEST-001..006)
assert_checker.py # Extraccion de funciones test, conteo de assertions
mock_checker.py # Deteccion de mock mirrors
coverage_heuristics.py # Identificacion de archivos de test y frameworks
reports/
__init__.py
formatter.py # BaseFormatter Protocol + factory
human.py # Formato terminal con colores
json_fmt.py # Formato JSON estructurado
junit.py # Formato JUnit XML
sarif.py # Formato SARIF 2.1.0
summary.py # Generador de resumen (conteos)
logging/
__init__.py
setup.py # Configuracion de structlog
tests/
conftest.py # Fixtures globales
test_cli.py # Tests del CLI
test_cli_edge_cases.py # Edge cases del CLI
test_integration.py # Tests de integracion end-to-end
test_core/
test_finding.py
test_engine.py
test_file_collector.py
test_config/
test_schema.py
test_loader.py
test_rules.py
test_reports/
test_formatters.py
test_formatters_edge_cases.py
test_fase4_formatters.py # Tests de mejoras FASE 4
test_fase4_qa.py # QA: regresion, edge cases, consistency
test_analyzers/
test_deps/
test_parsers.py # Tests de parsers de dependencias
test_parsers_qa.py # QA: edge cases (markers, BOM, CRLF, Unicode)
test_registry_client.py # Tests del cliente de registries
test_registry_client_qa.py # QA: cache, sanitize, response parsing
test_similarity.py # Tests de deteccion de typosquatting
test_similarity_qa.py # QA: corpus integrity, false positives, PEP 503
test_analyzer.py # Tests del DependencyAnalyzer
test_analyzer_qa.py # QA: false positives/negatives, boundaries
test_integration_qa.py # QA: engine+analyzer, CLI+deps, regression
test_auth/
test_analyzer.py # Tests del AuthAnalyzer
test_endpoint_detector.py # Tests de deteccion de endpoints
test_middleware_checker.py # Tests de verificacion de auth middleware
test_patterns.py # Tests de patrones regex (JWT, CORS, cookies)
test_qa_regression.py # QA: edge cases y regresiones
test_secrets/
test_analyzer.py # Tests del SecretsAnalyzer
test_placeholder_detector.py # Tests de deteccion de placeholders
test_entropy.py # Tests de calculo de Shannon entropy
test_env_tracer.py # Tests de tracing de .env.example
test_qa_regression.py # QA: edge cases y regresiones
test_tests/
test_analyzer.py # Tests del TestQualityAnalyzer
test_assert_checker.py # Tests de extraccion y conteo de assertions
test_mock_checker.py # Tests de deteccion de mock mirrors
test_coverage_heuristics.py # Tests de identificacion de archivos test
test_qa_regression.py # QA: edge cases y regresiones (81 tests)
fixtures/ # Archivos de prueba
deps/ # Fixtures de dependencias
valid_project/ # Proyecto con deps legitimas
hallucinated_deps/ # Deps alucinadas/inventadas
npm_project/ # Proyecto npm con deps inventadas
clean_project/ # Proyecto limpio (sin findings)
vulnerable_project/ # Mix de deps legitimas y sospechosas
edge_cases/ # Empty, comments-only, markers, URLs, malformed
auth/ # Fixtures de auth
insecure_fastapi.py # FastAPI sin auth middleware
insecure_flask.py # Flask sin auth
insecure_express.js # Express sin auth
secure_app.py # App con auth correcta (sin findings)
edge_cases.py # Casos borde
secrets/ # Fixtures de secrets
insecure_secrets.py # Secrets hardcodeados (Python)
insecure_secrets.js # Secrets hardcodeados (JavaScript)
.env.example # Ejemplo de .env.example
copies_env_example.py # Codigo que copia valores de .env.example
secure_code.py # Codigo seguro (sin findings)
tests/ # Fixtures de test quality
vulnerable_tests.py # Tests Python con problemas (sin assertions, triviales, catch-all)
vulnerable_tests.js # Tests JavaScript con problemas
clean_tests.py # Tests Python correctos (sin findings)
clean_tests.js # Tests JavaScript correctos (sin findings)
edge_cases_python.py # Casos borde Python (async, single-line, nested)
edge_cases_js.js # Casos borde JavaScript (async, describe, nested)
npm_tests.test.js # Tests npm/jest con problemas mixtos
Modelos de datos
Severity
Enum string con 5 niveles, ordenados de mayor a menor criticidad:
class Severity(str, Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
Usar str, Enum permite comparar directamente con strings y serializar a JSON sin conversion.
Category
Enum string con 4 categorias de analisis:
class Category(str, Enum):
DEPENDENCY = "dependency"
AUTH = "auth"
SECRETS = "secrets"
TEST_QUALITY = "test-quality"
Location
Dataclass que indica donde se encontro el problema:
@dataclass
class Location:
file: str # Ruta al archivo
line: int | None = None # Linea (1-based)
column: int | None = None # Columna (1-based)
end_line: int | None = None # Linea final (para rangos)
snippet: str | None = None # Fragmento de codigo
Finding
Dataclass que representa un hallazgo individual:
@dataclass
class Finding:
rule_id: str # "DEP-001", "AUTH-005"
category: Category # Category.DEPENDENCY
severity: Severity # Severity.CRITICAL
message: str # Descripcion del problema
location: Location # Donde se encontro
suggestion: str | None = None # Como corregirlo
metadata: dict[str, Any] = field(default_factory=dict)
@property
def is_blocking(self) -> bool:
return self.severity in (Severity.CRITICAL, Severity.HIGH)
La propiedad is_blocking determina si el finding deberia bloquear un merge (por defecto, CRITICAL y HIGH son bloqueantes).
Flujo del motor
El ScanEngine es el orquestador central. Su metodo run() ejecuta el pipeline completo:
run(paths)
|
+----------v-----------+
| 1. Collect files |
| (file_collector) |
+----------+-----------+
|
+----------v-----------+
| 2. Run analyzers |
| (for each analyzer) |
+----------+-----------+
|
+----------v-----------+
| 3. Apply overrides |
| (rule_overrides) |
+----------+-----------+
|
+----------v-----------+
| 4. Sort findings |
| (by severity) |
+----------+-----------+
|
v
ScanResult
Paso 1: Recopilar archivos
file_collector.collect_files() recibe las rutas del usuario y retorna una lista de archivos a escanear:
- Recorre directorios recursivamente con
os.walk()y pruning in-place de directorios excluidos (dirnames[:] = [...]). Esto evita recorrer.venv/,node_modules/, etc., lo cual es critico para rendimiento (un.venv/tipico contiene miles de archivos). - Filtra por extensiones de lenguaje (
LANGUAGE_EXTENSIONS). - Excluye patrones configurados por componente de path (no por substring).
- Siempre incluye archivos de dependencias (
requirements.txt,package.json, etc.) independientemente del filtro de lenguaje. - Deduplica preservando el orden.
Paso 2: Ejecutar analyzers
Para cada analyzer registrado:
- Verifica si debe ejecutarse (
_should_run()): respeta filtros de--categoryy--rule. - Llama a
analyzer.analyze(files, config). - Recopila los findings retornados.
- Captura excepciones por analyzer (un analyzer fallido no detiene a los demas).
Paso 3: Aplicar overrides
_apply_rule_overrides() procesa la seccion rules: de la configuracion:
- Si una regla tiene
enabled: false, sus findings se eliminan. - Si una regla tiene
severity: "low", la severidad del finding se modifica. - Si una regla esta en
exclude_rules(de--exclude-rule), se elimina.
Paso 4: Ordenar
Los findings se ordenan por severidad descendente (CRITICAL primero, INFO ultimo) usando SEVERITY_SORT_ORDER.
Protocolo de analyzers
Cada analyzer implementa el protocolo BaseAnalyzer:
class BaseAnalyzer(Protocol):
@property
def name(self) -> str: ...
@property
def category(self) -> Category: ...
def analyze(self, files: list[str], config: ScanConfig) -> list[Finding]: ...
Contrato
name: Nombre unico del analyzer (ej."dependency","auth").category: Categoria de findings que genera.analyze(): Recibe la lista de archivos y la configuracion, retorna findings.
Reglas para implementar un analyzer
- Determinista: El mismo input siempre produce el mismo output.
- Sin efectos secundarios: No modifica archivos, no escribe a stdout.
- Manejo de errores interno: Si un archivo no se puede leer, el analyzer lo ignora y continua.
- Logging a stderr: Usar
structlogpara logs de debug/info. - Respetar la configuracion: Leer thresholds y opciones de
ScanConfig.
Ejemplo de implementacion
from vigil.analyzers.base import BaseAnalyzer
from vigil.config.schema import ScanConfig
from vigil.core.finding import Category, Finding, Location, Severity
class DependencyAnalyzer:
@property
def name(self) -> str:
return "dependency"
@property
def category(self) -> Category:
return Category.DEPENDENCY
def analyze(self, files: list[str], config: ScanConfig) -> list[Finding]:
findings: list[Finding] = []
# ... logica de analisis ...
return findings
No se requiere herencia — solo satisfacer el Protocol (structural typing).
Registro de analyzers
En cli.py, los analyzers se registran mediante _register_analyzers(engine) antes de ejecutar el scan:
def _register_analyzers(engine: ScanEngine) -> None:
from vigil.analyzers.deps import DependencyAnalyzer
from vigil.analyzers.auth import AuthAnalyzer
from vigil.analyzers.secrets import SecretsAnalyzer
from vigil.analyzers.tests import TestQualityAnalyzer
engine.register_analyzer(DependencyAnalyzer())
engine.register_analyzer(AuthAnalyzer())
engine.register_analyzer(SecretsAnalyzer())
engine.register_analyzer(TestQualityAnalyzer())
Esta funcion se invoca en los comandos scan, deps y tests.
DependencyAnalyzer
El primer analyzer implementado. Detecta dependencias alucinadas, typosquatting, paquetes nuevos sospechosos, versiones inexistentes y paquetes sin repositorio fuente.
Arquitectura interna
DependencyAnalyzer.analyze(files, config)
|
v
[1. _extract_roots(files)] --> Directorios raiz unicos
|
v
[2. find_and_parse_all(root)] --> Lista de DeclaredDependency
| (parsers: req.txt, pyproject.toml, package.json)
v
[3. _deduplicate_deps()] --> Deps unicos por nombre+ecosystem
|
v
[4. load_popular_packages()] --> Corpus para typosquatting
|
+---> [5a. _check_registries()] --> DEP-001, DEP-002, DEP-005, DEP-007
| | (solo si online + verify_registry)
| v
| RegistryClient.check(name, ecosystem)
| |
| +---> Cache hit? return cached
| +---> HTTP GET PyPI/npm -> PackageInfo -> cache
|
+---> [5b. find_similar_popular()] --> DEP-003 (siempre, no requiere red)
|
v
list[Finding]
Componentes
| Modulo | Responsabilidad |
|---|---|
parsers.py | Parsea requirements.txt, pyproject.toml, package.json en DeclaredDependency |
registry_client.py | HTTP client para PyPI/npm con cache en disco (~/.cache/vigil/registry/) |
similarity.py | Levenshtein distance, normalizacion PEP 503, corpus de paquetes populares |
analyzer.py | Orquesta parsers + registry + similarity, genera findings |
Reglas implementadas
| Regla | Requiere red | Descripcion |
|---|---|---|
| DEP-001 | Si | Paquete no existe en registro |
| DEP-002 | Si | Paquete creado hace menos de N dias |
| DEP-003 | No | Nombre similar a paquete popular |
| DEP-005 | Si | Sin repositorio fuente |
| DEP-007 | Si | Version pinneada no existe |
Reglas diferidas (V1)
| Regla | Razon |
|---|---|
| DEP-004 | Requiere API de estadisticas de descargas |
| DEP-006 | Requiere parser de imports AST |
Sistema de configuracion
Tres capas con merge progresivo
Defaults (schema.py) < Archivo YAML (.vigil.yaml) < Flags CLI
- Defaults: Definidos como valores por defecto en los modelos Pydantic (
ScanConfig,DepsConfig, etc.). - YAML: Cargado con
pyyamly validado con Pydantic. - CLI: Flags de Click que sobreescriben campos especificos.
Loader
load_config() en config/loader.py:
- Busca el archivo de config (manual con
--config, o auto-deteccion subiendo por el arbol de directorios). - Parsea el YAML.
- Crea una instancia de
ScanConfigcon los valores del YAML. - Aplica overrides del CLI sobre la instancia.
- Retorna la configuracion final.
Validacion
Pydantic v2 valida automaticamente:
- Tipos de datos (
min_age_dayses int, no string). - Valores validos (
fail_ones uno de critical/high/medium/low). - Modelos anidados (
deps,auth,secrets,tests,output).
Catalogo de reglas
Las 26 reglas estan definidas en config/rules.py como instancias de RuleDefinition:
@dataclass
class RuleDefinition:
id: str # "DEP-001"
name: str # "Hallucinated dependency"
description: str # Descripcion larga
category: Category # Category.DEPENDENCY
default_severity: Severity # Severity.CRITICAL
enabled_by_default: bool = True
languages: list[str] | None = None # None = todos
owasp_ref: str | None = None # "LLM03"
cwe_ref: str | None = None # "CWE-829"
RuleRegistry
Provee acceso indexado al catalogo:
registry.get("DEP-001")— obtener una regla por ID.registry.all()— todas las reglas.registry.by_category(Category.AUTH)— reglas de una categoria.registry.by_severity(Severity.CRITICAL)— reglas de una severidad.registry.enabled_rules(overrides)— reglas habilitadas despues de aplicar overrides.
Formateadores
Protocolo
class BaseFormatter(Protocol):
def format(self, result: ScanResult) -> str: ...
Factory
get_formatter(format_name, **kwargs) retorna la clase correcta con lazy import. Solo HumanFormatter acepta **kwargs; los demas los ignoran:
"human" -> HumanFormatter(colors=True, show_suggestions=True, quiet=False)
"json" -> JsonFormatter
"junit" -> JunitFormatter
"sarif" -> SarifFormatter
HumanFormatter
Constructor acepta tres opciones:
| Opcion | Default | Descripcion |
|---|---|---|
colors | True | Habilita colores ANSI (auto-detecta TTY) |
show_suggestions | True | Muestra sugerencias de correccion |
quiet | False | Suprime header y resumen, solo findings y errores |
Iconos Unicode: ✗ (critical/high), ⚠ (medium), ~ (low), i (info). Separadores y flechas: ─, →, ✓.
JsonFormatter
Incluye campos analyzers_run, findings_count, errors a nivel raiz. El summary se genera via build_summary() con desglose por severidad, categoria, regla, archivos top 10, y flag has_blocking. El campo snippet en location solo se incluye si esta presente.
JunitFormatter
Incluye un elemento <properties> con metadata del scan (vigil.version, vigil.files_scanned, vigil.analyzers). El texto de <failure> incluye Rule, Severity, Category, File, Suggestion, y Snippet. El atributo tests cuenta findings + errores.
SarifFormatter
Cumple con SARIF 2.1.0. Incluye semanticVersion, defaultConfiguration por regla, helpUri, ruleIndex en resultados, snippet en region, invocations con estado de ejecucion y errores, y referencias OWASP/CWE en properties. Los nombres de reglas se convierten a PascalCase.
Flujo de output
ScanResult -> Formatter.format() -> string -> stdout o archivo
El CLI decide a donde enviar el output:
- Sin
--output: stdout. - Con
--output: escribe a archivo (y tambien a stdout para formato human).
Logging
structlog
vigil usa structlog para logging estructurado:
- Verbose mode (
-v): Level DEBUG, con timestamps y key-value pairs. - Normal mode: Level WARNING, output minimalista.
- Output siempre a stderr: Los logs nunca van a stdout. Esto permite
vigil scan -f json | jqsin contaminar el JSON con logs.
Ejemplo de logs en modo verbose
2024-01-15 10:30:00 [info] files_collected count=42
2024-01-15 10:30:00 [info] analyzer_start name=dependency
2024-01-15 10:30:01 [info] analyzer_done name=dependency findings=2
Dependencias externas
| Dependencia | Version | Proposito |
|---|---|---|
click>=8.1 | CLI framework | Subcomandos, opciones, help automatico |
pydantic>=2.0 | Validacion | Modelos de configuracion con validacion |
httpx>=0.27 | HTTP client | Requests a PyPI/npm (async-capable) |
structlog>=24.1 | Logging | Logging estructurado a stderr |
pyyaml>=6.0 | YAML parser | Carga de archivos de configuracion |
Dependencias de desarrollo
| Dependencia | Version | Proposito |
|---|---|---|
pytest>=8.0 | Testing | Framework de tests |
pytest-cov>=5.0 | Cobertura | Reporte de cobertura de tests |
ruff>=0.4 | Linting | Linter y formatter de Python |
Decisiones de diseno
Por que Protocol y no ABC
Se usa typing.Protocol (structural typing) en lugar de abc.ABC (nominal typing) para:
- Flexibilidad: Los analyzers no necesitan heredar de una clase base.
- Testing: Es trivial crear fakes/mocks que satisfagan el protocolo.
- Desacoplamiento: Los modulos no dependen de la clase base.
Por que dataclasses y no Pydantic para Finding
Finding,Location, yRuleDefinitionson modelos de datos internos que no necesitan validacion.- Pydantic se reserva para la configuracion del usuario donde la validacion es critica.
- Las dataclasses son mas ligeras y rapidas para datos que se crean internamente.
Por que structlog
- Logging estructurado (key-value) facilita parsing y filtrado.
- Separacion clara de output (stdout) vs logs (stderr).
- Configuracion centralizada con processors.
Por que no async
vigil V0 es sincrono. Las razones:
- La mayoria de operaciones son I/O de filesystem, que es rapido.
- Las HTTP requests al registry se pueden hacer con
httpxsincrono. - La simplicidad del codigo sincrono facilita debugging y testing.
- Se puede migrar a async en versiones futuras si el rendimiento lo requiere.