From 801c0fd31e07d1075098a61e27b6afdef01295b5 Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Fri, 10 Dec 2021 14:18:17 +0100 Subject: [PATCH 1/9] feat(neuron_subscription.ex): support for subscriptions where transport is Phoenix Channel Implementation is based on abshinte_websocket library. fix #33 --- lib/neuron_subscription.ex | 39 +++++++++++++++++++++++++++++++ mix.exs | 5 ++-- mix.lock | 9 +++++++ test/neuron_subscription_test.exs | 31 ++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 lib/neuron_subscription.ex create mode 100644 test/neuron_subscription_test.exs diff --git a/lib/neuron_subscription.ex b/lib/neuron_subscription.ex new file mode 100644 index 0000000..e8cc632 --- /dev/null +++ b/lib/neuron_subscription.ex @@ -0,0 +1,39 @@ +defmodule Neuron.Subscription do + use GenServer + + def start_link(opts) do + GenServer.start_link(Neuron.Subscription, opts) + end + + @impl :true + def init(opts) do + query = Keyword.fetch!(opts, :query) + name = Keyword.fetch!(opts, :name) + variables = Keyword.fetch!(opts, :variables) + callback = Keyword.fetch!(opts, :callback) + + subscription_server_name = Module.concat([name, Caller, SubscriptionServer]) + + AbsintheWebSocket.SubscriptionServer.subscribe( + subscription_server_name, + name, + callback, + query, + variables + ) + {:ok, opts} + end + + def supervisor(subscriber: subscriber) do + + {AbsintheWebSocket.Supervisor, + [ + subscriber: subscriber, + url: subscriber.url, + token: nil, + base_name: subscriber, + async: true + ]} + + end +end diff --git a/mix.exs b/mix.exs index 1b50525..ac11329 100644 --- a/mix.exs +++ b/mix.exs @@ -34,11 +34,12 @@ defmodule Neuron.Mixfile do {:dialyxir, "~> 1.0.0-rc.4", only: :dev, runtime: false}, {:httpoison, "~> 1.0"}, {:jason, "~> 1.1", optional: true}, - {:poison, "~> 4.0", only: :test}, + {:poison, "~> 4.0", only: :test, override: true}, {:mock, "~> 0.3.3", only: :test}, {:coverex, "~> 1.5", only: :test}, {:credo, "~> 1.1", only: [:dev, :test]}, - {:ex_doc, ">= 0.0.0", only: :dev, runtime: false} + {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, + {:absinthe_websocket, "~> 0.2.0"} ] end diff --git a/mix.lock b/mix.lock index 2e26321..6306075 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ %{ + "absinthe_websocket": {:hex, :absinthe_websocket, "0.2.2", "1d7576b269e47d04d2ce2cf8aab892f043f1c2026d07cecd4662668711ff5e8f", [:mix], [{:poison, "~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}, {:websockex, "~> 0.4", [hex: :websockex, repo: "hexpm", optional: false]}], "hexpm", "01de825eacc2744f0f884e9050011fd3cfb7e8973cdbd980649fa505bf32a7f8"}, "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, "coverex": {:hex, :coverex, "1.5.0", "a4248302f09562993041f1b056866bfd5688d3a03c429de80c47ea6663989ecc", [:mix], [{:hackney, "~> 1.5", [hex: :hackney, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 3.1 or ~> 4.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "21a8f6e734a277b86c01b3cc44b7cc8b77cb1886adbebb708eefc6398567dcac"}, @@ -17,11 +18,19 @@ "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mock": {:hex, :mock, "0.3.3", "42a433794b1291a9cf1525c6d26b38e039e0d3a360732b5e467bfc77ef26c914", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "a280d1f7b6f4bbcbd9282616e57502721781c66ee5b540720efabeaf627cc7eb"}, "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, + "phoenix": {:hex, :phoenix, "1.6.4", "bc9a757f0a4eac88e1e3501245a6259e74d30970df8c072836d755608dbc4c7d", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 1.0", [hex: :phoenix_view, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b6cb3f31e3ea1049049852703eca794f7afdb0c1dc111d8f166ba032c103a80"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"}, + "phoenix_view": {:hex, :phoenix_view, "1.0.0", "fea71ecaaed71178b26dd65c401607de5ec22e2e9ef141389c721b3f3d4d8011", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "82be3e2516f5633220246e2e58181282c71640dab7afc04f70ad94253025db0c"}, + "plug": {:hex, :plug, "1.12.1", "645678c800601d8d9f27ad1aebba1fdb9ce5b2623ddb961a074da0b96c35187d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d57e799a777bc20494b784966dc5fbda91eb4a09f571f76545b72a634ce0d30b"}, + "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "poison": {:hex, :poison, "4.0.1", "bcb755a16fac91cad79bfe9fc3585bb07b9331e50cfe3420a24bcc2d735709ae", [:mix], [], "hexpm", "ba8836feea4b394bb718a161fc59a288fe0109b5006d6bdf97b6badfcf6f0f25"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm", "603561dc0fd62f4f2ea9b890f4e20e1a0d388746d6e20557cafb1b16950de88c"}, + "telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}, + "websockex": {:hex, :websockex, "0.4.3", "92b7905769c79c6480c02daacaca2ddd49de936d912976a4d3c923723b647bf0", [:mix], [], "hexpm", "95f2e7072b85a3a4cc385602d42115b73ce0b74a9121d0d6dbbf557645ac53e4"}, } diff --git a/test/neuron_subscription_test.exs b/test/neuron_subscription_test.exs new file mode 100644 index 0000000..954ffde --- /dev/null +++ b/test/neuron_subscription_test.exs @@ -0,0 +1,31 @@ +defmodule SubscriptionExample do + + @url "https://my.awesome.graphql/endpoint" + @query """ + subscription { + user { + name + } + } + """ + + def start_link() do + Neuron.Subscription.start_link(ws_url: @url, query: @query, name: __MODULE__, subscriber: __MODULE__, variables: %{}, callback: :handle_update) + end + + def handle_update(data, state) do + IO.puts("Received Update - #{inspect(data)}") + + {:ok, state} + end +end + +defmodule NeuronSubscriptionTest do + use ExUnit.Case + + describe "subscription" do + test "ok" do + {:ok, _pid} = SubscriptionExample.start_link() + end + end +end From 3169e4f9a7764b2103b6b20400f45f14950b59ce Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Fri, 10 Dec 2021 14:51:47 +0100 Subject: [PATCH 2/9] refactor(neuron_subscription.ex): send to supervisor method url as separate attribute Calling url on subscriber value did not work. --- lib/neuron_subscription.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/neuron_subscription.ex b/lib/neuron_subscription.ex index e8cc632..1e2eb2e 100644 --- a/lib/neuron_subscription.ex +++ b/lib/neuron_subscription.ex @@ -24,12 +24,12 @@ defmodule Neuron.Subscription do {:ok, opts} end - def supervisor(subscriber: subscriber) do + def supervisor(subscriber: subscriber, url: url) do {AbsintheWebSocket.Supervisor, [ subscriber: subscriber, - url: subscriber.url, + url: url, token: nil, base_name: subscriber, async: true From 2c1825c332bcd85a5caaf15aaca44a0f0a06234e Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Wed, 15 Dec 2021 15:13:11 +0100 Subject: [PATCH 3/9] feat(neuron_subscription.ex): abshinthe subscriptions Abshinthe subscriptions implementation based on elixir library abshinthe websocket. #33 --- lib/neuron_subscription.ex | 38 +++++++++++++------------------ test/neuron_subscription_test.exs | 31 ------------------------- 2 files changed, 16 insertions(+), 53 deletions(-) delete mode 100644 test/neuron_subscription_test.exs diff --git a/lib/neuron_subscription.ex b/lib/neuron_subscription.ex index 1e2eb2e..c75cc5d 100644 --- a/lib/neuron_subscription.ex +++ b/lib/neuron_subscription.ex @@ -1,39 +1,33 @@ defmodule Neuron.Subscription do use GenServer - def start_link(opts) do - GenServer.start_link(Neuron.Subscription, opts) - end - - @impl :true + @impl true def init(opts) do - query = Keyword.fetch!(opts, :query) - name = Keyword.fetch!(opts, :name) - variables = Keyword.fetch!(opts, :variables) - callback = Keyword.fetch!(opts, :callback) - - subscription_server_name = Module.concat([name, Caller, SubscriptionServer]) - - AbsintheWebSocket.SubscriptionServer.subscribe( - subscription_server_name, - name, - callback, - query, - variables - ) {:ok, opts} end - def supervisor(subscriber: subscriber, url: url) do - + def supervisor(subscriber: subscriber, url: url, token: token) do {AbsintheWebSocket.Supervisor, [ subscriber: subscriber, url: url, - token: nil, + token: token, base_name: subscriber, async: true ]} + end + def subscribe(module, query) do + callback = fn result -> + apply(module, :handle_update, [result]) + end + + AbsintheWebSocket.SubscriptionServer.subscribe( + Module.concat(module, SubscriptionServer), + Neuron.Subscription, + callback, + query, + [] + ) end end diff --git a/test/neuron_subscription_test.exs b/test/neuron_subscription_test.exs deleted file mode 100644 index 954ffde..0000000 --- a/test/neuron_subscription_test.exs +++ /dev/null @@ -1,31 +0,0 @@ -defmodule SubscriptionExample do - - @url "https://my.awesome.graphql/endpoint" - @query """ - subscription { - user { - name - } - } - """ - - def start_link() do - Neuron.Subscription.start_link(ws_url: @url, query: @query, name: __MODULE__, subscriber: __MODULE__, variables: %{}, callback: :handle_update) - end - - def handle_update(data, state) do - IO.puts("Received Update - #{inspect(data)}") - - {:ok, state} - end -end - -defmodule NeuronSubscriptionTest do - use ExUnit.Case - - describe "subscription" do - test "ok" do - {:ok, _pid} = SubscriptionExample.start_link() - end - end -end From 166f3b1038fb9dde4c1c39308e3378ed5dbbd696 Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Fri, 17 Dec 2021 10:40:01 +0100 Subject: [PATCH 4/9] feat(neuron_subscription.ex): support for query variables in subscription Just pass query variables to AbsintheWebSocket.SubscriptionServer. --- lib/neuron_subscription.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/neuron_subscription.ex b/lib/neuron_subscription.ex index c75cc5d..2904ca9 100644 --- a/lib/neuron_subscription.ex +++ b/lib/neuron_subscription.ex @@ -17,7 +17,7 @@ defmodule Neuron.Subscription do ]} end - def subscribe(module, query) do + def subscribe(module, query, %{} = variables) do callback = fn result -> apply(module, :handle_update, [result]) end @@ -27,7 +27,7 @@ defmodule Neuron.Subscription do Neuron.Subscription, callback, query, - [] + variables ) end end From 807d5b636263bd90e1fa5949a7451c10e411440a Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Fri, 17 Dec 2021 12:20:18 +0100 Subject: [PATCH 5/9] docs(neuron_subscription.ex): how to set up Abshinthe subscriptions --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index 4542781..42387f3 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,47 @@ iex> Neuron.query(""" ) ``` +You can do subscriptions: + +```elixir +defmodule AddUserSubscription do + + @url "ws://localhost:4000/socket/websocket" + + @query """ + subscription {userAdded { + name + age + color + uuid + }} + """ + + + def supervisor() do + Neuron.Subscription.supervisor(subscriber: __MODULE__, url: @url, token: "") + end + + def handle_update(data) do + IO.puts("Received Update - #{inspect(data)}") + end + + def subscribe() do + Neuron.Subscription.subscribe(__MODULE__, @query, %{}) + end + +end + +``` + +and add this to your application children: + +```elixir + +AddUserSubscription.supervisor() + +``` + ### Overriding HTTP Timeout `HTTPoison` default timeout is 5000ms, in case we need to handle longer timeout, using default `Neuron.Connection` module, we could set `connection_opts` which will be passed to `HTTPoison`. So to override timeout to 15000ms, we could do: From 7b690df4c7c16c4a1b004d809d355b73b36a9816 Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Fri, 17 Dec 2021 13:37:24 +0100 Subject: [PATCH 6/9] add to readme.md server side absinthe subscription implementation. --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 42387f3..599aeac 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,8 @@ AddUserSubscription.supervisor() ``` +Absinthe GraphQL subscription server side: https://github.com/karlosmid/zoom + ### Overriding HTTP Timeout `HTTPoison` default timeout is 5000ms, in case we need to handle longer timeout, using default `Neuron.Connection` module, we could set `connection_opts` which will be passed to `HTTPoison`. So to override timeout to 15000ms, we could do: From 93e645a06d949200a3fcea395f045a38eb6e7d26 Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Fri, 17 Dec 2021 17:01:05 +0100 Subject: [PATCH 7/9] absinthe websocket bug fix for double callbacks on reconnect. --- mix.exs | 2 +- mix.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index ac11329..b3bbf32 100644 --- a/mix.exs +++ b/mix.exs @@ -39,7 +39,7 @@ defmodule Neuron.Mixfile do {:coverex, "~> 1.5", only: :test}, {:credo, "~> 1.1", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, - {:absinthe_websocket, "~> 0.2.0"} + {:absinthe_websocket, git: "https://github.com/karlosmid/absinthe_websocket"} ] end diff --git a/mix.lock b/mix.lock index 6306075..d2429b8 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "absinthe_websocket": {:hex, :absinthe_websocket, "0.2.2", "1d7576b269e47d04d2ce2cf8aab892f043f1c2026d07cecd4662668711ff5e8f", [:mix], [{:poison, "~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}, {:websockex, "~> 0.4", [hex: :websockex, repo: "hexpm", optional: false]}], "hexpm", "01de825eacc2744f0f884e9050011fd3cfb7e8973cdbd980649fa505bf32a7f8"}, + "absinthe_websocket": {:git, "https://github.com/karlosmid/absinthe_websocket", "b08e0cf07c1dcdbb98e14c3905d8ee7b777aee66", []}, "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, "coverex": {:hex, :coverex, "1.5.0", "a4248302f09562993041f1b056866bfd5688d3a03c429de80c47ea6663989ecc", [:mix], [{:hackney, "~> 1.5", [hex: :hackney, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 3.1 or ~> 4.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "21a8f6e734a277b86c01b3cc44b7cc8b77cb1886adbebb708eefc6398567dcac"}, From b38500cdfe9f0aa25ff1389ca575cbb428d65368 Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Mon, 20 Dec 2021 08:21:23 +0100 Subject: [PATCH 8/9] refactor(neuron_subscription.ex): add to supervisor method id parameter Now we can start several supervised subscriptions. --- lib/neuron_subscription.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/neuron_subscription.ex b/lib/neuron_subscription.ex index 2904ca9..ad6bc40 100644 --- a/lib/neuron_subscription.ex +++ b/lib/neuron_subscription.ex @@ -6,15 +6,15 @@ defmodule Neuron.Subscription do {:ok, opts} end - def supervisor(subscriber: subscriber, url: url, token: token) do - {AbsintheWebSocket.Supervisor, + def supervisor(subscriber: subscriber, url: url, token: token, id: id) do + Supervisor.child_spec({AbsintheWebSocket.Supervisor, [ subscriber: subscriber, url: url, token: token, base_name: subscriber, async: true - ]} + ]}, id: id) end def subscribe(module, query, %{} = variables) do From d00ddab7b6b580a3fce6e40b6cf9262b9dac1d5f Mon Sep 17 00:00:00 2001 From: Karlo Smid Date: Tue, 21 Dec 2021 14:53:12 +0100 Subject: [PATCH 9/9] docs(neuron_subscriptions): logger configuration that logs subscription errors --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 599aeac..7631aea 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,13 @@ and add this to your application children: AddUserSubscription.supervisor() ``` +To have more logs in case of errors in your subscription module: + +```elixir +config :logger, + level: :debug, + handle_sasl_reports: true +``` Absinthe GraphQL subscription server side: https://github.com/karlosmid/zoom