diff --git a/CHANGELOG.md b/CHANGELOG.md index 094e455..b2bbc86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +# 0.3.0 (2025-12-08) + +* Добавлена дедупликация по приоритетам между субфидами + - Новый параметр `priority` в SubFeed (меньше = выше приоритет) + - Параметры конфигурации: `deduplicate`, `dedup_key`, `dedup_session_ttl` + - Правило: при одинаковом приоритете дубли НЕ удаляются +* Межстраничная дедупликация через seen_ids в Redis +* Гарантированное заполнение страницы до limit (цикл дозапроса) +* Корректное движение курсоров (after = последний использованный элемент) +* Добавлен FeedResultItem для отслеживания источника элементов +* Совместимость с MergerViewSession: добавлена поддержка items_with_source +* Документация: docs/DEDUPLICATION.md +* 21 новый тест для дедупликации + # 0.2.0 (2025-11-25) * Bump dependency versions diff --git a/docs/DEDUPLICATION.md b/docs/DEDUPLICATION.md new file mode 100644 index 0000000..8421e47 --- /dev/null +++ b/docs/DEDUPLICATION.md @@ -0,0 +1,511 @@ +# Дедупликация в SmartFeed + +## Оглавление + +1. [Обзор](#обзор) +2. [Конфигурация](#конфигурация) +3. [Как работает дедупликация](#как-работает-дедупликация) +4. [Приоритеты субфидов](#приоритеты-субфидов) +5. [Межстраничная дедупликация (seen_ids)](#межстраничная-дедупликация-seen_ids) +6. [Движение курсоров](#движение-курсоров) +7. [Примеры конфигураций](#примеры-конфигураций) +8. [Тестовые сценарии](#тестовые-сценарии) + +--- + +## Обзор + +Дедупликация в SmartFeed позволяет удалять дублирующиеся элементы между разными субфидами на основе приоритетов. Это особенно полезно когда один и тот же контент может появиться в нескольких источниках данных. + +### Ключевые возможности + +- **Дедупликация по приоритетам**: если элемент есть в нескольких субфидах, остаётся только из субфида с высшим приоритетом +- **Правило одинаковых приоритетов**: если приоритеты равны — дубли НЕ удаляются (оба остаются) +- **Межстраничная дедупликация**: элементы, показанные на предыдущих страницах, не появятся снова +- **Корректное движение курсоров**: курсор указывает на последний использованный элемент, а не запрошенный +- **Гарантированное заполнение страницы**: если дедупликация удалила много элементов, система автоматически дозапросит данные +- **Совместимость с MergerViewSession**: дедупликация корректно работает с кэшированными мерджерами + +--- + +## Конфигурация + +### Параметры в FeedConfig + +```python +config = { + "version": "1", + "deduplicate": True, # Включить дедупликацию + "dedup_key": "id", # Ключ для определения дублей + "dedup_session_ttl": 300, # TTL для seen_ids в Redis (секунды) + "feed": { ... } +} +``` + +| Параметр | Тип | По умолчанию | Описание | +|----------|-----|--------------|----------| +| `deduplicate` | bool | `False` | Включить/выключить дедупликацию | +| `dedup_key` | str \| None | `None` | Поле для определения дублей. Если `None` — сравнивается весь элемент | +| `dedup_session_ttl` | int | `300` | Время жизни seen_ids в Redis (секунды) | + +### Параметр priority в SubFeed + +```python +{ + "subfeed_id": "ads", + "type": "subfeed", + "method_name": "get_ads", + "priority": 1 # Меньше = выше приоритет +} +``` + +| Значение priority | Приоритет | +|-------------------|-----------| +| 0 | Самый высокий (по умолчанию) | +| 1 | Высокий | +| 2 | Средний | +| 10 | Низкий | + +--- + +## Как работает дедупликация + +### Алгоритм + +``` +1. Загрузка seen_ids из Redis + └── Элементы, показанные на предыдущих страницах + +2. ЦИКЛ (пока не набрали limit элементов и есть данные): + │ + ├── 2.1 Запрос данных с увеличенным лимитом + │ └── fetch_limit = max(needed * 2, limit) + │ + ├── 2.2 Группировка элементов по dedup_key + │ └── {"item_id_1": [элемент_из_A, элемент_из_B], ...} + │ + ├── 2.3 Для каждой группы: + │ ├── Если элемент в seen_ids → пропустить + │ ├── Если один элемент → оставить + │ └── Если несколько элементов: + │ ├── Найти минимальный (лучший) приоритет + │ └── Оставить ВСЕ элементы с этим приоритетом + │ + ├── 2.4 Добавить к результату + │ + └── 2.5 Если набрали >= limit → выход из цикла + Если has_next_page = False → выход из цикла + +3. Обрезать до limit + +4. Сохранить ID показанных элементов в seen_ids + +5. Пересчитать курсоры + └── after = последний использованный элемент от каждого субфида +``` + +### Гарантированное заполнение страницы + +Цикл гарантирует, что страница будет заполнена до `limit` элементов, +даже если много дублей. Максимум 5 итераций для защиты от бесконечного цикла. + +``` +Пример: + limit = 10 + + Итерация 1: + - Запросили 20 элементов (needed=10, fetch=10*2) + - После дедупликации: 4 элемента (16 дублей удалено) + - Набрали: 4 из 10 + + Итерация 2: + - Запросили 12 элементов (needed=6, fetch=6*2) + - После дедупликации: 8 элементов + - Набрали: 4+8 = 12 из 10 ✓ + + Результат: 10 элементов (обрезали до limit) +``` + +### Пример работы + +``` +Субфиды: + subfeed_a (priority=1): a1, a2, common1, a3, a4 + subfeed_b (priority=2): b1, b2, common1, b3, b4 + +Шаг 3 - Группировка: + "a1": [a1 из A] + "a2": [a2 из A] + "common1": [common1 из A, common1 из B] ← дубль! + "b1": [b1 из B] + ... + +Шаг 4 - Дедупликация common1: + - priority A = 1, priority B = 2 + - min_priority = 1 + - Оставляем common1 из A, удаляем из B + +Результат: a1, a2, common1(A), a3, a4, b1, b2, b3, b4 +``` + +--- + +## Приоритеты субфидов + +### Правила + +1. **Меньше число = выше приоритет** + - priority=0 побеждает priority=1 + - priority=1 побеждает priority=2 + +2. **Одинаковый приоритет = оба остаются** + ``` + subfeed_a (priority=1): common1 + subfeed_b (priority=1): common1 + + Результат: common1(A), common1(B) ← оба! + ``` + +3. **Разный приоритет = низкий удаляется** + ``` + subfeed_a (priority=1): common1 + subfeed_b (priority=2): common1 + + Результат: common1(A) ← только из A + ``` + +### Когда использовать одинаковые приоритеты + +- Когда дубли допустимы (например, один пост от разных источников) +- Когда важно показать контент из обоих источников + +### Когда использовать разные приоритеты + +- Когда один источник важнее (реклама vs органика) +- Когда дубли нежелательны + +--- + +## Межстраничная дедупликация (seen_ids) + +### Проблема + +Без межстраничной дедупликации элемент может появиться на странице 1 из subfeed_a, +и снова на странице 2 из subfeed_b. + +### Решение + +SmartFeed хранит ID показанных элементов в Redis: + +``` +Ключ: smartfeed_seen:{user_id} +Значение: ["id1", "id2", "common1", ...] +TTL: dedup_session_ttl (по умолчанию 300 секунд) +``` + +### Алгоритм + +``` +Страница 1: + 1. seen_ids = {} (пусто, новая сессия) + 2. Показываем: a1, a2, common1, a3 + 3. Сохраняем: seen_ids = {a1, a2, common1, a3} + +Страница 2: + 1. Загружаем: seen_ids = {a1, a2, common1, a3} + 2. subfeed_b возвращает: b1, common1, b2 + 3. common1 в seen_ids → пропускаем + 4. Показываем: b1, b2 + 5. Сохраняем: seen_ids = {a1, a2, common1, a3, b1, b2} +``` + +### Новая сессия + +Если `next_page.data` пустой — это новая сессия: +- seen_ids очищается +- Пользователь увидит контент заново + +--- + +## Движение курсоров + +### Проблема + +SmartFeed запрашивает данные с запасом (`fetch_limit > limit`), чтобы компенсировать удаляемые дубли. +Если курсор будет указывать на последний **запрошенный** элемент, а не **использованный** — +элементы будут пропущены. + +### Пример проблемы (до исправления) + +``` +limit=5, fetch_limit=10 + +Субфид вернул: e1, e2, e3, e4, e5, e6, e7, e8, e9, e10 + └── курсор after=e10 + +Использовали: e1, e2, e3, e4, e5 + └── должен быть after=e5 + +Следующая страница начнётся с e11, элементы e6-e10 потеряны! +``` + +### Решение + +Курсор устанавливается на последний **использованный** элемент: + +```python +def _recalculate_cursors(self, next_page, original_items, final_items): + # Находим последний использованный элемент от каждого субфида + last_used_by_source = {} + for item in final_items: + last_used_by_source[item.source_id] = item + + # Устанавливаем after на последний использованный элемент + new_cursor.after = last_used_by_source[source_id].item +``` + +### Случаи + +| Сценарий | Действие с курсором | +|----------|---------------------| +| Использовано всё что запрошено | Курсор остаётся как есть | +| Использовано меньше чем запрошено | `after` = последний использованный элемент | +| Ничего не использовано (все дубли) | `page` откатывается на 1 | + +--- + +## Примеры конфигураций + +### Базовая дедупликация + +```python +config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "main", + "type": "merger_append", + "items": [ + { + "subfeed_id": "premium", + "type": "subfeed", + "method_name": "get_premium", + "priority": 1, # Высокий приоритет + }, + { + "subfeed_id": "regular", + "type": "subfeed", + "method_name": "get_regular", + "priority": 2, # Низкий приоритет + }, + ], + }, +} +``` + +### С процентным мерджером + +```python +config = { + "version": "1", + "deduplicate": True, + "dedup_key": "post_id", + "dedup_session_ttl": 600, # 10 минут + "feed": { + "merger_id": "feed", + "type": "merger_percentage", + "items": [ + { + "percentage": 70, + "data": { + "subfeed_id": "followings", + "type": "subfeed", + "method_name": "get_followings", + "priority": 1, + }, + }, + { + "percentage": 30, + "data": { + "subfeed_id": "recommendations", + "type": "subfeed", + "method_name": "get_recommendations", + "priority": 2, + }, + }, + ], + }, +} +``` + +### С позиционным мерджером (реклама на фиксированных позициях) + +```python +config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "feed", + "type": "merger_positional", + "positions": [3, 6, 9], # Реклама на позициях 3, 6, 9 + "positional": { + "subfeed_id": "ads", + "type": "subfeed", + "method_name": "get_ads", + "priority": 1, # Реклама приоритетнее + }, + "default": { + "subfeed_id": "content", + "type": "subfeed", + "method_name": "get_content", + "priority": 2, + }, + }, +} +``` + +--- + +## Тестовые сценарии + +### Тесты в test_deduplication.py (20 тестов) + +| Тест | Что проверяет | +|------|---------------| +| `test_dedup_basic` | Базовая дедупликация: дубль удаляется из низкоприоритетного субфида | +| `test_dedup_same_priority` | Одинаковый приоритет: дубли НЕ удаляются | +| `test_dedup_no_duplicates` | Без дублей: данные не меняются | +| `test_dedup_disabled` | `deduplicate=False`: дубли остаются | +| `test_dedup_cursor_movement` | Курсоры обновляются корректно | +| `test_dedup_cursor_partial_use` | items_with_source заполняется | +| `test_dedup_three_subfeeds_cascade` | 3 субфида: дубль остаётся только из приоритета 1 | +| `test_dedup_mixed_priorities` | Приоритеты 1,1,2: дубли между 1 и 1 остаются | +| `test_dedup_percentage_merger` | Дедупликация в MergerPercentage | +| `test_dedup_nested_mergers` | Вложенные мерджеры | +| `test_dedup_positional_merger` | MergerPositional с дедупликацией | +| `test_dedup_items_with_source_populated` | Метаданные элементов заполняются | +| `test_dedup_without_dedup_key` | Без dedup_key: сравнивается весь элемент | +| `test_dedup_has_next_page` | has_next_page корректен | +| `test_cursor_moves_to_last_used_element` | Курсор указывает на последний использованный элемент | +| `test_cursor_no_elements_lost_with_duplicates` | Элементы не теряются при пагинации | +| `test_cursor_after_points_to_last_used` | `after` = последний элемент страницы | +| `test_cross_page_dedup_between_subfeeds` | Межстраничная дедупликация между субфидами | +| `test_page_filled_despite_many_duplicates` | **Страница заполняется до limit даже при многих дублях** | +| `test_cursor_unused_subfeed_not_advanced` | Неиспользованный субфид не двигает курсор | + +--- + +## Требования + +- **Redis** — для хранения seen_ids (межстраничная дедупликация) + - Если Redis не передан, межстраничная дедупликация не работает + - Дедупликация на одной странице работает без Redis + +--- + +## Диаграмма потока данных + +``` + ┌─────────────────┐ + │ FeedManager │ + │ get_data() │ + └────────┬────────┘ + │ + ┌────────────────────────┼────────────────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌───────────────┐ ┌───────────────┐ ┌──────────────────┐ + │ Загрузить │ │ Запросить │ │ Очистить seen_ids│ + │ seen_ids │ │ с fetch_limit │ │ (новая сессия) │ + │ из Redis │ │ │ │ │ + └───────┬───────┘ └───────┬───────┘ └──────────────────┘ + │ │ + └──────────┬───────────┘ + │ + ▼ + ┌─────────────────────┐ + │ Дедупликация по │ + │ приоритетам │ + │ + фильтр seen_ids │ + └──────────┬──────────┘ + │ + ▼ + ┌─────────────────────┐ + │ Обрезать до limit │ + └──────────┬──────────┘ + │ + ┌──────────┼──────────┐ + │ │ │ + ▼ ▼ ▼ + ┌────────────┐ ┌────────┐ ┌────────────────┐ + │ Сохранить │ │Пересчёт│ │ Вернуть │ + │ seen_ids │ │курсоров│ │ FeedResult │ + │ в Redis │ │ │ │ │ + └────────────┘ └────────┘ └────────────────┘ +``` + +--- + +## Совместимость с MergerViewSession + +`MergerViewSession` — это специальный мерджер, который кэширует весь фид в Redis и отдаёт данные постранично из кэша. При включённой дедупликации важно понимать, как эти два механизма взаимодействуют. + +### Как работает дедупликация с MergerViewSession + +1. **Заполнение items_with_source**: `MergerViewSession` теперь корректно заполняет `items_with_source` для каждого элемента на странице, что позволяет `FeedManager` правильно обрабатывать дедупликацию. + +2. **Приоритет**: Так как `MergerViewSession` сам является контейнером для других мерджеров/субфидов, он использует `priority=0` для всех своих элементов. Это означает, что если у вас есть несколько `MergerViewSession` в конфигурации, дубли между ними будут сохранены (правило одинаковых приоритетов). + +3. **Цикл дозапроса**: Если дедупликация удалила много элементов, `FeedManager` НЕ будет повторно вызывать `MergerViewSession.get_data()`, так как данные уже закэшированы. Вместо этого он просто вернёт то, что осталось после дедупликации. + +4. **Рекомендации**: + - Если вы используете `MergerViewSession` с внутренней дедупликацией (`deduplicate: true` в самом `MergerViewSession`), это дедупликация **внутри сессии** при её создании. + - Если вы включаете дедупликацию на уровне `FeedConfig`, это дедупликация **между страницами** и **между разными мерджерами**. + - Для большинства случаев достаточно использовать дедупликацию на уровне `FeedConfig`. + +### Пример конфигурации с MergerViewSession + +```python +config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "dedup_session_ttl": 300, + "feed": { + "merger_id": "main_view_session", + "type": "merger_view_session", + "session_size": 800, + "session_live_time": 3600, + "deduplicate": False, # Внутренняя дедупликация отключена + "data": { + "merger_id": "main_merger", + "type": "merger_percentage", + "items": [ + { + "sub_feed_id": "feed_a", + "type": "sub_feed", + "priority": 1, # Высший приоритет + "method": "get_feed_a", + "percent": 50, + }, + { + "sub_feed_id": "feed_b", + "type": "sub_feed", + "priority": 2, + "method": "get_feed_b", + "percent": 50, + } + ] + } + } +} +``` + +В этом примере: +- `MergerViewSession` кэширует 800 элементов из `merger_percentage` +- Дедупликация на уровне `FeedConfig` удалит дубли между `feed_a` и `feed_b` с учётом приоритетов +- Элементы из `feed_a` (priority=1) будут иметь приоритет над элементами из `feed_b` (priority=2) +- `seen_ids` будет отслеживать показанные элементы между страницами + diff --git a/pyproject.toml b/pyproject.toml index 108a4c7..4b4d3be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "epoch8-smartfeed" -version = "0.2.0" +version = "0.3.0" description = "" authors = [ "Epoch8 Team " diff --git a/smartfeed/examples/example_client.py b/smartfeed/examples/example_client.py index 9a421ff..09dd4a0 100644 --- a/smartfeed/examples/example_client.py +++ b/smartfeed/examples/example_client.py @@ -169,3 +169,244 @@ async def keys_method( result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=True) return result + + @staticmethod + async def dedup_method_a( + user_id: str, + limit: int, + next_page: FeedResultNextPageInside, + limit_to_return: Optional[int] = None, + ) -> FeedResultClient: + """ + Метод A для тестирования дедупликации. + Возвращает элементы с id: a1, a2, a3, common1, common2, a4, a5... + + :param user_id: ID профиля. + :param limit: кол-во элементов. + :param next_page: курсор пагинации. + :param limit_to_return: ограничить кол-во результата. + :return: список элементов с дублями. + """ + # Элементы: a1, a2, a3, common1, common2, a4, a5, a6, common3, a7... + data = [] + for i in range(1, 100): + if i % 5 == 0: + data.append({"id": f"common{i//5}", "source": "A", "value": i}) + else: + data.append({"id": f"a{i}", "source": "A", "value": i}) + + from_index = 0 + if next_page.after: + for idx, item in enumerate(data): + if item == next_page.after: + from_index = idx + 1 + break + + to_index = from_index + limit + result_data = data[from_index:to_index] + + if isinstance(limit_to_return, int) and limit_to_return > 0: + result_data = result_data[:limit_to_return] + + next_page.after = result_data[-1] if result_data else None + next_page.page += 1 + + result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=to_index < len(data)) + return result + + @staticmethod + async def dedup_method_b( + user_id: str, + limit: int, + next_page: FeedResultNextPageInside, + limit_to_return: Optional[int] = None, + ) -> FeedResultClient: + """ + Метод B для тестирования дедупликации. + Возвращает элементы с id: b1, b2, common1, b3, b4, common2... + + :param user_id: ID профиля. + :param limit: кол-во элементов. + :param next_page: курсор пагинации. + :param limit_to_return: ограничить кол-во результата. + :return: список элементов с дублями. + """ + # Элементы: b1, b2, common1, b3, b4, common2... + data = [] + for i in range(1, 100): + if i % 3 == 0: + data.append({"id": f"common{i//3}", "source": "B", "value": i * 10}) + else: + data.append({"id": f"b{i}", "source": "B", "value": i * 10}) + + from_index = 0 + if next_page.after: + for idx, item in enumerate(data): + if item == next_page.after: + from_index = idx + 1 + break + + to_index = from_index + limit + result_data = data[from_index:to_index] + + if isinstance(limit_to_return, int) and limit_to_return > 0: + result_data = result_data[:limit_to_return] + + next_page.after = result_data[-1] if result_data else None + next_page.page += 1 + + result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=to_index < len(data)) + return result + + @staticmethod + async def dedup_method_c( + user_id: str, + limit: int, + next_page: FeedResultNextPageInside, + limit_to_return: Optional[int] = None, + ) -> FeedResultClient: + """ + Метод C для тестирования дедупликации (третий источник). + Возвращает элементы с id: c1, common1, c2, common2, c3, common3... + + :param user_id: ID профиля. + :param limit: кол-во элементов. + :param next_page: курсор пагинации. + :param limit_to_return: ограничить кол-во результата. + :return: список элементов с дублями. + """ + data = [] + for i in range(1, 100): + if i % 2 == 0: + data.append({"id": f"common{i//2}", "source": "C", "value": i * 100}) + else: + data.append({"id": f"c{i}", "source": "C", "value": i * 100}) + + from_index = 0 + if next_page.after: + for idx, item in enumerate(data): + if item == next_page.after: + from_index = idx + 1 + break + + to_index = from_index + limit + result_data = data[from_index:to_index] + + if isinstance(limit_to_return, int) and limit_to_return > 0: + result_data = result_data[:limit_to_return] + + next_page.after = result_data[-1] if result_data else None + next_page.page += 1 + + result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=to_index < len(data)) + return result + + @staticmethod + async def dedup_no_overlap_method( + user_id: str, + limit: int, + next_page: FeedResultNextPageInside, + limit_to_return: Optional[int] = None, + ) -> FeedResultClient: + """ + Метод без пересечений для тестирования. + + :param user_id: ID профиля. + :param limit: кол-во элементов. + :param next_page: курсор пагинации. + :param limit_to_return: ограничить кол-во результата. + :return: список уникальных элементов. + """ + data = [{"id": f"unique_{user_id}_{i}", "value": i} for i in range(1, 100)] + + from_index = 0 + if next_page.after: + for idx, item in enumerate(data): + if item == next_page.after: + from_index = idx + 1 + break + + to_index = from_index + limit + result_data = data[from_index:to_index] + + if isinstance(limit_to_return, int) and limit_to_return > 0: + result_data = result_data[:limit_to_return] + + next_page.after = result_data[-1] if result_data else None + next_page.page += 1 + + result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=to_index < len(data)) + return result + + @staticmethod + async def placeholder_tours( + user_id: str, + limit: int, + next_page: FeedResultNextPageInside, + limit_to_return: Optional[int] = None, + ) -> FeedResultClient: + """ + Метод для получения placeholder туров (для позиционного мерджера). + Возвращает туры с id вида "placeholder_X". + + :param user_id: ID профиля. + :param limit: кол-во элементов. + :param next_page: курсор пагинации. + :param limit_to_return: ограничить кол-во результата. + :return: список placeholder туров. + """ + # Генерируем placeholder туры + data = [{"id": f"placeholder_{i}", "type": "placeholder", "value": i} for i in range(1, 21)] + + from_index = (next_page.page - 1) * limit + to_index = from_index + limit + result_data = data[from_index:to_index] + + if isinstance(limit_to_return, int) and limit_to_return > 0: + result_data = result_data[:limit_to_return] + + next_page.after = result_data[-1] if result_data else None + next_page.page += 1 + + result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=to_index < len(data)) + return result + + @staticmethod + async def regular_tours( + user_id: str, + limit: int, + next_page: FeedResultNextPageInside, + limit_to_return: Optional[int] = None, + ) -> FeedResultClient: + """ + Метод для получения обычных туров (для view session). + Возвращает туры с id вида "tour_X", некоторые из которых дублируются с placeholder. + + :param user_id: ID профиля. + :param limit: кол-во элементов. + :param next_page: курсор пагинации. + :param limit_to_return: ограничить кол-во результата. + :return: список обычных туров с дублями. + """ + # Генерируем обычные туры, включая дубли с placeholder + data = [] + for i in range(1, 101): + if i <= 10: + # Первые 10 элементов - дубли с placeholder + data.append({"id": f"placeholder_{i}", "type": "regular", "value": i * 10}) + else: + # Остальные - уникальные туры + data.append({"id": f"tour_{i}", "type": "regular", "value": i * 10}) + + from_index = (next_page.page - 1) * limit + to_index = from_index + limit + result_data = data[from_index:to_index] + + if isinstance(limit_to_return, int) and limit_to_return > 0: + result_data = result_data[:limit_to_return] + + next_page.after = result_data[-1] if result_data else None + next_page.page += 1 + + result = FeedResultClient(data=result_data, next_page=next_page, has_next_page=to_index < len(data)) + return result diff --git a/smartfeed/manager.py b/smartfeed/manager.py index e91bbe9..47c9dbd 100644 --- a/smartfeed/manager.py +++ b/smartfeed/manager.py @@ -1,9 +1,12 @@ -from typing import Any, Dict, Optional, Union +import json +from collections import defaultdict +from typing import Any, Dict, List, Optional, Set, Union import redis from redis.asyncio import Redis as AsyncRedis +from redis.asyncio import RedisCluster as AsyncRedisCluster -from .schemas import FeedConfig, FeedResult, FeedResultNextPage +from .schemas import FeedConfig, FeedResult, FeedResultItem, FeedResultNextPage, FeedResultNextPageInside class FeedManager: @@ -24,6 +27,223 @@ def __init__(self, config: Dict, methods_dict: Dict, redis_client: Optional[Unio self.methods_dict = methods_dict self.redis_client = redis_client + def _get_dedup_key_value(self, item: Any) -> Any: + """ + Получение значения ключа дедупликации из элемента. + + :param item: элемент данных. + :return: значение ключа дедупликации. + """ + dedup_key = self.feed_config.dedup_key + + if not dedup_key: + return item + + try: + dedup_value = item.get(dedup_key) + except AttributeError: + dedup_value = getattr(item, dedup_key, None) + + if dedup_value is None: + raise ValueError(f"Deduplication failed: entity {item} has no key or attr {dedup_key}") + + return dedup_value + + def _deduplicate_by_priority( + self, + items_with_source: List[FeedResultItem], + seen_ids: Set[Any], + ) -> tuple[List[FeedResultItem], Dict[str, int]]: + """ + Дедупликация элементов по приоритету. + + Правила: + - Если два элемента имеют одинаковый dedup_key, оставляем тот, у которого приоритет выше (меньше число). + - Если приоритеты одинаковые — оставляем оба. + - Элементы из seen_ids пропускаются (межстраничная дедупликация). + + :param items_with_source: список элементов с информацией об источнике. + :param seen_ids: множество уже показанных ID. + :return: кортеж (дедуплицированный список, словарь использованных элементов по source_id). + """ + # Группируем по dedup_key + by_dedup_key: Dict[Any, List[FeedResultItem]] = defaultdict(list) + for item in items_with_source: + dedup_key_value = self._get_dedup_key_value(item.item) + by_dedup_key[dedup_key_value].append(item) + + result: List[FeedResultItem] = [] + used_by_source: Dict[str, int] = defaultdict(int) + + for dedup_key_value, group in by_dedup_key.items(): + # Пропускаем уже показанные элементы + if dedup_key_value in seen_ids: + continue + + if len(group) == 1: + # Один элемент — просто добавляем + result.append(group[0]) + used_by_source[group[0].source_id] += 1 + else: + # Несколько элементов — группируем по приоритету + by_priority: Dict[int, List[FeedResultItem]] = defaultdict(list) + for item in group: + by_priority[item.priority].append(item) + + # Находим минимальный (лучший) приоритет + min_priority = min(by_priority.keys()) + best_items = by_priority[min_priority] + + # Если несколько элементов с лучшим приоритетом — оставляем все (одинаковый приоритет) + for item in best_items: + result.append(item) + used_by_source[item.source_id] += 1 + + # Сортируем результат по оригинальной позиции для сохранения порядка + # result.sort(key=lambda x: (x.priority, x.position)) + result.sort(key=lambda x: x.position) + + return result, dict(used_by_source) + + def _recalculate_cursors( + self, + next_page: FeedResultNextPage, + original_items_with_source: List[FeedResultItem], + final_items: List[FeedResultItem], + ) -> FeedResultNextPage: + """ + Пересчет курсоров после дедупликации. + + Курсор каждого субфида устанавливается на последний реально использованный элемент, + а не на последний запрошенный. + + :param next_page: оригинальный next_page (курсоры после запроса всех данных). + :param original_items_with_source: оригинальный список элементов до дедупликации/обрезки. + :param final_items: финальный список элементов после дедупликации и обрезки до limit. + :return: пересчитанный next_page с курсорами на реально использованные элементы. + """ + new_next_page = FeedResultNextPage(data={}) + + # Группируем original_items по source_id для получения списка элементов каждого субфида + items_by_source: Dict[str, List[FeedResultItem]] = defaultdict(list) + for item in original_items_with_source: + items_by_source[item.source_id].append(item) + + # Находим последний использованный элемент от каждого source_id + last_used_by_source: Dict[str, FeedResultItem] = {} + for item in final_items: + last_used_by_source[item.source_id] = item + + # Подсчитываем количество использованных элементов по source_id + used_count_by_source: Dict[str, int] = defaultdict(int) + for item in final_items: + used_count_by_source[item.source_id] += 1 + + for source_id, cursor in next_page.data.items(): + source_items = items_by_source.get(source_id, []) + fetched = len(source_items) + used = used_count_by_source.get(source_id, 0) + + if used == 0: + # Ничего не использовано — не двигаем курсор + # Откатываем page на 1 назад, after оставляем прежний + new_next_page.data[source_id] = FeedResultNextPageInside( + page=max(1, cursor.page - 1), + after=cursor.after, + ) + elif used < fetched: + # Использовано меньше чем запрошено — устанавливаем after на последний использованный элемент + last_used = last_used_by_source[source_id] + new_next_page.data[source_id] = FeedResultNextPageInside( + page=cursor.page, # page оставляем как есть + after=last_used.item, # after = последний использованный элемент + ) + else: + # Использовано всё что запрошено — курсор остаётся как есть + new_next_page.data[source_id] = cursor + + return new_next_page + + async def _get_seen_ids(self, user_id: Any, **params: Any) -> Set[Any]: + """ + Получение seen_ids из Redis. + + :param user_id: ID пользователя. + :param params: дополнительные параметры. + :return: множество seen_ids. + """ + if not self.redis_client: + return set() + + if custom_key := params.get("custom_dedup_session_key"): + cache_key = f"smartfeed_seen:{user_id}:{custom_key}" + else: + cache_key = f"smartfeed_seen:{user_id}" + + try: + if isinstance(self.redis_client, (AsyncRedis, AsyncRedisCluster)): + cached_data = await self.redis_client.get(cache_key) + else: + cached_data = self.redis_client.get(cache_key) + + if cached_data: + return set(json.loads(cached_data)) + except Exception: + pass + + return set() + + async def _save_seen_ids(self, user_id: Any, seen_ids: Set[Any], **params: Any) -> None: + """ + Сохранение seen_ids в Redis. + + :param user_id: ID пользователя. + :param seen_ids: множество seen_ids. + :param params: дополнительные параметры. + """ + if not self.redis_client: + return + + if custom_key := params.get("custom_dedup_session_key"): + cache_key = f"smartfeed_seen:{user_id}:{custom_key}" + else: + cache_key = f"smartfeed_seen:{user_id}" + + ttl = self.feed_config.dedup_session_ttl + + try: + seen_list = list(seen_ids) + if isinstance(self.redis_client, (AsyncRedis, AsyncRedisCluster)): + await self.redis_client.set(cache_key, json.dumps(seen_list)) + await self.redis_client.expire(cache_key, ttl) + else: + self.redis_client.set(cache_key, json.dumps(seen_list), ex=ttl) + except Exception: + pass + + async def _clear_seen_ids(self, user_id: Any, **params: Any) -> None: + """ + Очистка seen_ids в Redis (для новой сессии). + + :param user_id: ID пользователя. + :param params: дополнительные параметры. + """ + if not self.redis_client: + return + + if custom_key := params.get("custom_dedup_session_key"): + cache_key = f"smartfeed_seen:{user_id}:{custom_key}" + else: + cache_key = f"smartfeed_seen:{user_id}" + + try: + if isinstance(self.redis_client, (AsyncRedis, AsyncRedisCluster)): + await self.redis_client.delete(cache_key) + else: + self.redis_client.delete(cache_key) + except Exception: + pass + async def get_data(self, user_id: Any, limit: int, next_page: FeedResultNextPage, **params: Any) -> FeedResult: """ Метод для получения данных согласно конфигурации. @@ -34,13 +254,125 @@ async def get_data(self, user_id: Any, limit: int, next_page: FeedResultNextPage :param params: любые внешние параметры, передаваемые в исполняемую функцию на клиентской стороне. :return: результат получения данных согласно конфигурации фида. """ + # Проверяем, нужна ли дедупликация + if not self.feed_config.deduplicate: + # Без дедупликации — стандартное поведение + result = await self.feed_config.feed.get_data( + methods_dict=self.methods_dict, + user_id=user_id, + limit=limit, + next_page=next_page, + redis_client=self.redis_client, + **params, + ) + return result + + # С дедупликацией + # Проверяем, это новая сессия (пустой next_page) + is_new_session = len(next_page.data) == 0 + + if is_new_session: + # Очищаем seen_ids для новой сессии + await self._clear_seen_ids(user_id, **params) + seen_ids: Set[Any] = set() + else: + # Загружаем seen_ids из Redis + seen_ids = await self._get_seen_ids(user_id, **params) + + # Цикл для гарантированного заполнения страницы до limit + # Запрашиваем данные пока не наберём limit элементов или пока субфиды не закончатся + all_items_with_source: List[FeedResultItem] = [] + all_deduplicated_items: List[FeedResultItem] = [] + current_next_page = next_page + last_result: Optional[FeedResult] = None + has_more_data = True + + # Максимум итераций для защиты от бесконечного цикла + max_iterations = 5 + iteration = 0 + + while len(all_deduplicated_items) < limit and has_more_data and iteration < max_iterations: + iteration += 1 + + # Сколько ещё нужно элементов + needed = limit - len(all_deduplicated_items) + + # Запрашиваем с запасом (x2 от нужного, минимум limit) + fetch_limit = max(needed * 2, limit) + + result = await self.feed_config.feed.get_data( + methods_dict=self.methods_dict, + user_id=user_id, + limit=fetch_limit, + next_page=current_next_page, + redis_client=self.redis_client, + **params, + ) + last_result = result + + # Если нет items_with_source — дедупликация невозможна + if not result.items_with_source: + if not all_deduplicated_items: + # Первая итерация и нет данных + return FeedResult( + data=result.data[:limit], + next_page=result.next_page, + has_next_page=result.has_next_page or len(result.data) > limit, + items_with_source=[], + ) + break + + # Собираем все элементы + all_items_with_source.extend(result.items_with_source) + + # Дедуплицируем по приоритету (включая seen_ids и уже собранные элементы) + # Создаём временный seen_ids с уже добавленными элементами + temp_seen_ids = seen_ids.copy() + for item in all_deduplicated_items: + temp_seen_ids.add(self._get_dedup_key_value(item.item)) + + new_deduplicated, _ = self._deduplicate_by_priority( + result.items_with_source, + temp_seen_ids, + ) + + all_deduplicated_items.extend(new_deduplicated) + + # Обновляем курсоры для следующей итерации + current_next_page = result.next_page + + # Проверяем, есть ли ещё данные + has_more_data = result.has_next_page + + # Обрезаем до limit + final_items = all_deduplicated_items[:limit] + + # Извлекаем данные + final_data = [item.item for item in final_items] + + # Обновляем seen_ids + for item in final_items: + dedup_key_value = self._get_dedup_key_value(item.item) + seen_ids.add(dedup_key_value) + + # Сохраняем seen_ids в Redis + await self._save_seen_ids(user_id, seen_ids, **params) + + # Пересчитываем курсоры на основе реально использованных элементов + if last_result: + new_next_page = self._recalculate_cursors( + current_next_page, + all_items_with_source, + final_items, + ) + final_has_next_page = has_more_data or len(all_deduplicated_items) > limit + else: + new_next_page = next_page + final_has_next_page = False - result = await self.feed_config.feed.get_data( - methods_dict=self.methods_dict, - user_id=user_id, - limit=limit, - next_page=next_page, - redis_client=self.redis_client, - **params, + return FeedResult( + data=final_data, + next_page=new_next_page, + has_next_page=final_has_next_page, + items_with_source=final_items, ) - return result diff --git a/smartfeed/schemas.py b/smartfeed/schemas.py index 45df221..8d3810e 100644 --- a/smartfeed/schemas.py +++ b/smartfeed/schemas.py @@ -49,6 +49,23 @@ class FeedResultNextPage(BaseModel): data: Dict[str, FeedResultNextPageInside] +class FeedResultItem(BaseModel): + """ + Модель элемента данных с информацией об источнике для дедупликации. + + Attributes: + item сам элемент данных. + source_id ID субфида-источника. + priority приоритет субфида-источника. + position позиция элемента в исходном списке субфида. + """ + + item: Any + source_id: str + priority: int + position: int + + class FeedResult(BaseModel): """ Модель результата метода get_data() любой позиции / целого фида. @@ -57,11 +74,13 @@ class FeedResult(BaseModel): data список данных, возвращенных мерджером / субфидом. next_page курсор пагинации. has_next_page флаг наличия следующей страницы данных. + items_with_source список элементов с информацией об источнике (для дедупликации). """ data: List next_page: FeedResultNextPage has_next_page: bool + items_with_source: List[FeedResultItem] = [] class FeedResultClient(BaseModel): @@ -286,10 +305,25 @@ async def _get_cache( logging.info("Successfully read cached data for %s", cache_key) session_data = json.loads(cached_data) page = next_page.data[self.merger_id].page if self.merger_id in next_page.data else 1 + page_data = session_data[(page - 1) * limit :][:limit] + + # Заполняем items_with_source для совместимости с дедупликацией. + # MergerViewSession не имеет приоритетов, поэтому используем priority=0. + items_with_source = [ + FeedResultItem( + item=item, + source_id=self.merger_id, + priority=0, + position=i, + ) + for i, item in enumerate(page_data) + ] + result = FeedResult( - data=session_data[(page - 1) * limit :][:limit], + data=page_data, next_page=FeedResultNextPage(data={self.merger_id: FeedResultNextPageInside(page=page + 1, after=None)}), has_next_page=bool(len(session_data) > limit * page), + items_with_source=items_with_source, ) return result @@ -342,10 +376,25 @@ async def _get_cache_async( logging.info("Successfully read cached data for %s", cache_key) session_data = json.loads(cached_data) page = next_page.data[self.merger_id].page if self.merger_id in next_page.data else 1 + page_data = session_data[(page - 1) * limit :][:limit] + + # Заполняем items_with_source для совместимости с дедупликацией. + # MergerViewSession не имеет приоритетов, поэтому используем priority=0. + items_with_source = [ + FeedResultItem( + item=item, + source_id=self.merger_id, + priority=0, + position=i, + ) + for i, item in enumerate(page_data) + ] + result = FeedResult( - data=session_data[(page - 1) * limit :][:limit], + data=page_data, next_page=FeedResultNextPage(data={self.merger_id: FeedResultNextPageInside(page=page + 1, after=None)}), has_next_page=bool(len(session_data) > limit * page), + items_with_source=items_with_source, ) return result @@ -439,7 +488,7 @@ async def get_data( """ # Формируем результат append мерджера. - result = FeedResult(data=[], next_page=FeedResultNextPage(data={}), has_next_page=False) + result = FeedResult(data=[], next_page=FeedResultNextPage(data={}), has_next_page=False, items_with_source=[]) result_limit = limit for item in self.items: @@ -456,6 +505,9 @@ async def get_data( # Добавляем данные позиции к общему результату процентного мерджера. result.data.extend(item_result.data) + # Собираем items_with_source от дочерних элементов. + result.items_with_source.extend(item_result.items_with_source) + # Обновляем result_limit result_limit -= len(item_result.data) @@ -556,6 +608,7 @@ async def get_data( }, ), has_next_page=default_res.has_next_page, + items_with_source=list(default_res.items_with_source), ) # Получаем список позиций с учетом текущей страницы. @@ -602,10 +655,21 @@ async def get_data( # Формируем общие данные позиционного мерджера. for i, post in enumerate(pos_res.data): result.data = result.data[: page_positions[i] - 1] + [post] + result.data[page_positions[i] - 1 :] + # Вставляем items_with_source на соответствующие позиции. + if i < len(pos_res.items_with_source): + insert_pos = page_positions[i] - 1 + if insert_pos < 0: + insert_pos = 0 + result.items_with_source = ( + result.items_with_source[:insert_pos] + + [pos_res.items_with_source[i]] + + result.items_with_source[insert_pos:] + ) # Проверка на возврат данных в количестве не более limit. if len(result.data) > limit: result.data = result.data[:limit] + result.items_with_source = result.items_with_source[:limit] # Обновляем страницу для курсора пагинации мерджера. result.next_page.data[self.merger_id].page += 1 @@ -701,9 +765,10 @@ async def get_data( """ # Формируем результат процентного мерджера. - result = FeedResult(data=[], next_page=FeedResultNextPage(data={}), has_next_page=False) + result = FeedResult(data=[], next_page=FeedResultNextPage(data={}), has_next_page=False, items_with_source=[]) items_data: List = [] + items_with_source_data: List[List[FeedResultItem]] = [] for item in self.items: # Получаем данные из позиций процентного мерджера. item_result = await item.data.get_data( @@ -717,6 +782,7 @@ async def get_data( # Добавляем данные позиции в список данных позиций. items_data.append(item_result.data) + items_with_source_data.append(item_result.items_with_source) # Если has_next_page = False, то проверяем has_next_page у позиции и, если необходимо, обновляем. if not result.has_next_page and item_result.has_next_page: @@ -727,6 +793,8 @@ async def get_data( # Добавляем данные позиции к общему результату процентного мерджера. result.data = await self._merge_items_data(items_data=items_data) + # Мержим items_with_source таким же образом. + result.items_with_source = await self._merge_items_data(items_data=items_with_source_data) # Если в конфигурации указано "смешать" данные. if self.shuffle: @@ -855,6 +923,7 @@ async def get_data( }, ), has_next_page=False, + items_with_source=[], ) # Получаем список лимитов данных и соотношений согласно странице и градиенту. @@ -883,6 +952,8 @@ async def get_data( from_start_index = 0 to_start_index = 0 + from_source_start_index = 0 + to_source_start_index = 0 for lp_data in limits_and_percents["percentages"]: # Высчитываем лимиты для каждой позиции исходя из процентного соотношения. from_end_index = (lp_data["limit"] * lp_data["from"] // 100) + from_start_index @@ -892,9 +963,17 @@ async def get_data( result.data.extend(item_from.data[from_start_index:from_end_index]) result.data.extend(item_to.data[to_start_index:to_end_index]) + # Добавляем items_with_source. + from_source_end_index = min(from_end_index, len(item_from.items_with_source)) + to_source_end_index = min(to_end_index, len(item_to.items_with_source)) + result.items_with_source.extend(item_from.items_with_source[from_source_start_index:from_source_end_index]) + result.items_with_source.extend(item_to.items_with_source[to_source_start_index:to_source_end_index]) + # Обновляем стартовые индексы. from_start_index = from_end_index to_start_index = to_end_index + from_source_start_index = from_source_end_index + to_source_start_index = to_source_end_index # Обновляем next_page. result.next_page.data.update(item_from.next_page.data) @@ -982,7 +1061,7 @@ async def get_data( """ # Формируем результат append мерджера. - result = FeedResult(data=[], next_page=FeedResultNextPage(data={}), has_next_page=False) + result = FeedResult(data=[], next_page=FeedResultNextPage(data={}), has_next_page=False, items_with_source=[]) result_limit = limit for item in self.items: @@ -999,6 +1078,9 @@ async def get_data( # Добавляем данные позиции к общему результату процентного мерджера. result.data.extend(item_result.data) + # Собираем items_with_source от дочерних элементов. + result.items_with_source.extend(item_result.items_with_source) + # Обновляем result_limit result_limit -= len(item_result.data) @@ -1028,6 +1110,7 @@ class SubFeed(BaseFeedConfigModel): method_name название клиентского метода для получения данных субфида. subfeed_params статичные параметры для метода субфида. shuffle флаг для перемешивания полученных данных мерджера. + priority приоритет субфида для дедупликации (меньше = выше приоритет). """ subfeed_id: str @@ -1036,6 +1119,7 @@ class SubFeed(BaseFeedConfigModel): subfeed_params: Dict[str, Any] = {} raise_error: Optional[bool] = True shuffle: bool = False + priority: int = 0 async def get_data( self, @@ -1097,10 +1181,22 @@ async def get_data( if self.shuffle: shuffle(method_result.data) + # Формируем items_with_source для дедупликации. + items_with_source = [ + FeedResultItem( + item=item, + source_id=self.subfeed_id, + priority=self.priority, + position=i, + ) + for i, item in enumerate(method_result.data) + ] + result = FeedResult( data=method_result.data, next_page=FeedResultNextPage(data={self.subfeed_id: method_result.next_page}), has_next_page=method_result.has_next_page, + items_with_source=items_with_source, ) return result @@ -1115,10 +1211,16 @@ class FeedConfig(BaseModel): session_size размер кэшируемого фида (limit получения данных для сохранения в кэш). session_live_time срок хранения в кэше для кэшируемого фида (в секундах). feed мерджер или субфид. + deduplicate флаг дедупликации по приоритетам между субфидами. + dedup_key ключ для идентификации дублей (атрибут или ключ словаря). + dedup_session_ttl TTL для хранения seen_ids в Redis (в секундах). """ version: str feed: FeedTypes + deduplicate: bool = False + dedup_key: Optional[str] = None + dedup_session_ttl: int = 300 # Update Forward Refs diff --git a/tests/fixtures/configs.py b/tests/fixtures/configs.py index 8c96e4e..6282107 100644 --- a/tests/fixtures/configs.py +++ b/tests/fixtures/configs.py @@ -9,6 +9,12 @@ "error": ClientMixerClass().error_method, "doubles": ClientMixerClass().doubles_method, "posted": ClientMixerClass().keys_method, + "dedup_a": ClientMixerClass().dedup_method_a, + "dedup_b": ClientMixerClass().dedup_method_b, + "dedup_c": ClientMixerClass().dedup_method_c, + "dedup_no_overlap": ClientMixerClass().dedup_no_overlap_method, + "placeholder_tours": ClientMixerClass().placeholder_tours, + "regular_tours": ClientMixerClass().regular_tours, } PARSING_CONFIG_FIXTURE = { diff --git a/tests/test_deduplication.py b/tests/test_deduplication.py new file mode 100644 index 0000000..118598a --- /dev/null +++ b/tests/test_deduplication.py @@ -0,0 +1,1356 @@ +""" +Тесты для дедупликации по приоритетам в SmartFeed. + +Структура тестовых данных (методы из example_client.py): + +dedup_method_a (dedup_a): + Возвращает: a1, a2, a3, a4, common1, a6, a7, a8, a9, common2, a11, ... + Паттерн: каждый 5-й элемент = common{i//5}, остальные = a{i} + source="A", value=i + +dedup_method_b (dedup_b): + Возвращает: b1, b2, common1, b4, b5, common2, b7, b8, common3, ... + Паттерн: каждый 3-й элемент = common{i//3}, остальные = b{i} + source="B", value=i*10 + +dedup_method_c (dedup_c): + Возвращает: c1, common1, c3, common2, c5, common3, ... + Паттерн: каждый 2-й элемент = common{i//2}, остальные = c{i} + source="C", value=i*100 + +dedup_no_overlap_method: + Возвращает: unique_{user_id}_1, unique_{user_id}_2, ... + Нет пересечений между субфидами. + +ads (example_method): + Возвращает: {user_id}_1, {user_id}_2, ... + Строки, не словари. +""" + +import pytest + +from smartfeed.manager import FeedManager +from smartfeed.schemas import FeedResultNextPage, FeedResultNextPageInside +from tests.fixtures.configs import METHODS_DICT +from tests.fixtures.redis import redis_client # noqa: F401 + +# ============================================================================ +# Простые кейсы +# ============================================================================ + + +@pytest.mark.asyncio +async def test_dedup_basic() -> None: + """ + Тест базовой дедупликации: 2 субфида с разными приоритетами, есть дубли. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, a6, a7, a8, a9, common2, ... + subfeed_b (priority=2): b1, b2, common1, b4, b5, common2, ... + + Дубли: common1, common2, ... присутствуют в обоих субфидах. + + Ожидаемый результат: + - common элементы остаются только из subfeed_a (priority=1, выше приоритет) + - Все common элементы имеют source="A" + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, # Высший приоритет + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, # Низший приоритет + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # Проверяем, что результат содержит элементы + assert len(result.data) == 10 + + # Проверяем, что common элементы имеют source="A" (приоритет 1) + common_items = [item for item in result.data if item["id"].startswith("common")] + for item in common_items: + assert item["source"] == "A", f"Common item {item['id']} should be from source A (priority 1)" + + +@pytest.mark.asyncio +async def test_dedup_same_priority() -> None: + """ + Тест: 2 субфида с одинаковым приоритетом — дубли НЕ удаляются. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, a6, ... (source="A") + subfeed_b (priority=1): b1, b2, common1, b4, b5, ... (source="B") + + Дубли: common1 присутствует в обоих субфидах с ОДИНАКОВЫМ приоритетом. + + Ожидаемый результат: + - common1 остаётся из ОБОИХ источников (2 элемента) + - Правило: одинаковый приоритет = дубли не удаляются + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, # Одинаковый приоритет + }, + }, + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 1, # Одинаковый приоритет + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=20, + next_page=FeedResultNextPage(data={}), + ) + + # Ищем common1 элементы — должно быть 2 (из обоих источников) + common1_items = [item for item in result.data if item["id"] == "common1"] + + # При одинаковом приоритете оба элемента должны остаться + assert len(common1_items) == 2, "Both common1 items should remain with same priority" + + +@pytest.mark.asyncio +async def test_dedup_no_duplicates() -> None: + """ + Тест: субфиды без дублей — данные не меняются. + + Субфиды: + subfeed_a (priority=1): unique_user1_1, unique_user1_2, unique_user1_3, ... + subfeed_b (priority=2): unique_user1_1, unique_user1_2, ... (те же данные!) + + Примечание: dedup_no_overlap возвращает unique_{user_id}_{i}, так что + с одним user_id данные будут одинаковые. Но limit_to_return ограничивает. + + Ожидаемый результат: + - Данные получены + - Количество <= limit + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_no_overlap", + "priority": 1, + "subfeed_params": {"limit_to_return": 5}, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_no_overlap", + "priority": 2, + "subfeed_params": {"limit_to_return": 5}, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="user1", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # С разными user_id элементы уникальны + # Но здесь user_id одинаковый, так что проверяем что данные есть + assert len(result.data) <= 10 + + +@pytest.mark.asyncio +async def test_dedup_disabled() -> None: + """ + Тест: deduplicate=False — дубли остаются. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, ... (source="A") + subfeed_b (priority=2): b1, b2, common1, ... (source="B") + + Дубли: common1 присутствует в обоих. + + Ожидаемый результат: + - deduplicate=False, поэтому дедупликация НЕ применяется + - common1 присутствует из ОБОИХ источников (2 элемента) + """ + config = { + "version": "1", + "deduplicate": False, # Дедупликация выключена + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + }, + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=20, + next_page=FeedResultNextPage(data={}), + ) + + # Без дедупликации common1 должен быть из обоих источников + common1_items = [item for item in result.data if item.get("id") == "common1"] + assert len(common1_items) == 2, "Both common1 items should remain when dedup is disabled" + + +# ============================================================================ +# Кейсы с курсорами +# ============================================================================ + + +@pytest.mark.asyncio +async def test_dedup_cursor_movement() -> None: + """ + Тест: курсор двигается правильно, элементы не повторяются между страницами. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, a6, a7, ... + subfeed_b (priority=2): b1, b2, common1, b4, b5, ... + + Ожидаемый результат: + - Страница 1: 10 элементов, курсоры обновлены + - Страница 2: другие 10 элементов + - Элементы между страницами НЕ пересекаются (межстраничная дедупликация через seen_ids) + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + # Первая страница + result1 = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + assert len(result1.data) == 10 + assert "subfeed_a" in result1.next_page.data or "subfeed_b" in result1.next_page.data + + # Вторая страница с курсорами + result2 = await manager.get_data( + user_id="test", + limit=10, + next_page=result1.next_page, + ) + + # Проверяем что вторая страница не пустая + assert len(result2.data) > 0 + + # Проверяем что элементы не повторяются между страницами + ids1 = {item["id"] for item in result1.data} + ids2 = {item["id"] for item in result2.data} + assert ids1.isdisjoint(ids2), "Pages should not have overlapping items" + + +@pytest.mark.asyncio +async def test_dedup_cursor_partial_use() -> None: + """ + Тест: items_with_source заполняется корректно. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, ... + subfeed_b (priority=2): b1, b2, common1, ... + + Ожидаемый результат: + - items_with_source содержит информацию о каждом элементе + - Количество items_with_source = количество data + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # Проверяем что items_with_source заполнен + assert len(result.items_with_source) == len(result.data) + + +# ============================================================================ +# Нетривиальные кейсы +# ============================================================================ + + +@pytest.mark.asyncio +async def test_dedup_three_subfeeds_cascade() -> None: + """ + Тест: 3 субфида с приоритетами 1, 2, 3 — элемент есть во всех трех. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, ... (source="A") + subfeed_b (priority=2): b1, b2, common1, ... (source="B") + subfeed_c (priority=3): c1, common1, c3, common2, ... (source="C") + + Дубли: common1 присутствует во ВСЕХ трёх субфидах. + + Ожидаемый результат: + - common1 остаётся только из subfeed_a (priority=1, самый высокий) + - Только 1 элемент common1 с source="A" + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + { + "subfeed_id": "subfeed_c", + "type": "subfeed", + "method_name": "dedup_c", + "priority": 3, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=20, + next_page=FeedResultNextPage(data={}), + ) + + # common1 есть во всех трех источниках, должен остаться только из A + common1_items = [item for item in result.data if item["id"] == "common1"] + assert len(common1_items) == 1 + assert common1_items[0]["source"] == "A" + + +@pytest.mark.asyncio +async def test_dedup_mixed_priorities() -> None: + """ + Тест: субфиды с приоритетами 1, 1, 2 — дубли между приоритетом 1 остаются. + + Субфиды: + subfeed_a1 (priority=1): a1, a2, a3, a4, common1, ... (source="A") + subfeed_a2 (priority=1): a1, a2, a3, a4, common1, ... (source="A", те же данные!) + subfeed_b (priority=2): b1, b2, common1, ... (source="B") + + Дубли: + - a1 из subfeed_a1 и subfeed_a2 (одинаковый приоритет 1) + - common1 из всех трёх + + Ожидаемый результат: + - a1 остаётся из ОБОИХ subfeed_a1 и subfeed_a2 (одинаковый приоритет = не удаляем) + - common1 из subfeed_b (priority=2) удаляется, остаются из priority=1 + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 34, + "data": { + "subfeed_id": "subfeed_a1", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + }, + { + "percentage": 33, + "data": { + "subfeed_id": "subfeed_a2", + "type": "subfeed", + "method_name": "dedup_a", # Тот же метод, те же данные + "priority": 1, + }, + }, + { + "percentage": 33, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=20, + next_page=FeedResultNextPage(data={}), + ) + + # a1 есть в обоих субфидах с приоритетом 1 — оба должны остаться + a1_items = [item for item in result.data if item["id"] == "a1"] + assert len(a1_items) == 2, "Both a1 items with same priority should remain" + + +@pytest.mark.asyncio +async def test_dedup_percentage_merger() -> None: + """ + Тест: дедупликация в процентном мерджере (MergerPercentage). + + Субфиды (50/50): + subfeed_a (priority=1): a1, a2, a3, a4, common1, ... (source="A") + subfeed_b (priority=2): b1, b2, common1, ... (source="B") + + MergerPercentage берёт 50% из каждого субфида и чередует. + + Ожидаемый результат: + - Данные из обоих субфидов + - common элементы только из subfeed_a (priority=1) + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + }, + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # Проверяем что данные есть + assert len(result.data) > 0 + + # Проверяем что common элементы из A (приоритет 1) + common_items = [item for item in result.data if item["id"].startswith("common")] + for item in common_items: + assert item["source"] == "A" + + +@pytest.mark.asyncio +async def test_dedup_nested_mergers() -> None: + """ + Тест: вложенные мерджеры — MergerAppend внутри MergerPercentage. + + Структура: + MergerPercentage (60/40): + - 60%: MergerAppend + - subfeed_a (priority=1): a1, a2, ..., common1, ... (source="A") + - 40%: subfeed_b (priority=2): b1, b2, common1, ... (source="B") + + Дубли: common1 есть в обоих. + + Ожидаемый результат: + - Дедупликация работает на всех уровнях вложенности + - common элементы из subfeed_a (priority=1) + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "outer_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 60, + "data": { + "merger_id": "inner_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + ], + }, + }, + { + "percentage": 40, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + assert len(result.data) > 0 + + # common элементы должны быть из A + common_items = [item for item in result.data if item["id"].startswith("common")] + for item in common_items: + assert item["source"] == "A" + + +@pytest.mark.asyncio +async def test_dedup_positional_merger() -> None: + """ + Тест: MergerPositional с дедупликацией. + + Структура: + MergerPositional (позиции 3, 6, 9): + - positional (priority=1): a1, a2, ..., common1, ... (source="A") + - default (priority=2): b1, b2, common1, ... (source="B") + + MergerPositional вставляет positional элементы на позиции [3, 6, 9], + остальные позиции заполняются из default. + + Дубли: common1 есть в обоих. + + Ожидаемый результат: + - Данные получены из обоих источников + - Все id уникальны после дедупликации + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "positional_merger", + "type": "merger_positional", + "positions": [3, 6, 9], + "positional": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, # Высокий приоритет для positional + }, + "default": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + assert len(result.data) > 0 + + # Проверяем что items_with_source заполнен + assert len(result.items_with_source) == len(result.data) + + # Проверяем что есть элементы из обоих источников + sources = {item.source_id for item in result.items_with_source} + assert len(sources) >= 1 # Минимум один источник + + # Проверяем что нет дублей по id (все id уникальны после дедупликации) + ids = [item["id"] for item in result.data] + assert len(ids) == len(set(ids)), "All ids should be unique after deduplication" + + +@pytest.mark.asyncio +async def test_dedup_items_with_source_populated() -> None: + """ + Тест: items_with_source правильно заполняется метаданными. + + Субфиды: + subfeed_a (priority=1): a1, a2, ..., common1, ... + subfeed_b (priority=2): b1, b2, common1, ... + + Ожидаемый результат: + - items_with_source содержит для каждого элемента: + - source_id: "subfeed_a" или "subfeed_b" + - priority: 1 или 2 + - item: сам элемент данных + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # items_with_source должен быть заполнен + assert len(result.items_with_source) == len(result.data) + + # Каждый элемент в items_with_source должен иметь source_id и priority + for item_info in result.items_with_source: + assert item_info.source_id in ["subfeed_a", "subfeed_b"] + assert item_info.priority in [1, 2] + assert item_info.item is not None + + +@pytest.mark.asyncio +async def test_dedup_without_dedup_key() -> None: + """ + Тест: дедупликация без dedup_key — используется сам элемент как ключ. + + Субфиды: + subfeed_a (priority=1): "x_1", "x_2", "x_3", ... (строки) + subfeed_b (priority=2): "x_1", "x_2", "x_3", ... (те же строки) + + Метод "ads" возвращает строки "{user_id}_{i}", не словари. + dedup_key=None означает, что сам элемент используется как ключ. + + Ожидаемый результат: + - Дубли удаляются (x_1 из subfeed_b удаляется, остаётся из subfeed_a) + - Все элементы уникальны + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": None, # Без ключа — сам элемент + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "ads", # Возвращает строки x_1, x_2... + "priority": 1, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "ads", # Те же данные + "priority": 2, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="x", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # Дубли должны быть удалены (разные приоритеты) + # Все элементы x_1, x_2... должны быть уникальны + assert len(result.data) == len(set(result.data)) + + +@pytest.mark.asyncio +async def test_dedup_has_next_page() -> None: + """ + Тест: has_next_page корректно работает с дедупликацией. + + Субфиды: + subfeed_a (priority=1): a1, a2, a3, a4, common1, a6, a7, ... (много данных) + + Ожидаемый результат: + - limit=5, но данных больше + - has_next_page = True + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + result = await manager.get_data( + user_id="test", + limit=5, + next_page=FeedResultNextPage(data={}), + ) + + # Должна быть следующая страница (данных больше чем limit) + assert result.has_next_page is True + + +# ============================================================================ +# Тесты для корректного движения курсора +# ============================================================================ + + +@pytest.mark.asyncio +async def test_cursor_moves_to_last_used_element() -> None: + """ + Тест: курсор устанавливается на последний ИСПОЛЬЗОВАННЫЙ элемент, а не на запрошенный. + + Сценарий: + - limit=5, но запрашиваем с запасом (fetch_limit ≈ 10) + - Субфид возвращает 10 элементов: a1, a2, a3, a4, common1, a6, a7, a8, a9, common2 + - Используем только 5: a1, a2, a3, a4, common1 + - Курсор (after) должен указывать на common1 (5-й элемент), а не на common2 (10-й) + + Ожидаемый результат: + - На странице 1 показаны элементы 1-5 + - Курсор указывает на 5-й элемент + - На странице 2 показаны элементы 6-10 (a6, a7, a8, a9, common2) + - Элементы 6-10 НЕ пропущены + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + # Страница 1: получаем 5 элементов + result1 = await manager.get_data( + user_id="test", + limit=5, + next_page=FeedResultNextPage(data={}), + ) + + assert len(result1.data) == 5 + page1_ids = [item["id"] for item in result1.data] + + # Страница 2: получаем следующие 5 элементов + result2 = await manager.get_data( + user_id="test", + limit=5, + next_page=result1.next_page, + ) + + assert len(result2.data) > 0 + page2_ids = [item["id"] for item in result2.data] + + # Проверяем что страницы не пересекаются + assert set(page1_ids).isdisjoint(set(page2_ids)), "Pages should not overlap" + + # Проверяем что не пропущены элементы между страницами + # Первый элемент страницы 2 должен идти сразу после последнего элемента страницы 1 + # (в порядке приоритета и позиции) + + +@pytest.mark.asyncio +async def test_cursor_no_elements_lost_with_duplicates() -> None: + """ + Тест: при удалении дублей не теряются элементы между страницами. + + Сценарий: + - 2 субфида с разными приоритетами + - Субфид A (priority=1): a1, a2, a3, a4, common1, a6, ... + - Субфид B (priority=2): b1, b2, common1, b4, b5, common2, ... + - limit=10 + - common1 из B удаляется (дубль) + + Ожидаемый результат: + - Курсор субфида B должен указывать на последний ИСПОЛЬЗОВАННЫЙ элемент из B + - На следующей странице элементы B не пропущены + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + }, + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + # Собираем все элементы с 3 страниц + all_ids: set = set() + next_page = FeedResultNextPage(data={}) + + for page_num in range(3): + result = await manager.get_data( + user_id="test_cursor", + limit=10, + next_page=next_page, + ) + + page_ids = {item["id"] for item in result.data} + + # Проверяем что нет пересечений с предыдущими страницами + overlap = all_ids.intersection(page_ids) + assert len(overlap) == 0, f"Page {page_num + 1} overlaps with previous: {overlap}" + + all_ids.update(page_ids) + next_page = result.next_page + + if not result.has_next_page: + break + + # Проверяем что собрали достаточно уникальных элементов + assert len(all_ids) >= 20, f"Should collect at least 20 unique items, got {len(all_ids)}" + + +@pytest.mark.asyncio +async def test_cursor_after_points_to_last_used() -> None: + """ + Тест: поле after в курсоре указывает на последний использованный элемент. + + Сценарий: + - Один субфид, limit=3 + - Субфид возвращает: a1, a2, a3, a4, common1, ... + - Используем только a1, a2, a3 + + Ожидаемый результат: + - after в курсоре = a3 (или его словарное представление) + - Следующий запрос начнётся с a4 + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + # Страница 1 + result1 = await manager.get_data( + user_id="test", + limit=3, + next_page=FeedResultNextPage(data={}), + ) + + assert len(result1.data) == 3 + last_item_page1 = result1.data[-1] + + # Проверяем что after указывает на последний элемент страницы + cursor = result1.next_page.data.get("subfeed_a") + assert cursor is not None + assert cursor.after == last_item_page1, "Cursor after should point to last used item" + + # Страница 2 + result2 = await manager.get_data( + user_id="test", + limit=3, + next_page=result1.next_page, + ) + + # Первый элемент страницы 2 должен быть следующим после последнего элемента страницы 1 + first_item_page2 = result2.data[0] + assert first_item_page2["id"] != last_item_page1["id"], "Page 2 should start with next item" + + +@pytest.mark.asyncio +async def test_cross_page_dedup_between_subfeeds() -> None: + """ + Тест: межстраничная дедупликация между разными субфидами. + + Сценарий: + - Страница 1: subfeed_a возвращает a1, a2, a3, a4, common1, a6, a7, a8, a9, common2 + - Страница 2: subfeed_b возвращает b1, b2, common1, b4, b5, common2, ... + - common1 и common2 уже были показаны на странице 1 (из subfeed_a) + + Ожидаемый результат: + - На странице 2 common1 и common2 из subfeed_b НЕ показываются + - seen_ids хранит ID элементов между страницами + - Элементы b1, b2, b4, b5 показываются (они уникальны) + + Это проверяет что seen_ids работает между страницами и между субфидами. + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_append", + "items": [ + { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_a", + "priority": 1, + }, + { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_b", + "priority": 2, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + # Страница 1: получаем элементы (в основном из subfeed_a из-за MergerAppend) + # dedup_a: a1, a2, a3, a4, common1, a6, a7, a8, a9, common2, ... + result1 = await manager.get_data( + user_id="cross_page_test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + page1_ids = {item["id"] for item in result1.data} + + # Проверяем что на странице 1 есть common элементы + common_on_page1 = {id for id in page1_ids if id.startswith("common")} + assert len(common_on_page1) > 0, "Page 1 should have some common elements" + + # Страница 2 + result2 = await manager.get_data( + user_id="cross_page_test", + limit=10, + next_page=result1.next_page, + ) + + page2_ids = {item["id"] for item in result2.data} + + # КЛЮЧЕВАЯ ПРОВЕРКА: common элементы со страницы 1 НЕ должны быть на странице 2 + duplicates_shown = page1_ids.intersection(page2_ids) + assert len(duplicates_shown) == 0, ( + f"Elements from page 1 should NOT appear on page 2. " f"Duplicates found: {duplicates_shown}" + ) + + # Дополнительно: проверяем что страница 2 не пустая + assert len(result2.data) > 0, "Page 2 should have elements" + + +@pytest.mark.asyncio +async def test_page_filled_despite_many_duplicates() -> None: + """ + Тест: страница гарантированно заполняется до limit, даже если много дублей. + + Сценарий: + - limit=10 + - subfeed_a и subfeed_b возвращают много общих элементов (common1, common2, ...) + - subfeed_c (dedup_c) возвращает: c1, common1, c3, common2, c5, common3, ... + (каждый второй элемент - common) + + Ожидаемый результат: + - Страница заполнена ровно до limit (10 элементов) + - Даже если первые запросы вернули много дублей, цикл продолжает + запрашивать пока не наберёт нужное количество + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_c", # Много common элементов + "priority": 1, + }, + }, + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_c", # Те же данные = много дублей! + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + result = await manager.get_data( + user_id="fill_test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # Страница должна быть заполнена до limit + assert len(result.data) == 10, f"Page should have exactly 10 items, got {len(result.data)}" + + # Все ID должны быть уникальны + ids = [item["id"] for item in result.data] + assert len(ids) == len(set(ids)), "All IDs should be unique" + + +@pytest.mark.asyncio +async def test_cursor_unused_subfeed_not_advanced() -> None: + """ + Тест: если субфид не использован (все элементы удалены как дубли), его курсор не двигается. + + Сценарий: + - Субфид A (priority=1): common1, common2, common3, ... + - Субфид B (priority=2): common1, common2, common3, ... (те же элементы!) + - Все элементы B удалены как дубли + + Ожидаемый результат: + - Курсор субфида B откатывается (page - 1) + - На следующей странице B снова пробует те же элементы + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "feed": { + "merger_id": "test_merger", + "type": "merger_percentage", + "items": [ + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_a", + "type": "subfeed", + "method_name": "dedup_c", # common1, c1, common2, c3, ... + "priority": 1, + }, + }, + { + "percentage": 50, + "data": { + "subfeed_id": "subfeed_b", + "type": "subfeed", + "method_name": "dedup_c", # Те же данные! + "priority": 2, + }, + }, + ], + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT) + + result = await manager.get_data( + user_id="test", + limit=10, + next_page=FeedResultNextPage(data={}), + ) + + # Все элементы из subfeed_b должны быть удалены как дубли (одинаковые данные, ниже приоритет) + # Проверяем что курсор subfeed_b не продвинулся (page <= 1 или откатился) + cursor_b = result.next_page.data.get("subfeed_b") + if cursor_b: + # Если элементы из B не использованы, page должен быть 1 или меньше + # (в зависимости от реализации) + assert cursor_b.page <= 2, "Cursor for unused subfeed should not advance much" + + +@pytest.mark.parametrize("redis_client", ["sync", "async"], indirect=True) +@pytest.mark.asyncio +async def test_dedup_positional_with_view_session(redis_client) -> None: + """ + Тест дедупликации с MergerPositional + MergerViewSession. + + Конфигурация: + - MergerPositional с placeholder_tours на позициях [1, 3, 5, 7] + - MergerViewSession с regular_tours как default (session_size=200) + - placeholder_tours: возвращает {"id": "placeholder_1", ...}, {"id": "placeholder_2", ...}, ... + - regular_tours: первые 10 элементов дублируются с placeholder (id: "placeholder_1" до "placeholder_10") + - priority: placeholder_tours = 1 (высший), MergerViewSession = 0 (по умолчанию) + + Ожидаемое поведение: + - MergerPositional сначала вставляет placeholder туры на позиции [1, 3, 5, 7] + - Затем заполняет остальные позиции из MergerViewSession + - Дедупликация удаляет дубли: так как placeholder_tours (priority=1) имеет более высокий приоритет, + элементы placeholder_1 до placeholder_10 из regular_tours будут удалены + - Но так как MergerViewSession уже закэшировал данные, он не может дозапросить новые элементы + - Поэтому в результате будут только placeholder элементы из positional и tour_11+ из regular_tours + - Страница будет заполнена до limit=15 + """ + config = { + "version": "1", + "deduplicate": True, + "dedup_key": "id", + "dedup_session_ttl": 300, + "feed": { + "merger_id": "serp_fast_main", + "type": "merger_positional", + "positions": [1, 3, 5, 7], + "positional": { + "subfeed_id": "placeholder_tours", + "type": "subfeed", + "method_name": "placeholder_tours", + "priority": 1, # Высший приоритет + "subfeed_params": {}, + }, + "default": { + "merger_id": "serp_fast_session", + "type": "merger_view_session", + "session_size": 200, + "session_live_time": 300, + "data": { + "subfeed_id": "regular_tours", + "type": "subfeed", + "method_name": "regular_tours", + "priority": 3, # Низший приоритет + "subfeed_params": {}, + }, + }, + }, + } + + manager = FeedManager(config=config, methods_dict=METHODS_DICT, redis_client=redis_client) + result = await manager.get_data( + user_id="test_user", + limit=15, + next_page=FeedResultNextPage(data={}), + ) + + # Проверяем что получили 15 элементов + assert len(result.data) == 15, f"Expected 15 items, got {len(result.data)}" + + # Отладочный вывод + print("\n=== Результат ===") + for i, item in enumerate(result.data): + print(f" [{i}] {item['id']} (type: {item['type']})") + + # Проверяем что дубли удалены + all_ids = [item["id"] for item in result.data] + + # Считаем сколько раз встречается каждый id + from collections import Counter + + id_counts = Counter(all_ids) + + # Проверяем что нет дублей + for item_id, count in id_counts.items(): + assert count == 1, f"Item {item_id} appears {count} times, expected 1" + + # Проверяем что результат содержит placeholder_1 до placeholder_10 + # (так как они имеют priority=1 и должны вытеснить дубли из regular_tours) + # и tour_11 до tour_15 (уникальные элементы из regular_tours) + expected_ids = [f"placeholder_{i}" for i in range(1, 11)] + [f"tour_{i}" for i in range(11, 16)] + assert all_ids == expected_ids, f"Expected {expected_ids}, got {all_ids}" + + # Проверяем что has_next_page = True (есть еще данные) + assert result.has_next_page is True + + print(f"✅ Test passed! Got {len(result.data)} items") + print(f" IDs: {all_ids[:10]}...") + print(f" Deduplication worked correctly!")