Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/export_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var assetsCmd = &cobra.Command{
totalNumBytes := 0
var transformedAssets []transform.SchemaParquet
for _, transformInput := range paymentOps {
transformed, err := transform.TransformAsset(transformInput.Operation, transformInput.OperationIndex, transformInput.TransactionIndex, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta)
transformed, err := transform.TransformAsset(transformInput.Operation, transformInput.OperationIndex, transformInput.TransactionIndex, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
if err != nil {
txIndex := transformInput.TransactionIndex
cmdLogger.LogError(fmt.Errorf("could not extract asset from operation %d in transaction %d in ledger %d: ", transformInput.OperationIndex, txIndex, transformInput.LedgerSeqNum))
Expand Down
8 changes: 4 additions & 4 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i])
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i], env.NetworkPassphrase)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -180,7 +180,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i])
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i], env.NetworkPassphrase)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -193,7 +193,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i])
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i], env.NetworkPassphrase)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -206,7 +206,7 @@ be exported.`,
continue
}
for i, change := range changes.Changes {
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i])
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i], env.NetworkPassphrase)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand Down
2 changes: 1 addition & 1 deletion cmd/export_trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var tradesCmd = &cobra.Command{
totalNumBytes := 0
var transformedTrades []transform.SchemaParquet
for _, tradeInput := range trades {
trades, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime)
trades, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime, env.NetworkPassphrase)
if err != nil {
parsedID := toid.Parse(tradeInput.OperationHistoryID)
cmdLogger.LogError(fmt.Errorf("from ledger %d, transaction %d, operation %d: %v", parsedID.LedgerSequence, parsedID.TransactionOrder, parsedID.OperationOrder, err))
Expand Down
12 changes: 6 additions & 6 deletions internal/input/orderbooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type OrderbookParser struct {
}

// convertOffer converts an offer to its normalized form and adds it to the AllConvertedOffers
func (o *OrderbookParser) convertOffer(allConvertedOffers []transform.NormalizedOfferOutput, index int, offer ingest.Change, seq uint32, wg *sync.WaitGroup) {
func (o *OrderbookParser) convertOffer(allConvertedOffers []transform.NormalizedOfferOutput, index int, offer ingest.Change, seq uint32, wg *sync.WaitGroup, passphrase string) {
defer wg.Done()
transformed, err := transform.TransformOfferNormalized(offer, seq)
transformed, err := transform.TransformOfferNormalized(offer, seq, passphrase)
if err != nil {
errorMsg := fmt.Errorf("error json marshalling offer #%d in ledger sequence number #%d: %s", index, seq, err)
o.Logger.LogError(errorMsg)
Expand All @@ -59,12 +59,12 @@ func NewOrderbookParser(logger *utils.EtlLogger) OrderbookParser {
}
}

func (o *OrderbookParser) parseOrderbook(orderbook []ingest.Change, seq uint32) {
func (o *OrderbookParser) parseOrderbook(orderbook []ingest.Change, seq uint32, passphrase string) {
var group sync.WaitGroup
allConverted := make([]transform.NormalizedOfferOutput, len(orderbook))
for i, v := range orderbook {
group.Add(1)
go o.convertOffer(allConverted, i, v, seq, &group)
go o.convertOffer(allConverted, i, v, seq, &group, passphrase)
}

group.Wait()
Expand Down Expand Up @@ -237,7 +237,7 @@ func StreamOrderbooks(core *ledgerbackend.CaptiveStellarCore, start, end, batchS
}

// ReceiveParsedOrderbooks reads a batch from the orderbookChannel, parses it using an orderbook parser, and returns the parser.
func ReceiveParsedOrderbooks(orderbookChannel chan OrderbookBatch, logger *utils.EtlLogger) *OrderbookParser {
func ReceiveParsedOrderbooks(orderbookChannel chan OrderbookBatch, logger *utils.EtlLogger, passphrase string) *OrderbookParser {
batchParser := NewOrderbookParser(logger)
batchRead := false
for {
Expand All @@ -250,7 +250,7 @@ func ReceiveParsedOrderbooks(orderbookChannel chan OrderbookBatch, logger *utils
}

for seq, orderbook := range batch.Orderbooks {
batchParser.parseOrderbook(orderbook, seq)
batchParser.parseOrderbook(orderbook, seq, passphrase)
}

batchRead = true
Expand Down
18 changes: 15 additions & 3 deletions internal/transform/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"github.com/stellar/stellar-etl/v2/internal/toid"
"github.com/stellar/stellar-etl/v2/internal/utils"

"github.com/stellar/go/strkey"
"github.com/stellar/go/xdr"
)

// TransformAsset converts an asset from a payment operation into a form suitable for BigQuery
func TransformAsset(operation xdr.Operation, operationIndex int32, transactionIndex int32, ledgerSeq int32, lcm xdr.LedgerCloseMeta) (AssetOutput, error) {
func TransformAsset(operation xdr.Operation, operationIndex int32, transactionIndex int32, ledgerSeq int32, lcm xdr.LedgerCloseMeta, passphrase string) (AssetOutput, error) {
operationID := toid.New(ledgerSeq, int32(transactionIndex), operationIndex).ToInt64()

opType := operation.Body.Type
Expand All @@ -37,7 +38,7 @@ func TransformAsset(operation xdr.Operation, operationIndex int32, transactionIn

}

outputAsset, err := transformSingleAsset(asset)
outputAsset, err := transformSingleAsset(asset, passphrase)
if err != nil {
return AssetOutput{}, fmt.Errorf("%s (id %d)", err.Error(), operationID)
}
Expand All @@ -52,7 +53,7 @@ func TransformAsset(operation xdr.Operation, operationIndex int32, transactionIn
return outputAsset, nil
}

func transformSingleAsset(asset xdr.Asset) (AssetOutput, error) {
func transformSingleAsset(asset xdr.Asset, passphrase string) (AssetOutput, error) {
var outputAssetType, outputAssetCode, outputAssetIssuer string
err := asset.Extract(&outputAssetType, &outputAssetCode, &outputAssetIssuer)
if err != nil {
Expand All @@ -61,11 +62,22 @@ func transformSingleAsset(asset xdr.Asset) (AssetOutput, error) {

farmAssetID := FarmHashAsset(outputAssetCode, outputAssetIssuer, outputAssetType)

contractIdByte, err := asset.ContractID(passphrase)
if err != nil {
return AssetOutput{}, err
}

contractId, err := strkey.Encode(strkey.VersionByteContract, contractIdByte[:])
if err != nil {
return AssetOutput{}, err
}

return AssetOutput{
AssetCode: outputAssetCode,
AssetIssuer: outputAssetIssuer,
AssetType: outputAssetType,
AssetID: farmAssetID,
ContractId: contractId,
}, nil
}

Expand Down
17 changes: 11 additions & 6 deletions internal/transform/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func TestTransformAsset(t *testing.T) {
index int32
txnIndex int32
// transaction xdr.TransactionEnvelope
lcm xdr.LedgerCloseMeta
lcm xdr.LedgerCloseMeta
passphrase string
}

type transformTest struct {
Expand Down Expand Up @@ -49,17 +50,19 @@ func TestTransformAsset(t *testing.T) {
for i, op := range hardCodedInputTransaction.Envelope.Operations() {
tests = append(tests, transformTest{
input: assetInput{
operation: op,
index: int32(i),
txnIndex: int32(i),
lcm: genericLedgerCloseMeta},
operation: op,
index: int32(i),
txnIndex: int32(i),
lcm: genericLedgerCloseMeta,
passphrase: "test passphrase",
},
wantOutput: hardCodedOutputArray[i],
wantErr: nil,
})
}

for _, test := range tests {
actualOutput, actualError := TransformAsset(test.input.operation, test.input.index, test.input.txnIndex, 0, test.input.lcm)
actualOutput, actualError := TransformAsset(test.input.operation, test.input.index, test.input.txnIndex, 0, test.input.lcm, test.input.passphrase)
assert.Equal(t, test.wantErr, actualError)
assert.Equal(t, test.wantOutput, actualOutput)
}
Expand Down Expand Up @@ -110,6 +113,7 @@ func makeAssetTestOutput() (transformedAssets []AssetOutput) {
AssetID: -8205667356306085451,
ClosedAt: time.Date(1970, time.January, 1, 0, 0, 10, 0, time.UTC),
LedgerSequence: 2,
ContractId: "CCJCSP2CMLAWALLQ4ZBPML2EPYNN7AIC5COPC43PSDR6HZ3ZCVXUNL5M",
},
{
AssetCode: "",
Expand All @@ -118,6 +122,7 @@ func makeAssetTestOutput() (transformedAssets []AssetOutput) {
AssetID: -5706705804583548011,
ClosedAt: time.Date(1970, time.January, 1, 0, 0, 10, 0, time.UTC),
LedgerSequence: 2,
ContractId: "CCCVYPOBCE4ZKFBTNRI465A7N2AT3YMLEWXEY5LN76O6NYCHYXYPVXY7",
},
}
return
Expand Down
5 changes: 3 additions & 2 deletions internal/transform/claimable_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant {
}

// TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery
func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ClaimableBalanceOutput, error) {
func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry, passphrase string) (ClaimableBalanceOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return ClaimableBalanceOutput{}, err
Expand All @@ -40,7 +40,7 @@ func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHead
balanceIDStrkey := balanceEntry.BalanceId.MustEncodeToStrkey()

outputFlags := uint32(balanceEntry.Flags())
outputAsset, err := transformSingleAsset(balanceEntry.Asset)
outputAsset, err := transformSingleAsset(balanceEntry.Asset, passphrase)
if err != nil {
return ClaimableBalanceOutput{}, err
}
Expand Down Expand Up @@ -72,6 +72,7 @@ func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHead
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
BalanceIDStrkey: balanceIDStrkey,
ContractId: outputAsset.ContractId,
}
return transformed, nil
}
3 changes: 2 additions & 1 deletion internal/transform/claimable_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTransformClaimableBalance(t *testing.T) {
LedgerSeq: 10,
},
}
actualOutput, actualError := TransformClaimableBalance(test.input.ingest, header)
actualOutput, actualError := TransformClaimableBalance(test.input.ingest, header, "test passphrase")
assert.Equal(t, test.wantErr, actualError)
assert.Equal(t, test.wantOutput, actualOutput)
}
Expand Down Expand Up @@ -128,5 +128,6 @@ func makeClaimableBalanceTestOutput() ClaimableBalanceOutput {
LedgerSequence: 10,
ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC),
BalanceIDStrkey: "BAAACAQDAQCQMBYIBEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACPGI",
ContractId: "CAVDLQWFTGGUWJ4DDFOAQBZYVMJUXJDJGTCFP5NLL4JWH2ITE2VWCUML",
}
}
38 changes: 19 additions & 19 deletions internal/transform/effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (e *effectsWrapper) addPaymentEffects() {
op := e.operation.operation.Body.MustPaymentOp()

details := map[string]interface{}{"amount": amount.String(op.Amount)}
addAssetDetails(details, op.Asset, "")
addAssetDetails(details, op.Asset, "", e.operation.network)

e.addMuxed(
&op.Destination,
Expand All @@ -453,7 +453,7 @@ func (e *effectsWrapper) pathPaymentStrictReceiveEffects() error {
source := e.operation.SourceAccount()

details := map[string]interface{}{"amount": amount.String(op.DestAmount)}
addAssetDetails(details, op.DestAsset, "")
addAssetDetails(details, op.DestAsset, "", e.operation.network)

e.addMuxed(
&op.Destination,
Expand All @@ -463,7 +463,7 @@ func (e *effectsWrapper) pathPaymentStrictReceiveEffects() error {

result := e.operation.OperationResult().MustPathPaymentStrictReceiveResult()
details = map[string]interface{}{"amount": amount.String(result.SendAmount())}
addAssetDetails(details, op.SendAsset, "")
addAssetDetails(details, op.SendAsset, "", e.operation.network)

e.addMuxed(
source,
Expand All @@ -481,11 +481,11 @@ func (e *effectsWrapper) addPathPaymentStrictSendEffects() error {
result := e.operation.OperationResult().MustPathPaymentStrictSendResult()

details := map[string]interface{}{"amount": amount.String(result.DestAmount())}
addAssetDetails(details, op.DestAsset, "")
addAssetDetails(details, op.DestAsset, "", e.operation.network)
e.addMuxed(&op.Destination, EffectAccountCredited, details)

details = map[string]interface{}{"amount": amount.String(op.SendAmount)}
addAssetDetails(details, op.SendAsset, "")
addAssetDetails(details, op.SendAsset, "", e.operation.network)
e.addMuxed(source, EffectAccountDebited, details)

return e.addIngestTradeEffects(*source, resultSuccess.Offers, true)
Expand Down Expand Up @@ -687,7 +687,7 @@ func (e *effectsWrapper) addChangeTrustEffects() error {
return err
}
} else {
addAssetDetails(details, op.Line.ToAsset(), "")
addAssetDetails(details, op.Line.ToAsset(), "", e.operation.network)
}

e.addMuxed(source, effect, details)
Expand All @@ -704,7 +704,7 @@ func (e *effectsWrapper) addAllowTrustEffects() error {
details := map[string]interface{}{
"trustor": op.Trustor.Address(),
}
addAssetDetails(details, asset, "")
addAssetDetails(details, asset, "", e.operation.network)

switch {
case xdr.TrustLineFlags(op.Authorize).IsAuthorized():
Expand Down Expand Up @@ -851,7 +851,7 @@ func (e *effectsWrapper) addCreateClaimableBalanceEffects(changes []ingest.Chang
details := map[string]interface{}{
"amount": amount.String(cb.Amount),
}
addAssetDetails(details, cb.Asset, "")
addAssetDetails(details, cb.Asset, "", e.operation.network)
e.addMuxed(
source,
EffectAccountDebited,
Expand Down Expand Up @@ -955,7 +955,7 @@ func (e *effectsWrapper) addClaimClaimableBalanceEffects(changes []ingest.Change
details = map[string]interface{}{
"amount": amount.String(cBalance.Amount),
}
addAssetDetails(details, cBalance.Asset, "")
addAssetDetails(details, cBalance.Asset, "", e.operation.network)
e.addMuxed(
source,
EffectAccountCredited,
Expand Down Expand Up @@ -984,7 +984,7 @@ func (e *effectsWrapper) addIngestTradeEffects(buyer xdr.MuxedAccount, claims []

func (e *effectsWrapper) addClaimTradeEffects(buyer xdr.MuxedAccount, claim xdr.ClaimAtom, isPathPayment bool) {
seller := claim.SellerId()
bd, sd := tradeDetails(buyer, seller, claim)
bd, sd := tradeDetails(buyer, seller, claim, e.operation.network)

tradeEffects := []EffectType{
EffectTrade,
Expand Down Expand Up @@ -1039,7 +1039,7 @@ func (e *effectsWrapper) addClawbackEffects() error {
"amount": amount.String(op.Amount),
}
source := e.operation.SourceAccount()
addAssetDetails(details, op.Asset, "")
addAssetDetails(details, op.Asset, "", e.operation.network)

// The funds will be burned, but even with that, we generated an account credited effect
e.addMuxed(
Expand Down Expand Up @@ -1078,7 +1078,7 @@ func (e *effectsWrapper) addClawbackClaimableBalanceEffects(changes []ingest.Cha
if c.Type == xdr.LedgerEntryTypeClaimableBalance && c.Post == nil && c.Pre != nil {
cb := c.Pre.Data.ClaimableBalance
details = map[string]interface{}{"amount": amount.String(cb.Amount)}
addAssetDetails(details, cb.Asset, "")
addAssetDetails(details, cb.Asset, "", e.operation.network)
e.addMuxed(
source,
EffectAccountCredited,
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (e *effectsWrapper) addTrustLineFlagsEffect(
details := map[string]interface{}{
"trustor": trustor.Address(),
}
addAssetDetails(details, asset, "")
addAssetDetails(details, asset, "", e.operation.network)

var flagDetailsAdded bool
if setFlags != nil {
Expand Down Expand Up @@ -1226,24 +1226,24 @@ func setAuthFlagDetails(flagDetails map[string]interface{}, flags xdr.AccountFla
}
}

func tradeDetails(buyer xdr.MuxedAccount, seller xdr.AccountId, claim xdr.ClaimAtom) (bd map[string]interface{}, sd map[string]interface{}) {
func tradeDetails(buyer xdr.MuxedAccount, seller xdr.AccountId, claim xdr.ClaimAtom, passphrase string) (bd map[string]interface{}, sd map[string]interface{}) {
bd = map[string]interface{}{
"offer_id": claim.OfferId(),
"seller": seller.Address(),
"bought_amount": amount.String(claim.AmountSold()),
"sold_amount": amount.String(claim.AmountBought()),
}
addAssetDetails(bd, claim.AssetSold(), "bought_")
addAssetDetails(bd, claim.AssetBought(), "sold_")
addAssetDetails(bd, claim.AssetSold(), "bought_", passphrase)
addAssetDetails(bd, claim.AssetBought(), "sold_", passphrase)

sd = map[string]interface{}{
"offer_id": claim.OfferId(),
"bought_amount": amount.String(claim.AmountBought()),
"sold_amount": amount.String(claim.AmountSold()),
}
addAccountAndMuxedAccountDetails(sd, buyer, "seller")
addAssetDetails(sd, claim.AssetBought(), "bought_")
addAssetDetails(sd, claim.AssetSold(), "sold_")
addAssetDetails(sd, claim.AssetBought(), "bought_", passphrase)
addAssetDetails(sd, claim.AssetSold(), "sold_", passphrase)

return
}
Expand Down Expand Up @@ -1332,7 +1332,7 @@ func (e *effectsWrapper) addInvokeHostFunctionEffects(events []contractevents.Ev
}

details := make(map[string]interface{}, 4)
addAssetDetails(details, evt.GetAsset(), "")
addAssetDetails(details, evt.GetAsset(), "", e.operation.network)

//
// Note: We ignore effects that involve contracts (until the day we have
Expand Down
Loading
Loading