Skip to content

Latest commit

 

History

History
1596 lines (1312 loc) · 63 KB

File metadata and controls

1596 lines (1312 loc) · 63 KB

Reckit - AI Coding 指南

项目概述

Reckit 是一个工业级推荐系统工具库,采用 Pipeline + Node 架构,通过接口抽象实现高度可扩展性。

包名github.com/rushteam/reckit

工程目标

场景分工:

  • 深度模型、复杂梯度更新 → 适用工具:PyTorch/Tensorflow
  • 高并发、低延迟、统计计算 → 适用工具:Golang(Reckit)

本项目采用 PyTorch/Tensorflow + Golang 的分工模式:

  • PyTorch/Tensorflow:负责深度模型的训练、复杂梯度更新等机器学习任务
  • Golang (Reckit):负责高并发推荐服务、低延迟在线推理、统计计算等生产环境任务

核心设计原则

  • 所有策略、算法、配置都通过接口实现,不使用字符串匹配
  • 用户可以通过实现接口扩展功能,无需修改库代码
  • 使用 string 类型作为通用 ID(支持所有 ID 格式)
  • 所有默认值通过配置接口提供,无硬编码

核心架构

Request → Context → Recall → Filter → Rank → ReRank → PostProcess → Response

架构层次

  1. 应用层:业务逻辑和 Pipeline 配置
  2. 领域层:核心抽象(Node、Source、Model、Service)
  3. 基础设施层:存储、向量库、ML 服务集成

关键接口

核心接口(领域层接口在 core 包)

// Pipeline Node(所有处理单元的基础接口)
type Node interface {
    Name() string
    Kind() Kind  // recall / filter / rank / rerank / postprocess
    Process(ctx context.Context, rctx *core.RecommendContext, items []*core.Item) ([]*core.Item, error)
}

// 召回源接口
type Source interface {
    Name() string
    Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
}

// 排序模型接口(本地轻量模型:LR/GBDT 等)
// 远程推理请使用 core.MLService + RPCNode,或通过 model.MLServiceAdapter 桥接
type RankModel interface {
    Name() string
    Predict(features map[string]float64) (float64, error)
}

// MLServiceAdapter 将 MLService 适配为 RankModel(显式桥接)
func MLServiceAdapter(name string, svc core.MLService) RankModel

// 过滤器接口(逐条过滤)
type Filter interface {
    Name() string
    ShouldFilter(ctx context.Context, rctx *core.RecommendContext, item *core.Item) (bool, error)
}

// 批量过滤器接口(可选实现,适用于需要批量查询外部服务的场景)
// FilterNode 优先调用 BatchFilter.FilterBatch,降级到 Filter.ShouldFilter
type BatchFilter interface {
    Filter
    FilterBatch(ctx context.Context, rctx *core.RecommendContext, items []*core.Item) ([]*core.Item, error)
}

// Context 扩展接口(可插拔扩展点)
type Extension interface {
 ExtensionName() string // 全局唯一标识,如 "aippy.abtest"
}
// 泛型辅助:类型安全地获取 Extension
func ExtensionAs[T Extension](rctx *RecommendContext, name string) (T, bool)

// 存储接口(领域层接口,在 core 包)
type Store interface {
    Get(ctx context.Context, key string) ([]byte, error)
    Set(ctx context.Context, key string, value []byte, ttl int64) error
    // ...
}

// 有序集合带分数查询(可选扩展,在 core 包)
// 实现此接口后 SortedSetRecall 可获取分数并支持正/倒序。
type SortedSetRangeStore interface {
    ZRangeWithScores(ctx context.Context, key string, start, stop int64) ([]ScoredMember, error)
    ZRevRangeWithScores(ctx context.Context, key string, start, stop int64) ([]ScoredMember, error)
}
type ScoredMember struct { Member string; Score float64 }

// 特征服务接口(领域层接口,在 core 包)
type FeatureService interface {
    GetUserFeatures(ctx context.Context, userID string) (map[string]float64, error)
    BatchGetUserFeatures(ctx context.Context, userIDs []string) (map[string]map[string]float64, error)
    GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)
    BatchGetItemFeatures(ctx context.Context, itemIDs []string) (map[string]map[string]float64, error)
    GetRealtimeFeatures(ctx context.Context, userID, itemID string) (map[string]float64, error)
    BatchGetRealtimeFeatures(ctx context.Context, pairs []FeatureUserItemPair) (map[FeatureUserItemPair]map[string]float64, error)
 Close(ctx context.Context) error
}

// 向量服务接口(领域层接口,在 core 包)
type VectorService interface {
    Search(ctx context.Context, req *VectorSearchRequest) (*VectorSearchResult, error)
 Close(ctx context.Context) error
}

// 向量数据库服务接口(领域层接口,在 core 包)
type VectorDatabaseService interface {
    VectorService  // 嵌入召回场景接口
    Insert(ctx context.Context, req *VectorInsertRequest) error
    Update(ctx context.Context, req *VectorUpdateRequest) error
    Delete(ctx context.Context, req *VectorDeleteRequest) error
    CreateCollection(ctx context.Context, req *VectorCreateCollectionRequest) error
    DropCollection(ctx context.Context, collection string) error
    HasCollection(ctx context.Context, collection string) (bool, error)
}

// ML 服务接口(领域层接口,在 core 包)
type MLService interface {
    Predict(ctx context.Context, req *MLPredictRequest) (*MLPredictResponse, error)
    Health(ctx context.Context) error
 Close(ctx context.Context) error
}

策略接口(可扩展)

// 合并策略(recall/merge_strategy.go)
type MergeStrategy interface {
    Merge(items []*core.Item, dedup bool) []*core.Item
}
// 去重策略:FirstMergeStrategy, UnionMergeStrategy, PriorityMergeStrategy
// 混排策略:WeightedScoreMergeStrategy, QuotaMergeStrategy, RatioMergeStrategy, RoundRobinMergeStrategy, WaterfallMergeStrategy
// 组合策略:ChainMergeStrategy(串联多个策略,前一个输出作为下一个输入)

// 召回层错误处理策略(recall/error_handler.go)
type ErrorHandler interface {
    HandleError(source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)
}
// 内置实现:
//   IgnoreErrorHandler    - 忽略错误(支持 OnError 回调)
//   RetryErrorHandler     - 重试 N 次(支持 OnRetry / OnGiveUp 回调)
//   FallbackErrorHandler  - 降级到备用召回源(支持 OnFallback 回调)

// 排序策略(rank/lr_node.go)
type SortStrategy interface {
    Sort(items []*core.Item)
}
// 内置实现:ScoreDescSortStrategy, ScoreAscSortStrategy, MultiFieldSortStrategy

// 相似度计算器(recall/collaborative_filtering.go)
type SimilarityCalculator interface {
    Calculate(x, y []float64) float64
}
// 内置实现:CosineSimilarity, PearsonCorrelation

// Label 合并策略(pkg/utils/label.go)
type LabelMergeStrategy interface {
    Merge(existing, incoming Label) Label
}
// 内置实现:DefaultLabelMergeStrategy, PriorityLabelMergeStrategy

// Pipeline Hook(pipeline/pipeline.go)
type PipelineHook interface {
    BeforeNode(ctx context.Context, rctx *core.RecommendContext, node Node, items []*core.Item) ([]*core.Item, error)
    AfterNode(ctx context.Context, rctx *core.RecommendContext, node Node, items []*core.Item, err error) ([]*core.Item, error)
}

// Pipeline 全局错误钩子(pipeline/error_hook.go)
// 当 Node 出错时,Pipeline 依次调用所有 ErrorHook:
//   - 任一返回 recovered=true → 跳过该 Node 继续执行(降级)
//   - 全部返回 false → Pipeline 终止
// 所有 ErrorHook 都会被调用(便于 metrics/alerting 采集完整数据)。
type ErrorHook interface {
    OnNodeError(ctx context.Context, rctx *core.RecommendContext, node Node, err error) (recovered bool)
}
// 内置实现:
//   WarnAndSkipHook       - 日志 + 跳过(始终降级)
//   KindRecoveryHook      - 按 Node Kind 选择性降级 + OnError 回调
//   ErrorCallbackHook     - 仅上报 metrics/alerting,不降级
//   CompositeErrorHook    - 组合多个 ErrorHook

// 条件接口(filter/conditional.go)
type Condition interface {
    Evaluate(ctx context.Context, rctx *core.RecommendContext) (bool, error)
}
// 便捷函数适配:ConditionFunc

// 多样性约束(rerank/diversity.go,高级模式)
type DiversityConstraint struct {
    Dimensions          []string  // 组合维度
    MaxConsecutive      int       // 最大连续同组数
    WindowSize          int       // 滑动窗口大小
    MaxPerWindow        int       // 窗口内同组最大出现次数
    Weight              float64   // 权重回退选择用
    MultiValueDelimiter string    // 多值拆分符
}
// Diversity.Constraints 非空时启用高级模式

// 分数标准化模式(rerank/vecmath.go)
// ScoreNormNone / ScoreNormZScore / ScoreNormMinMax

// Bandit 统计接口(rerank/bandit.go)
type BanditStatsProvider interface {
    BatchGetStats(ctx context.Context, rctx *core.RecommendContext, itemIDs []string) (map[string]BanditStats, error)
}
type BanditStats struct { Impressions, Conversions int64 }
// 供 UCBNode / ThompsonSamplingNode / ColdStartBoostNode 使用

// 曝光频次存储接口(filter/frequency_cap.go)
type FrequencyCapStore interface {
    GetImpressionCount(ctx context.Context, userID, itemID string, window time.Duration) (int, error)
}

// 后处理补位函数(postprocess/padding.go)
type PaddingFunc func(ctx context.Context, rctx *core.RecommendContext, need int) ([]*core.Item, error)

// 特征元数据加载器接口(feature/metadata_loader.go)
type MetadataLoader interface {
    Load(ctx context.Context, source string) (*FeatureMetadata, error)
}
// 内置实现:FileMetadataLoader, HTTPMetadataLoader, S3MetadataLoader

// 特征标准化器加载器接口(feature/metadata_loader.go)
type ScalerLoader interface {
    Load(ctx context.Context, source string) (FeatureScaler, error)
}
// 内置实现:FileScalerLoader, HTTPScalerLoader, S3ScalerLoader

// S3 兼容协议客户端接口(feature/oss_loader.go)
type S3Client interface {
    GetObject(ctx context.Context, bucket, key string) (io.ReadCloser, error)
}
// 支持 AWS S3、阿里云 OSS、腾讯云 COS、MinIO 等

配置接口

// 通用召回配置(core/config.go)
type RecallConfig interface {
    DefaultTopKItems() int
    DefaultTimeout() time.Duration
}

// 协同过滤专用配置(core/config.go)
type CFConfig interface {
    RecallConfig
    DefaultTopKSimilarUsers() int
    DefaultMinCommonItems() int
    DefaultMinCommonUsers() int
}
// 默认实现:DefaultRecallConfig(同时满足 RecallConfig 和 CFConfig)

迁移说明(RecallConfig → CFConfig)UserBasedCF.ConfigItemBasedCF.Config 的字段类型从 core.RecallConfig 改为 core.CFConfig。 如果你的代码将配置存储在 core.RecallConfig 类型的变量中再传给 CF struct,需要改为 core.CFConfig

// Before(编译报错)
var cfg core.RecallConfig = &MyConfig{}
u2i := &recall.UserBasedCF{Config: cfg}

// After(修复方式)
var cfg core.CFConfig = &MyConfig{}
u2i := &recall.UserBasedCF{Config: cfg}

如果你使用 &core.DefaultRecallConfig{} 字面量或自定义 struct 直接赋值,无需任何改动—— 只要你的 struct 实现了原来的 5 个方法,它自动满足 CFConfig

迁移说明(User 泛化 + Attributes)

  1. RecommendContext.User*UserProfile 改为 any(透传容器,框架不直接读取)
  2. Attributes map[string]any 是框架读取用户数据的标准通道(特征、行为序列等)
  3. GetUserProfile() 已移除;UserProfileFromMap() 已移除
  4. UserProfile struct 保留为可选便利实现,框架不依赖
// Before
rctx.User.Age                              // 直接访问强类型字段
rctx.User.RecentClicks                     // 框架内置 Node 自动读取

// After
rctx.Attributes["age"]                     // 通过 Attributes 传递
rctx.Attributes["recent_clicks"]           // 框架内置 Node 从 Attributes 读取
up := rctx.User.(*MyUserStruct)            // 自定义 Node 中 type assert

行为序列(RecentClicks):框架内置 Node(DIN、BERT、Word2Vec 等) 从 Attributes["recent_clicks"] 读取,或通过 HistoryFunc/BehaviorFunc 配置。

核心数据结构

RecommendContext

type RecommendContext struct {
    UserID   string
    DeviceID string
    Scene    string

    User       any                    // 调用方透传的用户对象,框架不直接读取
    Attributes map[string]any         // 用户级属性(框架读取用户数据的标准通道)
    Labels     map[string]utils.Label // 用户级标签
    Params     map[string]any         // 请求级上下文参数

    ext map[string]Extension          // 可插拔扩展(懒初始化,private)
}

// Extension 操作方法
rctx.SetExtension(e Extension)                       // 注册扩展(nil-safe)
rctx.GetExtension(name string) (Extension, bool)     // 按名称获取
core.ExtensionAs[T](rctx, name) (T, bool)            // 泛型类型安全获取

Item

type Item struct {
    ID       string
    Score    float64
    Features map[string]float64
    Meta     map[string]any
    Labels   map[string]utils.Label

    LabelMergeStrategy utils.LabelMergeStrategy  // 可选,自定义 Label 合并策略
}

// GetValue 统一查找(Labels > Meta > Features),各模块无需重复实现多字段查找。
func (it *Item) GetValue(key string) (string, bool)

Item 三字段分工

字段 类型 语义 消费者 典型内容
Features map[string]float64 模型特征(数值型) Rank 模型 (model.Predict) ctr, cvr, price, user_age
Meta map[string]any 业务元数据(任意类型) 业务逻辑、展示层、过滤 title, category, author, cover_url
Labels map[string]Label 策略标签(带来源追踪) 策略引擎、Diversity、DSL、可解释性 recall_source, recall_priority, boost_reason
  • Features:由 feature.EnrichNode 注入,供排序模型直接消费,不带来源信息
  • Meta:业务侧自行填充,存储展示、过滤所需的结构化信息,值类型不限
  • Labels:推荐链路各阶段(recall / rank / rerank / rule)写入,每个 Label 记录 Value + Source,同名 key 按合并策略(默认 | 拼接)自动合并

字段查找优先级Diversity 等节点通过 getValue(item, key) 读取字段时,按 Labels > Meta > Features 的优先级查找。离散类别(如 category、author)建议放 Labels 或 Meta;仅当特征编码为数值放在 Features 时,才会 fallback 到 Features(float64 自动转为字符串)。

UserProfile(可选参考实现)

core.UserProfile 是框架提供的便利 struct,不是强制要求。 框架内置 Node 不依赖此类型,业务方可使用自己的 User struct。

type UserProfile struct {
    UserID        string
    Gender        string
    Age           int
    Location      string
    Interests     map[string]float64
    RecentClicks  []string
    RecentImpress []string
    PreferTags    map[string]float64
    Buckets       map[string]string
    Extras        map[string]any
    UpdateTime    time.Time
}

目录结构

github.com/rushteam/reckit/
├── core/              # 核心数据结构(Item, Context, UserProfile, Config)
├── pipeline/          # Pipeline 和 Node 接口
├── recall/            # 召回模块(Source, Fanout, CF, ANN, Content 等)
├── filter/            # 过滤模块(Blacklist, UserBlock, Exposed, Expr, QualityGate, DedupField, TimeDecay, FrequencyCap, ConditionalNode 等)
├── rank/              # 排序模块(LR, DNN, DIN, RPC 等)
├── rerank/            # 重排模块(Diversity、DPP、SSD、Sample、FairInterleave、WeightedInterleave、GroupQuota、TrafficPlan、ScoreAdjust、RecallChannelMix、EpsilonGreedy、UCB、ThompsonSampling、ColdStartBoost 等)
├── postprocess/       # 后处理模块(Padding、TruncateFields)
├── model/             # 排序模型抽象和实现
├── feature/           # 特征服务(Enrich, Service, Provider)
├── store/             # 存储抽象(Memory,Redis 移至扩展包)
├── vector/             # 向量服务接口(Milvus 移至扩展包)
├── service/           # ML 服务(TF Serving, ANN Service)
├── feature/           # 特征服务实现(接口在 core.FeatureService)
├── config/            # Pipeline 配置工厂
├── ext/                # 扩展包目录(独立 go.mod)
│   ├── store/
│   │   └── redis/     # Redis 存储实现
│   ├── feast/
│   │   ├── http/      # Feast HTTP 客户端实现(适配为 core.FeatureService)
│   │   └── grpc/      # Feast gRPC 客户端实现(适配为 core.FeatureService)
│   └── vector/
│       └── milvus/    # Milvus 向量数据库实现
└── pkg/
    ├── utils/         # Label 工具
    ├── dsl/           # Label DSL 表达式引擎
    └── conv/          # 类型转换与泛型工具(ToFloat64、ConfigGet、MapToFloat64 等)

扩展包说明

核心包 github.com/rushteam/reckit 无外部依赖,只保留工具库(CEL、YAML、sync)。

具体实现已移至 ext/ 扩展包,每个扩展包有独立的 go.mod,用户按需引入:

  • Redis Store: go get github.com/rushteam/reckit/ext/store/redis
  • Feast gRPC: go get github.com/rushteam/reckit/ext/feast/grpc
  • Milvus Vector: go get github.com/rushteam/reckit/ext/vector/milvus

详见 ext/README.md

关键文件位置

核心文件

  • core/item.go - Item 定义和 Label 操作
  • core/context.go - RecommendContext 定义
  • core/user_profile.go - UserProfile 定义
  • core/config.go - 配置接口定义
  • core/extension.go - Extension 接口和 ExtensionAs 泛型函数
  • pipeline/node.go - Node 接口定义
  • pipeline/pipeline.go - Pipeline 执行器和 Hook
  • pipeline/error_hook.go - ErrorHook 接口和内置实现(WarnAndSkip / KindRecovery / ErrorCallback)
  • pipeline/config.go - 配置加载和工厂

召回模块

  • recall/source.go - Source 接口
  • recall/fanout.go - 多路并发召回(Fanout 同时实现 Source 接口支持嵌套)
  • recall/merge_strategy.go - MergeStrategy 接口和所有实现(First/Union/Priority/WeightedScore/Quota/Ratio/RoundRobin/Waterfall/Chain)
  • recall/error_handler.go - ErrorHandler 接口和实现(Ignore/Retry/Fallback,均支持回调)
  • recall/collaborative_filtering.go - U2I/I2I 协同过滤
  • recall/ann.go - Embedding ANN 召回
  • recall/content.go - 内容推荐
  • recall/matrix_factorization.go - 矩阵分解召回
  • recall/word2vec_recall.go - Word2Vec / Item2Vec 召回(文本模式 + 序列模式)
  • recall/bert_recall.go - BERT 召回(基于语义相似度)
  • recall/sorted_set.go - 通用有序集合召回(SortedSetRecall),含 NewHotRecall/NewTrendingRecall/NewLatestRecall/NewTopRatedRecall/NewEditorPickRecall 等便捷构造器
  • recall/user_history.go - 用户历史召回(UserHistoryStore 返回带 Score 的 ScoredHistoryItem,支持多行为合并时按时间排序)

过滤模块

  • filter/filter.go - Filter / BatchFilter 接口
  • filter/node.go - FilterNode(组合多个 Filter;BatchFilter 优先整批过滤,再逐条过滤)
  • filter/blacklist.go - 黑名单过滤
  • filter/user_block.go - 用户级屏蔽过滤
  • filter/exposed.go - 已曝光过滤
  • filter/expr_filter.go - ExprFilter(CEL/DSL 表达式过滤;Invert 翻转语义)
  • filter/quality_gate.go - QualityGateFilter(分数门槛:Score < MinScore 过滤)
  • filter/dedup_field.go - DedupByFieldFilterBatchFilter 实现,按字段去重保留首条)
  • filter/time_decay.go - TimeDecayFilter(按 Meta[TimeField] 时间戳过滤过期内容)
  • filter/frequency_cap.go - FrequencyCapFilterFrequencyCapStore 接口,user-item 粒度曝光频次控制)
  • filter/conditional.go - ConditionalNode(条件节点:Condition 为 true 时执行内部 Node,否则透传;适用于 AB 实验、场景开关)

排序模块

  • rank/lr_node.go - LR 排序节点(含排序策略)
  • rank/rpc_node.go - RPC 排序节点
  • rank/dnn_node.go - DNN 排序节点
  • rank/din_node.go - DIN 排序节点(BehaviorFunc 配置行为序列)
  • rank/wide_deep_node.go - Wide&Deep 排序节点
  • rank/two_tower_node.go - Two Tower 排序节点
  • model/model.go - RankModel 接口 + MLServiceAdapter(MLService → RankModel 桥接)
  • model/lr.go - LR 模型实现
  • model/rpc.go - RPC 模型实现
  • model/word2vec.go - Word2Vec 模型实现
  • model/bert.go - BERT 模型实现

重排模块

  • rerank/diversity.go - 多样性重排,三种模式:① 类别去重 ② 多 key 打散 ③ 高级 Constraints(多规则独立约束 + 权重回退 + 多值维度 + 通道隔离 + 探索限制)
  • rerank/dpp_diversity.go - DPPDiversityNode(基于 Determinantal Point Process 的 embedding 级多样性重排;Greedy MAP + 窗口化 DPP)
  • rerank/ssd_diversity.go - SSDDiversityNode(滑动子空间多样性,Gram-Schmidt 正交化;比 DPP 更轻量,适合在线低延迟)
  • rerank/vecmath.go - DPP / SSD 共享的向量数学工具(dot、norm、scale 等,无外部依赖)
  • rerank/topn.go - TopN 截断
  • rerank/sample.go - SampleNode(采样降量;Shuffle=true Fisher-Yates 洗牌后取前 N,适用于增加探索性)
  • rerank/fair_interleave.go - FairInterleaveNode(按召回通道等权轮询交叉,组内按 Score 降序)
  • rerank/weighted_interleave.go - WeightedInterleaveNode(按通道权重交叉混排;residual 贪心算法分配曝光比例)
  • rerank/group_quota.go - GroupQuotaNode(按维度字段或 CEL 表达式分组,softmax/avg 配额分配 + GroupMin/GroupMax/GroupCaps 约束)
  • rerank/traffic_plan.go - TrafficPlanNode + TrafficPlanner(调控 id/位次写入 LabelKeyTrafficControlID / LabelKeyTrafficSlot,可选重排)
  • rerank/score_adjust.go - ScoreAdjustFilter / CEL 规则改分)与 ScoreWeightBoost + ScoreWeightProvider(按 ID 外部权重)
  • rerank/recall_channel_mix.go - RecallChannelMix(精排后按召回通道固定/随机槽位混排)
  • rerank/epsilon_greedy.go - EpsilonGreedyNode(ε-贪心探索:以概率 Epsilon 将 explore 池中的物品提到 exploit 区)
  • rerank/bandit.go - UCBNode(UCB1 公式重排,需 BanditStatsProvider)+ ThompsonSamplingNode(Beta-Bernoulli 汤普森采样,PureExplore 可选纯探索模式)+ BanditStatsProvider / BanditStats 接口
  • rerank/cold_start_boost.go - ColdStartBoostNode(新物品提权:曝光 < Threshold 时按线性衰减加成)

后处理模块

  • postprocess/padding.go - PaddingNode(结果不足 N 条时用 FallbackItems / PaddingFunc 补足,去重并打 __padding__ 标签)
  • postprocess/truncate_fields.go - TruncateFieldsNode(响应前裁剪 Features / Meta / Labels,可选 KeepMetaKeys 仅保留指定 Meta key)

recall.MergeStrategy 的分工MergeStrategy 作用于召回阶段,负责多路结果的合并、去重、加权与配额RecallChannelMix 作用于精排之后,依赖物品上的 recall_source(默认取合并标签的首段,见 PrimaryRecallChannel),按规则做槽位占位与剩余策略,用于运营位次/通道曝光。二者互补,不要混用职责。

YAML 构建器config/builders,需 _ "github.com/rushteam/reckit/config/builders"):

  • Recallrecall.fanoutrecall.hot / recall.sorted_set(通用有序集合)、recall.annDependencies.VectorService)、recall.u2iDependencies.RecallDataStore)、recall.i2i(同上)、recall.content(同上)、recall.mf(同上)、recall.user_historyDependencies.UserHistoryStore)、recall.word2vecDependencies.Word2VecModel)、recall.bertDependencies.BERTModel + BERTStore)、recall.two_towerDependencies.MLService + VectorService)、recall.youtube_dnnendpoint + Dependencies.VectorService)、recall.dssmendpoint + Dependencies.VectorService)、recall.rpcendpoint)、recall.graphendpoint
  • Filterfilter(含 blacklist / user_block / exposed / expr / quality_gate / dedup_field / time_decay / frequency_cap)、filter.conditional
  • Rankrank.lrrank.rpcrank.wide_deeprank.two_towerrank.dnnrank.din
  • ReRankrerank.diversity(含 constraints 高级模式)、rerank.dpp_diversityrerank.ssd_diversityrerank.topnrerank.samplererank.fair_interleavererank.weighted_interleavererank.group_quotarerank.traffic_planrerank.score_adjustrerank.score_weightrerank.recall_channel_mixrerank.mmoe
  • Explore/Exploitrerank.epsilon_greedyrerank.ucbDependencies.BanditStatsProvider)、rerank.thompson_samplingDependencies.BanditStatsProvider)、rerank.cold_start_boost
  • PostProcesspostprocess.paddingDependencies.PaddingFunc)、postprocess.truncate_fields
  • Featurefeature.enrich

特征模块

  • core/feature_service.go - FeatureService 领域接口
  • feature/service.go - FeatureService 实现(BaseFeatureService)
  • feature/enrich.go - 特征注入节点
  • feature/store_provider.go - 存储特征提供者
  • feature/metadata.go - 特征元数据和标准化器定义
  • feature/metadata_loader.go - 特征元数据加载器接口和文件实现
  • feature/http_loader.go - HTTP 接口加载器实现
  • feature/oss_loader.go - S3 兼容协议加载器实现(支持 AWS S3、阿里云 OSS、腾讯云 COS、MinIO 等)
  • feature/processing.go - 特征处理工具类(归一化、分箱、交叉特征等)
  • feature/encoder.go - 特征编码工具类(One-Hot、Label、Hash、Embedding 等)

工具模块

  • pkg/utils/label.go - Label 定义和合并策略
  • pkg/dsl/eval.go - Label DSL 表达式引擎
  • pkg/conv/conv.go - 类型转换与泛型工具(ToFloat64、ToInt、ToString、ConfigGet、MapToFloat64、SliceAnyToString 等)

使用模式

1. 创建 Pipeline

p := &pipeline.Pipeline{
    Nodes: []pipeline.Node{
        // 召回
        &recall.Fanout{
            Sources: []recall.Source{
                recall.NewHotRecall(nil, "", 0), // 或 &recall.SortedSetRecall{IDs: [...]}
                &recall.U2IRecall{...},
            },
            Dedup:         true,
            MergeStrategy: &recall.PriorityMergeStrategy{},
        },
        // 过滤
        &filter.FilterNode{
            Filters: []filter.Filter{
                filter.NewBlacklistFilter(...),
            },
        },
        // 特征注入
        &feature.EnrichNode{
            FeatureService: featureService, // core.FeatureService 接口
        },
        // 排序
        &rank.LRNode{
            Model:        lrModel,
            SortStrategy: &rank.ScoreDescSortStrategy{},
        },
    },
    Hooks: []pipeline.PipelineHook{
        &LoggingHook{},
    },
}

items, err := p.Run(ctx, rctx, nil)

2. 创建召回源

config := &core.DefaultRecallConfig{}
u2i := &recall.U2IRecall{
    Store:                cfStore,
    TopKSimilarUsers:     10,
    TopKItems:            20,
    SimilarityCalculator: &recall.CosineSimilarity{},
    Config:                config,
}

2.1 通用有序集合召回(SortedSetRecall)

SortedSetRecall 是 Hot / Trending / Latest / TopRated 等业务召回的底层抽象: 从 Store 有序集合按指定方向拉取 TopK 个 item ID 及分数。

import "github.com/rushteam/reckit/recall"

// 方式 1:便捷构造器(推荐)—— 命名清晰、语义明确
hotRecall      := recall.NewHotRecall(store, "hot:feed", 100)       // 热门(降序)
trendingRecall := recall.NewTrendingRecall(store, "trending:feed", 50) // 趋势(降序)
latestRecall   := recall.NewLatestRecall(store, "latest:feed", 50)  // 最新(降序)
topRatedRecall := recall.NewTopRatedRecall(store, "rating:feed", 50) // 高分(降序)
editorRecall   := recall.NewEditorPickRecall(store, "editor:pick", 20) // 编辑推荐

// 方式 2:使用 KeyPrefix,key 按 scene 自动拼接
// 当 rctx.Scene = "video" 时,实际 key = "hot:video"
hotByScene := &recall.SortedSetRecall{
    Store: store, KeyPrefix: "hot", TopK: 100,
    Order: recall.OrderDesc, NodeName: "recall.hot",
}

// 方式 3:升序场景(如价格最低)
cheapest := &recall.SortedSetRecall{
    Store: store, Key: "price:items", TopK: 50,
    Order: recall.OrderAsc, NodeName: "recall.cheapest",
}

// 方式 4:无 Store,使用静态 ID 列表(开发/测试)
static := &recall.SortedSetRecall{
    IDs: []string{"item1", "item2", "item3"},
    NodeName: "recall.static",
}

// 在 Fanout 中使用
fanout := &recall.Fanout{
    Sources: []recall.Source{hotRecall, trendingRecall, latestRecall},
    Dedup: true,
    MergeStrategy: &recall.RatioMergeStrategy{
        SourceRatios: map[string]float64{
            "recall.hot": 0.5, "recall.trending": 0.3, "recall.latest": 0.2,
        },
        TotalLimit: 100,
    },
}

数据读取优先级SortedSetRangeStore(带分数+双向)→ KeyValueStore.ZRange(降序、无分数)→ Store.Get(JSON 数组)→ IDs(静态列表)。 当 Store 实现 core.SortedSetRangeStore 时,item.Score 自动填充有序集合分数。

YAML 配置

- type: "recall.sorted_set"
  config:
    key_prefix: "hot"
    top_k: 100
    order: "desc"
    name: "recall.hot"

3. 自定义特征抽取器

import "github.com/rushteam/reckit/feature"

// 方式 1:使用默认抽取器(带前缀)
extractor := feature.NewDefaultFeatureExtractor(
    feature.WithFeatureService(featureService),
    feature.WithFieldPrefix("user_"),
)

// 方式 2:完全自定义
customExtractor := feature.NewCustomFeatureExtractor(
    "my_extractor",
    func(ctx context.Context, rctx *core.RecommendContext) (map[string]float64, error) {
        features := make(map[string]float64)
        // 自定义逻辑:从多个源组合特征
        if rctx.User != nil {
            features["age"] = float64(rctx.User.Age)
        }
        // 从外部服务获取
        externalFeatures, _ := externalService.GetFeatures(ctx, rctx.UserID)
        for k, v := range externalFeatures {
            features["external_"+k] = v
        }
        return features, nil
    },
)

// 在召回源中使用
twoTowerRecall := recall.NewTwoTowerRecall(
    featureService,
    userTowerService,
    vectorService,
    recall.WithTwoTowerUserFeatureExtractor(extractor), // 或 customExtractor
)

详细文档:见 feature/EXTRACTOR_GUIDE.md

4. 使用内置合并策略

// 加权分数:按源权重调整分数后排序
fanout := &recall.Fanout{
    Sources: sources, Dedup: true,
    MergeStrategy: &recall.WeightedScoreMergeStrategy{
        SourceWeights: map[string]float64{"recall.hot": 2.0, "recall.cf": 1.0, "recall.ann": 1.5},
        TopN: 100,
    },
}

// 固定配额:每源取固定数量
fanout.MergeStrategy = &recall.QuotaMergeStrategy{
    SourceQuotas: map[string]int{"recall.hot": 10, "recall.cf": 20, "recall.ann": 15},
}

// 比例配比:按比例分配总量
fanout.MergeStrategy = &recall.RatioMergeStrategy{
    SourceRatios: map[string]float64{"recall.hot": 0.2, "recall.cf": 0.3, "recall.ann": 0.5},
    TotalLimit:   100,
}

// 轮询交叉:各源轮流取(适合信息流多样性)
fanout.MergeStrategy = &recall.RoundRobinMergeStrategy{
    SourceOrder: []string{"recall.cf", "recall.ann", "recall.hot"},
    TopN: 50,
}

// 瀑布级联:高优先级源优先填满,不足时低优先级源兜底
fanout.MergeStrategy = &recall.WaterfallMergeStrategy{
    SourcePriority: []string{"recall.cf", "recall.ann", "recall.hot"},
    TotalLimit:     100,
    SourceLimits:   map[string]int{"recall.hot": 20}, // 热门最多 20 条
}

// 组合策略:先加权调分,再按配额截取
fanout.MergeStrategy = &recall.ChainMergeStrategy{
    Strategies: []recall.MergeStrategy{
        &recall.WeightedScoreMergeStrategy{
            SourceWeights: map[string]float64{"recall.hot": 2.0, "recall.cf": 1.0},
        },
        &recall.QuotaMergeStrategy{
            SourceQuotas: map[string]int{"recall.hot": 10, "recall.cf": 30},
        },
    },
}

5. 嵌套 Fanout(树形召回拓扑)

Fanout 同时实现 Node 和 Source 接口,可嵌套在另一个 Fanout 中作为子召回源。 通过 NodeName 区分不同层级(影响 recall_source label)。

topFanout := &recall.Fanout{
    NodeName: "recall.top",
    Sources: []recall.Source{
        &recall.Fanout{
            NodeName: "recall.personalized",
            Sources:  []recall.Source{cfRecall, annRecall},
            Dedup:    true,
            MergeStrategy: &recall.RatioMergeStrategy{
                SourceRatios: map[string]float64{"recall.cf": 0.4, "recall.ann": 0.6},
                TotalLimit:   60,
            },
        },
        &recall.Fanout{
            NodeName: "recall.non_personalized",
            Sources:  []recall.Source{hotRecall, trendingRecall},
            Dedup:    true,
            MergeStrategy: &recall.FirstMergeStrategy{},
        },
    },
    Dedup: true,
    MergeStrategy: &recall.QuotaMergeStrategy{
        SourceQuotas: map[string]int{
            "recall.personalized":     60,
            "recall.non_personalized": 30,
        },
    },
}

6. 自定义策略

// 自定义合并策略
type CustomMergeStrategy struct{}
func (s *CustomMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item {
    // 自定义逻辑
}

// 使用
fanout := &recall.Fanout{
    MergeStrategy: &CustomMergeStrategy{},
}

7. 配置驱动 Pipeline + 自定义 Node 注册

reckit 提供 Node 注册表 + 工厂函数 模式,支持:

  • 用户注册自定义 node type,YAML 中直接引用
  • YAML config 字段原样透传给工厂函数(map[string]interface{}
  • 通过闭包捕获外部依赖(Redis client、Repository 等),实现依赖注入

核心 API

API 说明
config.Register(typeName, builder) 全局注册 Node 构建器(推荐在 init 或启动阶段调用)
config.DefaultFactory() 返回包含所有已注册 Node 的 NodeFactory
config.SupportedTypes() 查看已注册类型列表
config.ValidatePipelineConfig(cfg) 校验 YAML 中引用的 type 是否全部已注册

注册自定义 Node(含依赖注入)

import "github.com/rushteam/reckit/config"

// 1. 初始化外部依赖
redisClient := redis.NewClient(...)
itemRepo := repository.NewItemRepo(db)

// 2. 注册自定义 Node —— 闭包捕获依赖,config 字段由 YAML 透传
config.Register("custom.boost", func(cfg map[string]interface{}) (pipeline.Node, error) {
    return &BoostNode{
        Repo:      itemRepo,                                 // 闭包注入
        Redis:     redisClient,                              // 闭包注入
        BoostRate: conv.ConfigGet(cfg, "boost_rate", 1.5),   // YAML 透传
        MaxItems:  int(conv.ConfigGetInt64(cfg, "max", 100)),
    }, nil
})

YAML 配置引用自定义 Node

pipeline:
  name: "my_feed"
  nodes:
    - type: "recall.fanout"
      config:
        dedup: true
        sources:
          - type: "hot"
            ids: [1, 2, 3]
    - type: "rank.lr"
      config:
        bias: 0.0
        weights: { ctr: 1.2, cvr: 0.8 }
    # 自定义节点:type 对应 config.Register 的 typeName
    # config 下所有字段原样透传给工厂函数
    - type: "custom.boost"
      config:
        boost_rate: 2.0
        max: 50

完整启动流程

import (
    "github.com/rushteam/reckit/config"
    _ "github.com/rushteam/reckit/config/builders" // 注册内置 Node
)

func main() {
    // 1. 初始化依赖
    repo := newMyRepo(db)

    // 2. 注册自定义 Node(闭包捕获依赖)
    config.Register("custom.boost", func(cfg map[string]interface{}) (pipeline.Node, error) {
        return &BoostNode{Repo: repo, BoostRate: conv.ConfigGet(cfg, "boost_rate", 1.5)}, nil
    })

    // 3. 加载 YAML
    cfg, _ := pipeline.LoadFromYAML("pipeline.yaml")

    // 4. 校验配置(可选,提前发现未注册的 type)
    if err := config.ValidatePipelineConfig(cfg); err != nil {
        log.Fatal(err)
    }

    // 5. 构建并运行
    factory := config.DefaultFactory()
    p, _ := cfg.BuildPipeline(factory)
    items, _ := p.Run(ctx, rctx, nil)
}

注意:内置 Node(recall.fanoutrank.lr 等)需要 import _ "github.com/rushteam/reckit/config/builders" 触发 init 注册。

8. 错误处理与降级

reckit 提供两层错误处理机制:

接口 位置 作用域
Pipeline 层 ErrorHook pipeline/error_hook.go 所有 Node
Recall 层 ErrorHandler recall/error_handler.go 单个召回源

Pipeline 全局错误钩子

p := &pipeline.Pipeline{
    Nodes: []pipeline.Node{recall, filter, rank, rerank},
    ErrorHooks: []pipeline.ErrorHook{
        // 1. 上报 metrics(不降级,仅采集)
        &pipeline.ErrorCallbackHook{
            Callback: func(ctx context.Context, node pipeline.Node, err error) {
                metrics.Counter("pipeline.node.error", 1, "node", node.Name(), "kind", string(node.Kind()))
            },
        },
        // 2. 非关键阶段自动降级
        &pipeline.KindRecoveryHook{
            RecoverKinds: map[pipeline.Kind]bool{
                pipeline.KindReRank:      true, // 重排失败 → 跳过
                pipeline.KindPostProcess: true, // 后处理失败 → 跳过
            },
        },
    },
}

当 Node 出错时执行流:

  1. Pipeline 依次调用所有 ErrorHook(不论前一个返回什么)
  2. 若任一 ErrorHook 返回 recovered=true → 跳过该 Node,用上一步 items 继续
  3. 若全部返回 false → Pipeline 终止并返回错误
  4. 未配置 ErrorHooks 时行为与之前完全一致(fail-fast)

内置 ErrorHook:

实现 行为
WarnAndSkipHook 日志输出 + 始终跳过(适合开发/测试)
KindRecoveryHook 按 Node Kind 选择性降级 + OnError 回调
ErrorCallbackHook 仅调用回调上报,不降级
CompositeErrorHook 组合多个 ErrorHook

召回源错误处理

fanout := &recall.Fanout{
    Sources: []recall.Source{mainRecall, annRecall},
    ErrorHandler: &recall.RetryErrorHandler{
        MaxRetries: 2,
        RetryDelay: 100 * time.Millisecond,
        OnRetry: func(src recall.Source, attempt int, err error) {
            log.Printf("recall %s retry #%d: %v", src.Name(), attempt, err)
        },
        OnGiveUp: func(src recall.Source, err error) {
            metrics.Counter("recall.giveup", 1, "source", src.Name())
        },
    },
}

所有 ErrorHandler 均支持回调字段,方便接入 metrics/alerting:

实现 行为 回调字段
IgnoreErrorHandler 忽略错误,返回空结果 OnError
RetryErrorHandler 重试 N 次,全部失败则降级 OnRetry, OnGiveUp
FallbackErrorHandler 降级到备用召回源 OnFallback

扩展指南

添加新的召回算法

  1. 实现 recall.Source 接口
  2. (可选)在工厂中注册
type MyRecall struct {
    Store MyStore
}

func (r *MyRecall) Name() string { return "recall.my" }
func (r *MyRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error) {
    // 实现召回逻辑
    items := []*core.Item{}
    // ...
    return items, nil
}

添加新的过滤器

实现 filter.Filter 接口(逐条过滤):

type MyFilter struct{}

func (f *MyFilter) Name() string { return "filter.my" }
func (f *MyFilter) ShouldFilter(ctx context.Context, rctx *core.RecommendContext, item *core.Item) (bool, error) {
    // 返回 true 表示过滤,false 表示保留
    return false, nil
}

如果过滤逻辑涉及批量查询外部服务(如批量查 Redis 已曝光),额外实现 filter.BatchFilter 接口:

type MyBatchFilter struct{}

func (f *MyBatchFilter) Name() string { return "filter.my_batch" }

// ShouldFilter 作为降级路径(非 Fanout 场景、或 FilterBatch 出错时)
func (f *MyBatchFilter) ShouldFilter(ctx context.Context, rctx *core.RecommendContext, item *core.Item) (bool, error) {
    // 逐条判断逻辑
    return false, nil
}

// FilterBatch 批量过滤,返回保留的 item 列表
func (f *MyBatchFilter) FilterBatch(ctx context.Context, rctx *core.RecommendContext, items []*core.Item) ([]*core.Item, error) {
    // 一次性批量查询外部服务,O(1) 次网络调用
    ids := make([]string, len(items))
    for i, it := range items {
        ids[i] = it.ID
    }
    blocked, _ := myStore.BatchCheck(ctx, rctx.UserID, ids)
    
    out := make([]*core.Item, 0, len(items))
    for _, it := range items {
        if !blocked[it.ID] {
            out = append(out, it)
        }
    }
    return out, nil
}

FilterNode 执行顺序:先执行所有 BatchFilter.FilterBatch(整批过滤),再对剩余 item 逐条执行普通 Filter.ShouldFilter

添加新的排序模型

  1. 实现 model.RankModel 接口
  2. 创建对应的 Rank Node
type MyModel struct{}

func (m *MyModel) Name() string { return "my_model" }
func (m *MyModel) Predict(features map[string]float64) (float64, error) {
    // 实现预测逻辑
    return 0.5, nil
}

type MyRankNode struct {
    Model model.RankModel
}

func (n *MyRankNode) Name() string { return "rank.my" }
func (n *MyRankNode) Kind() pipeline.Kind { return pipeline.KindRank }
func (n *MyRankNode) Process(ctx context.Context, rctx *core.RecommendContext, items []*core.Item) ([]*core.Item, error) {
    for _, item := range items {
        score, _ := n.Model.Predict(item.Features)
        item.Score = score
    }
    return items, nil
}

设计模式

策略模式

  • MergeStrategy - 合并策略(去重:First/Union/Priority;混排:WeightedScore/Quota/Ratio/RoundRobin/Waterfall;组合:Chain)
  • ErrorHandler - 召回层错误处理策略(Ignore / Retry / Fallback,均支持回调)
  • ErrorHook - Pipeline 层全局错误钩子(WarnAndSkip / KindRecovery / ErrorCallback / Composite)
  • SortStrategy - 排序策略
  • SimilarityCalculator - 相似度计算策略
  • RankModel - 排序模型策略

工厂模式

  • NodeFactory - Node 构建工厂(支持动态注册)
  • FeatureServiceFactory - 特征服务工厂(创建 core.FeatureService 实现)
  • MLServiceFactory - ML 服务工厂(创建 core.MLService 实现)

适配器模式

  • MLServiceAdapter - MLService → RankModel 桥接(model/model.go
  • VectorStoreAdapter - 适配向量服务
  • FeatureServiceAdapter - 适配 Feast(位于扩展包 ext/feast/http
  • S3Client - 适配 S3 兼容协议(AWS S3、阿里云 OSS、腾讯云 COS、MinIO 等)

装饰器模式

  • FeatureCache - 特征缓存装饰器
  • FeatureMonitor - 特征监控装饰器
  • FallbackStrategy - 降级策略装饰器

重要注意事项

  1. ID 类型:所有 ID 使用 string 类型(用户 ID、物品 ID)
  2. 必需字段
    • Fanout.MergeStrategy - 必需(如果为 nil,使用默认 FirstMergeStrategy
    • UserBasedCF.SimilarityCalculator - 必需
    • UserBasedCF.Config - 必需
    • ItemBasedCF.SimilarityCalculator - 必需
    • ItemBasedCF.Config - 必需
  3. 接口优先:所有策略都通过接口实现,不使用字符串配置
  4. 无硬编码:所有默认值都从配置接口获取
  5. 线程安全NodeFactory 使用 sync.RWMutex 保证线程安全
  6. 扩展包设计:核心包无外部依赖,具体实现位于扩展包中
    • Redis Store: go get github.com/rushteam/reckit/ext/store/redis
    • Feast HTTP/gRPC: go get github.com/rushteam/reckit/ext/feast/http/grpc
    • Milvus Vector: go get github.com/rushteam/reckit/ext/vector/milvus
    • 用户按需引入,避免不必要的依赖
    • 也可以参考扩展包实现,自行实现对应接口
  7. 领域层接口优先:推荐使用领域层接口(如 core.FeatureService),而非基础设施层接口
    • Feast 应通过适配器适配为 core.FeatureService 使用
    • 领域层接口更通用,不绑定具体实现
  8. 类型转换工具:使用 pkg/conv 进行类型转换,避免手写 switch-case
    • conv.ToFloat64conv.ToIntconv.ToString - 支持多种类型自动转换
    • conv.MapToFloat64 - map[string]any -> map[string]float64
    • conv.SliceAnyToString - []any -> []string(兼容 YAML/JSON)
    • conv.ConfigGet[T]conv.ConfigGetInt64 - 从配置 map 读取值
  9. User 字段为 anyRecommendContext.Userany 类型透传容器,框架内置 Node 不读取
    • 框架通过 Attributes map[string]any 获取用户数据
    • 行为序列通过 Attributes["recent_clicks"]HistoryFunc/BehaviorFunc 传递
    • UserProfile struct 保留为可选参考实现,业务方可自定义
  10. 模型服务协议:在需要约定时尽量采用 KServe v2(Open Inference Protocol) 标准;详见 python/service/KSERVE_V2_ALIGNMENT.md

常用操作

创建 Item

item := core.NewItem("item_123")
item.Score = 0.85
item.Features = map[string]float64{"ctr": 0.15, "cvr": 0.08}
item.PutLabel("recall_source", utils.Label{Value: "hot", Source: "recall"})

创建 Context

rctx := &core.RecommendContext{
    UserID: "user_456",
    Scene:  "feed",
    User:   myUserStruct,  // 业务方自定义 struct,框架不读取
    Attributes: map[string]any{
        "age":            25.0,
        "gender":         1.0,
        "recent_clicks":  []string{"item_1", "item_2"},
        "user_embedding": []float64{0.1, 0.2, 0.3},
    },
}

使用 Extension(Context 扩展)

import "github.com/rushteam/reckit/core"

// 1. 定义扩展(实现 Extension 接口)
type ABTestExt struct {
 Group  string
 Bucket int
}
func (e *ABTestExt) ExtensionName() string { return "aippy.abtest" }

// 2. 注册扩展到 Context
rctx := &core.RecommendContext{UserID: "u1"}
rctx.SetExtension(&ABTestExt{Group: "experiment_a", Bucket: 3})

// 3. 在 Node 中类型安全地获取
ab, ok := core.ExtensionAs[*ABTestExt](rctx, "aippy.abtest")
if ok {
 fmt.Println(ab.Group, ab.Bucket) // "experiment_a" 3
}

// 4. 也可用 GetExtension 获取(需手动 type assert)
e, ok := rctx.GetExtension("aippy.abtest")

使用 Label DSL

eval := dsl.NewEval(item, rctx)
result, _ := eval.Evaluate(`label.recall_source == "hot"`)
result, _ := eval.Evaluate(`item.score > 0.7`)
result, _ := eval.Evaluate(`label.recall_source.contains("ann")`)

使用类型转换工具(pkg/conv)

import "github.com/rushteam/reckit/pkg/conv"

// 类型转换(支持多种数值类型自动转换)
v, _ := conv.ToFloat64(anyVal)  // 支持 float64/float32/int/int64/int32/bool
n, _ := conv.ToInt(anyVal)      // 支持 int/int64/int32/float64/float32
s, _ := conv.ToString(anyVal)

// Map/Slice 转换
weights := conv.MapToFloat64(configMap)  // map[string]any -> map[string]float64
ids := conv.SliceAnyToString(yamlSlice)  // []any -> []string(兼容 YAML/JSON)

// 配置读取(泛型)
bias := conv.ConfigGet[float64](config, "bias", 0.0)
timeout := conv.ConfigGetInt64(config, "timeout", 5)  // 兼容 int/float64
labelKey := conv.ConfigGet[string](config, "label_key", "category")

// 泛型类型断言
t, ok := conv.TypeAssert[MyType](v)

使用 UserProfile 扩展属性

// 设置扩展属性
userProfile.SetExtra("vip_level", 3)
userProfile.SetExtra("preferred_price_range", "100-500")
userProfile.SetExtra("custom_tags", []string{"tech", "gaming"})

// 获取扩展属性(类型转换)
vipLevel, _ := userProfile.GetExtraFloat64("vip_level")
priceRange, _ := userProfile.GetExtraString("preferred_price_range")
purchaseCount, _ := userProfile.GetExtraInt("purchase_history_count")

// 获取扩展属性(泛型,精确类型匹配)
tags, _ := core.GetExtraAs[[]string](userProfile, "custom_tags")
// 注意:GetExtraAs 仅做类型断言,不进行数值转换;数值转换请使用 GetExtraFloat64/GetExtraInt

使用扩展包(Redis、Feast、Milvus)

核心包无外部依赖,具体实现位于扩展包中,需要单独引入:

Redis Store

import (
    "github.com/rushteam/reckit/core"
    redisstore "github.com/rushteam/reckit/ext/store/redis"
)

// 安装:go get github.com/rushteam/reckit/ext/store/redis
store, err := redisstore.NewRedisStore("localhost:6379", 0)
if err != nil {
    log.Fatal(err)
}
defer store.Close(ctx)

// 作为 core.Store 使用
var s core.Store = store

Feast 特征服务(通过适配器)

Feast 是特征存储工具,应通过适配器适配为 core.FeatureService 领域接口使用。

import (
    "github.com/rushteam/reckit/feature"
    feasthttp "github.com/rushteam/reckit/ext/feast/http"
    feastgrpc "github.com/rushteam/reckit/ext/feast/grpc"
)

// 方式 1:使用 HTTP 客户端
// 安装:go get github.com/rushteam/reckit/ext/feast/http
feastClient, _ := feasthttp.NewClient("http://localhost:6566", "my_project")
mapping := &feasthttp.FeatureMapping{
    UserFeatures: []string{"user_stats:age", "user_stats:gender"},
    ItemFeatures: []string{"item_stats:price", "item_stats:category"},
}
featureService := feasthttp.NewFeatureServiceAdapter(feastClient, mapping)

// 方式 2:使用 gRPC 客户端
// 安装:go get github.com/rushteam/reckit/ext/feast/grpc
feastClient, _ := feastgrpc.NewGrpcClient("localhost", 6565, "my_project")
featureService := feasthttp.NewFeatureServiceAdapter(feastClient, mapping)

// 作为 core.FeatureService 使用(领域层接口)
var fs core.FeatureService = featureService

或自行实现:参考扩展包实现,自行实现 core.FeatureService 接口。

Milvus 向量数据库

import (
    "github.com/rushteam/reckit/core"
    milvus "github.com/rushteam/reckit/ext/vector/milvus"
)

// 安装:go get github.com/rushteam/reckit/ext/vector/milvus
milvusService := milvus.NewMilvusService("localhost:19530")

// 作为 core.VectorService 使用(召回场景)
var vectorService core.VectorService = milvusService

// 作为 core.VectorDatabaseService 使用(数据管理场景)
var dbService core.VectorDatabaseService = milvusService

或自行实现:参考扩展包实现,自行实现 core.VectorServicecore.VectorDatabaseService 接口。

使用 Word2Vec / Item2Vec 模型

import "github.com/rushteam/reckit/model"
import "github.com/rushteam/reckit/recall"

// 1. 创建 Word2Vec 模型(从 JSON 或 map;JSON 可由 python/train/train_item2vec.py 导出)
raw, _ := json.Unmarshal(...) // 或内联 map
w2vModel, _ := model.LoadWord2VecFromMap(raw)

// 2. 文本向量化(Word2Vec)
vector := w2vModel.EncodeText("electronics smartphone tech")

// 3. 序列向量化(Item2Vec:用户行为序列)
userVector := w2vModel.EncodeSequence([]string{"item_1", "item_2", "item_3"})

// 4. 召回:文本模式 vs Item2Vec 序列模式
word2vecRecall := &recall.Word2VecRecall{
    Model: w2vModel, Store: store, TopK: 20,
    Mode: "text", TextField: "title",
}
item2vecRecall := &recall.Word2VecRecall{
    Model: w2vModel, Store: store, TopK: 20,
    Mode: "sequence", // Item2Vec
}

Python 训练: python/train/train_item2vec.py --mode item2vec|word2vec,输出 JSON。详见 docs/WORD2VEC_ITEM2VEC.md

使用 Diversity(多样性重排)

import "github.com/rushteam/reckit/rerank"

// 1. 按类别去重(保留每个类别的第一个 item)
categoryDedup := &rerank.Diversity{
    LabelKey: "category", // 从 Labels["category"] / Meta["category"] / Features["category"] 获取
}

// 2. 按作者打散(避免同一作者连续出现)
authorDiversity := &rerank.Diversity{
    DiversityKeys:  []string{"author"}, // 按 author 打散
    MaxConsecutive: 1,                  // 同一作者最多连续出现 1 次
    WindowSize:     3,                  // 滑动窗口大小
}

// 3. 同时使用(先类别去重,再按多 key 打散)
combined := &rerank.Diversity{
    LabelKey:       "category",
    DiversityKeys:  []string{"author", "category"},
    MaxConsecutive: 2,
}

// 4. 高级模式:多规则独立约束 + 权重回退 + 多值维度
advancedDiversity := &rerank.Diversity{
    Constraints: []rerank.DiversityConstraint{
        {
            Dimensions:     []string{"category"},
            MaxConsecutive: 2,                    // 同类别最多连续 2 次
            Weight:         1.0,
        },
        {
            Dimensions:     []string{"author"},
            WindowSize:     5,
            MaxPerWindow:   2,                    // 5 个位置内同作者最多 2 次
            Weight:         0.5,
        },
        {
            Dimensions:          []string{"tags"},
            MaxConsecutive:      1,
            MultiValueDelimiter: "|",             // "体育|科技" 拆分后任一交集即同组
            Weight:              0.3,
        },
    },
    ExcludeChannels: []string{"ad"},              // 广告通道不参与多样性,尾部追加
    Limit:           50,                          // 只对前 50 个位置做多样性
    ExploreLimit:    20,                          // 每位置最多扫描 20 个候选
}

// 字段查找优先级:Labels > Meta > Features
// - Labels["category"].Value = "tech"     → 使用 "tech"
// - Meta["category"] = "tech" (string)    → 使用 "tech"(Labels 未找到时)
// - Features["category_id"] = 3.0         → 使用 "3"(Labels、Meta 均未找到时)

使用 DPP / SSD 多样性重排(基于 Embedding)

import "github.com/rushteam/reckit/rerank"

// DPP 多样性(Determinantal Point Process)
// 需要 item.Meta["embedding"] = []float64{...},由 feature.EnrichNode 提前注入
dppNode := &rerank.DPPDiversityNode{
    N:            30,          // 输出 30 条
    Alpha:        0.5,         // 质量-多样性权衡(越小越多样)
    WindowSize:   10,          // 窗口化 DPP(大列表时分块处理)
    EmbeddingKey: "embedding", // item.Meta 中 embedding 的 key
    NormalizeEmb: true,        // L2 归一化
    ScoreNorm:    rerank.ScoreNormMinMax, // 分数标准化到 [ε, 1]
}

// SSD 多样性(Sliding Subspace Diversity,比 DPP 更轻量)
ssdNode := &rerank.SSDDiversityNode{
    N:            30,
    Gamma:        0.25,        // 质量-多样性权衡(越大越多样)
    WindowSize:   5,           // 滑动窗口大小(遗忘速度)
    EmbeddingKey: "embedding",
    NormalizeEmb: true,
    ScoreNorm:    rerank.ScoreNormZScore, // Z-Score 标准化
}

// 在 Pipeline 中使用
p := &pipeline.Pipeline{
    Nodes: []pipeline.Node{
        recallNode,
        filterNode,
        &feature.EnrichNode{FeatureService: featureService},
        rankNode,
        dppNode, // 或 ssdNode
    },
}

使用 DeepFM 模型(PyTorch)

import "github.com/rushteam/reckit/model"
import "github.com/rushteam/reckit/rank"
import "github.com/rushteam/reckit/service"

// 1. 训练 DeepFM 模型(Python)
// cd python
// python train/train_deepfm.py --version v1.0.0

// 2. 启动 DeepFM 服务(Python,需兼容 KServe V2 协议)
// uvicorn service.deepfm_server:app --host 0.0.0.0 --port 8080

// 3. 在 Golang 中使用(推荐:通过 KServe V2 协议)
kserveClient := service.NewKServeClient("http://localhost:8080", "deepfm",
    service.WithKServeTimeout(5*time.Second),
)
deepfmModel := model.NewRPCModelFromService("deepfm", kserveClient)

p := &pipeline.Pipeline{
    Nodes: []pipeline.Node{
        &recall.Fanout{...},
        &feature.EnrichNode{...},
        &rank.RPCNode{Model: deepfmModel},
    },
}

Python 训练: python/train/train_deepfm.py,详见 python/train/DEEPFM_README.md

KServe V2 特征字典模式(BYTES)

默认情况下 KServeClientFeatures (map[string]float64) 排序展平为 FP64 tensor 发送。 当服务端需要自行编码特征(如 DeepFM 包含 embedding lookup)时,可启用 WithKServeV2FeaturesAsJSON, 将每条特征序列化为 JSON 字符串,以 BYTES 类型发送:

import "github.com/rushteam/reckit/service"

// 启用特征字典模式
kserveClient := service.NewKServeClient("http://localhost:8080", "deepfm",
    service.WithKServeTimeout(5*time.Second),
    service.WithKServeV2FeaturesAsJSON(), // Features 以 JSON BYTES 发送
)

// 发送的 V2 请求体示例:
// {
//   "inputs": [{
//     "name": "features",
//     "shape": [2],
//     "datatype": "BYTES",
//     "data": [
//       "{\"ctr\":0.15,\"cvr\":0.08,\"user_age\":25}",
//       "{\"ctr\":0.22,\"cvr\":0.12,\"user_age\":30}"
//     ]
//   }]
// }
//
// 服务端可直接 json.loads(data[i]) 得到特征字典,无需约定特征顺序。

两种模式对比:

  • 默认(FP64 tensor):客户端排序展平,适合服务端直接接收数值向量的场景
  • WithKServeV2FeaturesAsJSON(BYTES):客户端保留原始特征字典,适合服务端自行做特征编码(embedding lookup、分箱等)的场景

标准参考

使用 BERT 模型

import "github.com/rushteam/reckit/model"
import "github.com/rushteam/reckit/recall"
import "github.com/rushteam/reckit/service"

// 1. 创建 BERT 服务客户端(推荐 KServe V2 / Triton)
kserveClient := service.NewKServeClient(
    "http://localhost:8080", // Triton / KServe 端点
    "bert-base",              // 模型名称
    service.WithKServeTimeout(5*time.Second),
    service.WithKServeV2OutputName("embeddings"),
)

// 2. 创建 BERT 模型
bertModel := model.NewBERTModel(kserveClient, 768).
    WithModelName("bert-base").
    WithMaxLength(512).
    WithPoolingStrategy("cls")

// 3. 文本编码
vector, _ := bertModel.EncodeText(ctx, "electronics smartphone tech")

// 4. 批量编码(提高效率)
vectors, _ := bertModel.EncodeTexts(ctx, []string{"text1", "text2", "text3"})

// 5. 基于 BERT 的召回
bertRecall := &recall.BERTRecall{
    Model:     bertModel,
    Store:     bertStore,
    TopK:      20,
    Mode:      "text",      // text 或 query
    TextField: "title",     // title / description / tags
    BatchSize: 32,          // 批量编码大小
}

MMoE / YouTube DNN / DSSM / GraphRecall

  • MMoE:多目标重排(CTR + 时长 + GMV)。python train/train_mmoe.pyuvicorn service.mmoe_server:app --port 8081;Golang rerank.MMoENode{ Endpoint: "http://localhost:8081/predict", WeightCTR, WeightWatchTime, WeightGMV }
  • YouTube DNN:视频/内容流召回。python train/train_youtube_dnn.pyuvicorn service.youtube_dnn_server:app --port 8082;Golang recall.YouTubeDNNRecall{ UserEmbeddingURL, VectorService, TopK, Collection }
  • DSSM:Query-Doc 语义召回。python train/train_dssm.pyuvicorn service.dssm_server:app --port 8083;Golang recall.DSSMRecall{ QueryEmbeddingURL, VectorService, TopK, Collection };query 特征来自 rctx.Params["query_features"]
  • GraphRecall:Node2vec 社交/关注页召回。python train/train_node2vec.pyuvicorn service.graph_recall_server:app --port 8084;Golang recall.GraphRecall{ Endpoint: "http://localhost:8084/recall", TopK }

加载特征元数据

import "github.com/rushteam/reckit/feature"

// 方式 1:本地文件加载
fileLoader := feature.NewFileMetadataLoader()
meta, _ := fileLoader.Load(ctx, "python/model/feature_meta.json")

// 方式 2:HTTP 接口加载
httpLoader := feature.NewHTTPMetadataLoader(5 * time.Second)
meta, _ := httpLoader.Load(ctx, "http://api.example.com/models/v1.0.0/feature_meta")

// 方式 3:S3 兼容协议加载(支持 AWS S3、阿里云 OSS、腾讯云 COS、MinIO 等)
s3Client := &MyS3Client{} // 需要实现 feature.S3Client 接口
s3Loader := feature.NewS3MetadataLoader(s3Client, "my-bucket")
meta, _ := s3Loader.Load(ctx, "models/v1.0.0/feature_meta.json")

// 使用特征元数据
validated := meta.ValidateFeatures(features)
missing := meta.GetMissingFeatures(features)
vector := meta.BuildFeatureVector(features)

加载特征标准化器

// 方式 1:本地文件加载
fileScalerLoader := feature.NewFileScalerLoader()
scaler, _ := fileScalerLoader.Load(ctx, "python/model/feature_scaler.json")

// 方式 2:HTTP 接口加载
httpScalerLoader := feature.NewHTTPScalerLoader(5 * time.Second)
scaler, _ := httpScalerLoader.Load(ctx, "http://api.example.com/models/v1.0.0/feature_scaler")

// 方式 3:S3 兼容协议加载
s3ScalerLoader := feature.NewS3ScalerLoader(s3Client, "my-bucket")
scaler, _ := s3ScalerLoader.Load(ctx, "models/v1.0.0/feature_scaler.json")

// 使用标准化器
normalized := scaler.Normalize(features)

示例代码位置

  • examples/basic/ - 基础示例
  • examples/all_recall_algorithms/ - 所有召回算法示例
  • examples/extensibility/ - 扩展性示例(自定义策略、Hook)
  • examples/config/ - 配置化 Pipeline 示例(使用 pkg/conv 进行配置解析)
  • examples/user_profile/ - 用户画像示例(包含扩展属性 Extras 的使用)
  • examples/feature_service/ - 特征服务示例
  • examples/personalization/ - 个性化推荐示例
  • examples/feature_metadata/ - 特征元数据使用示例
  • examples/feature_metadata_loader/ - 特征元数据加载器示例(本地文件、HTTP、S3 兼容协议)
  • examples/feature_processing/ - 特征处理工具类示例
  • examples/feature_version/ - 特征版本管理示例
  • examples/word2vec/ - Word2Vec / Item2Vec 示例(含 JSON 加载、Python 训练说明)
  • examples/deepfm/ - DeepFM 排序示例(PyTorch 训练 + RPC 调用)
  • examples/bert/ - BERT 模型使用示例

相关文档

  • README.md - 项目主文档
  • docs/ARCHITECTURE.md - 架构设计文档
  • docs/RECALL_ALGORITHMS.md - 召回算法文档
  • docs/WORD2VEC_ITEM2VEC.md - Word2Vec / Item2Vec 使用与 Python 训练
  • docs/RANK_MODELS.md - 排序模型文档
  • docs/EXTENSIBILITY_ANALYSIS.md - 可扩展性分析
  • docs/INTERFACES_AND_IMPLEMENTATIONS.md - 接口与实现完整分析
  • docs/FEATURE_CONSISTENCY.md - 特征一致性文档(训练与在线一致性)
  • docs/FEATURE_PROCESSING.md - 特征处理文档(归一化、编码等)
  • docs/ENCODER_INTERFACE_DESIGN.md - 编码器接口设计说明
  • docs/USER_PROFILE.md - 用户画像文档(包含扩展属性 Extras 的使用)
  • docs/MODEL_SERVICE_PROTOCOL.md - 模型服务协议约束(推荐 KServe V2 / Open Inference Protocol)
  • pkg/conv/README.md - 类型转换与泛型工具文档
  • ext/README.md - 扩展包使用指南