From 6a1fc841c1d58f56a7b6ea1bc840fc6e2e58eb6e Mon Sep 17 00:00:00 2001 From: pm47 Date: Thu, 2 Sep 2021 11:04:59 +0200 Subject: [PATCH] keep former channel_update in channel state This is very similar to #1918 (ef31de1), but we use the internal actor state instead of the `ChannelData`. Pros: - conceptually simple, low risk of regression - `ChannelData` stays untouched Cons: - now there is a `var` in the channel Those are the minimal changes to have the simplest diff, but we can include some improvements made in ef31de1. --- .../fr/acinq/eclair/channel/Channel.scala | 14 ++- .../fr/acinq/eclair/db/DbEventHandler.scala | 8 +- .../acinq/eclair/router/Announcements.scala | 3 + .../{RecoverySpec.scala => RestoreSpec.scala} | 110 +++++++++++++++--- 4 files changed, 109 insertions(+), 26 deletions(-) rename eclair-core/src/test/scala/fr/acinq/eclair/channel/{RecoverySpec.scala => RestoreSpec.scala} (51%) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 5092bdc923..a1de3897c1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -157,6 +157,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // this will be used to make sure the current commitment fee is up-to-date context.system.eventStream.subscribe(self, classOf[CurrentFeerates]) + private var previousChannelUpdate_opt: Option[ChannelUpdate] = None + /* 8888888 888b 888 8888888 88888888888 888 8888b 888 888 888 @@ -312,6 +314,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp) context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh)) + previousChannelUpdate_opt = Some(normal.channelUpdate) goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1) case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED => @@ -978,6 +981,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId Some(Helpers.makeAnnouncementSignatures(nodeParams, d.commitments, shortChannelId)) } else None // we use GOTO instead of stay() because we want to fire transitions + previousChannelUpdate_opt = Some(d.channelUpdate) goto(NORMAL) using d.copy(shortChannelId = shortChannelId, buried = true, channelUpdate = channelUpdate) storing() sending localAnnSigs_opt.toSeq case Event(remoteAnnSigs: AnnouncementSignatures, d: DATA_NORMAL) if d.commitments.announceChannel => @@ -1022,6 +1026,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo replyTo ! RES_SUCCESS(c, d.channelId) // we use GOTO instead of stay() because we want to fire transitions + previousChannelUpdate_opt = Some(d.channelUpdate) goto(NORMAL) using d.copy(channelUpdate = channelUpdate) storing() case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) => @@ -1035,6 +1040,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case _ => log.debug("refreshing channel_update announcement (reason={})", reason) // we use GOTO instead of stay() because we want to fire transitions + previousChannelUpdate_opt = Some(d.channelUpdate) goto(NORMAL) using d.copy(channelUpdate = channelUpdate1) storing() } @@ -1059,6 +1065,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } else { d } + previousChannelUpdate_opt = Some(d.channelUpdate) goto(OFFLINE) using d1 case Event(e: Error, d: DATA_NORMAL) => handleRemoteError(e, d) @@ -1557,6 +1564,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo replyTo ! RES_SUCCESS(c, d.channelId) // we're in OFFLINE state, we don't broadcast the new update right away, we will do that when next time we go to NORMAL state + previousChannelUpdate_opt = Some(d.channelUpdate) stay() using d.copy(channelUpdate = channelUpdate) storing() case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx) @@ -1883,11 +1891,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case _ => () } - val previousChannelUpdate_opt = stateData match { - case data: DATA_NORMAL => Some(data.channelUpdate) - case _ => None - } - (state, nextState, stateData, nextStateData) match { // ORDER MATTERS! case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) => @@ -2144,6 +2147,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // then we update the state and replay the request self forward c // we use goto to fire transitions + previousChannelUpdate_opt = Some(d.channelUpdate) goto(stateName) using d.copy(channelUpdate = channelUpdate) } else { // channel is already disabled, we reply to the request diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 7b09d84050..696f748e94 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags} import fr.acinq.eclair.payment._ +import fr.acinq.eclair.router.Announcements /** * This actor sits at the interface between our event stream and the database. @@ -119,12 +120,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging { case u: LocalChannelUpdate => u.previousChannelUpdate_opt match { - case Some(previous) if - u.channelUpdate.feeBaseMsat == previous.feeBaseMsat && - u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths && - u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta && - u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat && - u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => () + case Some(previous) if Announcements.areSameIgnoreFlags(previous, u.channelUpdate) => () // channel update hasn't changed => () case _ => auditDb.addChannelUpdate(u) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala index 03efd43d61..7e93e8e7a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala @@ -126,6 +126,9 @@ object Announcements { def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean = u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0) + def areSameIgnoreFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean = + u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) + def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte() def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/RestoreSpec.scala similarity index 51% rename from eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala rename to eclair-core/src/test/scala/fr/acinq/eclair/channel/RestoreSpec.scala index abe96276a7..31f51642bc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/RestoreSpec.scala @@ -1,23 +1,27 @@ package fr.acinq.eclair.channel -import akka.testkit.TestProbe +import akka.actor.typed.scaladsl.adapter.actorRefAdapter +import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin._ -import fr.acinq.eclair.TestConstants.Alice +import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered import fr.acinq.eclair.channel.states.ChannelStateTestsBase +import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory import fr.acinq.eclair.crypto.Generators import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager +import fr.acinq.eclair.payment.relay.Relayer.{RelayFees, RelayParams} +import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner} import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck} -import fr.acinq.eclair.{TestConstants, TestKitBaseClass, _} +import fr.acinq.eclair.{TestKitBaseClass, _} import org.scalatest.Outcome import org.scalatest.funsuite.FixtureAnyFunSuiteLike import scala.concurrent.duration._ -class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { +class RestoreSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { type FixtureParam = SetupFixture @@ -35,16 +39,16 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha } } - def aliceInit = Init(TestConstants.Alice.nodeParams.features) + def aliceInit = Init(Alice.nodeParams.features) - def bobInit = Init(TestConstants.Bob.nodeParams.features) + def bobInit = Init(Bob.nodeParams.features) test("use funding pubkeys from publish commitment to spend our output") { f => import f._ val sender = TestProbe() // we start by storing the current state - val oldStateData = alice.stateData + val oldStateData = alice.stateData.asInstanceOf[HasCommitments] // then we add an htlc and sign it addHtlc(250000000 msat, alice, bob, alice2bob, bob2alice) sender.send(alice, CMD_SIGN()) @@ -60,11 +64,12 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha awaitCond(alice.stateName == OFFLINE) awaitCond(bob.stateName == OFFLINE) - // then we manually replace alice's state with an older one - alice.setState(OFFLINE, oldStateData) + // we restart Alice + val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) + newAlice ! INPUT_RESTORED(oldStateData) // then we reconnect them - sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)) + sender.send(newAlice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)) sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)) // peers exchange channel_reestablish messages @@ -72,19 +77,19 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha val ce = bob2alice.expectMsgType[ChannelReestablish] // alice then realizes it has an old state... - bob2alice.forward(alice) + bob2alice.forward(newAlice) // ... and ask bob to publish its current commitment val error = alice2bob.expectMsgType[Error] - assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(alice)).getMessage) + assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(newAlice)).getMessage) // alice now waits for bob to publish its commitment - awaitCond(alice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) + awaitCond(newAlice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) // bob is nice and publishes its commitment val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.fullySignedLocalCommitTx(bob.underlyingActor.nodeParams.channelKeyManager).tx // actual tests starts here: let's see what we can do with Bob's commit tx - sender.send(alice, WatchFundingSpentTriggered(bobCommitTx)) + sender.send(newAlice, WatchFundingSpentTriggered(bobCommitTx)) // from Bob's commit tx we can extract both funding public keys val OP_2 :: OP_PUSHDATA(pub1, _) :: OP_PUSHDATA(pub2, _) :: OP_2 :: OP_CHECKMULTISIG :: Nil = Script.parse(bobCommitTx.txIn(0).witness.stack.last) @@ -93,7 +98,7 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha val OP_0 :: OP_PUSHDATA(pubKeyHash, _) :: Nil = Script.parse(ourOutput.publicKeyScript) - val keyManager = TestConstants.Alice.nodeParams.channelKeyManager + val keyManager = Alice.nodeParams.channelKeyManager // find our funding pub key val fundingPubKey = Seq(PublicKey(pub1), PublicKey(pub2)).find { @@ -122,4 +127,79 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil)) Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) } + + test("restore channel without configuration change") { f => + import f._ + val sender = TestProbe() + + // we start by storing the current state + assert(alice.stateData.isInstanceOf[DATA_NORMAL]) + val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL] + + // we simulate a disconnection + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // we restart Alice + val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) + newAlice ! INPUT_RESTORED(oldStateData) + + // we send out the new channel update + val u1 = channelUpdateListener.expectMsgType[LocalChannelUpdate] + assert(u1.previousChannelUpdate_opt.nonEmpty) + assert(Announcements.areSameIgnoreFlags(u1.previousChannelUpdate_opt.get, u1.channelUpdate)) + assert(u1.previousChannelUpdate_opt.get === oldStateData.channelUpdate) + + newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit) + bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit) + alice2bob.expectMsgType[ChannelReestablish] + bob2alice.expectMsgType[ChannelReestablish] + alice2bob.forward(bob) + bob2alice.forward(newAlice) + awaitCond(newAlice.stateName == NORMAL) + + // no new channel update + channelUpdateListener.expectNoMessage() + } + + test("restore channel with configuration change") { f => + import f._ + val sender = TestProbe() + + // we start by storing the current state + assert(alice.stateData.isInstanceOf[DATA_NORMAL]) + val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL] + + // we simulate a disconnection + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // we restart Alice with a different configuration + val newFees = RelayFees(765 msat, 2345) + val newConfig = Alice.nodeParams.copy(relayParams = RelayParams(newFees, newFees, newFees)) + val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) + newAlice ! INPUT_RESTORED(oldStateData) + + // we send out the new channel update + val u1 = channelUpdateListener.expectMsgType[LocalChannelUpdate] + assert(u1.previousChannelUpdate_opt.nonEmpty) + assert(!Announcements.areSameIgnoreFlags(u1.previousChannelUpdate_opt.get, u1.channelUpdate)) + assert(u1.previousChannelUpdate_opt.get === oldStateData.channelUpdate) + + newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit) + bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit) + alice2bob.expectMsgType[ChannelReestablish] + bob2alice.expectMsgType[ChannelReestablish] + alice2bob.forward(bob) + bob2alice.forward(newAlice) + awaitCond(newAlice.stateName == NORMAL) + + // no new channel update + channelUpdateListener.expectNoMessage() + } + }