Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

/**
* A blockchain watcher that:
* - receives bitcoin events (new blocks and new txes) directly from the bitcoin network
* - also uses bitcoin-core rpc api, most notably for tx confirmation count and blockcount (because reorgs)
* Created by PM on 21/02/2016.
*/

/**
* A blockchain watcher that:
* - receives bitcoin events (new blocks and new txs) directly from the bitcoin network
* - also uses bitcoin-core rpc api, most notably for tx confirmation count and block count (because reorgs)
*/
class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging {

import ZmqWatcher._
Expand Down Expand Up @@ -180,13 +183,15 @@ class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: Extend
case PublishAsap(tx) =>
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
if (csvTimeout > 0) {
require(tx.txIn.size == 1, s"watcher only supports tx with 1 input, this tx has ${tx.txIn.size} inputs")
val parentTxid = tx.txIn.head.outPoint.txid
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parenttxid=$parentTxid tx={}", tx)
val parentPublicKey = fr.acinq.bitcoin.Script.write(fr.acinq.bitcoin.Script.pay2wsh(tx.txIn.head.witness.stack.last))
self ! WatchConfirmed(self, parentTxid, parentPublicKey, minDepth = 1, BITCOIN_PARENT_TX_CONFIRMED(tx))
val csvTimeouts = Scripts.csvTimeouts(tx)
if (csvTimeouts.nonEmpty) {
// watcher supports txs with multiple csv-delayed inputs: we watch all delayed parents and try to publish every
// time a parent's relative delays are satisfied, so we will eventually succeed.
csvTimeouts.foreach { case (parentTxId, csvTimeout) =>
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parentTxId=$parentTxId tx={}", tx)
val parentPublicKeyScript = Script.write(Script.pay2wsh(tx.txIn.find(_.outPoint.txid == parentTxId).get.witness.stack.last))
self ! WatchConfirmed(self, parentTxId, parentPublicKeyScript, minDepth = csvTimeout, BITCOIN_PARENT_TX_CONFIRMED(tx))
}
} else if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
Expand All @@ -197,11 +202,9 @@ class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: Extend
log.info(s"parent tx of txid=${tx.txid} has been confirmed")
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = math.max(blockHeight + csvTimeout, cltvTimeout)
if (absTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(absTimeout, block2tx.getOrElse(absTimeout, Seq.empty[Transaction]) :+ tx)
if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
context become watching(watches, watchedUtxos, block2tx1, nextTick)
} else publish(tx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
case PublishAsap(tx) =>
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
if (csvTimeout > 0) {
require(tx.txIn.size == 1, s"watcher only supports tx with 1 input, this tx has ${tx.txIn.size} inputs")
val parentTxid = tx.txIn.head.outPoint.txid
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parenttxid=$parentTxid tx={}", tx)
val parentPublicKeyScript = WatchConfirmed.extractPublicKeyScript(tx.txIn.head.witness)
self ! WatchConfirmed(self, parentTxid, parentPublicKeyScript, minDepth = 1, BITCOIN_PARENT_TX_CONFIRMED(tx))
val csvTimeouts = Scripts.csvTimeouts(tx)
if (csvTimeouts.nonEmpty) {
// watcher supports txs with multiple csv-delayed inputs: we watch all delayed parents and try to publish every
// time a parent's relative delays are satisfied, so we will eventually succeed.
csvTimeouts.foreach { case (parentTxId, csvTimeout) =>
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parentTxId=$parentTxId tx={}", tx)
val parentPublicKeyScript = WatchConfirmed.extractPublicKeyScript(tx.txIn.find(_.outPoint.txid == parentTxId).get.witness)
self ! WatchConfirmed(self, parentTxId, parentPublicKeyScript, minDepth = csvTimeout, BITCOIN_PARENT_TX_CONFIRMED(tx))
}
} else if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
Expand All @@ -193,11 +195,9 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
log.info(s"parent tx of txid=${tx.txid} has been confirmed")
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = math.max(blockHeight + csvTimeout, cltvTimeout)
if (absTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(absTimeout, block2tx.getOrElse(absTimeout, Seq.empty[Transaction]) :+ tx)
if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
context become running(height, tip, watches, scriptHashStatus, block2tx1, sent)
} else {
publish(tx)
Expand Down
23 changes: 11 additions & 12 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, tx_opt) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
tx_opt.foreach(claimTx => blockchain ! PublishAsap(claimTx))
tx_opt.foreach(claimTx => blockchain ! WatchSpent(self, tx, claimTx.txIn.head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
tx_opt.foreach(claimTx => blockchain ! WatchSpent(self, tx, claimTx.txIn.filter(_.outPoint.txid == tx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
rev1
}
stay using d.copy(revokedCommitPublished = revokedCommitPublished1) storing()
Expand Down Expand Up @@ -2116,10 +2116,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

/**
* This helper method will publish txes only if they haven't yet reached minDepth
* This helper method will publish txs only if they haven't yet reached minDepth
*/
def publishIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
def publishIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach { tx =>
log.info(s"publishing txid=${tx.txid}")
blockchain ! PublishAsap(tx)
Expand All @@ -2128,20 +2128,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

/**
* This helper method will watch txes only if they haven't yet reached minDepth
* This helper method will watch txs only if they haven't yet reached minDepth
*/
def watchConfirmedIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchConfirmed(self, tx, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(tx)))
skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}

/**
* This helper method will watch txes only if the utxo they spend hasn't already been irrevocably spent
* This helper method will watch txs only if the utxo they spend hasn't already been irrevocably spent
*/
def watchSpentIfNeeded(parentTx: Transaction, txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
def watchSpentIfNeeded(parentTx: Transaction, txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.filter(_.outPoint.txid == parentTx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}

Expand Down Expand Up @@ -2226,7 +2226,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

def handleRemoteSpentOther(tx: Transaction, d: HasCommitments) = {
log.warning(s"funding tx spent in txid=${tx.txid}")

Helpers.Closing.claimRevokedRemoteCommitTxOutputs(keyManager, d.commitments, tx, nodeParams.db.channels, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets) match {
case Some(revokedCommitPublished) =>
log.warning(s"txid=${tx.txid} was a revoked commitment, publishing the penalty tx")
Expand Down
Loading