Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// this will be used to make sure the current commitment fee is up-to-date
context.system.eventStream.subscribe(self, classOf[CurrentFeerates])

private var previousChannelUpdate_opt: Option[ChannelUpdate] = None

/*
8888888 888b 888 8888888 88888888888
888 8888b 888 888 888
Expand Down Expand Up @@ -312,6 +314,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))

previousChannelUpdate_opt = Some(normal.channelUpdate)
goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)

case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
Expand Down Expand Up @@ -978,6 +981,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Some(Helpers.makeAnnouncementSignatures(nodeParams, d.commitments, shortChannelId))
} else None
// we use GOTO instead of stay() because we want to fire transitions
previousChannelUpdate_opt = Some(d.channelUpdate)
goto(NORMAL) using d.copy(shortChannelId = shortChannelId, buried = true, channelUpdate = channelUpdate) storing() sending localAnnSigs_opt.toSeq

case Event(remoteAnnSigs: AnnouncementSignatures, d: DATA_NORMAL) if d.commitments.announceChannel =>
Expand Down Expand Up @@ -1022,6 +1026,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we use GOTO instead of stay() because we want to fire transitions
previousChannelUpdate_opt = Some(d.channelUpdate)
goto(NORMAL) using d.copy(channelUpdate = channelUpdate) storing()

case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
Expand All @@ -1035,6 +1040,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ =>
log.debug("refreshing channel_update announcement (reason={})", reason)
// we use GOTO instead of stay() because we want to fire transitions
previousChannelUpdate_opt = Some(d.channelUpdate)
goto(NORMAL) using d.copy(channelUpdate = channelUpdate1) storing()
}

Expand All @@ -1059,6 +1065,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
} else {
d
}
previousChannelUpdate_opt = Some(d.channelUpdate)
goto(OFFLINE) using d1

case Event(e: Error, d: DATA_NORMAL) => handleRemoteError(e, d)
Expand Down Expand Up @@ -1557,6 +1564,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we're in OFFLINE state, we don't broadcast the new update right away, we will do that when next time we go to NORMAL state
previousChannelUpdate_opt = Some(d.channelUpdate)
stay() using d.copy(channelUpdate = channelUpdate) storing()

case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx)
Expand Down Expand Up @@ -1883,11 +1891,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => ()
}

val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}

(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
Expand Down Expand Up @@ -2144,6 +2147,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// then we update the state and replay the request
self forward c
// we use goto to fire transitions
previousChannelUpdate_opt = Some(d.channelUpdate)
goto(stateName) using d.copy(channelUpdate = channelUpdate)
} else {
// channel is already disabled, we reply to the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Announcements

/**
* This actor sits at the interface between our event stream and the database.
Expand Down Expand Up @@ -119,12 +120,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case Some(previous) if Announcements.areSameIgnoreFlags(previous, u.channelUpdate) => () // channel update hasn't changed => ()
case _ => auditDb.addChannelUpdate(u)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameIgnoreFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0)

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package fr.acinq.eclair.channel

import akka.testkit.TestProbe
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.TestConstants.Alice
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.payment.relay.Relayer.{RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner}
import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck}
import fr.acinq.eclair.{TestConstants, TestKitBaseClass, _}
import fr.acinq.eclair.{TestKitBaseClass, _}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike

import scala.concurrent.duration._

class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {
class RestoreSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {

type FixtureParam = SetupFixture

Expand All @@ -35,16 +39,16 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha
}
}

def aliceInit = Init(TestConstants.Alice.nodeParams.features)
def aliceInit = Init(Alice.nodeParams.features)

def bobInit = Init(TestConstants.Bob.nodeParams.features)
def bobInit = Init(Bob.nodeParams.features)

test("use funding pubkeys from publish commitment to spend our output") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
val oldStateData = alice.stateData
val oldStateData = alice.stateData.asInstanceOf[HasCommitments]
// then we add an htlc and sign it
addHtlc(250000000 msat, alice, bob, alice2bob, bob2alice)
sender.send(alice, CMD_SIGN())
Expand All @@ -60,31 +64,32 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// then we manually replace alice's state with an older one
alice.setState(OFFLINE, oldStateData)
// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)

// then we reconnect them
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(newAlice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))

// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
val ce = bob2alice.expectMsgType[ChannelReestablish]

// alice then realizes it has an old state...
bob2alice.forward(alice)
bob2alice.forward(newAlice)
// ... and ask bob to publish its current commitment
val error = alice2bob.expectMsgType[Error]
assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(alice)).getMessage)
assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(newAlice)).getMessage)

// alice now waits for bob to publish its commitment
awaitCond(alice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT)
awaitCond(newAlice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT)

// bob is nice and publishes its commitment
val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.fullySignedLocalCommitTx(bob.underlyingActor.nodeParams.channelKeyManager).tx

// actual tests starts here: let's see what we can do with Bob's commit tx
sender.send(alice, WatchFundingSpentTriggered(bobCommitTx))
sender.send(newAlice, WatchFundingSpentTriggered(bobCommitTx))

// from Bob's commit tx we can extract both funding public keys
val OP_2 :: OP_PUSHDATA(pub1, _) :: OP_PUSHDATA(pub2, _) :: OP_2 :: OP_CHECKMULTISIG :: Nil = Script.parse(bobCommitTx.txIn(0).witness.stack.last)
Expand All @@ -93,7 +98,7 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha

val OP_0 :: OP_PUSHDATA(pubKeyHash, _) :: Nil = Script.parse(ourOutput.publicKeyScript)

val keyManager = TestConstants.Alice.nodeParams.channelKeyManager
val keyManager = Alice.nodeParams.channelKeyManager

// find our funding pub key
val fundingPubKey = Seq(PublicKey(pub1), PublicKey(pub2)).find {
Expand Down Expand Up @@ -122,4 +127,79 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha
val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil))
Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
}

test("restore channel without configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)

// we send out the new channel update
val u1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u1.previousChannelUpdate_opt.nonEmpty)
assert(Announcements.areSameIgnoreFlags(u1.previousChannelUpdate_opt.get, u1.channelUpdate))
assert(u1.previousChannelUpdate_opt.get === oldStateData.channelUpdate)

newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)

// no new channel update
channelUpdateListener.expectNoMessage()
}

test("restore channel with configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice with a different configuration
val newFees = RelayFees(765 msat, 2345)
val newConfig = Alice.nodeParams.copy(relayParams = RelayParams(newFees, newFees, newFees))
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)

// we send out the new channel update
val u1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u1.previousChannelUpdate_opt.nonEmpty)
assert(!Announcements.areSameIgnoreFlags(u1.previousChannelUpdate_opt.get, u1.channelUpdate))
assert(u1.previousChannelUpdate_opt.get === oldStateData.channelUpdate)

newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)

// no new channel update
channelUpdateListener.expectNoMessage()
}

}