-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathetcd.go
More file actions
118 lines (96 loc) · 3.23 KB
/
etcd.go
File metadata and controls
118 lines (96 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package support
import (
	"context"
	"fmt"
	"log"
	"path/filepath"
	"time"

	"github.com/etcd-io/etcd/pkg/transport"
	"go.etcd.io/etcd/clientv3"
)
// dbTimeoutTime is the timeout, expressed as a number of seconds, used both
// as the etcd dial timeout and as the deadline of each request context. It is
// an untyped constant; call sites multiply it by time.Second.
const dbTimeoutTime = 5
// connToEtcd (private) opens a TLS-secured connection to an etcd cluster and
// returns the client.
//
// certPath points to the directory that is expected to contain peer.crt,
// peer.key and ca.crt; endpoints points to the list of etcd endpoints to dial.
// Both pointers are dereferenced unconditionally and must be non-nil.
//
// The function does not return errors: if the TLS configuration cannot be
// built or the client cannot be created, it calls log.Fatal, which terminates
// the process.
func connToEtcd(certPath *string, endpoints *[]string) *clientv3.Client {
	// Build the certificate paths with filepath.Join so the separator of the
	// host OS is used instead of a hard-coded Windows backslash.
	certDir := *certPath
	tlsInfo := transport.TLSInfo{
		CertFile:      filepath.Join(certDir, "peer.crt"),
		KeyFile:       filepath.Join(certDir, "peer.key"),
		TrustedCAFile: filepath.Join(certDir, "ca.crt"),
	}

	tlsConfig, err := tlsInfo.ClientConfig()
	if err != nil {
		log.Fatal(err)
	}

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   *endpoints,
		DialTimeout: dbTimeoutTime * time.Second,
		TLS:         tlsConfig,
	})
	if err != nil {
		log.Fatal(err)
	}
	return cli
}
// ReadFromEtcd fetches every key that has keyToRead as a prefix and returns
// the results as a map from key (string) to raw value (byte slice).
//
// A new client connection is opened for the call and closed before returning.
// The Get request is bounded by a timeout of dbTimeoutTime seconds; if it
// fails, the error is returned unchanged together with a nil map.
func ReadFromEtcd(certPath *string, endpoints *[]string, keyToRead string) (map[string][]byte, error) {
	client := connToEtcd(certPath, endpoints)
	defer client.Close()

	timeout := dbTimeoutTime * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	resp, getErr := client.Get(ctx, keyToRead, clientv3.WithPrefix())
	if getErr != nil {
		return nil, getErr
	}

	// Flatten the key/value entries of the response into a plain
	// map[string][]byte before handing them back to the caller.
	result := map[string][]byte{}
	for _, kv := range resp.Kvs {
		result[string(kv.Key)] = kv.Value
	}
	return result, nil
}
// WatchReadFromEtcd watches every key that has keyToRead as a prefix and
// publishes the changes on watchedChangeCh as a (string, byte array) map.
//
// The map sent for each event contains all key/value pairs observed since the
// watch started, including the one that triggered the send. Every send is an
// independent snapshot, so the receiver may read or keep it without racing
// against later updates.
//
// The call blocks until a value is received from closeWacher or closeWacher is
// closed. It then cancels the watch, waits for the forwarding goroutine to
// finish and closes the client connection, so nothing is sent on
// watchedChangeCh after the function has returned.
func WatchReadFromEtcd(certPath *string, endpoints *[]string, keyToRead string, watchedChangeCh chan map[string][]byte, closeWacher chan bool) {
	fmt.Printf("Now watching %s!\n", keyToRead)
	cli := connToEtcd(certPath, endpoints)
	defer cli.Close()

	// A cancellable context lets us stop both the etcd watch and a goroutine
	// that is blocked sending to a receiver that has gone away.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	watchChan := cli.Watch(ctx, keyToRead, clientv3.WithPrefix())
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Accumulated state is owned exclusively by this goroutine.
		modifiedKv := make(map[string][]byte)
		for wc := range watchChan {
			for _, ev := range wc.Events {
				modifiedKv[string(ev.Kv.Key)] = ev.Kv.Value

				// Send a copy: handing out modifiedKv itself would let the
				// receiver read the map while this loop keeps writing to it.
				snapshot := make(map[string][]byte, len(modifiedKv))
				for k, v := range modifiedKv {
					snapshot[k] = v
				}
				select {
				case watchedChangeCh <- snapshot:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	<-closeWacher // unblocks when a value is sent on, or the caller closes, the channel
	fmt.Printf("Killing watcher for %s!\n", keyToRead)

	// Stop the watch and wait for the goroutine so it cannot outlive this call.
	cancel()
	<-done
}
// WriteToEtcd stores valueToWrite under keyToWrite in etcd with a single Put
// request and returns nil if no error occurred.
//
// A new client connection is opened for the call and closed before returning.
// The request is bounded by a timeout of dbTimeoutTime seconds; an error from
// Put is returned unchanged.
func WriteToEtcd(certPath *string, endpoints *[]string, keyToWrite string, valueToWrite string) error {
	client := connToEtcd(certPath, endpoints)
	defer client.Close()

	timeout := dbTimeoutTime * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Only the error matters here; the Put response itself is discarded.
	_, putErr := client.Put(ctx, keyToWrite, valueToWrite)
	return putErr
}
// Deletes the given key from etcd and returns the amount deleted
func DeleteFromEtcd(certPath *string, endpoints *[]string, keyToDelete string) int64 {
var response *clientv3.DeleteResponse
cli := connToEtcd(certPath, endpoints)
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), dbTimeoutTime*time.Second)
defer cancel()
response, err := cli.Delete(ctx, keyToDelete)
if err != nil {
return 0
}
return response.Deleted
}