Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions src/main/scala/org/constellation/lb/Manager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.http4s.server.Router
import org.http4s.server.blaze.BlazeServerBuilder
import NemOListOps._

import scala.collection.immutable.SortedMap
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import scala.language.postfixOps
Expand All @@ -41,7 +42,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(
.resource

private lazy val hostsRef =
Ref.unsafe[IO, NonEmptyMap[Addr, Option[List[Info]]]](init.map(addr => addr -> None).toNem)
Ref.unsafe[IO, NonEmptyMap[Addr, Option[NonEmptyList[Info]]]](init.map(addr => addr -> None).toNem)

private lazy val sessionRef = Ref.unsafe[IO, Option[Long]](None)

Expand Down Expand Up @@ -94,10 +95,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(
logger.warn(
s"Evicted nodes not in cluster session ${clusterSession}: ${otherSessionNodes.keySet} "
)
} >> {
val entries = sameSessionNodes.toList.map { case (k, v) => (k, Option(v)) }
IO.pure(NonEmptyList.fromListUnsafe(entries).toNem)
}
} >> IO.pure(NonEmptyMap.fromMapUnsafe(SortedMap.from(sameSessionNodes)))
}
case _ =>
logger.warn(
Expand All @@ -113,7 +111,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(
case (activeHosts, inactiveHosts) =>
val clusterHosts = (activeHosts ++ inactiveHosts) // TODO: Add Eviction of inactive hosts
.filterNot(addr => status.contains(addr))
.foldLeft(status)((acc, addr) => acc.add(addr, Option.empty[List[Info]]))
.foldLeft(status)((acc, addr) => acc.add(addr, None))

hostsRef.set(clusterHosts).flatTap(_ => updateLbSetup(activeHosts))
}
Expand Down Expand Up @@ -204,12 +202,12 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(

private val settingsRoutes = HttpRoutes.of[IO] {
case POST -> Root / "maintenance" =>
enableMaintenanceMode.flatMap(_ => Ok())
enableMaintenanceMode >> Ok()
case DELETE -> Root / "maintenance" =>
disableMaintenanceMode.flatMap(_ => Ok())
disableMaintenanceMode >> Ok()
}

def buildClusterStatus(init: NonEmptyMap[Addr, Option[List[Info]]]): IO[(Set[Addr], Set[Addr])] = {
def buildClusterStatus(init: NonEmptyMap[Addr, Option[NonEmptyList[Info]]]): IO[(Set[Addr], Set[Addr])] = {
val tresholdLevel = init.keys.size / 2

def isActive(addr: Addr, proof: List[Info]) =
Expand All @@ -218,7 +216,8 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(
IO {
val (active, other) = rotateMap(init)(toAddress).toList
.partition {
case (addr: Addr, proof: List[Info]) => isActive(addr, proof)
case (addr, Some(proof)) => isActive(addr, proof.toList)
case _ => false
}

active.map(_._1).toSet -> other.map(_._1).toSet
Expand All @@ -230,25 +229,29 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)(

def getClusterInfo(
hosts: NonEmptySet[Addr]
)(implicit client: Client[IO]): IO[NonEmptyMap[Addr, Option[List[Info]]]] = {
)(implicit client: Client[IO]): IO[NonEmptyMap[Addr, Option[NonEmptyList[Info]]]] = {
IO.apply(logger.info(s"Fetch cluster status from following ${hosts.size} hosts: ${hosts.toList.take(5)}"))
.flatMap(
_ =>
hosts.toNonEmptyList
.map(
addr =>
node(addr).getInfo
.flatMap(
result =>
.flatMap {
case result @ x :: xs =>
logger
.debug(s"Node $addr returned $result")
.map(_ => addr -> Option(result))
)
.map(_ => addr -> Some(NonEmptyList(x, xs)))
case Nil =>
logger
.warn(s"Node $addr returned empty node list")
.map(_ => addr -> None)
}
.recoverWith {
case error =>
logger
.info(s"Cannot retrieve cluster info from addr=$addr error=$error")
.map(_ => addr -> Option.empty[List[Info]])
.map(_ => addr -> None)
}
)
.parSequence
Expand Down
46 changes: 33 additions & 13 deletions src/main/scala/org/constellation/lb/NemOListOps.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.constellation.lb

import cats.data.{NonEmptyMap, NonEmptySet}
import cats.data.{NonEmptyList, NonEmptyMap, NonEmptySet}

import scala.collection.immutable.{SortedMap, SortedSet}

Expand All @@ -14,7 +14,7 @@ object NemOListOps {
* @tparam V
* @return new keys
*/
def extractNewKeys[K: Ordering, V](nem: NonEmptyMap[K, Option[List[V]]])(v2k: V => K): SortedSet[K] =
def extractNewKeys[K: Ordering, V](nem: NonEmptyMap[K, Option[NonEmptyList[V]]])(v2k: V => K): SortedSet[K] =
nem.toNel.foldLeft(SortedSet.empty[K])(
(acc, bcc) =>
bcc match {
Expand All @@ -31,8 +31,10 @@ object NemOListOps {
* @tparam V
* @return (map of elements that satisfy the property, map of elements that don't)
*/
def splitByElementProp[K, V](map: Map[K, List[V]])(prop: V => Boolean): (Map[K, List[V]], Map[K, List[V]]) =
map.partition { case (_, v) => v.forall(prop) }
def splitByElementProp[K, V](
map: Map[K, Option[NonEmptyList[V]]]
)(prop: V => Boolean): (Map[K, Option[NonEmptyList[V]]], Map[K, Option[NonEmptyList[V]]]) =
map.partition { case (_, v) => v.exists(_.forall(prop)) }

/**
* Finds the first element of the lists in the map that satisfies a property
Expand All @@ -41,8 +43,8 @@ object NemOListOps {
* @tparam V
* @return first element wrapped in Some if found or None
*/
def findFirstElement[K, V](map: Map[K, Option[List[V]]]): Option[V] = {
map.collectFirst { case (_, Some(e :: _)) => e }
def findFirstElement[K, V](map: Map[K, Option[NonEmptyList[V]]]): Option[V] = {
map.collectFirst { case (_, Some(e)) => e.head }
}

/**
Expand All @@ -57,11 +59,6 @@ object NemOListOps {
def filterKeys[K, V](validKeys: NonEmptySet[K], map: NonEmptyMap[K, V]): SortedMap[K, V] =
map.toSortedMap.filter { case (k, _) => validKeys.contains(k) }

/**
* Builds a peer map grouping the peer info by each address
* @param clusterInfo cluster info
* @return map by address of peers
*/
/**
* "matrix-like" rotation of the map, creating a new map based regrouping by the keys extracted from the entry lists.
* @param map map of option lists
Expand All @@ -70,12 +67,35 @@ object NemOListOps {
* @tparam V
* @return map with lists from entries.
*/
def rotateMap[K, V](map: NonEmptyMap[K, Option[List[V]]])(v2k: V => K): Map[K, List[V]] =
def rotateMap[K, V](map: NonEmptyMap[K, Option[NonEmptyList[V]]])(v2k: V => K): Map[K, Option[NonEmptyList[V]]] =
map.toNel
.collect {
case (_, Some(el)) => el
case (_, Some(el)) => el.toList
}
.flatten
.groupBy(v2k)
.map { case (k, vs) => k -> list2oNel(vs) }

/**
* Convert a option non empty list to list
* @param l list of t
* @tparam T element type
* @return option non empty list
*/
def oNel2List[T](onel: Option[NonEmptyList[T]]): List[T] = onel match { // onel.fold(List[T]())(_.toList)
case Some(nel) => nel.toList
case _ => List()
}

/**
* Convert a list to option non empty list
* @param l list of t
* @tparam T element type
* @return option non empty list
*/
def list2oNel[T](l: List[T]): Option[NonEmptyList[T]] = l match {
case x :: xs => Some(NonEmptyList(x, xs))
case _ => None
}

}
44 changes: 22 additions & 22 deletions src/test/scala/org/constellation/lb/NemOListOpsSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.constellation.lb

import cats.data.{NonEmptyMap, NonEmptySet}
import cats.data.{NonEmptyList, NonEmptyMap, NonEmptySet}
import cats.implicits.catsKernelStdOrderForInt
import org.constellation.lb.NemOListOps._
import org.constellation.primitives.node.{Addr, Id, Info, NodeState}
Expand All @@ -14,14 +14,14 @@ class NemOListOpsSpec extends FunSpec with Matchers {

describe("extractNewKeys") {
it("extract new keys") {
extractNewKeys(NonEmptyMap.of(1 -> Some(List(1, 3, 4)), 2 -> Some(List(1, 3, 5))))(identity) shouldBe SortedSet(
extractNewKeys(NonEmptyMap.of(1 -> Some(NonEmptyList.of(1, 3, 4)), 2 -> Some(NonEmptyList.of(1, 3, 5))))(identity) shouldBe SortedSet(
3,
4,
5
)
}
it("no new keys") {
extractNewKeys(NonEmptyMap.of(1 -> Some(List(1, 3)), 2 -> Some(List(2, 1)), 3 -> None))(identity) shouldBe empty
extractNewKeys(NonEmptyMap.of(1 -> Some(NonEmptyList.of(1, 3)), 2 -> Some(NonEmptyList.of(2, 1)), 3 -> None))(identity) shouldBe empty
}
}

Expand All @@ -40,22 +40,22 @@ class NemOListOpsSpec extends FunSpec with Matchers {
}

it("rotate of unit map") {
rotateMap(NonEmptyMap.of(1 -> Some(List(2 -> "2"))))(_._1) shouldBe SortedMap(2 -> List(2 -> "2"))
rotateMap(NonEmptyMap.of(1 -> Some(NonEmptyList.of(2 -> "2"))))(_._1) shouldBe SortedMap(2 -> Some(NonEmptyList.of(2 -> "2")))
}

it("rotate an assorted map") {
rotateMap(
NonEmptyMap.of(
1 -> Some(List(1 -> "1", 2 -> "2", 3 -> "3")),
2 -> Some(List(1 -> "1", 2 -> "2", 3 -> "3")),
1 -> Some(NonEmptyList.of(1 -> "1", 2 -> "2", 3 -> "3")),
2 -> Some(NonEmptyList.of(1 -> "1", 2 -> "2", 3 -> "3")),
3 -> None,
4 -> Some(List(1 -> "1", 2 -> "2", 3 -> "3"))
4 -> Some(NonEmptyList.of(1 -> "1", 2 -> "2", 3 -> "3"))
)
)(_._1) shouldBe
SortedMap(
1 -> List(1 -> "1", 1 -> "1", 1 -> "1"),
2 -> List(2 -> "2", 2 -> "2", 2 -> "2"),
3 -> List(3 -> "3", 3 -> "3", 3 -> "3")
1 -> Some(NonEmptyList.of(1 -> "1", 1 -> "1", 1 -> "1")),
2 -> Some(NonEmptyList.of(2 -> "2", 2 -> "2", 2 -> "2")),
3 -> Some(NonEmptyList.of(3 -> "3", 3 -> "3", 3 -> "3"))
)
}

Expand All @@ -65,13 +65,13 @@ class NemOListOpsSpec extends FunSpec with Matchers {
it("find a session from a nodes map") {
val nodes = Map(
Addr(ipv4"127.0.0.1".toInet4Address, 1000) -> Some(
List(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready))
NonEmptyList.of(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready))
),
Addr(ipv4"127.0.0.30".toInet4Address, 444) -> Some(
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready))
NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready))
),
Addr(ipv4"127.0.0.5".toInet4Address, 88) -> Some(
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready))
NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready))
)
)
findFirstElement(nodes) shouldBe Some(
Expand All @@ -81,10 +81,10 @@ class NemOListOpsSpec extends FunSpec with Matchers {

it("find a session from a nodes map with empty values") {
val nodes = Map(
Addr(ipv4"127.0.0.30".toInet4Address, 444) -> Some(List()),
Addr(ipv4"127.0.0.30".toInet4Address, 444) -> None,
Addr(ipv4"127.0.0.5".toInet4Address, 88) -> None,
Addr(ipv4"127.0.0.1".toInet4Address, 1000) -> Some(
List(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready))
NonEmptyList.of(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready))
)
)
findFirstElement(nodes) shouldBe Some(
Expand All @@ -94,7 +94,7 @@ class NemOListOpsSpec extends FunSpec with Matchers {

it("return None if there's no session") {
val nodes = Map(
Addr(ipv4"127.0.0.30".toInet4Address, 444) -> Some(List()),
Addr(ipv4"127.0.0.30".toInet4Address, 444) -> None,
Addr(ipv4"127.0.0.5".toInet4Address, 88) -> None
)
findFirstElement(nodes) shouldBe None
Expand All @@ -106,23 +106,23 @@ class NemOListOpsSpec extends FunSpec with Matchers {

val seedHosts = Map(
Addr(ipv4"127.0.0.1".toInet4Address, 1000) ->
List(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready)),
Some(NonEmptyList.of(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready))),
Addr(ipv4"127.0.0.30".toInet4Address, 444) ->
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready))
Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready)))
)

val sameSessionHosts = Map(
Addr(ipv4"127.0.0.4".toInet4Address, 888) ->
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 500L, NodeState.Ready)),
Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 500L, NodeState.Ready))),
Addr(ipv4"127.0.0.5".toInet4Address, 88) ->
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready))
Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready)))
)

val otherSessionHosts = Map(
Addr(ipv4"127.0.0.14".toInet4Address, 888) ->
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 500L, NodeState.Ready)),
Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 500L, NodeState.Ready))),
Addr(ipv4"127.0.0.15".toInet4Address, 88) ->
List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 550L, NodeState.Ready))
Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 550L, NodeState.Ready)))
)

it("node map is unchanged for the same clusterSession") {
Expand Down