From 70f83f8af8cc70f3718e5a59877d88f5418db035 Mon Sep 17 00:00:00 2001 From: Shivam Rathore Date: Thu, 23 Jan 2020 12:07:06 +0530 Subject: [PATCH 1/3] saastack initial commit --- .DS_Store | Bin 6148 -> 6148 bytes README.md | 6 +++--- call.go | 2 +- call_test.go | 2 +- client.go | 2 +- example/main.go | 6 +++--- example/test-1/server/main.go | 8 ++++---- go.mod | 2 +- graphql/clone_test.go | 6 +++--- graphql/end_to_end_test.go | 6 +++--- graphql/execute.go | 2 +- graphql/execute_test.go | 8 ++++---- graphql/lazy_execute_test.go | 6 +++--- graphql/parser.go | 2 +- graphql/parser_test.go | 2 +- graphql/union_test.go | 6 +++--- gtypes/global.go | 2 +- http.go | 4 ++-- http_test.go | 4 ++-- introspection/introspection.go | 4 ++-- introspection/introspection_test.go | 6 +++--- middleware.go | 2 +- schema/schema.proto | 2 +- schemabuilder/build.go | 2 +- schemabuilder/function.go | 2 +- schemabuilder/input.go | 2 +- schemabuilder/input_object.go | 2 +- schemabuilder/output.go | 2 +- schemabuilder/reflect.go | 2 +- schemabuilder/schema.go | 2 +- ws.go | 6 +++--- 31 files changed, 55 insertions(+), 55 deletions(-) diff --git a/.DS_Store b/.DS_Store index bcdcb5767800c859b9943663f15bcd2fdf66eaee..83b2345776b208d97c733a9e09f7e572f8b7c397 100644 GIT binary patch delta 32 ocmZoMXfc@J&nU1lU^g?Pz-Atn(@dNHvvDv^Y|z-u&heKY0IYEefdBvi delta 59 zcmZoMXfc@J&nUPtU^g?P;AS3{(@e4~3?&Sy47m&io;mr+NjdpR3=9kc3=AyoKw4w- N88%_Y&Fmb1`2p1?4&nd+ diff --git a/README.md b/README.md index 38b6b20..3c16228 100644 --- a/README.md +++ b/README.md @@ -27,9 +27,9 @@ import ( "net/http" "github.com/google/uuid" - "go.appointy.com/jaal" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) type Server struct { diff --git a/call.go b/call.go index 6108ffc..ad0cfa6 100644 --- a/call.go +++ b/call.go @@ -7,7 +7,7 @@ import ( "net/http" "time" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) // HttpCall sends an HTTP Request to the specified url and returns response in map of map diff --git a/call_test.go b/call_test.go index 5a85212..278f168 100644 --- a/call_test.go +++ b/call_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/schemabuilder" ) func TestHttpCall(t *testing.T) { diff --git a/client.go b/client.go index e2fed8d..2a352f4 100644 --- a/client.go +++ b/client.go @@ -6,7 +6,7 @@ import ( "fmt" "net/http" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) type Decoder interface { diff --git a/example/main.go b/example/main.go index 08fc81a..cb0f249 100644 --- a/example/main.go +++ b/example/main.go @@ -11,9 +11,9 @@ import ( "time" "github.com/google/uuid" - "go.appointy.com/jaal" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) func init() { diff --git a/example/test-1/server/main.go b/example/test-1/server/main.go index 18aff23..1a1d7cc 100644 --- a/example/test-1/server/main.go +++ b/example/test-1/server/main.go @@ -9,10 +9,10 @@ import ( "time" "github.com/appointy/idgen" - "go.appointy.com/jaal" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/mempubsub" "golang.org/x/net/context" diff --git a/go.mod b/go.mod index b0ecedf..62d6fe9 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module go.appointy.com/jaal +module go.saastack.io/jaal go 1.13 diff --git a/graphql/clone_test.go b/graphql/clone_test.go index 40477fa..54698ee 100644 --- a/graphql/clone_test.go +++ b/graphql/clone_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) func TestClone(t *testing.T) { diff --git a/graphql/end_to_end_test.go b/graphql/end_to_end_test.go index 5960067..9d7290b 100644 --- a/graphql/end_to_end_test.go +++ b/graphql/end_to_end_test.go @@ -8,9 +8,9 @@ import ( "github.com/kylelemons/godebug/pretty" "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/internal" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/internal" + "go.saastack.io/jaal/schemabuilder" ) func TestInterface(t *testing.T) { diff --git a/graphql/execute.go b/graphql/execute.go index 35a58c0..7b3be42 100644 --- a/graphql/execute.go +++ b/graphql/execute.go @@ -8,7 +8,7 @@ import ( "runtime" "strings" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) type Executor struct { diff --git a/graphql/execute_test.go b/graphql/execute_test.go index ceda6ab..2b08c1e 100644 --- a/graphql/execute_test.go +++ b/graphql/execute_test.go @@ -7,10 +7,10 @@ import ( "testing" "github.com/davecgh/go-spew/spew" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/internal" - "go.appointy.com/jaal/jerrors" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/internal" + "go.saastack.io/jaal/jerrors" + "go.saastack.io/jaal/schemabuilder" "google.golang.org/grpc/codes" ) diff --git a/graphql/lazy_execute_test.go b/graphql/lazy_execute_test.go index ee3fd9a..3a13586 100644 --- a/graphql/lazy_execute_test.go +++ b/graphql/lazy_execute_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/jerrors" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/jerrors" + "go.saastack.io/jaal/schemabuilder" ) func TestLazyExecution(t *testing.T) { diff --git a/graphql/parser.go b/graphql/parser.go index 5d47cff..54ab615 100644 --- a/graphql/parser.go +++ b/graphql/parser.go @@ -10,7 +10,7 @@ import ( "github.com/graphql-go/graphql/language/ast" "github.com/graphql-go/graphql/language/parser" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) type Query struct { diff --git a/graphql/parser_test.go b/graphql/parser_test.go index 8a056bf..e7357fd 100644 --- a/graphql/parser_test.go +++ b/graphql/parser_test.go @@ -5,7 +5,7 @@ import ( "reflect" "testing" - . "go.appointy.com/jaal/graphql" + . "go.saastack.io/jaal/graphql" ) func TestParseSupported(t *testing.T) { diff --git a/graphql/union_test.go b/graphql/union_test.go index ed6a9c9..d724ae2 100644 --- a/graphql/union_test.go +++ b/graphql/union_test.go @@ -7,9 +7,9 @@ import ( "testing" "github.com/kylelemons/godebug/pretty" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/internal" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/internal" + "go.saastack.io/jaal/schemabuilder" ) type GatewayType int diff --git a/gtypes/global.go b/gtypes/global.go index 116c621..1043eac 100644 --- a/gtypes/global.go +++ b/gtypes/global.go @@ -6,7 +6,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/iancoleman/strcase" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/schemabuilder" "google.golang.org/genproto/protobuf/field_mask" ) diff --git a/http.go b/http.go index f7d81c8..3ac4586 100644 --- a/http.go +++ b/http.go @@ -6,8 +6,8 @@ import ( "errors" "net/http" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/jerrors" ) type HandlerOption func(*handlerOptions) diff --git a/http_test.go b/http_test.go index 80d93ae..1cef2a1 100644 --- a/http_test.go +++ b/http_test.go @@ -7,8 +7,8 @@ import ( "testing" "github.com/kylelemons/godebug/pretty" - "go.appointy.com/jaal" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/schemabuilder" ) func testHTTPRequest(req *http.Request) *httptest.ResponseRecorder { diff --git a/introspection/introspection.go b/introspection/introspection.go index 1ebeb34..fa1fbbd 100644 --- a/introspection/introspection.go +++ b/introspection/introspection.go @@ -6,8 +6,8 @@ import ( "fmt" "sort" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/schemabuilder" ) type introspection struct { diff --git a/introspection/introspection_test.go b/introspection/introspection_test.go index 2263a7b..85859e4 100644 --- a/introspection/introspection_test.go +++ b/introspection/introspection_test.go @@ -7,11 +7,11 @@ import ( "strings" "testing" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" "github.com/stretchr/testify/require" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) type User struct { diff --git a/middleware.go b/middleware.go index aac4ad5..0ede14d 100644 --- a/middleware.go +++ b/middleware.go @@ -2,7 +2,7 @@ package jaal import "context" -import "go.appointy.com/jaal/graphql" +import "go.saastack.io/jaal/graphql" type HandlerFunc func(context.Context, graphql.Type, *graphql.Query) (interface{}, error) diff --git a/schema/schema.proto b/schema/schema.proto index 101caf6..873a35a 100644 --- a/schema/schema.proto +++ b/schema/schema.proto @@ -5,7 +5,7 @@ package graphql; import "google/protobuf/descriptor.proto"; -option go_package="go.appointy.com/jaal/schema"; +option go_package="go.saastack.io/jaal/schema"; extend google.protobuf.MethodOptions { // schema is used to tag an rpc as query or mutation. diff --git a/schemabuilder/build.go b/schemabuilder/build.go index c319d53..6b47cab 100644 --- a/schemabuilder/build.go +++ b/schemabuilder/build.go @@ -6,7 +6,7 @@ import ( "github.com/golang/protobuf/ptypes/duration" "github.com/golang/protobuf/ptypes/timestamp" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // schemaBuilder is a struct for holding all the graph information for types as diff --git a/schemabuilder/function.go b/schemabuilder/function.go index 52dbd77..bf3b7b4 100644 --- a/schemabuilder/function.go +++ b/schemabuilder/function.go @@ -5,7 +5,7 @@ import ( "fmt" "reflect" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // buildFunction takes the reflect type of an object and a method attached to that object to build a GraphQL Field diff --git a/schemabuilder/input.go b/schemabuilder/input.go index a942047..3566daf 100644 --- a/schemabuilder/input.go +++ b/schemabuilder/input.go @@ -9,7 +9,7 @@ import ( "github.com/golang/protobuf/ptypes/duration" "github.com/golang/protobuf/ptypes/timestamp" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // argField is a representation of an input parameter field for a function. It diff --git a/schemabuilder/input_object.go b/schemabuilder/input_object.go index 4e83518..ccea98d 100644 --- a/schemabuilder/input_object.go +++ b/schemabuilder/input_object.go @@ -5,7 +5,7 @@ import ( "fmt" "reflect" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // makeInputObjectParser constructs an argParser for the passed in args struct i.e. the input struct which contains all the objects to be given as input. For eg: diff --git a/schemabuilder/output.go b/schemabuilder/output.go index 7b9b650..76ab676 100644 --- a/schemabuilder/output.go +++ b/schemabuilder/output.go @@ -6,7 +6,7 @@ import ( "reflect" "sort" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // buildStruct is a function for building the graphql.Type for a passed in struct type. diff --git a/schemabuilder/reflect.go b/schemabuilder/reflect.go index de6b2a1..61a26e5 100644 --- a/schemabuilder/reflect.go +++ b/schemabuilder/reflect.go @@ -7,7 +7,7 @@ import ( "strings" "unicode" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // graphQLFieldInfo contains basic struct field information related to GraphQL. diff --git a/schemabuilder/schema.go b/schemabuilder/schema.go index ee7b50c..d766e91 100644 --- a/schemabuilder/schema.go +++ b/schemabuilder/schema.go @@ -4,7 +4,7 @@ import ( "fmt" "reflect" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // Schema is a struct that can be used to build out a GraphQL schema. Functions diff --git a/ws.go b/ws.go index 4629db1..4e06d3b 100644 --- a/ws.go +++ b/ws.go @@ -13,9 +13,9 @@ import ( "github.com/gorilla/websocket" "gocloud.dev/pubsub" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/jerrors" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/jerrors" + "go.saastack.io/jaal/schemabuilder" ) // HTTPSubHandler implements the handler required for executing the graphql subscriptions From 794fdce37e005f24fb2ceeb07cd53ab5568c4a9b Mon Sep 17 00:00:00 2001 From: Shivam Rathore Date: Fri, 17 Apr 2020 17:51:56 +0530 Subject: [PATCH 2/3] Fix subscription panic on closing websocket connection --- go.mod | 3 +- go.sum | 2 ++ ws.go | 104 ++++++++++++++++++++++++++------------------------------- 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/go.mod b/go.mod index 62d6fe9..040e55c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module go.saastack.io/jaal -go 1.13 +go 1.14 require ( github.com/appointy/idgen v0.0.0-20190227121039-a884768ebb9d @@ -15,6 +15,7 @@ require ( go.opencensus.io v0.22.0 // indirect gocloud.dev v0.15.0 golang.org/x/net v0.0.0-20190514140710-3ec191127204 + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect google.golang.org/api v0.6.0 // indirect google.golang.org/genproto v0.0.0-20190508193815-b515fa19cec8 google.golang.org/grpc v1.21.0 diff --git a/go.sum b/go.sum index 9a274d4..409f773 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.3.2/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= diff --git a/ws.go b/ws.go index 4e06d3b..201daf7 100644 --- a/ws.go +++ b/ws.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/appointy/idgen" "log" "net/http" "strings" @@ -19,11 +20,11 @@ import ( ) // HTTPSubHandler implements the handler required for executing the graphql subscriptions -func HTTPSubHandler(schema *graphql.Schema, s *pubsub.Subscription) (http.Handler, func()) { +func HTTPSubHandler(schema *graphql.Schema, subs *pubsub.Subscription) (http.Handler, func()) { source := make(chan *event) sessions := &sessions{ - data: map[string][]chan *event{}, - chans: map[string][]chan struct{}{}, + data: map[string]chan *event{}, + chans: map[string]chan struct{}{}, } return &httpSubHandler{ handler: handler{ @@ -35,7 +36,7 @@ func HTTPSubHandler(schema *graphql.Schema, s *pubsub.Subscription) (http.Handle source: source, sessions: sessions, }, func() { - go startListening(s, source, func() { + go startListening(subs, source, func() { exit(sessions) }) go listenSource(source, sessions) @@ -46,9 +47,7 @@ func listenSource(events chan *event, ss *sessions) { for evt := range events { ss.RLock() for _, v := range ss.data { - for _, s := range v { - s <- evt - } + v <- evt } ss.RUnlock() } @@ -58,7 +57,8 @@ func startListening(s *pubsub.Subscription, source chan<- *event, cancel func()) for { msg, err := s.Receive(context.Background()) if err != nil { - fmt.Println("Pubsub failed: ", err) + fmt.Println("Pubsub failed with error:", err) + fmt.Println("Closing all sessions") cancel() return } @@ -86,8 +86,8 @@ type event struct { type sessions struct { sync.RWMutex - data map[string][]chan *event - chans map[string][]chan struct{} + data map[string]chan *event + chans map[string]chan struct{} } type wsMessage struct { @@ -139,7 +139,8 @@ func (h *httpSubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } - if err := writeResponse(conn, "connection_ack", "", nil, nil); err != nil { + id := idgen.New("sock") + if err := writeResponse(conn, "connection_ack", id, nil, nil); err != nil { fmt.Println(err) return } @@ -153,11 +154,12 @@ loop: } fmt.Println(err) } + data.Id = id switch data.Type { case "start": var gql gqlPayload if err := json.Unmarshal(data.Payload, &gql); err != nil { - if err := writeResponse(conn, "connection_error", "", nil, err); err != nil { + if err := writeResponse(conn, "connection_error", data.Id, nil, err); err != nil { fmt.Println(err) return } @@ -193,16 +195,11 @@ loop: }, } go func(conn *webConn, data *wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request) { - if err := h.serveHTTP(conn, *data, schema, query, end, w, r); err != nil { - fmt.Println("Id:", data.Id, ": terminated: ", err) + err := h.serveHTTP(conn, *data, schema, query, end, w, r) + if err := writeResponse(conn, "complete", data.Id, nil, err); err != nil { + fmt.Println(err) } h.sessions.Lock() - if _, ok := h.sessions.data[data.Id]; ok { - if err := writeResponse(conn, "complete", data.Id, nil, nil); err != nil { - fmt.Println(err) - } - fmt.Println("Id:", data.Id, ": terminated.") - } delete(h.sessions.data, data.Id) delete(h.sessions.chans, data.Id) h.sessions.Unlock() @@ -210,12 +207,15 @@ loop: } case "stop": h.sessions.RLock() - for _, v := range h.sessions.chans[data.Id] { - v <- struct{}{} + if _, ok := h.sessions.chans[data.Id]; ok { + h.sessions.chans[data.Id] <- struct{}{} } h.sessions.RUnlock() case "connection_terminate": - exit(h.sessions) + h.sessions.RLock() + delete(h.sessions.data, data.Id) + delete(h.sessions.chans, data.Id) + h.sessions.RUnlock() break loop default: } @@ -223,16 +223,13 @@ loop: } func exit(ss *sessions) { + // closing all sessions ss.RLock() - for _, v := range ss.chans { - for _, s := range v { - s <- struct{}{} - } + for _, ch := range ss.chans { + ch <- struct{}{} } - for _, v := range ss.data { - for _, s := range v { - close(s) - } + for _, da := range ss.data { + close(da) } ss.RUnlock() } @@ -258,6 +255,9 @@ func writeResponse(w *webConn, typ, id string, r interface{}, er error) error { } } } else if typ == "error" || typ == "connection_error" { + if er == nil { + er = errors.New("connection is closed") + } str := strings.Replace(er.Error(), "\"", "\\\"", -1) payload = json.RawMessage("{ \"error\" : \"" + str + "\"}") } @@ -279,22 +279,14 @@ func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql sid := data.Id sess := make(chan *event) h.sessions.Lock() - h.sessions.data[sid] = append(h.sessions.data[sid], sess) - h.sessions.chans[sid] = append(h.sessions.chans[sid], end) + h.sessions.data[sid] = sess + h.sessions.chans[sid] = end h.sessions.Unlock() cls := func(ss *sessions, sid string) { ss.Lock() - for _, v := range ss.data[sid] { - close(v) - for range v { - } - } - for _, v := range ss.chans[sid] { - close(v) - for range v { - } - } + close(ss.data[sid]) + close(ss.chans[sid]) ss.Unlock() } @@ -305,22 +297,22 @@ func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql cls(h.sessions, sid) return nil default: - if err := func() error { - res, err := h.executor.Execute(r.Context(), schema, &schemabuilder.Subscription{msg.payload}, query) - if err == graphql.ErrNoUpdate { + // Subscription should have only one root selection + // https://spec.graphql.org/June2018/#sec-Single-root-field + if len(query.Selections) == 1 && query.Selections[0].Name == msg.typ { + if err := func() error { + res, err := h.executor.Execute(r.Context(), schema, &schemabuilder.Subscription{Payload: msg.payload}, query) + if err == graphql.ErrNoUpdate { + return nil + } + if err := writeResponse(conn, "data", data.Id, res, err); err != nil { + return err + } return nil - } - rer := err - if err := writeResponse(conn, "data", data.Id, res, rer); err != nil { - return err - } - if rer != nil { + }(); err != nil { + cls(h.sessions, sid) return err } - return nil - }(); err != nil { - cls(h.sessions, sid) - return err } } } From bf5fbae9985664ba5f768b4a5adbc335128921f4 Mon Sep 17 00:00:00 2001 From: Shivam Rathore Date: Sat, 18 Apr 2020 14:14:35 +0530 Subject: [PATCH 3/3] Maintain subscription sessions using both client & server side id --- example/test-1/server/main.go | 3 -- ws.go | 74 +++++++++++++++++++++-------------- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/example/test-1/server/main.go b/example/test-1/server/main.go index 1a1d7cc..d6442e4 100644 --- a/example/test-1/server/main.go +++ b/example/test-1/server/main.go @@ -278,9 +278,6 @@ func main() { return } } - if t < 10 { - sub.Shutdown(ctx) - } } }() f() diff --git a/ws.go b/ws.go index 201daf7..60ff8ef 100644 --- a/ws.go +++ b/ws.go @@ -23,8 +23,8 @@ import ( func HTTPSubHandler(schema *graphql.Schema, subs *pubsub.Subscription) (http.Handler, func()) { source := make(chan *event) sessions := &sessions{ - data: map[string]chan *event{}, - chans: map[string]chan struct{}{}, + data: map[string]map[string]chan *event{}, + chans: map[string]map[string]chan struct{}{}, } return &httpSubHandler{ handler: handler{ @@ -46,8 +46,10 @@ func HTTPSubHandler(schema *graphql.Schema, subs *pubsub.Subscription) (http.Han func listenSource(events chan *event, ss *sessions) { for evt := range events { ss.RLock() - for _, v := range ss.data { - v <- evt + for _, mp := range ss.data { + for _, v := range mp { + v <- evt + } } ss.RUnlock() } @@ -86,8 +88,8 @@ type event struct { type sessions struct { sync.RWMutex - data map[string]chan *event - chans map[string]chan struct{} + data map[string]map[string]chan *event + chans map[string]map[string]chan struct{} } type wsMessage struct { @@ -140,7 +142,7 @@ func (h *httpSubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } id := idgen.New("sock") - if err := writeResponse(conn, "connection_ack", id, nil, nil); err != nil { + if err := writeResponse(conn, "connection_ack", msg.Id, nil, nil); err != nil { fmt.Println(err) return } @@ -154,7 +156,6 @@ loop: } fmt.Println(err) } - data.Id = id switch data.Type { case "start": var gql gqlPayload @@ -194,27 +195,35 @@ loop: Fragments: query.SelectionSet.Fragments, }, } - go func(conn *webConn, data *wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request) { - err := h.serveHTTP(conn, *data, schema, query, end, w, r) + go func(conn *webConn, data *wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request, id string) { + err := h.serveHTTP(conn, *data, schema, query, end, w, r, id) if err := writeResponse(conn, "complete", data.Id, nil, err); err != nil { fmt.Println(err) } h.sessions.Lock() - delete(h.sessions.data, data.Id) - delete(h.sessions.chans, data.Id) + delete(h.sessions.data[data.Id], id) + delete(h.sessions.chans[data.Id], id) + if len(h.sessions.data[data.Id]) == 0 { + delete(h.sessions.data, data.Id) + delete(h.sessions.chans, data.Id) + } h.sessions.Unlock() - }(conn, &data, schema, modQuery, end, w, r) + }(conn, &data, schema, modQuery, end, w, r, id) } case "stop": h.sessions.RLock() - if _, ok := h.sessions.chans[data.Id]; ok { - h.sessions.chans[data.Id] <- struct{}{} + if _, ok := h.sessions.chans[data.Id][id]; ok { + h.sessions.chans[data.Id][id] <- struct{}{} } h.sessions.RUnlock() case "connection_terminate": h.sessions.RLock() - delete(h.sessions.data, data.Id) - delete(h.sessions.chans, data.Id) + delete(h.sessions.data[data.Id], id) + delete(h.sessions.chans[data.Id], id) + if len(h.sessions.data[data.Id]) == 0 { + delete(h.sessions.data, data.Id) + delete(h.sessions.chans, data.Id) + } h.sessions.RUnlock() break loop default: @@ -225,11 +234,10 @@ loop: func exit(ss *sessions) { // closing all sessions ss.RLock() - for _, ch := range ss.chans { - ch <- struct{}{} - } - for _, da := range ss.data { - close(da) + for _, chMp := range ss.chans { + for _, ch := range chMp { + ch <- struct{}{} + } } ss.RUnlock() } @@ -275,18 +283,24 @@ func writeResponse(w *webConn, typ, id string, r interface{}, er error) error { return nil } -func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request) error { +func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request, id string) error { sid := data.Id sess := make(chan *event) h.sessions.Lock() - h.sessions.data[sid] = sess - h.sessions.chans[sid] = end + if h.sessions.data[sid] == nil { + h.sessions.data[sid] = map[string]chan *event{} + } + h.sessions.data[sid][id] = sess + if h.sessions.chans[sid] == nil { + h.sessions.chans[sid] = map[string]chan struct{}{} + } + h.sessions.chans[sid][id] = end h.sessions.Unlock() - cls := func(ss *sessions, sid string) { + cls := func(ss *sessions, sid, id string) { ss.Lock() - close(ss.data[sid]) - close(ss.chans[sid]) + close(ss.data[sid][id]) + close(ss.chans[sid][id]) ss.Unlock() } @@ -294,7 +308,7 @@ func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql for msg := range sess { select { case <-end: - cls(h.sessions, sid) + cls(h.sessions, sid, id) return nil default: // Subscription should have only one root selection @@ -310,7 +324,7 @@ func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql } return nil }(); err != nil { - cls(h.sessions, sid) + cls(h.sessions, sid, id) return err } }