diff --git a/graphite/config_reader.go b/graphite/config_reader.go new file mode 100644 index 0000000..5bff512 --- /dev/null +++ b/graphite/config_reader.go @@ -0,0 +1,172 @@ +package main + +import ( + "bufio" + "fmt" + "os" + "regexp" + "sort" + "strconv" + "strings" +) + +type GraphiteConfigAggregator struct { + //TODO +} + +type GraphiteConfigCluster struct { + name string + ctype string + replfactor int + servers []*GraphiteConfigServer +} + +type GraphiteConfigRoute struct { + pattern *regexp.Regexp + destinations []string + stop bool + drop bool +} + +type GraphiteConfigServer struct { + host string + port uint16 + index uint32 +} + +type GraphiteConfig struct { + //aggregators []*GraphiteConfigAggregator + clusters map[string]*GraphiteConfigCluster + routes []*GraphiteConfigRoute +} + +func NewConfig() *GraphiteConfig { + return &GraphiteConfig{ + clusters: make(map[string]*GraphiteConfigCluster), + routes: make([]*GraphiteConfigRoute, 0), + } +} + +func ConfigFromFile(path string) (*GraphiteConfig, error) { + var err error + cfg := NewConfig() + + file, err := os.Open(path) + if err != nil { + return nil, err + } + defer file.Close() + + reader := bufio.NewScanner(file) + + for reader.Scan() { + line := strings.TrimSpace(reader.Text()) + if len(line) == 0 { + continue + } + if match, _ := regexp.MatchString("^cluster", line); match { + cluster := &GraphiteConfigCluster{ + name: strings.Trim(line[7:], " '\""), + } + replRegex, err := regexp.Compile("([\\w\\d_]+)\\s+replication\\s+(\\d+)") + if err != nil { + return nil, err + } + if !reader.Scan() { + return nil, fmt.Errorf("Expected to get a cluster definition "+ + "for cluster %q, got none", cluster.name) + } + replLine := reader.Text() + replMatch := replRegex.FindStringSubmatch(replLine) + if len(replMatch) == 0 { + return nil, fmt.Errorf("Unexpected replication definition found: "+ + "%q, can not parse it", replLine) + } + hashAlg, replFactorStr := replMatch[1], replMatch[2] + replFactor, err := strconv.ParseInt(replFactorStr, 10, 32) + if err != nil { + return nil, fmt.Errorf("Failed to parse replication factor: %q", + replFactorStr) + } + cluster.replfactor = int(replFactor) + cluster.ctype = hashAlg + clusterServerRegex := regexp.MustCompile("^(\\w+):(\\d+)=(\\d+)$") + Cluster: + for reader.Scan() { + clusterLine := strings.TrimSpace(reader.Text()) + shouldBreak := false + if match, _ := regexp.MatchString(";$", clusterLine); match { + shouldBreak = true + clusterLine = strings.Trim(clusterLine, ";") + } + if len(clusterLine) != 0 { + match := clusterServerRegex.FindStringSubmatch(clusterLine) + if len(match) == 0 { + return nil, fmt.Errorf("Failed to parse cluster server"+ + " config: %q", clusterLine) + } + serverPort, err := strconv.ParseInt(match[2], 10, 16) + if err != nil { + return nil, err + } + serverIndex, err := strconv.ParseInt(match[3], 10, 32) + if err != nil { + return nil, err + } + server := &GraphiteConfigServer{ + host: match[1], + port: uint16(serverPort), + index: uint32(serverIndex), + } + cluster.servers = append(cluster.servers, server) + } + if shouldBreak { + sort.Slice(cluster.servers, func(i, j int) bool { + return cluster.servers[i].index < cluster.servers[j].index + }) + break Cluster + } + } + cfg.clusters[cluster.name] = cluster + } else if match, _ := regexp.MatchString("^match", line); match { + matchRule := strings.TrimSpace(line[5:]) + fmt.Printf("Route definition started: %q\n", matchRule) + matchRegex, err := regexp.Compile(matchRule) + if err != nil { + return nil, err + } + configRoute := &GraphiteConfigRoute{ + pattern: matchRegex, + } + shouldBreak := false + sendToStarted := false + Match: + for reader.Scan() { + line := strings.TrimSpace(reader.Text()) + if match, _ := regexp.MatchString(";$", line); match { + shouldBreak = true + line = strings.Trim(line, ";") + } + if len(line) > 0 { + fmt.Printf("Match line: %q\n", line) + if line == "stop" { + configRoute.stop = true + } else if line == "drop" { + configRoute.drop = true + } else if line == "send to" { + sendToStarted = true + } else if sendToStarted { + configRoute.destinations = append(configRoute.destinations, line) + } else { + return nil, fmt.Errorf("Unexpected line: %q", line) + } + } + if shouldBreak { + break Match + } + } + cfg.routes = append(cfg.routes, configRoute) + } + } + return cfg, nil +} diff --git a/graphite/config_reader_test.go b/graphite/config_reader_test.go new file mode 100644 index 0000000..df92574 --- /dev/null +++ b/graphite/config_reader_test.go @@ -0,0 +1,280 @@ +package main + +import ( + "io/ioutil" + "os" + "reflect" + "regexp" + "testing" +) + +func TestConfigReader_FromFile(t *testing.T) { + + tests := []struct { + name string + config string + expectedCfg *GraphiteConfig + }{ + { + name: "single cluster, single route", + config: `cluster 'test-cluster' + jump_fnv1a_ch replication 1 + flow_dst_host:7220=000 + flow_dst_host:7221=001 + flow_dst_host:7222=002 + flow_dst_host:7223=003 + flow_dst_host:7224=004 + ; +match .* + send to + test-cluster + stop;`, + expectedCfg: &GraphiteConfig{ + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster": &GraphiteConfigCluster{ + name: "test-cluster", + ctype: "jump_fnv1a_ch", + replfactor: 1, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7220, + index: 0, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7221, + index: 1, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7222, + index: 2, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7223, + index: 3, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7224, + index: 4, + }, + }, + }, + }, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile(".*"), + destinations: []string{"test-cluster"}, + stop: true, + drop: false, + }, + }, + }, + }, + { + name: "dual cluster, single route", + config: `cluster test-cluster1 + jump_fnv1a_ch replication 3 + host_1_1:2001=001 + host_1_2:2002=002 + host_1_3:2003=003; + + cluster test-cluster2 + jump_fnv1a_ch replication 2 + host_2_2:2002=02 + host_2_3:2003=03 + host_2_1:2001=01; + + match metrics\..* + send to + test-cluster1 + test-cluster2;`, + expectedCfg: &GraphiteConfig{ + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster1": &GraphiteConfigCluster{ + name: "test-cluster1", + ctype: "jump_fnv1a_ch", + replfactor: 3, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_1_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_1_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_1_3", + port: 2003, + index: 3, + }, + }, + }, + "test-cluster2": &GraphiteConfigCluster{ + name: "test-cluster2", + ctype: "jump_fnv1a_ch", + replfactor: 2, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_2_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_2_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_2_3", + port: 2003, + index: 3, + }, + }, + }, + }, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile("metrics\\..*"), + destinations: []string{"test-cluster1", "test-cluster2"}, + stop: false, + drop: false, + }, + }, + }, + }, + { + name: "dual cluster, dual route", + config: `cluster test-cluster1 + jump_fnv1a_ch replication 3 + host_1_1:2001=001 + host_1_2:2002=002 + host_1_3:2003=003; + + cluster test-cluster2 + jump_fnv1a_ch replication 2 + host_2_2:2002=02 + host_2_3:2003=03 + host_2_1:2001=01; + + match metrics\..* + send to + test-cluster1 + test-cluster2; + match .* + drop;`, + expectedCfg: &GraphiteConfig{ + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster1": &GraphiteConfigCluster{ + name: "test-cluster1", + ctype: "jump_fnv1a_ch", + replfactor: 3, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_1_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_1_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_1_3", + port: 2003, + index: 3, + }, + }, + }, + "test-cluster2": &GraphiteConfigCluster{ + name: "test-cluster2", + ctype: "jump_fnv1a_ch", + replfactor: 2, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_2_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_2_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_2_3", + port: 2003, + index: 3, + }, + }, + }, + }, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile("metrics\\..*"), + destinations: []string{"test-cluster1", "test-cluster2"}, + stop: false, + drop: false, + }, + &GraphiteConfigRoute{ + pattern: regexp.MustCompile(".*"), + stop: false, + drop: true, + }, + }, + }, + }, + } + + t.Parallel() + + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + tmpFile, err := ioutil.TempFile("/tmp", "flow-graphite-test-config") + if err != nil { + t.Fatalf("Failed to create a tmp file: %s", err) + } + defer os.Remove(tmpFile.Name()) + + if err := ioutil.WriteFile(tmpFile.Name(), []byte(testCase.config), 0644); err != nil { + t.Fatalf("Failed to write the data to tmp file: %s", err) + } + + cfg, err := ConfigFromFile(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to read the config: %s", err) + } + + if !reflect.DeepEqual(cfg.clusters, testCase.expectedCfg.clusters) { + t.Errorf("Diverging config cluster values: want: %+v, got: %+v", + cfg.clusters, testCase.expectedCfg.clusters) + } + + if !reflect.DeepEqual(cfg.routes, testCase.expectedCfg.routes) { + t.Errorf("Diverging config route values: want: %+v, got: %+v", + cfg.routes, testCase.expectedCfg.routes) + for i, route := range cfg.routes { + if i >= len(testCase.expectedCfg.routes) { + t.Errorf("Expected config is missing index %d", i) + } + if !reflect.DeepEqual(route, testCase.expectedCfg.routes[i]) { + t.Errorf("Diverging route value at index %d: got %+v, want: %+v", + i, route, testCase.expectedCfg.routes[i]) + } + } + } + + if !reflect.DeepEqual(cfg, testCase.expectedCfg) { + t.Fatalf("Unexpected config value: %+v, want: %+v", cfg, testCase.expectedCfg) + } + }) + } + +} diff --git a/graphite/configs/flow-graphite-config-ng.yml b/graphite/configs/flow-graphite-config-ng.yml new file mode 100644 index 0000000..22ace75 --- /dev/null +++ b/graphite/configs/flow-graphite-config-ng.yml @@ -0,0 +1,26 @@ +system: + maxprocs: 4 + metrics: + enabled: true + interval: 1 + receiver: + type: graphite + params: + namespace: metrics.flowd + host: localhost + port: 2003 + +components: + udp_rcv: + module: receiver.udp + params: + bind_addr: :3101 + graphite: + plugin: graphite + constructor: New + params: + config: /home/olegs/workspace/golang/src/github.com/whiteboxio/flow-plugins/graphite/configs/graphite-relay.conf + +pipeline: + udp_rcv: + connect: graphite diff --git a/graphite/configs/flow-graphite-config.yml b/graphite/configs/flow-graphite-config.yml deleted file mode 100644 index ec224ae..0000000 --- a/graphite/configs/flow-graphite-config.yml +++ /dev/null @@ -1,111 +0,0 @@ -system: - maxprocs: 4 - metrics: - enabled: true - interval: 1 #second - receiver: - type: graphite - params: - namespace: metrics.flowd - host: localhost - port: 2003 - -components: - udp_rcv_plain: - module: receiver.udp - params: - bind_addr: :3101 - udp_rcv_pickle: - module: receiver.udp - params: - bind_addr: :3102 - rcv_dmx: - module: link.dmx - graphite_msg_parser_plain: - plugin: graphite - constructor: NewMsgParser - params: - format: plain - graphite_msg_parser_pickle: - plugin: graphite - constructor: NewMsgParser - params: - format: pickle - replicator: - module: link.replicator - params: - replicas: 1 - hash_key: metric-name - tcp_7220: - module: sink.tcp - params: - bind_addr: :7220 - tcp_7221: - module: sink.tcp - params: - bind_addr: :7221 - tcp_7222: - module: sink.tcp - params: - bind_addr: :7222 - tcp_7223: - module: sink.tcp - params: - bind_addr: :7223 - tcp_7224: - module: sink.tcp - params: - bind_addr: :7224 - tcp_7225: - module: sink.tcp - params: - bind_addr: :7225 - tcp_7226: - module: sink.tcp - params: - bind_addr: :7226 - tcp_7227: - module: sink.tcp - params: - bind_addr: :7227 - tcp_7228: - module: sink.tcp - params: - bind_addr: :7228 - tcp_7229: - module: sink.tcp - params: - bind_addr: :7229 - tcp_7230: - module: sink.tcp - params: - bind_addr: :7230 - tcp_7231: - module: sink.tcp - params: - bind_addr: :7231 - -pipeline: - udp_rcv_plain: - connect: graphite_msg_parser_plain - udp_rcv_pickle: - connect: graphite_msg_parser_pickle - rcv_dmx: - links: - - graphite_msg_parser_plain - - graphite_msg_parser_pickle - connect: replicator - replicator: - links: - - tcp_7220 - - tcp_7221 - - tcp_7222 - - tcp_7223 - - tcp_7224 - - tcp_7225 - - tcp_7226 - - tcp_7227 - - tcp_7228 - - tcp_7229 - - tcp_7230 - - tcp_7231 \ No newline at end of file diff --git a/graphite/main.go b/graphite/main.go index 5f19b69..7e2b6f8 100644 --- a/graphite/main.go +++ b/graphite/main.go @@ -2,27 +2,119 @@ package main import ( "bytes" + "fmt" + "time" - core "github.com/whiteboxio/flow/pkg/core" + "github.com/whiteboxio/flow/pkg/core" + mpx "github.com/whiteboxio/flow/pkg/link/mpx" + replicator "github.com/whiteboxio/flow/pkg/link/replicator" + tcp_sink "github.com/whiteboxio/flow/pkg/sink/tcp" ) -type MsgParser struct { - Name string +const ( + MsgSendTimeout = 100 * time.Millisecond +) + +type GraphiteLink struct { + name string + config *GraphiteConfig + clusters map[string]core.Link *core.Connector } -func NewMsgParser(name string, params core.Params) (core.Link, error) { - return &MsgParser{name, core.NewConnector()}, nil +func New(name string, params core.Params) (core.Link, error) { + link, err := bootstrap(name, params) + return link, err } -func (mp *MsgParser) Recv(msg *core.Message) error { +func (gl *GraphiteLink) Recv(msg *core.Message) error { + var metricName string if ix := bytes.IndexByte(msg.Payload, ' '); ix != -1 { - metricName := msg.Payload[:ix] - msg.SetMeta("metric-name", metricName) - //log.Infof("Sending metric with name: [%s]", metricName) - return mp.Send(msg) + msg.SetMeta("metric-name", msg.Payload[:ix]) + metricName = string(msg.Payload[:ix]) + } else { + return msg.AckUnroutable() + } + dests := make(map[string]bool) +Routes: + for _, route := range gl.config.routes { + if route.pattern.MatchString(metricName) { + for _, dest := range route.destinations { + dests[dest] = true + } + if route.drop { + continue Routes + } + if route.stop { + break Routes + } + } + } + links := make([]core.Link, len(dests)) + ix := 0 + for dest := range dests { + links[ix] = gl.clusters[dest] + ix++ + } + + return mpx.Multiplex(msg, links, MsgSendTimeout) +} + +func bootstrap(name string, params core.Params) (core.Link, error) { + configPath, ok := params["config"] + if !ok { + return nil, fmt.Errorf("Missing graphite config path") + } + config, err := ConfigFromFile(configPath.(string)) + if err != nil { + return nil, err + } + clusters := make(map[string]core.Link) + for name, cfg := range config.clusters { + cluster, err := buildCluster(cfg) + if err != nil { + return nil, err + } + clusters[name] = cluster } - return msg.AckInvalid() + graphite := &GraphiteLink{ + name, + config, + clusters, + core.NewConnector(), + } + + return graphite, nil +} + +func buildCluster(config *GraphiteConfigCluster) (core.Link, error) { + repl, err := replicator.New(config.name, core.Params{ + "hash_key": "metric-name", + "hash_algo": config.ctype, + "replicas": config.replfactor, + }) + if err != nil { + return nil, err + } + endpoints := make([]core.Link, len(config.servers)) + for ix, serverCfg := range config.servers { + endpoint, err := tcp_sink.New( + fmt.Sprintf("graphite_endpoint_%s_%d", serverCfg.host, serverCfg.port), + core.Params{ + "bind_addr": fmt.Sprintf("%s:%d", serverCfg.host, serverCfg.port), + }, + ) + if err != nil { + return nil, err + } + endpoints[ix] = endpoint + } + + if err := repl.LinkTo(endpoints); err != nil { + return nil, err + } + + return repl, nil } func main() {}