Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ version-file = "src/apkit/_version.py"
dev = [
"coverage>=7.10.7",
"pytest>=8.4.1",
"pytest-asyncio>=1.3.0",
"pytest-cov>=7.0.0",
]
docs = [
Expand Down
71 changes: 71 additions & 0 deletions tests/helper/test_inbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import json

import pytest
from apkit.config import AppConfig
from apkit.helper.inbox import InboxVerifier
from apsig import draft
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa


def _prepare_signed_request():
# prepare private and public keys
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key_pem = (
private_key.public_key()
.public_bytes(
encoding=crypto_serialization.Encoding.PEM,
format=crypto_serialization.PublicFormat.SubjectPublicKeyInfo,
)
.decode("utf-8")
)

# create an activity with an embedded Actor
body_json = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": "https://example.com/likes",
"type": "Like",
"actor": {
"id": "https://example.com/actor",
"type": "Person",
"publicKey": {
"id": "http://example.com/actor#key",
"owner": "https://example.com/actor",
"publicKeyPem": public_key_pem,
},
},
"object": "https://example.com/5",
}

body = json.dumps(body_json).encode("utf-8")

url = "http://example.com/ap"
method = "POST"
headers = {"host": "example.com"}

# sign the request
signer = draft.Signer(
headers=headers,
method=method,
url=url,
key_id="http://example.com/actor#key",
private_key=private_key,
body=body,
)
headers = signer.sign()

# the verifier expects all HTTP header names in lower case
headers["signature"] = headers["Signature"]

return (body, url, method, headers)


@pytest.mark.asyncio
async def test_verify_draft_http_signature():
(body, url, method, headers) = _prepare_signed_request()

config = AppConfig()
inbox_verifier = InboxVerifier(config)

result = await inbox_verifier.verify(body, url, method, headers)
assert result
128 changes: 128 additions & 0 deletions tests/test_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import time
from typing import Any
import pytest

from apkit.kv import KeyValueStore
from apkit.cache import Cache


class FakeKeyValueStore(KeyValueStore[Any, Any]):
"""Minimal in-memory KeyValueStore for tests."""

def __init__(self):
self._data = {}

def get(self, key):
return self._data.get(key)

def set(self, key, value, ttl_seconds = None):
self._data[key] = value

def delete(self, key):
self._data.pop(key, None)

def exists(self, key):
return key in self._data

async def async_get(self, key):
return self.get(key)

async def async_set(self, key, value, ttl_seconds = None):
self.set(key, value)

async def async_delete(self, key):
self.delete(key)

async def async_exists(self, key):
return self.exists(key)

@pytest.fixture
def store():
return FakeKeyValueStore()


@pytest.fixture
def cache(store):
return Cache(store)


def test_set_and_get_without_ttl(cache):
cache.set("a", "value", ttl=None)
assert cache.get("a") == "value"


def test_set_and_get_with_ttl_not_expired(cache, monkeypatch):
monkeypatch.setattr(time, "time", lambda: 1000.0)
cache.set("a", "value", ttl=10)

monkeypatch.setattr(time, "time", lambda: 1005.0)
assert cache.get("a") == "value"


def test_get_expired_item(cache, monkeypatch):
monkeypatch.setattr(time, "time", lambda: 1000.0)
cache.set("a", "value", ttl=5)

monkeypatch.setattr(time, "time", lambda: 1006.0)
assert cache.get("a") is None
assert not cache.exists("a")


def test_set_with_non_positive_ttl_deletes(cache):
cache.set("a", "value", ttl=0)
assert cache.get("a") is None

cache.set("b", "value", ttl=-5)
assert cache.get("b") is None


def test_delete(cache):
cache.set("a", "value", ttl=None)
cache.delete("a")
assert cache.get("a") is None


def test_exists_true(cache):
cache.set("a", "value", ttl=None)
assert cache.exists("a") is True


def test_exists_false_for_missing_key(cache):
assert cache.exists("missing") is False


def test_exists_false_for_expired_item(cache, monkeypatch):
monkeypatch.setattr(time, "time", lambda: 1000.0)
cache.set("a", "value", ttl=1)

monkeypatch.setattr(time, "time", lambda: 1002.0)
assert cache.exists("a") is False


@pytest.mark.asyncio
async def test_async_set_and_get(cache):
await cache.async_set("a", "value", ttl=None)
result = await cache.async_get("a")
assert result == "value"


@pytest.mark.asyncio
async def test_async_get_expired(cache, monkeypatch):
monkeypatch.setattr(time, "time", lambda: 1000.0)
await cache.async_set("a", "value", ttl=1)

monkeypatch.setattr(time, "time", lambda: 1002.0)
assert await cache.async_get("a") is None


@pytest.mark.asyncio
async def test_async_exists(cache):
await cache.async_set("a", "value", ttl=None)
assert await cache.async_exists("a") is True


@pytest.mark.asyncio
async def test_async_delete(cache):
await cache.async_set("a", "value", ttl=None)
await cache.async_delete("a")
assert await cache.async_get("a") is None
15 changes: 15 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.