Skip to content

Latest commit

 

History

History
371 lines (294 loc) · 8.94 KB

File metadata and controls

371 lines (294 loc) · 8.94 KB

分库分表

daog 支持表分片(水平分表)和数据源分片(分库),可以应对大数据量场景。

表分片

表分片将一张逻辑表的数据分散到多张物理表中,如 user_info_00, user_info_01, ..., user_info_99

配置 ShardingFunc

在生成的 -ext.go 文件中配置分表函数:

// UserInfo-ext.go
package dal

import "fmt"

func init() {
    // 配置分表策略:按用户 ID 取模
    UserInfoMeta.ShardingFunc = func(tableName string, shardingKey any) string {
        userId := shardingKey.(int64)
        return fmt.Sprintf("%s_%02d", tableName, userId%100)
    }
}

使用分表上下文

使用 NewTransContextWithSharding 创建支持分表的事务上下文:

userId := int64(12345)

tc, err := daog.NewTransContextWithSharding(
    datasource,
    txrequest.RequestWrite,
    "trace-id",
    userId,    // 分表键
    nil,       // 分库键(不分库时为 nil)
)
if err != nil {
    return err
}

defer func() {
    tc.CompleteWithPanic(err, recover())
}()

// 此时所有操作都会路由到 user_info_45 表(12345 % 100 = 45)
user := &dal.UserInfo{Name: "张三"}
_, err = dal.UserInfoDao.Insert(tc, user)

分表函数示例

按 ID 取模:

UserInfoMeta.ShardingFunc = func(tableName string, shardingKey any) string {
    id := shardingKey.(int64)
    return fmt.Sprintf("%s_%02d", tableName, id%100)
}
// user_info -> user_info_00 ~ user_info_99

按日期分表:

UserInfoMeta.ShardingFunc = func(tableName string, shardingKey any) string {
    date := shardingKey.(time.Time)
    return fmt.Sprintf("%s_%s", tableName, date.Format("200601"))
}
// user_info -> user_info_202401, user_info_202402, ...

按区域分表:

UserInfoMeta.ShardingFunc = func(tableName string, shardingKey any) string {
    region := shardingKey.(string)
    return fmt.Sprintf("%s_%s", tableName, region)
}
// user_info -> user_info_north, user_info_south, ...

数据源分片

数据源分片将数据分散到多个数据库实例中。

创建分片数据源

confs := []*daog.DbConf{
    {
        DbUrl:  "user:pass@tcp(db1.example.com:3306)/mydb?parseTime=true",
        Size:   10,
        LogSQL: true,
    },
    {
        DbUrl:  "user:pass@tcp(db2.example.com:3306)/mydb?parseTime=true",
        Size:   10,
        LogSQL: true,
    },
    {
        DbUrl:  "user:pass@tcp(db3.example.com:3306)/mydb?parseTime=true",
        Size:   10,
        LogSQL: true,
    },
}

// 使用内置的取模策略
ds, err := daog.NewShardingDatasource(confs, daog.ModInt64ShardingDatasourcePolicy(0))
if err != nil {
    return err
}
defer ds.Shutdown()

DatasourceShardingPolicy 接口

数据源分片策略需要实现 DatasourceShardingPolicy 接口:

type DatasourceShardingPolicy interface {
    // Shard 根据分片键和分片总数返回目标分片索引
    Shard(shardKey any, count int) (int, error)
}

内置分片策略

ModInt64ShardingDatasourcePolicy:

基于 int64 取模的简单分片策略:

// 使用方式
policy := daog.ModInt64ShardingDatasourcePolicy(0)  // 参数值无实际用途
ds, err := daog.NewShardingDatasource(confs, policy)

实现原理:

func (h ModInt64ShardingDatasourcePolicy) Shard(shardKey any, count int) (int, error) {
    key, ok := shardKey.(int64)
    if !ok {
        return 0, errors.New("invalid shard key")
    }
    return int(key % int64(count)), nil
}

自定义分片策略

// 基于用户 ID 哈希的分片策略
type UserIdHashPolicy struct{}

func (p UserIdHashPolicy) Shard(shardKey any, count int) (int, error) {
    userId, ok := shardKey.(int64)
    if !ok {
        return 0, errors.New("shardKey must be int64")
    }

    // 使用一致性哈希或其他算法
    hash := fnv.New32a()
    binary.Write(hash, binary.LittleEndian, userId)
    return int(hash.Sum32() % uint32(count)), nil
}

// 使用自定义策略
ds, err := daog.NewShardingDatasource(confs, UserIdHashPolicy{})

使用分库上下文

userId := int64(12345)

tc, err := daog.NewTransContextWithSharding(
    shardingDatasource,        // 分片数据源
    txrequest.RequestWrite,
    "trace-id",
    nil,                       // 分表键(不分表时为 nil)
    userId,                    // 分库键
)

分库分表组合

同时使用分库和分表:

func init() {
    // 配置分表策略
    UserInfoMeta.ShardingFunc = func(tableName string, shardingKey any) string {
        userId := shardingKey.(int64)
        return fmt.Sprintf("%s_%02d", tableName, userId%10)
    }
}

// 创建分库数据源
confs := []*daog.DbConf{
    {DbUrl: "...db0...", ...},
    {DbUrl: "...db1...", ...},
}
ds, _ := daog.NewShardingDatasource(confs, daog.ModInt64ShardingDatasourcePolicy(0))

// 使用
userId := int64(12345)

tc, err := daog.NewTransContextWithSharding(
    ds,
    txrequest.RequestWrite,
    "trace-id",
    userId,    // 分表键:路由到 user_info_05 (12345 % 10 = 5)
    userId,    // 分库键:路由到 db1 (12345 % 2 = 1)
)

分片上下文获取

在分片函数中,可以从上下文获取分片键:

// 内部函数,一般不直接使用
tableKey := getTableShardingKeyFromCtx(ctx)
dsKey := getDatasourceShardingKeyFromCtx(ctx)

注意事项

1. 分片键必须在事务开始时确定

分片键在创建 TransContext 时指定,整个事务内的操作都会路由到同一个分片。

2. 跨分片查询

daog 不支持跨分片的自动聚合查询。如需跨分片查询,需要:

  • 在应用层手动查询多个分片
  • 使用其他中间件处理跨分片查询
// 手动跨分片查询示例
func queryAllShards(userIds []int64) ([]*dal.UserInfo, error) {
    var allUsers []*dal.UserInfo

    // 按分片键分组
    shardGroups := groupByShardKey(userIds)

    for shardKey, ids := range shardGroups {
        users, err := queryUsersInShard(shardKey, ids)
        if err != nil {
            return nil, err
        }
        allUsers = append(allUsers, users...)
    }

    return allUsers, nil
}

3. 分片键类型一致性

分表函数和分库策略中的类型断言必须与实际传入的分片键类型一致:

// 如果传入 int64,分片函数也要使用 int64
tc, _ := daog.NewTransContextWithSharding(ds, ..., int64(userId), int64(userId))

// 分片函数
UserInfoMeta.ShardingFunc = func(tableName string, shardingKey any) string {
    userId := shardingKey.(int64)  // 类型必须匹配
    return fmt.Sprintf("%s_%02d", tableName, userId%100)
}

4. 分表数量规划

合理规划分表数量,考虑:

  • 数据增长速度
  • 查询模式
  • 运维复杂度
// 预留足够的分表空间
// 100 个分表:user_info_00 ~ user_info_99
func(tableName string, key any) string {
    return fmt.Sprintf("%s_%02d", tableName, key.(int64)%100)
}

// 1000 个分表:user_info_000 ~ user_info_999
func(tableName string, key any) string {
    return fmt.Sprintf("%s_%03d", tableName, key.(int64)%1000)
}

完整示例

分库分表场景

package main

import (
    "fmt"
    "github.com/rolandhe/daog"
    txrequest "github.com/rolandhe/daog/tx"
    "your/project/dal"
)

var shardingDS daog.Datasource

func init() {
    // 1. 配置分表策略
    dal.UserInfoMeta.ShardingFunc = func(tableName string, key any) string {
        return fmt.Sprintf("%s_%02d", tableName, key.(int64)%10)
    }

    // 2. 创建分库数据源
    confs := []*daog.DbConf{
        {DbUrl: "user:pass@tcp(db0:3306)/mydb?parseTime=true", Size: 10},
        {DbUrl: "user:pass@tcp(db1:3306)/mydb?parseTime=true", Size: 10},
    }

    var err error
    shardingDS, err = daog.NewShardingDatasource(confs, daog.ModInt64ShardingDatasourcePolicy(0))
    if err != nil {
        panic(err)
    }
}

func createUser(user *dal.UserInfo) error {
    return daog.AutoTrans(
        func() (*daog.TransContext, error) {
            // 使用用户 ID 作为分片键
            return daog.NewTransContextWithSharding(
                shardingDS,
                txrequest.RequestWrite,
                "create-user",
                user.Id,    // 分表键
                user.Id,    // 分库键
            )
        },
        func(tc *daog.TransContext) error {
            _, err := dal.UserInfoDao.Insert(tc, user)
            return err
        },
    )
}

func getUser(userId int64) (*dal.UserInfo, error) {
    return daog.AutoTransWithResult(
        func() (*daog.TransContext, error) {
            return daog.NewTransContextWithSharding(
                shardingDS,
                txrequest.RequestReadonly,
                "get-user",
                userId,    // 分表键
                userId,    // 分库键
            )
        },
        func(tc *daog.TransContext) (*dal.UserInfo, error) {
            return dal.UserInfoDao.GetById(tc, userId)
        },
    )
}

相关文档