Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sdk/python/aaip/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
AAIP exception hierarchy.
"""


class AAIPError(Exception):
"""Base exception for all AAIP-specific errors."""
pass


class IdentityDecryptionError(AAIPError):
"""Raised when identity decryption fails (e.g., wrong passphrase)."""
pass
114 changes: 107 additions & 7 deletions sdk/python/aaip/identity/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import hashlib
import json
import logging
import os
import secrets
import time
from pathlib import Path
Expand All @@ -27,6 +29,9 @@
# Optional fast path
# ---------------------------------------------------------------------------

logger = logging.getLogger(__name__)


def _has_cryptography() -> bool:
try:
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
Expand Down Expand Up @@ -77,22 +82,117 @@ def generate(cls) -> "AgentIdentity":
def load_or_create(cls, path: str = IDENTITY_FILE) -> "AgentIdentity":
p = Path(path)
if p.exists():
d = json.loads(p.read_text())
seed = bytes.fromhex(d["private_key_hex"])
pub = bytes.fromhex(d["public_key_hex"])
return cls(seed, pub)
try:
d = json.loads(p.read_text())
except json.JSONDecodeError as e:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Identity file contains invalid JSON: {e}"
) from e
# Check if identity is encrypted
if "private_key_encrypted" in d:
# Encrypted identity requires passphrase
passphrase = os.environ.get("AAIP_IDENTITY_PASSPHRASE")
if not passphrase or passphrase.strip() == "":
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
"Identity is encrypted but AAIP_IDENTITY_PASSPHRASE is not set."
)
# Validate required fields
required: tuple[str, ...] = ("public_key_hex",)
for field in required:
if field not in d:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Encrypted identity missing required field: {field}"
)
# Decrypt the seed
from ._encryption import decrypt_seed
try:
seed = decrypt_seed(d, passphrase)
except Exception as e:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Decryption failed: {e}"
) from e
try:
pub = bytes.fromhex(d["public_key_hex"])
except ValueError as e:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Invalid hex in public_key_hex: {e}"
) from e
identity = cls(seed, pub)
logger.info("Loaded encrypted identity")
return identity
else:
# Plaintext identity
required = ("private_key_hex", "public_key_hex")
for field in required:
if field not in d:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Plaintext identity missing required field: {field}"
)
try:
seed = bytes.fromhex(d["private_key_hex"])
except ValueError as e:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Invalid hex in private_key_hex: {e}"
) from e
try:
pub = bytes.fromhex(d["public_key_hex"])
except ValueError as e:
from ..exceptions import IdentityDecryptionError
raise IdentityDecryptionError(
f"Invalid hex in public_key_hex: {e}"
) from e
identity = cls(seed, pub)
# Warn if passphrase is set (should encrypt)
passphrase = os.environ.get("AAIP_IDENTITY_PASSPHRASE")
if passphrase and passphrase.strip() != "":
logger.info(
"Identity is plaintext; it will be encrypted on next save."
)
else:
logger.warning(
"Private key stored without encryption. "
"Set AAIP_IDENTITY_PASSPHRASE for production security."
)
return identity
identity = cls.generate()
identity.save(path)
return identity

def save(self, path: str = IDENTITY_FILE) -> None:
Path(path).write_text(json.dumps({
passphrase = os.environ.get("AAIP_IDENTITY_PASSPHRASE")
if passphrase is not None:
stripped = passphrase.strip()
if stripped and len(stripped) < 8:
raise ValueError("Passphrase must be at least 8 characters")
data = {
"aaip_version": "1.0",
"created_at": int(time.time()),
"agent_id": self.agent_id,
"public_key_hex": self.public_key_hex,
"private_key_hex": self._seed.hex(),
}, indent=2))
}
if passphrase and passphrase.strip() != "":
# Encrypt the seed
from ._encryption import encrypt_seed
encrypted = encrypt_seed(self._seed, passphrase)
data.update(encrypted)
# Keep private_key_hex empty to avoid confusion
data["private_key_hex"] = ""
logger.info("Saved encrypted identity")
else:
# Plaintext storage (backward compatibility)
data["private_key_hex"] = self._seed.hex()
logger.warning(
"Private key stored without encryption. "
"Set AAIP_IDENTITY_PASSPHRASE for production security."
)
Path(path).write_text(json.dumps(data, indent=2))

# ── sign / verify ────────────────────────────────────────────────

Expand Down
143 changes: 143 additions & 0 deletions sdk/python/aaip/identity/_encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""
Private key encryption utilities for AAIP identity.

Uses AES-256-GCM with PBKDF2 key derivation.
Reuses the `cryptography` dependency already required by the identity module.
"""

import base64
import secrets
from typing import Dict, Any

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes

from ..exceptions import IdentityDecryptionError


def derive_key(passphrase: str, salt: bytes) -> bytes:
"""
Derive a 32-byte AES‑256 key from a passphrase using PBKDF2‑HMAC‑SHA256.

Args:
passphrase: The passphrase as a string.
salt: 16‑byte random salt.

Returns:
32‑byte key suitable for AES‑256.
"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100_000,
)
return kdf.derive(passphrase.encode("utf-8"))


def encrypt_seed(seed: bytes, passphrase: str) -> Dict[str, Any]:
"""
Encrypt a 32‑byte ed25519 seed with AES‑256‑GCM.

Args:
seed: The 32‑byte private seed.
passphrase: Passphrase for key derivation.

Returns:
Dictionary with base64‑encoded encrypted fields:
{
"private_key_encrypted": "...",
"encryption_salt": "...",
"encryption_iv": "...",
"encryption_tag": "...",
"encryption_method": "AES-256-GCM-PBKDF2",
"encryption_version": 1
}
"""
if len(seed) != 32:
raise ValueError("Seed must be 32 bytes")

# Generate random salt (16 bytes) and nonce (12 bytes for AES‑GCM)
salt = secrets.token_bytes(16)
iv = secrets.token_bytes(12)

# Derive key
key = derive_key(passphrase, salt)

# Encrypt
aesgcm = AESGCM(key)
ciphertext_with_tag = aesgcm.encrypt(iv, seed, None)

# Split ciphertext and tag (GCM tag is last 16 bytes)
ciphertext = ciphertext_with_tag[:-16]
tag = ciphertext_with_tag[-16:]

return {
"private_key_encrypted": base64.b64encode(ciphertext).decode("ascii"),
"encryption_salt": base64.b64encode(salt).decode("ascii"),
"encryption_iv": base64.b64encode(iv).decode("ascii"),
"encryption_tag": base64.b64encode(tag).decode("ascii"),
"encryption_method": "AES-256-GCM-PBKDF2",
"encryption_version": 1,
}


def decrypt_seed(data: Dict[str, Any], passphrase: str) -> bytes:
"""
Decrypt a seed from the encrypted‑fields dictionary.

Args:
data: Dictionary as returned by `encrypt_seed`.
passphrase: The same passphrase used for encryption.

Returns:
The original 32‑byte seed.

Raises:
IdentityDecryptionError: If decryption fails (wrong passphrase,
corrupted data, or missing fields).
"""
try:
# Validate required fields
required = (
"private_key_encrypted",
"encryption_salt",
"encryption_iv",
"encryption_tag",
"encryption_method",
"encryption_version",
)
for field in required:
if field not in data:
raise ValueError(f"Missing field: {field}")

if data["encryption_method"] != "AES-256-GCM-PBKDF2":
raise ValueError(f"Unsupported method: {data['encryption_method']}")
if int(data["encryption_version"]) != 1:
raise ValueError(f"Unsupported version: {data['encryption_version']}")

# Decode base64
ciphertext = base64.b64decode(data["private_key_encrypted"])
salt = base64.b64decode(data["encryption_salt"])
iv = base64.b64decode(data["encryption_iv"])
tag = base64.b64decode(data["encryption_tag"])

# Reconstruct ciphertext + tag
ciphertext_with_tag = ciphertext + tag

# Derive key
key = derive_key(passphrase, salt)

# Decrypt
aesgcm = AESGCM(key)
seed = aesgcm.decrypt(iv, ciphertext_with_tag, None)

if len(seed) != 32:
raise ValueError("Decrypted seed length mismatch")

return seed

except Exception as e:
# Re‑raise as IdentityDecryptionError
raise IdentityDecryptionError(f"Decryption failed: {e}") from e
Loading