diff --git a/.gitignore b/.gitignore index c1a4ec28..c5944f6a 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ /.vscode/*.sql /.vscode/settings.json /.vscode/launch.json +/captive-core* .idea debug .bundle @@ -12,4 +13,5 @@ debug *.crt *.csr *.key +*.txt stellar-etl diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index 9e93327d..d6bdf1e5 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "fmt" "math" "os" @@ -11,8 +12,11 @@ import ( "github.com/stellar/stellar-etl/internal/input" "github.com/stellar/stellar-etl/internal/transform" "github.com/stellar/stellar-etl/internal/utils" + "github.com/stellar/stellar-etl/internal/utils/verify" ) +const verifyBatchSize = 24000 + var exportLedgerEntryChangesCmd = &cobra.Command{ Use: "export_ledger_entry_changes", Short: "This command exports the changes in accounts, offers, trustlines and liquidity pools.", @@ -29,12 +33,17 @@ be exported.`, endNum, strictExport, isTest, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger) cmdLogger.StrictExport = strictExport env := utils.GetEnvironmentDetails(isTest) + archive, err := utils.CreateHistoryArchiveClient(env.ArchiveURLs) + if err != nil { + cmdLogger.Fatalf("error creating history archive: ", err) + } execPath, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger) exportAccounts, exportOffers, exportTrustlines, exportPools, exportBalances := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger) gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger) + ctx := context.Background() - err := os.MkdirAll(outputFolder, os.ModePerm) + err = os.MkdirAll(outputFolder, os.ModePerm) if err != nil { cmdLogger.Fatalf("unable to mkdir %s: %v", outputFolder, err) } @@ -71,6 +80,7 @@ be exported.`, endNum = math.MaxInt32 } + verifyOutputs := make(map[uint32]transform.TransformedOutput, endNum) changeChan := make(chan input.ChangeBatch) closeChan := make(chan int) go input.StreamChanges(core, startNum, endNum, batchSize, changeChan, closeChan, env, cmdLogger) @@ -91,77 +101,120 @@ be exported.`, "trustlines": {}, "liquidity_pools": {}, } + for entryType, changes := range batch.Changes { switch entryType { case xdr.LedgerEntryTypeAccount: for _, change := range changes { + entry, _, _, _ := utils.ExtractEntryFromChange(change) if changed, err := change.AccountChangedExceptSigners(); err != nil { cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err)) continue } else if changed { acc, err := transform.TransformAccount(change) if err != nil { - entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) continue } transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc) + + if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok { + x := verifyOutputs[actualLedger] + x.Accounts = append(x.Accounts, acc) + verifyOutputs[actualLedger] = x + } } if change.AccountSignersChanged() { signers, err := transform.TransformSigners(change) if err != nil { - entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err)) continue } for _, s := range signers { transformedOutputs["signers"] = append(transformedOutputs["signers"], s) + + if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok { + x := verifyOutputs[actualLedger] + x.Signers = append(x.Signers, s) + verifyOutputs[actualLedger] = x + } } } } case xdr.LedgerEntryTypeClaimableBalance: for _, change := range changes { + entry, _, _, _ := utils.ExtractEntryFromChange(change) balance, err := transform.TransformClaimableBalance(change) if err != nil { - entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) continue } transformedOutputs["claimable_balances"] = append(transformedOutputs["claimable_balances"], balance) + + if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok { + x := verifyOutputs[actualLedger] + x.Claimable_balances = append(x.Claimable_balances, balance) + verifyOutputs[actualLedger] = x + } } case xdr.LedgerEntryTypeOffer: for _, change := range changes { + entry, _, _, _ := utils.ExtractEntryFromChange(change) offer, err := transform.TransformOffer(change) if err != nil { - entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) continue } transformedOutputs["offers"] = append(transformedOutputs["offers"], offer) + + if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok { + x := verifyOutputs[actualLedger] + x.Offers = append(x.Offers, offer) + verifyOutputs[actualLedger] = x + } } case xdr.LedgerEntryTypeTrustline: for _, change := range changes { + entry, _, _, _ := utils.ExtractEntryFromChange(change) trust, err := transform.TransformTrustline(change) if err != nil { - entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) continue } transformedOutputs["trustlines"] = append(transformedOutputs["trustlines"], trust) + + if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok { + x := verifyOutputs[actualLedger] + x.Trustlines = append(x.Trustlines, trust) + verifyOutputs[actualLedger] = x + } } case xdr.LedgerEntryTypeLiquidityPool: for _, change := range changes { + entry, _, _, _ := utils.ExtractEntryFromChange(change) pool, err := transform.TransformPool(change) if err != nil { - entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) continue } transformedOutputs["liquidity_pools"] = append(transformedOutputs["liquidity_pools"], pool) + + if ok, actualLedger := utils.LedgerIsCheckpoint(entry.LastModifiedLedgerSeq); ok { + x := verifyOutputs[actualLedger] + x.Liquidity_pools = append(x.Liquidity_pools, pool) + verifyOutputs[actualLedger] = x + } } } } + for checkpointLedgers := range verifyOutputs { + err = verify.VerifyState(ctx, verifyOutputs[checkpointLedgers], archive, checkpointLedgers, verifyBatchSize) + if err != nil { + panic(err) + } + } + err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, gcpCredentials, gcsBucket, extra) if err != nil { cmdLogger.LogError(err) diff --git a/go.mod b/go.mod index 02e38664..1d13059d 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,14 @@ go 1.17 require ( cloud.google.com/go/storage v1.10.0 github.com/guregu/null v2.1.3-0.20151024101046-79c5bd36b615+incompatible + github.com/lib/pq v1.9.0 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.7.0 github.com/spf13/cobra v1.1.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.1 - github.com/stellar/go v0.0.0-20220527123553-6af10a1f8a3f + github.com/stellar/go v0.0.0-20221209134558-b4ba6f8e67f2 github.com/stretchr/testify v1.7.0 ) @@ -40,7 +41,6 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect - github.com/lib/pq v1.9.0 // indirect github.com/magiconair/properties v1.8.4 // indirect github.com/mattn/go-sqlite3 v1.14.5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect @@ -63,13 +63,14 @@ require ( github.com/stretchr/objx v0.3.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.23.0 // indirect + golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect - golang.org/x/mod v0.4.2 // indirect + golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 // indirect golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/tools v0.1.4 // indirect + golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/api v0.50.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 5ff29655..107b5555 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,7 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creachadair/jrpc2 v0.41.1/go.mod h1:k2mGfjsgE2h2Vo12C9NzZguUzzl3gnfGCmLIvg84pVE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -99,6 +100,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -171,8 +173,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -402,8 +405,8 @@ github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189/go.mod h1:A8kyI5cUJhb8 github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/stellar/go v0.0.0-20220527123553-6af10a1f8a3f h1:EEZPJNnDWiuOti27jj62XV4eSMAUB9vYo1t7mQukTp0= -github.com/stellar/go v0.0.0-20220527123553-6af10a1f8a3f/go.mod h1:KwWGjhG5DceU63MoF4eAsjbt++Pn7hwYfdXS66WouGw= +github.com/stellar/go v0.0.0-20221209134558-b4ba6f8e67f2 h1:7nqKUAw7kKdIkd8HGWfnRwIO3Cv38FWcb0EQ3IMLjwA= +github.com/stellar/go v0.0.0-20221209134558-b4ba6f8e67f2/go.mod h1:AbsNBrmclUvQcs0EeN+LDhubhX7tyo18r5fw0SCz/oM= github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee h1:fbVs0xmXpBvVS4GBeiRmAE3Le70ofAqFMch1GTiq/e8= github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/go.mod h1:7CJ23pXirXBJq45DqvO6clzTEGM/l1SfKrgrzLry8b4= @@ -440,6 +443,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -461,6 +465,7 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -472,6 +477,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA= +golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -497,8 +504,9 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -544,6 +552,7 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -572,6 +581,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -628,6 +638,7 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -700,8 +711,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20= +golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/transform/account.go b/internal/transform/account.go index 02ba43b8..810e0123 100644 --- a/internal/transform/account.go +++ b/internal/transform/account.go @@ -9,7 +9,7 @@ import ( "github.com/stellar/stellar-etl/internal/utils" ) -//TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery +// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { @@ -77,27 +77,34 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) transformedAccount := AccountOutput{ - AccountID: outputID, - Balance: utils.ConvertStroopValueToReal(outputBalance), - BuyingLiabilities: utils.ConvertStroopValueToReal(outputBuyingLiabilities), - SellingLiabilities: utils.ConvertStroopValueToReal(outputSellingLiabilities), - SequenceNumber: outputSequenceNumber, - SequenceLedger: zero.IntFrom(int64(outputSequenceLedger)), - SequenceTime: zero.IntFrom(int64(outputSequenceTime)), - NumSubentries: outputNumSubentries, - InflationDestination: outputInflationDest, - Flags: outputFlags, - HomeDomain: outputHomeDomain, - MasterWeight: outputMasterWeight, - ThresholdLow: outputThreshLow, - ThresholdMedium: outputThreshMed, - ThresholdHigh: outputThreshHigh, - LastModifiedLedger: outputLastModifiedLedger, - Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), - NumSponsored: uint32(accountEntry.NumSponsored()), - NumSponsoring: uint32(accountEntry.NumSponsoring()), - LedgerEntryChange: uint32(changeType), - Deleted: outputDeleted, + AccountID: outputID, + RawBalance: outputBalance, + RawBuyingLiabilities: outputBuyingLiabilities, + RawSellingLiabilities: outputSellingLiabilities, + Balance: utils.ConvertStroopValueToReal(outputBalance), + BuyingLiabilities: utils.ConvertStroopValueToReal(outputBuyingLiabilities), + SellingLiabilities: utils.ConvertStroopValueToReal(outputSellingLiabilities), + SequenceNumber: outputSequenceNumber, + SequenceLedger: zero.IntFrom(int64(outputSequenceLedger)), + SequenceTime: zero.IntFrom(int64(outputSequenceTime)), + NumSubentries: outputNumSubentries, + InflationDestination: outputInflationDest, + Flags: outputFlags, + HomeDomain: outputHomeDomain, + MasterWeight: outputMasterWeight, + ThresholdLow: outputThreshLow, + ThresholdMedium: outputThreshMed, + ThresholdHigh: outputThreshHigh, + LastModifiedLedger: outputLastModifiedLedger, + Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), + NumSponsored: uint32(accountEntry.NumSponsored()), + NumSponsoring: uint32(accountEntry.NumSponsoring()), + LedgerEntryChange: uint32(changeType), + Deleted: outputDeleted, + RawMasterWeight: accountEntry.MasterKeyWeight(), + RawThresholdLow: accountEntry.ThresholdLow(), + RawThresholdMedium: accountEntry.ThresholdMedium(), + RawThresholdHigh: accountEntry.ThresholdHigh(), } return transformedAccount, nil } diff --git a/internal/transform/account_test.go b/internal/transform/account_test.go index 482160b9..11a8bede 100644 --- a/internal/transform/account_test.go +++ b/internal/transform/account_test.go @@ -151,24 +151,31 @@ func makeAccountTestInput() ingest.Change { func makeAccountTestOutput() AccountOutput { return AccountOutput{ - AccountID: testAccount1Address, - Balance: 1.0959979, - BuyingLiabilities: 0.0001, - SellingLiabilities: 0.00015, - SequenceNumber: 117801117454198833, - NumSubentries: 141, - InflationDestination: testAccount2Address, - Flags: 4, - HomeDomain: "examplehome.com", - MasterWeight: 2, - ThresholdLow: 1, - ThresholdMedium: 3, - ThresholdHigh: 5, - Sponsor: null.StringFrom(testAccount3Address), - NumSponsored: 3, - NumSponsoring: 1, - LastModifiedLedger: 30705278, - LedgerEntryChange: 2, - Deleted: true, + AccountID: testAccount1Address, + RawBalance: 10959979, + RawBuyingLiabilities: 1000, + RawSellingLiabilities: 1500, + Balance: 1.0959979, + BuyingLiabilities: 0.0001, + SellingLiabilities: 0.00015, + SequenceNumber: 117801117454198833, + NumSubentries: 141, + InflationDestination: testAccount2Address, + Flags: 4, + HomeDomain: "examplehome.com", + RawMasterWeight: 2, + RawThresholdLow: 1, + RawThresholdMedium: 3, + RawThresholdHigh: 5, + MasterWeight: 2, + ThresholdLow: 1, + ThresholdMedium: 3, + ThresholdHigh: 5, + Sponsor: null.StringFrom(testAccount3Address), + NumSponsored: 3, + NumSponsoring: 1, + LastModifiedLedger: 30705278, + LedgerEntryChange: 2, + Deleted: true, } } diff --git a/internal/transform/claimable_balance.go b/internal/transform/claimable_balance.go index 2ed25e85..fc96895f 100644 --- a/internal/transform/claimable_balance.go +++ b/internal/transform/claimable_balance.go @@ -20,7 +20,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant { return transformed } -//TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery +// TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { @@ -47,11 +47,13 @@ func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutp transformed := ClaimableBalanceOutput{ BalanceID: balanceID, + Asset: balanceEntry.Asset, AssetCode: outputAsset.AssetCode, AssetIssuer: outputAsset.AssetIssuer, AssetType: outputAsset.AssetType, Claimants: outputClaimants, AssetAmount: float64(outputAmount) / 1.0e7, + RawAssetAmount: outputAmount, Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), LastModifiedLedger: outputLastModifiedLedger, LedgerEntryChange: uint32(changeType), diff --git a/internal/transform/claimable_balance_test.go b/internal/transform/claimable_balance_test.go index 0cd7a23f..78abecbc 100644 --- a/internal/transform/claimable_balance_test.go +++ b/internal/transform/claimable_balance_test.go @@ -98,10 +98,12 @@ func makeClaimableBalanceTestOutput() ClaimableBalanceOutput { }, }, }, + Asset: xdr.MustNewCreditAsset("\x01\x02\x03\x04\x05\x06\a\b\t", "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN"), AssetIssuer: "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN", AssetType: "credit_alphanum12", AssetCode: "\x01\x02\x03\x04\x05\x06\a\b\t", AssetAmount: 999, + RawAssetAmount: 9990000000, Sponsor: null.StringFrom("GAAQEAYEAUDAOCAJAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABO3W"), Flags: 10, LastModifiedLedger: 30705278, diff --git a/internal/transform/liquidity_pool.go b/internal/transform/liquidity_pool.go index 276fa78c..2288d877 100644 --- a/internal/transform/liquidity_pool.go +++ b/internal/transform/liquidity_pool.go @@ -8,7 +8,7 @@ import ( "github.com/stellar/stellar-etl/internal/utils" ) -//TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery +// TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { @@ -46,14 +46,17 @@ func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { transformedPool := PoolOutput{ PoolID: PoolIDToString(lp.LiquidityPoolId), + RawPoolType: lp.Body.Type, PoolType: poolType, PoolFee: uint32(cp.Params.Fee), TrustlineCount: uint64(cp.PoolSharesTrustLineCount), PoolShareCount: utils.ConvertStroopValueToReal(cp.TotalPoolShares), + AssetA: cp.Params.AssetA, AssetAType: assetAType, AssetACode: assetACode, AssetAIssuer: assetAIssuer, AssetAReserve: utils.ConvertStroopValueToReal(cp.ReserveA), + AssetB: cp.Params.AssetB, AssetBType: assetBType, AssetBCode: assetBCode, AssetBIssuer: assetBIssuer, @@ -61,6 +64,9 @@ func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, + RawAssetAReserve: cp.ReserveA, + RawAssetBReserve: cp.ReserveB, + RawPoolShareCount: cp.TotalPoolShares, } return transformedPool, nil } diff --git a/internal/transform/liquidity_pool_test.go b/internal/transform/liquidity_pool_test.go index 996e9c1f..35ecb4dd 100644 --- a/internal/transform/liquidity_pool_test.go +++ b/internal/transform/liquidity_pool_test.go @@ -107,5 +107,11 @@ func makePoolTestOutput() PoolOutput { LastModifiedLedger: 30705278, LedgerEntryChange: 2, Deleted: true, + AssetA: xdr.MustNewNativeAsset(), + AssetB: xdr.MustNewCreditAsset("USSD", "GBVVRXLMNCJQW3IDDXC3X6XCH35B5Q7QXNMMFPENSOGUPQO7WO7HGZPA"), + RawAssetAReserve: 105, + RawAssetBReserve: 10, + RawPoolShareCount: 35, + RawPoolType: 0, } } diff --git a/internal/transform/offer.go b/internal/transform/offer.go index 294f0faa..1cb3c37d 100644 --- a/internal/transform/offer.go +++ b/internal/transform/offer.go @@ -71,6 +71,8 @@ func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { transformedOffer := OfferOutput{ SellerID: outputSellerID, OfferID: outputOfferID, + SellingAsset: offerEntry.Selling, + BuyingAsset: offerEntry.Buying, SellingAssetType: outputSellingAsset.AssetType, SellingAssetCode: outputSellingAsset.AssetCode, SellingAssetIssuer: outputSellingAsset.AssetIssuer, @@ -78,6 +80,7 @@ func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { BuyingAssetCode: outputBuyingAsset.AssetCode, BuyingAssetIssuer: outputBuyingAsset.AssetIssuer, Amount: utils.ConvertStroopValueToReal(outputAmount), + RawAmount: outputAmount, PriceN: outputPriceN, PriceD: outputPriceD, Price: outputPrice, diff --git a/internal/transform/offer_test.go b/internal/transform/offer_test.go index 453e6098..323f4482 100644 --- a/internal/transform/offer_test.go +++ b/internal/transform/offer_test.go @@ -156,5 +156,8 @@ func makeOfferTestOutput() OfferOutput { LedgerEntryChange: 2, Deleted: true, Sponsor: null.StringFrom(testAccount3Address), + SellingAsset: xdr.MustNewNativeAsset(), + BuyingAsset: xdr.MustNewCreditAsset("ETH", "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN"), + RawAmount: 2628450327, } } diff --git a/internal/transform/schema.go b/internal/transform/schema.go index 9de414ee..c08c7e22 100644 --- a/internal/transform/schema.go +++ b/internal/transform/schema.go @@ -63,27 +63,34 @@ type TransactionOutput struct { // AccountOutput is a representation of an account that aligns with the BigQuery table accounts type AccountOutput struct { - AccountID string `json:"account_id"` // account address - Balance float64 `json:"balance"` - BuyingLiabilities float64 `json:"buying_liabilities"` - SellingLiabilities float64 `json:"selling_liabilities"` - SequenceNumber int64 `json:"sequence_number"` - SequenceLedger zero.Int `json:"sequence_ledger"` - SequenceTime zero.Int `json:"sequence_time"` - NumSubentries uint32 `json:"num_subentries"` - InflationDestination string `json:"inflation_destination"` - Flags uint32 `json:"flags"` - HomeDomain string `json:"home_domain"` - MasterWeight int32 `json:"master_weight"` - ThresholdLow int32 `json:"threshold_low"` - ThresholdMedium int32 `json:"threshold_medium"` - ThresholdHigh int32 `json:"threshold_high"` - Sponsor null.String `json:"sponsor"` - NumSponsored uint32 `json:"num_sponsored"` - NumSponsoring uint32 `json:"num_sponsoring"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + AccountID string `json:"account_id"` // account address + RawBalance xdr.Int64 `json:"-"` + RawBuyingLiabilities xdr.Int64 `json:"-"` + RawSellingLiabilities xdr.Int64 `json:"-"` + Balance float64 `json:"balance"` + BuyingLiabilities float64 `json:"buying_liabilities"` + SellingLiabilities float64 `json:"selling_liabilities"` + SequenceNumber int64 `json:"sequence_number"` + SequenceLedger zero.Int `json:"sequence_ledger"` + SequenceTime zero.Int `json:"sequence_time"` + NumSubentries uint32 `json:"num_subentries"` + InflationDestination string `json:"inflation_destination"` + Flags uint32 `json:"flags"` + HomeDomain string `json:"home_domain"` + MasterWeight int32 `json:"master_weight"` + ThresholdLow int32 `json:"threshold_low"` + ThresholdMedium int32 `json:"threshold_medium"` + ThresholdHigh int32 `json:"threshold_high"` + Sponsor null.String `json:"sponsor"` + NumSponsored uint32 `json:"num_sponsored"` + NumSponsoring uint32 `json:"num_sponsoring"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + RawMasterWeight byte `json:"-"` + RawThresholdLow byte `json:"-"` + RawThresholdMedium byte `json:"-"` + RawThresholdHigh byte `json:"-"` } // AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers @@ -112,10 +119,12 @@ type OperationOutput struct { type ClaimableBalanceOutput struct { BalanceID string `json:"balance_id"` Claimants []Claimant `json:"claimants"` + Asset xdr.Asset `json:"-"` AssetCode string `json:"asset_code"` AssetIssuer string `json:"asset_issuer"` AssetType string `json:"asset_type"` AssetAmount float64 `json:"asset_amount"` + RawAssetAmount xdr.Int64 `json:"-"` Sponsor null.String `json:"sponsor"` Flags uint32 `json:"flags"` LastModifiedLedger uint32 `json:"last_modified_ledger"` @@ -156,22 +165,28 @@ type LiquidityPoolAsset struct { // PoolOutput is a representation of a liquidity pool that aligns with the Bigquery table liquidity_pools type PoolOutput struct { - PoolID string `json:"liquidity_pool_id"` - PoolType string `json:"type"` - PoolFee uint32 `json:"fee"` - TrustlineCount uint64 `json:"trustline_count"` - PoolShareCount float64 `json:"pool_share_count"` - AssetAType string `json:"asset_a_type"` - AssetACode string `json:"asset_a_code"` - AssetAIssuer string `json:"asset_a_issuer"` - AssetAReserve float64 `json:"asset_a_amount"` - AssetBType string `json:"asset_b_type"` - AssetBCode string `json:"asset_b_code"` - AssetBIssuer string `json:"asset_b_issuer"` - AssetBReserve float64 `json:"asset_b_amount"` - LastModifiedLedger uint32 `json:"last_modified_ledger"` - LedgerEntryChange uint32 `json:"ledger_entry_change"` - Deleted bool `json:"deleted"` + PoolID string `json:"liquidity_pool_id"` + RawPoolType xdr.LiquidityPoolType `json:"-"` + PoolType string `json:"type"` + PoolFee uint32 `json:"fee"` + TrustlineCount uint64 `json:"trustline_count"` + PoolShareCount float64 `json:"pool_share_count"` + AssetA xdr.Asset `json:"-"` + AssetAType string `json:"asset_a_type"` + AssetACode string `json:"asset_a_code"` + AssetAIssuer string `json:"asset_a_issuer"` + AssetAReserve float64 `json:"asset_a_amount"` + AssetB xdr.Asset `json:"-"` + AssetBType string `json:"asset_b_type"` + AssetBCode string `json:"asset_b_code"` + AssetBIssuer string `json:"asset_b_issuer"` + AssetBReserve float64 `json:"asset_b_amount"` + LastModifiedLedger uint32 `json:"last_modified_ledger"` + LedgerEntryChange uint32 `json:"ledger_entry_change"` + Deleted bool `json:"deleted"` + RawAssetAReserve xdr.Int64 `json:"-"` + RawAssetBReserve xdr.Int64 `json:"-"` + RawPoolShareCount xdr.Int64 `json:"-"` } // AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets @@ -199,12 +214,17 @@ type TrustlineOutput struct { LedgerEntryChange uint32 `json:"ledger_entry_change"` Sponsor null.String `json:"sponsor"` Deleted bool `json:"deleted"` + RawBuying xdr.Int64 `json:"-"` + RawSelling xdr.Int64 `json:"-"` + RawBalance xdr.Int64 `json:"-"` } // OfferOutput is a representation of an offer that aligns with the BigQuery table offers type OfferOutput struct { SellerID string `json:"seller_id"` // Account address of the seller OfferID int64 `json:"offer_id"` + SellingAsset xdr.Asset `json:"-"` + BuyingAsset xdr.Asset `json:"-"` SellingAssetType string `json:"selling_asset_type"` SellingAssetCode string `json:"selling_asset_code"` SellingAssetIssuer string `json:"selling_asset_issuer"` @@ -212,6 +232,7 @@ type OfferOutput struct { BuyingAssetCode string `json:"buying_asset_code"` BuyingAssetIssuer string `json:"buying_asset_issuer"` Amount float64 `json:"amount"` + RawAmount xdr.Int64 `json:"-"` PriceN int32 `json:"pricen"` PriceD int32 `json:"priced"` Price float64 `json:"price"` @@ -436,3 +457,12 @@ type TestTransaction struct { MetaXDR string Hash string } + +type TransformedOutput struct { + Accounts []AccountOutput + Signers []AccountSignerOutput + Claimable_balances []ClaimableBalanceOutput + Offers []OfferOutput + Trustlines []TrustlineOutput + Liquidity_pools []PoolOutput +} diff --git a/internal/transform/trustline.go b/internal/transform/trustline.go index 51475a3a..481a54cc 100644 --- a/internal/transform/trustline.go +++ b/internal/transform/trustline.go @@ -13,7 +13,7 @@ import ( "github.com/stellar/go/xdr" ) -//TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery +// TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { @@ -55,16 +55,19 @@ func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { AssetType: int32(asset.Type), AssetCode: outputAssetCode, AssetIssuer: outputAssetIssuer, - Balance: utils.ConvertStroopValueToReal(trustEntry.Balance), TrustlineLimit: int64(trustEntry.Limit), LiquidityPoolID: poolID, - BuyingLiabilities: utils.ConvertStroopValueToReal(liabilities.Buying), - SellingLiabilities: utils.ConvertStroopValueToReal(liabilities.Selling), Flags: uint32(trustEntry.Flags), LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), Deleted: outputDeleted, + Balance: utils.ConvertStroopValueToReal(trustEntry.Balance), + BuyingLiabilities: utils.ConvertStroopValueToReal(liabilities.Buying), + SellingLiabilities: utils.ConvertStroopValueToReal(liabilities.Selling), + RawBalance: trustEntry.Balance, + RawBuying: liabilities.Buying, + RawSelling: liabilities.Selling, } return transformedTrustline, nil diff --git a/internal/transform/trustline_test.go b/internal/transform/trustline_test.go index d7f2256a..af4e08dd 100644 --- a/internal/transform/trustline_test.go +++ b/internal/transform/trustline_test.go @@ -124,6 +124,9 @@ func makeTrustlineTestOutput() []TrustlineOutput { LastModifiedLedger: 24229503, LedgerEntryChange: 1, Deleted: false, + RawBalance: 6203000, + RawBuying: 1000, + RawSelling: 2000, }, { LedgerKey: "AAAAAQAAAAAcR0GXGO76pFs4y38vJVAanjnLg4emNun7zAx0pHcDGAAAAAMBAwQFBwkAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", @@ -138,6 +141,9 @@ func makeTrustlineTestOutput() []TrustlineOutput { LastModifiedLedger: 123456789, LedgerEntryChange: 1, Deleted: false, + RawBalance: 5000000, + RawBuying: 15000, + RawSelling: 5000, }, } } diff --git a/internal/utils/main.go b/internal/utils/main.go index 2bd8985c..d00788e5 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -544,6 +544,13 @@ type EnvironmentDetails struct { CoreConfig string } +func LedgerIsCheckpoint(ledger xdr.Uint32) (bool, uint32) { + actualLedger := uint32(ledger) + actualLedgerIsCheckpoint := uint32(GetMostRecentCheckpoint(uint32(actualLedger))) + + return actualLedger == actualLedgerIsCheckpoint, actualLedger +} + // GetPassphrase returns the correct Network Passphrase based on env preference func GetEnvironmentDetails(isTest bool) (details EnvironmentDetails) { if isTest { diff --git a/internal/utils/verify/verify.go b/internal/utils/verify/verify.go new file mode 100644 index 00000000..6529be0a --- /dev/null +++ b/internal/utils/verify/verify.go @@ -0,0 +1,467 @@ +// lucas zanotelli dos santos 2022-11-18 +// this code was mostly copied from https://github.com/stellar/go/blob/master/services/horizon/internal/ingest/verify.go + +package verify + +import ( + "context" + "encoding/hex" + "fmt" + + "github.com/guregu/null" + "github.com/stellar/stellar-etl/internal/transform" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/verify" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// stateVerifierExpectedIngestionVersion defines a version of ingestion system +// required by state verifier. This is done to prevent situations where +// ingestion has been updated with new features but state verifier does not +// check them. +// There is a test that checks it, to fix it: update the actual `verifyState` +// method instead of just updating this value! +// const stateVerifierExpectedIngestionVersion = 15 + +// verifyState is called as a go routine from pipeline post hook every 64 +// ledgers. It checks if the state is correct. If another go routine is already +// running it exits. +func VerifyState( + ctx context.Context, + transformedOutput transform.TransformedOutput, + archive historyarchive.ArchiveInterface, + ledgerSequence uint32, + verifyBatchSize int) error { + + stateReader, _ := ingest.NewCheckpointChangeReader(ctx, archive, ledgerSequence) + verifier := verify.NewStateVerifier(stateReader, nil) + + var keys []xdr.LedgerKey + keys, err := verifier.GetLedgerKeys(verifyBatchSize) + if err != nil { + return errors.Wrap(err, "verifier.GetLedgerKeys") + } + + if len(keys) == 0 { + return errors.Wrap(err, "no keys") + } + + err = addAccountsToStateVerifier(ctx, verifier, transformedOutput.Accounts, transformedOutput.Signers) + if err != nil { + return errors.Wrap(err, "addAccountsToStateVerifier failed") + } + + err = addOffersToStateVerifier(ctx, verifier, transformedOutput.Offers) + if err != nil { + return errors.Wrap(err, "addOffersToStateVerifier failed") + } + + err = addTrustLinesToStateVerifier(ctx, verifier, transformedOutput.Trustlines) + if err != nil { + return errors.Wrap(err, "addTrustLinesToStateVerifier failed") + } + + err = addClaimableBalanceToStateVerifier(ctx, verifier, transformedOutput.Claimable_balances) + if err != nil { + return errors.Wrap(err, "addClaimableBalanceToStateVerifier failed") + } + + err = addLiquidityPoolsToStateVerifier(ctx, verifier, transformedOutput.Liquidity_pools) + if err != nil { + return errors.Wrap(err, "addLiquidityPoolsToStateVerifier failed") + } + + return nil +} + +func addAccountsToStateVerifier(ctx context.Context, + verifier *verify.StateVerifier, + accounts []transform.AccountOutput, + signers []transform.AccountSignerOutput, +) error { + if len(accounts) == 0 && len(signers) == 0 { + return nil + } + + masterWeightMap := make(map[string]int32) + signersMap := make(map[string][]xdr.Signer) + + sponsoringSignersMap := make(map[string]map[string]string) + for _, row := range signers { + if row.Deleted { + break + } + if row.AccountID == row.Signer { + masterWeightMap[row.AccountID] = row.Weight + } else { + signersMap[row.AccountID] = append( + signersMap[row.AccountID], + xdr.Signer{ + Key: xdr.MustSigner(row.Signer), + Weight: xdr.Uint32(row.Weight), + }, + ) + if sponsoringSignersMap[row.AccountID] == nil { + sponsoringSignersMap[row.AccountID] = make(map[string]string) + } + sponsoringSignersMap[row.AccountID][row.Signer] = row.Sponsor.String + } + } + + for _, row := range accounts { + if row.Deleted { + break + } + var inflationDest *xdr.AccountId + if row.InflationDestination != "" { + t := xdr.MustAddress(row.InflationDestination) + inflationDest = &t + } + + // Ensure master weight matches, if not it's a state error! + if masterWeight, found := masterWeightMap[row.AccountID]; found { + if int32(row.MasterWeight) != masterWeight { + return ingest.NewStateError( + fmt.Errorf( + "master key weight in account %s does not match (expected=%d, actual=%d)", + row.AccountID, + masterWeightMap[row.AccountID], + int32(row.MasterWeight), + ), + ) + } + } + + signers := xdr.SortSignersByKey(signersMap[row.AccountID]) + signerSponsoringIDs := make([]xdr.SponsorshipDescriptor, len(signers)) + for i, signer := range signers { + sponsor := sponsoringSignersMap[row.AccountID][signer.Key.Address()] + if sponsor != "" { + signerSponsoringIDs[i] = xdr.MustAddressPtr(sponsor) + } + } + + // Accounts that haven't done anything since Protocol 19 will not have a + // V3 extension, so we need to check whether or not this extension needs + // to be filled out. + v3extension := xdr.AccountEntryExtensionV2Ext{V: 0} + if row.SequenceLedger.Valid && row.SequenceTime.Valid { + v3extension.V = 3 + v3extension.V3 = &xdr.AccountEntryExtensionV3{ + SeqLedger: xdr.Uint32(row.SequenceLedger.Int64), + SeqTime: xdr.TimePoint(row.SequenceTime.Int64), + } + } + + account := &xdr.AccountEntry{ + AccountId: xdr.MustAddress(row.AccountID), + Balance: row.RawBalance, + SeqNum: xdr.SequenceNumber(row.SequenceNumber), + NumSubEntries: xdr.Uint32(row.NumSubentries), + InflationDest: inflationDest, + Flags: xdr.Uint32(row.Flags), + HomeDomain: xdr.String32(row.HomeDomain), + Thresholds: xdr.Thresholds{ + row.RawMasterWeight, + row.RawThresholdLow, + row.RawThresholdMedium, + row.RawThresholdHigh, + }, + Signers: signers, + Ext: xdr.AccountEntryExt{ + V: 1, + V1: &xdr.AccountEntryExtensionV1{ + Liabilities: xdr.Liabilities{ + Buying: row.RawBuyingLiabilities, + Selling: row.RawSellingLiabilities, + }, + Ext: xdr.AccountEntryExtensionV1Ext{ + V: 2, + V2: &xdr.AccountEntryExtensionV2{ + NumSponsored: xdr.Uint32(row.NumSponsored), + NumSponsoring: xdr.Uint32(row.NumSponsoring), + SignerSponsoringIDs: signerSponsoringIDs, + Ext: v3extension, + }, + }, + }, + }, + } + + entry := xdr.LedgerEntry{ + LastModifiedLedgerSeq: xdr.Uint32(row.LastModifiedLedger), + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: account, + }, + } + addLedgerEntrySponsor(&entry, row.Sponsor) + err := verifier.Write(entry) + if err != nil { + return err + } + } + + return nil +} + +func addOffersToStateVerifier( + ctx context.Context, + verifier *verify.StateVerifier, + offers []transform.OfferOutput, +) error { + if len(offers) == 0 { + return nil + } + + for _, row := range offers { + if row.Deleted { + break + } + offerXDR := offerToXDR(row) + entry := xdr.LedgerEntry{ + LastModifiedLedgerSeq: xdr.Uint32(row.LastModifiedLedger), + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &offerXDR, + }, + } + addLedgerEntrySponsor(&entry, row.Sponsor) + err := verifier.Write(entry) + if err != nil { + return err + } + } + + return nil +} + +func offerToXDR(row transform.OfferOutput) xdr.OfferEntry { + return xdr.OfferEntry{ + SellerId: xdr.MustAddress(row.SellerID), + OfferId: xdr.Int64(row.OfferID), + Selling: row.SellingAsset, + Buying: row.BuyingAsset, + Amount: row.RawAmount, + Price: xdr.Price{ + N: xdr.Int32(row.PriceN), + D: xdr.Int32(row.PriceD), + }, + Flags: xdr.Uint32(row.Flags), + } +} + +func addClaimableBalanceToStateVerifier( + ctx context.Context, + verifier *verify.StateVerifier, + claims []transform.ClaimableBalanceOutput, +) error { + if len(claims) == 0 { + return nil + } + + for _, row := range claims { + if row.Deleted { + break + } + claimants := []xdr.Claimant{} + for _, claimant := range row.Claimants { + claimants = append(claimants, xdr.Claimant{ + Type: xdr.ClaimantTypeClaimantTypeV0, + V0: &xdr.ClaimantV0{ + Destination: xdr.MustAddress(claimant.Destination), + Predicate: claimant.Predicate, + }, + }) + } + claimants = xdr.SortClaimantsByDestination(claimants) + var balanceID xdr.ClaimableBalanceId + if err := xdr.SafeUnmarshalHex(row.BalanceID, &balanceID); err != nil { + return err + } + cBalance := xdr.ClaimableBalanceEntry{ + BalanceId: balanceID, + Claimants: claimants, + Asset: row.Asset, + Amount: xdr.Int64(row.RawAssetAmount), + } + if row.Flags != 0 { + cBalance.Ext = xdr.ClaimableBalanceEntryExt{ + V: 1, + V1: &xdr.ClaimableBalanceEntryExtensionV1{ + Flags: xdr.Uint32(row.Flags), + }, + } + } + entry := xdr.LedgerEntry{ + LastModifiedLedgerSeq: xdr.Uint32(row.LastModifiedLedger), + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalance, + }, + } + addLedgerEntrySponsor(&entry, row.Sponsor) + if err := verifier.Write(entry); err != nil { + return err + } + } + + return nil +} + +func addLiquidityPoolsToStateVerifier( + ctx context.Context, + verifier *verify.StateVerifier, + lPools []transform.PoolOutput, +) error { + if len(lPools) == 0 { + return nil + } + + for _, row := range lPools { + if row.Deleted { + break + } + lPoolEntry, err := liquidityPoolToXDR(row) + if err != nil { + return errors.Wrap(err, "Invalid liquidity pool row") + } + + entry := xdr.LedgerEntry{ + LastModifiedLedgerSeq: xdr.Uint32(row.LastModifiedLedger), + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeLiquidityPool, + LiquidityPool: &lPoolEntry, + }, + } + if err := verifier.Write(entry); err != nil { + return err + } + } + + return nil +} + +func liquidityPoolToXDR(row transform.PoolOutput) (xdr.LiquidityPoolEntry, error) { + id, err := hex.DecodeString(row.PoolID) + if err != nil { + return xdr.LiquidityPoolEntry{}, errors.Wrap(err, "Error decoding pool ID") + } + var poolID xdr.PoolId + if len(id) != len(poolID) { + return xdr.LiquidityPoolEntry{}, fmt.Errorf("error decoding pool ID, incorrect length (%d)", len(id)) + } + copy(poolID[:], id) + + var lPoolEntry = xdr.LiquidityPoolEntry{ + LiquidityPoolId: poolID, + Body: xdr.LiquidityPoolEntryBody{ + Type: row.RawPoolType, + ConstantProduct: &xdr.LiquidityPoolEntryConstantProduct{ + Params: xdr.LiquidityPoolConstantProductParameters{ + AssetA: row.AssetA, + AssetB: row.AssetB, + Fee: xdr.Int32(row.PoolFee), + }, + ReserveA: xdr.Int64(row.RawAssetAReserve), + ReserveB: xdr.Int64(row.RawAssetBReserve), + TotalPoolShares: xdr.Int64(row.RawPoolShareCount), + PoolSharesTrustLineCount: xdr.Int64(row.TrustlineCount), + }, + }, + } + return lPoolEntry, nil +} + +func addLedgerEntrySponsor(entry *xdr.LedgerEntry, sponsor null.String) { + ledgerEntrySponsor := xdr.SponsorshipDescriptor(nil) + + if !sponsor.IsZero() { + ledgerEntrySponsor = xdr.MustAddressPtr(sponsor.String) + } + entry.Ext = xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: ledgerEntrySponsor, + }, + } +} + +func addTrustLinesToStateVerifier( + ctx context.Context, + verifier *verify.StateVerifier, + trusts []transform.TrustlineOutput, +) error { + if len(trusts) == 0 { + return nil + } + + for _, row := range trusts { + if row.Deleted { + break + } + var entry xdr.LedgerEntry + entry, err := trustLineToXDR(row) + if err != nil { + return err + } + + if err = verifier.Write(entry); err != nil { + return err + } + } + + return nil +} + +func trustLineToXDR(row transform.TrustlineOutput) (xdr.LedgerEntry, error) { + var asset xdr.TrustLineAsset + switch row.AssetType { + case int32(xdr.AssetTypeAssetTypePoolShare): + asset = xdr.TrustLineAsset{ + Type: xdr.AssetTypeAssetTypePoolShare, + LiquidityPoolId: &xdr.PoolId{}, + } + _, err := hex.Decode((*asset.LiquidityPoolId)[:], []byte(row.LiquidityPoolID)) + if err != nil { + return xdr.LedgerEntry{}, errors.Wrap(err, "Error decoding liquidity pool id") + } + case int32(xdr.AssetTypeAssetTypeNative): + asset = xdr.MustNewNativeAsset().ToTrustLineAsset() + default: + creditAsset, err := xdr.NewCreditAsset(row.AssetCode, row.AssetIssuer) + if err != nil { + return xdr.LedgerEntry{}, errors.Wrap(err, "Error decoding credit asset") + } + asset = creditAsset.ToTrustLineAsset() + } + + trustline := xdr.TrustLineEntry{ + AccountId: xdr.MustAddress(row.AccountID), + Asset: asset, + Balance: row.RawBalance, + Limit: xdr.Int64(row.TrustlineLimit), + Flags: xdr.Uint32(row.Flags), + Ext: xdr.TrustLineEntryExt{ + V: 1, + V1: &xdr.TrustLineEntryV1{ + Liabilities: xdr.Liabilities{ + Buying: row.RawBuying, + Selling: row.RawSelling, + }, + }, + }, + } + entry := xdr.LedgerEntry{ + LastModifiedLedgerSeq: xdr.Uint32(row.LastModifiedLedger), + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTrustline, + TrustLine: &trustline, + }, + } + addLedgerEntrySponsor(&entry, row.Sponsor) + return entry, nil +}