diff --git a/drainer/config.go b/drainer/config.go index 87e9de852..5d527408c 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -89,6 +89,9 @@ type SyncerConfig struct { EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"` DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"` EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"` + + PluginPath string `toml:"plugin-path" json:"plugin-path"` + PluginName string `toml:"plugin-name" json:"plugin-name"` } // EnableDispatch return true if enable dispatch. @@ -216,6 +219,8 @@ func NewConfig() *Config { fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size") fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced") fs.StringVar(new(string), "log-rotate", "", "DEPRECATED") + fs.StringVar(&cfg.SyncerCfg.PluginName, "plugin-name", "", "syncer plugin name") + fs.StringVar(&cfg.SyncerCfg.PluginPath, "plugin-path", "", "syncer plugin path") return cfg } diff --git a/drainer/sync/kafka.go b/drainer/sync/kafka.go index 1746591e8..8879db35d 100644 --- a/drainer/sync/kafka.go +++ b/drainer/sync/kafka.go @@ -48,7 +48,7 @@ type KafkaSyncer struct { lastSuccessTime time.Time shutdown chan struct{} - *baseSyncer + *BaseSyncer } // newAsyncProducer will only be changed in unit test for mock @@ -69,7 +69,7 @@ func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*Kafka topic: topic, toBeAckCommitTS: make(map[int64]int), shutdown: make(chan struct{}), - baseSyncer: newBaseSyncer(tableInfoGetter), + BaseSyncer: newBaseSyncer(tableInfoGetter), } config, err := util.NewSaramaConfig(cfg.KafkaVersion, "kafka.") diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index b3e3a1121..cf0a9304d 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -35,7 +35,7 @@ type MysqlSyncer struct { db *sql.DB loader loader.Loader relayer relay.Relayer - *baseSyncer + *BaseSyncer } // should only be used for unit test to create mock db @@ -130,7 +130,7 @@ func NewMysqlSyncer( db: db, loader: loader, relayer: relayer, - baseSyncer: newBaseSyncer(tableInfoGetter), + BaseSyncer: newBaseSyncer(tableInfoGetter), } go s.run() diff --git a/drainer/sync/pb.go b/drainer/sync/pb.go index 22f5e4af5..a7c683ee8 100644 --- a/drainer/sync/pb.go +++ b/drainer/sync/pb.go @@ -28,7 +28,7 @@ import ( var _ Syncer = &pbSyncer{} type pbSyncer struct { - *baseSyncer + *BaseSyncer binlogger binlogfile.Binlogger cancel func() @@ -45,7 +45,7 @@ func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.Table s := &pbSyncer{ binlogger: binlogger, - baseSyncer: newBaseSyncer(tableInfoGetter), + BaseSyncer: newBaseSyncer(tableInfoGetter), cancel: cancel, } diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go index eb6d7d647..760ca59f0 100644 --- a/drainer/sync/syncer.go +++ b/drainer/sync/syncer.go @@ -47,14 +47,15 @@ type Syncer interface { Close() error } -type baseSyncer struct { +//BaseSyncer is basic implementation of syncer +type BaseSyncer struct { *baseError success chan *Item tableInfoGetter translator.TableInfoGetter } -func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *baseSyncer { - return &baseSyncer{ +func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *BaseSyncer { + return &BaseSyncer{ baseError: newBaseError(), success: make(chan *Item, 8), tableInfoGetter: tableInfoGetter, @@ -62,11 +63,11 @@ func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *baseSyncer { } // Successes implements Syncer interface -func (s *baseSyncer) Successes() <-chan *Item { +func (s *BaseSyncer) Successes() <-chan *Item { return s.success } // Error implements Syncer interface -func (s *baseSyncer) Error() <-chan error { +func (s *BaseSyncer) Error() <-chan error { return s.error() } diff --git a/drainer/syncer.go b/drainer/syncer.go index 889aa6b9a..34ae63b23 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb-binlog/drainer/syncplg" "github.com/pingcap/tidb-binlog/pkg/loader" "github.com/pingcap/errors" @@ -120,6 +121,26 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBac // only use for test case "_intercept": dsyncer = newInterceptSyncer() + case "plugin": + if len(cfg.PluginName) == 0 || len(cfg.PluginPath) == 0 { + return nil, errors.Errorf("plugin-name or plugin-path is incorrect") + } + newSyncer, err := syncplg.LoadPlugin(cfg.PluginPath, cfg.PluginName) + if err != nil { + return nil, errors.Annotate(err, "fail to load plugin dsyncer") + } + + var relayer relay.Relayer + if cfg.Relay.IsEnabled() { + if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.MaxFileSize, schema); err != nil { + return nil, errors.Annotate(err, "fail to create relayer") + } + } + + dsyncer, err = newSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType, relayer, info, cfg.EnableDispatch(), cfg.EnableCausality()) + if err != nil { + return nil, errors.Annotate(err, "fail to create plugin dsyncer") + } default: return nil, errors.Errorf("unknown DestDBType: %s", cfg.DestDBType) } diff --git a/drainer/syncplg/demo/Makefile b/drainer/syncplg/demo/Makefile new file mode 100644 index 000000000..dc13aa17f --- /dev/null +++ b/drainer/syncplg/demo/Makefile @@ -0,0 +1,2 @@ +plugin: + go build -o syncerdemo.so -buildmode=plugin demo.go diff --git a/drainer/syncplg/demo/demo.go b/drainer/syncplg/demo/demo.go new file mode 100644 index 000000000..4b1874f4e --- /dev/null +++ b/drainer/syncplg/demo/demo.go @@ -0,0 +1,44 @@ +package main + +import ( + "errors" + + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb-binlog/drainer/relay" + "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/drainer/translator" + "github.com/prometheus/client_golang/prometheus" +) + +//DemoSyncer is a syncer demo +type DemoSyncer struct { + sync.BaseSyncer +} + +//Sync should be implemented +func (ds *DemoSyncer) Sync(item *sync.Item) error { + return nil +} + +//Close should be implemented +func (ds *DemoSyncer) Close() error { + return nil +} + +//NewSyncerPlugin return A syncer instance which implemented interface of sync.Syncer +func NewSyncerPlugin( + cfg *sync.DBConfig, + tableInfoGetter translator.TableInfoGetter, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, + relayer relay.Relayer, + info *loopbacksync.LoopBackSync, + enableDispatch bool, + enableCausility bool) (dsyncer sync.Syncer, err error) { + return nil, errors.New("test error") +} + +var _ DemoSyncer diff --git a/drainer/syncplg/syncer_plugin.go b/drainer/syncplg/syncer_plugin.go new file mode 100644 index 000000000..14aa98b4c --- /dev/null +++ b/drainer/syncplg/syncer_plugin.go @@ -0,0 +1,62 @@ +package syncplg + +import ( + "errors" + "fmt" + "plugin" + + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb-binlog/drainer/relay" + "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/drainer/translator" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + //NewSyncerPlugin is the name of exported function by syncer plugin + NewSyncerPlugin = "NewSyncerPlugin" +) + +//NewSyncerFunc is a function type which syncer plugin must implement +type NewSyncerFunc func( + cfg *sync.DBConfig, + tableInfoGetter translator.TableInfoGetter, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, + relayer relay.Relayer, + info *loopbacksync.LoopBackSync, + enableDispatch bool, + enableCausility bool) (dsyncer sync.Syncer, err error) + +//LoadPlugin load syncer plugin +func LoadPlugin(path, name string) (NewSyncerFunc, error) { + fp := path + "/" + name + p, err := plugin.Open(fp) + if err != nil { + return nil, fmt.Errorf("faile to Open %s . err: %s", fp, err.Error()) + } + + sym, err := p.Lookup(NewSyncerPlugin) + if err != nil { + return nil, err + } + newSyncer, ok := sym.(func( + cfg *sync.DBConfig, + tableInfoGetter translator.TableInfoGetter, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, + relayer relay.Relayer, + info *loopbacksync.LoopBackSync, + enableDispatch bool, + enableCausility bool) (dsyncer sync.Syncer, err error)) + if !ok { + return nil, errors.New("function type is incorrect") + } + return newSyncer, nil +}