diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index bb29d7f0d5..beb2296d37 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -227,6 +227,7 @@ eclair { channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration broadcast-interval = 60 seconds // see BOLT #7 init-timeout = 5 minutes + balance-estimate-half-life = 1 day // time after which the confidence of the balance estimate is halved sync { request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index ba73517dbf..77f0ea6a17 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -492,7 +492,8 @@ object NodeParams extends Logging { encodingType = EncodingType.UNCOMPRESSED, channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"), channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"), - pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")) + pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")), + balanceEstimateHalfLife = FiniteDuration(config.getDuration("router.balance-estimate-half-life").getSeconds, TimeUnit.SECONDS), ), socksProxy_opt = socksProxy_opt, maxPaymentAttempts = config.getInt("max-payment-attempts"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala index 249b2e9616..f9f0e7f1a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala @@ -102,6 +102,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A handleLocalFail(d, DisconnectedException, isFatal = false) case Event(RES_ADD_SETTLED(_, htlc, fulfill: HtlcResult.Fulfill), d: WaitingForComplete) => + router ! Router.RouteDidRelay(d.route) Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1) val p = PartialPayment(id, d.c.finalPayload.amount, d.cmd.amount - d.c.finalPayload.amount, htlc.channelId, Some(cfg.fullRoute(d.route))) myStop(d.c, Right(cfg.createPaymentSent(fulfill.paymentPreimage, p :: Nil))) @@ -198,41 +199,57 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A log.warning(s"cannot parse returned error: ${t.getMessage}, route=${route.printNodes()}") val failure = UnreadableRemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route)) retry(failure, d) - case Success(e@Sphinx.DecryptedFailurePacket(nodeId, failureMessage: Node)) => - log.info(s"received 'Node' type error message from nodeId=$nodeId, trying to route around it (failure=$failureMessage)") - val failure = RemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route), e) - retry(failure, d) - case Success(e@Sphinx.DecryptedFailurePacket(nodeId, failureMessage: Update)) => - log.info(s"received 'Update' type error message from nodeId=$nodeId, retrying payment (failure=$failureMessage)") - val failure = RemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route), e) - if (Announcements.checkSig(failureMessage.update, nodeId)) { - val assistedRoutes1 = handleUpdate(nodeId, failureMessage, d) - val ignore1 = PaymentFailure.updateIgnored(failure, ignore) - // let's try again, router will have updated its state - c match { - case _: SendPaymentToRoute => - log.error("unexpected retry during SendPaymentToRoute") - stop(FSM.Normal) - case c: SendPaymentToNode => - router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.maxFee, assistedRoutes1, ignore1, c.routeParams, paymentContext = Some(cfg.paymentContext)) - goto(WAITING_FOR_ROUTE) using WaitingForRoute(c, failures :+ failure, ignore1) - } - } else { - // this node is fishy, it gave us a bad sig!! let's filter it out - log.warning(s"got bad signature from node=$nodeId update=${failureMessage.update}") - c match { - case _: SendPaymentToRoute => - log.error("unexpected retry during SendPaymentToRoute") - stop(FSM.Normal) - case c: SendPaymentToNode => - router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.maxFee, c.assistedRoutes, ignore + nodeId, c.routeParams, paymentContext = Some(cfg.paymentContext)) - goto(WAITING_FOR_ROUTE) using WaitingForRoute(c, failures :+ failure, ignore + nodeId) - } - } case Success(e@Sphinx.DecryptedFailurePacket(nodeId, failureMessage)) => - log.info(s"received an error message from nodeId=$nodeId, trying to use a different channel (failure=$failureMessage)") - val failure = RemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route), e) - retry(failure, d) + // We have discovered some liquidity information with this payment: we update the router accordingly. + val stoppedRoute = d.route.stopAt(nodeId) + if (stoppedRoute.hops.length > 1) { + router ! Router.RouteCouldRelay(stoppedRoute) + } + failureMessage match { + case TemporaryChannelFailure(update) => + d.route.hops.find(_.nodeId == nodeId) match { + case Some(failingHop) if Announcements.areSame(failingHop.lastUpdate, update) => router ! Router.ChannelCouldNotRelay(stoppedRoute.amount, failingHop) + case _ => // otherwise the relay parameters may have changed, so it's not necessarily a liquidity issue + } + case _ => // other errors should not be used for liquidity issues + } + failureMessage match { + case failureMessage: Node => + log.info(s"received 'Node' type error message from nodeId=$nodeId, trying to route around it (failure=$failureMessage)") + val failure = RemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route), e) + retry(failure, d) + case failureMessage: Update => + log.info(s"received 'Update' type error message from nodeId=$nodeId, retrying payment (failure=$failureMessage)") + val failure = RemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route), e) + if (Announcements.checkSig(failureMessage.update, nodeId)) { + val assistedRoutes1 = handleUpdate(nodeId, failureMessage, d) + val ignore1 = PaymentFailure.updateIgnored(failure, ignore) + // let's try again, router will have updated its state + c match { + case _: SendPaymentToRoute => + log.error("unexpected retry during SendPaymentToRoute") + stop(FSM.Normal) + case c: SendPaymentToNode => + router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.maxFee, assistedRoutes1, ignore1, c.routeParams, paymentContext = Some(cfg.paymentContext)) + goto(WAITING_FOR_ROUTE) using WaitingForRoute(c, failures :+ failure, ignore1) + } + } else { + // this node is fishy, it gave us a bad sig!! let's filter it out + log.warning(s"got bad signature from node=$nodeId update=${failureMessage.update}") + c match { + case _: SendPaymentToRoute => + log.error("unexpected retry during SendPaymentToRoute") + stop(FSM.Normal) + case c: SendPaymentToNode => + router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.maxFee, c.assistedRoutes, ignore + nodeId, c.routeParams, paymentContext = Some(cfg.paymentContext)) + goto(WAITING_FOR_ROUTE) using WaitingForRoute(c, failures :+ failure, ignore + nodeId) + } + } + case failureMessage => + log.info(s"received an error message from nodeId=$nodeId, trying to use a different channel (failure=$failureMessage)") + val failure = RemoteFailure(d.c.finalPayload.amount, cfg.fullRoute(route), e) + retry(failure, d) + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala index d343fb6bc3..0c9f26b0a9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala @@ -96,7 +96,8 @@ object EclairInternalsSerializer { .typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) :: ("channelRangeChunkSize" | int32) :: ("channelQueryChunkSize" | int32) :: - ("pathFindingExperimentConf" | pathFindingExperimentConfCodec)).as[RouterConf] + ("pathFindingExperimentConf" | pathFindingExperimentConfCodec) :: + ("balanceEstimateHalfLife" | finiteDurationCodec)).as[RouterConf] val overrideFeaturesListCodec: Codec[List[(PublicKey, Features[Feature])]] = listOfN(uint16, publicKey ~ variableSizeBytes(uint16, featuresCodec)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala new file mode 100644 index 0000000000..bef25fcad0 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/BalanceEstimate.scala @@ -0,0 +1,290 @@ +/* + * Copyright 2021 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.router + +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.{LexicographicalOrdering, Satoshi} +import fr.acinq.eclair.MilliSatoshi.toMilliSatoshi +import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge} +import fr.acinq.eclair.router.Router.{ChannelHop, PublicChannel} +import fr.acinq.eclair.{MilliSatoshi, TimestampSecond} + +import scala.concurrent.duration.FiniteDuration + +/** + * Estimates the balance between a pair of nodes + * + * @param low lower bound on the balance + * @param lowTimestamp time at which the lower bound was known to be correct + * @param high upper bound on the balance + * @param highTimestamp time at which the upper bound was known to be correct + * @param totalCapacity total capacity of all the channels between the pair of nodes + * @param halfLife time after which the certainty of the lower/upper bounds is halved + */ +case class BalanceEstimate private(low: MilliSatoshi, + lowTimestamp: TimestampSecond, + high: MilliSatoshi, highTimestamp: TimestampSecond, + totalCapacity: Satoshi, + halfLife: FiniteDuration) { + + /* The goal of this class is to estimate the probability that a given edge can relay the amount that we plan to send + * through it. We model this probability with 3 pieces of linear functions. + * + * Without any information we use the following baseline (x is the amount we're sending and y the probability it can be relayed): + * + * 1 |**** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * | **** + * 0 +------------------------------------------------**** + * 0 capacity + * + * If we get the information that the edge can (or can't) relay a given amount (because we tried), then we get a lower + * bound (or upper bound) that we can use and our model becomes: + * + * 1 |*************** + * | |* + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * | | * + * 0 +--------------|-----------|************************* + * 0 low high capacity + * + * However this lower bound (or upper bound) is only valid at the moment we got that information. If we wait, the + * information decays and we slowly go back towards our baseline: + * + * 1 |***** + * | ***** + * | ***** + * | |** + * | | * + * | | ** + * | | ** + * | | * + * | | ** + * | | * + * | | ******** + * | | | ********* + * 0 +--------------|-----------|----------------********* + * 0 low high capacity + */ + + /** + * We model the decay with a half-life H: every H units of time, our confidence decreases by half and our estimated + * probability distribution gets closer to the baseline uniform distribution of balances between 0 and totalCapacity. + * + * @param amount the amount that we knew we could send or not send at time t + * @param successProbabilityAtT probability that we could relay amount at time t (usually 0 or 1) + * @param t time at which we knew if we could or couldn't send amount + * @return the probability that we can send amount now + */ + private def decay(amount: MilliSatoshi, successProbabilityAtT: Double, t: TimestampSecond): Double = { + val decayRatio = 1 / math.pow(2, (TimestampSecond.now() - t) / halfLife) + val baseline = 1 - amount.toLong.toDouble / toMilliSatoshi(totalCapacity).toLong.toDouble + baseline * (1 - decayRatio) + successProbabilityAtT * decayRatio + } + + def otherSide: BalanceEstimate = + BalanceEstimate(totalCapacity - high, highTimestamp, totalCapacity - low, lowTimestamp, totalCapacity, halfLife) + + def couldNotSend(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = { + if (amount <= low) { + // the balance is actually below `low`, we discard our previous lower bound + copy(low = MilliSatoshi(0), lowTimestamp = timestamp, high = amount, highTimestamp = timestamp) + } else if (amount < high) { + // the balance is actually below `high`, we discard our previous higher bound + copy(high = amount, highTimestamp = timestamp) + } else { + // We already expected not to be able to relay that amount as it above our upper bound. However if the upper bound + // was old enough that replacing it with the current amount decreases the success probability for `high`, then we + // replace it. + val updated = copy(high = amount, highTimestamp = timestamp) + if (updated.canSend(high) < this.canSend(high)) { + updated + } else { + this + } + } + } + + def couldSend(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = + otherSide.couldNotSend(totalCapacity - amount, timestamp).otherSide + + def didSend(amount: MilliSatoshi, timestamp: TimestampSecond): BalanceEstimate = { + val newLow = (low - amount) max MilliSatoshi(0) + val newHigh = (high - amount) max MilliSatoshi(0) + // We could shift everything left by amount without changing the timestamps (a) but we may get more information by + // ignoring the old high (b) if if has decayed too much. We try both and choose the one that gives the lowest + // probability for high. + val a = copy(low = newLow, high = newHigh) + val b = copy(low = newLow, high = (totalCapacity - amount) max MilliSatoshi(0), highTimestamp = timestamp) + if (a.canSend(newHigh) < b.canSend(newHigh)) { + a + } else { + b + } + } + + def addChannel(capacity: Satoshi): BalanceEstimate = + copy(high = high + toMilliSatoshi(capacity), totalCapacity = totalCapacity + capacity) + + def removeChannel(capacity: Satoshi): BalanceEstimate = + copy(low = (low - toMilliSatoshi(capacity)) max MilliSatoshi(0), high = high min toMilliSatoshi(totalCapacity - capacity), totalCapacity = totalCapacity - capacity) + + /** + * Estimate the probability that we can successfully send `amount` through the channel + * + * We estimate this probability with a piecewise linear function: + * - probability that it can relay a payment of 0 is 1 + * - probability that it can relay a payment of low is decay(low, 1, lowTimestamp) which is close to 1 if lowTimestamp is recent + * - probability that it can relay a payment of high is decay(high, 0, highTimestamp) which is close to 0 if highTimestamp is recent + * - probability that it can relay a payment of totalCapacity is 0 + */ + def canSend(amount: MilliSatoshi): Double = { + val a = amount.toLong.toDouble + val l = low.toLong.toDouble + val h = high.toLong.toDouble + val c = toMilliSatoshi(totalCapacity).toLong.toDouble + + // Success probability at the low and high points + val pLow = decay(low, 1, lowTimestamp) + val pHigh = decay(high, 0, highTimestamp) + + if (amount < low) { + (l - a * (1.0 - pLow)) / l + } else if (amount <= high) { + ((h - a) * pLow + (a - l) * pHigh) / (h - l) + } else { + ((c - a) * pHigh) / (c - h) + } + } +} + +object BalanceEstimate { + def baseline(capacity: Satoshi, halfLife: FiniteDuration): BalanceEstimate = + BalanceEstimate(MilliSatoshi(0), TimestampSecond(0), MilliSatoshi(0), TimestampSecond(0), Satoshi(0), halfLife) + .addChannel(capacity) +} + +/** A pair of nodes, lexicographically ordered. */ +case class OrderedNodePair private(node1: PublicKey, node2: PublicKey) + +object OrderedNodePair { + def create(a: PublicKey, b: PublicKey): OrderedNodePair = + if (LexicographicalOrdering.isLessThan(a.value, b.value)) OrderedNodePair(a, b) + else OrderedNodePair(b, a) +} + +/** + * Balance estimates for the whole routing graph. + * Balance estimates are symmetrical: we can compute the balance estimate b -> a based on the balance estimate a -> b, + * so we only store it for one direction. + */ +case class BalancesEstimates(balances: Map[OrderedNodePair, BalanceEstimate], defaultHalfLife: FiniteDuration) { + private def get(a: PublicKey, b: PublicKey): Option[BalanceEstimate] = { + val nodePair = OrderedNodePair.create(a, b) + if (nodePair.node1 == a) { + balances.get(nodePair) + } else { + balances.get(nodePair).map(_.otherSide) + } + } + + def get(edge: GraphEdge): BalanceEstimate = + get(edge.desc.a, edge.desc.b).getOrElse(BalanceEstimate.baseline(edge.capacity, defaultHalfLife)) + + def addChannel(channel: PublicChannel): BalancesEstimates = + BalancesEstimates( + balances.updatedWith(OrderedNodePair.create(channel.ann.nodeId1, channel.ann.nodeId2)) { + case None => Some(BalanceEstimate.baseline(channel.capacity, defaultHalfLife)) + case Some(balance) => Some(balance) + }, + defaultHalfLife) + + def removeChannel(channel: PublicChannel): BalancesEstimates = + BalancesEstimates( + balances.updatedWith(OrderedNodePair.create(channel.ann.nodeId1, channel.ann.nodeId2)) { + case None => None + case Some(balance) => + val newBalance = balance.removeChannel(channel.capacity) + if (newBalance.totalCapacity.toLong > 0) { + Some(newBalance) + } else { + None + } + }, defaultHalfLife) + + private def channelXSend(x: (BalanceEstimate, MilliSatoshi, TimestampSecond) => BalanceEstimate, hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + val nodePair = OrderedNodePair(hop.nodeId, hop.nextNodeId) + if (nodePair.node1 == hop.nodeId) { + BalancesEstimates(balances.updatedWith(nodePair)(_.map(b => x(b, amount, TimestampSecond.now()))), defaultHalfLife) + } else { + BalancesEstimates(balances.updatedWith(nodePair)(_.map(b => x(b.otherSide, amount, TimestampSecond.now()).otherSide)), defaultHalfLife) + } + } + + def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + get(hop.nodeId, hop.nextNodeId).foreach { balance => + val estimatedProbability = balance.canSend(amount) + Monitoring.Metrics.remoteEdgeRelaySuccess(estimatedProbability) + } + channelXSend(_.couldSend(_, _), hop, amount) + } + + def channelCouldNotSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + get(hop.nodeId, hop.nextNodeId).foreach { balance => + val estimatedProbability = balance.canSend(amount) + Monitoring.Metrics.remoteEdgeRelayFailure(estimatedProbability) + } + channelXSend(_.couldNotSend(_, _), hop, amount) + } + + def channelDidSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = { + get(hop.nodeId, hop.nextNodeId).foreach { balance => + val estimatedProbability = balance.canSend(amount) + Monitoring.Metrics.remoteEdgeRelaySuccess(estimatedProbability) + } + channelXSend(_.didSend(_, _), hop, amount) + } + +} + +object BalancesEstimates { + def baseline(graph: DirectedGraph, defaultHalfLife: FiniteDuration): BalancesEstimates = BalancesEstimates( + graph.edgeSet().foldLeft[Map[OrderedNodePair, BalanceEstimate]](Map.empty) { + case (m, edge) => m.updatedWith(OrderedNodePair.create(edge.desc.a, edge.desc.b)) { + case None => Some(BalanceEstimate.baseline(edge.capacity, defaultHalfLife)) + case Some(balance) => Some(balance.addChannel(edge.capacity)) + } + }, + defaultHalfLife) +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala index bd0e76b068..5daf5383ee 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala @@ -66,6 +66,12 @@ object Monitoring { case _: GossipDecision.Accepted => GossipResult.withTag("result", "accepted") case rejected: GossipDecision.Rejected => GossipResult.withTag("result", "rejected").withTag("reason", getSimpleClassName(rejected)) } + + private val RelayProbabilityEstimate = Kamon.histogram("router.balance-estimates.remote-edge-relay", "Estimated probability (in percent) that the relay will be successful") + + def remoteEdgeRelaySuccess(estimatedProbability: Double) = RelayProbabilityEstimate.withTag("success", (estimatedProbability * 100).toLong) + + def remoteEdgeRelayFailure(estimatedProbability: Double) = RelayProbabilityEstimate.withTag("failure", (estimatedProbability * 100).toLong) } object Tags { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 7a6f1fcc4e..6364b0f7f9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -100,7 +100,16 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm log.info(s"initialization completed, ready to process messages") Try(initialized.map(_.success(Done))) - startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) + startWith(NORMAL, Data( + initNodes, initChannels, + Stash(Map.empty, Map.empty), + rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), + awaiting = Map.empty, + privateChannels = Map.empty, + excludedChannels = Set.empty, + graph, + balances = BalancesEstimates.baseline(graph, nodeParams.routerConf.balanceEstimateHalfLife), + sync = Map.empty)) } when(NORMAL) { @@ -254,6 +263,22 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyShortChannelIdsEnd), d) => stay() using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(peerConnection, remoteNodeId), r) + case Event(RouteCouldRelay(route), d) => + val (balances1, _) = route.hops.foldRight((d.balances, route.amount)) { + case (hop, (balances, amount)) => + (balances.channelCouldSend(hop, amount) , amount + hop.fee(amount)) + } + stay() using d.copy(balances = balances1) + + case Event(RouteDidRelay(route), d) => + val (balances1, _) = route.hops.foldRight((d.balances, route.amount)) { + case (hop, (balances, amount)) => + (balances.channelDidSend(hop, amount) , amount + hop.fee(amount)) + } + stay() using d.copy(balances = balances1) + + case Event(ChannelCouldNotRelay(amount, hop), d) => + stay() using d.copy(balances = d.balances.channelCouldNotSend(hop, amount)) } initialize() @@ -304,7 +329,8 @@ object Router { encodingType: EncodingType, channelRangeChunkSize: Int, channelQueryChunkSize: Int, - pathFindingExperimentConf: PathFindingExperimentConf) { + pathFindingExperimentConf: PathFindingExperimentConf, + balanceEstimateHalfLife: FiniteDuration) { require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message") require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message") } @@ -490,6 +516,10 @@ object Router { def printChannels(): String = hops.map(_.lastUpdate.shortChannelId).mkString("->") + def stopAt(nodeId: PublicKey): Route = { + val amountAtStop = hops.reverse.takeWhile(_.nextNodeId != nodeId).foldLeft(amount) { case (amount1, hop) => amount1 + hop.fee(amount1) } + Route(amountAtStop, hops.takeWhile(_.nodeId != nodeId)) + } } case class RouteResponse(routes: Seq[Route]) { @@ -579,6 +609,7 @@ object Router { privateChannels: Map[ShortChannelId, PrivateChannel], excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graph: DirectedGraph, + balances: BalancesEstimates, sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message ) @@ -603,4 +634,13 @@ object Router { def isRelatedTo(c: ChannelAnnouncement, nodeId: PublicKey) = nodeId == c.nodeId1 || nodeId == c.nodeId2 def hasChannels(nodeId: PublicKey, channels: Iterable[PublicChannel]): Boolean = channels.exists(c => isRelatedTo(c.ann, nodeId)) + + /** We know that this route could relay because we have tried it but the payment was eventually cancelled */ + case class RouteCouldRelay(route: Route) + + /** We have relayed using this route. */ + case class RouteDidRelay(route: Route) + + /** We have tried to relay this amount from this channel and it failed. */ + case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 5f4d66c73c..4f9d5475a7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -146,12 +146,14 @@ object Validation { case Some(pc) => // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update // right after the channel_announcement, channel_updates will be moved from private to public at that time + val balances1 = d0.balances.addChannel(pc) val d1 = d0.copy( channels = d0.channels + (c.shortChannelId -> pc), privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue stash = stash1, - awaiting = awaiting1) + awaiting = awaiting1, + balances = balances1) // we only reprocess updates and nodes if validation succeeded val d2 = reprocessUpdates.foldLeft(d1) { case (d, (u, origins)) => Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, origins)), wasStashed = true) @@ -170,7 +172,8 @@ object Validation { def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: ShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - val lostChannel = d.channels(shortChannelId).ann + val lostPublicChannel = d.channels(shortChannelId) + val lostChannel = lostPublicChannel.ann log.info("funding tx of channelId={} has been spent", shortChannelId) // we need to remove nodes that aren't tied to any channels anymore val channels1 = d.channels - lostChannel.shortChannelId @@ -182,6 +185,7 @@ object Validation { val graph1 = d.graph .removeEdge(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2)) .removeEdge(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId2, lostChannel.nodeId1)) + val balances1 = d.balances.removeChannel(lostPublicChannel) ctx.system.eventStream.publish(ChannelLost(shortChannelId)) lostNodes.foreach { @@ -190,7 +194,7 @@ object Validation { db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - d.copy(nodes = d.nodes -- lostNodes, channels = d.channels - shortChannelId, graph = graph1) + d.copy(nodes = d.nodes -- lostNodes, channels = d.channels - shortChannelId, graph = graph1, balances = balances1) } def handleNodeAnnouncement(d: Data, db: NetworkDb, origins: Set[GossipOrigin], n: NodeAnnouncement, wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index b77d9fa3af..d13a320ce2 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -185,7 +185,8 @@ object TestConstants { maxParts = 10, ), experimentName = "alice-test-experiment", - experimentPercentage = 100))) + experimentPercentage = 100))), + balanceEstimateHalfLife = 1 day, ), socksProxy_opt = None, maxPaymentAttempts = 5, @@ -323,7 +324,8 @@ object TestConstants { maxParts = 10, ), experimentName = "bob-test-experiment", - experimentPercentage = 100))) + experimentPercentage = 100))), + balanceEstimateHalfLife = 1 day, ), socksProxy_opt = None, maxPaymentAttempts = 5, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala index 1ae69f9230..5a38c2234e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala @@ -450,6 +450,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec { sender.send(paymentFSM, addCompleted(HtlcResult.RemoteFail(UpdateFailHtlc(ByteVector32.Zeroes, 0, Sphinx.FailurePacket.create(sharedSecrets1.head._1, failure))))) // payment lifecycle will ask the router to temporarily exclude this channel from its route calculations + routerForwarder.expectMsgType[ChannelCouldNotRelay] routerForwarder.expectMsg(ExcludeChannel(ChannelDesc(update_bc.shortChannelId, b, c))) routerForwarder.forward(routerFixture.router) // payment lifecycle forwards the embedded channelUpdate to the router diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala new file mode 100644 index 0000000000..c9c8d05385 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BalanceEstimateSpec.scala @@ -0,0 +1,168 @@ +/* + * Copyright 2021 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.router + +import fr.acinq.bitcoin.scalacompat.SatoshiLong +import fr.acinq.eclair.{MilliSatoshiLong, TimestampSecond, TimestampSecondLong} +import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper +import org.scalatest.funsuite.AnyFunSuite + +import scala.concurrent.duration.DurationInt + +class BalanceEstimateSpec extends AnyFunSuite { + def isValid(balance: BalanceEstimate): Boolean = { + balance.low >= 0.msat && + balance.low < balance.high && + balance.high <= balance.totalCapacity + } + + test("symmetry") { + var balanceA = BalanceEstimate.baseline(40000 sat, 1 day) + var balanceB = balanceA.otherSide + assert(isValid(balanceA)) + assert(isValid(balanceB)) + assert(balanceB.otherSide === balanceA) + + balanceA = balanceA.couldNotSend(35000000 msat, 1000 unixsec) + balanceB = balanceB.couldSend(balanceB.totalCapacity - 35000000.msat, 1000 unixsec) + assert(isValid(balanceA)) + assert(isValid(balanceB)) + assert(balanceB.otherSide === balanceA) + + balanceA = balanceA.couldSend(10000000 msat, 2000 unixsec) + balanceB = balanceB.couldNotSend(balanceB.totalCapacity - 10000000.msat, 2000 unixsec) + assert(isValid(balanceA)) + assert(isValid(balanceB)) + assert(balanceB.otherSide === balanceA) + + balanceA = balanceA.addChannel(5000 sat) + balanceB = balanceB.addChannel(5000 sat) + assert(isValid(balanceA)) + assert(isValid(balanceB)) + assert(balanceB.otherSide === balanceA) + + balanceA = balanceA.couldSend(balanceA.totalCapacity - 1000000.msat, 15000 unixsec) + balanceB = balanceB.couldNotSend(1000000 msat, 15000 unixsec) + assert(isValid(balanceA)) + assert(isValid(balanceB)) + assert(balanceB.otherSide === balanceA) + + balanceA = balanceA.removeChannel(5000 sat) + balanceB = balanceB.removeChannel(5000 sat) + assert(isValid(balanceA)) + assert(isValid(balanceB)) + assert(balanceB.otherSide === balanceA) + } + + test("no balance information") { + val balance = BalanceEstimate.baseline(100 sat, 1 day) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(50000 msat) === 0.5 +- 0.001) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("can send balance info bounds") { + val now = TimestampSecond.now() + val balance = + BalanceEstimate.baseline(100 sat, 1 day) + .couldSend(24000 msat, now) + .couldNotSend(30000 msat, now) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(23999 msat) === 1.0 +- 0.001) + assert(balance.canSend(24000 msat) === 1.0 +- 0.001) + assert(balance.canSend(24001 msat) === 1.0 +- 0.001) + assert(balance.canSend(27000 msat) === 0.5 +- 0.001) + assert(balance.canSend(29999 msat) === 0.0 +- 0.001) + assert(balance.canSend(30000 msat) === 0.0 +- 0.001) + assert(balance.canSend(30001 msat) === 0.0 +- 0.001) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("could and couldn't send at the same time") { + val now = TimestampSecond.now() + val balance = + BalanceEstimate.baseline(100 sat, 1 day) + .couldSend(26000 msat, now) + .couldNotSend(26000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(26000 msat) >= 0) + assert(balance.canSend(26000 msat) <= 1) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("couldn't and could send at the same time") { + val now = TimestampSecond.now() + val balance = + BalanceEstimate.baseline(100 sat, 1 day) + .couldNotSend(26000 msat, now) + .couldSend(26000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(0 msat) === 1.0 +- 0.001) + assert(balance.canSend(1 msat) === 1.0 +- 0.001) + assert(balance.canSend(26000 msat) >= 0) + assert(balance.canSend(26000 msat) <= 1) + assert(balance.canSend(99999 msat) === 0.0 +- 0.001) + assert(balance.canSend(100000 msat) === 0.0 +- 0.001) + } + + test("decay") { + val longAgo = TimestampSecond.now() - 1.day + val balance = + BalanceEstimate.baseline(100 sat, 1 second) + .couldNotSend(32000 msat, longAgo) + .couldSend(28000 msat, longAgo) + assert(isValid(balance)) + assert(balance.canSend(1 msat) === 1.0 +- 0.01) + assert(balance.canSend(33333 msat) === 0.666 +- 0.01) + assert(balance.canSend(66666 msat) === 0.333 +- 0.01) + assert(balance.canSend(99999 msat) === 0.0 +- 0.01) + } + + test("sending shifts amounts") { + val now = TimestampSecond.now() + val balance = + BalanceEstimate.baseline(100 sat, 1 day) + .couldNotSend(80000 msat, now) + .couldSend(50000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(50000 msat) === 1.0 +- 0.001) + assert(balance.canSend(80000 msat) === 0.0 +- 0.001) + val balanceAfterSend = balance.didSend(20000 msat, now) + assert(isValid(balanceAfterSend)) + assert(balanceAfterSend.canSend(30000 msat) === 1.0 +- 0.001) + assert(balanceAfterSend.canSend(60000 msat) === 0.0 +- 0.001) + } + + test("sending after decay") { + val longAgo = TimestampSecond.now() - 1.day + val now = TimestampSecond.now() + val balance = + BalanceEstimate.baseline(100 sat, 1 second) + .couldNotSend(80000 msat, longAgo) + .couldSend(50000 msat, longAgo) + .didSend(40000 msat, now) + assert(isValid(balance)) + assert(balance.canSend(60000 msat) === 0.0 +- 0.01) + } +}