Conversation
Improve wording in README: replace the em dash phrasing with "so" and change the period to an exclamation mark to make the sentence read more naturally and upbeat when describing manual codecs and wiring fields.
Introduce a typed cluster monitoring feature: new module distribute/cluster/monitor.gleam provides subscribe/unsubscribe and ClusterEvent variants (NodeUp/NodeDown) and runs an OTP actor that selects native node up/down messages. Add corresponding Erlang FFI helpers in src/cluster_ffi.erl and tests in test/cluster/monitor_test.gleam that simulate node events. Bump package version to 3.1.0 and document the feature in README and CHANGELOG.
Introduce internal telemetry and a variant codec, and wire telemetry into the distribute subsystem. - Add src/distribute/internal/telemetry.gleam: a thin bridge to Erlang :telemetry with helpers and specific emitters (message send, registry lookup/register, codec encode/decode, node events). - Add src/distribute/codec/variant.gleam: a VariantBuilder to construct codecs for ADTs (variant tagging, encoding/decoding, validation). - Instrument distribution code to emit telemetry: global.gleam (encode/decode/send timings and sizes), registry.gleam (lookup/register timings and outcomes), cluster/monitor.gleam (node up/down events). Also import telemetry where needed. - Minor API and docs cleanup across several files (formatting, docstring tweaks) and small ergonomic changes (unsubscribe formatting, receiver/actor comments). - Update src/distribute_ffi_utils.erl to export and implement to_atom/1 and adjust exports for telemetry support. These changes add observability for distributed operations and provide a reusable variant codec for ADT serialization. Unit/behavior semantics preserved; duplicate variant IDs are validated at build time.
Add comprehensive variant codec tests and error cases (test/codec/variant_test.gleam, test/codec/variant_error_test.gleam), plus an integration protocol actor test (test/integration/protocol_test.gleam) to exercise message variants. Enhance cluster monitor tests (test/cluster/monitor_test.gleam) with multiple-subscriber and owner-death cases and minor formatting fixes. Update expected distribute.version in test/distribute_test.gleam from "3.0.0" to "3.1.0" to match the new version.
Document a new ADT/Variant codec and telemetry support: update README with a Variant codec builder example and add entries to CHANGELOG describing variant codecs and telemetry events. Also add the telemetry dependency (>= 1.0.0 and < 2.0.0) to gleam.toml and manifest.toml (package entry and requirement).
Include 'master' alongside 'main' as workflow triggers, update otp-version from 27.0 to 28, and add rebar3-version set to "3". Leaves gleam-version unchanged and retains a commented elixir-version placeholder. These changes ensure CI runs on both branches and uses the newer OTP/rebar tooling.
There was a problem hiding this comment.
Pull request overview
Implements the distribute library v3.1.0 feature set by adding cluster node monitoring, a builder-based ADT/variant codec, and internal :telemetry instrumentation, along with version bumps and documentation updates.
Changes:
- Added cluster monitoring (
distribute/cluster/monitor) withsubscribe/unsubscribeand Erlang FFI helpers. - Added ADT/variant codec builder (
distribute/codec/variant) plus new tests and an integration test. - Added internal telemetry emission and integrated it into registry/global send/receive paths; bumped version/dependencies/docs.
Reviewed changes
Copilot reviewed 23 out of 25 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
src/distribute/cluster/monitor.gleam |
New module implementing typed NodeUp/NodeDown subscription actor. |
src/cluster_ffi.erl |
Adds Erlang-side helpers for monitoring and test simulation. |
src/distribute/codec/variant.gleam |
New variant codec builder for ADTs/enums. |
src/distribute/internal/telemetry.gleam |
New internal wrapper for emitting :telemetry events. |
src/distribute/global.gleam |
Emits telemetry around encode/send and decode on receive. |
src/distribute/registry.gleam |
Emits telemetry around registry register/lookup. |
src/distribute_ffi_utils.erl |
Adds atom conversion helper and continues to provide time/subject helpers. |
src/distribute.gleam |
Bumps facade version and exposes monitoring APIs. |
src/distribute/codec.gleam |
Docstring clarifications/formatting updates. |
src/distribute/codec/composite.gleam |
Docstring clarifications/formatting updates. |
src/distribute/codec/tagged.gleam |
Docstring clarifications/formatting updates. |
src/distribute/receiver.gleam |
Docstring clarity updates. |
src/distribute/cluster.gleam |
Docstring clarity updates. |
src/distribute/actor.gleam |
Docstring clarity updates. |
test/cluster/monitor_test.gleam |
New tests for monitoring subscription and event translation. |
test/codec/variant_test.gleam |
New tests for variant codec encode/decode and unknown tag handling. |
test/codec/variant_error_test.gleam |
New tests targeting variant codec error paths. |
test/integration/protocol_test.gleam |
New integration test exercising variant codec in an actor setup. |
test/distribute_test.gleam |
Updates expected library version string. |
gleam.toml |
Bumps version to 3.1.0 and adds telemetry dependency. |
manifest.toml |
Adds telemetry package entry. |
README.md |
Documents new modules and includes usage examples. |
CHANGELOG.md |
Adds v3.1.0 release notes. |
.gitignore |
Ignores /notes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub fn duplicate_id_panic_test() { | ||
| // We expect this to panic. In gleeunit, we can't easily catch panics | ||
| // but if we were using a more advanced runner we would. | ||
| // For now, let's verify it manually or by ensuring the code path exists. | ||
| // NOTE: This test is designed to fail/panic if run alone. | ||
| Nil | ||
| } | ||
|
|
There was a problem hiding this comment.
duplicate_id_panic_test currently doesn’t exercise any behavior (it just returns Nil) and leaves a note that it would panic “if run alone”. As written it will always pass while not asserting anything. Either implement an actual assertion for the duplicate-id panic (using whatever panic-catching support the test framework provides) or remove this placeholder to avoid a misleading test suite.
| pub fn duplicate_id_panic_test() { | |
| // We expect this to panic. In gleeunit, we can't easily catch panics | |
| // but if we were using a more advanced runner we would. | |
| // For now, let's verify it manually or by ensuring the code path exists. | |
| // NOTE: This test is designed to fail/panic if run alone. | |
| Nil | |
| } |
| @external(erlang, "distribute_ffi_utils", "system_time_ms") | ||
| pub fn system_time() -> Int | ||
|
|
||
| @external(erlang, "distribute_ffi_utils", "to_atom_safe") |
There was a problem hiding this comment.
to_atom is bound to distribute_ffi_utils:to_atom_safe/1, which returns {ok, Atom} / {error, Reason} tuples. :telemetry.execute/3 expects the event name and dict keys to be atoms, so passing these tuples will break event emission (likely badarg). Consider switching this binding to the new unsafe distribute_ffi_utils:to_atom/1 (since the inputs here are library constants), or change the Gleam type to Result and handle errors before calling erlang_execute.
| @external(erlang, "distribute_ffi_utils", "to_atom_safe") | |
| @external(erlang, "distribute_ffi_utils", "to_atom") |
| let assert Ok(bits) = inner.encoder(payload) | ||
| Ok(bit_array.append(<<id:8>>, bits)) |
There was a problem hiding this comment.
The encoder strategy uses let assert Ok(bits) = inner.encoder(payload), which will panic if the inner codec fails. That turns an expected Result failure into a process crash and also prevents build/1 from returning a meaningful EncodeError. Handle the Error case from inner.encoder explicitly and propagate it (or redesign VariantStrategy.encoder so it can represent both “wrong variant” and “encode failed”).
| let assert Ok(bits) = inner.encoder(payload) | |
| Ok(bit_array.append(<<id:8>>, bits)) | |
| case inner.encoder(payload) { | |
| Ok(bits) -> Ok(bit_array.append(<<id:8>>, bits)) | |
| Error(_) -> Error(Nil) | |
| } |
| let strategy = | ||
| VariantStrategy( | ||
| id:, | ||
| name:, | ||
| encoder: fn(value) { | ||
| case unwrap(value) { | ||
| Ok(payload) -> { | ||
| let assert Ok(bits) = inner.encoder(payload) | ||
| Ok(bit_array.append(<<id:8>>, bits)) | ||
| } | ||
| Error(_) -> Error(Nil) | ||
| } | ||
| }, | ||
| decoder: fn(data) { | ||
| case data { | ||
| <<tag:8, rest:bits>> if tag == id -> { | ||
| case inner.sized_decoder(rest) { | ||
| Ok(#(val, remaining)) -> Ok(#(wrap(val), remaining)) | ||
| Error(e) -> Error(e) | ||
| } | ||
| } | ||
| _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) | ||
| } | ||
| }, | ||
| ) | ||
|
|
||
| VariantBuilder([strategy, ..builder.strategies]) |
There was a problem hiding this comment.
id is documented as 0–255, but it’s not validated before being encoded as <<id:8>>. Passing an out-of-range id will raise at runtime. Consider validating the range in add/unit (or once in build) and returning a clear panic/error message early.
| let strategy = | |
| VariantStrategy( | |
| id:, | |
| name:, | |
| encoder: fn(value) { | |
| case unwrap(value) { | |
| Ok(payload) -> { | |
| let assert Ok(bits) = inner.encoder(payload) | |
| Ok(bit_array.append(<<id:8>>, bits)) | |
| } | |
| Error(_) -> Error(Nil) | |
| } | |
| }, | |
| decoder: fn(data) { | |
| case data { | |
| <<tag:8, rest:bits>> if tag == id -> { | |
| case inner.sized_decoder(rest) { | |
| Ok(#(val, remaining)) -> Ok(#(wrap(val), remaining)) | |
| Error(e) -> Error(e) | |
| } | |
| } | |
| _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) | |
| } | |
| }, | |
| ) | |
| VariantBuilder([strategy, ..builder.strategies]) | |
| case id { | |
| invalid if invalid < 0 || invalid > 255 -> | |
| panic("Variant id must be in range 0-255, got " <> int.to_string(id)) | |
| _valid -> | |
| let strategy = | |
| VariantStrategy( | |
| id:, | |
| name:, | |
| encoder: fn(value) { | |
| case unwrap(value) { | |
| Ok(payload) -> { | |
| let assert Ok(bits) = inner.encoder(payload) | |
| Ok(bit_array.append(<<id:8>>, bits)) | |
| } | |
| Error(_) -> Error(Nil) | |
| } | |
| }, | |
| decoder: fn(data) { | |
| case data { | |
| <<tag:8, rest:bits>> if tag == id -> { | |
| case inner.sized_decoder(rest) { | |
| Ok(#(val, remaining)) -> Ok(#(wrap(val), remaining)) | |
| Error(e) -> Error(e) | |
| } | |
| } | |
| _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) | |
| } | |
| }, | |
| ) | |
| VariantBuilder([strategy, ..builder.strategies]) | |
| } |
| let strategy = | ||
| VariantStrategy( | ||
| id:, | ||
| name:, | ||
| encoder: fn(val) { | ||
| case match(val) { | ||
| True -> Ok(<<id:8>>) | ||
| False -> Error(Nil) | ||
| } | ||
| }, | ||
| decoder: fn(data) { | ||
| case data { | ||
| <<tag:8, rest:bits>> if tag == id -> Ok(#(value, rest)) | ||
| _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) | ||
| } | ||
| }, | ||
| ) | ||
|
|
||
| VariantBuilder([strategy, ..builder.strategies]) |
There was a problem hiding this comment.
Same issue as in add: id is encoded as <<id:8>> without enforcing the documented 0–255 range. Add a range check to avoid runtime crashes from out-of-range values.
| let strategy = | |
| VariantStrategy( | |
| id:, | |
| name:, | |
| encoder: fn(val) { | |
| case match(val) { | |
| True -> Ok(<<id:8>>) | |
| False -> Error(Nil) | |
| } | |
| }, | |
| decoder: fn(data) { | |
| case data { | |
| <<tag:8, rest:bits>> if tag == id -> Ok(#(value, rest)) | |
| _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) | |
| } | |
| }, | |
| ) | |
| VariantBuilder([strategy, ..builder.strategies]) | |
| case int.in_range(0, 255, id) { | |
| True -> { | |
| let strategy = | |
| VariantStrategy( | |
| id:, | |
| name:, | |
| encoder: fn(val) { | |
| case match(val) { | |
| True -> Ok(<<id:8>>) | |
| False -> Error(Nil) | |
| } | |
| }, | |
| decoder: fn(data) { | |
| case data { | |
| <<tag:8, rest:bits>> if tag == id -> Ok(#(value, rest)) | |
| _ -> Error(codec.TagMismatch(expected: name, got: "unknown")) | |
| } | |
| }, | |
| ) | |
| VariantBuilder([strategy, ..builder.strategies]) | |
| } | |
| False -> builder | |
| } |
| NodeAtom = binary_to_atom(NodeName, utf8), | ||
| Pid ! {Tag, NodeAtom}. |
There was a problem hiding this comment.
simulate_node_event/3 uses binary_to_atom/2 on caller-provided data and is exported from the library module. This allows creating arbitrary new atoms at runtime (atom table exhaustion). Consider removing it from -export, moving it to test-only code, or using binary_to_existing_atom/2/validated allowlists so it can’t create unbounded atoms.
| NodeAtom = binary_to_atom(NodeName, utf8), | |
| Pid ! {Tag, NodeAtom}. | |
| case to_atom_safe(NodeName) of | |
| {ok, NodeAtom} -> | |
| Pid ! {Tag, NodeAtom}; | |
| _ -> | |
| ok | |
| end. |
| import distribute/cluster/monitor.{NodeDown, NodeUp} | ||
| import gleam/dynamic.{type Dynamic} | ||
| import gleam/erlang/process | ||
| import gleeunit/should | ||
|
|
||
| @external(erlang, "cluster_ffi", "simulate_node_event") | ||
| fn simulate_node_event(pid: process.Pid, tag: Dynamic, node: String) -> Nil | ||
|
|
||
| @external(erlang, "cluster_ffi", "nodeup_atom") | ||
| fn nodeup_atom() -> Dynamic | ||
|
|
||
| @external(erlang, "cluster_ffi", "nodedown_atom") | ||
| fn nodedown_atom() -> Dynamic | ||
|
|
||
| pub fn monitor_lifecycle_test() { | ||
| let subj = process.new_subject() | ||
| let assert Ok(monitor_subj) = monitor.subscribe(subj) | ||
|
|
There was a problem hiding this comment.
This test file calls monitor.subscribe/monitor.unsubscribe but only imports the NodeDown/NodeUp constructors. Add import distribute/cluster/monitor as monitor (or import the functions directly) so the module reference resolves.
This pull request introduces several new features and improvements for version 3.1.0 of the
distributelibrary, focusing on cluster monitoring, easier codecs for custom types (ADTs/variants), and enhanced telemetry. It also includes documentation updates and minor API clarifications.New Features
Cluster Monitoring:
distribute/cluster/monitormodule for typed, event-driven node notifications (NodeUp,NodeDown), withsubscribeandunsubscribefunctions to manage subscriptions. [1] [2] [3]nodeup_atom/0,nodedown_atom/0,monitor_nodes/1, etc.) incluster_ffi.erl. [1] [2]ADT/Variant Codecs:
distribute/codec/variantwith a builder pattern for encoding/decoding custom types (enums/ADTs), supporting payloads and unit variants. [1] [2]Telemetry:
distribute/internal/telemetryfor emitting Erlang:telemetryevents on send, receive, encode, decode, registry, and cluster operations.Documentation and API Improvements
README.md:Versioning and Dependencies
gleam.tomland updated changelog. [1] [2]telemetryas a dependency ingleam.toml.