-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstance.go
More file actions
70 lines (59 loc) · 1.17 KB
/
instance.go
File metadata and controls
70 lines (59 loc) · 1.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package Milena
import (
"github.com/JodeZer/Milena/log"
"os"
"sync"
)
type (
	// stopSig is the empty, zero-size value sent to request a stop.
	stopSig struct{}

	// stopChan is a channel that carries stopSig values.
	stopChan chan stopSig
)
// Instance is a running Milena instance: it holds the configuration it was
// built from, the process lock, and the Kafka clusters created by Start.
type Instance struct {
	// rw is write-locked by both Start and Stop, so the two never run
	// concurrently on the same Instance.
	rw sync.RWMutex

	// c is the configuration handed to NewInsatnce.
	c *Config

	// proclock is built from the configured lock file; Start locks it and
	// Stop unlocks it.
	proclock *plock

	// clusters is filled by Start with one entry per configured server and
	// iterated by Stop to shut each cluster down.
	clusters []*kafkaCluster
}
// NewInsatnce builds an Instance from conf. It only wires up the
// configuration and the process lock (from conf.LockFile); nothing is locked
// or started until Start is called.
//
// It returns os.ErrInvalid when conf is nil instead of panicking on the nil
// dereference. The misspelled name is kept as is because it is the exported
// constructor that existing callers use.
func NewInsatnce(conf *Config) (*Instance, error) {
	if conf == nil {
		return nil, os.ErrInvalid
	}

	log.Degbugf("%+v", conf)

	return &Instance{
		c:        conf,
		proclock: &plock{conf.LockFile},
	}, nil
}
// Start takes the process lock, creates one Kafka cluster per configured
// server and then calls Run on every cluster.
//
// Failures are fatal: if the process lock cannot be taken, or a cluster
// cannot be created, the error is logged and the process exits with status 1.
// The instance mutex is held for the whole call, so Start and Stop never
// overlap.
func (i *Instance) Start() {
	i.rw.Lock()
	defer i.rw.Unlock()

	// Create the lock file.
	if err := i.proclock.Lock(); err != nil {
		// Log the error as an argument, never as the format string itself:
		// a '%' inside the message would otherwise be parsed as a verb.
		log.Errorf("%s", err)
		os.Exit(1)
	}

	for _, s := range i.c.Servers {
		k, err := newKafkaCluster(&kafkaClusterConfig{
			ClusterName:  s.Name,
			Brokers:      s.Brokers,
			DataDir:      i.c.DataDir + "/" + s.Name,
			ListenTopics: s.Topics,
		})
		if err != nil {
			log.Errorf("start failed %s", err)
			// os.Exit does not run deferred calls and Stop will never be
			// reached, so release the process lock taken above before
			// exiting rather than leaving it held.
			i.proclock.Unlock()
			os.Exit(1)
		}
		i.clusters = append(i.clusters, k)
	}

	// Run the clusters only after every one of them was created successfully.
	for _, c := range i.clusters {
		c.Run()
	}
}
// Stop stops every cluster created by Start, in creation order, and then
// releases the process lock. The instance mutex is held for the whole call.
func (i *Instance) Stop() {
	i.rw.Lock()
	defer i.rw.Unlock()

	for idx := range i.clusters {
		cluster := i.clusters[idx]
		log.Degbugf("%s call stop", cluster.c.ClusterName)
		cluster.Stop()
	}

	// Release the process lock only once all clusters have been stopped.
	i.proclock.Unlock()
}