From 97903f6ffd78f72556b7b3d9985274a9ff826b4f Mon Sep 17 00:00:00 2001 From: Gabriel Claramunt Date: Thu, 28 Nov 2024 12:42:28 -0300 Subject: [PATCH] chore: simplify code --- .../scala/org/constellation/lb/Manager.scala | 35 +++++++------- .../org/constellation/lb/NemOListOps.scala | 46 +++++++++++++------ .../constellation/lb/NemOListOpsSpec.scala | 44 +++++++++--------- 3 files changed, 74 insertions(+), 51 deletions(-) diff --git a/src/main/scala/org/constellation/lb/Manager.scala b/src/main/scala/org/constellation/lb/Manager.scala index 36bd825..871ee55 100644 --- a/src/main/scala/org/constellation/lb/Manager.scala +++ b/src/main/scala/org/constellation/lb/Manager.scala @@ -21,6 +21,7 @@ import org.http4s.server.Router import org.http4s.server.blaze.BlazeServerBuilder import NemOListOps._ +import scala.collection.immutable.SortedMap import scala.concurrent.ExecutionContext.global import scala.concurrent.duration._ import scala.language.postfixOps @@ -41,7 +42,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( .resource private lazy val hostsRef = - Ref.unsafe[IO, NonEmptyMap[Addr, Option[List[Info]]]](init.map(addr => addr -> None).toNem) + Ref.unsafe[IO, NonEmptyMap[Addr, Option[NonEmptyList[Info]]]](init.map(addr => addr -> None).toNem) private lazy val sessionRef = Ref.unsafe[IO, Option[Long]](None) @@ -94,10 +95,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( logger.warn( s"Evicted nodes not in cluster session ${clusterSession}: ${otherSessionNodes.keySet} " ) - } >> { - val entries = sameSessionNodes.toList.map { case (k, v) => (k, Option(v)) } - IO.pure(NonEmptyList.fromListUnsafe(entries).toNem) - } + } >> IO.pure(NonEmptyMap.fromMapUnsafe(SortedMap.from(sameSessionNodes))) } case _ => logger.warn( @@ -113,7 +111,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( case (activeHosts, inactiveHosts) => val clusterHosts = (activeHosts ++ inactiveHosts) // TODO: Add Eviction of inactive hosts .filterNot(addr => status.contains(addr)) - .foldLeft(status)((acc, addr) => acc.add(addr, Option.empty[List[Info]])) + .foldLeft(status)((acc, addr) => acc.add(addr, None)) hostsRef.set(clusterHosts).flatTap(_ => updateLbSetup(activeHosts)) } @@ -204,12 +202,12 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( private val settingsRoutes = HttpRoutes.of[IO] { case POST -> Root / "maintenance" => - enableMaintenanceMode.flatMap(_ => Ok()) + enableMaintenanceMode >> Ok() case DELETE -> Root / "maintenance" => - disableMaintenanceMode.flatMap(_ => Ok()) + disableMaintenanceMode >> Ok() } - def buildClusterStatus(init: NonEmptyMap[Addr, Option[List[Info]]]): IO[(Set[Addr], Set[Addr])] = { + def buildClusterStatus(init: NonEmptyMap[Addr, Option[NonEmptyList[Info]]]): IO[(Set[Addr], Set[Addr])] = { val tresholdLevel = init.keys.size / 2 def isActive(addr: Addr, proof: List[Info]) = @@ -218,7 +216,8 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( IO { val (active, other) = rotateMap(init)(toAddress).toList .partition { - case (addr: Addr, proof: List[Info]) => isActive(addr, proof) + case (addr, Some(proof)) => isActive(addr, proof.toList) + case _ => false } active.map(_._1).toSet -> other.map(_._1).toSet @@ -230,7 +229,7 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( def getClusterInfo( hosts: NonEmptySet[Addr] - )(implicit client: Client[IO]): IO[NonEmptyMap[Addr, Option[List[Info]]]] = { + )(implicit client: Client[IO]): IO[NonEmptyMap[Addr, Option[NonEmptyList[Info]]]] = { IO.apply(logger.info(s"Fetch cluster status from following ${hosts.size} hosts: ${hosts.toList.take(5)}")) .flatMap( _ => @@ -238,17 +237,21 @@ class Manager(init: NonEmptyList[Addr], config: LoadbalancerConfig)( .map( addr => node(addr).getInfo - .flatMap( - result => + .flatMap { + case result @ x :: xs => logger .debug(s"Node $addr returned $result") - .map(_ => addr -> Option(result)) - ) + .map(_ => addr -> Some(NonEmptyList(x, xs))) + case Nil => + logger + .warn(s"Node $addr returned empty node list") + .map(_ => addr -> None) + } .recoverWith { case error => logger .info(s"Cannot retrieve cluster info from addr=$addr error=$error") - .map(_ => addr -> Option.empty[List[Info]]) + .map(_ => addr -> None) } ) .parSequence diff --git a/src/main/scala/org/constellation/lb/NemOListOps.scala b/src/main/scala/org/constellation/lb/NemOListOps.scala index 4a45f62..6623725 100644 --- a/src/main/scala/org/constellation/lb/NemOListOps.scala +++ b/src/main/scala/org/constellation/lb/NemOListOps.scala @@ -1,6 +1,6 @@ package org.constellation.lb -import cats.data.{NonEmptyMap, NonEmptySet} +import cats.data.{NonEmptyList, NonEmptyMap, NonEmptySet} import scala.collection.immutable.{SortedMap, SortedSet} @@ -14,7 +14,7 @@ object NemOListOps { * @tparam V * @return new keys */ - def extractNewKeys[K: Ordering, V](nem: NonEmptyMap[K, Option[List[V]]])(v2k: V => K): SortedSet[K] = + def extractNewKeys[K: Ordering, V](nem: NonEmptyMap[K, Option[NonEmptyList[V]]])(v2k: V => K): SortedSet[K] = nem.toNel.foldLeft(SortedSet.empty[K])( (acc, bcc) => bcc match { @@ -31,8 +31,10 @@ object NemOListOps { * @tparam V * @return (map of elements that satisfy the property, map of elements that don't) */ - def splitByElementProp[K, V](map: Map[K, List[V]])(prop: V => Boolean): (Map[K, List[V]], Map[K, List[V]]) = - map.partition { case (_, v) => v.forall(prop) } + def splitByElementProp[K, V]( + map: Map[K, Option[NonEmptyList[V]]] + )(prop: V => Boolean): (Map[K, Option[NonEmptyList[V]]], Map[K, Option[NonEmptyList[V]]]) = + map.partition { case (_, v) => v.exists(_.forall(prop)) } /** * Finds the first element of the lists in the map that satisfies a property @@ -41,8 +43,8 @@ object NemOListOps { * @tparam V * @return first element wrapped in Some if found or None */ - def findFirstElement[K, V](map: Map[K, Option[List[V]]]): Option[V] = { - map.collectFirst { case (_, Some(e :: _)) => e } + def findFirstElement[K, V](map: Map[K, Option[NonEmptyList[V]]]): Option[V] = { + map.collectFirst { case (_, Some(e)) => e.head } } /** @@ -57,11 +59,6 @@ object NemOListOps { def filterKeys[K, V](validKeys: NonEmptySet[K], map: NonEmptyMap[K, V]): SortedMap[K, V] = map.toSortedMap.filter { case (k, _) => validKeys.contains(k) } - /** - * Builds a peer map grouping the peer info by each address - * @param clusterInfo cluster info - * @return map by address of peers - */ /** * "matrix-like" rotation of the map, creating a new map based regrouping by the keys extracted from the entry lists. * @param map map of option lists @@ -70,12 +67,35 @@ object NemOListOps { * @tparam V * @return map with lists from entries. */ - def rotateMap[K, V](map: NonEmptyMap[K, Option[List[V]]])(v2k: V => K): Map[K, List[V]] = + def rotateMap[K, V](map: NonEmptyMap[K, Option[NonEmptyList[V]]])(v2k: V => K): Map[K, Option[NonEmptyList[V]]] = map.toNel .collect { - case (_, Some(el)) => el + case (_, Some(el)) => el.toList } .flatten .groupBy(v2k) + .map { case (k, vs) => k -> list2oNel(vs) } + + /** + * Convert a option non empty list to list + * @param l list of t + * @tparam T element type + * @return option non empty list + */ + def oNel2List[T](onel: Option[NonEmptyList[T]]): List[T] = onel match { // onel.fold(List[T]())(_.toList) + case Some(nel) => nel.toList + case _ => List() + } + + /** + * Convert a list to option non empty list + * @param l list of t + * @tparam T element type + * @return option non empty list + */ + def list2oNel[T](l: List[T]): Option[NonEmptyList[T]] = l match { + case x :: xs => Some(NonEmptyList(x, xs)) + case _ => None + } } diff --git a/src/test/scala/org/constellation/lb/NemOListOpsSpec.scala b/src/test/scala/org/constellation/lb/NemOListOpsSpec.scala index c4a1529..c719952 100644 --- a/src/test/scala/org/constellation/lb/NemOListOpsSpec.scala +++ b/src/test/scala/org/constellation/lb/NemOListOpsSpec.scala @@ -1,6 +1,6 @@ package org.constellation.lb -import cats.data.{NonEmptyMap, NonEmptySet} +import cats.data.{NonEmptyList, NonEmptyMap, NonEmptySet} import cats.implicits.catsKernelStdOrderForInt import org.constellation.lb.NemOListOps._ import org.constellation.primitives.node.{Addr, Id, Info, NodeState} @@ -14,14 +14,14 @@ class NemOListOpsSpec extends FunSpec with Matchers { describe("extractNewKeys") { it("extract new keys") { - extractNewKeys(NonEmptyMap.of(1 -> Some(List(1, 3, 4)), 2 -> Some(List(1, 3, 5))))(identity) shouldBe SortedSet( + extractNewKeys(NonEmptyMap.of(1 -> Some(NonEmptyList.of(1, 3, 4)), 2 -> Some(NonEmptyList.of(1, 3, 5))))(identity) shouldBe SortedSet( 3, 4, 5 ) } it("no new keys") { - extractNewKeys(NonEmptyMap.of(1 -> Some(List(1, 3)), 2 -> Some(List(2, 1)), 3 -> None))(identity) shouldBe empty + extractNewKeys(NonEmptyMap.of(1 -> Some(NonEmptyList.of(1, 3)), 2 -> Some(NonEmptyList.of(2, 1)), 3 -> None))(identity) shouldBe empty } } @@ -40,22 +40,22 @@ class NemOListOpsSpec extends FunSpec with Matchers { } it("rotate of unit map") { - rotateMap(NonEmptyMap.of(1 -> Some(List(2 -> "2"))))(_._1) shouldBe SortedMap(2 -> List(2 -> "2")) + rotateMap(NonEmptyMap.of(1 -> Some(NonEmptyList.of(2 -> "2"))))(_._1) shouldBe SortedMap(2 -> Some(NonEmptyList.of(2 -> "2"))) } it("rotate an assorted map") { rotateMap( NonEmptyMap.of( - 1 -> Some(List(1 -> "1", 2 -> "2", 3 -> "3")), - 2 -> Some(List(1 -> "1", 2 -> "2", 3 -> "3")), + 1 -> Some(NonEmptyList.of(1 -> "1", 2 -> "2", 3 -> "3")), + 2 -> Some(NonEmptyList.of(1 -> "1", 2 -> "2", 3 -> "3")), 3 -> None, - 4 -> Some(List(1 -> "1", 2 -> "2", 3 -> "3")) + 4 -> Some(NonEmptyList.of(1 -> "1", 2 -> "2", 3 -> "3")) ) )(_._1) shouldBe SortedMap( - 1 -> List(1 -> "1", 1 -> "1", 1 -> "1"), - 2 -> List(2 -> "2", 2 -> "2", 2 -> "2"), - 3 -> List(3 -> "3", 3 -> "3", 3 -> "3") + 1 -> Some(NonEmptyList.of(1 -> "1", 1 -> "1", 1 -> "1")), + 2 -> Some(NonEmptyList.of(2 -> "2", 2 -> "2", 2 -> "2")), + 3 -> Some(NonEmptyList.of(3 -> "3", 3 -> "3", 3 -> "3")) ) } @@ -65,13 +65,13 @@ class NemOListOpsSpec extends FunSpec with Matchers { it("find a session from a nodes map") { val nodes = Map( Addr(ipv4"127.0.0.1".toInet4Address, 1000) -> Some( - List(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready)) + NonEmptyList.of(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready)) ), Addr(ipv4"127.0.0.30".toInet4Address, 444) -> Some( - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready)) + NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready)) ), Addr(ipv4"127.0.0.5".toInet4Address, 88) -> Some( - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready)) + NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready)) ) ) findFirstElement(nodes) shouldBe Some( @@ -81,10 +81,10 @@ class NemOListOpsSpec extends FunSpec with Matchers { it("find a session from a nodes map with empty values") { val nodes = Map( - Addr(ipv4"127.0.0.30".toInet4Address, 444) -> Some(List()), + Addr(ipv4"127.0.0.30".toInet4Address, 444) -> None, Addr(ipv4"127.0.0.5".toInet4Address, 88) -> None, Addr(ipv4"127.0.0.1".toInet4Address, 1000) -> Some( - List(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready)) + NonEmptyList.of(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready)) ) ) findFirstElement(nodes) shouldBe Some( @@ -94,7 +94,7 @@ class NemOListOpsSpec extends FunSpec with Matchers { it("return None if there's no session") { val nodes = Map( - Addr(ipv4"127.0.0.30".toInet4Address, 444) -> Some(List()), + Addr(ipv4"127.0.0.30".toInet4Address, 444) -> None, Addr(ipv4"127.0.0.5".toInet4Address, 88) -> None ) findFirstElement(nodes) shouldBe None @@ -106,23 +106,23 @@ class NemOListOpsSpec extends FunSpec with Matchers { val seedHosts = Map( Addr(ipv4"127.0.0.1".toInet4Address, 1000) -> - List(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready)), + Some(NonEmptyList.of(Info(Id("node-1"), InetAddress.getLoopbackAddress, 9997, 9998, 111L, 123L, NodeState.Ready))), Addr(ipv4"127.0.0.30".toInet4Address, 444) -> - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready)) + Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 456L, NodeState.Ready))) ) val sameSessionHosts = Map( Addr(ipv4"127.0.0.4".toInet4Address, 888) -> - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 500L, NodeState.Ready)), + Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 500L, NodeState.Ready))), Addr(ipv4"127.0.0.5".toInet4Address, 88) -> - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready)) + Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 111L, 550L, NodeState.Ready))) ) val otherSessionHosts = Map( Addr(ipv4"127.0.0.14".toInet4Address, 888) -> - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 500L, NodeState.Ready)), + Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 500L, NodeState.Ready))), Addr(ipv4"127.0.0.15".toInet4Address, 88) -> - List(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 550L, NodeState.Ready)) + Some(NonEmptyList.of(Info(Id("node-2"), InetAddress.getLoopbackAddress, 9999, 10000, 222L, 550L, NodeState.Ready))) ) it("node map is unchanged for the same clusterSession") {