Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
for (nodeId <- nodes) {
appKit.nodeParams.db.peers.addOrUpdateRelayFees(nodeId, RelayFees(feeBaseMsat, feeProportionalMillionths))
}
(appKit.router ? Router.GetLocalChannels).mapTo[Iterable[LocalChannel]]
.map(channels => channels.filter(c => nodes.contains(c.remoteNodeId)).map(c => Right(c.shortChannelId)))
.flatMap(channels => sendToChannels[CommandResponse[CMD_UPDATE_RELAY_FEE]](channels.toList, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths)))
sendToNodes(nodes, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths, cltvExpiryDelta_opt = None))
}

override def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] = for {
Expand Down Expand Up @@ -424,6 +422,14 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
Future.foldLeft(commands)(Map.empty[ApiTypes.ChannelIdentifier, Either[Throwable, T]])(_ + _)
}

/** Send a request to multiple channels using node ids */
private def sendToNodes[T: ClassTag](nodeids: List[PublicKey], request: Any)(implicit timeout: Timeout): Future[Map[ApiTypes.ChannelIdentifier, Either[Throwable, T]]] = {
for {
channelIds <- (appKit.register ? Symbol("channelsTo")).mapTo[Map[ByteVector32, PublicKey]].map(_.filter(kv => nodeids.contains(kv._2)).keys)
res <- sendToChannels[T](channelIds.map(Left(_)).toList, request)
} yield res
}

override def getInfo()(implicit timeout: Timeout): Future[GetInfoResponse] = Future.successful(
GetInfoResponse(
version = Kit.getVersionLong,
Expand Down
109 changes: 57 additions & 52 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ final case class CMD_SIGN(replyTo_opt: Option[ActorRef] = None) extends HasOptio
sealed trait CloseCommand extends HasReplyToCommand
final case class CMD_CLOSE(replyTo: ActorRef, scriptPubKey: Option[ByteVector]) extends CloseCommand
final case class CMD_FORCECLOSE(replyTo: ActorRef) extends CloseCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long) extends HasReplyToCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long, cltvExpiryDelta_opt: Option[CltvExpiryDelta]) extends HasReplyToCommand
final case class CMD_GETSTATE(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETSTATEDATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETINFO(replyTo: ActorRef)extends HasReplyToCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ case class ChannelIdAssigned(channel: ActorRef, remoteNodeId: PublicKey, tempora

case class ShortChannelIdAssigned(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, previousShortChannelId: Option[ShortChannelId]) extends ChannelEvent

case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, previousChannelUpdate_opt: Option[ChannelUpdate], commitments: AbstractCommitments) extends ChannelEvent
case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, commitments: AbstractCommitments) extends ChannelEvent

case class ChannelUpdateParametersChanged(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelUpdate: ChannelUpdate) extends ChannelEvent

case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ trait AuditDb extends Closeable {

def add(channelErrorOccurred: ChannelErrorOccurred): Unit

def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit
def addChannelUpdate(channelUpdateParametersChanged: ChannelUpdateParametersChanged): Unit

def listSent(from: Long, to: Long): Seq[PaymentSent]

Expand Down
14 changes: 3 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
context.system.eventStream.subscribe(self, classOf[ChannelClosed])
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[ChannelUpdateParametersChanged])

override def receive: Receive = {

Expand Down Expand Up @@ -117,16 +117,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
channelsDb.updateChannelMeta(e.channelId, event)

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case _ => auditDb.addChannelUpdate(u)
}
case u: ChannelUpdateParametersChanged =>
auditDb.addChannelUpdate(u)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ case class DualAuditDb(sqlite: SqliteAuditDb, postgres: PgAuditDb) extends Audit
sqlite.add(channelErrorOccurred)
}

override def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit = {
runAsync(postgres.addChannelUpdate(localChannelUpdate))
sqlite.addChannelUpdate(localChannelUpdate)
override def addChannelUpdate(channelUpdateParametersChanged: ChannelUpdateParametersChanged): Unit = {
runAsync(postgres.addChannelUpdate(channelUpdateParametersChanged))
sqlite.addChannelUpdate(channelUpdateParametersChanged)
}

override def listSent(from: Long, to: Long): Seq[PaymentSent] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand Down Expand Up @@ -243,7 +243,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def addChannelUpdate(u: LocalChannelUpdate): Unit = withMetrics("audit/add-channel-update", DbBackends.Postgres) {
override def addChannelUpdate(u: ChannelUpdateParametersChanged): Unit = withMetrics("audit/add-channel-update", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.channel_updates VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, u.channelId.toHex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.sqlite

import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand Down Expand Up @@ -240,7 +240,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
}
}

override def addChannelUpdate(u: LocalChannelUpdate): Unit = withMetrics("audit/add-channel-update", DbBackends.Sqlite) {
override def addChannelUpdate(u: ChannelUpdateParametersChanged): Unit = withMetrics("audit/add-channel-update", DbBackends.Sqlite) {
using(sqlite.prepareStatement("INSERT INTO channel_updates VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, u.channelId.toArray)
statement.setBytes(2, u.remoteNodeId.value.toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object ChannelRelayer {
replyTo ! Relayer.OutgoingChannels(channels.toSeq)
Behaviors.same

case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, _, commitments)) =>
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, commitments))
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameIgnoreFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.feeBaseMsat == u2.feeBaseMsat &&
u1.feeProportionalMillionths == u2.feeProportionalMillionths &&
u1.cltvExpiryDelta == u2.cltvExpiryDelta &&
u1.htlcMinimumMsat == u2.htlcMinimumMsat &&
u1.htlcMaximumMsat == u2.htlcMaximumMsat

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ case class ChannelUpdate(signature: ByteVector64,
require(((messageFlags & 1) != 0) == htlcMaximumMsat.isDefined, "htlcMaximumMsat is not consistent with messageFlags")

def isNode1 = Announcements.isNode1(channelFlags)

def toStringShort: String = s"cltvExpiryDelta=$cltvExpiryDelta,feeBase=$feeBaseMsat,feeProportionalMillionths=$feeProportionalMillionths"
}

// @formatter:off
Expand Down
125 changes: 0 additions & 125 deletions eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala

This file was deleted.

Loading