Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ name: test

on:
push:
branches: [main]
branches:
- master
- main
pull_request:

jobs:
Expand All @@ -12,8 +14,10 @@ jobs:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
with:
otp-version: "27.0"
otp-version: "28"
gleam-version: "1.14.0"
rebar3-version: "3"
# elixir-version: "1"
- run: gleam deps download
- run: gleam test
- run: gleam format --check src test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
erl_crash.dump
**/build
*.log
/notes

16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## v3.1.0 — 2026-03-04

### Added

- **Cluster Monitoring** (`distribute/cluster/monitor`) — Typed event-driven
notifications for `NodeUp` and `NodeDown`.
- `distribute.subscribe(subject)` — Start a monitored subscription.
- `distribute.unsubscribe(monitor_subject)` — Stop a subscription.
- **ADT/Variant Codecs** (`distribute/codec/variant`) — A builder pattern to
easily create codecs for Custom Types (enums) with payload support.
- `variant.new()`, `variant.add()`, `variant.unit()`, `variant.build()`
- **Telemetry** (`distribute/internal/telemetry`) — Erlang `:telemetry` events
for send, receive, encode, decode, registry, and cluster operations.

## v3.0.0 — 2026-02-11

Ground-up rewrite. Smaller API, proper OTP actors, compile-time type safety
Expand Down Expand Up @@ -38,4 +52,4 @@ connection pool, retry. Also removed `whereis_global(name, encoder, decoder)`.
- `global.reply(reply_to, response, encoder)` — send a response back.
- `composite.option(c)`, `composite.result(ok, err)`,
`composite.tuple2(a, b)`, `composite.tuple3(a, b, c)`.
- `registry.named(name, codec)` — short form of `typed_name`.
- `registry.named(name, codec)` — short form of `typed_name`.
50 changes: 48 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ let user_id_codec = codec.map(codec.int(), UserId, fn(uid) {
```

Gleam has no derive macros or reflection, so codecs for complex types
are manual. The combinators handle the serialization you just wire
the fields together.
are manual. The combinators handle the serialization so you just wire
the fields together!

## Modules

Expand All @@ -122,11 +122,57 @@ the fields together.
| `distribute/cluster` | `net_kernel` start/connect/ping |
| `distribute/codec` | Binary codecs for primitives + `subject()` |
| `distribute/codec/composite` | Option, Result, Tuple codecs |
| `distribute/codec/variant` | Build codecs for Custom Types (ADTs) |
| `distribute/codec/tagged` | Tagged messages with version field |
| `distribute/global` | `GlobalSubject(msg)`, `call`, `reply` |
| `distribute/cluster/monitor` | `NodeUp`, `NodeDown` typed events |
| `distribute/registry` | `TypedName(msg)`, `:global` registration |
| `distribute/receiver` | Typed receive, OTP actor wrappers |

### Custom Type Codecs

Seamlessly encode and decode your Algebraic Data Types (enums) with a fluent builder.

```gleam
pub type MyMessage {
Text(String)
Ping
}

import distribute/codec
import distribute/codec/variant

let my_codec =
variant.new()
|> variant.add(0, "Text", codec.string(), Text, fn(m) {
case m { Text(s) -> Ok(s); _ -> Error(Nil) }
})
|> variant.unit(1, "Ping", Ping, fn(m) { m == Ping })
|> variant.build()
```

### Cluster Monitoring

Subscribe to cluster events (`NodeUp`, `NodeDown`) to react to node topology changes.

```gleam
import distribute
import distribute/cluster/monitor

let subj = process.new_subject()
let assert Ok(m) = distribute.subscribe(subj)

// In your actor/process
case process.receive(subj, 5000) {
Ok(monitor.NodeUp(node)) -> io.println("Node joined: " <> node)
Ok(monitor.NodeDown(node)) -> io.println("Node left: " <> node)
_ -> Nil
}

// Later
distribute.unsubscribe(m)
```

## Caveats

**What the types catch** — within one codebase, `TypedName` and
Expand Down
3 changes: 2 additions & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name = "distribute"
version = "3.0.0"
version = "3.1.0"
description = "Typed distributed messaging for Gleam on the BEAM."
licences = ["MIT"]
repository = { type = "github", user = "lupodevelop", repo = "distribute" }
Expand All @@ -13,6 +13,7 @@ target = "erlang"
gleam_stdlib = ">= 0.60.0 and < 2.0.0"
gleam_erlang = ">= 1.0.0 and < 2.0.0"
gleam_otp = ">= 1.0.0 and < 2.0.0"
telemetry = ">= 1.0.0 and < 2.0.0"

[dev-dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0"
2 changes: 2 additions & 0 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ packages = [
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
{ name = "gleam_stdlib", version = "0.68.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "F7FAEBD8EF260664E86A46C8DBA23508D1D11BB3BCC6EE1B89B3BC3E5C83FF1E" },
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
{ name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" },
]

[requirements]
gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" }
gleam_otp = { version = ">= 1.0.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.60.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
telemetry = { version = ">= 1.0.0 and < 2.0.0" }
20 changes: 19 additions & 1 deletion src/cluster_ffi.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
-module(cluster_ffi).
-export([start_node/2, connect/1, nodes/0, self_node/0, ping/1,
is_ok_atom/1, get_error_reason/1, is_true/1, is_ignored/1]).
is_ok_atom/1, get_error_reason/1, is_true/1, is_ignored/1,
nodeup_atom/0, nodedown_atom/0, monitor_nodes/1,
atom_to_string/1, get_node_from_tuple/1,
simulate_node_event/3]).

-import(distribute_ffi_utils, [to_atom_safe/1]).

Expand Down Expand Up @@ -80,3 +83,18 @@ to_atom_force(Atom) when is_atom(Atom) ->
is_valid_node_input(Bin) when is_binary(Bin) ->
byte_size(Bin) =< 512 andalso
binary:match(Bin, <<"\0">>) =:= nomatch.

nodeup_atom() -> nodeup.
nodedown_atom() -> nodedown.

monitor_nodes(Flag) ->
net_kernel:monitor_nodes(Flag).

atom_to_string(Atom) when is_atom(Atom) ->
atom_to_binary(Atom, utf8).

get_node_from_tuple({_, Node}) -> Node.

simulate_node_event(Pid, Tag, NodeName) ->
NodeAtom = binary_to_atom(NodeName, utf8),
Pid ! {Tag, NodeAtom}.
Comment on lines +99 to +100
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simulate_node_event/3 uses binary_to_atom/2 on caller-provided data and is exported from the library module. This allows creating arbitrary new atoms at runtime (atom table exhaustion). Consider removing it from -export, moving it to test-only code, or using binary_to_existing_atom/2/validated allowlists so it can’t create unbounded atoms.

Suggested change
NodeAtom = binary_to_atom(NodeName, utf8),
Pid ! {Tag, NodeAtom}.
case to_atom_safe(NodeName) of
{ok, NodeAtom} ->
Pid ! {Tag, NodeAtom};
_ ->
ok
end.

Copilot uses AI. Check for mistakes.
21 changes: 18 additions & 3 deletions src/distribute.gleam
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
//// Facade for common distributed operations: start a node, register
//// actors, send messages, look things up.
//// Facade for common distributed operations.

import distribute/actor as dist_actor
import distribute/cluster
import distribute/cluster/monitor
import distribute/codec
import distribute/global
import distribute/receiver
import distribute/registry
import gleam/erlang/process
import gleam/otp/actor

pub fn version() -> String {
"3.0.0"
"3.1.0"
}

// -- Cluster -----------------------------------------------------------------
Expand Down Expand Up @@ -84,3 +85,17 @@ pub fn lookup(
pub fn unregister(name: String) -> Result(Nil, registry.RegisterError) {
registry.unregister(name)
}

// -- Cluster Monitoring ------------------------------------------------------

pub fn subscribe(
user_subject: process.Subject(monitor.ClusterEvent),
) -> Result(process.Subject(monitor.ControlMessage), actor.StartError) {
monitor.subscribe(user_subject)
}

pub fn unsubscribe(
monitor_subject: process.Subject(monitor.ControlMessage),
) -> Nil {
monitor.unsubscribe(monitor_subject)
}
2 changes: 1 addition & 1 deletion src/distribute/actor.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub fn start_supervised(
}
}

/// Start N supervised actors, registered as `name_1` .. `name_N`.
/// Start N supervised actors, registered as name_1 .. name_N.
pub fn pool(
typed_name: registry.TypedName(msg),
size: Int,
Expand Down
9 changes: 4 additions & 5 deletions src/distribute/cluster.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ fn get_error_reason(value: dynamic.Dynamic) -> String
// ---------------------------------------------------------------------------

/// Start a distributed BEAM node.
///
/// `name` must contain `@` (e.g. `"myapp@127.0.0.1"`).
/// `cookie` must be ≤ 255 characters.
/// Name must contain @ (e.g. myapp@127.0.0.1).
/// Cookie must be 255 characters or fewer.
pub fn start_node(name: String, cookie: String) -> Result(Nil, StartError) {
case validate_node_name(name) {
Error(e) -> Error(e)
Expand All @@ -80,7 +79,7 @@ pub fn start_node(name: String, cookie: String) -> Result(Nil, StartError) {
}
}

/// Connect to a remote node. Returns `Ok(Nil)` on success.
/// Connect to a remote node. Returns Ok(Nil) on success.
pub fn connect(node: String) -> Result(Nil, ConnectError) {
case string.contains(node, "@") {
False -> Error(NodeNotFound)
Expand Down Expand Up @@ -108,7 +107,7 @@ pub fn self_node() -> String {
self_node_ffi()
}

/// Ping a remote node. Returns `True` if it responds.
/// Ping a remote node. Returns True if it responds.
pub fn ping(node: String) -> Bool {
ping_ffi(node)
}
Expand Down
81 changes: 81 additions & 0 deletions src/distribute/cluster/monitor.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import distribute/internal/telemetry
import gleam/dynamic.{type Dynamic}
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
import gleam/result

pub type ClusterEvent {
NodeUp(node: String)
NodeDown(node: String)
}

@external(erlang, "cluster_ffi", "nodeup_atom")
fn nodeup_atom() -> Dynamic

@external(erlang, "cluster_ffi", "nodedown_atom")
fn nodedown_atom() -> Dynamic

@external(erlang, "cluster_ffi", "monitor_nodes")
fn monitor_nodes_ffi(flag: Bool) -> Dynamic

@external(erlang, "cluster_ffi", "atom_to_string")
fn atom_to_string_ffi(atom: Dynamic) -> String

@external(erlang, "cluster_ffi", "get_node_from_tuple")
fn get_node_from_tuple_ffi(msg: Dynamic) -> Dynamic

pub type ControlMessage {
Stop
InternalEvent(ClusterEvent)
}

/// Subscribe to cluster topology events.
/// Events are forwarded to the provided user_subject.
/// Returns a control subject to stop the subscription.
pub fn subscribe(
user_subject: Subject(ClusterEvent),
) -> Result(Subject(ControlMessage), actor.StartError) {
actor.new_with_initialiser(5000, fn(self_subject) {
// Start node monitoring in the actor's context
monitor_nodes_ffi(True)

// Select standard nodeup/nodedown Erlang messages
let selector =
process.new_selector()
|> process.select(self_subject)
|> process.select_record(nodeup_atom(), 1, fn(msg) {
let node_atom = get_node_from_tuple_ffi(msg)
InternalEvent(NodeUp(atom_to_string_ffi(node_atom)))
})
|> process.select_record(nodedown_atom(), 1, fn(msg) {
let node_atom = get_node_from_tuple_ffi(msg)
InternalEvent(NodeDown(atom_to_string_ffi(node_atom)))
})

Ok(
actor.initialised(user_subject)
|> actor.selecting(selector)
|> actor.returning(self_subject),
)
})
|> actor.on_message(fn(state, msg) {
case msg {
InternalEvent(event) -> {
case event {
NodeUp(node) -> telemetry.emit_node_event(node, "up")
NodeDown(node) -> telemetry.emit_node_event(node, "down")
}
process.send(state, event)
actor.continue(state)
}
Stop -> actor.stop()
}
})
|> actor.start()
|> result.map(fn(started) { started.data })
}

/// Stop the monitoring for a specific subscription.
pub fn unsubscribe(monitor_subject: Subject(ControlMessage)) -> Nil {
process.send(monitor_subject, Stop)
}
21 changes: 6 additions & 15 deletions src/distribute/codec.gleam
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// Binary codecs for sending Gleam values across nodes.
/// Binary codecs for Gleam values across nodes.
import gleam/bit_array
import gleam/erlang/process
import gleam/int
Expand Down Expand Up @@ -35,7 +35,7 @@ pub type Encoder(a) =
pub type Decoder(a) =
fn(BitArray) -> Result(a, DecodeError)

/// Like `Decoder` but returns leftover bytes for chaining.
/// Like Decoder but returns leftover bytes for chaining.
pub type SizedDecoder(a) =
fn(BitArray) -> Result(#(a, BitArray), DecodeError)

Expand Down Expand Up @@ -67,7 +67,7 @@ pub fn decode_sized(
decoder(data)
}

/// Turn a `SizedDecoder` into a `Decoder` (drops remaining bytes).
/// Turn a SizedDecoder into a Decoder (drops remaining bytes).
pub fn to_decoder(sized: SizedDecoder(a)) -> Decoder(a) {
fn(data) {
case sized(data) {
Expand Down Expand Up @@ -327,8 +327,8 @@ fn encode_subject_ffi(subject: process.Subject(BitArray)) -> BitArray
@external(erlang, "distribute_ffi_utils", "decode_subject_safe")
fn decode_subject_ffi(data: BitArray) -> Result(process.Subject(BitArray), Nil)

/// Encode a `Subject(BitArray)` via `term_to_binary`. The PID
/// inside carries node info, so it routes back cross-node.
/// Encode a Subject(BitArray) via term_to_binary.
/// The PID inside carries node info, so it routes back cross-node.
pub fn subject_encoder() -> Encoder(process.Subject(BitArray)) {
fn(sub) {
let bytes = encode_subject_ffi(sub)
Expand Down Expand Up @@ -442,16 +442,7 @@ pub fn subject() -> Codec(process.Subject(BitArray)) {
)
}

/// Transform a codec. `wrap` runs after decoding, `unwrap` before encoding.
///
/// ```gleam
/// type UserId { UserId(Int) }
///
/// let user_id = codec.map(codec.int(), UserId, fn(uid) {
/// let UserId(n) = uid
/// n
/// })
/// ```
/// Transform a codec. wrap runs after decoding, unwrap before encoding.
pub fn map(c: Codec(a), wrap: fn(a) -> b, unwrap: fn(b) -> a) -> Codec(b) {
Codec(
encoder: fn(value) { c.encoder(unwrap(value)) },
Expand Down
Loading