Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.23

- name: Build
run: VERSION="$PRE_DEF_VERSION" make
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ bin/
.vscode

*.log

ca
client
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ FILES := $$(find . -name "*.go")
default:
$(GOBUILD) -o bin/tiflash-ctl

arm:
GOOS=linux GOARCH=arm64 $(GOBUILD) -o bin/tiflash-ctl-arm

test:
$(GOTEST) -timeout 30s ./...
250 changes: 249 additions & 1 deletion cmd/dispatch.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package cmd

import (
"context"
"encoding/csv"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/JaySon-Huang/tiflash-ctl/pkg/logutil"
"github.com/JaySon-Huang/tiflash-ctl/pkg/options"
"github.com/JaySon-Huang/tiflash-ctl/pkg/tidb"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/spf13/cobra"
kvConfig "github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"go.uber.org/zap"
)

type FetchRegionsOpts struct {
Expand All @@ -24,6 +37,26 @@ type ExecCmdOpts struct {
flashCmd string
}

type ExecSQLCmdOpts struct {
pdAddr string
flashAddr string
flashSQL string
decimal uint32
sslCA string
sslCert string
sslKey string
}

type CompactCmdOpts struct {
pdAddr string
flashAddr string
physicalTableId int64
startKey string
sslCA string
sslCert string
sslKey string
}

func newDispatchCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "dispatch",
Expand Down Expand Up @@ -71,7 +104,51 @@ func newDispatchCmd() *cobra.Command {
return c
}

cmd.AddCommand(newGetRegionCmd(), newExecCmd())
newExecSQLCmd := func() *cobra.Command {
var opt ExecSQLCmdOpts
c := &cobra.Command{
Use: "exec_sql",
Short: "Exec SQL command",
RunE: func(cmd *cobra.Command, args []string) error {
if opt.flashSQL == "" {
return fmt.Errorf("should set the command to execute")
}
return execTiFlashSQLCmd(opt)
},
}
c.Flags().StringVar(&opt.pdAddr, "pd", "127.0.0.1:2379", "pd address")
c.Flags().StringVar(&opt.flashAddr, "flash", "127.0.0.1:3930", "TiFlash address for SQL execution")
c.Flags().StringVar(&opt.flashSQL, "sql", "", "The SQL command to execute in TiFlash")
c.Flags().Uint32Var(&opt.decimal, "decimal", 3, "The decimal precision for floating point values in the output")
c.Flags().StringVar(&opt.sslCA, "ca", "", "Path to the CA certificate file for TLS")
c.Flags().StringVar(&opt.sslCert, "cert", "", "Path to the client certificate file for TLS")
c.Flags().StringVar(&opt.sslKey, "key", "", "Path to the client key file for TLS")
return c
}

newCompactCmd := func() *cobra.Command {
var opt CompactCmdOpts
c := &cobra.Command{
Use: "compact",
Short: "Compact a table in TiFlash",
RunE: func(cmd *cobra.Command, args []string) error {
if opt.physicalTableId == 0 {
return fmt.Errorf("should set the physical table id to compact")
}
return compactTiFlashTable(opt)
},
}
c.Flags().StringVar(&opt.pdAddr, "pd", "127.0.0.1:2379", "pd address")
c.Flags().StringVar(&opt.flashAddr, "flash", "127.0.0.1:3930", "TiFlash address for SQL execution")
c.Flags().Int64Var(&opt.physicalTableId, "table_id", 0, "The physical table ID to compact in TiFlash")
c.Flags().StringVar(&opt.startKey, "start_key", "", "The start key for compacting the table, in hex format")
c.Flags().StringVar(&opt.sslCA, "ca", "", "Path to the CA certificate file for TLS")
c.Flags().StringVar(&opt.sslCert, "cert", "", "Path to the client certificate file for TLS")
c.Flags().StringVar(&opt.sslKey, "key", "", "Path to the client key file for TLS")
return c
}

cmd.AddCommand(newGetRegionCmd(), newExecCmd(), newExecSQLCmd(), newCompactCmd())

return cmd
}
Expand Down Expand Up @@ -155,3 +232,174 @@ func execTiFlashCmd(opts ExecCmdOpts) error {

return nil
}

type tiFlashSQLExecuteResponseMetaColumn struct {
Name string `json:"name"`
Type string `json:"type"`
}
type tiFlashSQLExecuteResponse struct {
Meta []tiFlashSQLExecuteResponseMetaColumn `json:"meta"`
Data [][]any `json:"data"`
}

func execTiFlashSQLCmd(opts ExecSQLCmdOpts) error {
cfg := kvConfig.GetGlobalConfig()
cfg.Security = kvConfig.NewSecurity(opts.sslCA, opts.sslCert, opts.sslKey, []string{})
kvConfig.StoreGlobalConfig(cfg)

client, err := txnkv.NewClient([]string{opts.pdAddr})
if err != nil {
return fmt.Errorf("failed to create TiFlash client: %w", err)
}

ctx := context.Background()
timeout := time.Duration(5*60) * time.Second
req := tikvrpc.Request{
Type: tikvrpc.CmdGetTiFlashSystemTable,
StoreTp: tikvrpc.TiFlash,
Req: &kvrpcpb.TiFlashSystemTableRequest{
Sql: opts.flashSQL,
},
}
resp, err := client.KVStore.GetTiKVClient().SendRequest(ctx, opts.flashAddr, &req, timeout)
if err != nil {
return fmt.Errorf("failed to send request to TiFlash: %w", err)
}
tiflashResp, ok := resp.Resp.(*kvrpcpb.TiFlashSystemTableResponse)
if !ok {
return fmt.Errorf("unexpected response type: %T", resp.Resp)
}
logutil.BgLogger().Debug("response", zap.String("response", tiflashResp.String()))
// Parse the response data to be more user-friendly
var result tiFlashSQLExecuteResponse
err = json.Unmarshal(tiflashResp.Data, &result)
if err != nil {
return fmt.Errorf("failed to unmarshal TiFlash response data: %w", err)
}

/// Output as csv format, output to console
w := csv.NewWriter(os.Stdout)

// header
header := make([]string, len(result.Meta))
for i, col := range result.Meta {
header[i] = col.Name
}
w.Write(header)

// rows
floatFormat := fmt.Sprintf("%%.%df", opts.decimal)
for _, rowFields := range result.Data {
if len(rowFields) == 0 {
continue
}
outputRow := make([]string, len(rowFields))
for colIdx, fieldVal := range rowFields {
if fieldVal == nil {
outputRow[colIdx] = "NULL"
continue
}
switch result.Meta[colIdx].Type {
case "Float64", "Float32":
valStr := fmt.Sprintf(floatFormat, fieldVal)
outputRow[colIdx] = valStr
case "String", "Int64", "UInt64":
valStr := fmt.Sprintf("%s", fieldVal)
outputRow[colIdx] = valStr
default:
// for other types, just convert to string
valStr := fmt.Sprintf("%s", fieldVal)
outputRow[colIdx] = valStr
}
}

if err = w.Write(outputRow); err != nil {
return fmt.Errorf("failed to write row to CSV: %w", err)
}
}
w.Flush()
return w.Error()
}

func compactTiFlashTable(opts CompactCmdOpts) error {
cfg := kvConfig.GetGlobalConfig()
cfg.Security = kvConfig.NewSecurity(opts.sslCA, opts.sslCert, opts.sslKey, []string{})
kvConfig.StoreGlobalConfig(cfg)

client, err := txnkv.NewClient([]string{opts.pdAddr})
if err != nil {
return fmt.Errorf("failed to create TiFlash client: %w", err)
}
ctx := context.Background()
timeout := time.Duration(5*60) * time.Second

// Empty start key to compact the whole table
var startKey []byte
if len(opts.startKey) > 0 {
var err error
startKey, err = hex.DecodeString(opts.startKey)
if err != nil {
return fmt.Errorf("failed to decode start key: %w", err)
}
}

tableCompactSuccess := false
for {
req := tikvrpc.Request{
Type: tikvrpc.CmdCompact,
StoreTp: tikvrpc.TiFlash,
Req: &kvrpcpb.CompactRequest{StartKey: startKey, PhysicalTableId: opts.physicalTableId},
}
logutil.BgLogger().Info("Compact TiFlash table",
zap.Int64("physical_table_id", opts.physicalTableId),
zap.String("store", opts.flashAddr),
zap.String("start_key", hex.EncodeToString(startKey)),
)
response, err := client.KVStore.GetTiKVClient().SendRequest(ctx, opts.flashAddr, &req, timeout)
if err != nil {
logutil.BgLogger().Error("Failed to send request to TiFlash",
zap.Error(err),
)
break
}
resp, ok := response.Resp.(*kvrpcpb.CompactResponse)
if !ok {
logutil.BgLogger().Error("Unexpected response type from TiFlash",
zap.String("store", opts.flashAddr),
)
break
}
if resp.GetError() != nil {
logutil.BgLogger().Error("Compact failed",
zap.String("store", opts.flashAddr),
zap.String("resp", proto.MarshalTextString(resp)),
)
break
}
if !resp.HasRemaining {
tableCompactSuccess = true
logutil.BgLogger().Info("Compact finished",
zap.Int64("physical_table_id", opts.physicalTableId),
)
break
}
lastEndKey := resp.GetCompactedEndKey()
if len(lastEndKey) == 0 {
logutil.BgLogger().Error("Compact failed, internal error, no end key returned",
zap.String("store", opts.flashAddr),
)
break
}
// then continue to compact the next range
startKey = lastEndKey
logutil.BgLogger().Info("Next compact range",
zap.Int64("physical_table_id", opts.physicalTableId),
zap.String("store", opts.flashAddr),
zap.String("start_key", hex.EncodeToString(startKey)),
)
}

logutil.BgLogger().Info("Compact command finished", //
zap.Int64("physical_table_id", opts.physicalTableId), zap.Bool("success", tableCompactSuccess), zap.String("key", hex.EncodeToString(startKey)))
return nil
}
23 changes: 23 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,40 @@ import (
"fmt"
"os"

"github.com/pingcap/log"
"github.com/spf13/cobra"
)

var (
logLevel string
logConfig log.Config
)

func initLogger() {
logConfig = log.Config{
Level: logLevel,
}

logger, props, err := log.InitLogger(&logConfig)
if err != nil {
panic("failed to initialize logger: " + err.Error())
}
log.ReplaceGlobals(logger, props)
}

func Execute() {
cobra.EnableCommandSorting = false

rootCmd := &cobra.Command{
Use: "tiflash-ctl",
Short: "TiFlash Controller",
Long: "TiFlash Controller (tiflash-ctl) is a command line tool for TiFlash Server",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
initLogger()
},
}
// shared by all subcommands
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "L", "info", "Set the log level")
rootCmd.AddCommand(newDispatchCmd(), newCheckCmd())

if err := rootCmd.Execute(); err != nil {
Expand Down
Loading