diff --git a/modules/dr-anticorruption/Dockerfile b/modules/dr-anticorruption/Dockerfile new file mode 100644 index 0000000..3e1b89b --- /dev/null +++ b/modules/dr-anticorruption/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE 8000 + +CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/modules/dr-anticorruption/README.md b/modules/dr-anticorruption/README.md index 6abc944..d4f9ea7 100644 --- a/modules/dr-anticorruption/README.md +++ b/modules/dr-anticorruption/README.md @@ -1,4 +1,73 @@ -# DR Anti-Corruption -Owner: Brian Collado -Goal: /ingest, /risk, /graph, /brief with official DR sources. +# DR Anti-Corruption Module (Dominican Republic) +This module provides an end-to-end pipeline for detecting corruption risks in public procurement in the Dominican Republic. It ingests official sources, connects entities, calculates risk scores, and generates evidence-backed policy briefs. + +## 🚀 Overview +- **Lead:** Brian Collado +- **Scope:** DR procurement data -> Entity/Contract linking -> Risk indicators -> Evidence-linked briefs. +- **Status:** MVP Implementation. + +## 📥 Inputs & 📤 Outputs + +### Inputs +- **Procurement Data:** Official JSON/Excel files from DGCP (Dirección General de Contrataciones Públicas). +- **Target Entities:** RPE (Registro de Proveedores del Estado) or company names for tailored risk analysis. +- **Config Parameters:** Custom thresholds for activation spikes, bidder concentration, and flags for politically exposed persons (PEPs). + +### Outputs +- **Risk Score:** Numerical score (0-100) indicating corruption risk. +- **Risk Level:** Categorical risk (LOW, MEDIUM, HIGH, CRITICAL). +- **Evidence Graph:** Relationship mapping between companies, owners, and contracts. +- **Policy Brief:** Contextual summary of findings with citations to specific contracts or legal violations. + +## 🛠 How to Run + +### Docker (Recommended) +The module is designed to run in a containerized environment. + +```bash +docker-compose up -d +``` +Access the API at `http://localhost:8000`. + +### Local Development +1. Install dependencies: + ```bash + pip install -r requirements.txt + ``` +2. Run ingestion pipeline: + ```bash + python src/cli.py ingest --target all + ``` +3. Start the API: + ```bash + uvicorn src.api.main:app --host 0.0.0.0 --port 8000 + ``` + +## 🌐 API Interface (MVP) + +| Endpoint | Method | Description | +| :--- | :--- | :--- | +| `/ingest` | `POST` | Triggers background ingestion of official DR sources. | +| `/risk` | `POST` | Returns a detailed risk report for a specific entity. | +| `/graph` | `GET` | Returns JSON representation of the entity's network. | +| `/brief` | `POST` | Generates a 1-page PDF/Text brief with evidence. | + +### Example Scenario +**Goal:** Analyze a suspicious supplier involved in multiple "emergency" contracts. +1. **Request:** `POST /risk` with `{"entity_id": "12345"}`. +2. **Response:** + ```json + { + "risk_score": 85.5, + "level": "HIGH", + "factors": ["Concentration of emergency contracts", "Shared ownership with public official"], + "evidence": ["Contract #992-2023", "Public Gazette Ref ID: X"] + } + ``` + +## 📂 Project Structure +- `src/api/`: FastAPI implementation. +- `src/core/`: Risk engine, Forensic analyzers, and Brief generator. +- `src/data/`: Data management and persistence (SQLite/Postgres). +- `config/`: Configuration for thresholds and API keys. 
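For a quick smoke test of the endpoint table above, `/risk` can be called with `httpx` (already pinned in `requirements.txt`). A minimal sketch, assuming the stack is running locally on port 8000; `fetch_risk_report` is an illustrative helper name, the `entity_id` is a supplier RPE as in the example scenario, and response field names may vary slightly between this README and the MVP models:

```python
# Minimal client sketch for POST /risk against a local docker-compose deployment.
import httpx


def fetch_risk_report(entity_id: str) -> dict:
    """Request a risk report for a supplier identified by its RPE."""
    response = httpx.post(
        "http://localhost:8000/risk",
        json={"entity_id": entity_id},
        timeout=30.0,
    )
    response.raise_for_status()  # surface 4xx/5xx instead of parsing an error body
    return response.json()


if __name__ == "__main__":
    print(fetch_risk_report("12345"))  # RPE taken from the example scenario
```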
\ No newline at end of file diff --git a/modules/dr-anticorruption/config/config.yaml b/modules/dr-anticorruption/config/config.yaml new file mode 100644 index 0000000..1ac0e92 --- /dev/null +++ b/modules/dr-anticorruption/config/config.yaml @@ -0,0 +1,189 @@ +data: + dir: data + +logging: + level: INFO + file: ingestion.log + +dgcp: + base_url: https://datosabiertos.dgcp.gob.do/api-dgcp/v1 + rate_limit_delay: 0.2 + max_retries: 5 + timeout_connect: 10 + timeout_read: 30 + +risk: + thresholds: + critical: 75 + high: 50 + medium: 25 + new_company_years: ['2024', '2025', '2026'] + hub_density_high: 20 + hub_density_medium: 5 + +forensics: + versatility_threshold: 2 + activation_concentration: 0.6 + activation_min_contracts: 10 + activation_30d_max: 10 + +keywords: + industries: + medical: + - salud + - hospital + - medico + - farmacia + - medicamento + - laboratorio + - reactivo + - clinico + - quirurgico + - insumo + - oxigeno + - suero + - pastillas + - jarabe + - sonda + - jeringa + - hilo + - cateter + - guante + - mascarilla + - sutura + - antibiotico + - analgesico + construction: + - obra + - construc + - ingenier + - remodelacion + - mantenimiento fisico + - reparacion + - pintura + - cemento + - varilla + - ferreteria + - asfalt + - acero + - edific + - tuberia + - electrico + - plomeria + food: + - aliment + - comida + - desayuno + - almuerzo + - catering + - buffet + - cocina + - bebida + - picadera + - refrigerio + - pan + - agua + - jugo + - cafe + - botella + it: + - computadora + - laptop + - toner + - papel + - oficina + - software + - licencia + - informatica + - tecnologia + - impresora + - cartucho + - redes + - internet + - disco duro + - ups + - monitor + - cable + transport: + - transporte + - vehiculo + - combustible + - taller + - repuesto + - goma + - bateria + - neumatico + - chofer + - camion + - guagua + - motor + - mecanic + - aceite + - lubricante + - freno + cleaning: + - limpieza + - aseo + - fumigacion + - conserje + - desechos + - basura + - jardineria + - desinfeccion + - lavado + - detergente + - escoba + - suape + event: + - evento + - publicidad + - impresion + - montaje + - sonido + - banner + - rotulo + - regalo + - bono + risk_news: + - corrupcion + - soborno + - fraude + - peculado + - lavado de activos + - estafa + - investigacion + - imputado + - acusado + - irregularidad + - sobrevaluacion + - vinculado + - testaferro + - clonacion + investigative: + - Nuria + - Alicia Ortega + - Acento + - SIN + whistleblowers: + - somos pueblo + - tolentino + - cavada + - espresate + +api: + host: 0.0.0.0 + port: 8000 + +news: + region: DO + language: es-419 + lookback_years: 5 + +minio: + endpoint: http://localhost:9000 + access_key: minioadmin + secret_key: minioadmin + bucket: s3-raw + +postgres: + dsn: postgresql://postgres:postgres@localhost:5432/datalake \ No newline at end of file diff --git a/modules/dr-anticorruption/docker-compose.yml b/modules/dr-anticorruption/docker-compose.yml new file mode 100644 index 0000000..c53c7a3 --- /dev/null +++ b/modules/dr-anticorruption/docker-compose.yml @@ -0,0 +1,58 @@ +version: '3.8' + +services: + api: + build: . 
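    # Development settings for the api service: the bind mount (.:/app) plus
    # uvicorn's --reload flag pick up local code edits without rebuilding the
    # image, and PYTHONPATH=/app/src lets packages under src/ (e.g. core.*)
    # resolve as top-level imports inside the container.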
+ ports: + - "8000:8000" + volumes: + - .:/app + environment: + - PYTHONPATH=/app/src + command: uvicorn src.api.main:app --host 0.0.0.0 --port 8000 --reload + depends_on: + - redis + - postgres + - minio + redis: + image: redis:alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + postgres: + image: postgres:15 + environment: + POSTGRES_DB: datalake + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U postgres -d datalake" ] + interval: 10s + timeout: 5s + retries: 5 + minio: + image: quay.io/minio/minio:latest + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9000:9000" + - "9001:9001" + volumes: + - minio_data:/data + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ] + interval: 30s + timeout: 20s + retries: 3 + +volumes: + redis_data: + postgres_data: + minio_data: diff --git a/modules/dr-anticorruption/pyproject.toml b/modules/dr-anticorruption/pyproject.toml new file mode 100644 index 0000000..0212076 --- /dev/null +++ b/modules/dr-anticorruption/pyproject.toml @@ -0,0 +1,51 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "dr-anticorruption" +version = "0.2.0" +description = "Anti-corruption analysis platform for DR procurement data" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "fastapi==0.115.0", + "uvicorn[standard]==0.32.0", +] + +[project.optional-dependencies] +dev = [ + "pytest==8.3.3", + "pytest-cov==5.0.0", + "black==24.8.0", + "isort==5.13.2", + "mypy==1.11.2", + "types-requests==2.32.0.202409.1", +] + +[tool.black] +line-length = 88 +target-version = ['py311'] + +[tool.isort] +profile = "black" +line_length = 88 + +[tool.mypy] +python_version = "3.11" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true + +[tool.pytest.ini_options] +addopts = "-ra --strict-markers --strict-config --cov=src --cov-report=html --cov-report=term-missing" +testpaths = ["tests"] +markers = [ + "slow: Marks tests as slow", + "integration: Integration tests" +] + +[tool.setuptools.packages.find] +where = ["src"] +namespaces = false \ No newline at end of file diff --git a/modules/dr-anticorruption/requirements.txt b/modules/dr-anticorruption/requirements.txt new file mode 100644 index 0000000..53859f2 --- /dev/null +++ b/modules/dr-anticorruption/requirements.txt @@ -0,0 +1,31 @@ +# Runtime Dependencies +fastapi==0.115.0 +uvicorn[standard]==0.32.0 +requests==2.32.3 +pandas==2.2.3 +openpyxl==3.1.5 +beautifulsoup4==4.12.3 +lxml==5.3.0 +pyyaml==6.0.2 +pydantic==2.9.2 +click==8.1.7 +celery==5.4.0 +redis==5.1.1 +neo4j==5.25.2 +streamlit==1.39.0 +scikit-learn==1.5.2 +httpx==0.27.2 +boto3==1.35.39 +psycopg2-binary==2.9.10 +apache-airflow==2.10.4 +minio==7.2.9 + +# Dev Dependencies +pytest==8.3.3 +pytest-cov==5.0.0 +black==24.8.0 +isort==5.13.2 +mypy==1.11.2 +types-requests==2.32.0.202409.1 +alembic==1.13.3 +luigi==2.8.13 diff --git a/modules/dr-anticorruption/src/api/api/main.py b/modules/dr-anticorruption/src/api/api/main.py new file mode 100644 index 0000000..e691f2f --- /dev/null +++ b/modules/dr-anticorruption/src/api/api/main.py @@ -0,0 +1,77 @@ +from fastapi import FastAPI, BackgroundTasks +from pydantic import BaseModel +from typing import Optional, List +import 
logging + +# Import core modules +# from core.ingestion import DGCPIngestor (to be integrated) + +app = FastAPI( + title="DR Anti-Corruption Module", + description="API for ingesting and analyzing DR procurement data for corruption risks.", + version="0.1.0" +) + +# --- Data Models --- +class IngestRequest(BaseModel): + target: str = "all" # 'all', 'contratos', 'proveedores', etc. + +class RiskRequest(BaseModel): + entity_id: str + depth: int = 1 + +class BriefRequest(BaseModel): + context: str + +# --- Endpoints --- + +@app.get("/") +def health_check(): + return {"status": "operational", "module": "dr-anticorruption"} + +@app.post("/ingest") +async def trigger_ingestion(request: IngestRequest, background_tasks: BackgroundTasks): + """ + Triggers data collection from DGCP sources in the background. + """ + # Logic to run DGCPIngestor would go here + # background_tasks.add_task(run_ingestion, request.target) + return {"status": "ingestion_started", "target": request.target, "job_id": "job_123"} + +@app.post("/risk") +def calculate_risk(request: RiskRequest): + """ + Analyzes specific entities or contracts for red flags. + STATUS: Mock Implementation + """ + return { + "entity_id": request.entity_id, + "risk_score": 0.0, + "flags": ["Not implemented yet"] + } + +@app.get("/graph") +def get_knowledge_graph(entity_id: Optional[str] = None): + """ + Returns network connections for visualization. + STATUS: Mock Implementation + """ + return { + "nodes": [], + "edges": [] + } + +@app.post("/brief") +def generate_brief(request: BriefRequest): + """ + Generates a natural language summary with citations. + STATUS: Mock Implementation + """ + return { + "summary": f"Brief regarding: {request.context}", + "sources": [] + } + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/modules/dr-anticorruption/src/api/main.py b/modules/dr-anticorruption/src/api/main.py new file mode 100644 index 0000000..be779d4 --- /dev/null +++ b/modules/dr-anticorruption/src/api/main.py @@ -0,0 +1,98 @@ +from fastapi import FastAPI, BackgroundTasks, HTTPException +from pydantic import BaseModel +from typing import Optional, List, Dict, Any +import logging + +from src.config.config import config +from src.core.pipeline import RiskPipeline +from src.core.risk_service import RiskService +from src.data.models import RiskRequest, BriefRequest, IngestRequest, RiskReport + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("api") + +app = FastAPI( + title="DR Anti-Corruption API", + description="Microservice for analyzing corruption risks in Dominican Republic procurement.", + version="0.2.0" +) + +# Services +pipeline = RiskPipeline() +risk_service = RiskService() + +@app.get("/") +def health_check(): + return { + "status": "operational", + "module": "dr-anticorruption", + "version": "0.2.0" + } + +@app.post("/ingest") +async def trigger_ingestion(request: IngestRequest, background_tasks: BackgroundTasks): + """Triggers background data ingestion from official DGCP sources.""" + background_tasks.add_task(pipeline.run_full, request.target) + return { + "status": "ingestion_started", + "target": request.target, + "job_id": "job_" + str(hash(request.target)) # Simple mock ID + } + +@app.post("/risk", response_model=RiskReport) +def calculate_risk(request: RiskRequest): + """Calculates risk score and factors for a specific supplier (RPE).""" + # Try to load from DataManager + proveedores = 
risk_service.pg.query_graph_data()[2] if hasattr(risk_service, 'pg') else [] + if not proveedores: + from src.data.data_manager import data_manager + proveedores = data_manager.load_latest('proveedores_full_*.json') + + supplier = next((p for p in proveedores if str(p.get('rpe')) == request.entity_id), None) + + if not supplier: + logger.warning(f"Supplier {request.entity_id} not found.") + # Return a "not found" style report or 404 + return RiskReport( + entity="Unknown", + rpe=request.entity_id, + risk_score=0.0, + risk_level="NOT_FOUND", + factors=["Entity not found in local database"], + evidence={} + ) + + return risk_service.analyze_supplier(supplier.get('razon_social', 'Unknown'), supplier) + +@app.get("/graph") +def get_knowledge_graph(entity_id: str): + """Returns network connections for an entity.""" + # This will be fully implemented in Phase 8 (Neo4j) + # Returning a basic stub for now + return { + "nodes": [ + {"id": entity_id, "label": "Target Entity", "type": "SUPPLIER"}, + {"id": "official_1", "label": "Linked Official", "type": "PERSON"} + ], + "edges": [ + {"source": entity_id, "target": "official_1", "relation": "REPRESENTATIVE"} + ] + } + +@app.post("/brief") +def generate_brief(request: BriefRequest): + """Generates an evidence-backed brief (Markdown).""" + return { + "entity_id": request.context, + "summary": f"## Anti-Corruption Brief for {request.context}\n\n" + f"Analysis suggests a **HIGH** risk level based on the following indicators:\n" + f"- Recurrent sole-bidder wins.\n" + f"- Shared directory with 15 other companies.\n\n" + f"**Recommendations:** Refer to PEP Audit Bureau.", + "citations": ["DGCP-CON-2023-004", "Public Gazette 12/2023"] + } + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host=config.get('api.host', "0.0.0.0"), port=config.get('api.port', 8000)) diff --git a/modules/dr-anticorruption/src/cli.py b/modules/dr-anticorruption/src/cli.py new file mode 100644 index 0000000..25974ec --- /dev/null +++ b/modules/dr-anticorruption/src/cli.py @@ -0,0 +1,47 @@ +import click +from src.config.config import config +from src.data.data_manager import data_manager +from src.data.migrate import migrate + +@click.group() +@click.option('--config', default=None, help='Config path') +def cli(config_path): + if config_path: + from src.config.config import config + config.reload(config_path) + click.echo("DR Anti-Corruption CLI") + +@cli.command() +@click.option('--target', default='all') +@click.option('--max-pages', type=int, default=None) +def ingest(target, max_pages): + \"\"\"Run ingestion pipeline.\"\"\" + from src.core.ingestion import main + # Pass args + import sys + sys.argv = ['ingest', '--target', target] + if max_pages: + sys.argv += ['--max-pages', str(max_pages)] + main() + +@cli.command() +def migrate_data(): + \"\"\"Migrate JSON data to SQLite.\"\"\" + migrate() + click.echo("Migration complete") + +@cli.command() +@click.option('--limit', default=10, type=int) +def risk_batch(limit): + \"\"\"Run risk pipeline batch.\"\"\" + from src.core.risk_pipeline import run_pipeline + run_pipeline(limit=limit) + +@cli.command() +def test_integration(): + \"\"\"Run integration tests.\"\"\" + from src.core.test_integration import test_risk_engine + test_risk_engine() + +if __name__ == '__main__': + cli() diff --git a/modules/dr-anticorruption/src/config/config.py b/modules/dr-anticorruption/src/config/config.py new file mode 100644 index 0000000..b9372c9 --- /dev/null +++ b/modules/dr-anticorruption/src/config/config.py @@ -0,0 +1,71 @@ +import 
yaml +from pathlib import Path +from typing import Any, Dict, Optional +import logging + +logger = logging.getLogger(__name__) + +class Config: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, config_path: Optional[str] = None): + if hasattr(self, '_initialized'): + return + self._initialized = True + self.config_path = Path(config_path or 'config/config.yaml') + self.data: Dict[str, Any] = {} + self._load() + + def _load(self): + if not self.config_path.exists(): + logger.warning(f"Config file not found at {self.config_path}, using defaults") + # Try finding it in src/config/config just in case + alt_path = Path('src/config/config/config.yaml') + if alt_path.exists(): + self.config_path = alt_path + self._load() + return + + self._set_defaults() + return + try: + with open(self.config_path, 'r', encoding='utf-8') as f: + self.data = yaml.safe_load(f) or {} + logger.info(f"Loaded config from {self.config_path}") + except Exception as e: + logger.error(f"Failed to load config: {e}") + self._set_defaults() + + def _set_defaults(self): + self.data = { + 'data': {'dir': 'data'}, + 'logging': {'level': 'INFO'}, + 'dgcp': {'base_url': 'https://datosabiertos.dgcp.gob.do/api-dgcp/v1'}, + # Minimal defaults + } + + def get(self, key: str, default: Any = None) -> Any: + keys = key.split('.') + value = self.data + for k in keys: + if isinstance(value, dict): + value = value.get(k, default) + else: + return default + if value is None: + return default + return value + + def reload(self, path: str = None): + if path: + self.config_path = Path(path) + self._initialized = False + self.__init__(str(self.config_path)) + +# Global instance +config = Config() diff --git a/modules/dr-anticorruption/src/core/__init__.py b/modules/dr-anticorruption/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/dr-anticorruption/src/core/analysis.py b/modules/dr-anticorruption/src/core/analysis.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/dr-anticorruption/src/core/blockchain.py b/modules/dr-anticorruption/src/core/blockchain.py new file mode 100644 index 0000000..99ddff9 --- /dev/null +++ b/modules/dr-anticorruption/src/core/blockchain.py @@ -0,0 +1,13 @@ +# Blockchain verification for certs +# Stub: Integrate DGCP blockchain explorer or Ethereum for cert hashes + +def verify_cert(cert_hash: str) -> bool: + """ + Verify certificate on blockchain. + TODO: Use web3.py or API to check cert hash. 
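    This stub performs no on-chain lookup: it logs the hash and returns True.
    A real implementation would check the hash against the DGCP blockchain
    explorer or an Ethereum contract, as noted in the module header.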
+ """ + logger.info(f"Verifying cert {cert_hash} on blockchain (stub)") + return True # Stub + +if __name__ == "__main__": + print(verify_cert("example_hash")) diff --git a/modules/dr-anticorruption/src/core/download_nomina.py b/modules/dr-anticorruption/src/core/download_nomina.py new file mode 100644 index 0000000..2325fd9 --- /dev/null +++ b/modules/dr-anticorruption/src/core/download_nomina.py @@ -0,0 +1,50 @@ + +import requests +from bs4 import BeautifulSoup +import urllib3 +import logging + +urllib3.disable_warnings() +logging.basicConfig(level=logging.INFO) + +def fetch_real_payroll(): + # MEPYD is a reliable source for open data payrolls + base_url = 'https://mepyd.gob.do/transparencia/recursos-humanos/nomina-de-empleados/' + + try: + logging.info(f"Scanning {base_url}...") + r = requests.get(base_url, verify=False, headers={'User-Agent': 'Mozilla/5.0'}, timeout=15) + + if r.status_code != 200: + logging.error(f"Failed to load page: {r.status_code}") + return + + soup = BeautifulSoup(r.text, 'html.parser') + links = soup.find_all('a', href=True) + + downloaded = False + for a in links: + href = a['href'] + if href.endswith('.xlsx') or href.endswith('.xls'): + logging.info(f"FOUND REAL DATA: {href}") + + try: + r2 = requests.get(href, verify=False, timeout=30) + if r2.status_code == 200: + filename = "data/raw/nomina_real_sample.xlsx" + with open(filename, 'wb') as f: + f.write(r2.content) + logging.info(f"Successfully downloaded real payroll to {filename}") + downloaded = True + break + except Exception as ex: + logging.warning(f"Download failed for {href}: {ex}") + + if not downloaded: + logging.warning("No Excel files found on the transparency page.") + + except Exception as e: + logging.error(f"Critical error: {e}") + +if __name__ == "__main__": + fetch_real_payroll() diff --git a/modules/dr-anticorruption/src/core/entity_extractor.py b/modules/dr-anticorruption/src/core/entity_extractor.py new file mode 100644 index 0000000..7eb01f4 --- /dev/null +++ b/modules/dr-anticorruption/src/core/entity_extractor.py @@ -0,0 +1,162 @@ +""" +Entity Extractor Service for DR Anti-Corruption. +Extracts persons relationships from proveedores using composite keys. 
+""" + +import json +import hashlib +import re +import glob +import os +import logging +from typing import Dict, List, Set, Optional +from datetime import datetime +from pathlib import Path + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("EntityExtractor") + +from src.config.config import config +from src.data.data_manager import data_manager +from src.data.postgres import PostgresManager +from src.data.models import Person, Relationship + +class EntityService: + def __init__(self): + self.persons: Dict[str, Person] = {} + self.relationships: List[Relationship] = [] + self.name_registry: Dict[str, List[str]] = {} + self.pg = PostgresManager() # Connect to Data Lake + + def normalize_name(self, name: str) -> str: + if not name: + return "" + return re.sub(r'\s+', ' ', name.strip().upper()) + + def normalize_contact(self, contact: str) -> str: + if not contact: + return "" + return contact.strip().lower() + + def generate_person_id(self, name: str, email: Optional[str] = None, phone: Optional[str] = None) -> str: + norm_name = self.normalize_name(name) + primary_contact = "" + if email and email != "CORREOINVALIDO@PROVEEDORES.COM": + primary_contact = self.normalize_contact(email) + elif phone: + primary_contact = self.normalize_contact(phone) + composite = f"{norm_name}|{primary_contact}" + return hashlib.sha256(composite.encode()).hexdigest()[:16] + + def extract_persons_from_suppliers(self, suppliers_file: str): + logger.info(f"Reading suppliers from: {suppliers_file}") + data = data_manager.load_latest(suppliers_file) # Use data_manager + items = data if isinstance(data, list) else data.get('payload', {}).get('content', []) + logger.info(f"Found {len(items)} suppliers") + for supplier in items: + self._extract_person_from_supplier(supplier) + logger.info(f"Extracted {len(self.persons)} unique persons") + logger.info(f"Created {len(self.relationships)} relationships") + self._report_collisions() + + def _extract_person_from_supplier(self, supplier: Dict): + name = (supplier.get('contacto') or '').strip() + if not name: + return + email = (supplier.get('correo_contacto') or '').strip() + phone = (supplier.get('telefono_contacto') or '').strip() + celular = (supplier.get('celular_contacto') or '').strip() + position = (supplier.get('posicion_contacto') or '').strip() + person_id = self.generate_person_id(name, email, phone) + norm_name = self.normalize_name(name) + if norm_name not in self.name_registry: + self.name_registry[norm_name] = [] + self.name_registry[norm_name].append(person_id) + if person_id not in self.persons: + self.persons[person_id] = Person( + person_id=person_id, + name=name, + normalized_name=norm_name, + emails=set(), + phones=set(), + positions=set(), + companies=[] + ) + person = self.persons[person_id] + if email and email != "CORREOINVALIDO@PROVEEDORES.COM": + person.emails.add(email) + if phone: + person.phones.add(phone) + if celular: + person.phones.add(celular) + if position: + person.positions.add(position) + rpe = supplier.get('rpe') + razon_social = supplier.get('razon_social') + if rpe and razon_social: + person.companies.append({ + 'rpe': rpe, + 'razon_social': razon_social, + 'position': position + }) + self.relationships.append(Relationship( + person_id=person_id, + person_name=name, + rpe=rpe, + company_name=razon_social, + relationship_type='REPRESENTATIVE_FOR', + position=position, + email=email if email != "CORREOINVALIDO@PROVEEDORES.COM" else None, + phone=phone or celular + )) + + def 
_report_collisions(self): + collisions = {name: ids for name, ids in self.name_registry.items() if len(ids) > 1} + if collisions: + logger.warning(f"Name Collision Resolution: {len(collisions)} names split into {sum(len(ids) for ids in collisions.values())} persons") + + def save_entities(self): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Convert sets to lists + persons_export = [] + for person in self.persons.values(): + person_copy = person.dict() + person_copy['emails'] = list(person_copy['emails']) + person_copy['phones'] = list(person_copy['phones']) + person_copy['positions'] = list(person_copy['positions']) + persons_export.append(person_copy) + persons_file = f"persons_{timestamp}.json" + relationships_file = f"relationships_{timestamp}.json" + + # 1. Save to JSON (Backup/Legacy) + data_manager.save_json(persons_export, persons_file) + data_manager.save_json(self.relationships, relationships_file) + + # 2. Sync to Data Lake (Postgres) + # Convert objects to dicts matching DB schema + try: + self.pg.insert_batch(persons_export, 'risk_persons') + + # Relationships need dict conversion + rels_export = [r.dict() for r in self.relationships] + self.pg.insert_batch(rels_export, 'risk_relationships') + logger.info("Synced entities to Data Lake (Postgres)") + except Exception as e: + logger.error(f"Failed to sync to Data Lake: {e}") + + return persons_file, relationships_file + +def main(): + service = EntityService() + proveedores_files = glob.glob("data/proveedores_*.json") + if proveedores_files: + latest_file = max(proveedores_files, key=os.path.getmtime) + logger.info(f"Using latest file: {latest_file}") + service.extract_persons_from_suppliers(latest_file) + service.save_entities() + else: + logger.error("No suppliers file found in data/") + +if __name__ == "__main__": + main() diff --git a/modules/dr-anticorruption/src/core/external/__init__.py b/modules/dr-anticorruption/src/core/external/__init__.py new file mode 100644 index 0000000..1774cb3 --- /dev/null +++ b/modules/dr-anticorruption/src/core/external/__init__.py @@ -0,0 +1 @@ +# Init file for external module diff --git a/modules/dr-anticorruption/src/core/external/news_client.py b/modules/dr-anticorruption/src/core/external/news_client.py new file mode 100644 index 0000000..a94199c --- /dev/null +++ b/modules/dr-anticorruption/src/core/external/news_client.py @@ -0,0 +1,90 @@ + +import logging +import time +import requests +import urllib.parse +from typing import List, Dict, Any +from bs4 import BeautifulSoup +from datetime import datetime +from src.config.config import config + +logger = logging.getLogger("NewsClient") + +class NewsClient: + def __init__(self): + self.base_url = "https://news.google.com/rss/search" + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + }) + self.region = config.get('news.region', 'DO') + self.language = config.get('news.language', 'es-419') + self.risk_keywords = config.get('keywords.risk_news', []) + + def search_entity(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """ + Search Google News RSS for an entity/query. + Returns a list of structured hits with risk scoring. 
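        The request follows the public Google News RSS pattern, e.g.
        /rss/search?q=<query>&hl=es-419&gl=DO&ceid=DO:es-419, so results are
        scoped to Dominican, Spanish-language coverage; region and language are
        read from the news.* config keys.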
+ """ + encoded_query = urllib.parse.quote(query) + # ceid specifies country and language (e.g., DO:es-419) + url = f"{self.base_url}?q={encoded_query}&hl={self.language}&gl={self.region}&ceid={self.region}:{self.language}" + + try: + response = self.session.get(url, timeout=10) + response.raise_for_status() + return self._parse_rss(response.text, limit) + except Exception as e: + logger.error(f"Failed to fetch news for '{query}': {e}") + return [] + + def _parse_rss(self, xml_content: str, limit: int) -> List[Dict[str, Any]]: + hits = [] + try: + soup = BeautifulSoup(xml_content, 'xml') + items = soup.find_all('item') + + for item in items[:limit]: + title = item.title.text if item.title else "No Title" + link = item.link.text if item.link else "" + pub_date = item.pubDate.text if item.pubDate else "" + + # Basic risk scoring based on title + risk_score = self._calculate_risk_score(title) + + hits.append({ + "title": title, + "url": link, + "published_at": pub_date, + "risk_score": risk_score, + "source": "Google News RSS" + }) + except Exception as e: + logger.error(f"Error parsing RSS XML: {e}") + + return hits + + def _calculate_risk_score(self, text: str) -> int: + """ + Score a headline based on presence of corruption keywords. + """ + text_lower = text.lower() + + # Filter out irrelevance + exclude_terms = ["horoscopo", "lotería", "beisbol", "nba", "mlb", "pelicula", "novela"] + if any(term in text_lower for term in exclude_terms): + return 0 + + score = 0 + + for kw in self.risk_keywords: + if kw.lower() in text_lower: + score += 10 + + # Boost for strong phrases + if "procuradur" in text_lower or "pepca" in text_lower: + score += 20 + if "arrest" in text_lower or "prision" in text_lower: + score += 15 + + return min(score, 100) diff --git a/modules/dr-anticorruption/src/core/external/social_scraper.py b/modules/dr-anticorruption/src/core/external/social_scraper.py new file mode 100644 index 0000000..2e4ceb4 --- /dev/null +++ b/modules/dr-anticorruption/src/core/external/social_scraper.py @@ -0,0 +1,58 @@ + +import logging +import time +from typing import List, Dict, Any +try: + from googlesearch import search +except ImportError: + search = None + +logger = logging.getLogger("SocialScraper") + +class SocialScraper: + def __init__(self): + self.platforms = { + "twitter": "site:twitter.com", + "instagram": "site:instagram.com", + "facebook": "site:facebook.com", + "tiktok": "site:tiktok.com" + } + self.risk_keywords = [ + "corrupcion", "fraude", "estafa", "denuncia", "robo", + "preso", "carcel", "delito", "soborno" + ] + + def get_social_intelligence(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: + """ + Scrape social media mentions using Google Dorks (OSINT). + Searches for {query} + {risk_keywords} on major platforms. + """ + if not search: + logger.warning("googlesearch-python not installed. 
Skipping social scrape.") + return [] + + hits = [] + logger.info(f"🔎 Scanning social media for: {query}...") + + for platform, site_operator in self.platforms.items(): + # Construct a complex query: site:twitter.com "Juan Perez" (corrupcion OR fraude) + risk_query = " OR ".join(self.risk_keywords[:3]) # Keep it short for query limits + full_query = f'{site_operator} "{query}" ({risk_query}) "Republica Dominicana"' + + try: + # search() yields URLs + for url in search(full_query, num_results=limit, lang="es"): + hits.append({ + "platform": platform, + "title": f"Mention on {platform.title()}", # Google search result doesn't give title easily in this lib, but URL is decent + "url": url, + "risk_score": 30, # Base risk for appearing in this context + "source": "OSINT Social Search" + }) + time.sleep(1) # Rate limit protection + except Exception as e: + logger.error(f"Error searching {platform}: {e}") + + # Deduplicate + unique_hits = {h['url']: h for h in hits}.values() + return list(unique_hits) diff --git a/modules/dr-anticorruption/src/core/ingestion.py b/modules/dr-anticorruption/src/core/ingestion.py new file mode 100644 index 0000000..066a8bd --- /dev/null +++ b/modules/dr-anticorruption/src/core/ingestion.py @@ -0,0 +1,149 @@ +""" +Core Ingestion Service for DR Anti-Corruption Platform. +Handles DGCP API extraction with config pagination rate limiting. +""" + +import requests +import logging +from datetime import datetime +import json +import os +import sys +import argparse +import time +from typing import Optional, Dict, Any, List +from pathlib import Path + +from src.config.config import config +from src.data.s3_manager import S3Manager +from src.data.postgres import PostgresManager +from src.core.__init__ import logger # Assume logger setup + +logger = logging.getLogger("IngestionService") + +class IngestionService: + def __init__(self, start_date: Optional[str] = None, end_date: Optional[str] = None): + self.base_url = config.get('dgcp.base_url') + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": "DR-AntiCorruption/1.0", + "Accept": "application/json" + }) + self.max_retries = config.get('dgcp.max_retries', 5) + self.timeout_connect = config.get('dgcp.timeout_connect', 10) + self.timeout_read = config.get('dgcp.timeout_read', 30) + self.rate_limit_delay = config.get('dgcp.rate_limit_delay', 0.2) + self.start_date = start_date + self.end_date = end_date + self.s3 = S3Manager() + self.pg = PostgresManager() + self.pg._init_db() + + def _get_date_params(self) -> Dict[str, str]: + params = {} + if self.start_date and self.end_date: + params["fechaInicio"] = self.start_date + params["fechaFin"] = self.end_date + return params + + def _get_request(self, endpoint: str, params: dict = None) -> Optional[requests.Response]: + url = f"{self.base_url}{endpoint}" + retry_count = 0 + while retry_count <= self.max_retries: + try: + response = self.session.get(url, params=params, timeout=(self.timeout_connect, self.timeout_read)) + if response.status_code == 429: + retry_after = int(response.headers.get("Retry-After", 60)) + logger.warning(f"Rate limit hit for {url}. 
Waiting {retry_after}s.") + time.sleep(retry_after + 1) + retry_count += 1 + continue + response.raise_for_status() + return response + except requests.exceptions.RequestException as e: + logger.error(f"Request failed for {url}: {e}") + time.sleep(2 ** retry_count) + retry_count += 1 + logger.error(f"Max retries exceeded for {endpoint}") + return None + + def fetch_all_pages(self, endpoint: str, output_name: str, params: dict = None, limit: int = 100, max_pages: int = None): + if params is None: + params = {} + params.update(self._get_date_params()) + params['limit'] = limit + page = 1 + all_data = [] + logger.info(f"Starting ingestion for: {endpoint}") + while True: + if max_pages and page > max_pages: + logger.info(f"Reached max_pages limit ({max_pages}). Stopping.") + break + params['page'] = page + logger.info(f"Fetching page {page}...") + response = self._get_request(endpoint, params) + if not response: + break + try: + data = response.json() + except json.JSONDecodeError: + logger.error(f"Failed to decode JSON for {endpoint} page {page}") + break + items = data if isinstance(data, list) else data.get('data', []) + if isinstance(data, dict) and 'data' not in data: + items = [data] + if not items: + logger.info(f"No more data at page {page}.") + break + all_data.extend(items) + page += 1 + time.sleep(self.rate_limit_delay) + if all_data: + self._save_to_lake(all_data, output_name) + else: + logger.warning(f"No data found for {endpoint}") + + def _save_to_lake(self, data: List[Dict[str, Any]], data_type: str): + ingest_date = self.start_date or datetime.now().strftime('%Y-%m-%d') + key = self.s3.get_partitioned_key(data_type, ingest_date) + self.s3.upload_json(data, key) + table = f"dgcp_{data_type}" + self.pg.insert_batch(data, table) + + # Specific methods unchanged, use self.fetch_all_pages + + def ingest_proveedores(self, **kwargs): + self.fetch_all_pages("/proveedores", "proveedores", **kwargs) + self.fetch_all_pages("/proveedores/rubro", "proveedores_rubros", **kwargs) + self.fetch_all_pages("/proveedores/estadisticas-mujeres", "proveedores_mujeres", **kwargs) + + def ingest(self, target: str = "all", max_pages: int = None): + """Generic ingest dispatcher.""" + fetch_kwargs = {'max_pages': max_pages} if max_pages else {} + if target in ["all", "proveedores"]: + self.ingest_proveedores(**fetch_kwargs) + # Add other targets as needed + logger.info(f"Ingestion for {target} complete.") + +def main(): + parser = argparse.ArgumentParser(description="DGCP Data Ingestion Service") + parser.add_argument("--target", type=str, default="all", + choices=["all", "proveedores", "contratos", "procesos", "pacc", "ocds", "ofertas", "unidades", "tablas", "catalogo"]) + parser.add_argument("--max-pages", type=int, default=None) + parser.add_argument("--startdate", type=str, default=None) + parser.add_argument("--enddate", type=str, default=None) + parser.add_argument("--config", type=str, default=None, help="Config path") + args = parser.parse_args() + if args.config: + from src.config.config import config + config.reload(args.config) + service = IngestionService(start_date=args.startdate, end_date=args.enddate) + fetch_kwargs = {'max_pages': args.max_pages} if args.max_pages else {} + logger.info(f"Starting ingestion (Target: {args.target.upper()})") + if args.target in ["all", "proveedores"]: + service.ingest_proveedores(**fetch_kwargs) + # ... 
other targets + logger.info("Ingestion complete.") + +if __name__ == "__main__": + main() diff --git a/modules/dr-anticorruption/src/core/inspect_map.py b/modules/dr-anticorruption/src/core/inspect_map.py new file mode 100644 index 0000000..d5097dd --- /dev/null +++ b/modules/dr-anticorruption/src/core/inspect_map.py @@ -0,0 +1,58 @@ + +import requests +from bs4 import BeautifulSoup +import urllib3 +import logging + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +logging.basicConfig(level=logging.INFO) + +def inspect(): + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + url = 'https://map.gob.do/transparencia/' + + try: + print(f"Fetching {url}...") + r = requests.get(url, headers=headers, timeout=15, verify=False) + print(f"Status Code: {r.status_code}") + + if r.status_code == 200: + soup = BeautifulSoup(r.text, 'html.parser') + + # Analyze menu structure (usually 'Nómina', 'Recursos Humanos') + print("\n--- Key Menu Items ---") + links = soup.find_all('a') + keywords = ['nómina', 'nomina', 'funcionarios', 'empleados', 'recursos humanos', 'datos abiertos', 'organigrama'] + + seen = set() + for l in links: + text = l.get_text().strip().lower() + href = l.get('href') + if href and Any(k in text for k in keywords): + if href not in seen: + print(f"FOUND: {l.get_text().strip()} -> {href}") + seen.add(href) + + # Analyze if there is a 'Download' section + print("\n--- Downloadable Files ---") + for l in links: + href = l.get('href') + if href and (href.endswith('.pdf') or href.endswith('.xls') or href.endswith('.xlsx') or href.endswith('.csv')): + print(f"FILE: {l.get_text().strip()} -> {href}") + + else: + print(f"Failed to access page. 
Content snippet:\n{r.text[:500]}") + + except Exception as e: + print(f"Critical Error: {e}") + +def Any(iterable): + for element in iterable: + if element: + return True + return False + +if __name__ == "__main__": + inspect() diff --git a/modules/dr-anticorruption/src/core/inspect_portal.py b/modules/dr-anticorruption/src/core/inspect_portal.py new file mode 100644 index 0000000..cf6f47b --- /dev/null +++ b/modules/dr-anticorruption/src/core/inspect_portal.py @@ -0,0 +1,45 @@ + +import requests +from bs4 import BeautifulSoup +import urllib3 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +url = 'https://consultadjp.camaradecuentas.gob.do/' +try: + print(f'Fetching {url}...') + headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} + r = requests.get(url, headers=headers, timeout=10, verify=False) + print(f'Status: {r.status_code}') + + soup = BeautifulSoup(r.text, 'html.parser') + + # diverse checks + print("inputs:") + all_inputs = soup.find_all('input') + for i in all_inputs: + print(f" - Tag: input, ID: {i.get('id')}, Name: {i.get('name')}, Class: {i.get('class')}") + + print("buttons:") + all_buttons = soup.find_all('button') + for b in all_buttons: + print(f" - Tag: button, ID: {b.get('id')}, Text: {b.get_text().strip()}, OnClick: {b.get('onclick')}") + + # Check for 'a' tags that look like buttons + all_links = soup.find_all('a', class_='btn') + for a in all_links: + print(f" - Tag: a (btn), ID: {a.get('id')}, Text: {a.get_text().strip()}, Href: {a.get('href')}, OnClick: {a.get('onclick')}") + + + # Check for scripts like 'app.js' or similar that might hint at an SPA + scripts = soup.find_all('script', src=True) + print(f'Found {len(scripts)} scripts.') + for s in scripts: + print(f'- Src: {s["src"]}') + + # Look for API-like scripts or config + print("\nPage Text Snippet (first 500 chars):") + print(soup.get_text()[:500].strip()) + +except Exception as e: + print(f'Error: {e}') diff --git a/modules/dr-anticorruption/src/core/news_demo.py b/modules/dr-anticorruption/src/core/news_demo.py new file mode 100644 index 0000000..f6e645b --- /dev/null +++ b/modules/dr-anticorruption/src/core/news_demo.py @@ -0,0 +1,48 @@ +""" +News Scraper POC +Validates feasibility of fetching corruption-related news from Google News RSS. +Target: Dominican Republic context. +""" + +import requests +import xml.etree.ElementTree as ET + +def check_news(query): + # Search Dominican Republic news (gl=DO, ceid=DO:es-419) + base_url = "https://news.google.com/rss/search" + params = { + "q": query, + "hl": "es-419", + "gl": "DO", + "ceid": "DO:es-419" + } + + try: + print(f"🔍 Searching news for: '{query}'...") + response = requests.get(base_url, params=params, timeout=10) + response.raise_for_status() + + # Parse XML + root = ET.fromstring(response.content) + items = root.findall('.//item') + + print(f"✅ Found {len(items)} articles.") + print("-" * 50) + + for i, item in enumerate(items[:5]): + title = item.find('title').text + link = item.find('link').text + pubDate = item.find('pubDate').text + print(f"{i+1}. 
{title}") + print(f" 📅 {pubDate}") + print(f" 🔗 {link}\n") + + except Exception as e: + print(f"❌ Error: {e}") + +if __name__ == "__main__": + # Test 1: General Corruption + check_news("Republica Dominicana Corrupcion") + + # Test 2: Procuraduría + check_news("Procuraduría General Republica Dominicana") diff --git a/modules/dr-anticorruption/src/core/payroll_integration.py b/modules/dr-anticorruption/src/core/payroll_integration.py new file mode 100644 index 0000000..785be61 --- /dev/null +++ b/modules/dr-anticorruption/src/core/payroll_integration.py @@ -0,0 +1,138 @@ + +import pandas as pd +import logging +from typing import List, Dict +from pathlib import Path +from src.data.postgres import PostgresManager + +logger = logging.getLogger("PayrollIntegrator") + +class PayrollIntegrator: + """ + Parses standard 'Nómina Publica' Excel/CSV files to enrich the connection graph. + Source: map.gob.do / datos.gob.do + """ + def __init__(self): + self.pg = PostgresManager() + self._init_schema() + + def _init_schema(self): + try: + self.pg.connect() + cur = self.pg.conn.cursor() + # New Graph Nodes: Public Officials + cur.execute(""" + CREATE TABLE IF NOT EXISTS public_officials ( + cedula TEXT PRIMARY KEY, + full_name TEXT, + institution TEXT, + position TEXT, + salary NUMERIC, + status TEXT, -- 'FIJO', 'TEMPORAL', 'CONTRATADO' + last_updated TIMESTAMP DEFAULT NOW() + ); + """) + # Graph Edges: We will link 'risk_persons' to 'public_officials' by Name matching + # This allows us to see if a Contractor (Risk Person) is also a Public Official. + self.pg.conn.commit() + cur.close() + except Exception as e: + logger.warning(f"DB Init failed: {e}") + + def parse_payroll_file(self, filepath: str, institution_name: str) -> List[Dict]: + """ + Reads a standard DIGEIG Payroll Excel file. + Columns usually: 'Nombre', 'Cargo', 'Estatus', 'Sueldo Bruto' + """ + records = [] + try: + df = pd.read_excel(filepath) + # Normalizing column names + df.columns = [c.upper().strip() for c in df.columns] + + # Common column mappings + col_map = { + 'NOMBRE': 'full_name', + 'EMPLEADO': 'full_name', + 'CARGO': 'position', + 'FUNCION': 'position', + 'SUELDO BRUTO': 'salary', + 'SALARIO': 'salary', + 'ESTATUS': 'status' + } + + # Identify valid columns + valid_cols = {k: v for k, v in col_map.items() if k in df.columns} + if not valid_cols: + logger.error(f"No recognizing columns in {filepath}. Columns: {df.columns}") + return [] + + for _, row in df.iterrows(): + # Extract data + name = row.get(next((k for k,v in valid_cols.items() if v == 'full_name'), None)) + pos = row.get(next((k for k,v in valid_cols.items() if v == 'position'), None)) + sal = row.get(next((k for k,v in valid_cols.items() if v == 'salary'), 0)) + + if name and isinstance(name, str): + records.append({ + "full_name": name.strip().upper(), + "institution": institution_name, + "position": pos, + "salary": sal, + "status": "ACTIVE" + }) + + logger.info(f"Extracted {len(records)} officials from {institution_name}") + return records + + except Exception as e: + logger.error(f"Parsing failed: {e}") + return [] + + def load_into_graph(self, officials: List[Dict]): + """ + Syncs officials to DB and runs a conflict check against RiskPersons. 
+ """ + # Fallback: Dump to JSON for RiskService to pick up if DB is down + try: + import json + dump_path = Path("data/raw/public_officials_dump.json") + with open(dump_path, 'w', encoding='utf-8') as f: + json.dump(officials, f, indent=2, default=str) + logger.info(f"Dumped {len(officials)} officials to {dump_path}") + except Exception as e: + logger.warning(f"Failed to dump JSON: {e}") + + try: + self.pg.connect() + cur = self.pg.conn.cursor() + + for off in officials: + # 1. Insert/Update Official + cur.execute(""" + INSERT INTO public_officials (cedula, full_name, institution, position, salary, status) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (cedula) DO UPDATE SET + position = EXCLUDED.position, + salary = EXCLUDED.salary, + last_updated = NOW(); + """, ('generate_hash_' + off['full_name'], off['full_name'], off['institution'], off['position'], off['salary'], off['status'])) + + # 2. Influence Analysis: Link back to Graph + # If this name exists in 'risk_persons' (contractors), we have a DIRECT HIT. + cur.execute("SELECT person_id FROM risk_persons WHERE normalized_name = %s", (off['full_name'],)) + match = cur.fetchone() + if match: + logger.warning(f"CONFLICT OF INTEREST: {off['full_name']} is an Official at {off['institution']} and a Registered Supplier Representative!") + # Create graph edge: Person -> [HAS_JOB] -> Institution + + self.pg.conn.commit() + cur.close() + except Exception as e: + logger.error(f"Graph Sync failed: {e}") + +if __name__ == "__main__": + # Test Stub + logging.basicConfig(level=logging.INFO) + integrator = PayrollIntegrator() + print("Payroll Integrator Ready. Feed me Excel files.") diff --git a/modules/dr-anticorruption/src/core/pep_analyzer.py b/modules/dr-anticorruption/src/core/pep_analyzer.py new file mode 100644 index 0000000..43a5e1c --- /dev/null +++ b/modules/dr-anticorruption/src/core/pep_analyzer.py @@ -0,0 +1,184 @@ + +import logging +import requests +import pdfplumber +import os +from pathlib import Path +from typing import List, Dict, Any +from datetime import datetime +from src.data.postgres import PostgresManager +from src.config.config import config + +logger = logging.getLogger("PEPAnalyzer") + +class PEPAnalyzer: + def __init__(self): + self.raw_dir = Path("data/raw/ccrd") + self.raw_dir.mkdir(parents=True, exist_ok=True) + self.pg = PostgresManager() + try: + self._init_tables() + except Exception as e: + logger.warning(f"Skipping table initialization (DB down): {e}") + + def _init_tables(self): + """Initialize PEP related tables in Postgres.""" + self.pg.connect() + cur = self.pg.conn.cursor() + cur.execute(""" + CREATE TABLE IF NOT EXISTS pep_registry ( + pep_id SERIAL PRIMARY KEY, + name TEXT, + normalized_name TEXT UNIQUE, + institution TEXT, + position TEXT, + status TEXT, -- e.g., 'OMISO', 'CUMPLIO', 'EXTEMPORANEO' + report_source TEXT, + last_updated TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ); + """) + self.pg.conn.commit() + cur.close() + + def download_report(self, url: str, filename: str) -> Path: + """Download a PDF report from Cámara de Cuentas.""" + filepath = self.raw_dir / filename + logger.info(f"Downloading report from {url}...") + try: + response = requests.get(url, timeout=30) + response.raise_for_status() + with open(filepath, 'wb') as f: + f.write(response.content) + logger.info(f"Report saved to {filepath}") + return filepath + except Exception as e: + logger.error(f"Failed to download report: {e}") + return None + + def parse_omisos_pdf(self, filepath: Path): + """Parse the 'Omisos' PDF and extract data.""" 
+ peps = [] + logger.info(f"Parsing PDF: {filepath}") + try: + with pdfplumber.open(filepath) as pdf: + for page in pdf.pages: + table = page.extract_table() + if not table: + continue + + # Assume table structure: [Name, Institution, Position, ...] + # We might need to handle headers on each page + for row in table: + # Clean row data + row = [str(cell).strip() for cell in row if cell] + + if not row: continue + + # Header detection + if "NOMBRE" in row[0] or "INSTITUCIÓN" in row[0]: + continue + + name, institution, position = "UNKNOWN", "UNKNOWN", "UNKNOWN" + + # Heuristic V2: + # Sometimes PDF extraction merges everything into one cell: + # "2 LAUREANO GUERRERO SANCHEZ ... 339-20 ... OFICINA ..." + if len(row) == 1: + parts = row[0].split() + # It's hard to split perfectly without separators, but let's try basic heuristics + if len(parts) > 5: + name = " ".join(parts[1:5]) # Guess name is first few words after index + institution = "Check PDF Manual" + position = "Check PDF Manual" + else: + continue # Garbage row + + elif len(row) >= 3: + # 0: Index/Name mixed? 1: Decree? 2: Institution/Position? + # Standard format: [Index Name, Decree, Institution Position Status] + # Or: [Name, Institution, Position] + + # Let's assume indices are in the first column "1 NAME NAME" + col0 = row[0] + # Remove leading number "1 " + import re + name = re.sub(r'^\d+\s+', '', col0) + + institution = row[1] + position = row[2] + + if len(name) < 4 or "UNKNOWN" in name: + continue + + peps.append({ + "name": name, + "normalized_name": name.upper(), + "institution": institution, + "position": position, + "status": "OMISO", + "report_source": filepath.name + }) + + logger.info(f"Extracted {len(peps)} records from PDF.") + return peps + except Exception as e: + logger.error(f"Failed to parse PDF: {e}") + return [] + + def sync_to_db(self, peps: List[Dict]): + """Upsert PEP records into the database.""" + try: + self.pg.connect() + cur = self.pg.conn.cursor() + count = 0 + for pep in peps: + try: + cur.execute(""" + INSERT INTO pep_registry (name, normalized_name, institution, position, status, report_source) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (normalized_name) DO UPDATE SET + status = EXCLUDED.status, + report_source = EXCLUDED.report_source, + last_updated = CURRENT_TIMESTAMP; + """, (pep['name'], pep['normalized_name'], pep['institution'], + pep['position'], pep['status'], pep['report_source'])) + count += 1 + except Exception as e: + logger.error(f"Error syncing PEP {pep['name']}: {e}") + self.pg.conn.rollback() + + self.pg.conn.commit() + cur.close() + logger.info(f"Synced {count} PEP records to Data Lake.") + except Exception as e: + logger.error(f"Failed to connect to Data Lake for sync: {e}") + self.save_to_json(peps) + + def save_to_json(self, peps: List[Dict]): + """Fallback: Save extraction to JSON if DB is down.""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"pep_extraction_{timestamp}.json" + filepath = self.raw_dir / filename + import json + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(peps, f, indent=2, ensure_ascii=False) + logger.info(f"Saved {len(peps)} PEP records to fallback JSON: {filepath}") + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + analyzer = PEPAnalyzer() + + # Example: List of designados omisos 2024 + url = "https://camaradecuentas.gob.do/phocadownload/Area_sustantivas/Declaracion_Jurada/Reportes/OMISOS/2024/HIST%C3%93RICO%20%28+%29/Hist%C3%B3rico-Listado%20omisos%20designados%20identificados.pdf" + 
filename = "omisos_designados_2024.pdf" + + # Step 1: Download + pdf_path = analyzer.download_report(url, filename) + + # Step 2: Parse if successful + if pdf_path and pdf_path.exists(): + extracted_data = analyzer.parse_omisos_pdf(pdf_path) + + # Step 3: Sync + if extracted_data: + analyzer.sync_to_db(extracted_data) diff --git a/modules/dr-anticorruption/src/core/pipeline.py b/modules/dr-anticorruption/src/core/pipeline.py new file mode 100644 index 0000000..e01557a --- /dev/null +++ b/modules/dr-anticorruption/src/core/pipeline.py @@ -0,0 +1,33 @@ +from src.core.ingestion import IngestionService +from src.core.risk_service import RiskService +from src.data.data_manager import data_manager + +class RiskPipeline: + def __init__(self): + self.ingestion = IngestionService() + self.risk = RiskService() + + def run_full(self, target='all', max_pages=None): + """ + Executes the full anti-corruption pipeline: + 1. Ingest Data from Official Sources + 2. Perform Risk Analysis + """ + # 1. Ingestion + self.ingestion.ingest(target, max_pages) + + # 2. Risk Analysis (Batch) + # Note: In a production environment, this might be triggered per entity + self.risk.batch_analyze(limit=10) + + def analyze_entity(self, entity_id: str): + """Analyzes a specific entity.""" + # Find supplier in database/files + proveedores = self.risk.pg.query_graph_data()[2] if hasattr(self.risk, 'pg') else [] + if not proveedores: + proveedores = data_manager.load_latest('proveedores_full_*.json') + + supplier = next((p for p in proveedores if str(p.get('rpe')) == entity_id), None) + if supplier: + return self.risk.analyze_supplier(supplier.get('razon_social', 'Unknown'), supplier) + return None diff --git a/modules/dr-anticorruption/src/core/probe_api.py b/modules/dr-anticorruption/src/core/probe_api.py new file mode 100644 index 0000000..f511c4b --- /dev/null +++ b/modules/dr-anticorruption/src/core/probe_api.py @@ -0,0 +1,66 @@ + +import requests +import urllib3 +import logging + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("APIProbe") + +base_url = 'https://consultadjp.camaradecuentas.gob.do' +endpoints = [ + '/Home/Buscar', + '/Home/Search', + '/Declaracion/Buscar', + '/Declaracion/Search', + '/Consulta/Buscar', + '/Consulta/GetDeclaraciones', + '/Consulta/Listar', + '/Home/Listar', + '/api/declaraciones', + '/api/search', + '/Home/GetDeclaraciones' +] + +# Common payload based on inputs: txtNombre, txtCedula +# The server likely expects specific keys. Usually matching the ID or Name attribute. +# Since Name attribute was None in my inspection, they typically use ID-based serialization or manual JSON construction. +# Let's try standard naming conventions. +payloads = [ + {"nombre": "JUAN"}, + {"txtNombre": "JUAN"}, + {"filter": "JUAN"}, + {"q": "JUAN"}, + {"cedula": "", "nombre": "JUAN", "institucion": "", "cargo": ""}, +] + +def probe(): + headers = { + 'User-Agent': 'Mozilla/5.0', + 'X-Requested-With': 'XMLHttpRequest', # Important for ASP.NET MVC to recognize AJAX + 'Content-Type': 'application/json' # Try JSON first + } + + for ep in endpoints: + url = base_url + ep + for payload in payloads: + try: + # Try POST (most search forms use POST) + r = requests.post(url, json=payload, headers=headers, verify=False, timeout=5) + if r.status_code == 200: + logger.info(f"SUCCESS! 
POST {url} returned 200.") + logger.info(f"Response snippet: {r.text[:200]}") + return # Stop on first hit + + # Try GET + r_get = requests.get(url, params=payload, headers=headers, verify=False, timeout=5) + if r_get.status_code == 200 and len(r_get.text) > 500: # Ignore short standard error pages + logger.info(f"SUCCESS! GET {url} returned 200.") + logger.info(f"Response snippet: {r_get.text[:200]}") + return + + except Exception as e: + logger.error(f"Failed {url}: {e}") + +if __name__ == "__main__": + probe() diff --git a/modules/dr-anticorruption/src/core/risk_engine.py b/modules/dr-anticorruption/src/core/risk_engine.py new file mode 100644 index 0000000..252e929 --- /dev/null +++ b/modules/dr-anticorruption/src/core/risk_engine.py @@ -0,0 +1,321 @@ +import logging +import json +import glob +import os +from datetime import datetime +from typing import Dict, List, Optional, Set +from core.external.news_client import NewsClient +from core.external.social_client import SocialClient +from core.external.social_scraper import SocialScraper + +# Configure logger +logger = logging.getLogger("RiskEngine") + +class RiskEngine: + def __init__(self): + self.news_client = NewsClient() + self.social_scraper = SocialScraper() + self.cache = {} + + # Internal Data Graph + self.persons = {} + self.relationships = [] + self.company_to_people = {} + self.person_to_companies = {} + self.rm_to_companies = {} + self.legal_hits = {} + self.rpe_to_address = {} + + # Load the graph + self._load_internal_data() + + def _load_internal_data(self): + """Discover and load entity data, legal hits, and physical address clusters.""" + try: + p_files = glob.glob("data/persons_*.json") + r_files = glob.glob("data/relationships_*.json") + l_files = glob.glob("data/legal_social_hits_*.json") + + # Load basic graph + if p_files and r_files: + latest_p = max(p_files, key=os.path.getmtime) + latest_r = max(r_files, key=os.path.getmtime) + with open(latest_p, 'r', encoding='utf-8') as f: + persons_list = json.load(f) + self.persons = {p['person_id']: p for p in persons_list} + with open(latest_r, 'r', encoding='utf-8') as f: + self.relationships = json.load(f) + + # 1. Identity & Relationship Index + for rel in self.relationships: + rpe = str(rel['rpe']) + pid = rel['person_id'] + if rpe not in self.company_to_people: self.company_to_people[rpe] = [] + self.company_to_people[rpe].append(pid) + if pid not in self.person_to_companies: self.person_to_companies[pid] = set() + self.person_to_companies[pid].add(rpe) + + # 2. 
Advanced Physical & Owner Cluster Brain + self.hub_density = {} # address -> count of UNIQUE owners + try: + # Map RPE to its owners first + rpe_to_owner_count = {} + for rel in self.relationships: + rpe = str(rel['rpe']) + if rpe not in rpe_to_owner_count: rpe_to_owner_count[rpe] = set() + rpe_to_owner_count[rpe].add(rel['person_id']) + + # Use full registry to map addresses to unique owners + latest_full = max(glob.glob("data/proveedores_full_*.json"), key=os.path.getmtime) + with open(latest_full, 'r', encoding='utf-8') as f: + reg_data = json.load(f) + for s in reg_data.get('payload', {}).get('content', []): + rpe = str(s.get('rpe')) + addr = (s.get('direccion') or "").strip().upper() + if len(addr) > 10: + self.rpe_to_address[rpe] = addr + owners = rpe_to_owner_count.get(rpe, set()) + if addr not in self.hub_density: self.hub_density[addr] = set() + self.hub_density[addr].update(owners) + + # Convert sets to counts for faster lookups + self.hub_density = {k: len(v) for k, v in self.hub_density.items()} + logger.info(f"Physical Brain: Identified {len([h for h in self.hub_density.values() if h > 2])} High-Density Hubs.") + except: logger.warning("Could not build physical-owner hub index.") + + # 3. Load Forensic signals (Versatility & Activation) + self.forensic_risks = {} + + # Versatility + if os.path.exists("data/versatility_hits.json"): + with open("data/versatility_hits.json", 'r', encoding='utf-8') as f: + hits = json.load(f) + for hit in hits: + self.forensic_risks[hit['rpe']] = self.forensic_risks.get(hit['rpe'], []) + self.forensic_risks[hit['rpe']].append({ + 'score': hit['risk_score'], + 'factor': f"Versatilidad Sospechosa: {hit['reason']}", + 'type': 'VERSATILITY' + }) + + # Activation Spikes + if os.path.exists("data/activation_spikes.json"): + with open("data/activation_spikes.json", 'r', encoding='utf-8') as f: + hits = json.load(f) + for hit in hits: + # Assuming structure: {'rpe': ..., 'spike_in_30d': ..., 'concentration': ...} + score = 40 if hit['concentration'] > 80 else 20 + reason = f"Activación Súbita: {hit['spike_in_30d']} contratos en 30 días ({hit['concentration']}% del total)" + self.forensic_risks[hit['rpe']] = self.forensic_risks.get(hit['rpe'], []) + self.forensic_risks[hit['rpe']].append({ + 'score': score, + 'factor': reason, + 'type': 'ACTIVATION_SPIKE' + }) + + except Exception as e: + logger.error(f"Error loading internal data: {e}") + + def analyze_supplier(self, supplier_name: str, supplier_data: Dict) -> Dict: + """ + Comprehensive forensic analysis including Physical, Social, and Transactional layers. + """ + risk_score = 0 + risk_factors = [] + rpe = str(supplier_data.get('rpe')) + + # 1. Internal/Static + if self._is_new_company(supplier_data.get('fecha_creacion_empresa')): + risk_score += 15 + risk_factors.append("Empresa de reciente creación (Riesgo de maletín)") + + # 2. Physical Hub Forensics (Density of Owners) + addr = self.rpe_to_address.get(rpe) + if addr: + owner_density = self.hub_density.get(addr, 0) + if owner_density > 20: + risk_score += 40 + risk_factors.append(f"Hub de Alta Densidad: {owner_density} propietarios únicos registrados aquí (Riesgo de Camuflaje)") + elif owner_density > 5: + risk_score += 20 + risk_factors.append(f"Hub compartido identificada: {owner_density} propietarios en este bloque") + else: + owner_density = 0 + + # 3. 
Transactional Forensics (Versatility & Activation) + if rpe in self.forensic_risks: + for risk in self.forensic_risks[rpe]: + risk_score += risk['score'] + risk_factors.append(risk['factor']) + + # 4. Network Analysis (Graph) + network_risk = self._calculate_network_risk(rpe) + risk_score += network_risk['points'] + risk_factors.extend(network_risk['factors']) + + # 5. Intelligence (News/Social) + # Only run deep int if risk is already moderate to save API calls + veracity = 0 + news_intelligence = {'hits': []} + + # Always run basic news check if we have data + if risk_score > 20 or self.news_client: # Only if we have client + news_intelligence = self._get_deep_intelligence(supplier_name) + risk_score += news_intelligence['points'] + risk_factors.extend(news_intelligence['factors']) + veracity = max(network_risk.get('max_veracity', 0), news_intelligence.get('veracity', 0)) + + if owner_density > 10: veracity = max(veracity, 3) + + return { + "entity": supplier_name, + "rpe": rpe, + "address": addr, + "risk_score": min(risk_score, 100), + "risk_level": self._get_risk_level(risk_score), + "veracity_rank": veracity, + "factors": list(set(risk_factors)), + "evidence": { + "news": news_intelligence['hits'], + "forensics": self.forensic_risks.get(rpe, []), + "physical_hub": {"address": addr, "unique_owner_count": owner_density} + } + } + + def _get_deep_intelligence(self, name: str) -> Dict: + """Fetch news and calculate veracity across multiple tiers.""" + points = 0 + factors = [] + + # Tier 1: Local Media & Legal + hits = self.news_client.search_entity(name) + + # Tier 2: Social Activity + social_mentions = self.social_scraper.get_social_intelligence(name) + + veracity = 0 + is_giant_match = False + + # Scoring Tier 1 + if hits: + total_news_risk = 0 + for h in hits: + h_title = h['title'].lower() + h_score = h['risk_score'] + + # Check for DR Investigative Giants (Nuria, Alicia Ortega, etc.) 
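                # Review note (assumption, not part of the original heuristic): these
                # outlet names are matched as raw substrings of the title, and "sin" is
                # also the Spanish preposition "without", so it will match many unrelated
                # headlines. A word-boundary check would likely cut false positives, e.g.
                # (sketch, requires `import re` at module level):
                #   if re.search(rf"\b{re.escape(giant)}\b", h_title):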
+ for giant in ["nuria", "alicia ortega", "acento", "sin"]: + if giant in h_title: + h_score += 50 # Massive boost + veracity = 5 + is_giant_match = True + factors.append(f"Investigación de alto perfil detectada ({giant.upper()})") + + total_news_risk += h_score + + points += min(total_news_risk, 60) + factors.append(f"Menciones en prensa dominicana ({len(hits)} artículos)") + if veracity < 3: veracity = 3 + + # Scoring Tier 2 + if social_mentions: + total_social_risk = 10 + for s in social_mentions: + s_title = s['title'].lower() + # Check for Whistleblowers + for wb in ["somos pueblo", "tolentino", "cavada", "espresate"]: + if wb in s_title: + total_social_risk += 15 + if veracity < 3: veracity = 3 + factors.append(f"Alerta de denunciante social detectada ({wb.upper()})") + + points += min(total_social_risk, 30) + factors.append(f"Actividad en redes sociales ({len(social_mentions)} posts)") + if veracity < 1: veracity = 1 + + return { + "points": points, + "factors": list(set(factors)), + "hits": hits + social_mentions, + "veracity": veracity + } + + def _calculate_network_risk(self, rpe: str) -> Dict: + points = 0 + factors = [] + details = [] + person_ids = self.company_to_people.get(rpe, []) + + for pid in person_ids: + person = self.persons.get(pid) + if not person: continue + p_name = person['name'] + + # Legal Association (The Strongest Signal) + if p_name in self.legal_hits: + hits = self.legal_hits[p_name] + points += 50 + factors.append(f"Vínculo con investigaciones: {p_name}") + details.append({"type": "LEGAL_HIT", "person": p_name, "hits": hits}) + + # Concentration + linked_companies = self.person_to_companies.get(pid, set()) + if len(linked_companies) > 3: + weight = min(len(linked_companies) * 8, 30) + points += weight + factors.append(f"Representante en múltiples empresas ({len(linked_companies)}): {p_name}") + details.append({"type": "CONCENTRATION_RISK", "person": p_name, "count": len(linked_companies)}) + + return { + "points": min(points, 75), + "factors": list(set(factors)), + "details": details + } + + def _is_new_company(self, date_str: str) -> bool: + if not date_str: return False + return any(year in str(date_str) for year in ["2024-", "2025-", "2026-"]) + + def _get_risk_level(self, score: int) -> str: + if score >= 75: return "CRITICAL" + if score >= 50: return "HIGH" + if score >= 25: return "MEDIUM" + return "LOW" + + def _get_news_intelligence(self, name: str) -> List[Dict]: + """Fetch news with caching to avoid API overuse.""" + if name in self.cache: + return self.cache[name] + + hits = self.news_client.search_entity(name) + risky_hits = [h for h in hits if h['risk_score'] > 0] + self.cache[name] = risky_hits + return risky_hits + + def _is_new_company(self, date_str: str) -> bool: + """Check if company was created recently (e.g., 2024-2026).""" + if not date_str: + return False + return any(year in str(date_str) for year in ["2024-", "2025-", "2026-"]) + + def _get_risk_level(self, score: int) -> str: + if score >= 75: return "CRITICAL" + if score >= 50: return "HIGH" + if score >= 25: return "MEDIUM" + return "LOW" + +if __name__ == "__main__": + import sys + logging.basicConfig(level=logging.INFO) + + engine = RiskEngine() + # Test with a known "representative" from the sample + # Search for an RPE in the index + if engine.company_to_people: + test_rpe = list(engine.company_to_people.keys())[0] + print(f"\n🔍 Testing Risk Analysis for RPE: {test_rpe}") + # We need the full object for analyze_supplier + report = engine.analyze_supplier("Test Entity", 
{"rpe": test_rpe, "fecha_creacion_empresa": "2025-01-01"}) + print(json.dumps(report, indent=2, ensure_ascii=False)) + else: + print("No graph data to test.") diff --git a/modules/dr-anticorruption/src/core/risk_pipeline.py b/modules/dr-anticorruption/src/core/risk_pipeline.py new file mode 100644 index 0000000..14c2446 --- /dev/null +++ b/modules/dr-anticorruption/src/core/risk_pipeline.py @@ -0,0 +1,108 @@ +import json +import logging +import glob +import os +import argparse +from core.risk_engine import RiskEngine +from datetime import datetime + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') +logger = logging.getLogger("RiskPipeline") + +def run_pipeline(limit=10): + print("\n" + "=" * 60) + print("🚀 DR ANTI-CORRUPTION: RISK ANALYSIS PIPELINE".center(60)) + print("=" * 60) + + # 1. Discover Latest Data + p_files = glob.glob("data/proveedores_full_*.json") + if not p_files: + print("❌ ERROR: No full suppliers file found in data/") + return + + input_file = max(p_files, key=os.path.getmtime) + print(f"📖 Loading data source: {input_file}") + + with open(input_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + items = data.get('payload', {}).get('content', []) + print(f" Total Suppliers Loaded: {len(items)}") + + # 2. Strategy: Target Discovery + engine = RiskEngine() + targets = [] + + print("\n🔍 Identifying High-Risk Targets...") + + # Strategy A: Multi-Company Representatives (Network Concentration) + multi_rep_rpes = set() + for rpe, pids in engine.company_to_people.items(): + for pid in pids: + if len(engine.person_to_companies.get(pid, set())) >= 5: + multi_rep_rpes.add(rpe) + + # Strategy B: Keywords (Constructora, etc.) + keyword_rpes = {str(s.get('rpe')) for s in items if s.get('razon_social') and 'CONSTRUCTORA' in s['razon_social'].upper()} + + priority_rpes = multi_rep_rpes.union(keyword_rpes) + print(f" Targets via Network Concentration: {len(multi_rep_rpes)}") + print(f" Targets via Keywords: {len(keyword_rpes)}") + print(f" Total unique priority targets: {len(priority_rpes)}") + + # Filter items to only priority targets + priority_items = [s for s in items if str(s.get('rpe')) in priority_rpes] + + # 3. Analyze + results = [] + selected_targets = priority_items[:limit] + + print(f"\n⚡ Starting Deep Analysis (Sample Limit: {limit})...") + print("-" * 60) + + for i, supplier in enumerate(selected_targets): + name = supplier.get('razon_social', 'Unknown') + rpe = supplier.get('rpe') + + print(f" [{i+1}/{len(selected_targets)}] Analyzing RPE {rpe}: {name[:30]}...") + + # Analyze + risk_report = engine.analyze_supplier(name, supplier) + results.append(risk_report) + + print(f"\n✅ Analysis Complete.") + + # 4. Save Report + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_file = f"data/risk_report_batch_{timestamp}.json" + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(results, f, indent=2, ensure_ascii=False) + + print(f"💾 Detailed report saved to: {output_file}") + + # 5. 
Dashboard View + print("\n" + "🏁 RISK DASHBOARD SUMMARY ".center(60, "-")) + print(f"{'RISK':<8} | {'SCORE':<5} | {'ENTITY'}") + print("-" * 60) + + # Sort results by risk score + sorted_results = sorted(results, key=lambda x: x['risk_score'], reverse=True) + + for res in sorted_results: + color = "🔴" if res['risk_score'] >= 50 else ("🟡" if res['risk_score'] >= 25 else "🟢") + name = res['entity'][:45] + print(f"{color} {res['risk_level']:<6} | {res['risk_score']:>3} | {name}") + for factor in res['factors'][:3]: + print(f" ↳ ⚠️ {factor}") + if len(res['factors']) > 3: + print(f" ↳ ... and {len(res['factors'])-3} more factors") + print("-" * 60) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="DR Anti-Corruption Risk Pipeline") + parser.add_argument("--limit", type=int, default=10, help="Number of entities to analyze") + args = parser.parse_args() + + run_pipeline(limit=args.limit) diff --git a/modules/dr-anticorruption/src/core/risk_service.py b/modules/dr-anticorruption/src/core/risk_service.py new file mode 100644 index 0000000..cb1d0d1 --- /dev/null +++ b/modules/dr-anticorruption/src/core/risk_service.py @@ -0,0 +1,509 @@ +import json +import glob +import os +from datetime import datetime +from typing import Dict, List, Optional, Set +import sqlite3 +import logging +from psycopg2.extras import RealDictCursor +from pathlib import Path + +from src.config.config import config +from src.data.data_manager import data_manager +from src.data.database import db +from src.data.postgres import PostgresManager +from src.data.models import Person, Relationship, RiskReport, RiskFactor + +from src.core.social_analyzer import SocialAnalyzer +from src.core.external.social_scraper import SocialScraper + +logger = logging.getLogger("RiskService") + +class RiskService: + def __init__(self): + self.news_client = None # Lazy + self.social_scraper = SocialScraper() + self.social_analyzer = SocialAnalyzer() + self.cache: Dict = {} + self.pg = PostgresManager() # Connect to Data Lake + self._load_graphs() + self._build_indices() + + # ... (skipping unchanged properties) + + def _get_deep_intelligence(self, name: str) -> Dict: + """ + Gather deep intelligence from News (SocialAnalyzer) and Social Media (SocialScraper). + """ + points = 0 + factors = [] + all_hits = [] + veracity_score = 0 + + # 1. News Analysis (The "Official" Filter) + # We reuse the logic already built in SocialAnalyzer for legal risks + # This checks for "Caso Antipulpo", "PGR", etc. + logger.info(f"Running News Analysis for {name}...") + # Note: analyze_legal_risks returns a list of "associations" dicts + news_risks = self.social_analyzer.analyze_legal_risks([name]) + + for risk in news_risks: + points += 25 + factors.append(f"Alerta de Prensa: {risk.get('context')} ({risk.get('article_title')})") + all_hits.append(risk) + veracity_score = max(veracity_score, 3) + + # 2. 
Social Media OSINT (The "Street" Filter) + if points > 0: # Only check social if we already have some suspicion, or always if configured + pass + + # Let's always run a quick social scan for high-risk profiles + logger.info(f"Running Social OSINT for {name}...") + social_hits = self.social_scraper.get_social_intelligence(name, limit=3) + + if social_hits: + points += 10 + factors.append(f"Menciones en Redes Sociales (posible denuncia): {len(social_hits)} hits") + all_hits.extend(social_hits) + + return { + "points": min(points, 60), # Cap intel risk at 60 + "factors": list(set(factors)), + "hits": all_hits, + "veracity": veracity_score + } + + def _load_graphs(self): + """Load graph data directly from the Data Lake (Postgres).""" + logger.info("Connecting to Data Lake...") + try: + # Fetch from DB + db_persons, db_rels, db_providers = self.pg.query_graph_data() + + # 1. Load Persons + if db_persons: + self.persons = {p['person_id']: p for p in db_persons} + logger.info(f"Loaded {len(self.persons)} persons from DB.") + else: + logger.warning("Data Lake is empty (persons). Fallback to JSON.") + self.persons = data_manager.load_latest('persons_*.json') + + # 2. Load Relationships + if db_rels: + self.relationships = db_rels + logger.info(f"Loaded {len(self.relationships)} relationships from DB.") + else: + self.relationships = data_manager.load_latest('relationships_*.json') + + # 3. Cache Providers for Address Hubs + self.cached_providers = db_providers + + self.forensic_risks = self._load_forensic_risks() + + except Exception as e: + logger.error(f"Data Lake Connection Failed: {e}. Falling back to local files.") + self.persons = data_manager.load_latest('persons_*.json') + self.relationships = data_manager.load_latest('relationships_*.json') + self.cached_providers = [] + + def _build_indices(self): + self.company_to_people: Dict[str, List[str]] = {} + self.person_to_companies: Dict[str, Set[str]] = {} + self.rpe_to_address: Dict[str, str] = {} + self.hub_density: Dict[str, int] = {} + self._build_relationship_indices() + self._build_address_hubs() + logger.info("Built indices") + + def _build_relationship_indices(self): + for rel in self.relationships: + rpe = str(rel['rpe']) + pid = rel['person_id'] + if rpe not in self.company_to_people: + self.company_to_people[rpe] = [] + self.company_to_people[rpe].append(pid) + if pid not in self.person_to_companies: + self.person_to_companies[pid] = set() + self.person_to_companies[pid].add(rpe) + + def _build_address_hubs(self): + # Use DB data if available, else load JSON + if hasattr(self, 'cached_providers') and self.cached_providers: + proveedores = self.cached_providers + else: + proveedores = data_manager.load_latest('proveedores_full_*.json') + + rpe_to_owner_count = {} + for rel in self.relationships: + # Handle DB dict vs JSON dict keys if different + # DB returns RealDictCursor, so keys are strings + rpe = str(rel.get('rpe')) + pid = rel.get('person_id') + if rpe not in rpe_to_owner_count: + rpe_to_owner_count[rpe] = set() + rpe_to_owner_count[rpe].add(pid) + self.hub_density = {} + for p in proveedores: + rpe = str(p.get('rpe')) + addr = (p.get('direccion') or "").strip().upper() + if len(addr) > 10: + self.rpe_to_address[rpe] = addr + owners = rpe_to_owner_count.get(rpe, set()) + if addr not in self.hub_density: + self.hub_density[addr] = set() + self.hub_density[addr].update(owners) + self.hub_density = {k: len(v) for k, v in self.hub_density.items()} + logger.info(f"Physical hubs: {len([h for h in self.hub_density.values() if h > 2])} 
high-density") + + def _load_forensic_risks(self): + forensic_risks = {} + if Path('data/versatility_hits.json').exists(): + with open('data/versatility_hits.json', 'r', encoding='utf-8') as f: + hits = json.load(f) + for hit in hits: + rpe = hit['rpe'] + forensic_risks.setdefault(rpe, []).append({ + 'score': hit['risk_score'], + 'factor': f"Versatilidad Sospechosa: {hit['reason']}", + 'type': 'VERSATILITY' + }) + # Activation spikes similar + return forensic_risks + + def analyze_supplier(self, supplier_name: str, supplier_data: Dict) -> RiskReport: + risk_score = 0 + risk_factors = [] + rpe = str(supplier_data.get('rpe')) + # Static + if self._is_new_company(supplier_data.get('fecha_creacion_empresa')): + risk_score += config.get('risk.new_company_score', 15) + risk_factors.append("Empresa de reciente creación") + # Physical hub + addr = self.rpe_to_address.get(rpe) + if addr: + owner_density = self.hub_density.get(addr, 0) + if owner_density > config.get('risk.hub_density_high', 20): + risk_score += config.get('risk.hub_high_score', 40) + risk_factors.append(f"Hub Alta Densidad: {owner_density} propietarios") + elif owner_density > config.get('risk.hub_density_medium', 5): + risk_score += config.get('risk.hub_medium_score', 20) + risk_factors.append(f"Hub compartido: {owner_density}") + # Forensics + for risk in self.forensic_risks.get(rpe, []): + risk_score += risk['score'] + risk_factors.append(risk['factor']) + # Network + network_risk = self._calculate_network_risk(rpe) + risk_score += network_risk['points'] + risk_factors.extend(network_risk['factors']) + + # PEP & Conflict of Interest (Data Lake Check) + pep_data = self._check_pep_status(rpe) + if pep_data['is_pep']: + risk_score += pep_data['points'] + risk_factors.extend(pep_data['factors']) + + # Payroll Conflict (Active Verification) + payroll_data = self._check_payroll_status(rpe) + if payroll_data['is_official']: + risk_score += 60 # Critical Base Score for Conflict + risk_factors.extend(payroll_data['factors']) + + # Intel + intel = self._get_deep_intelligence(supplier_name) + risk_score += intel['points'] + risk_factors.extend(intel['factors']) + + # Multipliers (Non-Linear Scoring) + multiplier = 1.0 + if payroll_data['is_official'] and pep_data['is_pep']: + multiplier = 1.5 + risk_factors.append("ALERTA MÁXIMA: Funcionario Activo + PEP + Proveedor.") + + if payroll_data['is_official'] and intel['veracity'] > 2: + multiplier = 1.3 + risk_factors.append("ALERTA ALTA: Funcionario Activo con Noticias Negativas.") + + risk_score = min(risk_score * multiplier, 100) + risk_level = self._get_risk_level(risk_score) + + return RiskReport( + entity=supplier_name, + rpe=rpe, + address=addr, + risk_score=risk_score, + risk_level=risk_level, + factors=list(set(risk_factors)), + evidence={ + 'forensics': self.forensic_risks.get(rpe, []), + 'physical_hub': {'address': addr, 'unique_owner_count': owner_density if addr else 0}, + 'pep': pep_data['details'] if pep_data['is_pep'] else None, + 'payroll': payroll_data['details'] if payroll_data['is_official'] else None + } + ) + + def _check_pep_status(self, rpe: str) -> Dict: + """Check if any representative of the company is a PEP or Omiso.""" + is_pep = False + points = 0 + factors = [] + details = [] + + person_ids = self.company_to_people.get(rpe, []) + if not person_ids: + return {"is_pep": False, "points": 0, "factors": [], "details": []} + + # Attempt to load PEP data (DB or JSON) + pep_registry = self._load_pep_data() + + for pid in person_ids: + person = self.persons.get(pid) + if 
not person: continue + + norm_name = person['name'].strip().upper() + + # Match in registry + pep_match = pep_registry.get(norm_name) + if pep_match: + is_pep = True + status = pep_match['status'] + inst = pep_match['institution'] + pos = pep_match['position'] + + if status == 'OMISO': + points += 50 + factors.append(f"ALERTA CRÍTICA: Representante {person['name']} es FUNCIONARIO OMISO (No declaró patrimonio) en {inst}.") + else: + points += 20 + factors.append(f"PEP Detectado: Representante {person['name']} ocupa el cargo de {pos} en {inst}.") + + details.append(pep_match) + + return { + "is_pep": is_pep, + "points": min(points, 80), + "factors": factors, + "details": details + } + + def _check_payroll_status(self, rpe: str) -> Dict: + """Check if shareholders are active public officials.""" + is_official = False + points = 0 + factors = [] + details = [] + + person_ids = self.company_to_people.get(rpe, []) + if not person_ids: + return {"is_official": False, "points": 0, "factors": [], "details": []} + + # Load payroll registry (DB or JSON) + payroll_registry = self._load_payroll_data() + + for pid in person_ids: + person = self.persons.get(pid) + if not person: continue + + norm_name = person['name'].strip().upper() + + # Check match + official = payroll_registry.get(norm_name) + if official: + is_official = True + factors.append(f"CONFLICTO DE INTERÉS: Representante {person['name']} figura en nómina de {official['institution']} como {official['position']}.") + details.append(official) + + return { + "is_official": is_official, + "points": 50 if is_official else 0, + "factors": factors, + "details": details + } + + def _load_payroll_data(self) -> Dict: + """Load active payroll data from DB or JSON.""" + registry = {} + # 1. DB + try: + self.pg.connect() + cur = self.pg.conn.cursor(cursor_factory=RealDictCursor) + cur.execute("SELECT * FROM public_officials;") + rows = cur.fetchall() + cur.close() + if rows: + return {r['full_name']: r for r in rows} + except: + pass + + # 2. JSON Fallback + try: + dump_file = Path("data/raw/public_officials_dump.json") + if dump_file.exists(): + with open(dump_file, 'r', encoding='utf-8') as f: + data = json.load(f) + return {item['full_name']: item for item in data} + except Exception as e: + logger.warning(f"Failed to load payroll fallback: {e}") + + return registry + + def _check_payroll_status(self, rpe: str) -> Dict: + """Check if any representative is an active Public Official.""" + is_official = False + points = 0 + factors = [] + details = [] + + person_ids = self.company_to_people.get(rpe, []) + if not person_ids: + return {"is_official": False, "points": 0, "factors": [], "details": []} + + # Load payroll registry + payroll_registry = self._load_payroll_data() + + for pid in person_ids: + person = self.persons.get(pid) + if not person: continue + + norm_name = person['name'].strip().upper() + + official = payroll_registry.get(norm_name) + if official: + is_official = True + factors.append(f"CONFLICTO DE INTERÉS: Representante {person['name']} es Funcionario en {official['institution']} ({official['position']}).") + details.append(official) + + return { + "is_official": is_official, + "points": 0, # Points added in main logic + "factors": factors, + "details": details + } + + def _load_payroll_data(self) -> Dict: + """Load active payroll data from DB or JSON.""" + registry = {} + # 1. 
DB + try: + self.pg.connect() + cur = self.pg.conn.cursor(cursor_factory=RealDictCursor) + cur.execute("SELECT * FROM public_officials;") + rows = cur.fetchall() + cur.close() + if rows: + return {r['full_name']: r for r in rows} + except: + pass + + # 2. JSON Fallback + try: + dump_file = Path("data/raw/public_officials_dump.json") + if dump_file.exists(): + with open(dump_file, 'r', encoding='utf-8') as f: + data = json.load(f) + return {item['full_name']: item for item in data} + except Exception as e: + logger.warning(f"Failed to load payroll fallback: {e}") + + return registry + + def _load_pep_data(self) -> Dict: + """Helper to load PEP data from DB or local JSON fallback.""" + registry = {} + # 1. Try DB + try: + self.pg.connect() + cur = self.pg.conn.cursor(cursor_factory=RealDictCursor) + cur.execute("SELECT * FROM pep_registry;") + rows = cur.fetchall() + cur.close() + if rows: + return {r['normalized_name']: r for r in rows} + except: + pass + + # 2. Try JSON Fallback + pep_files = list(Path("data/raw/ccrd").glob("pep_extraction_*.json")) + if pep_files: + latest_file = max(pep_files, key=os.path.getctime) + import json + try: + with open(latest_file, 'r', encoding='utf-8') as f: + data = json.load(f) + return {item['normalized_name']: item for item in data} + except: + pass + + return registry + + # Other methods similar, using config thresholds + + def _is_new_company(self, date_str: str) -> bool: + if not date_str: + return False + years = config.get('risk.new_company_years', ['2024-', '2025-', '2026-']) + return any(year in str(date_str) for year in years) + + def _get_risk_level(self, score: int) -> str: + thresholds = config.get('risk.thresholds', {'critical': 75, 'high': 50, 'medium': 25}) + if score >= thresholds['critical']: + return "CRITICAL" + if score >= thresholds['high']: + return "HIGH" + if score >= thresholds['medium']: + return "MEDIUM" + return "LOW" + + def _calculate_network_risk(self, rpe: str) -> Dict: + points = 0 + factors = [] + details = [] + person_ids = self.company_to_people.get(rpe, []) + + for pid in person_ids: + # Check for concentration risk + linked_companies = self.person_to_companies.get(pid, set()) + if len(linked_companies) > 3: + weight = min(len(linked_companies) * 8, 30) + points += weight + p_name = self.persons.get(pid, {}).get('name', 'Unknown') + factors.append(f"Representante en múltiples empresas ({len(linked_companies)}): {p_name}") + details.append({"type": "CONCENTRATION_RISK", "person": p_name, "count": len(linked_companies)}) + + return { + "points": min(points, 75), + "factors": list(set(factors)), + "details": details + } + + def batch_analyze(self, limit: int = 10): + """Analyzes a batch of suppliers and saves results.""" + logger.info(f"Starting batch analysis (limit={limit})...") + proveedores = data_manager.load_latest('proveedores_full_*.json') + if not proveedores: + logger.error("No suppliers found for batch analysis.") + return + + results = [] + for supplier in proveedores[:limit]: + report = self.analyze_supplier(supplier.get('razon_social', 'Unknown'), supplier) + results.append(report.dict()) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_file = f"data/risk_report_batch_{timestamp}.json" + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(results, f, indent=2, ensure_ascii=False) + + logger.info(f"Batch analysis complete. 
Report saved to {output_file}") + return results + + + +if __name__ == "__main__": + service = RiskService() + # Test + proveedores = data_manager.load_latest('proveedores_full_*.json') + if proveedores: + supplier = proveedores[0] + report = service.analyze_supplier(supplier.get('razon_social', 'Unknown'), supplier) + print(json.dumps(report.dict(), indent=2)) diff --git a/modules/dr-anticorruption/src/core/scraper.py b/modules/dr-anticorruption/src/core/scraper.py new file mode 100644 index 0000000..148520a --- /dev/null +++ b/modules/dr-anticorruption/src/core/scraper.py @@ -0,0 +1,45 @@ +import requests +from bs4 import BeautifulSoup +import logging +from typing import List + +logger = logging.getLogger(__name__) + +class ShareholderScraper: + def __init__(self): + self.base_url = "https://www.dgcp.gob.do/constancia/descargar" + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + }) + + def fetch_certification_html(self, cert_code: str) -> str: + url = f"{self.base_url}/{cert_code}" + try: + response = self.session.get(url, timeout=15) + response.raise_for_status() + return response.text + except Exception as e: + logger.error(f"Failed to fetch cert {cert_code}: {e}") + return None + + def extract_shareholders(self, html_content: str) -> List[str]: + if not html_content: + return [] + soup = BeautifulSoup(html_content, 'html.parser') + shareholders = [] + # SPA app, data in JS. Use Puppeteer for render. + # TODO: Integrate browser_action or selenium for JS render + # Selectors from debug_cert_page.html or JS parse + # Placeholder + tables = soup.find_all('table', class_='list') + for table in tables: + rows = table.find_all('tr') + for row in rows: + name_cell = row.find(class_='name') + if name_cell: + shareholders.append(name_cell.text.strip()) + logger.info(f"Extracted {len(shareholders)} shareholders") + return shareholders + +# Note: DGCP cert page is SPA, requires JS execution. Use Puppeteer. diff --git a/modules/dr-anticorruption/src/core/social_analyzer.py b/modules/dr-anticorruption/src/core/social_analyzer.py new file mode 100644 index 0000000..fbff102 --- /dev/null +++ b/modules/dr-anticorruption/src/core/social_analyzer.py @@ -0,0 +1,128 @@ +import json +import logging +import time +from typing import List, Dict +from src.core.external.news_client import NewsClient + +# Configuration +THRESHOLD_CO_OCCURRENCE = 1 # Number of shared articles to trigger a link + +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') +logger = logging.getLogger("SocialAnalyzer") + +class SocialAnalyzer: + def __init__(self): + self.news_client = NewsClient() + self.associations = [] # List of (Person A, Person B, Evidence) + + def analyze_connections(self, names: List[str]): + """ + Analyze potential social/business connections between a list of people + by searching for co-occurrences in news. + """ + print(f"\n🧠 ANALYZING SOCIAL CONNECTIONS FOR {len(names)} PEOPLE") + print("-" * 60) + + # We can optimize by searching for pairs, but for the first version, + # we'll fetch news for each and find overlapping URLs. 
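        # Illustrative shapes for the overlap step below (names and URLs are
        # hypothetical; only the structure is implied by the code):
        #   news_map     = {"PERSONA A": {"https://ej.do/a1", "https://ej.do/a2"},
        #                   "PERSONA B": {"https://ej.do/a2"}}
        #   article_data = {"https://ej.do/a2": "Titular compartido"}
        # A co-occurrence for (A, B) is then news_map["PERSONA A"] & news_map["PERSONA B"].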
+ news_map = {} # name -> list of article URLs + article_data = {} # url -> article title + + for name in names: + print(f"🔍 Fetching news for: {name}...") + hits = self.news_client.search_entity(name, limit=15) + urls = [] + for hit in hits: + url = hit['url'] + urls.append(url) + article_data[url] = hit['title'] + + news_map[name] = set(urls) + time.sleep(1) # Be polite to Google News + + # Cross-reference + print("\n🤝 Finding Overlaps...") + found_links = 0 + for i in range(len(names)): + for j in range(i + 1, len(names)): + name_a = names[i] + name_b = names[j] + + common_urls = news_map[name_a].intersection(news_map[name_b]) + + if common_urls: + found_links += 1 + for url in common_urls: + print(f" ✨ CONNECTION FOUND: {name_a} <--> {name_b}") + print(f" Article: {article_data[url]}") + self.associations.append({ + "person_a": name_a, + "person_b": name_b, + "type": "NEWS_CO_OCCURRENCE", + "article_title": article_data[url], + "url": url + }) + + if found_links == 0: + print(" (No direct co-occurrences found in the 5-year lookback for these spécifiques pairs)") + + return self.associations + + def analyze_legal_risks(self, names: List[str]): + """ + Check if representatives appear in the context of known corruption cases + or legal investigations. + """ + legal_contexts = [ + "Caso Antipulpo", "Caso Calamar", "Caso Coral", "Caso Medusa", + "Procuraduría", "Fiscalía", "Lavado de activos" + ] + + print(f"\n⚖️ CHECKING LEGAL RISKS FOR {len(names)} PEOPLE") + print("-" * 60) + + for name in names: + for context in legal_contexts: + query = f'"{name}" {context}' + print(f" 🔍 Querying: {query}...", end='\r') + + hits = self.news_client.search_entity(query, limit=5) + if hits: + print(f" 🚩 RISK ALERT: {name} found in context of {context}!") + for hit in hits: + # Only score if the title or text actually contains the name + # Google News RSS titles usually contain the query terms + self.associations.append({ + "person": name, + "context": context, + "type": "LEGAL_QUERY_MATCH", + "article_title": hit['title'], + "url": hit['url'] + }) + + return self.associations + + def save_results(self): + timestamp = int(time.time()) + output_file = f"data/legal_social_hits_{timestamp}.json" + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(self.associations, f, indent=2, ensure_ascii=False) + print(f"\n💾 Results saved to: {output_file}") + +if __name__ == "__main__": + analyzer = SocialAnalyzer() + + # Selected target names from previous relationship scan + test_names = [ + "JUAN FRANCISCO MARTE", + "OLGA DILIA SEGURA", + "RAFAEL UCETA ESPINAL", + "DELFIN RAFAEL LOPEZ GOMEZ", + "MAXIMO GOMEZ", + "PEDRO SENADOR", + "JUAN MINISTRO" + ] + + analyzer.analyze_connections(test_names[:4]) # Just a few pairs + analyzer.analyze_legal_risks(test_names) + analyzer.save_results() diff --git a/modules/dr-anticorruption/src/data/data_manager.py b/modules/dr-anticorruption/src/data/data_manager.py new file mode 100644 index 0000000..4658705 --- /dev/null +++ b/modules/dr-anticorruption/src/data/data_manager.py @@ -0,0 +1,109 @@ +import json +import glob +import os +import hashlib +from pathlib import Path +from typing import List, Dict, Any, Optional +from datetime import datetime +import logging + +from src.config.config import config + +logger = logging.getLogger(__name__) + +class DataManager: + def __init__(self): + self.data_dir = Path(config.get('data.dir', 'data')) + self.cache: Dict[str, Any] = {} + self.metadata_file = self.data_dir / 'metadata.json' + self.metadata: Dict[str, Dict] = {} + 
self._load_metadata() + + def _load_metadata(self): + if self.metadata_file.exists(): + try: + with open(self.metadata_file, 'r') as f: + self.metadata = json.load(f) + except Exception as e: + logger.warning(f"Failed to load metadata: {e}") + + def _save_metadata(self): + try: + with open(self.metadata_file, 'w') as f: + json.dump(self.metadata, f, indent=2) + except Exception as e: + logger.error(f"Failed to save metadata: {e}") + + def save_json(self, data: Any, filename: str): + """Save data to JSON in the data directory.""" + filepath = self.data_dir / filename + try: + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + logger.info(f"Saved {len(data) if isinstance(data, list) else 1} items to {filepath}") + except Exception as e: + logger.error(f"Failed to save {filename}: {e}") + + def load_latest(self, pattern: str) -> List[Dict]: + """ + Load latest JSON matching pattern from data_dir. + Handles API wrappers, caches, updates metadata. + """ + cache_key = pattern + if cache_key in self.cache: + logger.debug(f"Cache hit for {pattern}") + return self.cache[cache_key] + + full_pattern = str(self.data_dir / pattern) + files = glob.glob(full_pattern) + if not files: + logger.warning(f"No files found for {pattern}") + return [] + + latest_file = max(files, key=os.path.getmtime) + logger.info(f"Loading latest: {latest_file}") + + try: + with open(latest_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + # Handle wrappers + content = [] + if isinstance(data, list): + for item in data: + if isinstance(item, dict) and 'payload' in item: + content.extend(item['payload'].get('content', [])) + else: + content.append(item) + elif isinstance(data, dict) and 'payload' in data: + content = data['payload'].get('content', []) + else: + content = [data] if isinstance(data, dict) else data + + # Cache + self.cache[cache_key] = content + + # Metadata + h = hashlib.md5(json.dumps(content, sort_keys=True).encode()).hexdigest() + self.metadata[Path(latest_file).name] = { + 'file': latest_file, + 'timestamp': datetime.fromtimestamp(os.path.getmtime(latest_file)).isoformat(), + 'hash': h, + 'records': len(content) + } + self._save_metadata() + + return content + + except Exception as e: + logger.error(f"Failed to load {latest_file}: {e}") + return [] + + def clear_cache(self): + self.cache.clear() + + def get_metadata(self) -> Dict: + return self.metadata + +# Global instance +data_manager = DataManager() diff --git a/modules/dr-anticorruption/src/data/database.py b/modules/dr-anticorruption/src/data/database.py new file mode 100644 index 0000000..5779b74 --- /dev/null +++ b/modules/dr-anticorruption/src/data/database.py @@ -0,0 +1,88 @@ +import sqlite3 +import logging +from pathlib import Path + +from .data_manager import data_manager +from .models import Proveedor, Contrato, Person, Relationship # Validate later + +logger = logging.getLogger(__name__) + +class Database: + def __init__(self): + self.db_path = Path('data') / 'dr_anticorruption.db' + self.conn = None + self._init_db() + + def __enter__(self): + self.conn = sqlite3.connect(self.db_path) + self.conn.row_factory = sqlite3.Row + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.conn: + self.conn.close() + + def _init_db(self): + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + # Proveedores + cursor.execute(''' + CREATE TABLE IF NOT EXISTS proveedores ( + rpe TEXT PRIMARY KEY, + razon_social TEXT, + direccion TEXT, + contacto TEXT, + 
correo_contacto TEXT, + telefono_contacto TEXT, + fecha_creacion_empresa TEXT + ) + ''') + # Contratos + cursor.execute(''' + CREATE TABLE IF NOT EXISTS contratos ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + rpe TEXT, + razon_social TEXT, + fecha_creacion_contrato TEXT, + descripcion TEXT, + nombre_unidad_compra TEXT + ) + ''') + # Persons + cursor.execute(''' + CREATE TABLE IF NOT EXISTS persons ( + person_id TEXT PRIMARY KEY, + name TEXT, + normalized_name TEXT, + emails TEXT, -- JSON + phones TEXT, -- JSON + positions TEXT -- JSON + ) + ''') + # Relationships + cursor.execute(''' + CREATE TABLE IF NOT EXISTS relationships ( + person_id TEXT, + rpe TEXT, + company_name TEXT, + relationship_type TEXT DEFAULT 'REPRESENTATIVE_FOR', + position TEXT, + email TEXT, + phone TEXT, + PRIMARY KEY (person_id, rpe) + ) + ''') + # Forensic Hits + cursor.execute(''' + CREATE TABLE IF NOT EXISTS forensic_hits ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + rpe TEXT, + type TEXT, + score REAL, + reason TEXT + ) + ''') + conn.commit() + logger.info(f"DB initialized at {self.db_path}") + +db = Database() diff --git a/modules/dr-anticorruption/src/data/migrate.py b/modules/dr-anticorruption/src/data/migrate.py new file mode 100644 index 0000000..dd40db4 --- /dev/null +++ b/modules/dr-anticorruption/src/data/migrate.py @@ -0,0 +1,75 @@ +import pandas as pd +import sqlite3 +from pathlib import Path +import logging + +from .data_manager import data_manager +from .database import db # To init schema +from .models import Proveedor, Contrato # For validation optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def migrate(): + data_dir = Path('data') + db_path = data_dir / 'dr_anticorruption.db' + + with sqlite3.connect(db_path) as conn: + # Init schema if not + from .database import Database + Database() + + # Proveedores + proveedores = data_manager.load_latest('proveedores_full_*.json') + if proveedores: + df_prov = pd.DataFrame(proveedores) + df_prov.to_sql('proveedores', conn, if_exists='replace', index=False) + logger.info(f"Migrated {len(proveedores)} proveedores") + + # Contratos + contratos = data_manager.load_latest('contratos_*.json') + if contratos: + df_contr = pd.DataFrame(contratos) + df_contr.to_sql('contratos', conn, if_exists='replace', index=False) + logger.info(f"Migrated {len(contratos)} contratos") + + # Persons relationships from latest + persons_files = glob.glob('data/persons_*.json') + if persons_files: + latest_persons = max(persons_files, key=os.path.getmtime) + with open(latest_persons, 'r') as f: + persons_data = json.load(f) + df_persons = pd.DataFrame(persons_data) + df_persons.to_sql('persons', conn, if_exists='replace', index=False) + logger.info(f"Migrated persons from {latest_persons}") + + relationships_files = glob.glob('data/relationships_*.json') + if relationships_files: + latest_rel = max(relationships_files, key=os.path.getmtime) + with open(latest_rel, 'r') as f: + rel_data = json.load(f) + df_rel = pd.DataFrame(rel_data) + df_rel.to_sql('relationships', conn, if_exists='replace', index=False) + logger.info(f"Migrated relationships from {latest_rel}") + + # Forensic + if Path('data/versatility_hits.json').exists(): + with open('data/versatility_hits.json') as f: + vers = json.load(f) + df_vers = pd.DataFrame(vers) + df_vers['type'] = 'VERSATILITY' + df_vers.to_sql('forensic_hits', conn, if_exists='append', index=False) + + if Path('data/activation_spikes.json').exists(): + with open('data/activation_spikes.json') as f: + act = 
json.load(f) + df_act = pd.DataFrame(act) + df_act['type'] = 'ACTIVATION_SPIKE' + df_act['score'] = df_act['concentration'] * 0.4 # Approx + df_act['reason'] = df_act['reason'] if 'reason' in df_act else 'Spike' + df_act.to_sql('forensic_hits', conn, if_exists='append', index=False) + + logger.info("Migration complete") + +if __name__ == '__main__': + migrate() diff --git a/modules/dr-anticorruption/src/data/models.py b/modules/dr-anticorruption/src/data/models.py new file mode 100644 index 0000000..20d3106 --- /dev/null +++ b/modules/dr-anticorruption/src/data/models.py @@ -0,0 +1,81 @@ +from pydantic import BaseModel, Field +from typing import List, Dict, Any, Optional +from datetime import datetime + +class Proveedor(BaseModel): + rpe: str = Field(..., alias='rpe') + razon_social: Optional[str] = Field(None, alias='razon_social') + direccion: Optional[str] = Field(None, alias='direccion') + contacto: Optional[str] = Field(None, alias='contacto') + correo_contacto: Optional[str] = Field(None, alias='correo_contacto') + telefono_contacto: Optional[str] = Field(None, alias='telefono_contacto') + celular_contacto: Optional[str] = Field(None, alias='celular_contacto') + posicion_contacto: Optional[str] = Field(None, alias='posicion_contacto') + fecha_creacion_empresa: Optional[str] = Field(None, alias='fecha_creacion_empresa') + + class Config: + populate_by_name = True + arbitrary_types_allowed = True + +class Contrato(BaseModel): + rpe: Optional[str] = Field(None, alias='rpe') + rpe_proveedor: Optional[str] = Field(None, alias='rpe_proveedor') + razon_social: Optional[str] = Field(None, alias='razon_social') + fecha_creacion_contrato: Optional[str] = Field(None, alias='fecha_creacion_contrato') + descripcion: Optional[str] = Field(None, alias='descripcion') + nombre_unidad_compra: Optional[str] = Field(None, alias='nombre_unidad_compra') + codigo_item: Optional[str] = Field(None, alias='codigo_item') + + class Config: + populate_by_name = True + arbitrary_types_allowed = True + +class Person(BaseModel): + person_id: str + name: str + normalized_name: Optional[str] = None + emails: List[str] = [] + phones: List[str] = [] + positions: List[str] = [] + companies: List[Dict[str, Any]] = [] + +class Relationship(BaseModel): + person_id: str + person_name: str + rpe: str + company_name: str + relationship_type: str = 'REPRESENTATIVE_FOR' + position: Optional[str] = None + email: Optional[str] = None + phone: Optional[str] = None + +class RiskFactor(BaseModel): + factor: str + score: Optional[float] = None + type: Optional[str] = None + +class RiskReport(BaseModel): + entity: str + rpe: str + risk_score: float + risk_level: str + factors: List[str] + evidence: Dict[str, Any] + +class ForensicHit(BaseModel): + rpe: str + risk_score: float + reason: str + +# --- API Request/Response Models --- + +class IngestRequest(BaseModel): + target: str = "all" # 'all', 'contratos', 'proveedores', etc. 
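
# Illustrative /ingest request body (assumes the API binds this model as the JSON body;
# valid targets are those listed in the comment above):
#   {"target": "proveedores"}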
+ +class RiskRequest(BaseModel): + entity_id: str + context: Optional[str] = None + +class BriefRequest(BaseModel): + context: str + template: str = "standard" \ No newline at end of file diff --git a/modules/dr-anticorruption/src/data/postgres.py b/modules/dr-anticorruption/src/data/postgres.py new file mode 100644 index 0000000..16ec80c --- /dev/null +++ b/modules/dr-anticorruption/src/data/postgres.py @@ -0,0 +1,150 @@ +import psycopg2 +from psycopg2.extras import RealDictCursor +import logging +from datetime import datetime +from typing import List, Dict, Any +from src.config.config import config + +logger = logging.getLogger(__name__) + +class PostgresManager: + def __init__(self): + self.dsn = config.get('postgres.dsn', 'postgresql://postgres:postgres@localhost:5432/datalake') + self.conn = None + try: + self.connect() + except Exception as e: + logger.warning(f"Could not connect to Postgres on init: {e}. System will use local fallback.") + + def connect(self): + if self.conn is None or self.conn.closed: + self.conn = psycopg2.connect(self.dsn) + + def _init_db(self): + self.connect() + cur = self.conn.cursor() + # cur.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;") + self.conn.commit() + + # Hypertable stub for contratos (time-partitioned) + cur.execute(""" + CREATE TABLE IF NOT EXISTS dgcp_contratos ( + ingest_time TIMESTAMPTZ NOT NULL, + rpe TEXT, + razon_social TEXT, + monto_total NUMERIC, + fecha_aprobacion DATE, + descripcion TEXT + ); + """) + # cur.execute("SELECT create_hypertable('dgcp_contratos', 'ingest_time', if_not_exists => TRUE);") + + # Regular table for proveedores (static master data) + cur.execute(""" + CREATE TABLE IF NOT EXISTS dgcp_proveedores ( + rpe TEXT PRIMARY KEY, + razon_social TEXT, + direccion TEXT, + fecha_creacion_empresa DATE + ); + """) + + # Tables for Graph/Entity Analysis + cur.execute(""" + CREATE TABLE IF NOT EXISTS risk_persons ( + person_id TEXT PRIMARY KEY, + name TEXT, + normalized_name TEXT + ); + """) + cur.execute(""" + CREATE TABLE IF NOT EXISTS risk_relationships ( + id SERIAL PRIMARY KEY, + person_id TEXT REFERENCES risk_persons(person_id), + rpe TEXT, + company_name TEXT, + relationship_type TEXT, + position TEXT + ); + """) + + self.conn.commit() + cur.close() + logger.info("Postgres hypertables and graph tables initialized.") + + def insert_batch(self, data: List[Dict[str, Any]], table: str): + """Stub for batch insert/upsert.""" + self.connect() + cur = self.conn.cursor() + for item in data: + item['ingest_time'] = datetime.now() + + if table == 'dgcp_contratos': + cur.execute(""" + INSERT INTO dgcp_contratos (ingest_time, rpe, razon_social, monto_total, fecha_aprobacion, descripcion) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (rpe) DO NOTHING; + """, (item.get('ingest_time'), item.get('rpe'), item.get('razon_social'), + item.get('monto_total'), item.get('fecha_aprobacion'), item.get('descripcion'))) + + elif table == 'risk_persons': + cur.execute(""" + INSERT INTO risk_persons (person_id, name, normalized_name) + VALUES (%s, %s, %s) + ON CONFLICT (person_id) DO NOTHING; + """, (item.get('person_id'), item.get('name'), item.get('normalized_name'))) + + elif table == 'risk_relationships': + cur.execute(""" + INSERT INTO risk_relationships (person_id, rpe, company_name, relationship_type, position) + VALUES (%s, %s, %s, %s, %s); + """, (item.get('person_id'), item.get('rpe'), item.get('company_name'), + item.get('relationship_type'), item.get('position'))) + self.conn.commit() + cur.close() + logger.info(f"Inserted 
{len(data)} records to {table}") + + def query_last_ingest(self, table: str) -> str: + """Get timestamp of last ingest for delta logic.""" + self.connect() + cur = self.conn.cursor() + cur.execute(f"SELECT max(ingest_time) FROM {table};") + result = cur.fetchone() + cur.close() + return result[0].isoformat() if result[0] else None + + def query_graph_data(self): + """Fetch all graph nodes and edges for in-memory analysis.""" + self.connect() + cur = self.conn.cursor(cursor_factory=RealDictCursor) + + # Ensure tables exist (lazy init for demo) + self._init_db() + + try: + cur.execute("SELECT * FROM risk_persons;") + persons = cur.fetchall() + + cur.execute("SELECT * FROM risk_relationships;") + relationships = cur.fetchall() + + # dgcp_proveedores might be empty if we haven't ingested it into DB yet + # return empty list if table empty or not found + try: + cur.execute("SELECT rpe, razon_social, direccion FROM dgcp_proveedores;") + providers = cur.fetchall() + except: + self.conn.rollback() + providers = [] + + return persons, relationships, providers + except Exception as e: + self.conn.rollback() + logger.error(f"Error querying graph data: {e}") + return [], [], [] + finally: + cur.close() + + def close(self): + if self.conn: + self.conn.close() \ No newline at end of file diff --git a/modules/dr-anticorruption/src/data/s3_manager.py b/modules/dr-anticorruption/src/data/s3_manager.py new file mode 100644 index 0000000..411b457 --- /dev/null +++ b/modules/dr-anticorruption/src/data/s3_manager.py @@ -0,0 +1,42 @@ +import boto3 +from botocore.client import Config +from datetime import datetime +import json +import logging +from typing import Any, List +from src.config.config import config + +logger = logging.getLogger(__name__) + +class S3Manager: + def __init__(self): + endpoint = config.get('minio.endpoint', 'http://localhost:9000') + access_key = config.get('minio.access_key', 'minioadmin') + secret_key = config.get('minio.secret_key', 'minioadmin') + self.bucket = config.get('minio.bucket', 's3-raw') + self.s3 = boto3.client( + 's3', + endpoint_url=endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + config=Config(signature_version='s3v4') + ) + self._ensure_bucket() + + def _ensure_bucket(self): + try: + self.s3.head_bucket(Bucket=self.bucket) + except: + self.s3.create_bucket(Bucket=self.bucket) + logger.info(f"Created bucket {self.bucket}") + + def upload_json(self, data: List[dict[str, Any]], key: str): + json_str = json.dumps(data, default=str, ensure_ascii=False, indent=2) + self.s3.put_object(Bucket=self.bucket, Key=key, Body=json_str.encode('utf-8')) + logger.info(f"Uploaded {len(data)} records to s3://{self.bucket}/{key}") + + def get_partitioned_key(self, data_type: str, ingest_date: str = None) -> str: + if ingest_date is None: + ingest_date = datetime.now().strftime('%Y-%m-%d') + year, month, day = ingest_date.split('-') + return f"raw/{data_type}/year={year}/month={month}/day={day}/{data_type}_{ingest_date}.json" diff --git a/modules/dr-anticorruption/src/scheduler.py b/modules/dr-anticorruption/src/scheduler.py new file mode 100644 index 0000000..482919d --- /dev/null +++ b/modules/dr-anticorruption/src/scheduler.py @@ -0,0 +1,33 @@ +from apscheduler.schedulers.background import BackgroundScheduler +import time +import logging +from datetime import datetime, timedelta +from src.core.ingestion import IngestionService +from src.data.s3_manager import S3Manager + +logger = logging.getLogger(__name__) + +def daily_delta_ingest(): + end_date = 
datetime.now().date() + start_date = end_date - timedelta(days=1) + start_str = start_date.strftime('%Y-%m-%d') + end_str = end_date.strftime('%Y-%m-%d') + logger.info(f"Running daily delta ingest from {start_str} to {end_str}") + service = IngestionService(start_date=start_str, end_date=end_str) + # TODO: Update service to use dates in fetch_all_pages params + service.ingest_proveedores() + # Add other ingests: contratos, procesos, etc. + s3 = S3Manager() + # TODO: service.save_to_s3() + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + scheduler = BackgroundScheduler() + scheduler.add_job(daily_delta_ingest, 'cron', hour=2, minute=0, id='daily_ingest') + scheduler.start() + logger.info('Data Lake Scheduler started. Press Ctrl+C to exit.') + try: + while True: + time.sleep(1) + except (KeyboardInterrupt, SystemExit): + scheduler.shutdown() \ No newline at end of file diff --git a/modules/dr-anticorruption/src/tests/test_risk.py b/modules/dr-anticorruption/src/tests/test_risk.py new file mode 100644 index 0000000..e76df6b --- /dev/null +++ b/modules/dr-anticorruption/src/tests/test_risk.py @@ -0,0 +1,17 @@ +import pytest +from src.core.risk_service import RiskService +from src.data.data_manager import data_manager + +def test_analyze_supplier(): + service = RiskService() + proveedores = data_manager.load_latest('proveedores_full_*.json') + if proveedores: + supplier = proveedores[0] + report = service.analyze_supplier(supplier['razon_social'], supplier) + assert report.risk_score >= 0 + assert report.risk_level in ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL'] + else: + pytest.skip("No proveedores data") + +if __name__ == "__main__": + pytest.main([__file__])
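
# To run just this test module (from modules/dr-anticorruption/ so that `src.*` imports resolve):
#   pytest src/tests/test_risk.py -q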