diff --git a/.DS_Store b/.DS_Store index bcdcb57..83b2345 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/README.md b/README.md index 38b6b20..3c16228 100644 --- a/README.md +++ b/README.md @@ -27,9 +27,9 @@ import ( "net/http" "github.com/google/uuid" - "go.appointy.com/jaal" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) type Server struct { diff --git a/call.go b/call.go index 6108ffc..ad0cfa6 100644 --- a/call.go +++ b/call.go @@ -7,7 +7,7 @@ import ( "net/http" "time" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) // HttpCall sends an HTTP Request to the specified url and returns response in map of map diff --git a/call_test.go b/call_test.go index 5a85212..278f168 100644 --- a/call_test.go +++ b/call_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/schemabuilder" ) func TestHttpCall(t *testing.T) { diff --git a/client.go b/client.go index e2fed8d..2a352f4 100644 --- a/client.go +++ b/client.go @@ -6,7 +6,7 @@ import ( "fmt" "net/http" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) type Decoder interface { diff --git a/example/main.go b/example/main.go index 08fc81a..cb0f249 100644 --- a/example/main.go +++ b/example/main.go @@ -11,9 +11,9 @@ import ( "time" "github.com/google/uuid" - "go.appointy.com/jaal" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) func init() { diff --git a/example/test-1/server/main.go b/example/test-1/server/main.go index 18aff23..d6442e4 100644 --- a/example/test-1/server/main.go +++ b/example/test-1/server/main.go @@ -9,10 +9,10 @@ import ( "time" "github.com/appointy/idgen" - "go.appointy.com/jaal" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/mempubsub" "golang.org/x/net/context" @@ -278,9 +278,6 @@ func main() { return } } - if t < 10 { - sub.Shutdown(ctx) - } } }() f() diff --git a/go.mod b/go.mod index b0ecedf..040e55c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ -module go.appointy.com/jaal +module go.saastack.io/jaal -go 1.13 +go 1.14 require ( github.com/appointy/idgen v0.0.0-20190227121039-a884768ebb9d @@ -15,6 +15,7 @@ require ( go.opencensus.io v0.22.0 // indirect gocloud.dev v0.15.0 golang.org/x/net v0.0.0-20190514140710-3ec191127204 + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect google.golang.org/api v0.6.0 // indirect google.golang.org/genproto v0.0.0-20190508193815-b515fa19cec8 google.golang.org/grpc v1.21.0 diff --git a/go.sum b/go.sum index 9a274d4..409f773 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.3.2/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= diff --git a/graphql/clone_test.go b/graphql/clone_test.go index 40477fa..54698ee 100644 --- a/graphql/clone_test.go +++ b/graphql/clone_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) func TestClone(t *testing.T) { diff --git a/graphql/end_to_end_test.go b/graphql/end_to_end_test.go index 5960067..9d7290b 100644 --- a/graphql/end_to_end_test.go +++ b/graphql/end_to_end_test.go @@ -8,9 +8,9 @@ import ( "github.com/kylelemons/godebug/pretty" "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/internal" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/internal" + "go.saastack.io/jaal/schemabuilder" ) func TestInterface(t *testing.T) { diff --git a/graphql/execute.go b/graphql/execute.go index 35a58c0..7b3be42 100644 --- a/graphql/execute.go +++ b/graphql/execute.go @@ -8,7 +8,7 @@ import ( "runtime" "strings" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) type Executor struct { diff --git a/graphql/execute_test.go b/graphql/execute_test.go index ceda6ab..2b08c1e 100644 --- a/graphql/execute_test.go +++ b/graphql/execute_test.go @@ -7,10 +7,10 @@ import ( "testing" "github.com/davecgh/go-spew/spew" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/internal" - "go.appointy.com/jaal/jerrors" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/internal" + "go.saastack.io/jaal/jerrors" + "go.saastack.io/jaal/schemabuilder" "google.golang.org/grpc/codes" ) diff --git a/graphql/lazy_execute_test.go b/graphql/lazy_execute_test.go index ee3fd9a..3a13586 100644 --- a/graphql/lazy_execute_test.go +++ b/graphql/lazy_execute_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/jerrors" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/jerrors" + "go.saastack.io/jaal/schemabuilder" ) func TestLazyExecution(t *testing.T) { diff --git a/graphql/parser.go b/graphql/parser.go index 5d47cff..54ab615 100644 --- a/graphql/parser.go +++ b/graphql/parser.go @@ -10,7 +10,7 @@ import ( "github.com/graphql-go/graphql/language/ast" "github.com/graphql-go/graphql/language/parser" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/jerrors" ) type Query struct { diff --git a/graphql/parser_test.go b/graphql/parser_test.go index 8a056bf..e7357fd 100644 --- a/graphql/parser_test.go +++ b/graphql/parser_test.go @@ -5,7 +5,7 @@ import ( "reflect" "testing" - . "go.appointy.com/jaal/graphql" + . "go.saastack.io/jaal/graphql" ) func TestParseSupported(t *testing.T) { diff --git a/graphql/union_test.go b/graphql/union_test.go index ed6a9c9..d724ae2 100644 --- a/graphql/union_test.go +++ b/graphql/union_test.go @@ -7,9 +7,9 @@ import ( "testing" "github.com/kylelemons/godebug/pretty" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/internal" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/internal" + "go.saastack.io/jaal/schemabuilder" ) type GatewayType int diff --git a/gtypes/global.go b/gtypes/global.go index 116c621..1043eac 100644 --- a/gtypes/global.go +++ b/gtypes/global.go @@ -6,7 +6,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/iancoleman/strcase" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/schemabuilder" "google.golang.org/genproto/protobuf/field_mask" ) diff --git a/http.go b/http.go index f7d81c8..3ac4586 100644 --- a/http.go +++ b/http.go @@ -6,8 +6,8 @@ import ( "errors" "net/http" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/jerrors" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/jerrors" ) type HandlerOption func(*handlerOptions) diff --git a/http_test.go b/http_test.go index 80d93ae..1cef2a1 100644 --- a/http_test.go +++ b/http_test.go @@ -7,8 +7,8 @@ import ( "testing" "github.com/kylelemons/godebug/pretty" - "go.appointy.com/jaal" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal" + "go.saastack.io/jaal/schemabuilder" ) func testHTTPRequest(req *http.Request) *httptest.ResponseRecorder { diff --git a/introspection/introspection.go b/introspection/introspection.go index 1ebeb34..fa1fbbd 100644 --- a/introspection/introspection.go +++ b/introspection/introspection.go @@ -6,8 +6,8 @@ import ( "fmt" "sort" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/schemabuilder" ) type introspection struct { diff --git a/introspection/introspection_test.go b/introspection/introspection_test.go index 2263a7b..85859e4 100644 --- a/introspection/introspection_test.go +++ b/introspection/introspection_test.go @@ -7,11 +7,11 @@ import ( "strings" "testing" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" "github.com/stretchr/testify/require" - "go.appointy.com/jaal/introspection" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/introspection" + "go.saastack.io/jaal/schemabuilder" ) type User struct { diff --git a/middleware.go b/middleware.go index aac4ad5..0ede14d 100644 --- a/middleware.go +++ b/middleware.go @@ -2,7 +2,7 @@ package jaal import "context" -import "go.appointy.com/jaal/graphql" +import "go.saastack.io/jaal/graphql" type HandlerFunc func(context.Context, graphql.Type, *graphql.Query) (interface{}, error) diff --git a/schema/schema.proto b/schema/schema.proto index 101caf6..873a35a 100644 --- a/schema/schema.proto +++ b/schema/schema.proto @@ -5,7 +5,7 @@ package graphql; import "google/protobuf/descriptor.proto"; -option go_package="go.appointy.com/jaal/schema"; +option go_package="go.saastack.io/jaal/schema"; extend google.protobuf.MethodOptions { // schema is used to tag an rpc as query or mutation. diff --git a/schemabuilder/build.go b/schemabuilder/build.go index c319d53..6b47cab 100644 --- a/schemabuilder/build.go +++ b/schemabuilder/build.go @@ -6,7 +6,7 @@ import ( "github.com/golang/protobuf/ptypes/duration" "github.com/golang/protobuf/ptypes/timestamp" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // schemaBuilder is a struct for holding all the graph information for types as diff --git a/schemabuilder/function.go b/schemabuilder/function.go index 52dbd77..bf3b7b4 100644 --- a/schemabuilder/function.go +++ b/schemabuilder/function.go @@ -5,7 +5,7 @@ import ( "fmt" "reflect" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // buildFunction takes the reflect type of an object and a method attached to that object to build a GraphQL Field diff --git a/schemabuilder/input.go b/schemabuilder/input.go index a942047..3566daf 100644 --- a/schemabuilder/input.go +++ b/schemabuilder/input.go @@ -9,7 +9,7 @@ import ( "github.com/golang/protobuf/ptypes/duration" "github.com/golang/protobuf/ptypes/timestamp" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // argField is a representation of an input parameter field for a function. It diff --git a/schemabuilder/input_object.go b/schemabuilder/input_object.go index 4e83518..ccea98d 100644 --- a/schemabuilder/input_object.go +++ b/schemabuilder/input_object.go @@ -5,7 +5,7 @@ import ( "fmt" "reflect" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // makeInputObjectParser constructs an argParser for the passed in args struct i.e. the input struct which contains all the objects to be given as input. For eg: diff --git a/schemabuilder/output.go b/schemabuilder/output.go index 7b9b650..76ab676 100644 --- a/schemabuilder/output.go +++ b/schemabuilder/output.go @@ -6,7 +6,7 @@ import ( "reflect" "sort" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // buildStruct is a function for building the graphql.Type for a passed in struct type. diff --git a/schemabuilder/reflect.go b/schemabuilder/reflect.go index de6b2a1..61a26e5 100644 --- a/schemabuilder/reflect.go +++ b/schemabuilder/reflect.go @@ -7,7 +7,7 @@ import ( "strings" "unicode" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // graphQLFieldInfo contains basic struct field information related to GraphQL. diff --git a/schemabuilder/schema.go b/schemabuilder/schema.go index ee7b50c..d766e91 100644 --- a/schemabuilder/schema.go +++ b/schemabuilder/schema.go @@ -4,7 +4,7 @@ import ( "fmt" "reflect" - "go.appointy.com/jaal/graphql" + "go.saastack.io/jaal/graphql" ) // Schema is a struct that can be used to build out a GraphQL schema. Functions diff --git a/ws.go b/ws.go index 4629db1..60ff8ef 100644 --- a/ws.go +++ b/ws.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/appointy/idgen" "log" "net/http" "strings" @@ -13,17 +14,17 @@ import ( "github.com/gorilla/websocket" "gocloud.dev/pubsub" - "go.appointy.com/jaal/graphql" - "go.appointy.com/jaal/jerrors" - "go.appointy.com/jaal/schemabuilder" + "go.saastack.io/jaal/graphql" + "go.saastack.io/jaal/jerrors" + "go.saastack.io/jaal/schemabuilder" ) // HTTPSubHandler implements the handler required for executing the graphql subscriptions -func HTTPSubHandler(schema *graphql.Schema, s *pubsub.Subscription) (http.Handler, func()) { +func HTTPSubHandler(schema *graphql.Schema, subs *pubsub.Subscription) (http.Handler, func()) { source := make(chan *event) sessions := &sessions{ - data: map[string][]chan *event{}, - chans: map[string][]chan struct{}{}, + data: map[string]map[string]chan *event{}, + chans: map[string]map[string]chan struct{}{}, } return &httpSubHandler{ handler: handler{ @@ -35,7 +36,7 @@ func HTTPSubHandler(schema *graphql.Schema, s *pubsub.Subscription) (http.Handle source: source, sessions: sessions, }, func() { - go startListening(s, source, func() { + go startListening(subs, source, func() { exit(sessions) }) go listenSource(source, sessions) @@ -45,9 +46,9 @@ func HTTPSubHandler(schema *graphql.Schema, s *pubsub.Subscription) (http.Handle func listenSource(events chan *event, ss *sessions) { for evt := range events { ss.RLock() - for _, v := range ss.data { - for _, s := range v { - s <- evt + for _, mp := range ss.data { + for _, v := range mp { + v <- evt } } ss.RUnlock() @@ -58,7 +59,8 @@ func startListening(s *pubsub.Subscription, source chan<- *event, cancel func()) for { msg, err := s.Receive(context.Background()) if err != nil { - fmt.Println("Pubsub failed: ", err) + fmt.Println("Pubsub failed with error:", err) + fmt.Println("Closing all sessions") cancel() return } @@ -86,8 +88,8 @@ type event struct { type sessions struct { sync.RWMutex - data map[string][]chan *event - chans map[string][]chan struct{} + data map[string]map[string]chan *event + chans map[string]map[string]chan struct{} } type wsMessage struct { @@ -139,7 +141,8 @@ func (h *httpSubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } - if err := writeResponse(conn, "connection_ack", "", nil, nil); err != nil { + id := idgen.New("sock") + if err := writeResponse(conn, "connection_ack", msg.Id, nil, nil); err != nil { fmt.Println(err) return } @@ -157,7 +160,7 @@ loop: case "start": var gql gqlPayload if err := json.Unmarshal(data.Payload, &gql); err != nil { - if err := writeResponse(conn, "connection_error", "", nil, err); err != nil { + if err := writeResponse(conn, "connection_error", data.Id, nil, err); err != nil { fmt.Println(err) return } @@ -192,30 +195,36 @@ loop: Fragments: query.SelectionSet.Fragments, }, } - go func(conn *webConn, data *wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request) { - if err := h.serveHTTP(conn, *data, schema, query, end, w, r); err != nil { - fmt.Println("Id:", data.Id, ": terminated: ", err) + go func(conn *webConn, data *wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request, id string) { + err := h.serveHTTP(conn, *data, schema, query, end, w, r, id) + if err := writeResponse(conn, "complete", data.Id, nil, err); err != nil { + fmt.Println(err) } h.sessions.Lock() - if _, ok := h.sessions.data[data.Id]; ok { - if err := writeResponse(conn, "complete", data.Id, nil, nil); err != nil { - fmt.Println(err) - } - fmt.Println("Id:", data.Id, ": terminated.") + delete(h.sessions.data[data.Id], id) + delete(h.sessions.chans[data.Id], id) + if len(h.sessions.data[data.Id]) == 0 { + delete(h.sessions.data, data.Id) + delete(h.sessions.chans, data.Id) } - delete(h.sessions.data, data.Id) - delete(h.sessions.chans, data.Id) h.sessions.Unlock() - }(conn, &data, schema, modQuery, end, w, r) + }(conn, &data, schema, modQuery, end, w, r, id) } case "stop": h.sessions.RLock() - for _, v := range h.sessions.chans[data.Id] { - v <- struct{}{} + if _, ok := h.sessions.chans[data.Id][id]; ok { + h.sessions.chans[data.Id][id] <- struct{}{} } h.sessions.RUnlock() case "connection_terminate": - exit(h.sessions) + h.sessions.RLock() + delete(h.sessions.data[data.Id], id) + delete(h.sessions.chans[data.Id], id) + if len(h.sessions.data[data.Id]) == 0 { + delete(h.sessions.data, data.Id) + delete(h.sessions.chans, data.Id) + } + h.sessions.RUnlock() break loop default: } @@ -223,15 +232,11 @@ loop: } func exit(ss *sessions) { + // closing all sessions ss.RLock() - for _, v := range ss.chans { - for _, s := range v { - s <- struct{}{} - } - } - for _, v := range ss.data { - for _, s := range v { - close(s) + for _, chMp := range ss.chans { + for _, ch := range chMp { + ch <- struct{}{} } } ss.RUnlock() @@ -258,6 +263,9 @@ func writeResponse(w *webConn, typ, id string, r interface{}, er error) error { } } } else if typ == "error" || typ == "connection_error" { + if er == nil { + er = errors.New("connection is closed") + } str := strings.Replace(er.Error(), "\"", "\\\"", -1) payload = json.RawMessage("{ \"error\" : \"" + str + "\"}") } @@ -275,26 +283,24 @@ func writeResponse(w *webConn, typ, id string, r interface{}, er error) error { return nil } -func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request) error { +func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql.Type, query *graphql.Query, end chan struct{}, w http.ResponseWriter, r *http.Request, id string) error { sid := data.Id sess := make(chan *event) h.sessions.Lock() - h.sessions.data[sid] = append(h.sessions.data[sid], sess) - h.sessions.chans[sid] = append(h.sessions.chans[sid], end) + if h.sessions.data[sid] == nil { + h.sessions.data[sid] = map[string]chan *event{} + } + h.sessions.data[sid][id] = sess + if h.sessions.chans[sid] == nil { + h.sessions.chans[sid] = map[string]chan struct{}{} + } + h.sessions.chans[sid][id] = end h.sessions.Unlock() - cls := func(ss *sessions, sid string) { + cls := func(ss *sessions, sid, id string) { ss.Lock() - for _, v := range ss.data[sid] { - close(v) - for range v { - } - } - for _, v := range ss.chans[sid] { - close(v) - for range v { - } - } + close(ss.data[sid][id]) + close(ss.chans[sid][id]) ss.Unlock() } @@ -302,25 +308,25 @@ func (h *httpSubHandler) serveHTTP(conn *webConn, data wsMessage, schema graphql for msg := range sess { select { case <-end: - cls(h.sessions, sid) + cls(h.sessions, sid, id) return nil default: - if err := func() error { - res, err := h.executor.Execute(r.Context(), schema, &schemabuilder.Subscription{msg.payload}, query) - if err == graphql.ErrNoUpdate { + // Subscription should have only one root selection + // https://spec.graphql.org/June2018/#sec-Single-root-field + if len(query.Selections) == 1 && query.Selections[0].Name == msg.typ { + if err := func() error { + res, err := h.executor.Execute(r.Context(), schema, &schemabuilder.Subscription{Payload: msg.payload}, query) + if err == graphql.ErrNoUpdate { + return nil + } + if err := writeResponse(conn, "data", data.Id, res, err); err != nil { + return err + } return nil - } - rer := err - if err := writeResponse(conn, "data", data.Id, res, rer); err != nil { - return err - } - if rer != nil { + }(); err != nil { + cls(h.sessions, sid, id) return err } - return nil - }(); err != nil { - cls(h.sessions, sid) - return err } } }