Changes from all commits (27 commits)
- `1077ec2` — Deduplicate merger WIP (Great-Frosty, Dec 16, 2025)
- `1967175` — Name fix. (Great-Frosty, Dec 16, 2025)
- `60f1a77` — More tests. (Great-Frosty, Dec 16, 2025)
- `29bd636` — dedup WIP. (Great-Frosty, Dec 16, 2025)
- `3ed4a09` — More tests. (Great-Frosty, Dec 16, 2025)
- `aa2b608` — More tests and minor after fixes. (Great-Frosty, Dec 16, 2025)
- `c6184db` — Minor refactor. (Great-Frosty, Dec 16, 2025)
- `b0fa3ea` — Readme updated. (Great-Frosty, Dec 16, 2025)
- `8c00915` — Linter fixes. (Great-Frosty, Dec 17, 2025)
- `c70c6c4` — black fixes. (Great-Frosty, Dec 17, 2025)
- `43b06c7` — Minor redis fix. (Great-Frosty, Feb 5, 2026)
- `0f11803` — Minor parrallelism fix. (Great-Frosty, Feb 5, 2026)
- `2086519` — Switched to orjson. (Great-Frosty, Feb 5, 2026)
- `795c574` — Refactor into separate merger modules. (Great-Frosty, Feb 5, 2026)
- `e3fdefb` — Refactor continues. (Great-Frosty, Feb 7, 2026)
- `076c101` — Test cleanup. (Great-Frosty, Feb 7, 2026)
- `70e0006` — Minor cleanup. (Great-Frosty, Feb 7, 2026)
- `8032ce3` — If subfeed is sync - throw it into thread. (Great-Frosty, Feb 7, 2026)
- `efece1c` — Dedup runtime separated. (Great-Frosty, Feb 8, 2026)
- `fd2201b` — More test coverage. (Great-Frosty, Feb 8, 2026)
- `65601b9` — Even more test coverage. (Great-Frosty, Feb 8, 2026)
- `86be2a4` — Tests for when one subfeed is empty. (Great-Frosty, Feb 8, 2026)
- `218e927` — Minor cleanup (Great-Frosty, Feb 8, 2026)
- `4e6bfef` — Remove some boilerplate from mergers. (Great-Frosty, Feb 8, 2026)
- `e3310f0` — Patch for positional leak when underfetched. (Great-Frosty, Feb 8, 2026)
- `051373a` — Minor bugfixes and formatting. (Great-Frosty, Feb 9, 2026)
- `ec63967` — Readme added. (Great-Frosty, Feb 9, 2026)
238 changes: 238 additions & 0 deletions ARCHITECTURE.md
@@ -0,0 +1,238 @@
# SmartFeed Architecture (overview)

## 1) What SmartFeed does

SmartFeed builds one paginated feed from multiple client-provided sources (“subfeeds”) using a declarative tree config:

- **Leaf**: `SubFeed` (calls one client method)
- **Mergers**: compose children (`append`, `distribute`, `positional`, `percentage`, `percentage_gradient`, `view_session`)
- **Wrapper**: `MergerDeduplication` (changes execution semantics around one child)

Core runtime:

- parse config -> create request `ExecutionContext` -> run tree via shared `Executor` -> return `FeedResult` + `next_page`.


## 2) Public surfaces and core data types

### Public entrypoint

- `FeedManager(config, methods_dict, redis_client=None)`
- `get_data(user_id, limit, next_page, **params) -> FeedResult`

`methods_dict` maps config `method_name` strings to host-app callables.
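
A minimal wiring sketch with stub types (`fetch_posts` and the payload are illustrative; the real `FeedResultClient` model ships with the package):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional

# Stub of the client-side result type (the real FeedResultClient lives in smartfeed).
@dataclass
class FeedResultClient:
    data: list = field(default_factory=list)
    next_page: Optional[dict] = None
    has_next_page: bool = False

# A host-app callable; SmartFeed forwards user_id/limit/next_page plus declared params.
async def fetch_posts(user_id, limit, next_page, **params):
    return FeedResultClient(data=[f"post_{i}" for i in range(limit)])

# Config `method_name` strings resolve through this mapping.
methods_dict = {"posts": fetch_posts}

result = asyncio.run(methods_dict["posts"](user_id="u1", limit=3, next_page=None))
print(result.data)  # ['post_0', 'post_1', 'post_2']
```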

### Config schema surface

`smartfeed.schemas` keeps stable imports for:

- `FeedConfig`: top-level model (`version`, `feed`)
- `FeedTypes`: discriminated union by `type`

### Cursor / pagination models

- `FeedResultNextPageInside`: one node cursor (`page`, `after`)
- `FeedResultNextPage`: full-tree cursor map (`data: {node_id -> FeedResultNextPageInside}`)

### Result models

- `FeedResultClient`: required return type of client subfeed methods
- `FeedResult`: normalized return type of any SmartFeed node


## 3) Node interface contract

All nodes inherit `BaseFeedConfigModel` and are executed through:

- `get_data(methods_dict, user_id, limit, next_page, redis_client=None, ctx=None, **params) -> FeedResult`

Important notes:

- If a node implements `build_plan(...)`, executor uses the plan path.
- Base `get_data(...)` delegates back to executor and expects `build_plan(...)` to exist.
- Every node has `dedup_priority: int` (used by dedup arbitration/refill ordering).
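
The contract can be sketched with a stub node (hypothetical; real nodes are pydantic models inheriting `BaseFeedConfigModel`):

```python
import asyncio
from dataclasses import dataclass

# Sketch of the node contract; field and method names follow this document.
@dataclass
class StubNode:
    node_id: str
    dedup_priority: int = 0  # consulted by dedup arbitration/refill ordering

    async def get_data(self, methods_dict, user_id, limit, next_page,
                       redis_client=None, ctx=None, **params):
        # A real node either returns a FeedResult here, or exposes
        # build_plan(...) so the executor takes the plan path instead.
        return {"data": [], "next_page": next_page, "has_next_page": False}

node = StubNode(node_id="n1", dedup_priority=5)
res = asyncio.run(node.get_data({}, "u1", 10, None))
print(res["has_next_page"])  # False
```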


## 4) ExecutionContext

`ExecutionContext` is per-request state propagated through the tree:

- `methods_dict`, `user_id`, `redis_client`
- `executor` (lazy via `ensure_executor()`)
- optional policy/settings:
- `dedup`: `DeduplicationPolicy` when dedup wrapper is active
- `refill_settings`: `RefillExecutionSettings(overfetch_factor, max_refill_loops)`

Responsibilities:

- centralize shared plumbing (executor + redis client)
- keep execution policies out of user params
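
A rough shape of this context as a dataclass (field names follow this doc; the actual class may differ):

```python
from dataclasses import dataclass, field
from typing import Any

# Sketch of the per-request context propagated through the tree.
@dataclass
class ExecutionContext:
    methods_dict: dict
    user_id: str
    redis_client: Any = None
    dedup: Any = None            # DeduplicationPolicy when the wrapper is active
    refill_settings: Any = None  # RefillExecutionSettings(overfetch_factor, max_refill_loops)
    _executor: Any = field(default=None, repr=False)

    def ensure_executor(self):
        # Lazily create and cache the shared Executor for this request.
        if self._executor is None:
            self._executor = object()  # stand-in for Executor()
        return self._executor

ctx = ExecutionContext(methods_dict={}, user_id="u1")
print(ctx.ensure_executor() is ctx.ensure_executor())  # True: one executor per request
```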


## 5) Executor (runtime engine)

Primary entry:

- `Executor.run(node, ctx, limit, next_page, **params) -> FeedResult`

Execution strategy:

1. **Plan-first**
- `build_plan(...)` -> execute returned `Plan`
- otherwise call node `get_data(...)`
2. **Centralized concurrency**
- child runs use executor-managed `asyncio.gather(...)`
3. **Dedup/refill hooks**
- for non-slot nodes with `ctx.dedup`, run `DedupRuntime.run_node_with_dedup_refill(...)`
- for `SlotsPlan`, dedup/refill is handled inside slot execution

`SlotsPlan` execution highlights:

1. collect unique owners + demand per owner
2. fetch owners concurrently (with optional `owner_fetch_limits` overrides)
3. merge only changed cursor keys (`CursorMap.merge_delta`)
4. apply:
- dedup arbitration + refill (`apply_slots_plan_dedup`) when `ctx.dedup` exists
- refill-only deficits (`apply_slots_plan_refill`) when only `ctx.refill_settings` exists
5. consume slot schedule and call `assemble(...)`

When dedup is active for a slots plan, owners are executed with `dedup=None` in owner context so global arbitration stays centralized.


## 6) Plans: declarative execution

Plans separate “what to run” from “how to run it”.

- `CallablePlan(fn)`
- node-provided async function with custom flow, still executed by executor

- `SlotsPlan(ctx, limit, next_page, params, slots, assemble, owner_fetch_limits=None)`
- `slots`: ordered `SlotSpec(owner, max_count)` schedule
- `assemble(output, merged_next_page, owner_results)`: builds final `FeedResult`
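
The slot schedule consumption can be sketched as follows (`consume` and the owner payloads are illustrative, not package API):

```python
from dataclasses import dataclass

@dataclass
class SlotSpec:
    owner: str       # which child feeds this slot
    max_count: int   # how many items the slot may take

# Hypothetical owner results (already fetched concurrently by the executor).
owner_results = {"posts": ["p1", "p2", "p3"], "ads": ["a1", "a2"]}
slots = [SlotSpec("posts", 2), SlotSpec("ads", 1), SlotSpec("posts", 1)]

def consume(slots, owner_results):
    # Walk the ordered schedule, draining each owner's queue up to max_count.
    cursors = {k: 0 for k in owner_results}
    output = []
    for slot in slots:
        items = owner_results[slot.owner]
        take = items[cursors[slot.owner]:cursors[slot.owner] + slot.max_count]
        cursors[slot.owner] += len(take)
        output.extend(take)
    return output

print(consume(slots, owner_results))  # ['p1', 'p2', 'a1', 'p3']
```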


## 7) Mergers and leaf responsibilities

### SubFeed (leaf)

- derives its local cursor from `next_page.data[subfeed_id]` (defaults page=1/after=None)
- calls `methods_dict[method_name]`
- passes only params present in method signature + `subfeed_params`
- async methods are awaited; sync methods run via `asyncio.to_thread(...)`
- `raise_error=False` converts method failure into empty `FeedResultClient`
- optional `shuffle` then normalizes to `FeedResult`
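
The async/sync dispatch rule can be sketched like this (hypothetical helper name):

```python
import asyncio
import inspect

# Await async client methods directly; run sync ones in a worker thread so
# they do not block the event loop (mirrors the asyncio.to_thread rule above).
async def call_method(fn, **kwargs):
    if inspect.iscoroutinefunction(fn):
        return await fn(**kwargs)
    return await asyncio.to_thread(fn, **kwargs)

async def async_feed(limit):
    return list(range(limit))

def sync_feed(limit):  # e.g. a blocking DB call
    return list(range(limit))

async def main():
    a = await call_method(async_feed, limit=2)
    b = await call_method(sync_feed, limit=2)
    return a, b

print(asyncio.run(main()))  # ([0, 1], [0, 1])
```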

### Slot-based mergers

These build `SlotsPlan`:

- `MergerAppend`: concatenation (optional shuffle)
- `MergerAppendDistribute` (`type="merger_distribute"`): append then redistribute by `distribution_key`
- `MergerPositional`: page-local slot ownership for `positional` vs `default`, keeps its own merger cursor
- `MergerPercentage`: integer allocation by percentages; when total is exactly 100, remainder is distributed to avoid underfill
- `MergerPercentageGradient`: two-owner percentage curve across the page, then advances merger page cursor
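
The integer allocation in `MergerPercentage` can be illustrated with a toy function (the real remainder-distribution order is an assumption):

```python
def allocate(limit, percentages):
    # Integer allocation; if the percentages sum to exactly 100, hand out the
    # rounding remainder one by one so the page is not underfilled.
    counts = [limit * p // 100 for p in percentages]
    if sum(percentages) == 100:
        remainder = limit - sum(counts)
        for i in range(remainder):
            counts[i % len(counts)] += 1
    return counts

print(allocate(10, [60, 40]))  # [6, 4]
print(allocate(7, [60, 40]))   # floors give [4, 2]; remainder 1 -> [5, 2]
```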

### MergerViewSession (Redis-backed session cache)

Goal: cache a session-sized list and serve slices.

Flow:

1. build cache key: `{merger_id}_{user_id}` + optional suffix from `custom_view_session_key`
2. check Redis `exists`; if no cache or no merger cursor in request -> regenerate session
3. on hit, `get`; if Redis returns `None` unexpectedly, regenerate
4. on generation: execute child once for `session_size`, optional dedup, store JSON with TTL
5. return page slice and increment merger cursor page
6. optional `shuffle` is applied to returned page slice (cache payload is not reshuffled)
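
The flow above, sketched against a dict-backed Redis stand-in (key layout from step 1; TTL handling simplified):

```python
import json

class FakeRedis:
    # Dict-backed stand-in for the Redis calls used here (exists/get/set; ex is ignored).
    def __init__(self):
        self.store = {}
    def exists(self, key):
        return key in self.store
    def get(self, key):
        return self.store.get(key)
    def set(self, key, value, ex=None):
        self.store[key] = value

def get_page(redis, merger_id, user_id, page, page_size, generate):
    key = f"{merger_id}_{user_id}"  # + optional custom_view_session_key suffix
    if not redis.exists(key):
        # Cache miss: regenerate the session list and store it as JSON with TTL.
        redis.set(key, json.dumps(generate()), ex=3600)
    session = json.loads(redis.get(key))
    start = (page - 1) * page_size
    return session[start:start + page_size], page + 1  # slice + advanced page cursor

r = FakeRedis()
items, next_page = get_page(r, "vs_main", "u1", page=1, page_size=2,
                            generate=lambda: ["a", "b", "c", "d"])
print(items, next_page)  # ['a', 'b'] 2
```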

### MergerDeduplication (single-child wrapper)

Goal: deduplicate while keeping child mix/slot semantics.

Key behavior:

- fresh session when merger cursor is absent or `page <= 0`
- reset descendant cursors
- for Redis backend, reset Redis seen-state key
- seen-state backend:
- `cursor`: encoded into merger cursor `after`
- `redis`: ZSET `dedup:{merger_id}:{user_id}` (+ optional custom suffix)
- builds `DeduplicationPolicy` + child `ExecutionContext(dedup=..., refill_settings=...)`
- executes child via shared executor, commits store, writes merger cursor (`page+1`, `after` for cursor backend)

Refill/overfetch behavior:

- duplicates trigger bounded refill loops (`max_refill_loops`)
- overfetch (`overfetch_factor`) is applied only for rewindable integer-offset cursors
- when overfetch is used, leaf cursor is rewound to inspected-count to avoid skipping unseen items
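
The rewind arithmetic can be illustrated with a toy sketch (function name and clamping are assumptions):

```python
def next_cursor_after_dedup(start_offset, limit, overfetch_factor, inspected):
    # Overfetch applies only to rewindable integer offsets: request more than
    # `limit`, then rewind the cursor to the count actually inspected so
    # unseen items are not skipped on the next page.
    fetch_size = limit * overfetch_factor
    inspected = min(inspected, fetch_size)
    return start_offset + inspected

# Fetched 20 items (limit=10, factor=2), but acceptance stopped after inspecting 13:
print(next_cursor_after_dedup(0, 10, 2, 13))  # 13, not 20
```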


## 8) Dedup policy + seen stores

### DeduplicationPolicy

Owns key extraction + acceptance rules:

- entity key from `dedup_key` + `missing_key_policy`
- reject duplicates already seen in current response (`seen_request_set`)
- compare candidate priority vs persisted seen priority

Capabilities:

- batched prefetch from store
- per-owner arbitration with deterministic tie-break: `(-dedup_priority, owner_rank, item_rank)`
- ordered single-stream acceptance (`accept_batch`) returning accepted items + inspected count
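
The tie-break is a plain tuple sort; a toy illustration (candidate data is made up, field names mirror the tuple):

```python
# Candidates competing for the same dedup key; the winner is chosen by the
# deterministic tuple (-dedup_priority, owner_rank, item_rank): higher priority
# first, then earlier owner, then earlier position within the owner.
candidates = [
    {"owner": "ads",   "dedup_priority": 0,  "owner_rank": 1, "item_rank": 0},
    {"owner": "posts", "dedup_priority": 10, "owner_rank": 0, "item_rank": 2},
    {"owner": "posts", "dedup_priority": 10, "owner_rank": 0, "item_rank": 1},
]

winner = min(candidates,
             key=lambda c: (-c["dedup_priority"], c["owner_rank"], c["item_rank"]))
print(winner["owner"], winner["item_rank"])  # posts 1
```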

### Seen stores

- `CursorSeenStore`
- in-cursor map of `{key -> max_priority}`
- optional compression + max-key trimming at commit

- `RedisSeenStore`
- cached reads via `redis_zmscore(...)`
- buffered writes via `redis_zadd_and_expire(...)`
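
A toy sketch of the cursor store's commit semantics (the trim rule shown — dropping lowest-priority keys first — is an assumption; the doc only specifies max-key trimming):

```python
# In-cursor seen map: keep the max priority per key; trim at commit time.
def commit_seen(seen, new_items, max_keys=None):
    for key, priority in new_items:
        seen[key] = max(seen.get(key, priority), priority)
    if max_keys is not None and len(seen) > max_keys:
        # Hypothetical trim policy: keep the highest-priority keys.
        kept = sorted(seen.items(), key=lambda kv: -kv[1])[:max_keys]
        seen = dict(kept)
    return seen

seen = commit_seen({}, [("a", 1), ("b", 5), ("a", 3)], max_keys=2)
print(seen)  # {'a': 3, 'b': 5}
```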


## 9) Redis/JSON helpers

- `_redis_call(client, method_name, *args, **kwargs)`
- async redis client: direct await
- sync redis client: `asyncio.to_thread(...)`
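
The dispatch can be sketched as follows (hypothetical stand-in for the private helper, exercised against stub clients):

```python
import asyncio
import inspect

# Await async redis clients directly; push sync clients to a worker thread.
async def redis_call(client, method_name, *args, **kwargs):
    method = getattr(client, method_name)
    if inspect.iscoroutinefunction(method):
        return await method(*args, **kwargs)
    return await asyncio.to_thread(method, *args, **kwargs)

class SyncClient:
    def get(self, key):
        return f"value:{key}"

class AsyncClient:
    async def get(self, key):
        return f"value:{key}"

async def main():
    return (await redis_call(SyncClient(), "get", "k"),
            await redis_call(AsyncClient(), "get", "k"))

print(asyncio.run(main()))  # ('value:k', 'value:k')
```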

Other helpers:

- `jsonlib`: thin `orjson` wrapper compatible with package usage (`dumps`/`loads`)
- `dedup_utils`: cursor encode/decode + Redis ZSET helper fallbacks (`zmscore` / pipeline)


## 10) End-to-end call flows

### A) Standard request (no view session, no dedup)

1. `FeedManager.get_data(...)` builds `ExecutionContext`
2. `Executor.run(root, ctx, limit, next_page)`
3. recursive execution via plans or direct `get_data(...)`
4. returns `FeedResult(data, next_page, has_next_page)`

### B) Slot-based merger request

1. merger returns `SlotsPlan`
2. executor fetches owners concurrently
3. optional arbitration/refill runs
4. slots are consumed in schedule order
5. `assemble(...)` builds final result

### C) Dedup wrapper request

1. wrapper creates store + policy and child context
2. child executes under dedup/refill control
3. executor performs acceptance/arbitration + bounded refills
4. store commits; wrapper writes merger cursor state

### D) View-session request

1. wrapper resolves cache key
2. cache miss/new session -> regenerate and cache
3. cache hit -> load session list from Redis
4. return requested slice + advanced merger page
14 changes: 14 additions & 0 deletions Makefile
@@ -11,3 +11,17 @@ test:

test_cache:
pytest -s -vv -k "test_merger_view_session"

.PHONY: test_async_chart charting

# Runs only the async loop block + Chrome trace test.
# Writes trace.json next to this Makefile (project root).
test_async_chart:
rm -f ./trace.json
SMARTFEED_CHROME_TRACE=./trace.json pytest -q tests/test_async_loop_blocks_trace.py
@echo "\nWrote trace: $(CURDIR)/trace.json"
@echo "Open Chrome -> chrome://tracing -> Load -> select trace.json"

# Convenience target: generate the trace + try to open chrome://tracing.
charting: test_async_chart
-@open -a "Google Chrome" "chrome://tracing" 2>/dev/null || true
112 changes: 112 additions & 0 deletions README.md
@@ -7,6 +7,9 @@ Python package for building a feed (Feed) from clien
- [Usage](#использование)
- [Installation](#установка)
- [Building the configuration](#формирование-конфигурации)
- [MergerDeduplication (deduplication)](#mergerdeduplication-дедупликация)
- [MergerDeduplication parameters](#параметры-mergerdeduplication)
- [Important caveats (reset, cursor/redis, overfetch)](#важные-нюансы-сброс-cursorredis-overfetch)
- [Client method requirements](#требования-к-клиентскому-методу)
- [Running](#запуск)

@@ -68,6 +71,115 @@ poetry add git+ssh://git@github.com:epoch8/looky-timeline.git
},
```

### MergerDeduplication (deduplication)

MergerDeduplication is a wrapper around a single child node (merger or subfeed) that removes duplicates by key.

Key properties of the implementation:

- Deduplication runs at the leaf (SubFeed) level, not as post-processing of the merger's result.
  This matters: nested mergers (positional/percentage/gradient/append/distribute) keep their mixing rules intact.
  If an item is dropped as a duplicate, MergerDeduplication fetches the next item from the same source.
- The "already seen" state can be stored:
  - in the cursor (state_backend="cursor") — convenient without Redis, but the cursor may grow;
  - in Redis (state_backend="redis") — convenient for large state.

Example: wrapping an existing feed configuration with deduplication:

```json
{
  "version": "1",
  "feed": {
    "merger_id": "dedup_main",
    "type": "merger_deduplication",
    "dedup_key": "id",
    "missing_key_policy": "error",
    "state_backend": "cursor",
    "cursor_compress": true,
    "cursor_max_keys": 2000,
    "overfetch_factor": 2,
    "max_refill_loops": 20,
    "data": {
      "merger_id": "merger_percent",
      "type": "merger_percentage",
      "items": [
        {
          "percentage": 60,
          "data": {
            "subfeed_id": "sf_posts",
            "type": "subfeed",
            "method_name": "posts",
            "dedup_priority": 10
          }
        },
        {
          "percentage": 40,
          "data": {
            "subfeed_id": "sf_ads",
            "type": "subfeed",
            "method_name": "ads",
            "dedup_priority": 0
          }
        }
      ]
    }
  }
}
```

In the example above, if `posts` and `ads` return objects with the same `id`, the source with the higher `dedup_priority` wins.

### MergerDeduplication parameters

Required fields:

- `merger_id: str` — unique merger ID.
- `type: "merger_deduplication"`
- `data` — exactly one child node (subfeed or merger).

Deduplication fields:

- `dedup_key: str | null` — name of the key/attribute used to detect duplicates.
  - if `null`, the object itself is used as the key (suitable when objects are already hashable/strings).
- `missing_key_policy: "error" | "keep" | "drop"` (default: `"error"`)
  - `error`: raise an error if an item has no `dedup_key`;
  - `keep`: keep the item even without the key;
  - `drop`: discard items without the key.

Seen state (cross-page deduplication):

- `state_backend: "cursor" | "redis"` (default: `"cursor"`)
- `state_ttl_seconds: int` (default: `3600`) — TTL for the Redis state (backend=`redis` only).
- `cursor_compress: bool` (default: `true`) — compress the seen state in the cursor backend.
- `cursor_max_keys: int | null` — cap the seen-state size in the cursor backend (useful for controlling cursor size).

Performance/behavior:

- `overfetch_factor: int` (default: `1`) — over-fetching inside leaves, to reach `limit` faster without many refills.
- `max_refill_loops: int` (default: `20`) — upper bound on the number of refill requests per leaf.

### Important caveats (reset, cursor/redis, overfetch)

- State is reset on `page <= 0` or when no cursor exists for `merger_id`.
  - MergerDeduplication treats this as a fresh session and clears the cursors of all child nodes.
  - For backend=`redis` it also deletes the state key in Redis.

- If `state_backend="redis"`, a `redis_client` must be passed to `FeedManager`.
  - The Redis state key is built as `dedup:{merger_id}:{user_id}`.
  - A suffix can be added via the request parameter `custom_deduplication_key` (or `custom_view_session_key`)
    to keep separate state for different output modes.

- Priority (`dedup_priority`) decides which duplicate wins on conflict; it does not affect output order.
  - Higher `dedup_priority` → the item wins and is recorded as seen with that priority.
  - The field is available on all nodes (merger/subfeed) and is used by MergerDeduplication during deduplication.

- Overfetch is safe only for "rewindable" cursors.
  - Currently overfetch is enabled only when a leaf's `next_page.after` is an integer offset.
  - If `after` is a string/dict/any other object, it is treated as opaque and overfetch is not applied.

- The main real bottleneck in deduplication is refills, not wrappers or copies.
  - If duplicates are frequent and the upstream methods are expensive, tune `overfetch_factor` and `max_refill_loops` carefully.
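
The state-key construction can be sketched as follows (the way the optional suffix is appended is an assumption; the base layout is from the docs above):

```python
def dedup_state_key(merger_id, user_id, custom_suffix=None):
    # Documented layout: dedup:{merger_id}:{user_id}; suffix join is hypothetical.
    key = f"dedup:{merger_id}:{user_id}"
    if custom_suffix:
        key = f"{key}:{custom_suffix}"
    return key

print(dedup_state_key("dedup_main", "u42"))            # dedup:dedup_main:u42
print(dedup_state_key("dedup_main", "u42", "mobile"))  # dedup:dedup_main:u42:mobile
```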

### Client method requirements

The client method used to fetch data must accept the following parameters:
1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ packages = [
python = ">=3.9"
pydantic = ">=1.10.7"
redis = ">=4.5.5"
orjson = ">=3.9.0"

[tool.poetry.group.dev.dependencies]
isort = "^5.12.0"