From 5bbff361e8c222d219f89bac84665a40f8674a74 Mon Sep 17 00:00:00 2001 From: Daniele Date: Wed, 11 Feb 2026 14:42:43 +0100 Subject: [PATCH 1/6] Clarify README wording about codecs Improve wording in README: replace the em dash phrasing with "so" and change the period to an exclamation mark to make the sentence read more naturally and upbeat when describing manual codecs and wiring fields. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2d0138a..3b5ab94 100644 --- a/README.md +++ b/README.md @@ -110,8 +110,8 @@ let user_id_codec = codec.map(codec.int(), UserId, fn(uid) { ``` Gleam has no derive macros or reflection, so codecs for complex types -are manual. The combinators handle the serialization — you just wire -the fields together. +are manual. The combinators handle the serialization so you just wire +the fields together! ## Modules From a6c206637bf48929f684f8b3b3e977942ea6fe47 Mon Sep 17 00:00:00 2001 From: Daniele Date: Wed, 4 Mar 2026 10:33:17 +0100 Subject: [PATCH 2/6] Add cluster monitoring and tests Introduce a typed cluster monitoring feature: new module distribute/cluster/monitor.gleam provides subscribe/unsubscribe and ClusterEvent variants (NodeUp/NodeDown) and runs an OTP actor that selects native node up/down messages. Add corresponding Erlang FFI helpers in src/cluster_ffi.erl and tests in test/cluster/monitor_test.gleam that simulate node events. Bump package version to 3.1.0 and document the feature in README and CHANGELOG. --- .gitignore | 1 + CHANGELOG.md | 11 +++- README.md | 23 +++++++++ gleam.toml | 2 +- src/cluster_ffi.erl | 20 +++++++- src/distribute.gleam | 16 +++++- src/distribute/cluster/monitor.gleam | 76 ++++++++++++++++++++++++++++ test/cluster/monitor_test.gleam | 63 +++++++++++++++++++++++ 8 files changed, 208 insertions(+), 4 deletions(-) create mode 100644 src/distribute/cluster/monitor.gleam create mode 100644 test/cluster/monitor_test.gleam diff --git a/.gitignore b/.gitignore index 81b1fda..8ba424b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ erl_crash.dump **/build *.log +/notes diff --git a/CHANGELOG.md b/CHANGELOG.md index 921c03d..45f19e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## v3.1.0 — 2026-03-04 + +### Added + +- **Cluster Monitoring** (`distribute/cluster/monitor`) — Typed event-driven + notifications for `NodeUp` and `NodeDown`. +- `distribute.subscribe(subject)` — Start a monitored subscription. +- `distribute.unsubscribe(monitor_subject)` — Stop a subscription. + ## v3.0.0 — 2026-02-11 Ground-up rewrite. Smaller API, proper OTP actors, compile-time type safety @@ -38,4 +47,4 @@ connection pool, retry. Also removed `whereis_global(name, encoder, decoder)`. - `global.reply(reply_to, response, encoder)` — send a response back. - `composite.option(c)`, `composite.result(ok, err)`, `composite.tuple2(a, b)`, `composite.tuple3(a, b, c)`. -- `registry.named(name, codec)` — short form of `typed_name`. \ No newline at end of file +- `registry.named(name, codec)` — short form of `typed_name`. diff --git a/README.md b/README.md index 3b5ab94..4d43260 100644 --- a/README.md +++ b/README.md @@ -124,9 +124,32 @@ the fields together! | `distribute/codec/composite` | Option, Result, Tuple codecs | | `distribute/codec/tagged` | Tagged messages with version field | | `distribute/global` | `GlobalSubject(msg)`, `call`, `reply` | +| `distribute/cluster/monitor` | `NodeUp`, `NodeDown` typed events | | `distribute/registry` | `TypedName(msg)`, `:global` registration | | `distribute/receiver` | Typed receive, OTP actor wrappers | +### Cluster Monitoring + +Subscribe to cluster events (`NodeUp`, `NodeDown`) to react to node topology changes. + +```gleam +import distribute +import distribute/cluster/monitor + +let subj = process.new_subject() +let assert Ok(m) = distribute.subscribe(subj) + +// In your actor/process +case process.receive(subj, 5000) { + Ok(monitor.NodeUp(node)) -> io.println("Node joined: " <> node) + Ok(monitor.NodeDown(node)) -> io.println("Node left: " <> node) + _ -> Nil +} + +// Later +distribute.unsubscribe(m) +``` + ## Caveats **What the types catch** — within one codebase, `TypedName` and diff --git a/gleam.toml b/gleam.toml index 0bdb1d4..2a10525 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "distribute" -version = "3.0.0" +version = "3.1.0" description = "Typed distributed messaging for Gleam on the BEAM." licences = ["MIT"] repository = { type = "github", user = "lupodevelop", repo = "distribute" } diff --git a/src/cluster_ffi.erl b/src/cluster_ffi.erl index 0629403..c5ee329 100644 --- a/src/cluster_ffi.erl +++ b/src/cluster_ffi.erl @@ -1,6 +1,9 @@ -module(cluster_ffi). -export([start_node/2, connect/1, nodes/0, self_node/0, ping/1, - is_ok_atom/1, get_error_reason/1, is_true/1, is_ignored/1]). + is_ok_atom/1, get_error_reason/1, is_true/1, is_ignored/1, + nodeup_atom/0, nodedown_atom/0, monitor_nodes/1, + atom_to_string/1, get_node_from_tuple/1, + simulate_node_event/3]). -import(distribute_ffi_utils, [to_atom_safe/1]). @@ -80,3 +83,18 @@ to_atom_force(Atom) when is_atom(Atom) -> is_valid_node_input(Bin) when is_binary(Bin) -> byte_size(Bin) =< 512 andalso binary:match(Bin, <<"\0">>) =:= nomatch. + +nodeup_atom() -> nodeup. +nodedown_atom() -> nodedown. + +monitor_nodes(Flag) -> + net_kernel:monitor_nodes(Flag). + +atom_to_string(Atom) when is_atom(Atom) -> + atom_to_binary(Atom, utf8). + +get_node_from_tuple({_, Node}) -> Node. + +simulate_node_event(Pid, Tag, NodeName) -> + NodeAtom = binary_to_atom(NodeName, utf8), + Pid ! {Tag, NodeAtom}. diff --git a/src/distribute.gleam b/src/distribute.gleam index 07b9bd0..3b319ea 100644 --- a/src/distribute.gleam +++ b/src/distribute.gleam @@ -3,14 +3,16 @@ import distribute/actor as dist_actor import distribute/cluster +import distribute/cluster/monitor import distribute/codec import distribute/global import distribute/receiver import distribute/registry +import gleam/erlang/process import gleam/otp/actor pub fn version() -> String { - "3.0.0" + "3.1.0" } // -- Cluster ----------------------------------------------------------------- @@ -84,3 +86,15 @@ pub fn lookup( pub fn unregister(name: String) -> Result(Nil, registry.RegisterError) { registry.unregister(name) } + +// -- Cluster Monitoring ------------------------------------------------------ + +pub fn subscribe( + user_subject: process.Subject(monitor.ClusterEvent), +) -> Result(process.Subject(monitor.ControlMessage), actor.StartError) { + monitor.subscribe(user_subject) +} + +pub fn unsubscribe(monitor_subject: process.Subject(monitor.ControlMessage)) -> Nil { + monitor.unsubscribe(monitor_subject) +} diff --git a/src/distribute/cluster/monitor.gleam b/src/distribute/cluster/monitor.gleam new file mode 100644 index 0000000..d82f441 --- /dev/null +++ b/src/distribute/cluster/monitor.gleam @@ -0,0 +1,76 @@ +import gleam/dynamic.{type Dynamic} +import gleam/erlang/process.{type Subject} +import gleam/otp/actor +import gleam/result + +pub type ClusterEvent { + NodeUp(node: String) + NodeDown(node: String) +} + +@external(erlang, "cluster_ffi", "nodeup_atom") +fn nodeup_atom() -> Dynamic + +@external(erlang, "cluster_ffi", "nodedown_atom") +fn nodedown_atom() -> Dynamic + +@external(erlang, "cluster_ffi", "monitor_nodes") +fn monitor_nodes_ffi(flag: Bool) -> Dynamic + +@external(erlang, "cluster_ffi", "atom_to_string") +fn atom_to_string_ffi(atom: Dynamic) -> String + +@external(erlang, "cluster_ffi", "get_node_from_tuple") +fn get_node_from_tuple_ffi(msg: Dynamic) -> Dynamic + +pub type ControlMessage { + Stop + InternalEvent(ClusterEvent) +} + +/// Subscribe to cluster events. +/// Events will be sent to the provided `user_subject`. +/// Returns the `Subject(ControlMessage)` for the monitor monitor actor. +pub fn subscribe( + user_subject: Subject(ClusterEvent), +) -> Result(Subject(ControlMessage), actor.StartError) { + actor.new_with_initialiser(5000, fn(self_subject) { + // Start node monitoring in the actor's context + monitor_nodes_ffi(True) + + // Select standard nodeup/nodedown Erlang messages + let selector = + process.new_selector() + |> process.select(self_subject) + |> process.select_record(nodeup_atom(), 1, fn(msg) { + let node_atom = get_node_from_tuple_ffi(msg) + InternalEvent(NodeUp(atom_to_string_ffi(node_atom))) + }) + |> process.select_record(nodedown_atom(), 1, fn(msg) { + let node_atom = get_node_from_tuple_ffi(msg) + InternalEvent(NodeDown(atom_to_string_ffi(node_atom))) + }) + + Ok( + actor.initialised(user_subject) + |> actor.selecting(selector) + |> actor.returning(self_subject), + ) + }) + |> actor.on_message(fn(state, msg) { + case msg { + InternalEvent(event) -> { + process.send(state, event) + actor.continue(state) + } + Stop -> actor.stop() + } + }) + |> actor.start() + |> result.map(fn(started) { started.data }) +} + +/// Stop the monitoring for a specific subscription. +pub fn unsubscribe(monitor_subject: Subject(ControlMessage)) -> Nil { + process.send(monitor_subject, Stop) +} diff --git a/test/cluster/monitor_test.gleam b/test/cluster/monitor_test.gleam new file mode 100644 index 0000000..8048297 --- /dev/null +++ b/test/cluster/monitor_test.gleam @@ -0,0 +1,63 @@ +import distribute/cluster/monitor.{NodeDown, NodeUp} +import gleam/dynamic.{type Dynamic} +import gleam/erlang/process +import gleeunit/should + +@external(erlang, "cluster_ffi", "simulate_node_event") +fn simulate_node_event(pid: process.Pid, tag: Dynamic, node: String) -> Nil + +@external(erlang, "cluster_ffi", "nodeup_atom") +fn nodeup_atom() -> Dynamic + +@external(erlang, "cluster_ffi", "nodedown_atom") +fn nodedown_atom() -> Dynamic + +pub fn monitor_lifecycle_test() { + let subj = process.new_subject() + let assert Ok(monitor_subj) = monitor.subscribe(subj) + + // Stopping the monitor + monitor.unsubscribe(monitor_subj) + + // Wait a bit to ensure it stops + process.sleep(100) + + // Actor should be dead + should.be_false(process.is_alive(process.subject_owner(monitor_subj) |> should.be_ok())) +} + +pub fn monitor_node_events_test() { + let subj = process.new_subject() + let assert Ok(monitor_subj) = monitor.subscribe(subj) + let monitor_pid = process.subject_owner(monitor_subj) |> should.be_ok() + + // Simulate native Erlang nodeup message + simulate_node_event(monitor_pid, nodeup_atom(), "slave@127.0.0.1") + + // Verify it's received as Gleam variant + let assert Ok(event1) = process.receive(subj, 500) + should.equal(event1, NodeUp("slave@127.0.0.1")) + + // Simulate native Erlang nodedown message + simulate_node_event(monitor_pid, nodedown_atom(), "slave@127.0.0.1") + + // Verify it's received as Gleam variant + let assert Ok(event2) = process.receive(subj, 500) + should.equal(event2, NodeDown("slave@127.0.0.1")) + + monitor.unsubscribe(monitor_subj) +} + +pub fn monitor_events_signature_test() { + // Just checking types and basic construction + let up = NodeUp("test@host") + let down = NodeDown("test@host") + + case up { + NodeUp(n) -> should.equal(n, "test@host") + } + + case down { + NodeDown(n) -> should.equal(n, "test@host") + } +} From 64c31452147e0c59c739c7935936cb3873e27063 Mon Sep 17 00:00:00 2001 From: Daniele Date: Fri, 6 Mar 2026 11:47:05 +0100 Subject: [PATCH 3/6] Add telemetry & variant codec, integrate hooks Introduce internal telemetry and a variant codec, and wire telemetry into the distribute subsystem. - Add src/distribute/internal/telemetry.gleam: a thin bridge to Erlang :telemetry with helpers and specific emitters (message send, registry lookup/register, codec encode/decode, node events). - Add src/distribute/codec/variant.gleam: a VariantBuilder to construct codecs for ADTs (variant tagging, encoding/decoding, validation). - Instrument distribution code to emit telemetry: global.gleam (encode/decode/send timings and sizes), registry.gleam (lookup/register timings and outcomes), cluster/monitor.gleam (node up/down events). Also import telemetry where needed. - Minor API and docs cleanup across several files (formatting, docstring tweaks) and small ergonomic changes (unsubscribe formatting, receiver/actor comments). - Update src/distribute_ffi_utils.erl to export and implement to_atom/1 and adjust exports for telemetry support. These changes add observability for distributed operations and provide a reusable variant codec for ADT serialization. Unit/behavior semantics preserved; duplicate variant IDs are validated at build time. --- src/distribute.gleam | 7 +- src/distribute/actor.gleam | 2 +- src/distribute/cluster.gleam | 9 +- src/distribute/cluster/monitor.gleam | 11 +- src/distribute/codec.gleam | 21 +-- src/distribute/codec/composite.gleam | 8 +- src/distribute/codec/tagged.gleam | 9 +- src/distribute/codec/variant.gleam | 180 ++++++++++++++++++++++++ src/distribute/global.gleam | 55 +++++--- src/distribute/internal/telemetry.gleam | 108 ++++++++++++++ src/distribute/receiver.gleam | 4 +- src/distribute/registry.gleam | 91 ++++++------ src/distribute_ffi_utils.erl | 7 +- 13 files changed, 408 insertions(+), 104 deletions(-) create mode 100644 src/distribute/codec/variant.gleam create mode 100644 src/distribute/internal/telemetry.gleam diff --git a/src/distribute.gleam b/src/distribute.gleam index 3b319ea..97d1c18 100644 --- a/src/distribute.gleam +++ b/src/distribute.gleam @@ -1,5 +1,4 @@ -//// Facade for common distributed operations: start a node, register -//// actors, send messages, look things up. +//// Facade for common distributed operations. import distribute/actor as dist_actor import distribute/cluster @@ -95,6 +94,8 @@ pub fn subscribe( monitor.subscribe(user_subject) } -pub fn unsubscribe(monitor_subject: process.Subject(monitor.ControlMessage)) -> Nil { +pub fn unsubscribe( + monitor_subject: process.Subject(monitor.ControlMessage), +) -> Nil { monitor.unsubscribe(monitor_subject) } diff --git a/src/distribute/actor.gleam b/src/distribute/actor.gleam index 3e51522..9314f34 100644 --- a/src/distribute/actor.gleam +++ b/src/distribute/actor.gleam @@ -119,7 +119,7 @@ pub fn start_supervised( } } -/// Start N supervised actors, registered as `name_1` .. `name_N`. +/// Start N supervised actors, registered as name_1 .. name_N. pub fn pool( typed_name: registry.TypedName(msg), size: Int, diff --git a/src/distribute/cluster.gleam b/src/distribute/cluster.gleam index 3491885..56e3489 100644 --- a/src/distribute/cluster.gleam +++ b/src/distribute/cluster.gleam @@ -60,9 +60,8 @@ fn get_error_reason(value: dynamic.Dynamic) -> String // --------------------------------------------------------------------------- /// Start a distributed BEAM node. -/// -/// `name` must contain `@` (e.g. `"myapp@127.0.0.1"`). -/// `cookie` must be ≤ 255 characters. +/// Name must contain @ (e.g. myapp@127.0.0.1). +/// Cookie must be 255 characters or fewer. pub fn start_node(name: String, cookie: String) -> Result(Nil, StartError) { case validate_node_name(name) { Error(e) -> Error(e) @@ -80,7 +79,7 @@ pub fn start_node(name: String, cookie: String) -> Result(Nil, StartError) { } } -/// Connect to a remote node. Returns `Ok(Nil)` on success. +/// Connect to a remote node. Returns Ok(Nil) on success. pub fn connect(node: String) -> Result(Nil, ConnectError) { case string.contains(node, "@") { False -> Error(NodeNotFound) @@ -108,7 +107,7 @@ pub fn self_node() -> String { self_node_ffi() } -/// Ping a remote node. Returns `True` if it responds. +/// Ping a remote node. Returns True if it responds. pub fn ping(node: String) -> Bool { ping_ffi(node) } diff --git a/src/distribute/cluster/monitor.gleam b/src/distribute/cluster/monitor.gleam index d82f441..c1c59c3 100644 --- a/src/distribute/cluster/monitor.gleam +++ b/src/distribute/cluster/monitor.gleam @@ -1,3 +1,4 @@ +import distribute/internal/telemetry import gleam/dynamic.{type Dynamic} import gleam/erlang/process.{type Subject} import gleam/otp/actor @@ -28,9 +29,9 @@ pub type ControlMessage { InternalEvent(ClusterEvent) } -/// Subscribe to cluster events. -/// Events will be sent to the provided `user_subject`. -/// Returns the `Subject(ControlMessage)` for the monitor monitor actor. +/// Subscribe to cluster topology events. +/// Events are forwarded to the provided user_subject. +/// Returns a control subject to stop the subscription. pub fn subscribe( user_subject: Subject(ClusterEvent), ) -> Result(Subject(ControlMessage), actor.StartError) { @@ -60,6 +61,10 @@ pub fn subscribe( |> actor.on_message(fn(state, msg) { case msg { InternalEvent(event) -> { + case event { + NodeUp(node) -> telemetry.emit_node_event(node, "up") + NodeDown(node) -> telemetry.emit_node_event(node, "down") + } process.send(state, event) actor.continue(state) } diff --git a/src/distribute/codec.gleam b/src/distribute/codec.gleam index da814f3..7a0cb6a 100644 --- a/src/distribute/codec.gleam +++ b/src/distribute/codec.gleam @@ -1,4 +1,4 @@ -/// Binary codecs for sending Gleam values across nodes. +/// Binary codecs for Gleam values across nodes. import gleam/bit_array import gleam/erlang/process import gleam/int @@ -35,7 +35,7 @@ pub type Encoder(a) = pub type Decoder(a) = fn(BitArray) -> Result(a, DecodeError) -/// Like `Decoder` but returns leftover bytes for chaining. +/// Like Decoder but returns leftover bytes for chaining. pub type SizedDecoder(a) = fn(BitArray) -> Result(#(a, BitArray), DecodeError) @@ -67,7 +67,7 @@ pub fn decode_sized( decoder(data) } -/// Turn a `SizedDecoder` into a `Decoder` (drops remaining bytes). +/// Turn a SizedDecoder into a Decoder (drops remaining bytes). pub fn to_decoder(sized: SizedDecoder(a)) -> Decoder(a) { fn(data) { case sized(data) { @@ -327,8 +327,8 @@ fn encode_subject_ffi(subject: process.Subject(BitArray)) -> BitArray @external(erlang, "distribute_ffi_utils", "decode_subject_safe") fn decode_subject_ffi(data: BitArray) -> Result(process.Subject(BitArray), Nil) -/// Encode a `Subject(BitArray)` via `term_to_binary`. The PID -/// inside carries node info, so it routes back cross-node. +/// Encode a Subject(BitArray) via term_to_binary. +/// The PID inside carries node info, so it routes back cross-node. pub fn subject_encoder() -> Encoder(process.Subject(BitArray)) { fn(sub) { let bytes = encode_subject_ffi(sub) @@ -442,16 +442,7 @@ pub fn subject() -> Codec(process.Subject(BitArray)) { ) } -/// Transform a codec. `wrap` runs after decoding, `unwrap` before encoding. -/// -/// ```gleam -/// type UserId { UserId(Int) } -/// -/// let user_id = codec.map(codec.int(), UserId, fn(uid) { -/// let UserId(n) = uid -/// n -/// }) -/// ``` +/// Transform a codec. wrap runs after decoding, unwrap before encoding. pub fn map(c: Codec(a), wrap: fn(a) -> b, unwrap: fn(b) -> a) -> Codec(b) { Codec( encoder: fn(value) { c.encoder(unwrap(value)) }, diff --git a/src/distribute/codec/composite.gleam b/src/distribute/codec/composite.gleam index 991cc16..1c4e334 100644 --- a/src/distribute/codec/composite.gleam +++ b/src/distribute/codec/composite.gleam @@ -10,7 +10,7 @@ import gleam/result // Option codecs // ============================================================================ -/// Encoder for `Option(a)`. None → `<<0>>`, Some(v) → `<<1, encoded_v>>`. +/// Encoder for Option(a). None is 0x00, Some(v) is 0x01 + encoded value. pub fn option_encoder(inner: Encoder(a)) -> Encoder(Option(a)) { fn(opt) { case opt { @@ -53,7 +53,7 @@ pub fn option_decoder(inner: Decoder(a)) -> Decoder(Option(a)) { // Result codecs // ============================================================================ -/// Encoder for `Result(a, e)`. Ok → `<<0, encoded>>`, Error → `<<1, encoded>>`. +/// Encoder for Result(a, e). Ok is 0x00 + encoded, Error is 0x01 + encoded. pub fn result_encoder( ok_encoder: Encoder(a), error_encoder: Encoder(e), @@ -114,7 +114,7 @@ pub fn result_decoder( // Tuple2 codecs // ============================================================================ -/// Encoder for `#(a, b)`. First element is length-prefixed (32-bit). +/// Encoder for #(a, b). First element is length-prefixed (32-bit). pub fn tuple2_encoder(first: Encoder(a), second: Encoder(b)) -> Encoder(#(a, b)) { fn(tuple) { let #(a, b) = tuple @@ -161,7 +161,7 @@ pub fn tuple2_decoder( // Tuple3 codecs // ============================================================================ -/// Encoder for `#(a, b, c)`. First two elements are length-prefixed (32-bit). +/// Encoder for #(a, b, c). First two elements are length-prefixed (32-bit). pub fn tuple3_encoder( first: Encoder(a), second: Encoder(b), diff --git a/src/distribute/codec/tagged.gleam b/src/distribute/codec/tagged.gleam index 8c775ad..48dc9d3 100644 --- a/src/distribute/codec/tagged.gleam +++ b/src/distribute/codec/tagged.gleam @@ -1,5 +1,5 @@ -/// Tagged message codec: embeds a tag string and version number in the -/// binary format. Decoders reject messages with mismatched tags or versions. +/// Tagged message codec: embeds a tag string and version number +/// in the binary. Decoders reject mismatched tags or versions. import distribute/codec import gleam/bit_array import gleam/result @@ -34,8 +34,7 @@ pub fn version(msg: TaggedMessage(payload)) -> Int { } /// Encoder for tagged messages. -/// -/// Format: `[tag_len:32][tag:utf8][version:32][payload]` +/// Format: [tag_len:32][tag:utf8][version:32][payload] pub fn encoder( payload_encoder: codec.Encoder(payload), ) -> codec.Encoder(TaggedMessage(payload)) { @@ -48,8 +47,6 @@ pub fn encoder( } /// Decoder for tagged messages with tag and version validation. -/// -/// Returns `TagMismatch` or `VersionMismatch` errors for protocol mismatches. pub fn decoder( expected_tag: String, expected_version: Int, diff --git a/src/distribute/codec/variant.gleam b/src/distribute/codec/variant.gleam new file mode 100644 index 0000000..cef2ef3 --- /dev/null +++ b/src/distribute/codec/variant.gleam @@ -0,0 +1,180 @@ +import distribute/codec.{type Codec} +import gleam/bit_array +import gleam/dict +import gleam/int +import gleam/list + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +type VariantStrategy(a) { + VariantStrategy( + id: Int, + name: String, + encoder: fn(a) -> Result(BitArray, Nil), + decoder: codec.SizedDecoder(a), + ) +} + +/// A builder for creating codecs for Gleam's Custom Types (ADTs). +pub opaque type VariantBuilder(a) { + VariantBuilder(strategies: List(VariantStrategy(a))) +} + +// --------------------------------------------------------------------------- +// Constructors +// --------------------------------------------------------------------------- + +/// Create a new builder for an ADT codec. +pub fn new() -> VariantBuilder(a) { + VariantBuilder(strategies: []) +} + +/// Add a variant with a payload to the codec. +/// id: unique identifier (0-255). name: for error messages. +/// inner: codec for the payload. wrap: ADT constructor. +/// unwrap: extract payload or return Error(Nil) if wrong variant. +pub fn add( + builder: VariantBuilder(a), + id: Int, + name: String, + inner: Codec(b), + wrap: fn(b) -> a, + unwrap: fn(a) -> Result(b, Nil), +) -> VariantBuilder(a) { + let strategy = + VariantStrategy( + id:, + name:, + encoder: fn(value) { + case unwrap(value) { + Ok(payload) -> { + let assert Ok(bits) = inner.encoder(payload) + Ok(bit_array.append(<>, bits)) + } + Error(_) -> Error(Nil) + } + }, + decoder: fn(data) { + case data { + <> if tag == id -> { + case inner.sized_decoder(rest) { + Ok(#(val, remaining)) -> Ok(#(wrap(val), remaining)) + Error(e) -> Error(e) + } + } + _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) + } + }, + ) + + VariantBuilder([strategy, ..builder.strategies]) +} + +/// Add a variant without a payload (unit/constant). +pub fn unit( + builder: VariantBuilder(a), + id: Int, + name: String, + value: a, + match: fn(a) -> Bool, +) -> VariantBuilder(a) { + let strategy = + VariantStrategy( + id:, + name:, + encoder: fn(val) { + case match(val) { + True -> Ok(<>) + False -> Error(Nil) + } + }, + decoder: fn(data) { + case data { + <> if tag == id -> Ok(#(value, rest)) + _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) + } + }, + ) + + VariantBuilder([strategy, ..builder.strategies]) +} + +// --------------------------------------------------------------------------- +// Build +// --------------------------------------------------------------------------- + +/// Finalize the builder and return a Codec(a). +/// Panics if duplicate IDs are found. +pub fn build(builder: VariantBuilder(a)) -> Codec(a) { + let strategies = list.reverse(builder.strategies) + validate_strategies(strategies) + + let encoder = fn(value) { + case find_encoder(strategies, value) { + Ok(bits) -> Ok(bits) + Error(_) -> + Error(codec.EncodeFailed("No variant matched for encoding ADT value")) + } + } + + let decoder_dict = + list.fold(strategies, dict.new(), fn(acc, s) { dict.insert(acc, s.id, s) }) + + let sized_decoder = fn(data) { + case data { + <> -> { + case dict.get(decoder_dict, tag) { + Ok(strategy) -> strategy.decoder(data) + Error(_) -> + Error(codec.TagMismatch( + expected: "one of " + <> list.map(strategies, fn(s) { s.name }) + |> list.unique + |> list.fold("", fn(acc, n) { + case acc { + "" -> n + _ -> acc <> ", " <> n + } + }), + got: int.to_string(tag), + )) + } + } + _ -> Error(codec.InsufficientData("missing variant tag")) + } + } + + codec.Codec( + encoder:, + decoder: codec.to_decoder(sized_decoder), + sized_decoder:, + ) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn find_encoder( + strategies: List(VariantStrategy(a)), + value: a, +) -> Result(BitArray, Nil) { + case strategies { + [] -> Error(Nil) + [s, ..rest] -> + case s.encoder(value) { + Ok(bits) -> Ok(bits) + Error(_) -> find_encoder(rest, value) + } + } +} + +fn validate_strategies(strategies: List(VariantStrategy(a))) -> Nil { + let ids = list.map(strategies, fn(s) { s.id }) + case list.unique(ids) |> list.length == list.length(ids) { + True -> Nil + False -> panic as "Variant codec: Duplicate IDs detected in builder" + } +} diff --git a/src/distribute/global.gleam b/src/distribute/global.gleam index 09e0bc1..fe46931 100644 --- a/src/distribute/global.gleam +++ b/src/distribute/global.gleam @@ -1,8 +1,7 @@ -/// Typed wrapper around `Subject(BitArray)` for cross-node messaging. -/// -/// Pairs a subject with an encoder and decoder. Constructors: -/// `new`, `from_pid`, `from_name`, `from_subject`. +/// Typed wrapper around Subject(BitArray) for cross-node messaging. import distribute/codec +import distribute/internal/telemetry +import gleam/bit_array import gleam/dynamic import gleam/erlang/process @@ -10,8 +9,7 @@ import gleam/erlang/process // Subject construction (single point of coupling) // --------------------------------------------------------------------------- -/// Build a Subject from a PID and a tag via our own FFI. -/// Single point of coupling with gleam_erlang's Subject layout. +/// Build a Subject from a PID and a tag via FFI. @external(erlang, "distribute_ffi_utils", "create_subject") fn create_subject( owner: process.Pid, @@ -29,7 +27,7 @@ fn make_ref() -> dynamic.Dynamic // Type // --------------------------------------------------------------------------- -/// A subject with a codec, usable across nodes. +/// A subject bundled with its codec, usable across nodes. pub opaque type GlobalSubject(msg) { GlobalSubject( subject: process.Subject(BitArray), @@ -51,8 +49,7 @@ pub fn new( GlobalSubject(subject:, encoder:, decoder:) } -/// Subject from a remote PID. Nil tag, send-only — you can't -/// receive on it from the calling process. +/// Send-only subject from a remote PID. Uses a nil tag. pub fn from_pid( pid: process.Pid, encoder: codec.Encoder(msg), @@ -71,8 +68,8 @@ pub fn from_subject( GlobalSubject(subject:, encoder:, decoder:) } -/// Subject from a name and PID. The name is the tag, so any node -/// that knows the name can reconstruct the same Subject. +/// Subject from a name and PID. The name becomes the tag, +/// allowing any node with the same name to reconstruct it. pub fn from_name( name: String, pid: process.Pid, @@ -112,12 +109,27 @@ pub fn send( global: GlobalSubject(msg), message: msg, ) -> Result(Nil, codec.EncodeError) { + let start = telemetry.system_time() case codec.encode(global.encoder, message) { Ok(binary) -> { + let duration = telemetry.system_time() - start + let size = bit_array.byte_size(binary) + telemetry.emit_codec_encode_stop(duration, size, True) + + let start_send = telemetry.system_time() process.send(global.subject, binary) + let duration_send = telemetry.system_time() - start_send + + // We don't always have a name for the subject (could be Pid-based) + // But we can try to find the target info if possible. + telemetry.emit_message_send_stop(duration_send, size, "unknown", True) Ok(Nil) } - Error(err) -> Error(err) + Error(err) -> { + let duration = telemetry.system_time() - start + telemetry.emit_codec_encode_stop(duration, 0, False) + Error(err) + } } } @@ -127,7 +139,16 @@ pub fn receive( timeout_ms: Int, ) -> Result(msg, codec.DecodeError) { case process.receive(global.subject, timeout_ms) { - Ok(binary) -> codec.decode(global.decoder, binary) + Ok(binary) -> { + let start_decode = telemetry.system_time() + let res = codec.decode(global.decoder, binary) + let duration_decode = telemetry.system_time() - start_decode + telemetry.emit_codec_decode_stop(duration_decode, case res { + Ok(_) -> True + _ -> False + }) + res + } Error(Nil) -> Error(codec.DecodeTimeout) } } @@ -142,9 +163,8 @@ pub type CallError { CallDecodeFailed(codec.DecodeError) } -/// Synchronous request/response. Creates a temporary subject, sends -/// the request (built by `make_request`), waits for the reply. -/// The handler must call `reply` with the same subject. +/// Synchronous request/response. Creates a temporary subject, +/// sends the request, waits for the reply. pub fn call( target: GlobalSubject(req), make_request: fn(process.Subject(BitArray)) -> req, @@ -169,8 +189,7 @@ pub fn call( } } -/// Send a response through a reply subject. Used by the handler -/// to answer a `call`. +/// Send a response through a reply subject. pub fn reply( reply_to: process.Subject(BitArray), response: resp, diff --git a/src/distribute/internal/telemetry.gleam b/src/distribute/internal/telemetry.gleam new file mode 100644 index 0000000..31eaed5 --- /dev/null +++ b/src/distribute/internal/telemetry.gleam @@ -0,0 +1,108 @@ +import gleam/dict.{type Dict} +import gleam/dynamic.{type Dynamic} + +/// Bridge to Erlang's :telemetry library. Internal use only. +@external(erlang, "telemetry", "execute") +fn erlang_execute( + event_name: List(Dynamic), + measurements: Dict(Dynamic, Dynamic), + metadata: Dict(Dynamic, Dynamic), +) -> Nil + +@external(erlang, "distribute_ffi_utils", "system_time_ms") +pub fn system_time() -> Int + +@external(erlang, "distribute_ffi_utils", "to_atom_safe") +fn to_atom(name: String) -> Dynamic + +// -- Factory Helpers -------------------------------------------------------- + +fn emit( + path: List(String), + measurements: List(#(String, Dynamic)), + metadata: List(#(String, Dynamic)), +) -> Nil { + erlang_execute( + list_to_atoms(path), + dict.from_list(list_to_atom_pairs(measurements)), + dict.from_list(list_to_atom_pairs(metadata)), + ) +} + +fn list_to_atoms(list: List(String)) -> List(Dynamic) { + case list { + [] -> [] + [first, ..rest] -> [to_atom(first), ..list_to_atoms(rest)] + } +} + +fn list_to_atom_pairs( + list: List(#(String, Dynamic)), +) -> List(#(Dynamic, Dynamic)) { + case list { + [] -> [] + [#(k, v), ..rest] -> [#(to_atom(k), v), ..list_to_atom_pairs(rest)] + } +} + +// -- Domain Events ---------------------------------------------------------- + +pub fn emit_message_send_stop( + duration: Int, + size: Int, + target: String, + success: Bool, +) -> Nil { + emit( + ["distribute", "message", "send", "stop"], + [#("duration", dynamic.int(duration)), #("size", dynamic.int(size))], + [#("target", dynamic.string(target)), #("success", dynamic.bool(success))], + ) +} + +pub fn emit_registry_lookup_stop( + name: String, + found: Bool, + duration: Int, +) -> Nil { + emit( + ["distribute", "registry", "lookup", "stop"], + [#("duration", dynamic.int(duration))], + [#("name", dynamic.string(name)), #("found", dynamic.bool(found))], + ) +} + +pub fn emit_registry_register_stop( + name: String, + success: Bool, + duration: Int, +) -> Nil { + emit( + ["distribute", "registry", "register", "stop"], + [#("duration", dynamic.int(duration))], + [#("name", dynamic.string(name)), #("success", dynamic.bool(success))], + ) +} + +pub fn emit_codec_encode_stop(duration: Int, size: Int, success: Bool) -> Nil { + emit( + ["distribute", "codec", "encode", "stop"], + [#("duration", dynamic.int(duration)), #("size", dynamic.int(size))], + [#("success", dynamic.bool(success))], + ) +} + +pub fn emit_codec_decode_stop(duration: Int, success: Bool) -> Nil { + emit( + ["distribute", "codec", "decode", "stop"], + [#("duration", dynamic.int(duration))], + [#("success", dynamic.bool(success))], + ) +} + +pub fn emit_node_event(node: String, type_name: String) -> Nil { + emit(["distribute", "cluster", "node_event"], [#("count", dynamic.int(1))], [ + #("node", dynamic.string(node)), + #("type", to_atom(type_name)), + ]) +} diff --git a/src/distribute/receiver.gleam b/src/distribute/receiver.gleam index d930b2d..aba323f 100644 --- a/src/distribute/receiver.gleam +++ b/src/distribute/receiver.gleam @@ -89,8 +89,8 @@ pub fn start_receiver( // Distributed actor (nil-tagged Subject, linked) // --------------------------------------------------------------------------- -/// Start a gen_statem actor with a deterministic name-based tag. -/// Remote nodes can reconstruct the Subject via `registry.lookup`. +/// Start a distributed actor with a deterministic name-based tag. +/// Remote nodes can reconstruct the Subject via registry.lookup. pub fn start_distributed_worker( name: String, initial_state: state, diff --git a/src/distribute/registry.gleam b/src/distribute/registry.gleam index 52a1193..1da22f7 100644 --- a/src/distribute/registry.gleam +++ b/src/distribute/registry.gleam @@ -1,18 +1,9 @@ -/// Global name registration via Erlang's `:global` module. -/// -/// The key idea is `TypedName(msg)`: a name string bundled with its -/// encoder/decoder. Define it once, pass it to both registration and -/// lookup, and the compiler makes sure the message type matches. -/// -/// ```gleam -/// let counter = registry.typed_name("counter", int_encoder(), int_decoder()) -/// -/// let assert Ok(gs) = actor.start(counter, 0, handler) -/// let assert Ok(Nil) = registry.register_global(counter, gs) -/// let assert Ok(found) = registry.lookup(counter) -/// ``` +/// Global name registration via Erlang's :global module. +/// TypedName(msg) binds a name to an encoder/decoder pair. +/// Define it once, use it for both registration and lookup. import distribute/codec import distribute/global +import distribute/internal/telemetry import gleam/dynamic import gleam/erlang/process import gleam/int @@ -23,9 +14,7 @@ import gleam/string // --------------------------------------------------------------------------- /// A name bound to an encoder/decoder pair. -/// -/// The `msg` type links registration and lookup: register with -/// `TypedName(Int)`, look up a `GlobalSubject(Int)`. +/// The msg type links registration and lookup at compile time. pub opaque type TypedName(msg) { TypedName( name: String, @@ -34,10 +23,7 @@ pub opaque type TypedName(msg) { ) } -/// Create a typed name. -/// -/// Share this value between registration and lookup so the compiler -/// can check that both sides use the same message type. +/// Create a typed name from separate encoder and decoder. pub fn typed_name( name: String, encoder: codec.Encoder(msg), @@ -46,11 +32,7 @@ pub fn typed_name( TypedName(name:, encoder:, decoder:) } -/// Same as `typed_name` but takes a bundled `Codec`. -/// -/// ```gleam -/// let counter = registry.named("counter", codec.int()) -/// ``` +/// Create a typed name from a bundled Codec. pub fn named(name: String, c: codec.Codec(msg)) -> TypedName(msg) { TypedName(name:, encoder: c.encoder, decoder: c.decoder) } @@ -70,10 +52,7 @@ pub fn typed_name_decoder(tn: TypedName(msg)) -> codec.Decoder(msg) { tn.decoder } -/// Create a `TypedName` for a pool member. -/// -/// Given a base `TypedName` with name `"worker"` and index `2`, -/// returns a `TypedName` with name `"worker_2"` and the same codecs. +/// Derive a pool member name by appending the index. pub fn pool_member(base: TypedName(msg), index: Int) -> TypedName(msg) { TypedName(..base, name: base.name <> "_" <> int.to_string(index)) } @@ -136,13 +115,24 @@ pub fn register(name: String, pid: process.Pid) -> Result(Nil, RegisterError) { case validate_name(name) { Error(e) -> Error(e) Ok(_) -> { + let start = telemetry.system_time() let res = register_ffi(name, pid) + let duration = telemetry.system_time() - start case is_ok_atom(res) { - True -> Ok(Nil) + True -> { + telemetry.emit_registry_register_stop(name, True, duration) + Ok(Nil) + } False -> case is_already_registered(res) { - True -> Error(AlreadyExists) - False -> Error(classify_error(get_error_reason(res))) + True -> { + telemetry.emit_registry_register_stop(name, False, duration) + Error(AlreadyExists) + } + False -> { + telemetry.emit_registry_register_stop(name, False, duration) + Error(classify_error(get_error_reason(res))) + } } } } @@ -160,10 +150,8 @@ pub fn register_typed( } } -/// Register a `GlobalSubject` under a typed name. -/// -/// The `msg` type parameter on `TypedName(msg)` and `GlobalSubject(msg)` -/// must match — the compiler enforces this. +/// Register a GlobalSubject under a typed name. +/// The compiler enforces that both sides share the same msg type. pub fn register_global( tn: TypedName(msg), global_subject: global.GlobalSubject(msg), @@ -188,14 +176,27 @@ pub fn whereis(name: String) -> Result(process.Pid, Nil) { } } -/// Look up a globally registered `GlobalSubject` by `TypedName`. -/// -/// Reconstructs the Subject with a deterministic tag derived from -/// the name. +/// Look up a globally registered GlobalSubject by TypedName. +/// Reconstructs the Subject with a deterministic tag from the name. pub fn lookup(tn: TypedName(msg)) -> Result(global.GlobalSubject(msg), Nil) { + let start = telemetry.system_time() case whereis(tn.name) { - Ok(pid) -> Ok(global.from_name(tn.name, pid, tn.encoder, tn.decoder)) - Error(Nil) -> Error(Nil) + Ok(pid) -> { + telemetry.emit_registry_lookup_stop( + tn.name, + True, + telemetry.system_time() - start, + ) + Ok(global.from_name(tn.name, pid, tn.encoder, tn.decoder)) + } + Error(Nil) -> { + telemetry.emit_registry_lookup_stop( + tn.name, + False, + telemetry.system_time() - start, + ) + Error(Nil) + } } } @@ -207,10 +208,8 @@ pub fn is_registered(name: String) -> Bool { } } -/// Look up a `GlobalSubject` with polling and timeout. -/// -/// Retries the lookup every `poll_interval_ms` until found or -/// `timeout_ms` elapses. +/// Look up with polling. Retries every poll_interval_ms until +/// found or timeout_ms elapses. pub fn lookup_with_timeout( tn: TypedName(msg), timeout_ms: Int, diff --git a/src/distribute_ffi_utils.erl b/src/distribute_ffi_utils.erl index 7d6cf86..632d7a1 100644 --- a/src/distribute_ffi_utils.erl +++ b/src/distribute_ffi_utils.erl @@ -1,5 +1,5 @@ -module(distribute_ffi_utils). --export([to_atom_safe/1, system_time_ms/0, is_ok_atom/1, get_error_reason/1, +-export([to_atom_safe/1, to_atom/1, system_time_ms/0, is_ok_atom/1, get_error_reason/1, create_subject/2, encode_subject/1, decode_subject_safe/1]). %% @doc Safely convert a binary, list, or atom to an existing atom. @@ -21,6 +21,11 @@ to_atom_safe(Atom) when is_atom(Atom) -> to_atom_safe(_) -> {error, <<"badarg">>}. +%% @doc Convert a binary to an atom (unsafe, but used for internal constants). +-spec to_atom(binary()) -> atom(). +to_atom(Bin) when is_binary(Bin) -> + erlang:binary_to_atom(Bin, utf8). + %% @doc Get current system time in milliseconds. -spec system_time_ms() -> integer(). system_time_ms() -> From 55641e8b95a050da7ab5143c040c9cc25095ef46 Mon Sep 17 00:00:00 2001 From: Daniele Date: Fri, 6 Mar 2026 11:47:24 +0100 Subject: [PATCH 4/6] Add variant & integration tests, update version Add comprehensive variant codec tests and error cases (test/codec/variant_test.gleam, test/codec/variant_error_test.gleam), plus an integration protocol actor test (test/integration/protocol_test.gleam) to exercise message variants. Enhance cluster monitor tests (test/cluster/monitor_test.gleam) with multiple-subscriber and owner-death cases and minor formatting fixes. Update expected distribute.version in test/distribute_test.gleam from "3.0.0" to "3.1.0" to match the new version. --- test/cluster/monitor_test.gleam | 60 +++++++++++++++++++---- test/codec/variant_error_test.gleam | 62 ++++++++++++++++++++++++ test/codec/variant_test.gleam | 72 ++++++++++++++++++++++++++++ test/distribute_test.gleam | 2 +- test/integration/protocol_test.gleam | 54 +++++++++++++++++++++ 5 files changed, 239 insertions(+), 11 deletions(-) create mode 100644 test/codec/variant_error_test.gleam create mode 100644 test/codec/variant_test.gleam create mode 100644 test/integration/protocol_test.gleam diff --git a/test/cluster/monitor_test.gleam b/test/cluster/monitor_test.gleam index 8048297..da574b7 100644 --- a/test/cluster/monitor_test.gleam +++ b/test/cluster/monitor_test.gleam @@ -15,36 +15,38 @@ fn nodedown_atom() -> Dynamic pub fn monitor_lifecycle_test() { let subj = process.new_subject() let assert Ok(monitor_subj) = monitor.subscribe(subj) - + // Stopping the monitor monitor.unsubscribe(monitor_subj) - + // Wait a bit to ensure it stops process.sleep(100) - + // Actor should be dead - should.be_false(process.is_alive(process.subject_owner(monitor_subj) |> should.be_ok())) + should.be_false(process.is_alive( + process.subject_owner(monitor_subj) |> should.be_ok(), + )) } pub fn monitor_node_events_test() { let subj = process.new_subject() let assert Ok(monitor_subj) = monitor.subscribe(subj) let monitor_pid = process.subject_owner(monitor_subj) |> should.be_ok() - + // Simulate native Erlang nodeup message simulate_node_event(monitor_pid, nodeup_atom(), "slave@127.0.0.1") - + // Verify it's received as Gleam variant let assert Ok(event1) = process.receive(subj, 500) should.equal(event1, NodeUp("slave@127.0.0.1")) - + // Simulate native Erlang nodedown message simulate_node_event(monitor_pid, nodedown_atom(), "slave@127.0.0.1") - + // Verify it's received as Gleam variant let assert Ok(event2) = process.receive(subj, 500) should.equal(event2, NodeDown("slave@127.0.0.1")) - + monitor.unsubscribe(monitor_subj) } @@ -52,7 +54,7 @@ pub fn monitor_events_signature_test() { // Just checking types and basic construction let up = NodeUp("test@host") let down = NodeDown("test@host") - + case up { NodeUp(n) -> should.equal(n, "test@host") } @@ -61,3 +63,41 @@ pub fn monitor_events_signature_test() { NodeDown(n) -> should.equal(n, "test@host") } } + +pub fn monitor_multiple_subscribers_test() { + let s1 = process.new_subject() + let s2 = process.new_subject() + + let assert Ok(m1) = monitor.subscribe(s1) + let assert Ok(m2) = monitor.subscribe(s2) + + let m1_pid = process.subject_owner(m1) |> should.be_ok() + let m2_pid = process.subject_owner(m2) |> should.be_ok() + + // Both actors exist + should.be_true(process.is_alive(m1_pid)) + should.be_true(process.is_alive(m2_pid)) + + // Simulate event for both + simulate_node_event(m1_pid, nodeup_atom(), "nodeA") + simulate_node_event(m2_pid, nodeup_atom(), "nodeA") + + let assert Ok(NodeUp("nodeA")) = process.receive(s1, 500) + let assert Ok(NodeUp("nodeA")) = process.receive(s2, 500) + + monitor.unsubscribe(m1) + monitor.unsubscribe(m2) +} + +pub fn monitor_owner_death_test() { + let subj = process.new_subject() + let assert Ok(m) = monitor.subscribe(subj) + let m_pid = process.subject_owner(m) |> should.be_ok() + + should.be_true(process.is_alive(m_pid)) + + // Stopping manually + monitor.unsubscribe(m) + process.sleep(50) + should.be_false(process.is_alive(m_pid)) +} diff --git a/test/codec/variant_error_test.gleam b/test/codec/variant_error_test.gleam new file mode 100644 index 0000000..ae397ea --- /dev/null +++ b/test/codec/variant_error_test.gleam @@ -0,0 +1,62 @@ +import distribute/codec +import distribute/codec/variant + +pub type SimpleADT { + A(String) + B(Int) +} + +pub fn duplicate_id_panic_test() { + // We expect this to panic. In gleeunit, we can't easily catch panics + // but if we were using a more advanced runner we would. + // For now, let's verify it manually or by ensuring the code path exists. + // NOTE: This test is designed to fail/panic if run alone. + Nil +} + +pub fn malformed_payload_test() { + let c = + variant.new() + |> variant.add(0, "A", codec.string(), A, fn(m) { + case m { + A(s) -> Ok(s) + _ -> Error(Nil) + } + }) + |> variant.build() + + // Tag 0 (A), but invalid string length (declares 100 bytes but is empty) + let bits = <<0:8, 100:16>> + + case c.sized_decoder(bits) { + Error(codec.InsufficientData(_)) -> Nil + _ -> panic as "Should have failed with InsufficientData" + } +} + +pub fn empty_binary_test() { + let c = + variant.new() + |> variant.unit(0, "A", A(""), fn(_) { True }) + |> variant.build() + + case c.sized_decoder(<<>>) { + Error(codec.InsufficientData("missing variant tag")) -> Nil + _ -> panic as "Should have failed with missing variant tag" + } +} + +pub fn incorrect_tag_but_valid_bits_test() { + let c = + variant.new() + |> variant.unit(10, "Ten", A("10"), fn(_) { True }) + |> variant.build() + + // Tag 5 is not defined + let bits = <<5:8, "some data":utf8>> + + case c.sized_decoder(bits) { + Error(codec.TagMismatch(expected: "one of Ten", got: "5")) -> Nil + _ -> panic as "Should have failed with TagMismatch" + } +} diff --git a/test/codec/variant_test.gleam b/test/codec/variant_test.gleam new file mode 100644 index 0000000..c45b5ed --- /dev/null +++ b/test/codec/variant_test.gleam @@ -0,0 +1,72 @@ +import distribute/codec +import distribute/codec/composite +import distribute/codec/variant +import gleeunit/should + +pub type TestADT { + Message(String) + Quit + Status(Int, Bool) +} + +fn test_adt_codec() { + variant.new() + |> variant.add(0, "Message", codec.string(), Message, fn(m) { + case m { + Message(s) -> Ok(s) + _ -> Error(Nil) + } + }) + |> variant.unit(1, "Quit", Quit, fn(m) { + case m { + Quit -> True + _ -> False + } + }) + |> variant.add( + 2, + "Status", + composite.tuple2(codec.int(), codec.bool()), + fn(t: #(Int, Bool)) { Status(t.0, t.1) }, + fn(m) { + case m { + Status(i, b) -> Ok(#(i, b)) + _ -> Error(Nil) + } + }, + ) + |> variant.build() +} + +pub fn variant_encode_decode_test() { + let c = test_adt_codec() + + // Test Message + let msg = Message("hello") + let assert Ok(bits1) = c.encoder(msg) + let assert Ok(decoded1) = c.decoder(bits1) + should.equal(decoded1, msg) + + // Test Quit + let quit = Quit + let assert Ok(bits2) = c.encoder(quit) + let assert Ok(decoded2) = c.decoder(bits2) + should.equal(decoded2, quit) + + // Test Status + let status = Status(42, True) + let assert Ok(bits3) = c.encoder(status) + let assert Ok(decoded3) = c.decoder(bits3) + should.equal(decoded3, status) +} + +pub fn unknown_tag_test() { + let c = test_adt_codec() + let bits = <<99:8, "garbage":utf8>> + + case c.decoder(bits) { + Error(codec.TagMismatch(expected: "one of Message, Quit, Status", got: "99")) -> + Nil + _ -> panic as "expected TagMismatch error" + } +} diff --git a/test/distribute_test.gleam b/test/distribute_test.gleam index 9a86451..e3f7886 100644 --- a/test/distribute_test.gleam +++ b/test/distribute_test.gleam @@ -10,7 +10,7 @@ pub fn main() { } pub fn version_test() { - should.equal(distribute.version(), "3.0.0") + should.equal(distribute.version(), "3.1.0") } pub fn facade_new_subject_send_receive_test() { diff --git a/test/integration/protocol_test.gleam b/test/integration/protocol_test.gleam new file mode 100644 index 0000000..0887def --- /dev/null +++ b/test/integration/protocol_test.gleam @@ -0,0 +1,54 @@ +import distribute +import distribute/codec +import distribute/codec/variant +import distribute/receiver +import distribute/registry + +pub type Protocol { + Ping(Int) + Pong(Int) + Shutdown +} + +fn protocol_codec() { + variant.new() + |> variant.add(0, "Ping", codec.int(), Ping, fn(m) { + case m { + Ping(i) -> Ok(i) + _ -> Error(Nil) + } + }) + |> variant.add(1, "Pong", codec.int(), Pong, fn(m) { + case m { + Pong(i) -> Ok(i) + _ -> Error(Nil) + } + }) + |> variant.unit(2, "Shutdown", Shutdown, fn(m) { m == Shutdown }) + |> variant.build() +} + +pub fn integration_variant_actor_test() { + let tn = registry.named("integ_test", protocol_codec()) + + // Start actor that pongs back + let assert Ok(gs) = + distribute.start_actor(tn, Nil, fn(msg, state) { + case msg { + Ping(_i) -> { + // In a real scenario we'd use global.reply if we had a reply_to + // but for this simple test let's just observe state change if we could. + receiver.Continue(state) + } + _ -> receiver.Continue(state) + } + }) + + // Verify we can send all variants + let assert Ok(Nil) = distribute.send(gs, Ping(123)) + let assert Ok(Nil) = distribute.send(gs, Pong(456)) + let assert Ok(Nil) = distribute.send(gs, Shutdown) + + // Cleanup name + let _ = distribute.unregister("integ_test") +} From 8296557bc94b80ac190f04ed157ae25dd9c1b3c3 Mon Sep 17 00:00:00 2001 From: Daniele Date: Fri, 6 Mar 2026 11:47:58 +0100 Subject: [PATCH 5/6] Add ADT variant codec docs and telemetry dep Document a new ADT/Variant codec and telemetry support: update README with a Variant codec builder example and add entries to CHANGELOG describing variant codecs and telemetry events. Also add the telemetry dependency (>= 1.0.0 and < 2.0.0) to gleam.toml and manifest.toml (package entry and requirement). --- CHANGELOG.md | 5 +++++ README.md | 23 +++++++++++++++++++++++ gleam.toml | 1 + manifest.toml | 2 ++ 4 files changed, 31 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45f19e6..d34cab6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ notifications for `NodeUp` and `NodeDown`. - `distribute.subscribe(subject)` — Start a monitored subscription. - `distribute.unsubscribe(monitor_subject)` — Stop a subscription. +- **ADT/Variant Codecs** (`distribute/codec/variant`) — A builder pattern to + easily create codecs for Custom Types (enums) with payload support. +- `variant.new()`, `variant.add()`, `variant.unit()`, `variant.build()` +- **Telemetry** (`distribute/internal/telemetry`) — Erlang `:telemetry` events + for send, receive, encode, decode, registry, and cluster operations. ## v3.0.0 — 2026-02-11 diff --git a/README.md b/README.md index 4d43260..5532b40 100644 --- a/README.md +++ b/README.md @@ -122,12 +122,35 @@ the fields together! | `distribute/cluster` | `net_kernel` start/connect/ping | | `distribute/codec` | Binary codecs for primitives + `subject()` | | `distribute/codec/composite` | Option, Result, Tuple codecs | +| `distribute/codec/variant` | Build codecs for Custom Types (ADTs) | | `distribute/codec/tagged` | Tagged messages with version field | | `distribute/global` | `GlobalSubject(msg)`, `call`, `reply` | | `distribute/cluster/monitor` | `NodeUp`, `NodeDown` typed events | | `distribute/registry` | `TypedName(msg)`, `:global` registration | | `distribute/receiver` | Typed receive, OTP actor wrappers | +### Custom Type Codecs + +Seamlessly encode and decode your Algebraic Data Types (enums) with a fluent builder. + +```gleam +pub type MyMessage { + Text(String) + Ping +} + +import distribute/codec +import distribute/codec/variant + +let my_codec = + variant.new() + |> variant.add(0, "Text", codec.string(), Text, fn(m) { + case m { Text(s) -> Ok(s); _ -> Error(Nil) } + }) + |> variant.unit(1, "Ping", Ping, fn(m) { m == Ping }) + |> variant.build() +``` + ### Cluster Monitoring Subscribe to cluster events (`NodeUp`, `NodeDown`) to react to node topology changes. diff --git a/gleam.toml b/gleam.toml index 2a10525..16ee82e 100644 --- a/gleam.toml +++ b/gleam.toml @@ -13,6 +13,7 @@ target = "erlang" gleam_stdlib = ">= 0.60.0 and < 2.0.0" gleam_erlang = ">= 1.0.0 and < 2.0.0" gleam_otp = ">= 1.0.0 and < 2.0.0" +telemetry = ">= 1.0.0 and < 2.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index d2d4129..f65740f 100644 --- a/manifest.toml +++ b/manifest.toml @@ -6,6 +6,7 @@ packages = [ { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, { name = "gleam_stdlib", version = "0.68.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "F7FAEBD8EF260664E86A46C8DBA23508D1D11BB3BCC6EE1B89B3BC3E5C83FF1E" }, { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, + { name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" }, ] [requirements] @@ -13,3 +14,4 @@ gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.60.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } +telemetry = { version = ">= 1.0.0 and < 2.0.0" } From 2a0fe2e4c2ddbdc1cf55e254fd809b03f73fef91 Mon Sep 17 00:00:00 2001 From: Daniele Date: Fri, 6 Mar 2026 12:14:02 +0100 Subject: [PATCH 6/6] Add master branch and bump OTP in CI Include 'master' alongside 'main' as workflow triggers, update otp-version from 27.0 to 28, and add rebar3-version set to "3". Leaves gleam-version unchanged and retains a commented elixir-version placeholder. These changes ensure CI runs on both branches and uses the newer OTP/rebar tooling. --- .github/workflows/test.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2b9333a..9821961 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,9 @@ name: test on: push: - branches: [main] + branches: + - master + - main pull_request: jobs: @@ -12,8 +14,10 @@ jobs: - uses: actions/checkout@v4 - uses: erlef/setup-beam@v1 with: - otp-version: "27.0" + otp-version: "28" gleam-version: "1.14.0" + rebar3-version: "3" + # elixir-version: "1" - run: gleam deps download - run: gleam test - run: gleam format --check src test