-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathabsinthe.go
More file actions
127 lines (101 loc) · 2.87 KB
/
absinthe.go
File metadata and controls
127 lines (101 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package archethic
import (
"log"
"net/url"
"github.com/nshafer/phx"
)
type AbsintheSubscription struct {
socket *phx.Socket
ref phx.Ref
}
func (a *AbsintheSubscription) GraphqlSubscription(wsUrl, query string, variables map[string]string, readyHandler func(), handler func(map[string]interface{}) error) {
endPoint, _ := url.Parse(wsUrl)
// Create a new phx.Socket
socket := phx.NewSocket(endPoint)
a.socket = socket
// Wait for the socket to connect before continuing. If it's not able to, it will keep
// retrying forever.
cont := make(chan bool)
a.ref = socket.OnOpen(func() {
cont <- true
})
// Tell the socket to connect (or start retrying until it can connect)
err := socket.Connect()
if err != nil {
log.Fatal(err)
}
// Wait for the connection
<-cont
// Create a phx.Channel to connect to the default '__absinthe__:control' channel with no params
channel := socket.Channel("__absinthe__:control", nil)
//TODO: implement hearbeat
// Join the channel. A phx.Push is returned which can be used to bind to replies and errors
join, err := channel.Join()
if err != nil {
log.Fatal(err)
}
// Listen for a response and only continue once we know we're joined
join.Receive("ok", func(response any) {
cont <- true
})
join.Receive("error", func(response any) {
log.Fatalf("Join error: %s", response)
})
// wait to be joined
<-cont
payload := make(map[string]any)
payload["query"] = query
if variables != nil {
payload["variables"] = variables
}
_, err = channel.Push("doc", payload)
if err != nil {
log.Fatal(err)
}
subscriptionID := ""
channel.On("phx_reply", func(data interface{}) {
dataMap, ok := data.(map[string]interface{})
if !ok {
log.Fatalf("'phx_reply' error to parse data: %s", data)
}
response, ok := dataMap["response"].(map[string]interface{})
if !ok {
log.Fatalf("'phx_reply' error to parse response: %s", dataMap)
}
subscriptionID, ok = response["subscriptionId"].(string)
if !ok {
log.Fatalf("'phx_reply' error to parse subscriptionID: %s", response)
}
if subscriptionID != "" {
cont <- true
}
})
<-cont
socket.OnMessage(func(message phx.Message) {
if message.Topic == subscriptionID && message.Event == "subscription:data" {
payload, ok := message.Payload.(map[string]interface{})
if !ok {
log.Fatalf("Message received, error to parse payload: %s", message.Payload)
}
result, ok := payload["result"].(map[string]interface{})
if !ok {
log.Fatalf("Message received, error to parse result %s", payload)
}
data, ok := result["data"].(map[string]interface{})
if !ok {
log.Fatalf("Message received, error to parse data %s", result)
}
handler(data)
cont <- true
}
})
if readyHandler != nil {
readyHandler()
}
<-cont
// Now we will block forever, hit ctrl+c to exit
select {}
}
func (a *AbsintheSubscription) CancelSubscription() {
a.socket.Off(a.ref)
}