Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,18 @@ CREATE TABLE IF NOT EXISTS audit_logs (
action VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS bank_connections (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connector_id VARCHAR(50) NOT NULL,
connection_id VARCHAR(255) NOT NULL UNIQUE,
account_id VARCHAR(255),
account_name VARCHAR(255),
institution_name VARCHAR(255),
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
last_sync_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_bank_connections_user ON bank_connections(user_id);
16 changes: 16 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,19 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class BankConnection(db.Model):
"""Stores bank connection metadata for users."""
__tablename__ = "bank_connections"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
connector_id = db.Column(db.String(50), nullable=False)
connection_id = db.Column(db.String(255), nullable=False, unique=True)
account_id = db.Column(db.String(255), nullable=True)
account_name = db.Column(db.String(255), nullable=True)
institution_name = db.Column(db.String(255), nullable=True)
status = db.Column(db.String(20), default="PENDING", nullable=False)
last_sync_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .bank_connections import bp as bank_connections_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(bank_connections_bp, url_prefix="/bank")
295 changes: 295 additions & 0 deletions packages/backend/app/routes/bank_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
"""
Bank Connections API

Provides endpoints for managing bank connections and syncing transactions.
"""

from datetime import datetime

from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity

from ..extensions import db
from ..models import BankConnection, Expense
from ..services.bank_connector import (
BankConnector,
ConnectorRegistry,
ConnectionResult,
ConnectionStatus,
MockBankConnector,
)

bp = Blueprint("bank_connections", __name__)


def _connector_to_dict(connector: BankConnector) -> dict:
"""Convert connector to dict for API response."""
return {
"connector_id": connector.connector_id,
"connector_name": connector.connector_name,
"supported_features": connector.supported_features,
}


def _connection_to_dict(connection: BankConnection) -> dict:
"""Convert BankConnection model to dict for API response."""
return {
"id": connection.id,
"connector_id": connection.connector_id,
"connection_id": connection.connection_id,
"account_id": connection.account_id,
"account_name": connection.account_name,
"institution_name": connection.institution_name,
"status": connection.status,
"last_sync_at": connection.last_sync_at.isoformat() if connection.last_sync_at else None,
"created_at": connection.created_at.isoformat(),
"updated_at": connection.updated_at.isoformat(),
}


@bp.get("/connectors")
@jwt_required()
def list_connectors():
"""List all available bank connectors."""
connectors = [
_connector_to_dict(ConnectorRegistry.create_instance(conn_id))
for conn_id in ConnectorRegistry.list_connectors()
]
return jsonify(connectors)


@bp.post("/connections")
@jwt_required()
def create_connection():
"""Create a new bank connection."""
uid = int(get_jwt_identity())
data = request.get_json() or {}

connector_id = data.get("connector_id")
credentials = data.get("credentials", {})

if not connector_id:
return jsonify(error="connector_id required"), 400

# Get connector
connector = ConnectorRegistry.create_instance(connector_id)
if not connector:
return jsonify(error="Unknown connector"), 400

# Validate credentials
is_valid, error_msg = connector.validate_credentials(credentials)
if not is_valid:
return jsonify(error=error_msg), 400

# Connect to bank
result = connector.connect(credentials)
if not result.success:
return jsonify(error=result.message, code=result.error_code), 400

# Store connection
connection_id = f"conn_{connector_id}_{uid}_{datetime.utcnow().timestamp()}"
bank_conn = BankConnection(
user_id=uid,
connector_id=connector_id,
connection_id=connection_id,
account_id=result.accounts[0].account_id if result.accounts else None,
account_name=result.accounts[0].account_name if result.accounts else None,
institution_name=result.accounts[0].institution_name if result.accounts else None,
status=ConnectionStatus.CONNECTED.value,
)
db.session.add(bank_conn)
db.session.commit()

return jsonify(_connection_to_dict(bank_conn)), 201


@bp.get("/connections")
@jwt_required()
def list_connections():
"""List all bank connections for the current user."""
uid = int(get_jwt_identity())
connections = BankConnection.query.filter_by(user_id=uid).all()
return jsonify([_connection_to_dict(c) for c in connections])


@bp.get("/connections/<int:connection_id>")
@jwt_required()
def get_connection(connection_id: int):
"""Get a specific bank connection."""
uid = int(get_jwt_identity())
connection = BankConnection.query.filter_by(id=connection_id, user_id=uid).first()
if not connection:
return jsonify(error="Connection not found"), 404
return jsonify(_connection_to_dict(connection))


@bp.delete("/connections/<int:connection_id>")
@jwt_required()
def delete_connection(connection_id: int):
"""Delete a bank connection."""
uid = int(get_jwt_identity())
connection = BankConnection.query.filter_by(id=connection_id, user_id=uid).first()
if not connection:
return jsonify(error="Connection not found"), 404

# Get connector and disconnect
connector = ConnectorRegistry.create_instance(connection.connector_id)
if connector:
connector.disconnect(connection.connection_id)

db.session.delete(connection)
db.session.commit()

return jsonify(message="Connection deleted")


@bp.post("/connections/<int:connection_id>/import")
@jwt_required()
def import_transactions(connection_id: int):
"""Import transactions from a bank connection."""
uid = int(get_jwt_identity())
data = request.get_json() or {}

connection = BankConnection.query.filter_by(id=connection_id, user_id=uid).first()
if not connection:
return jsonify(error="Connection not found"), 404

# Parse dates
start_date = None
end_date = None
if data.get("start_date"):
try:
start_date = datetime.fromisoformat(data["start_date"])
except ValueError:
return jsonify(error="Invalid start_date"), 400
if data.get("end_date"):
try:
end_date = datetime.fromisoformat(data["end_date"])
except ValueError:
return jsonify(error="Invalid end_date"), 400

# Get connector and import
connector = ConnectorRegistry.create_instance(connection.connector_id)
if not connector:
return jsonify(error="Connector not found"), 500

result = connector.import_transactions(
connection.connection_id,
start_date=start_date,
end_date=end_date,
)

if not result.success:
return jsonify(error=result.message, code=result.error_code), 400

# Convert transactions to expense format
expenses_data = [t.to_expense_dict() for t in result.transactions]

return jsonify({
"message": result.message,
"transactions": expenses_data,
"count": len(expenses_data),
})


@bp.post("/connections/<int:connection_id>/refresh")
@jwt_required()
def refresh_transactions(connection_id: int):
"""Refresh transactions from a bank connection."""
uid = int(get_jwt_identity())
data = request.get_json() or {}

connection = BankConnection.query.filter_by(id=connection_id, user_id=uid).first()
if not connection:
return jsonify(error="Connection not found"), 404

# Parse 'since' date
since = None
if data.get("since"):
try:
since = datetime.fromisoformat(data["since"])
except ValueError:
return jsonify(error="Invalid since date"), 400
elif connection.last_sync_at:
since = connection.last_sync_at

# Get connector and refresh
connector = ConnectorRegistry.create_instance(connection.connector_id)
if not connector:
return jsonify(error="Connector not found"), 500

result = connector.refresh_transactions(
connection.connection_id,
since=since,
)

if not result.success:
return jsonify(error=result.message, code=result.error_code), 400

# Update last sync time
connection.last_sync_at = datetime.utcnow()
db.session.commit()

# Convert transactions to expense format
expenses_data = [t.to_expense_dict() for t in result.transactions]

return jsonify({
"message": result.message,
"transactions": expenses_data,
"count": len(expenses_data),
})


@bp.post("/connections/<int:connection_id>/import/commit")
@jwt_required()
def commit_imported_transactions(connection_id: int):
"""Commit imported transactions as expenses."""
uid = int(get_jwt_identity())
data = request.get_json() or {}

connection = BankConnection.query.filter_by(id=connection_id, user_id=uid).first()
if not connection:
return jsonify(error="Connection not found"), 404

transactions = data.get("transactions", [])
if not transactions:
return jsonify(error="No transactions to import"), 400

inserted = 0
duplicates = 0

for tx in transactions:
# Check for duplicate (same date, amount, description)
existing = Expense.query.filter_by(
user_id=uid,
spent_at=tx.get("date"),
amount=tx.get("amount"),
notes=tx.get("description"),
).first()

if existing:
duplicates += 1
continue

expense = Expense(
user_id=uid,
amount=tx.get("amount"),
currency=tx.get("currency", "USD"),
expense_type=tx.get("expense_type", "EXPENSE"),
category_id=tx.get("category_id"),
notes=tx.get("description", ""),
spent_at=tx.get("date"),
)
db.session.add(expense)
inserted += 1

db.session.commit()

# Update last sync time
connection.last_sync_at = datetime.utcnow()
db.session.commit()

return jsonify({
"inserted": inserted,
"duplicates": duplicates,
}), 201
Loading