Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ and an expressive **operator DSL** (`|`, `>>`, `@`) that makes Python feel almos
- Core monads implemented:
- `Maybe` — handle missing values
- `Either` / `Result` — safe error handling
- `Validation` — accumulate multiple errors
- `Reader` — dependency injection / environment
- `Writer` — accumulate logs
- `State` — stateful computations
- Monad transformers: `MaybeT`, `ResultT`, `ReaderT`, `StateT`, `WriterT`
- Utilities: `traverse`/`sequence`, Applicative combinators
- Advanced monads: `RWST` (Reader-Writer-State)
- Operator overloads for concise DSL-style code:
- `|` → `fmap` (map)
- `>>` → `bind` (flatMap)
Expand Down Expand Up @@ -70,6 +73,24 @@ print(res2) # Err("invalid int: foo")

---

### Validation: accumulate errors via Applicative

```python
from darkcore.validation import Success, Failure

def positive(x: int):
return Failure(["non-positive"]) if x <= 0 else Success(x)

v = Success(lambda a: lambda b: a + b).ap(positive(-1)).ap(positive(0))
print(v) # Failure(['non-positive', 'non-positive'])

# Result would stop at the first failure
```

Validation is primarily intended for Applicative composition; `bind` short-circuits like `Result` and is not recommended for error accumulation scenarios.

---

### Reader

```python
Expand Down Expand Up @@ -117,6 +138,39 @@ prog = inc >> (lambda x: State(lambda s: (x+s, s)))
print(prog.run(1)) # (3, 2)
```

### Traverse utilities

```python
from darkcore.traverse import traverse_result
from darkcore.result import Ok, Err

def parse_int(s: str):
try:
return Ok(int(s))
except ValueError:
return Err(f"bad: {s}")

print(traverse_result(["1", "2"], parse_int)) # Ok([1, 2])
print(traverse_result(["1", "x"], parse_int)) # Err("bad: x")
```

`Result` short-circuits on the first `Err` in `traverse_*` / `sequence_*`, whereas `Validation` accumulates errors under Applicative composition.

### RWST

```python
from darkcore.rwst import RWST
from darkcore.result import Ok

combine = lambda a, b: a + b

action = RWST.ask(Ok.pure, combine=combine, empty=list).bind(
lambda env: RWST.tell([env], Ok.pure, combine=combine, empty=list)
)

print(action(1, 0)) # Ok(((None, 0), [1]))
```

### Operator DSL

```python
Expand Down
19 changes: 17 additions & 2 deletions darkcore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,30 @@
from .reader import Reader
from .writer import Writer
from .state import State
from .validation import Success, Failure, Validation, from_result, to_result
from .traverse import (
sequence_maybe,
sequence_result,
traverse_maybe,
traverse_result,
liftA2,
left_then,
then_right,
)
from .rwst import RWST
from .maybe_t import MaybeT
from .reader_t import ReaderT
from .state_t import StateT
from .writer_t import WriterT
from .either_t import EitherT
from .result_t import ResultT
from .validation_t import ValidationT

__all__ = [
"Maybe", "Ok", "Err", "Left", "Right",
"Reader", "Writer", "State",
"MaybeT", "ReaderT", "StateT", "WriterT", "EitherT", "ResultT"
"Reader", "Writer", "State", "Validation", "Success", "Failure",
"sequence_maybe", "sequence_result", "traverse_maybe", "traverse_result",
"liftA2", "left_then", "then_right", "RWST",
"MaybeT", "ReaderT", "StateT", "WriterT", "EitherT", "ResultT", "ValidationT",
"from_result", "to_result",
]
150 changes: 150 additions & 0 deletions darkcore/rwst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations
from typing import Any, Callable, Generic, TypeVar, cast
from .core import Monad, MonadOpsMixin

R = TypeVar("R")
W = TypeVar("W")
S = TypeVar("S")
A = TypeVar("A")
B = TypeVar("B")

class RWST(MonadOpsMixin[A], Generic[R, W, S, A]):
"""Reader-Writer-State monad transformer."""

def __init__(
self,
run: Callable[[R, S], Monad[tuple[tuple[A, S], W]]],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> None:
self.run = run
self.combine = combine
self.empty = empty

@classmethod
def pure_with(
cls,
pure: Callable[[tuple[tuple[A, S], W]], Monad[tuple[tuple[A, S], W]]],
value: A,
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, A]":
return RWST(lambda _r, s: pure(((value, s), empty())), combine=combine, empty=empty)

def fmap(self, f: Callable[[A], B]) -> "RWST[R, W, S, B]":
def new_run(r: R, s: S) -> Monad[tuple[tuple[B, S], W]]:
return self.run(r, s).fmap(lambda res: ((f(res[0][0]), res[0][1]), res[1]))
return RWST(new_run, combine=self.combine, empty=self.empty)

map = fmap

def ap(
self: "RWST[R, W, S, Callable[[A], B]]",
fa: "RWST[R, W, S, A]",
) -> "RWST[R, W, S, B]":
def new_run(r: R, s: S) -> Monad[tuple[tuple[B, S], W]]:
m1 = self.run(r, s)
return m1.bind(
lambda pair_f: fa.run(r, pair_f[0][1]).bind(
lambda pair_a: cast(
Monad[tuple[tuple[B, S], W]],
cast(Any, m1).pure(
(
(pair_f[0][0](pair_a[0][0]), pair_a[0][1]),
self.combine(pair_f[1], pair_a[1]),
)
),
)
)
)
return RWST(new_run, combine=self.combine, empty=self.empty)

def bind(self, f: Callable[[A], "RWST[R, W, S, B]"]) -> "RWST[R, W, S, B]":
def new_run(r: R, s: S) -> Monad[tuple[tuple[B, S], W]]:
m1 = self.run(r, s)
return m1.bind(
lambda pair: f(pair[0][0]).run(r, pair[0][1]).bind(
lambda res: cast(
Monad[tuple[tuple[B, S], W]],
cast(Any, m1).pure(
((res[0][0], res[0][1]), self.combine(pair[1], res[1]))
),
)
)
)
return RWST(new_run, combine=self.combine, empty=self.empty)

@classmethod
def lift(
cls,
monad: Monad[A],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, A]":
def run(r: R, s: S) -> Monad[tuple[tuple[A, S], W]]:
def step(a: A) -> Monad[tuple[tuple[A, S], W]]:
return cast(Monad[tuple[tuple[A, S], W]], cast(Any, monad).pure(((a, s), empty())))
return monad.bind(step)
return RWST(run, combine=combine, empty=empty)

@classmethod
def ask(
cls,
pure: Callable[[tuple[tuple[R, S], W]], Monad[tuple[tuple[R, S], W]]],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, R]":
return RWST(lambda r, s: pure(((r, s), empty())), combine=combine, empty=empty)

@classmethod
def tell(
cls,
w: W,
pure: Callable[[tuple[tuple[None, S], W]], Monad[tuple[tuple[None, S], W]]],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, None]":
return RWST(lambda _r, s: pure(((None, s), w)), combine=combine, empty=empty)

@classmethod
def get(
cls,
pure: Callable[[tuple[tuple[S, S], W]], Monad[tuple[tuple[S, S], W]]],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, S]":
return RWST(lambda _r, s: pure(((s, s), empty())), combine=combine, empty=empty)

@classmethod
def put(
cls,
new_state: S,
pure: Callable[[tuple[tuple[None, S], W]], Monad[tuple[tuple[None, S], W]]],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, None]":
return RWST(lambda _r, _s: pure(((None, new_state), empty())), combine=combine, empty=empty)

@classmethod
def modify(
cls,
f: Callable[[S], S],
pure: Callable[[tuple[tuple[None, S], W]], Monad[tuple[tuple[None, S], W]]],
*,
combine: Callable[[W, W], W],
empty: Callable[[], W],
) -> "RWST[R, W, S, None]":
return RWST(lambda _r, s: pure(((None, f(s)), empty())), combine=combine, empty=empty)

def __call__(self, r: R, s: S) -> Monad[tuple[tuple[A, S], W]]:
return self.run(r, s)

def __repr__(self) -> str:
return f"RWST({self.run!r})"
41 changes: 41 additions & 0 deletions darkcore/traverse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations
from typing import Callable, List, Sequence, TypeVar, Any, cast
from .maybe import Maybe
from .result import Result, Ok, Err
from .core import SupportsFmapBindAp

A = TypeVar("A")
B = TypeVar("B")
T = TypeVar("T")
F = TypeVar("F", bound=SupportsFmapBindAp[Any])

def sequence_maybe(xs: Sequence[Maybe[A]]) -> Maybe[List[A]]:
acc: Maybe[List[A]] = Maybe.pure(cast(List[A], []))
for m in xs:
acc = liftA2(lambda lst, v: lst + [v], acc, m)
return acc

def traverse_maybe(xs: Sequence[T], f: Callable[[T], Maybe[A]]) -> Maybe[List[A]]:
return sequence_maybe([f(x) for x in xs])

def sequence_result(xs: Sequence[Result[A]]) -> Result[List[A]]:
acc: List[A] = []
for r in xs:
if isinstance(r, Err):
return cast(Result[List[A]], r)
acc.append(cast(Ok[A], r).value)
return Ok(acc)

def traverse_result(xs: Sequence[T], f: Callable[[T], Result[A]]) -> Result[List[A]]:
return sequence_result([f(x) for x in xs])

def liftA2(
f: Callable[[A, B], T], fa: SupportsFmapBindAp[A], fb: SupportsFmapBindAp[B]
) -> Any:
return fa.fmap(lambda a: (lambda b: f(a, b))).ap(fb)

def left_then(fa: SupportsFmapBindAp[A], fb: SupportsFmapBindAp[B]) -> Any:
return liftA2(lambda a, _b: a, fa, fb)

def then_right(fa: SupportsFmapBindAp[A], fb: SupportsFmapBindAp[B]) -> Any:
return liftA2(lambda _a, b: b, fa, fb)
Loading