-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathnode.go
More file actions
344 lines (298 loc) · 7.76 KB
/
node.go
File metadata and controls
344 lines (298 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package dagchain
import (
	"bufio"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"net"
	"time"
)
// Node is the state of the local server: its own listen address, the seed it
// is currently attached to, fallback seeds, the downstream nodes attached to
// it, and the channels shared with the outer application.
type Node struct {
	// our node's listen address
	nodeAddr string

	// address of the source seed given at startup
	sourceAddr string

	// address of, and connection to, the seed currently in use
	seedAddr string
	seedConn net.Conn

	// ping/pong status with the current seed; ping() clears it and counts an
	// unanswered ping whenever it is still unset, dropping the seed after too
	// many. Presumably set elsewhere when a pong arrives — verify.
	pinged bool

	// fallback seeds, tried when the source seed cannot be reached
	seedBackup []*Seed

	// nodes that use this node as their current seed, keyed by their
	// listen address
	downstreams map[string]net.Conn

	// channels shared with the outer application: it pushes outbound
	// messages on send and receives inbound messages from recv
	send, recv chan interface{}
}

// Seed is a candidate seed address plus the number of consecutive failed
// connection attempts made to it.
type Seed struct {
	addr  string
	retry int
}
// StartNode starts the local node and, when a source seed is configured, runs
// the seed-management loop.
//
// Parameters:
//   - laddr: our node's listen addr (required)
//   - saddr: the source seed addr; when empty the node never connects to a
//     seed and only serves downstream nodes
//   - send: outer application pushes messages to this channel
//   - recv: outer application receives messages from this channel
//
// StartNode returns an error only during startup: missing laddr, listen
// failure, or failure of the very first connection to the source seed.
// Otherwise it never returns.
func StartNode(laddr, saddr string, send, recv chan interface{}) error {
	if laddr == "" {
		return errors.New("please use -l to specify our node's listen addr")
	}
	node = &Node{
		nodeAddr:    laddr,
		sourceAddr:  saddr,
		downstreams: make(map[string]net.Conn),
		send:        send,
		recv:        recv,
	}

	// start tcp listening
	l, err := net.Listen("tcp", laddr)
	if err != nil {
		return err
	}
	// stopAccept is closed right before StartNode closes the listener itself,
	// so the accept loop can tell that shutdown apart from a transient error.
	stopAccept := make(chan struct{})

	// wait for downstream nodes to connect
	go func() {
		// Pause after a failed Accept so that a persistent error (e.g. running
		// out of file descriptors) does not turn this loop into a busy spin.
		const acceptRetryDelay = 100 * time.Millisecond
		for {
			conn, err := l.Accept()
			if err != nil {
				select {
				case <-stopAccept:
					return
				default:
				}
				fmt.Println("accept downstream node error:", err)
				time.Sleep(acceptRetryDelay)
				continue
			}
			go node.receiveFrom(conn, false, false)
		}
	}()

	// receive outer application's messages and route them to the seed node
	// and the downstream nodes
	go localSend(node)
	// resend the unsent messages (those that didn't receive a matching ack
	// from the target node)
	go resend(node)

	if saddr == "" {
		// no seed to manage: just serve downstream nodes forever
		select {}
	}

	// Start pinging the seed; after too many unanswered pings ping() drops
	// the connection so that a seed gets reconnected by the loop below.
	go node.ping()
	if err := node.connectSeed(saddr); err != nil {
		fmt.Printf("failed to connecto the seed%s:%v\n", saddr, err)
		// Release the listen address. The other background goroutines
		// started above (ping, localSend, resend) are not stopped here.
		close(stopAccept)
		l.Close()
		return err
	}
	// start to sync the backup seeds from the current seed; the backup seeds
	// are the nodes directly connected with the current seed
	go node.syncBackupSeed()
	// receive messages from the current seed until that connection ends
	node.receiveFrom(node.seedConn, true, false)

seedLoop:
	for {
		// Although we lost the source seed, retry it first. The loop gives up
		// after seedMaxRetry+1 consecutive failures; every successful
		// connection resets the counter.
		for n := 0; n <= seedMaxRetry; {
			if err := node.connectSeed(saddr); err != nil {
				n++
			} else {
				node.receiveFrom(node.seedConn, true, false)
				n = 0
			}
			time.Sleep(3 * time.Second)
		}

		// After the source seed retries are exhausted, fall back to the
		// backup seeds.
		//
		// connectBackSeeds may ask us to "step back" to the source seed. A
		// large cluster can split into smaller clusters that no longer see
		// each other; periodically returning to the source seed is how they
		// get merged back together. Stepping back only happens while
		// connected to a backup seed.
		for len(node.seedBackup) > 0 {
			if node.connectBackSeeds() {
				fmt.Println("step back to the source seed")
				continue seedLoop
			}
		}
		// no backup seed left: go back to trying the source seed
		fmt.Printf("no backup seed exist now\n")
	}
}
// receiveFrom reads gob-encoded Requests from conn and handles each one with
// Request.handle, until decoding or handling fails.
//
// isSeed marks conn as the connection to our current seed. When the loop ends,
// the node's seed fields are cleared.
//
// needStepBack is set for connections to backup seeds. Such a connection is
// abandoned once it has been open longer than maxBackupSeedAlive seconds, and
// receiveFrom then returns true to tell the caller to step back to the source
// seed. In every other case it returns false.
//
// conn is always closed before returning.
func (node *Node) receiveFrom(conn net.Conn, isSeed bool, needStepBack bool) bool {
	// Listen addr of the remote node as reported by Request.handle. A
	// non-empty value marks the remote as one of our downstream nodes.
	var addr string
	defer func() {
		conn.Close()
		// if the node is in downstream, then remove
		if addr != "" {
			fmt.Printf("remote downstream node %s close the connection\n", addr)
			node.delete(addr)
		}
		// if the node is the seed, then reset
		if isSeed {
			fmt.Printf("remote seed node %s close the connection\n", node.seedAddr)
			node.seedConn = nil
			node.seedAddr = ""
		}
	}()

	// The senders in this file (ping, syncBackupSeed) build a new
	// gob.Encoder for every message. Each message therefore re-sends its type
	// definitions and is decoded with a fresh gob.Decoder; a single
	// long-lived decoder would reject the repeated type definitions.
	//
	// However, gob wraps any reader that is not an io.ByteReader in its own
	// bufio.Reader. That reader may read ahead into the next message, and the
	// bytes it buffered would be lost when the per-message decoder is
	// discarded. Sharing one bufio.Reader (an io.ByteReader, which gob uses
	// as-is and reads exactly) keeps those bytes across messages.
	reader := bufio.NewReader(conn)

	start := time.Now().Unix()
	for {
		// Backup-seed connections are only kept for a limited time. Note
		// that this is checked between messages only, since Decode blocks.
		if needStepBack && time.Now().Unix()-start > maxBackupSeedAlive {
			return true
		}
		r := &Request{}
		if err := gob.NewDecoder(reader).Decode(r); err != nil {
			if !errors.Is(err, io.EOF) {
				fmt.Println("decode message error:", err)
			}
			break
		}
		a, err := r.handle(node, conn)
		if err != nil {
			fmt.Println("handle message error:", err)
			break
		}
		// update the downstream node's listen addr
		if a != "" {
			addr = a
		}
	}
	return false
}
// delete removes addr from the node's downstream set. The map is only touched
// while holding the package-level lock.
func (node *Node) delete(addr string) {
	lock.Lock()
	defer lock.Unlock()
	delete(node.downstreams, addr)
}
// dialToNode opens a TCP connection to the remote node at addr.
//
// The dial is bounded by a timeout so that an unreachable address cannot stall
// the seed retry loops for the operating system's full connect timeout. On
// failure it returns a nil conn together with the dial error.
func (node *Node) dialToNode(addr string) (net.Conn, error) {
	const dialTimeout = 10 * time.Second
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
// connectSeed dials addr and, on success, records it as the node's current
// seed (both its connection and its address).
//
// On failure the node's seed fields are left untouched and the dial error is
// returned.
func (node *Node) connectSeed(addr string) error {
	c, dialErr := node.dialToNode(addr)
	if dialErr != nil {
		return dialErr
	}
	node.seedConn, node.seedAddr = c, addr
	fmt.Printf("connect to the seed %s successfully\n", addr)
	return nil
}
// ping keeps watch over the current seed connection and never returns.
//
// Every pingInterval seconds it sends a ServerPing request carrying our listen
// addr to the current seed, and counts it as outstanding. Whenever
// node.pinged is found set, the count is reset and the flag cleared.
//
// Once maxPingAllowed pings are outstanding, the seed connection is closed and
// the seed fields are reset. Closing the connection ends the receiveFrom call
// reading from it, so StartNode's seed loop reconnects, possibly to another
// seed.
//
// NOTE(review): node.pinged is presumably set elsewhere when a pong arrives.
// The flag and the seed fields are shared with other goroutines without
// synchronization.
func (node *Node) ping() {
	outstanding := 0
	for {
		if node.pinged {
			outstanding = 0
			node.pinged = false
			continue
		}
		if outstanding >= maxPingAllowed {
			// When the ping failed several times, drop the seed so that
			// another one gets connected.
			if conn := node.seedConn; conn != nil {
				conn.Close()
				node.seedConn = nil
				node.seedAddr = ""
			}
			outstanding = 0
			continue
		}
		// Read node.seedConn once. receiveFrom may reset it to nil from
		// another goroutine between a nil check and its use, and encoding to a
		// nil net.Conn would panic.
		if conn := node.seedConn; conn != nil {
			r := &Request{
				Command: ServerPing,
				Data:    node.nodeAddr,
			}
			if err := gob.NewEncoder(conn).Encode(r); err != nil {
				fmt.Println("send ping to seed error:", err)
			}
			outstanding++
		}
		time.Sleep(pingInterval * time.Second)
	}
}
// syncBackupSeed periodically asks the current seed for its backup seeds, i.e.
// the nodes directly connected with it. Every syncBackupSeedInterval seconds it
// sends a SyncBackupSeeds request carrying our listen addr.
//
// The request loop runs in its own goroutine. This method itself returns after
// a short initial delay.
func (node *Node) syncBackupSeed() {
	// waiting for node's initialization
	time.Sleep(100 * time.Millisecond)
	go func() {
		for {
			// Read node.seedConn once. It may be reset to nil by another
			// goroutine, and encoding to a nil net.Conn would panic.
			if conn := node.seedConn; conn != nil {
				r := &Request{
					Command: SyncBackupSeeds,
					Data:    node.nodeAddr,
				}
				if err := gob.NewEncoder(conn).Encode(r); err != nil {
					fmt.Println("sync backup seeds error:", err)
				}
			}
			time.Sleep(syncBackupSeedInterval * time.Second)
		}
	}()
}
// connectBackSeeds tries the backup seeds in order. For each reachable one it
// stays connected until that connection ends, then moves on.
//
// A backup seed is removed from node.seedBackup when either:
//   - it is also one of our downstream nodes, or
//   - its retry counter exceeds seedMaxRetry.
//
// It returns true as soon as a backup-seed connection asks to step back to the
// source seed (see receiveFrom). It returns false once every backup seed has
// been tried, or after recovering from a panic.
func (node *Node) connectBackSeeds() bool {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("a critical error happens when connecto to backup seed", err)
		}
	}()

	// removeSeed drops target from the current backup list. It matches by
	// pointer rather than by index, because entries are removed while we
	// iterate and the list may have changed since the snapshot below. A new
	// backing array is built so the snapshot is never disturbed.
	removeSeed := func(target *Seed) {
		kept := make([]*Seed, 0, len(node.seedBackup))
		for _, s := range node.seedBackup {
			if s != target {
				kept = append(kept, s)
			}
		}
		node.seedBackup = kept
	}

	// Iterate over a copy. Removing from node.seedBackup by index while
	// ranging over it would skip the element after each removal, revisit
	// stale trailing entries, and could slice out of range.
	seeds := make([]*Seed, len(node.seedBackup))
	copy(seeds, node.seedBackup)

	for _, seed := range seeds {
		// A node can't appear in seedBackup and downstream at the same time.
		// downstreams is guarded by lock (see delete).
		lock.Lock()
		_, isDownstream := node.downstreams[seed.addr]
		lock.Unlock()
		if isDownstream {
			removeSeed(seed)
			fmt.Printf("a conflict between backupSeeds and downstream,so the backup seed is deleted:%s\n", seed.addr)
			continue
		}

		if seed.retry > seedMaxRetry {
			// seed connection retries can't exceed the upper limit
			fmt.Printf("seed %sretries exceed the limit\n", seed.addr)
			removeSeed(seed)
		} else if err := node.connectSeed(seed.addr); err != nil {
			seed.retry++
			fmt.Printf("reconnect to seed %v error: %v\n", seed, err)
		} else {
			// Stay on this backup seed until the connection ends or it asks
			// us to go back to the source seed.
			if node.receiveFrom(node.seedConn, true, true) {
				return true
			}
			// a successful connection resets the retry counter
			seed.retry = 0
		}
		time.Sleep(3 * time.Second)
	}
	return false
}