Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,11 @@ interface Environment<T, P : Position<out P>> :
*/
var linkingRule: LinkingRule<T, P>

/**
* Given a [node], this method returns its neighborhood.
*/
fun getNeighborhood(node: Node<T>): Neighborhood<T>

/**
* Given a [node], this method returns an observable view of
* its neighborhood.
*/
fun observeNeighborhood(node: Node<T>): Observable<Neighborhood<T>>
fun getNeighborhood(node: Node<T>): Observable<Neighborhood<T>>

/**
* Allows accessing a [Node] in this [Environment] known its [id].
Expand All @@ -121,15 +116,10 @@ interface Environment<T, P : Position<out P>> :
*/
val observableNodes: ObservableSet<Node<T>>

/**
* Returns the number of [Node]s currently in the [Environment].
*/
val nodeCount: Int

/**
* Returns an [Observable] view of the number of [Node]s currently in the [Environment].
*/
val observeNodeCount: Observable<Int>
val nodeCount: Observable<Int>

/**
* Given a [node] this method returns a list of all the surrounding
Expand Down Expand Up @@ -167,14 +157,14 @@ interface Environment<T, P : Position<out P>> :
val offset: DoubleArray

/**
* Calculates the position of a [node].
* Observe the position of a [node].
*/
fun getPosition(node: Node<T>): P
fun getPosition(node: Node<T>): Observable<P>

/**
* Observe the position of a [node].
* Retrieves [node]'s current position.
*/
fun observePosition(node: Node<T>): Observable<P>
fun getCurrentPosition(node: Node<T>): P = getPosition(node).current

/**
* Return the current [Simulation], if present, or throws an [IllegalStateException] otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ interface EuclideanEnvironment<T, P> : Environment<T, P> where P : Position<P>,
* method may suffice.
*/
fun moveNode(node: Node<T>, direction: P) {
val oldcoord = getPosition(node)
val oldcoord = getCurrentPosition(node)
moveNodeToPosition(node, oldcoord.plus(direction))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import arrow.core.some
import java.util.Collections

/**
* An abstract implementation of the `Observable` interface designed to support observables whose states
* are derived from other data sources. Manages the lifecycle of observation and update propagation.
* An abstract implementation of the [Observable] interface designed to support observables whose states
* are derived from other data sources. Manages the lifecycle of observation and update propagation while
* keeping a minimal set of active subscriptions with the underlying observables. Moreover, observation of
* the sources is enabled if only there are observers registered with this derived observable.
*
* @param emitOnDistinct whether to emit when the new derived value is different from the current one.
* @param T The type of data being observed.
*/
abstract class DerivedObservable<T>(private val emitOnDistinct: Boolean = true) : Observable<T> {
private val callbacks = LinkedHashMap<Any, List<(T) -> Unit>>()

private var cached: Option<T> = none()
protected var cached: Option<T> = none()

private var isListening = false

Expand All @@ -45,17 +47,20 @@ abstract class DerivedObservable<T>(private val emitOnDistinct: Boolean = true)
}
}

override fun onChange(registrant: Any, callback: (T) -> Unit) {
override fun onChange(registrant: Any, invokeOnRegistration: Boolean, callback: (T) -> Unit) {
val wasEmpty = callbacks.isEmpty()
callbacks[registrant] = callbacks[registrant].orEmpty() + callback
if (wasEmpty) {
val initial = computeFresh()
cached = initial.some()
callback(initial)

isListening = true
startMonitoring()
} else {
if (invokeOnRegistration) {
val initial = computeFresh()
cached = initial.some()
callback(initial)
startMonitoring(true)
} else {
startMonitoring(true)
}
} else if (invokeOnRegistration) {
callback(current)
}
}
Expand All @@ -80,10 +85,39 @@ abstract class DerivedObservable<T>(private val emitOnDistinct: Boolean = true)
cached = none()
}

/**
* Initiates monitoring for changes or updates in the implementing class.
* This method should enable necessary mechanisms to observe and react to changes
* based on the specific implementation of the derived class.
*/
protected abstract fun startMonitoring()

/**
* Initiates monitoring for changes or updates in the implementing class.
* This method should enable necessary mechanisms to observe and react to changes
* based on the specific implementation of the derived class.
*
* @param lazy whether the monitoring should be started lazily (avoiding immediate callbacks)
*/
protected open fun startMonitoring(lazy: Boolean) {
startMonitoring()
}

/**
* Stops monitoring for changes or updates in the implementing class.
* This method should disable any active observation mechanisms and ensure
* that resources or listeners associated with monitoring are appropriately released.
* It is intended to complement the `startMonitoring` function.
*/
protected abstract fun stopMonitoring()

/**
* Computes and returns a fresh value of type [T]. This method is expected to be implemented
* in derived classes to provide a new value that represents the updated state or computation
* result of the observable entity.
*
* @return a fresh, computed value of type [T]
*/
protected abstract fun computeFresh(): T

protected fun updateAndNotify(newValue: T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/*
* Copyright (C) 2010-2026, Danilo Pianini and contributors
* listed, for each module, in the respective subproject's build.gradle.kts file.
*
* This file is part of Alchemist, and is distributed under the terms of the
* GNU General Public License, with a linking exception,
* as described in the file LICENSE in the Alchemist distribution's top directory.
*/

package it.unibo.alchemist.model.observation

/**
Expand All @@ -24,9 +33,9 @@ class EventObservable : Observable<Unit> {
observingCallbacks.values.forEach { callbacks -> callbacks.forEach { it(Unit) } }
}

override fun onChange(registrant: Any, callback: (Unit) -> Unit) {
override fun onChange(registrant: Any, invokeOnRegistration: Boolean, callback: (Unit) -> Unit) {
observingCallbacks[registrant] = observingCallbacks[registrant].orEmpty() + callback
callback(current)
if (invokeOnRegistration) callback(current)
}

override fun stopWatching(registrant: Any) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,18 @@ interface Observable<T> : Disposable {
* @param callback The callback to be executed when the observable's state changes. It receives
* the updated value of the observable as an argument.
*/
fun onChange(registrant: Any, callback: (T) -> Unit)
fun onChange(registrant: Any, callback: (T) -> Unit) = onChange(registrant, true, callback)

/**
* Registers a callback to be notified of changes in the observable. The callback is invoked
* whenever the state of the observable changes.
*
* @param registrant The entity registering the callback.
* @param invokeOnRegistration Whether the callback should be invoked immediately upon registration.
* @param callback The callback to be executed when the observable's state changes. It receives
* the updated value of the observable as an argument.
*/
fun onChange(registrant: Any, invokeOnRegistration: Boolean, callback: (T) -> Unit)

/**
* Unregisters the specified registrant from watching for changes or updates in the observable.
Expand Down Expand Up @@ -68,8 +79,10 @@ interface Observable<T> : Disposable {

override fun computeFresh(): S = transform(this@Observable.current)

override fun startMonitoring() {
this@Observable.onChange(this) {
override fun startMonitoring() = startMonitoring(false)

override fun startMonitoring(lazy: Boolean) {
this@Observable.onChange(this, !lazy) {
updateAndNotify(transform(it))
}
}
Expand All @@ -94,13 +107,15 @@ interface Observable<T> : Disposable {

override fun computeFresh(): R = merge(this@Observable.current, other.current)

override fun startMonitoring() {
override fun startMonitoring() = startMonitoring(false)

override fun startMonitoring(lazy: Boolean) {
val handleUpdate: (Any?) -> Unit = {
updateAndNotify(merge(this@Observable.current, other.current))
}

listOf(this@Observable, other).forEach { obs ->
obs.onChange(this, handleUpdate)
obs.onChange(this, !lazy, handleUpdate)
}
}

Expand Down Expand Up @@ -140,10 +155,10 @@ interface Observable<T> : Disposable {

override val observingCallbacks: MutableMap<Any, List<(T) -> Unit>> = mutableMapOf()

override fun onChange(registrant: Any, callback: (T) -> Unit) {
override fun onChange(registrant: Any, invokeOnRegistration: Boolean, callback: (T) -> Unit) {
observers += registrant
observingCallbacks[registrant] = observingCallbacks[registrant].orEmpty() + callback
this@asMutable.onChange(this to registrant, callback)
this@asMutable.onChange(this to registrant, invokeOnRegistration, callback)
}

override fun stopWatching(registrant: Any) {
Expand Down Expand Up @@ -176,7 +191,7 @@ interface MutableObservable<T> : Observable<T> {
}

/**
* Factory methods container.
* Factories and extension methods container.
*/
companion object {

Expand All @@ -201,8 +216,10 @@ interface MutableObservable<T> : Observable<T> {

override val observers: List<Any> get() = observingCallbacks.keys.toList()

override fun onChange(registrant: Any, callback: (T) -> Unit) {
callback(current)
override fun onChange(registrant: Any, invokeOnRegistration: Boolean, callback: (T) -> Unit) {
if (invokeOnRegistration) {
callback(current)
}
observingCallbacks[registrant] = observingCallbacks[registrant]?.let {
it + callback
} ?: listOf(callback)
Expand Down
Loading
Loading