@@ -2276,7 +2276,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall
Closing
.onChainOutgoingHtlcs(d.commitments.latest.localCommit, d.commitments.latest.remoteCommit, d.commitments.latest.nextRemoteCommit_opt, tx)
.map(add => (add, d.commitments.originChannels.get(add.id).map(_.upstream).collect { case Upstream.Local(id) => id })) // we resolve the payment id if this was a local payment
- .collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = add.amountMsat, add.paymentHash)) }
+ .collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, d.channelId, add.amountMsat, add.paymentHash)) }
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay()
Closing.isClosed(d1, Some(tx)) match {
case Some(closingType) =>
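Note: `PaymentSettlingOnChain` is published on the actor system's event stream and, after this change, carries the channel id alongside the payment id, amount and payment hash. Below is a minimal sketch of a classic-actor subscriber, using a hypothetical simplified event shape inferred from the publish call above — not eclair's actual class definition.

```scala
import java.util.UUID
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical, simplified event shape inferred from the publish call above:
// (id, channelId, amountMsat, paymentHash). The real eclair class may differ.
final case class PaymentSettlingOnChainLike(id: UUID, channelId: String, amountMsat: Long, paymentHash: String)

class OnChainSettlementListener extends Actor {
  // Register for this event type on the classic event stream.
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[PaymentSettlingOnChainLike])

  def receive: Receive = {
    case e: PaymentSettlingOnChainLike =>
      // The channel id carried by the event lets a consumer attribute the
      // on-chain settlement to a specific channel without an extra lookup.
      println(s"payment ${e.id} settling on chain via channel ${e.channelId}")
  }
}

object ListenerApp extends App {
  // Spawning the listener is enough; published events are delivered to it.
  private val system = ActorSystem("sketch")
  system.actorOf(Props[OnChainSettlementListener](), "on-chain-settlement-listener")
}
```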
@@ -57,7 +57,7 @@ object WeakEntropyPool {
def apply(collector: EntropyCollector): Behavior[Command] = {
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[NewBlock](e => WrappedNewBlock(e.blockId)))
- context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelPaymentRelayed](e => WrappedPaymentRelayed(e.paymentHash, e.timestamp)))
+ context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelPaymentRelayed](e => WrappedPaymentRelayed(e.paymentHash, e.settledAt)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[PeerConnected](e => WrappedPeerConnected(e.nodeId)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[NodeUpdated](e => WrappedNodeUpdated(e.ann.signature)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelSignatureReceived](e => WrappedChannelSignature(e.commitments.latest.localCommit.remoteSig)))
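Note: the entropy pool subscribes to typed event-stream events through `messageAdapter`, which translates each published event into the pool's own command protocol before it is mixed in. Here is a self-contained sketch of that wiring with a hypothetical event type and a purely illustrative mixing function — the real pool's collection logic is not shown or asserted here.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical event, standing in for ChannelPaymentRelayed: only the fields
// the pool cares about (a hash-like value and the settlement timestamp).
final case class PaymentRelayedLike(paymentHash: String, settledAt: Long)

object EntropyPoolSketch {
  sealed trait Command
  private final case class WrappedPaymentRelayed(paymentHash: String, settledAt: Long) extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup { context =>
      // The adapter turns published events into the pool's own Command protocol,
      // mirroring the messageAdapter + EventStream.Subscribe pattern above.
      val adapter = context.messageAdapter[PaymentRelayedLike](e => WrappedPaymentRelayed(e.paymentHash, e.settledAt))
      context.system.eventStream ! EventStream.Subscribe(adapter)
      collecting(seed = 0L)
    }

  private def collecting(seed: Long): Behavior[Command] =
    Behaviors.receiveMessage {
      case WrappedPaymentRelayed(hash, settledAt) =>
        // Fold the (weakly random) settlement time into the pool; the mixing
        // function is an illustration, not what eclair does.
        collecting(seed ^ java.lang.Long.rotateLeft(hash.hashCode.toLong + settledAt, 17))
    }
}
```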
@@ -249,8 +249,8 @@ object FailureType extends Enumeration {
object FailureSummary {
def apply(f: PaymentFailure): FailureSummary = f match {
case LocalFailure(_, route, t) => FailureSummary(FailureType.LOCAL, t.getMessage, route.map(h => HopSummary(h)).toList, route.headOption.map(_.nodeId))
- case RemoteFailure(_, route, e) => FailureSummary(FailureType.REMOTE, e.failureMessage.message, route.map(h => HopSummary(h)).toList, Some(e.originNode))
- case UnreadableRemoteFailure(_, route, _, _) => FailureSummary(FailureType.UNREADABLE_REMOTE, "could not decrypt failure onion", route.map(h => HopSummary(h)).toList, None)
+ case RemoteFailure(_, route, e, _, _) => FailureSummary(FailureType.REMOTE, e.failureMessage.message, route.map(h => HopSummary(h)).toList, Some(e.originNode))
+ case UnreadableRemoteFailure(_, route, _, _, _, _) => FailureSummary(FailureType.UNREADABLE_REMOTE, "could not decrypt failure onion", route.map(h => HopSummary(h)).toList, None)
}
}

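Note: the only change in this hunk (and in the similar ones in JsonSerializers and Monitoring further down) is the extractor arity: `RemoteFailure` and `UnreadableRemoteFailure` gained extra fields, so every bare pattern match has to be widened. A generic illustration with placeholder types, not eclair's — the meaning of the new fields is not asserted here.

```scala
// Before: case class RemoteFailureLike(route: Seq[String], message: String)
// After adding two fields, the old pattern RemoteFailureLike(route, message) no longer compiles.
final case class RemoteFailureLike(route: Seq[String], message: String, extra1: Long, extra2: Long)

def describe(f: RemoteFailureLike): String = f match {
  // Call sites that don't care about the new fields just bind them to `_`,
  // exactly as the hunks in this PR do.
  case RemoteFailureLike(_, message, _, _) => s"remote failure: $message"
}
```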
eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala (20 changes: 11 additions & 9 deletions)
@@ -215,7 +215,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.setString(7, e.paymentPreimage.toHex)
statement.setString(8, e.recipientNodeId.value.toHex)
statement.setString(9, p.toChannelId.toHex)
- statement.setTimestamp(10, p.timestamp.toSqlTimestamp)
+ statement.setTimestamp(10, p.settledAt.toSqlTimestamp)
statement.addBatch()
})
statement.executeBatch()
@@ -230,7 +230,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.setLong(1, p.amount.toLong)
statement.setString(2, e.paymentHash.toHex)
statement.setString(3, p.fromChannelId.toHex)
- statement.setTimestamp(4, p.timestamp.toSqlTimestamp)
+ statement.setTimestamp(4, p.receivedAt.toSqlTimestamp)
statement.addBatch()
})
statement.executeBatch()
@@ -251,7 +251,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, nextTrampolineAmount.toLong)
statement.setString(3, nextTrampolineNodeId.value.toHex)
- statement.setTimestamp(4, e.timestamp.toSqlTimestamp)
+ statement.setTimestamp(4, e.settledAt.toSqlTimestamp)
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
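Note: as the comment above says, a trampoline relay can have M incoming and N outgoing parts. A rough illustration of the aggregation, using plain Long millisatoshi amounts and illustrative types rather than eclair's:

```scala
final case class RelayedPartLike(channelId: String, amountMsat: Long, settledAt: Long)

// Collapse M incoming and N outgoing parts of one relay into a single summary:
// the fee is what came in minus what went out, and the relay is only settled
// once its last part settles.
def summarizeRelay(incoming: Seq[RelayedPartLike], outgoing: Seq[RelayedPartLike]): (Long, Long) = {
  val amountIn  = incoming.map(_.amountMsat).sum
  val amountOut = outgoing.map(_.amountMsat).sum
  val feeMsat   = amountIn - amountOut
  val settledAt = (incoming ++ outgoing).map(_.settledAt).max
  (feeMsat, settledAt)
}
```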
@@ -362,20 +362,22 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
MilliSatoshi(rs.getLong("fees_msat")),
rs.getByteVector32FromHex("to_channel_id"),
None, // we don't store the route in the audit DB
- TimestampMilli.fromSqlTimestamp(rs.getTimestamp("timestamp")))
+ // TODO: store startedAt when updating the DB schema instead of duplicating settledAt.
+ startedAt = TimestampMilli.fromSqlTimestamp(rs.getTimestamp("timestamp")),
+ settledAt = TimestampMilli.fromSqlTimestamp(rs.getTimestamp("timestamp")))
val sent = sentByParentId.get(parentId) match {
case Some(s) => s.copy(parts = s.parts :+ part)
case None => PaymentSent(
parentId,
rs.getByteVector32FromHex("payment_hash"),
rs.getByteVector32FromHex("payment_preimage"),
MilliSatoshi(rs.getLong("recipient_amount_msat")),
PublicKey(rs.getByteVectorFromHex("recipient_node_id")),
Seq(part),
- None)
+ None,
+ part.startedAt)
}
sentByParentId + (parentId -> sent)
- }.values.toSeq.sortBy(_.timestamp)
+ }.values.toSeq.sortBy(_.settledAt)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
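Note: the TODO above (also present in the SQLite variant further down) points out that `startedAt` is currently duplicated from the single `timestamp` column. A hypothetical sketch of what the eventual schema change could look like, with illustrative table and column names rather than eclair's actual audit schema or migration framework:

```scala
import java.sql.Connection

// Hypothetical migration: add a dedicated started_at column and backfill it,
// so startedAt no longer has to be read from the settlement timestamp.
def addStartedAtColumn(connection: Connection): Unit = {
  val statement = connection.createStatement()
  try {
    statement.executeUpdate("ALTER TABLE sent ADD COLUMN started_at TIMESTAMP")
    // Backfill existing rows with the only value available for them.
    statement.executeUpdate("UPDATE sent SET started_at = timestamp WHERE started_at IS NULL")
  } finally {
    statement.close()
  }
}
```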
@@ -400,7 +402,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
case None => PaymentReceived(paymentHash, Seq(part))
}
receivedByHash + (paymentHash -> received)
- }.values.toSeq.sortBy(_.timestamp)
+ }.values.toSeq.sortBy(_.settledAt)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
@@ -454,7 +456,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
Seq(OnTheFlyFundingPaymentRelayed(paymentHash, incoming, outgoing))
case _ => Nil
}
- }.toSeq.sortBy(_.timestamp)
+ }.toSeq.sortBy(_.settledAt)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
@@ -140,7 +140,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
paymentResult.parts.foreach(p => {
- statement.setTimestamp(1, p.timestamp.toSqlTimestamp)
+ statement.setTimestamp(1, p.settledAt.toSqlTimestamp)
statement.setString(2, paymentResult.paymentPreimage.toHex)
statement.setLong(3, p.feesPaid.toLong)
statement.setBytes(4, encodeRoute(p.route.getOrElse(Nil).map(h => HopSummary(h)).toList))
@@ -155,7 +155,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
- statement.setTimestamp(1, paymentResult.timestamp.toSqlTimestamp)
+ statement.setTimestamp(1, paymentResult.settledAt.toSqlTimestamp)
statement.setBytes(2, encodeFailures(paymentResult.failures.map(f => FailureSummary(f)).toList))
statement.setString(3, paymentResult.id.toString)
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to mark an outgoing payment as failed but already in final status (id=${paymentResult.id})")
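Note on the update pattern used in both branches above (and in the SQLite equivalents below): the `completed_at IS NULL` predicate makes completing a payment a one-shot operation at the database level — a second completion attempt updates zero rows, which the caller turns into an error instead of silently overwriting. A distilled sketch of that guard, with illustrative table and column names:

```scala
import java.sql.Connection

// Sketch of the "only complete once" guard: the WHERE clause refuses to touch
// rows that already have a completion timestamp.
def markCompleted(db: Connection, id: String, completedAtMillis: Long): Unit = {
  val statement = db.prepareStatement("UPDATE sent SET completed_at = ? WHERE id = ? AND completed_at IS NULL")
  try {
    statement.setTimestamp(1, new java.sql.Timestamp(completedAtMillis))
    statement.setString(2, id)
    if (statement.executeUpdate() == 0) {
      throw new IllegalArgumentException(s"payment $id is already in a final state")
    }
  } finally {
    statement.close()
  }
}
```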
@@ -207,7 +207,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.setBytes(7, e.paymentPreimage.toArray)
statement.setBytes(8, e.recipientNodeId.value.toArray)
statement.setBytes(9, p.toChannelId.toArray)
- statement.setLong(10, p.timestamp.toLong)
+ statement.setLong(10, p.settledAt.toLong)
statement.addBatch()
})
statement.executeBatch()
@@ -220,7 +220,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.setLong(1, p.amount.toLong)
statement.setBytes(2, e.paymentHash.toArray)
statement.setBytes(3, p.fromChannelId.toArray)
- statement.setLong(4, p.timestamp.toLong)
+ statement.setLong(4, p.receivedAt.toLong)
statement.addBatch()
})
statement.executeBatch()
@@ -239,7 +239,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.setBytes(1, e.paymentHash.toArray)
statement.setLong(2, nextTrampolineAmount.toLong)
statement.setBytes(3, nextTrampolineNodeId.value.toArray)
- statement.setLong(4, e.timestamp.toLong)
+ statement.setLong(4, e.settledAt.toLong)
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
@@ -336,20 +336,22 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
MilliSatoshi(rs.getLong("fees_msat")),
rs.getByteVector32("to_channel_id"),
None, // we don't store the route in the audit DB
- TimestampMilli(rs.getLong("timestamp")))
+ // TODO: store startedAt when updating the DB schema instead of duplicating settledAt.
+ startedAt = TimestampMilli(rs.getLong("timestamp")),
+ settledAt = TimestampMilli(rs.getLong("timestamp")))
val sent = sentByParentId.get(parentId) match {
case Some(s) => s.copy(parts = s.parts :+ part)
case None => PaymentSent(
parentId,
rs.getByteVector32("payment_hash"),
rs.getByteVector32("payment_preimage"),
MilliSatoshi(rs.getLong("recipient_amount_msat")),
PublicKey(rs.getByteVector("recipient_node_id")),
Seq(part),
- None)
+ None,
+ part.startedAt)
}
sentByParentId + (parentId -> sent)
- }.values.toSeq.sortBy(_.timestamp)
+ }.values.toSeq.sortBy(_.settledAt)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
@@ -372,7 +374,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
case None => PaymentReceived(paymentHash, Seq(part))
}
receivedByHash + (paymentHash -> received)
- }.values.toSeq.sortBy(_.timestamp)
+ }.values.toSeq.sortBy(_.settledAt)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
@@ -425,7 +427,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
Seq(OnTheFlyFundingPaymentRelayed(paymentHash, incoming, outgoing))
case _ => Nil
}
- }.toSeq.sortBy(_.timestamp)
+ }.toSeq.sortBy(_.settledAt)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
@@ -168,7 +168,7 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {
override def updateOutgoingPayment(paymentResult: PaymentSent): Unit = withMetrics("payments/update-outgoing-sent", DbBackends.Sqlite) {
using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
paymentResult.parts.foreach(p => {
- statement.setLong(1, p.timestamp.toLong)
+ statement.setLong(1, p.settledAt.toLong)
statement.setBytes(2, paymentResult.paymentPreimage.toArray)
statement.setLong(3, p.feesPaid.toLong)
statement.setBytes(4, encodeRoute(p.route.getOrElse(Nil).map(h => HopSummary(h)).toList))
@@ -181,7 +181,7 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {

override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Sqlite) {
using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
- statement.setLong(1, paymentResult.timestamp.toLong)
+ statement.setLong(1, paymentResult.settledAt.toLong)
statement.setBytes(2, encodeFailures(paymentResult.failures.map(f => FailureSummary(f)).toList))
statement.setString(3, paymentResult.id.toString)
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to mark an outgoing payment as failed but already in final status (id=${paymentResult.id})")
@@ -375,14 +375,17 @@ object PaymentFailedSummarySerializer extends ConvertClassSerializer[PaymentFail
val route = f.route.map(_.nodeId) ++ f.route.lastOption.map(_.nextNodeId)
val message = f match {
case LocalFailure(_, _, t) => t.getMessage
- case RemoteFailure(_, _, Sphinx.DecryptedFailurePacket(origin, _, failureMessage)) => s"$origin returned: ${failureMessage.message}"
+ case RemoteFailure(_, _, Sphinx.DecryptedFailurePacket(origin, _, failureMessage), _, _) => s"$origin returned: ${failureMessage.message}"
case _: UnreadableRemoteFailure => "unreadable remote failure"
}
PaymentFailureSummaryJson(f.amount, route, message)
})
))
// @formatter:on

+ private case class PaymentSentJson(id: UUID, paymentHash: ByteVector32, paymentPreimage: ByteVector32, recipientAmount: MilliSatoshi, recipientNodeId: PublicKey, parts: Seq[PaymentSent.PartialPayment], fees: MilliSatoshi, startedAt: TimestampMilli, settledAt: TimestampMilli)
+ object PaymentSentSerializer extends ConvertClassSerializer[PaymentSent](p => PaymentSentJson(p.id, p.paymentHash, p.paymentPreimage, p.recipientAmount, p.recipientNodeId, p.parts, p.feesPaid, p.startedAt, p.settledAt))

object ThrowableSerializer extends MinimalSerializer({
case t: Throwable if t.getMessage != null => JString(t.getMessage)
case t: Throwable => JString(t.getClass.getSimpleName)
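Note: `PaymentSentJson` flattens derived values (`fees`, `startedAt`, `settledAt`) next to the raw parts so API consumers don't have to recompute them. As a rough illustration of how such aggregates are typically derived from per-part data (simplified types, not eclair's `PaymentSent`):

```scala
final case class PartLike(amountMsat: Long, feesMsat: Long, settledAt: Long)

final case class SentLike(parts: Seq[PartLike], startedAt: Long) {
  // Total fees are the sum over all parts of a multi-part payment...
  def feesPaid: Long = parts.map(_.feesMsat).sum
  // ...and the payment as a whole is settled once its last part settles.
  def settledAt: Long = parts.map(_.settledAt).max
}
```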
@@ -680,10 +683,11 @@ object CustomTypeHints {

val paymentEvent: CustomTypeHints = CustomTypeHints(Map(
classOf[PaymentSent] -> "payment-sent",
+ classOf[PaymentSentJson] -> "payment-sent",
classOf[ChannelPaymentRelayed] -> "payment-relayed",
classOf[TrampolinePaymentRelayed] -> "trampoline-payment-relayed",
classOf[OnTheFlyFundingPaymentRelayed] -> "on-the-fly-funding-payment-relayed",
classOf[PaymentReceived] -> "payment-received",
classOf[PaymentSettlingOnChain] -> "payment-settling-onchain",
classOf[PaymentFailed] -> "payment-failed",
))

@@ -795,6 +799,7 @@ object JsonSerializers {
GlobalBalanceSerializer +
PeerInfoSerializer +
PaymentFailedSummarySerializer +
+ PaymentSentSerializer +
OnionMessageReceivedSerializer +
ShortIdAliasesSerializer +
FundingTxStatusSerializer +
@@ -135,7 +135,7 @@ object Monitoring {

def apply(pf: PaymentFailure): String = pf match {
case LocalFailure(_, _, t) => t.getClass.getSimpleName
- case RemoteFailure(_, _, e) => e.failureMessage.getClass.getSimpleName
+ case RemoteFailure(_, _, e, _, _) => e.failureMessage.getClass.getSimpleName
case _: UnreadableRemoteFailure => "UnreadableRemoteFailure"
}
}