Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 35 additions & 37 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -283,33 +283,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case normal: DATA_NORMAL =>
watchFundingTx(data.commitments)
context.system.eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId, None))

// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
val fees = getRelayFees(nodeParams, remoteNodeId, data.commitments)
val candidateChannelUpdate = Announcements.makeChannelUpdate(
nodeParams.chainHash,
nodeParams.privateKey,
remoteNodeId,
normal.channelUpdate.shortChannelId,
nodeParams.expiryDelta,
normal.commitments.remoteParams.htlcMinimum,
fees.feeBase,
fees.feeProportionalMillionths,
normal.commitments.capacity.toMilliSatoshi,
enable = Announcements.isEnabled(normal.channelUpdate.channelFlags))
val channelUpdate1 = if (Announcements.areSame(candidateChannelUpdate, normal.channelUpdate)) {
// if there was no configuration change we keep the existing channel update
normal.channelUpdate
} else {
log.info("refreshing channel_update due to configuration changes old={} new={}", normal.channelUpdate, candidateChannelUpdate)
candidateChannelUpdate
}
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
// we take into account the date of the last update so that we don't send superfluous updates when we restart the app
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))

goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)
goto(OFFLINE) using normal

case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
watchFundingTx(funding.commitments)
Expand Down Expand Up @@ -1549,11 +1523,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) =>
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, c.feeBase, d.channelUpdate.feeProportionalMillionths, c.feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we're in OFFLINE state, we don't broadcast the new update right away, we will do that when next time we go to NORMAL state
stay() using d.copy(channelUpdate = channelUpdate) storing()
// we're in OFFLINE state, there is nothing to do, the channel update will be recomputed when SYNCING
stay()

case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx)

Expand Down Expand Up @@ -1665,6 +1638,32 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}

// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
val fees = getRelayFees(nodeParams, remoteNodeId, d.commitments)
val candidateChannelUpdate = Announcements.makeChannelUpdate(
nodeParams.chainHash,
nodeParams.privateKey,
remoteNodeId,
d.channelUpdate.shortChannelId,
nodeParams.expiryDelta,
d.commitments.remoteParams.htlcMinimum,
fees.feeBase,
fees.feeProportionalMillionths,
d.commitments.capacity.toMilliSatoshi,
enable = Announcements.isEnabled(d.channelUpdate.channelFlags))
val channelUpdate1 = if (Announcements.areSame(candidateChannelUpdate, d.channelUpdate)) {
// if there was no configuration change we keep the existing channel update
d.channelUpdate
} else {
log.info("refreshing channel_update due to configuration changes old={} new={}", d.channelUpdate, candidateChannelUpdate)
candidateChannelUpdate
}
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
// we take into account the date of the last update so that we don't send superfluous updates when we restart the app
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))


if (d.commitments.announceChannel) {
// we will re-enable the channel after some delay to prevent flappy updates in case the connection is unstable
startSingleTimer(Reconnected.toString, BroadcastChannelUpdate(Reconnected), 10 seconds)
Expand All @@ -1687,7 +1686,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}

goto(NORMAL) using d.copy(commitments = commitments1) sending sendQueue
goto(NORMAL) using d.copy(commitments = commitments1, channelUpdate = channelUpdate1) sending sendQueue
}

case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)
Expand Down Expand Up @@ -1879,24 +1878,23 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => ()
}

val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}

(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.debug("re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, Some(normal.channelUpdate), normal.commitments))
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
log.info("emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
// in any other case (e.g. OFFLINE->SYNCING) we do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Announcements

/**
* This actor sits at the interface between our event stream and the database.
Expand Down Expand Up @@ -119,15 +120,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case Some(previous) if Announcements.areSameWithoutFlags(previous, u.channelUpdate) => () // channel update hasn't changed
case _ => auditDb.addChannelUpdate(u)
}

}

override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameWithoutFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0)

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.transactions._
import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import scodec.bits.{BitVector, ByteVector}
import scodec.codecs._
import scodec.{Attempt, Codec}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0
import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import scodec.bits.ByteVector
import scodec.codecs._
import scodec.{Attempt, Codec}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0
import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import scodec.bits.ByteVector
import scodec.codecs._
import scodec.{Attempt, Codec}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.transactions.{CommitmentSpec, DirectedHtlc, IncomingHtlc, OutgoingHtlc}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
import fr.acinq.eclair.wire.protocol.UpdateMessage
import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage}
import fr.acinq.eclair.{FeatureSupport, Features, MilliSatoshi}
import scodec.bits.{BitVector, ByteVector}
import scodec.codecs._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package fr.acinq.eclair.channel

import akka.testkit.TestProbe
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.TestConstants.Alice
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.payment.relay.Relayer.{RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner}
import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck}
Expand Down Expand Up @@ -122,4 +126,71 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha
val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil))
Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
}

test("restore channel without configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(TestConstants.Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
val u = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u.previousChannelUpdate_opt.nonEmpty)
assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, u.channelUpdate))
assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, oldStateData.channelUpdate))
channelUpdateListener.expectNoMessage(1 second)
}

test("restore channel with configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice with a different configuration
val newFees = RelayFees(765 msat, 2345)
val newConfig = TestConstants.Alice.nodeParams.copy(relayParams = RelayParams(newFees, newFees, newFees))
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
// First LocalChannelUpdate is outdated
val u1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u1.previousChannelUpdate_opt.nonEmpty)
assert(Announcements.areSameWithoutFlags(u1.previousChannelUpdate_opt.get, u1.channelUpdate))
assert(Announcements.areSameWithoutFlags(u1.previousChannelUpdate_opt.get, oldStateData.channelUpdate))
// Second LocalChannelUpdate is the right one
val u2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u2.previousChannelUpdate_opt.nonEmpty)
assert(!Announcements.areSameWithoutFlags(u2.previousChannelUpdate_opt.get, u2.channelUpdate))
assert(Announcements.areSameWithoutFlags(u2.previousChannelUpdate_opt.get, oldStateData.channelUpdate))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,42 +394,6 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(error === Error(channelId(alice), InvalidRevokedCommitProof(channelId(alice), 0, 42, invalidReestablish.yourLastPerCommitmentSecret).getMessage))
}

test("change relay fee while offline") { f =>
import f._
val sender = TestProbe()

// we simulate a disconnection
disconnect(alice, bob)

// alice and bob will not announce that their channel is OFFLINE
channelUpdateListener.expectNoMessage(300 millis)

// we make alice update here relay fee
alice ! CMD_UPDATE_RELAY_FEE(sender.ref, 4200 msat, 123456)
sender.expectMsgType[RES_SUCCESS[CMD_UPDATE_RELAY_FEE]]

// alice doesn't broadcast the new channel_update yet
channelUpdateListener.expectNoMessage(300 millis)

// then we reconnect them
reconnect(alice, bob, alice2bob, bob2alice)

// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
// note that we don't forward the channel_reestablish so that only alice reaches NORMAL state, it facilitates the test below
bob2alice.forward(alice)

// then alice reaches NORMAL state, and after a delay she broadcasts the channel_update
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](20 seconds).channelUpdate
assert(channelUpdate.feeBaseMsat === 4200.msat)
assert(channelUpdate.feeProportionalMillionths === 123456)
assert(Announcements.isEnabled(channelUpdate.channelFlags))

// no more messages
channelUpdateListener.expectNoMessage(300 millis)
}

test("broadcast disabled channel_update while offline") { f =>
import f._
val sender = TestProbe()
Expand Down