diff --git a/config/common.go b/config/common.go index a628630..fa86ab2 100644 --- a/config/common.go +++ b/config/common.go @@ -1,5 +1,25 @@ package config +import ( + "fmt" + "time" +) + +type ( + MetaStoreType = string + DataStoreType = string +) + +const ( + MetaStoreTypeFile MetaStoreType = "file" + + DataStoreTypeInfluxDB DataStoreType = "influxdb" + DataStoreTypeTDengine DataStoreType = "tdengine" + + MinBatchSize = 1 + MaxBatchSize = 10000 +) + type DriverOptions struct { DriverHealthCheckIntervalSecond int `json:"driver_health_check_interval_second" yaml:"driver_health_check_interval_second"` DeviceHealthCheckIntervalSecond int `json:"device_health_check_interval_second" yaml:"device_health_check_interval_second"` @@ -13,6 +33,102 @@ type ManagerOptions struct { HTTP struct { Port int `json:"port" yaml:"port"` } `json:"http" yaml:"http"` + + MetaStoreOptions *MetaStoreOptions `json:"meta_store" yaml:"meta_store"` + DataStoreOptions *DataStoreOptions `json:"data_store" yaml:"data_store"` +} + +func (o *ManagerOptions) Check() error { + if o.DataStoreOptions == nil { + return nil + } + if err := o.DataStoreOptions.Check(); err != nil { + return err + } + return nil +} + +type MetaStoreOptions struct { + Type MetaStoreType `json:"type" yaml:"type"` + File *FileOptions `json:"file" yaml:"file"` +} + +type FileOptions struct { + Path string `json:"path" yaml:"path"` +} + +type DataStoreOptions struct { + Type DataStoreType `json:"type" yaml:"type"` + + // Common for All DBs + // https://docs.taosdata.com/taos-sql/database#创建数据库 + Database string `json:"database" yaml:"database"` + BatchSize int `json:"batch_size" yaml:"batch_size"` + + InfluxDB *InfluxDBOptions `json:"influxdb" yaml:"influxdb"` + TDengine *TDengineOptions `json:"tdengine" yaml:"tdengine"` +} + +func (o *DataStoreOptions) Check() error { + if o.Database == "" { + return fmt.Errorf("influxdb database must be specified") + } + + if o.BatchSize > MaxBatchSize { + return fmt.Errorf("max-batch-size cannot be large than %d", MaxBatchSize) + } + if o.BatchSize < MinBatchSize { + return fmt.Errorf("batch-size cannot be less than %d", MinBatchSize) + } + + switch o.Type { + case DataStoreTypeInfluxDB: + if o.InfluxDB == nil { + return fmt.Errorf("the configuration for InfluxDB must be required") + } + return o.InfluxDB.Check() + case DataStoreTypeTDengine: + if o.TDengine == nil { + return fmt.Errorf("the configuration for TDengine must be required") + } + return o.TDengine.Check() + default: + return fmt.Errorf("unsupported datastore type: %s", o.Type) + } +} + +type InfluxDBOptions struct { + URL string `json:"url" yaml:"url"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` + UserAgent string `json:"user_agent" yaml:"user_agent"` + Precision string `json:"precision" yaml:"precision"` + RetentionPolicy string `json:"retention_policy" yaml:"retention_policy"` + WriteConsistency string `json:"write_consistency" yaml:"write_consistency"` + Timeout time.Duration `json:"timeout" yaml:"timeout"` +} + +func (o *InfluxDBOptions) Check() error { + return nil +} + +type TDengineOptions struct { + Schema string `json:"schema" yaml:"schema"` + Host string `json:"host" yaml:"host"` + Port uint16 `json:"port" yaml:"port"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` + Keep uint `json:"keep" yaml:"keep"` + Days uint `json:"days" yaml:"days"` + Blocks uint `json:"blocks" yaml:"blocks"` + Update uint8 `json:"update" yaml:"update"` + Precision string `json:"precision" yaml:"precision"` + + MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"` +} + +func (o *TDengineOptions) Check() error { + return nil } type LogOptions struct { diff --git a/config/define.go b/config/define.go index 4f76a8a..99fe80e 100644 --- a/config/define.go +++ b/config/define.go @@ -5,6 +5,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/spf13/viper" "github.com/thingio/edge-device-std/errors" + "sync" ) const ( @@ -14,6 +15,9 @@ const ( FileFormat = "yaml" ) +var cfg = new(Configuration) +var once = sync.Once{} + type Configuration struct { DriverOptions DriverOptions `json:"driver" yaml:"driver"` ManagerOptions ManagerOptions `json:"manager" yaml:"manager"` @@ -21,29 +25,47 @@ type Configuration struct { MessageBus MessageBusOptions `json:"msgbus" yaml:"msgbus"` } -func NewConfiguration() (*Configuration, errors.EdgeError) { - // read the configuration file path - var configPath string - flag.StringVar(&configPath, "cp", FilePath, "config file path, e.g. \"/etc\"") - var configName string - flag.StringVar(&configName, "cn", FileName, "config file name, e.g. \"config\", excluding the suffix") - flag.Parse() - - viper.SetEnvPrefix(EnvPrefix) - viper.AutomaticEnv() - viper.AddConfigPath(configPath) - viper.SetConfigName(configName) - viper.SetConfigType(FileFormat) - if err := viper.ReadInConfig(); err != nil { - return nil, errors.Configuration.Cause(err, "fail to read the configuration file") +func (c *Configuration) Check() error { + if err := c.ManagerOptions.Check(); err != nil { + return err } + return nil +} + +func NewConfiguration() (*Configuration, errors.EdgeError) { + var err error + + once.Do(func() { + // read the configuration file path + var configPath string + flag.StringVar(&configPath, "cp", FilePath, "config file path, e.g. \"/etc\"") + var configName string + flag.StringVar(&configName, "cn", FileName, "config file name, e.g. \"config\", excluding the suffix") + flag.Parse() - cfg := new(Configuration) - if err := viper.Unmarshal(cfg, func(dc *mapstructure.DecoderConfig) { - dc.TagName = FileFormat - }); err != nil { + viper.SetEnvPrefix(EnvPrefix) + viper.AutomaticEnv() + viper.AddConfigPath(configPath) + viper.SetConfigName(configName) + viper.SetConfigType(FileFormat) + + if err = viper.ReadInConfig(); err != nil { + return + } + + if err = viper.Unmarshal(cfg, func(dc *mapstructure.DecoderConfig) { + dc.TagName = FileFormat + }); err != nil { + return + } + + if err = cfg.Check(); err != nil { + return + } + }) + + if err != nil { return nil, errors.Configuration.Cause(err, "fail to unmarshal the configuration file") } - return cfg, nil } diff --git "a/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/README.md" "b/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/README.md" index 5371e07..349c0e5 100644 --- "a/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/README.md" +++ "b/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/README.md" @@ -4,7 +4,7 @@ 1. 设备数据持久化与查询; 1. 流程: 1. 设备元数据增加 `recording` 字段,表示是否将采集的设备进行落库; - 2. accessor 启动后,连接到 manager,先从 manager 获取全量设备元数据,再监听设备元数据的变更(使用 edge-device-std 定义的 operations ); + 2. accessor 启动后,先从 manager 获取全量设备元数据,再监听设备元数据的变更(使用 edge-device-std 定义的 operations ); 3. 如果设备的 `recording` 字段为 true,则在 accessor 中启动一个 Recorder 监听 event 以及 props 对应的主题,将采集到的数据落库,否则跳过或者关闭 Recorder; 2. 是否需要支持多数据源,如 InfluxDB | TDEngine 等? diff --git "a/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/\346\236\266\346\236\204\345\233\276.png" "b/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/\346\236\266\346\236\204\345\233\276.png" new file mode 100644 index 0000000..a235cce Binary files /dev/null and "b/docs/zh/\347\263\273\347\273\237\346\236\266\346\236\204/Accessor/\346\236\266\346\236\204\345\233\276.png" differ diff --git a/errors/type.go b/errors/type.go index 53edd26..b89efd8 100644 --- a/errors/type.go +++ b/errors/type.go @@ -18,6 +18,7 @@ var ( Driver = NewType(300000, "Driver") DeviceTwin = NewType(400000, "DeviceTwin") MetaStore = NewType(500000, "MetaStore") + DataStore = NewType(600000, "DataStore") ) type ErrType struct { diff --git a/operations/manager_service.go b/operations/manager_service.go index e9a5386..18673db 100644 --- a/operations/manager_service.go +++ b/operations/manager_service.go @@ -71,6 +71,7 @@ func (m *metaManagerService) subscribe(optType MetaOperationType, type ( DataManagerService interface { + Subscribe(topic string) (<-chan interface{}, func(), error) SubscribeDeviceStatus(protocolID string) (<-chan interface{}, func(), error) SubscribeDeviceProps(protocolID, productID, deviceID string, propertyID models.ProductPropertyID) (<-chan interface{}, func(), error) SubscribeDeviceEvent(protocolID, productID, deviceID string, eventID models.ProductEventID) (<-chan interface{}, func(), error) @@ -85,6 +86,12 @@ func newDataManagerService(mb bus.MessageBus, lg *logger.Logger) (DataManagerSer return &dataManagerService{mb: mb, lg: lg}, nil } +func (d *dataManagerService) Subscribe(topic string) (<-chan interface{}, func(), error) { + return subscribe(d.mb, d.lg, topic, func(msg *message.Message) (interface{}, error) { + return msg, nil + }) +} + func (d *dataManagerService) SubscribeDeviceStatus(protocolID string) (<-chan interface{}, func(), error) { return d.subscribe(protocolID, TopicSingleLevelWildcard, TopicSingleLevelWildcard, TopicSingleLevelWildcard, DataOperationTypeHealthCheck, func(o *DataOperation) (interface{}, error) {