From 66724d418147af093c5b1950a600a9fa163996c2 Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Fri, 21 Sep 2018 08:58:20 +0200 Subject: [PATCH 1/8] Started implementing Graphite config parser: shaped up the basic data structs --- graphite/config_reader.go | 51 ++++++++++++++++++++++++++++++++ graphite/config_reader_test.go | 54 ++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 graphite/config_reader.go create mode 100644 graphite/config_reader_test.go diff --git a/graphite/config_reader.go b/graphite/config_reader.go new file mode 100644 index 0000000..b10ef6d --- /dev/null +++ b/graphite/config_reader.go @@ -0,0 +1,51 @@ +package main + +import ( + "net" +) + +type GraphiteConfigAggregator struct { + //TODO +} + +type GraphiteConfigCluster struct { + name string + ctype string + replfactor uint + servers []*GraphiteConfigServer + next *GraphiteConfigCluster +} + +type GraphiteConfigRoute struct { + pattern string + destinations []*GraphiteConfigCluster + stop bool + next *GraphiteConfigRoute +} + +type GraphiteConfigServer struct { + ip net.IP + port uint8 +} + +type GraphiteConfig struct { + aggregators []GraphiteConfigAggregator + clusters []GraphiteConfigCluster + routes []GraphiteConfigRoute + servers []GraphiteConfigServer +} + +func New() *GraphiteConfig { + return &GraphiteConfig{ + aggregators: make([]GraphiteConfigAggregator, 0), + clusters: make([]GraphiteConfigCluster, 0), + routes: make([]GraphiteConfigRoute, 0), + servers: make([]GraphiteConfigServer, 0), + } +} + +func ReadFile(path string) (*GraphiteConfig, error) { + //TODO + cfg := New() + return cfg, nil +} diff --git a/graphite/config_reader_test.go b/graphite/config_reader_test.go new file mode 100644 index 0000000..e3e9c91 --- /dev/null +++ b/graphite/config_reader_test.go @@ -0,0 +1,54 @@ +package main + +import ( + "io/ioutil" + "os" + "reflect" + "testing" +) + +func TestConfigReader_ReadFile(t *testing.T) { + + configData := []byte(` +cluster test-cluster + jump_fnv1a_ch replication 1 + flow_eventlog_tcp_7220:7220=000 + flow_eventlog_tcp_7221:7221=001 + flow_eventlog_tcp_7222:7222=002 + flow_eventlog_tcp_7223:7223=003 + flow_eventlog_tcp_7224:7224=004 + ; +match .* + send to + test-cluster + stop; + +`) + + tmpFile, err := ioutil.TempFile("/tmp", "flow-graphite-test-config") + if err != nil { + t.Fatalf("Failed to create a tmp file: %s", err) + } + defer os.Remove(tmpFile.Name()) + + if err := ioutil.WriteFile(tmpFile.Name(), configData, 0644); err != nil { + t.Fatalf("Failed to write the data to tmp file: %s", err) + } + + cfg, err := ReadFile(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to read the config: %s", err) + } + + expectedCfg := &GraphiteConfig{ + clusters: []GraphiteConfigCluster{ + {}, + }, + routes: []GraphiteConfigRoute{ + {}, + }, + } + if !reflect.DeepEqual(cfg, expectedCfg) { + t.Fatalf("Unexpected config value: %+v, want: %+v", cfg, expectedCfg) + } +} From 666f999748ca5c478733e4f7ad3e21e7926e4986 Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Mon, 1 Oct 2018 22:40:49 +0200 Subject: [PATCH 2/8] Implemented cluster definition and match rule graphite config parser --- graphite/config_reader.go | 142 ++++++++++++++++++++++++++++++--- graphite/config_reader_test.go | 6 +- graphite/main.go | 1 + 3 files changed, 137 insertions(+), 12 deletions(-) diff --git a/graphite/config_reader.go b/graphite/config_reader.go index b10ef6d..9a55ebc 100644 --- a/graphite/config_reader.go +++ b/graphite/config_reader.go @@ -1,7 +1,13 @@ package main import ( - "net" + "bufio" + "fmt" + "os" + "regexp" + "sort" + "strconv" + "strings" ) type GraphiteConfigAggregator struct { @@ -13,19 +19,19 @@ type GraphiteConfigCluster struct { ctype string replfactor uint servers []*GraphiteConfigServer - next *GraphiteConfigCluster } type GraphiteConfigRoute struct { - pattern string - destinations []*GraphiteConfigCluster + pattern *regexp.Regexp + destinations []string stop bool - next *GraphiteConfigRoute + drop bool } type GraphiteConfigServer struct { - ip net.IP - port uint8 + host string + port uint16 + index uint32 } type GraphiteConfig struct { @@ -44,8 +50,126 @@ func New() *GraphiteConfig { } } -func ReadFile(path string) (*GraphiteConfig, error) { - //TODO +func FromFile(path string) (*GraphiteConfig, error) { + var err error cfg := New() + + file, err := os.Open(path) + if err != nil { + return nil, err + } + defer file.Close() + + reader := bufio.NewScanner(file) + + for reader.Scan() { + line := strings.TrimSpace(reader.Text()) + if len(line) == 0 { + continue + } + if match, _ := regexp.MatchString("^cluster", line); match { + cluster := GraphiteConfigCluster{ + name: strings.Trim(line[7:], " '\""), + } + replRegex, err := regexp.Compile("([\\w\\d_]+)\\s+replication\\s+(\\d+)") + if err != nil { + return nil, err + } + if !reader.Scan() { + return nil, fmt.Errorf("Expected to get a cluster definition "+ + "for cluster %q, got none", cluster.name) + } + replLine := reader.Text() + replMatch := replRegex.FindStringSubmatch(replLine) + if len(replMatch) == 0 { + return nil, fmt.Errorf("Unexpected replication definition found: "+ + "%q, can not parse it", replLine) + } + hashAlg, replFactorStr := replMatch[1], replMatch[2] + replFactor, err := strconv.ParseUint(replFactorStr, 10, 32) + if err != nil { + return nil, fmt.Errorf("Failed to parse replication factor: %q", + replFactorStr) + } + cluster.replfactor = uint(replFactor) + cluster.ctype = hashAlg + clusterServerRegex := regexp.MustCompile("^(\\w+):(\\d+)=(\\d+)$") + Cluster: + for reader.Scan() { + clusterLine := strings.TrimSpace(reader.Text()) + shouldBreak := false + if match, _ := regexp.MatchString(";$", clusterLine); match { + shouldBreak = true + clusterLine = strings.Trim(clusterLine, ";") + } + if len(clusterLine) != 0 { + match := clusterServerRegex.FindStringSubmatch(clusterLine) + if len(match) == 0 { + return nil, fmt.Errorf("Failed to parse cluster server"+ + " config: %q", clusterLine) + } + serverPort, err := strconv.ParseUint(match[2], 10, 16) + if err != nil { + return nil, err + } + serverIndex, err := strconv.ParseUint(match[3], 10, 32) + if err != nil { + return nil, err + } + server := &GraphiteConfigServer{ + host: match[1], + port: uint16(serverPort), + index: uint32(serverIndex), + } + cluster.servers = append(cluster.servers, server) + } + if shouldBreak { + sort.Slice(cluster.servers, func(i, j int) bool { + return cluster.servers[i].index < cluster.servers[j].index + }) + cfg.clusters = append(cfg.clusters, cluster) + break Cluster + } + } + } else if match, _ := regexp.MatchString("^match", line); match { + matchRule := strings.TrimSpace(line[5:]) + fmt.Printf("Route definition started: %q\n", matchRule) + matchRegex, err := regexp.Compile(matchRule) + if err != nil { + return nil, err + } + configRoute := GraphiteConfigRoute{ + pattern: matchRegex, + } + shouldBreak := false + sendToStarted := false + Match: + for reader.Scan() { + line := strings.TrimSpace(reader.Text()) + if match, _ := regexp.MatchString(";$", line); match { + shouldBreak = true + line = strings.Trim(line, ";") + } + if len(line) > 0 { + fmt.Printf("Match line: %q\n", line) + if line == "stop" { + configRoute.stop = true + } else if line == "drop" { + configRoute.drop = true + } else if line == "send to" { + sendToStarted = true + } else if sendToStarted { + configRoute.destinations = append(configRoute.destinations, line) + } else { + return nil, fmt.Errorf("Unexpected line: %q", line) + } + } + if shouldBreak { + break Match + } + } + cfg.routes = append(cfg.routes, configRoute) + } + } return cfg, nil } diff --git a/graphite/config_reader_test.go b/graphite/config_reader_test.go index e3e9c91..5df1f48 100644 --- a/graphite/config_reader_test.go +++ b/graphite/config_reader_test.go @@ -7,10 +7,10 @@ import ( "testing" ) -func TestConfigReader_ReadFile(t *testing.T) { +func TestConfigReader_FromFile(t *testing.T) { configData := []byte(` -cluster test-cluster +cluster 'test-cluster' jump_fnv1a_ch replication 1 flow_eventlog_tcp_7220:7220=000 flow_eventlog_tcp_7221:7221=001 @@ -35,7 +35,7 @@ match .* t.Fatalf("Failed to write the data to tmp file: %s", err) } - cfg, err := ReadFile(tmpFile.Name()) + cfg, err := FromFile(tmpFile.Name()) if err != nil { t.Fatalf("Failed to read the config: %s", err) } diff --git a/graphite/main.go b/graphite/main.go index 5f19b69..36d93a5 100644 --- a/graphite/main.go +++ b/graphite/main.go @@ -16,6 +16,7 @@ func NewMsgParser(name string, params core.Params) (core.Link, error) { } func (mp *MsgParser) Recv(msg *core.Message) error { + //fmt.Printf("Graphite plugin received a new message: {%s}\n", msg.Payload) if ix := bytes.IndexByte(msg.Payload, ' '); ix != -1 { metricName := msg.Payload[:ix] msg.SetMeta("metric-name", metricName) From 94292057ac930c5b432befb7817e0baba3b442ed Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Tue, 2 Oct 2018 08:38:27 +0200 Subject: [PATCH 3/8] Some more progress on graphite plugin config reader --- graphite/config_reader.go | 20 +++++----- graphite/config_reader_test.go | 67 ++++++++++++++++++++++++++++------ 2 files changed, 65 insertions(+), 22 deletions(-) diff --git a/graphite/config_reader.go b/graphite/config_reader.go index 9a55ebc..259ac41 100644 --- a/graphite/config_reader.go +++ b/graphite/config_reader.go @@ -35,18 +35,16 @@ type GraphiteConfigServer struct { } type GraphiteConfig struct { - aggregators []GraphiteConfigAggregator - clusters []GraphiteConfigCluster - routes []GraphiteConfigRoute - servers []GraphiteConfigServer + aggregators []*GraphiteConfigAggregator + clusters map[string]*GraphiteConfigCluster + routes []*GraphiteConfigRoute } func New() *GraphiteConfig { return &GraphiteConfig{ - aggregators: make([]GraphiteConfigAggregator, 0), - clusters: make([]GraphiteConfigCluster, 0), - routes: make([]GraphiteConfigRoute, 0), - servers: make([]GraphiteConfigServer, 0), + aggregators: make([]*GraphiteConfigAggregator, 0), + clusters: make(map[string]*GraphiteConfigCluster), + routes: make([]*GraphiteConfigRoute, 0), } } @@ -68,7 +66,7 @@ func FromFile(path string) (*GraphiteConfig, error) { continue } if match, _ := regexp.MatchString("^cluster", line); match { - cluster := GraphiteConfigCluster{ + cluster := &GraphiteConfigCluster{ name: strings.Trim(line[7:], " '\""), } replRegex, err := regexp.Compile("([\\w\\d_]+)\\s+replication\\s+(\\d+)") @@ -127,10 +125,10 @@ func FromFile(path string) (*GraphiteConfig, error) { sort.Slice(cluster.servers, func(i, j int) bool { return cluster.servers[i].index < cluster.servers[j].index }) - cfg.clusters = append(cfg.clusters, cluster) break Cluster } } + cfg.clusters[cluster.name] = cluster } else if match, _ := regexp.MatchString("^match", line); match { matchRule := strings.TrimSpace(line[5:]) fmt.Printf("Route definition started: %q\n", matchRule) @@ -138,7 +136,7 @@ func FromFile(path string) (*GraphiteConfig, error) { if err != nil { return nil, err } - configRoute := GraphiteConfigRoute{ + configRoute := &GraphiteConfigRoute{ pattern: matchRegex, } shouldBreak := false diff --git a/graphite/config_reader_test.go b/graphite/config_reader_test.go index 5df1f48..c11e0a1 100644 --- a/graphite/config_reader_test.go +++ b/graphite/config_reader_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "reflect" + "regexp" "testing" ) @@ -12,11 +13,11 @@ func TestConfigReader_FromFile(t *testing.T) { configData := []byte(` cluster 'test-cluster' jump_fnv1a_ch replication 1 - flow_eventlog_tcp_7220:7220=000 - flow_eventlog_tcp_7221:7221=001 - flow_eventlog_tcp_7222:7222=002 - flow_eventlog_tcp_7223:7223=003 - flow_eventlog_tcp_7224:7224=004 + flow_dst_host:7220=000 + flow_dst_host:7221=001 + flow_dst_host:7222=002 + flow_dst_host:7223=003 + flow_dst_host:7224=004 ; match .* send to @@ -41,14 +42,58 @@ match .* } expectedCfg := &GraphiteConfig{ - clusters: []GraphiteConfigCluster{ - {}, + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster": &GraphiteConfigCluster{ + name: "test-cluster", + ctype: "jump_fnv1a_ch", + replfactor: 1, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7220, + index: 0, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7221, + index: 1, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7222, + index: 2, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7223, + index: 3, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7224, + index: 4, + }, + }, + }, }, - routes: []GraphiteConfigRoute{ - {}, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile(".*"), + destinations: []string{"test-cluster"}, + stop: true, + drop: false, + }, }, } - if !reflect.DeepEqual(cfg, expectedCfg) { - t.Fatalf("Unexpected config value: %+v, want: %+v", cfg, expectedCfg) + if !reflect.DeepEqual(cfg.clusters, expectedCfg.clusters) { + t.Fatalf("Mismatch in config clsuters: got: %+v, want: %+v", + cfg.clusters, expectedCfg.clusters) } + if !reflect.DeepEqual(cfg.routes, expectedCfg.routes) { + t.Fatalf("Mismatch in config routes: got: %+v, want: %+v", + cfg.routes, expectedCfg.routes) + } + //if !reflect.DeepEqual(cfg, expectedCfg) { + // t.Fatalf("Unexpected config value: %+v, want: %+v", cfg, expectedCfg) + //} } From 7b7a4e8f0ec946b792073b886626459a660aa64a Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Wed, 3 Oct 2018 08:51:37 +0200 Subject: [PATCH 4/8] More tests for graphite config parser --- graphite/config_reader.go | 12 +- graphite/config_reader_test.go | 231 +++++++++++++++++++++++---------- 2 files changed, 171 insertions(+), 72 deletions(-) diff --git a/graphite/config_reader.go b/graphite/config_reader.go index 259ac41..6af0cae 100644 --- a/graphite/config_reader.go +++ b/graphite/config_reader.go @@ -35,16 +35,16 @@ type GraphiteConfigServer struct { } type GraphiteConfig struct { - aggregators []*GraphiteConfigAggregator - clusters map[string]*GraphiteConfigCluster - routes []*GraphiteConfigRoute + //aggregators []*GraphiteConfigAggregator + clusters map[string]*GraphiteConfigCluster + routes []*GraphiteConfigRoute } func New() *GraphiteConfig { return &GraphiteConfig{ - aggregators: make([]*GraphiteConfigAggregator, 0), - clusters: make(map[string]*GraphiteConfigCluster), - routes: make([]*GraphiteConfigRoute, 0), + //aggregators: make([]*GraphiteConfigAggregator, 0), + clusters: make(map[string]*GraphiteConfigCluster), + routes: make([]*GraphiteConfigRoute, 0), } } diff --git a/graphite/config_reader_test.go b/graphite/config_reader_test.go index c11e0a1..2fb67c0 100644 --- a/graphite/config_reader_test.go +++ b/graphite/config_reader_test.go @@ -10,8 +10,14 @@ import ( func TestConfigReader_FromFile(t *testing.T) { - configData := []byte(` -cluster 'test-cluster' + tests := []struct { + name string + config string + expectedCfg *GraphiteConfig + }{ + { + name: "single cluster, single route", + config: `cluster 'test-cluster' jump_fnv1a_ch replication 1 flow_dst_host:7220=000 flow_dst_host:7221=001 @@ -22,78 +28,171 @@ cluster 'test-cluster' match .* send to test-cluster - stop; - -`) - - tmpFile, err := ioutil.TempFile("/tmp", "flow-graphite-test-config") - if err != nil { - t.Fatalf("Failed to create a tmp file: %s", err) - } - defer os.Remove(tmpFile.Name()) - - if err := ioutil.WriteFile(tmpFile.Name(), configData, 0644); err != nil { - t.Fatalf("Failed to write the data to tmp file: %s", err) - } - - cfg, err := FromFile(tmpFile.Name()) - if err != nil { - t.Fatalf("Failed to read the config: %s", err) - } - - expectedCfg := &GraphiteConfig{ - clusters: map[string]*GraphiteConfigCluster{ - "test-cluster": &GraphiteConfigCluster{ - name: "test-cluster", - ctype: "jump_fnv1a_ch", - replfactor: 1, - servers: []*GraphiteConfigServer{ - &GraphiteConfigServer{ - host: "flow_dst_host", - port: 7220, - index: 0, + stop;`, + expectedCfg: &GraphiteConfig{ + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster": &GraphiteConfigCluster{ + name: "test-cluster", + ctype: "jump_fnv1a_ch", + replfactor: 1, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7220, + index: 0, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7221, + index: 1, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7222, + index: 2, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7223, + index: 3, + }, + &GraphiteConfigServer{ + host: "flow_dst_host", + port: 7224, + index: 4, + }, + }, }, - &GraphiteConfigServer{ - host: "flow_dst_host", - port: 7221, - index: 1, + }, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile(".*"), + destinations: []string{"test-cluster"}, + stop: true, + drop: false, }, - &GraphiteConfigServer{ - host: "flow_dst_host", - port: 7222, - index: 2, + }, + }, + }, + { + name: "dual cluster, single route", + config: `cluster test-cluster1 + jump_fnv1a_ch replication 3 + host_1_1:2001=001 + host_1_2:2002=002 + host_1_3:2003=003; + + cluster test-cluster2 + jump_fnv1a_ch replication 2 + host_2_2:2002=02 + host_2_3:2003=03 + host_2_1:2001=01; + + match metrics\..* + send to + test-cluster1 + test-cluster2;`, + expectedCfg: &GraphiteConfig{ + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster1": &GraphiteConfigCluster{ + name: "test-cluster1", + ctype: "jump_fnv1a_ch", + replfactor: 3, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_1_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_1_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_1_3", + port: 2003, + index: 3, + }, + }, }, - &GraphiteConfigServer{ - host: "flow_dst_host", - port: 7223, - index: 3, + "test-cluster2": &GraphiteConfigCluster{ + name: "test-cluster2", + ctype: "jump_fnv1a_ch", + replfactor: 2, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_2_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_2_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_2_3", + port: 2003, + index: 3, + }, + }, }, - &GraphiteConfigServer{ - host: "flow_dst_host", - port: 7224, - index: 4, + }, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile("metrics\\..*"), + destinations: []string{"test-cluster1", "test-cluster2"}, + stop: false, + drop: false, }, }, }, }, - routes: []*GraphiteConfigRoute{ - &GraphiteConfigRoute{ - pattern: regexp.MustCompile(".*"), - destinations: []string{"test-cluster"}, - stop: true, - drop: false, - }, - }, } - if !reflect.DeepEqual(cfg.clusters, expectedCfg.clusters) { - t.Fatalf("Mismatch in config clsuters: got: %+v, want: %+v", - cfg.clusters, expectedCfg.clusters) - } - if !reflect.DeepEqual(cfg.routes, expectedCfg.routes) { - t.Fatalf("Mismatch in config routes: got: %+v, want: %+v", - cfg.routes, expectedCfg.routes) + + t.Parallel() + + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + tmpFile, err := ioutil.TempFile("/tmp", "flow-graphite-test-config") + if err != nil { + t.Fatalf("Failed to create a tmp file: %s", err) + } + defer os.Remove(tmpFile.Name()) + + if err := ioutil.WriteFile(tmpFile.Name(), []byte(testCase.config), 0644); err != nil { + t.Fatalf("Failed to write the data to tmp file: %s", err) + } + + cfg, err := FromFile(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to read the config: %s", err) + } + + if !reflect.DeepEqual(cfg.clusters, testCase.expectedCfg.clusters) { + t.Errorf("Diverging config cluster values: want: %+v, got: %+v", + cfg.clusters, testCase.expectedCfg.clusters) + } + + if !reflect.DeepEqual(cfg.routes, testCase.expectedCfg.routes) { + t.Errorf("Diverging config route values: want: %+v, got: %+v", + cfg.routes, testCase.expectedCfg.routes) + for i, route := range cfg.routes { + if i >= len(testCase.expectedCfg.routes) { + t.Errorf("Expected config is missing index %d", i) + } + if !reflect.DeepEqual(route, testCase.expectedCfg.routes[i]) { + t.Errorf("Diverging route value at index %d: got %+v, want: %+v", + i, route, testCase.expectedCfg.routes[i]) + } + } + } + + if !reflect.DeepEqual(cfg, testCase.expectedCfg) { + t.Fatalf("Unexpected config value: %+v, want: %+v", cfg, testCase.expectedCfg) + } + }) } - //if !reflect.DeepEqual(cfg, expectedCfg) { - // t.Fatalf("Unexpected config value: %+v, want: %+v", cfg, expectedCfg) - //} + } From 785c8f128b10b710bae079b310fd6ab6a9eb2792 Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Wed, 3 Oct 2018 22:24:47 +0200 Subject: [PATCH 5/8] Implemented a draft version of graphite link --- graphite/config_reader.go | 7 +-- graphite/config_reader_test.go | 84 +++++++++++++++++++++++++- graphite/main.go | 105 +++++++++++++++++++++++++++++---- 3 files changed, 178 insertions(+), 18 deletions(-) diff --git a/graphite/config_reader.go b/graphite/config_reader.go index 6af0cae..5457bdf 100644 --- a/graphite/config_reader.go +++ b/graphite/config_reader.go @@ -40,17 +40,16 @@ type GraphiteConfig struct { routes []*GraphiteConfigRoute } -func New() *GraphiteConfig { +func NewConfig() *GraphiteConfig { return &GraphiteConfig{ - //aggregators: make([]*GraphiteConfigAggregator, 0), clusters: make(map[string]*GraphiteConfigCluster), routes: make([]*GraphiteConfigRoute, 0), } } -func FromFile(path string) (*GraphiteConfig, error) { +func ConfigFromFile(path string) (*GraphiteConfig, error) { var err error - cfg := New() + cfg := NewConfig() file, err := os.Open(path) if err != nil { diff --git a/graphite/config_reader_test.go b/graphite/config_reader_test.go index 2fb67c0..df92574 100644 --- a/graphite/config_reader_test.go +++ b/graphite/config_reader_test.go @@ -149,6 +149,88 @@ match .* }, }, }, + { + name: "dual cluster, dual route", + config: `cluster test-cluster1 + jump_fnv1a_ch replication 3 + host_1_1:2001=001 + host_1_2:2002=002 + host_1_3:2003=003; + + cluster test-cluster2 + jump_fnv1a_ch replication 2 + host_2_2:2002=02 + host_2_3:2003=03 + host_2_1:2001=01; + + match metrics\..* + send to + test-cluster1 + test-cluster2; + match .* + drop;`, + expectedCfg: &GraphiteConfig{ + clusters: map[string]*GraphiteConfigCluster{ + "test-cluster1": &GraphiteConfigCluster{ + name: "test-cluster1", + ctype: "jump_fnv1a_ch", + replfactor: 3, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_1_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_1_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_1_3", + port: 2003, + index: 3, + }, + }, + }, + "test-cluster2": &GraphiteConfigCluster{ + name: "test-cluster2", + ctype: "jump_fnv1a_ch", + replfactor: 2, + servers: []*GraphiteConfigServer{ + &GraphiteConfigServer{ + host: "host_2_1", + port: 2001, + index: 1, + }, + &GraphiteConfigServer{ + host: "host_2_2", + port: 2002, + index: 2, + }, + &GraphiteConfigServer{ + host: "host_2_3", + port: 2003, + index: 3, + }, + }, + }, + }, + routes: []*GraphiteConfigRoute{ + &GraphiteConfigRoute{ + pattern: regexp.MustCompile("metrics\\..*"), + destinations: []string{"test-cluster1", "test-cluster2"}, + stop: false, + drop: false, + }, + &GraphiteConfigRoute{ + pattern: regexp.MustCompile(".*"), + stop: false, + drop: true, + }, + }, + }, + }, } t.Parallel() @@ -165,7 +247,7 @@ match .* t.Fatalf("Failed to write the data to tmp file: %s", err) } - cfg, err := FromFile(tmpFile.Name()) + cfg, err := ConfigFromFile(tmpFile.Name()) if err != nil { t.Fatalf("Failed to read the config: %s", err) } diff --git a/graphite/main.go b/graphite/main.go index 36d93a5..c5ebeb7 100644 --- a/graphite/main.go +++ b/graphite/main.go @@ -2,28 +2,107 @@ package main import ( "bytes" + "fmt" - core "github.com/whiteboxio/flow/pkg/core" + "github.com/whiteboxio/flow/pkg/core" + replicator "github.com/whiteboxio/flow/pkg/link/replicator" + tcp_sink "github.com/whiteboxio/flow/pkg/sink/tcp" ) -type MsgParser struct { - Name string +type GraphiteLink struct { + name string + config *GraphiteConfig + clusters map[string]core.Link *core.Connector } -func NewMsgParser(name string, params core.Params) (core.Link, error) { - return &MsgParser{name, core.NewConnector()}, nil +func New(name string, params core.Params) (core.Link, error) { + link, err := bootstrap(name, params) + return link, err } -func (mp *MsgParser) Recv(msg *core.Message) error { - //fmt.Printf("Graphite plugin received a new message: {%s}\n", msg.Payload) +func (gl *GraphiteLink) Recv(msg *core.Message) error { + var metricName string if ix := bytes.IndexByte(msg.Payload, ' '); ix != -1 { - metricName := msg.Payload[:ix] - msg.SetMeta("metric-name", metricName) - //log.Infof("Sending metric with name: [%s]", metricName) - return mp.Send(msg) + msg.SetMeta("metric-name", msg.Payload[:ix]) + metricName = string(msg.Payload[:ix]) + } else { + return msg.AckUnroutable() } - return msg.AckInvalid() +Routes: + for _, route := range gl.config.routes { + if route.pattern.MatchString(metricName) { + Dst: + for _, dst := range route.destinations { + endpoint, ok := gl.clusters[dst] + if !ok { + continue Dst + } + //TODO: Collect msg submit statuses and return the composite status + msgCp := core.CpMessage(msg) + go endpoint.Recv(msgCp) + } + if route.stop { + break Routes + } + } + } + + return msg.AckDone() +} + +func bootstrap(name string, params core.Params) (core.Link, error) { + configPath, ok := params["config"] + if !ok { + return nil, fmt.Errorf("Missing graphite config path") + } + config, err := ConfigFromFile(configPath.(string)) + if err != nil { + return nil, err + } + clusters := make(map[string]core.Link) + for name, cfg := range config.clusters { + cluster, err := buildCluster(cfg) + if err != nil { + return nil, err + } + clusters[name] = cluster + } + graphite := &GraphiteLink{ + name, + config, + clusters, + core.NewConnector(), + } + + return graphite, nil } -func main() {} +func buildCluster(config *GraphiteConfigCluster) (core.Link, error) { + repl, err := replicator.New(config.name, core.Params{ + "hash_key": "metric-name", + "hash_algo": config.ctype, + "replicas": config.replfactor, + }) + if err != nil { + return nil, err + } + endpoints := make([]core.Link, len(config.servers)) + for ix, serverCfg := range config.servers { + endpoint, err := tcp_sink.New( + fmt.Sprintf("graphite_endpoint_%s_%d", serverCfg.host, serverCfg.port), + core.Params{ + "bind_addr": fmt.Sprintf("%s:%d", serverCfg.host, serverCfg.port), + }, + ) + if err != nil { + return nil, err + } + endpoints[ix] = endpoint + } + if err := repl.LinkTo(endpoints); err != nil { + return nil, err + } + + return repl, nil +} From 61443721e2c6e15a6c5327b58c5337571a087daa Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Thu, 4 Oct 2018 08:10:08 +0200 Subject: [PATCH 6/8] Implemented message synchronisation mechanism in graphite sender --- graphite/main.go | 45 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/graphite/main.go b/graphite/main.go index c5ebeb7..ca32c38 100644 --- a/graphite/main.go +++ b/graphite/main.go @@ -3,12 +3,19 @@ package main import ( "bytes" "fmt" + "sync" + "sync/atomic" + "time" "github.com/whiteboxio/flow/pkg/core" replicator "github.com/whiteboxio/flow/pkg/link/replicator" tcp_sink "github.com/whiteboxio/flow/pkg/sink/tcp" ) +const ( + MsgSendTimeout = 100 * time.Millisecond +) + type GraphiteLink struct { name string config *GraphiteConfig @@ -29,6 +36,8 @@ func (gl *GraphiteLink) Recv(msg *core.Message) error { } else { return msg.AckUnroutable() } + var succCnt, totalCnt, failCnt int32 = 0, 0, 0 + wg := &sync.WaitGroup{} Routes: for _, route := range gl.config.routes { if route.pattern.MatchString(metricName) { @@ -38,17 +47,43 @@ Routes: if !ok { continue Dst } - //TODO: Collect msg submit statuses and return the composite status msgCp := core.CpMessage(msg) - go endpoint.Recv(msgCp) + wg.Add(1) + totalCnt++ + go func() { + if err := endpoint.Recv(msgCp); err != nil { + atomic.AddInt32(&failCnt, 1) + } else { + atomic.AddInt32(&succCnt, 1) + } + wg.Done() + }() } if route.stop { break Routes } } } - - return msg.AckDone() + done := make(chan bool) + go func() { + wg.Wait() + done <- true + close(done) + }() + select { + case <-done: + if failCnt != 0 { + if failCnt == totalCnt { + return msg.AckFailed() + } else { + return msg.AckPartialSend() + } + } else { + return msg.AckDone() + } + case <-time.After(MsgSendTimeout): + return msg.AckTimedOut() + } } func bootstrap(name string, params core.Params) (core.Link, error) { @@ -106,3 +141,5 @@ func buildCluster(config *GraphiteConfigCluster) (core.Link, error) { return repl, nil } + +func main() {} From e9714fc328a214e564788416c569dba7885fdd59 Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Thu, 4 Oct 2018 22:56:01 +0200 Subject: [PATCH 7/8] Replaced graphite multiplex with mpx.Multiplex --- graphite/main.go | 52 +++++++++++------------------------------------- 1 file changed, 12 insertions(+), 40 deletions(-) diff --git a/graphite/main.go b/graphite/main.go index ca32c38..eb2ae3c 100644 --- a/graphite/main.go +++ b/graphite/main.go @@ -3,11 +3,10 @@ package main import ( "bytes" "fmt" - "sync" - "sync/atomic" "time" "github.com/whiteboxio/flow/pkg/core" + mpx "github.com/whiteboxio/flow/pkg/link/mpx" replicator "github.com/whiteboxio/flow/pkg/link/replicator" tcp_sink "github.com/whiteboxio/flow/pkg/sink/tcp" ) @@ -36,54 +35,26 @@ func (gl *GraphiteLink) Recv(msg *core.Message) error { } else { return msg.AckUnroutable() } - var succCnt, totalCnt, failCnt int32 = 0, 0, 0 - wg := &sync.WaitGroup{} + dests := make(map[string]bool) Routes: for _, route := range gl.config.routes { if route.pattern.MatchString(metricName) { - Dst: - for _, dst := range route.destinations { - endpoint, ok := gl.clusters[dst] - if !ok { - continue Dst - } - msgCp := core.CpMessage(msg) - wg.Add(1) - totalCnt++ - go func() { - if err := endpoint.Recv(msgCp); err != nil { - atomic.AddInt32(&failCnt, 1) - } else { - atomic.AddInt32(&succCnt, 1) - } - wg.Done() - }() + for _, dest := range route.destinations { + dests[dest] = true } if route.stop { break Routes } } } - done := make(chan bool) - go func() { - wg.Wait() - done <- true - close(done) - }() - select { - case <-done: - if failCnt != 0 { - if failCnt == totalCnt { - return msg.AckFailed() - } else { - return msg.AckPartialSend() - } - } else { - return msg.AckDone() - } - case <-time.After(MsgSendTimeout): - return msg.AckTimedOut() + links := make([]core.Link, len(dests)) + ix := 0 + for dest := range dests { + links[ix] = gl.clusters[dest] + ix++ } + + return mpx.Multiplex(msg, links, MsgSendTimeout) } func bootstrap(name string, params core.Params) (core.Link, error) { @@ -135,6 +106,7 @@ func buildCluster(config *GraphiteConfigCluster) (core.Link, error) { } endpoints[ix] = endpoint } + if err := repl.LinkTo(endpoints); err != nil { return nil, err } From 003912543cf4c3ca0a4fa0624e7a504fd99941f9 Mon Sep 17 00:00:00 2001 From: icanhazbroccoli Date: Sat, 6 Oct 2018 20:46:46 +0200 Subject: [PATCH 8/8] Fixed some issues with data types in graphite config reader --- graphite/config_reader.go | 10 +- graphite/configs/flow-graphite-config-ng.yml | 26 +++++ graphite/configs/flow-graphite-config.yml | 111 ------------------- graphite/main.go | 3 + 4 files changed, 34 insertions(+), 116 deletions(-) create mode 100644 graphite/configs/flow-graphite-config-ng.yml delete mode 100644 graphite/configs/flow-graphite-config.yml diff --git a/graphite/config_reader.go b/graphite/config_reader.go index 5457bdf..5bff512 100644 --- a/graphite/config_reader.go +++ b/graphite/config_reader.go @@ -17,7 +17,7 @@ type GraphiteConfigAggregator struct { type GraphiteConfigCluster struct { name string ctype string - replfactor uint + replfactor int servers []*GraphiteConfigServer } @@ -83,12 +83,12 @@ func ConfigFromFile(path string) (*GraphiteConfig, error) { "%q, can not parse it", replLine) } hashAlg, replFactorStr := replMatch[1], replMatch[2] - replFactor, err := strconv.ParseUint(replFactorStr, 10, 32) + replFactor, err := strconv.ParseInt(replFactorStr, 10, 32) if err != nil { return nil, fmt.Errorf("Failed to parse replication factor: %q", replFactorStr) } - cluster.replfactor = uint(replFactor) + cluster.replfactor = int(replFactor) cluster.ctype = hashAlg clusterServerRegex := regexp.MustCompile("^(\\w+):(\\d+)=(\\d+)$") Cluster: @@ -105,11 +105,11 @@ func ConfigFromFile(path string) (*GraphiteConfig, error) { return nil, fmt.Errorf("Failed to parse cluster server"+ " config: %q", clusterLine) } - serverPort, err := strconv.ParseUint(match[2], 10, 16) + serverPort, err := strconv.ParseInt(match[2], 10, 16) if err != nil { return nil, err } - serverIndex, err := strconv.ParseUint(match[3], 10, 32) + serverIndex, err := strconv.ParseInt(match[3], 10, 32) if err != nil { return nil, err } diff --git a/graphite/configs/flow-graphite-config-ng.yml b/graphite/configs/flow-graphite-config-ng.yml new file mode 100644 index 0000000..22ace75 --- /dev/null +++ b/graphite/configs/flow-graphite-config-ng.yml @@ -0,0 +1,26 @@ +system: + maxprocs: 4 + metrics: + enabled: true + interval: 1 + receiver: + type: graphite + params: + namespace: metrics.flowd + host: localhost + port: 2003 + +components: + udp_rcv: + module: receiver.udp + params: + bind_addr: :3101 + graphite: + plugin: graphite + constructor: New + params: + config: /home/olegs/workspace/golang/src/github.com/whiteboxio/flow-plugins/graphite/configs/graphite-relay.conf + +pipeline: + udp_rcv: + connect: graphite diff --git a/graphite/configs/flow-graphite-config.yml b/graphite/configs/flow-graphite-config.yml deleted file mode 100644 index ec224ae..0000000 --- a/graphite/configs/flow-graphite-config.yml +++ /dev/null @@ -1,111 +0,0 @@ -system: - maxprocs: 4 - metrics: - enabled: true - interval: 1 #second - receiver: - type: graphite - params: - namespace: metrics.flowd - host: localhost - port: 2003 - -components: - udp_rcv_plain: - module: receiver.udp - params: - bind_addr: :3101 - udp_rcv_pickle: - module: receiver.udp - params: - bind_addr: :3102 - rcv_dmx: - module: link.dmx - graphite_msg_parser_plain: - plugin: graphite - constructor: NewMsgParser - params: - format: plain - graphite_msg_parser_pickle: - plugin: graphite - constructor: NewMsgParser - params: - format: pickle - replicator: - module: link.replicator - params: - replicas: 1 - hash_key: metric-name - tcp_7220: - module: sink.tcp - params: - bind_addr: :7220 - tcp_7221: - module: sink.tcp - params: - bind_addr: :7221 - tcp_7222: - module: sink.tcp - params: - bind_addr: :7222 - tcp_7223: - module: sink.tcp - params: - bind_addr: :7223 - tcp_7224: - module: sink.tcp - params: - bind_addr: :7224 - tcp_7225: - module: sink.tcp - params: - bind_addr: :7225 - tcp_7226: - module: sink.tcp - params: - bind_addr: :7226 - tcp_7227: - module: sink.tcp - params: - bind_addr: :7227 - tcp_7228: - module: sink.tcp - params: - bind_addr: :7228 - tcp_7229: - module: sink.tcp - params: - bind_addr: :7229 - tcp_7230: - module: sink.tcp - params: - bind_addr: :7230 - tcp_7231: - module: sink.tcp - params: - bind_addr: :7231 - -pipeline: - udp_rcv_plain: - connect: graphite_msg_parser_plain - udp_rcv_pickle: - connect: graphite_msg_parser_pickle - rcv_dmx: - links: - - graphite_msg_parser_plain - - graphite_msg_parser_pickle - connect: replicator - replicator: - links: - - tcp_7220 - - tcp_7221 - - tcp_7222 - - tcp_7223 - - tcp_7224 - - tcp_7225 - - tcp_7226 - - tcp_7227 - - tcp_7228 - - tcp_7229 - - tcp_7230 - - tcp_7231 \ No newline at end of file diff --git a/graphite/main.go b/graphite/main.go index eb2ae3c..7e2b6f8 100644 --- a/graphite/main.go +++ b/graphite/main.go @@ -42,6 +42,9 @@ Routes: for _, dest := range route.destinations { dests[dest] = true } + if route.drop { + continue Routes + } if route.stop { break Routes }