Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions config/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
package config

import (
"fmt"
"time"
)

type (
MetaStoreType = string
DataStoreType = string
)

const (
MetaStoreTypeFile MetaStoreType = "file"

DataStoreTypeInfluxDB DataStoreType = "influxdb"
DataStoreTypeTDengine DataStoreType = "tdengine"

MinBatchSize = 1
MaxBatchSize = 10000
)

type DriverOptions struct {
DriverHealthCheckIntervalSecond int `json:"driver_health_check_interval_second" yaml:"driver_health_check_interval_second"`
DeviceHealthCheckIntervalSecond int `json:"device_health_check_interval_second" yaml:"device_health_check_interval_second"`
Expand All @@ -13,6 +33,102 @@ type ManagerOptions struct {
HTTP struct {
Port int `json:"port" yaml:"port"`
} `json:"http" yaml:"http"`

MetaStoreOptions *MetaStoreOptions `json:"meta_store" yaml:"meta_store"`
DataStoreOptions *DataStoreOptions `json:"data_store" yaml:"data_store"`
}

func (o *ManagerOptions) Check() error {
if o.DataStoreOptions == nil {
return nil
}
if err := o.DataStoreOptions.Check(); err != nil {
return err
}
return nil
}

type MetaStoreOptions struct {
Type MetaStoreType `json:"type" yaml:"type"`
File *FileOptions `json:"file" yaml:"file"`
}

type FileOptions struct {
Path string `json:"path" yaml:"path"`
}

type DataStoreOptions struct {
Type DataStoreType `json:"type" yaml:"type"`

// Common for All DBs
// https://docs.taosdata.com/taos-sql/database#创建数据库
Database string `json:"database" yaml:"database"`
BatchSize int `json:"batch_size" yaml:"batch_size"`

InfluxDB *InfluxDBOptions `json:"influxdb" yaml:"influxdb"`
TDengine *TDengineOptions `json:"tdengine" yaml:"tdengine"`
}

func (o *DataStoreOptions) Check() error {
if o.Database == "" {
return fmt.Errorf("influxdb database must be specified")
}

if o.BatchSize > MaxBatchSize {
return fmt.Errorf("max-batch-size cannot be large than %d", MaxBatchSize)
}
if o.BatchSize < MinBatchSize {
return fmt.Errorf("batch-size cannot be less than %d", MinBatchSize)
}

switch o.Type {
case DataStoreTypeInfluxDB:
if o.InfluxDB == nil {
return fmt.Errorf("the configuration for InfluxDB must be required")
}
return o.InfluxDB.Check()
case DataStoreTypeTDengine:
if o.TDengine == nil {
return fmt.Errorf("the configuration for TDengine must be required")
}
return o.TDengine.Check()
default:
return fmt.Errorf("unsupported datastore type: %s", o.Type)
}
}

type InfluxDBOptions struct {
URL string `json:"url" yaml:"url"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
UserAgent string `json:"user_agent" yaml:"user_agent"`
Precision string `json:"precision" yaml:"precision"`
RetentionPolicy string `json:"retention_policy" yaml:"retention_policy"`
WriteConsistency string `json:"write_consistency" yaml:"write_consistency"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
}

func (o *InfluxDBOptions) Check() error {
return nil
}

type TDengineOptions struct {
Schema string `json:"schema" yaml:"schema"`
Host string `json:"host" yaml:"host"`
Port uint16 `json:"port" yaml:"port"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
Keep uint `json:"keep" yaml:"keep"`
Days uint `json:"days" yaml:"days"`
Blocks uint `json:"blocks" yaml:"blocks"`
Update uint8 `json:"update" yaml:"update"`
Precision string `json:"precision" yaml:"precision"`

MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
}

func (o *TDengineOptions) Check() error {
return nil
}

type LogOptions struct {
Expand Down
62 changes: 42 additions & 20 deletions config/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"github.com/thingio/edge-device-std/errors"
"sync"
)

const (
Expand All @@ -14,36 +15,57 @@ const (
FileFormat = "yaml"
)

var cfg = new(Configuration)
var once = sync.Once{}

type Configuration struct {
DriverOptions DriverOptions `json:"driver" yaml:"driver"`
ManagerOptions ManagerOptions `json:"manager" yaml:"manager"`
LogOptions LogOptions `json:"log" yaml:"log"`
MessageBus MessageBusOptions `json:"msgbus" yaml:"msgbus"`
}

func NewConfiguration() (*Configuration, errors.EdgeError) {
// read the configuration file path
var configPath string
flag.StringVar(&configPath, "cp", FilePath, "config file path, e.g. \"/etc\"")
var configName string
flag.StringVar(&configName, "cn", FileName, "config file name, e.g. \"config\", excluding the suffix")
flag.Parse()

viper.SetEnvPrefix(EnvPrefix)
viper.AutomaticEnv()
viper.AddConfigPath(configPath)
viper.SetConfigName(configName)
viper.SetConfigType(FileFormat)
if err := viper.ReadInConfig(); err != nil {
return nil, errors.Configuration.Cause(err, "fail to read the configuration file")
func (c *Configuration) Check() error {
if err := c.ManagerOptions.Check(); err != nil {
return err
}
return nil
}

func NewConfiguration() (*Configuration, errors.EdgeError) {
var err error

once.Do(func() {
// read the configuration file path
var configPath string
flag.StringVar(&configPath, "cp", FilePath, "config file path, e.g. \"/etc\"")
var configName string
flag.StringVar(&configName, "cn", FileName, "config file name, e.g. \"config\", excluding the suffix")
flag.Parse()

cfg := new(Configuration)
if err := viper.Unmarshal(cfg, func(dc *mapstructure.DecoderConfig) {
dc.TagName = FileFormat
}); err != nil {
viper.SetEnvPrefix(EnvPrefix)
viper.AutomaticEnv()
viper.AddConfigPath(configPath)
viper.SetConfigName(configName)
viper.SetConfigType(FileFormat)

if err = viper.ReadInConfig(); err != nil {
return
}

if err = viper.Unmarshal(cfg, func(dc *mapstructure.DecoderConfig) {
dc.TagName = FileFormat
}); err != nil {
return
}

if err = cfg.Check(); err != nil {
return
}
})

if err != nil {
return nil, errors.Configuration.Cause(err, "fail to unmarshal the configuration file")
}

return cfg, nil
}
2 changes: 1 addition & 1 deletion docs/zh/系统架构/Accessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
1. 设备数据持久化与查询;
1. 流程:
1. 设备元数据增加 `recording` 字段,表示是否将采集的设备进行落库;
2. accessor 启动后,连接到 manager,先从 manager 获取全量设备元数据,再监听设备元数据的变更(使用 edge-device-std 定义的 operations );
2. accessor 启动后,先从 manager 获取全量设备元数据,再监听设备元数据的变更(使用 edge-device-std 定义的 operations );
3. 如果设备的 `recording` 字段为 true,则在 accessor 中启动一个 Recorder 监听 event 以及 props 对应的主题,将采集到的数据落库,否则跳过或者关闭
Recorder;
2. 是否需要支持多数据源,如 InfluxDB | TDEngine 等?
Expand Down
Binary file added docs/zh/系统架构/Accessor/架构图.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions errors/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
Driver = NewType(300000, "Driver")
DeviceTwin = NewType(400000, "DeviceTwin")
MetaStore = NewType(500000, "MetaStore")
DataStore = NewType(600000, "DataStore")
)

type ErrType struct {
Expand Down
7 changes: 7 additions & 0 deletions operations/manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (m *metaManagerService) subscribe(optType MetaOperationType,

type (
DataManagerService interface {
Subscribe(topic string) (<-chan interface{}, func(), error)
SubscribeDeviceStatus(protocolID string) (<-chan interface{}, func(), error)
SubscribeDeviceProps(protocolID, productID, deviceID string, propertyID models.ProductPropertyID) (<-chan interface{}, func(), error)
SubscribeDeviceEvent(protocolID, productID, deviceID string, eventID models.ProductEventID) (<-chan interface{}, func(), error)
Expand All @@ -85,6 +86,12 @@ func newDataManagerService(mb bus.MessageBus, lg *logger.Logger) (DataManagerSer
return &dataManagerService{mb: mb, lg: lg}, nil
}

func (d *dataManagerService) Subscribe(topic string) (<-chan interface{}, func(), error) {
return subscribe(d.mb, d.lg, topic, func(msg *message.Message) (interface{}, error) {
return msg, nil
})
}

func (d *dataManagerService) SubscribeDeviceStatus(protocolID string) (<-chan interface{}, func(), error) {
return d.subscribe(protocolID, TopicSingleLevelWildcard, TopicSingleLevelWildcard, TopicSingleLevelWildcard, DataOperationTypeHealthCheck,
func(o *DataOperation) (interface{}, error) {
Expand Down