diff --git a/lib/ae_mdw/db/model.ex b/lib/ae_mdw/db/model.ex index 305433db1..c60782869 100644 --- a/lib/ae_mdw/db/model.ex +++ b/lib/ae_mdw/db/model.ex @@ -55,7 +55,7 @@ defmodule AeMdw.Db.Model do # index is timestamp (daylight saving order should be handle case by case) @typep timestamp :: pos_integer() - @type async_task_type :: :update_aex9_state | :store_acc_balance | :migrate | :update_tx_stats + @type async_task_type :: :update_aex9_state | :store_acc_balance | :migrate @type async_task_index :: {timestamp(), async_task_type()} @type async_task_args :: list() @@ -157,8 +157,13 @@ defmodule AeMdw.Db.Model do ) # txs table : - # index = tx_index (0..), id = tx_id, block_index = {kbi, mbi} time = time, fee = fee - @tx_defaults [index: nil, id: nil, block_index: nil, time: nil, fee: nil] + # index = tx_index (0..), + # id = tx_id + # block_index = {kbi, mbi} + # time = time + # fee = fee + # accumulated_fee = sum of previous fees + @tx_defaults [index: nil, id: nil, block_index: nil, time: nil, fee: nil, accumulated_fee: nil] defrecord :tx, @tx_defaults @type tx_index() :: txi() @@ -168,7 +173,8 @@ defmodule AeMdw.Db.Model do id: Txs.tx_hash(), block_index: block_index(), time: Blocks.time(), - fee: non_neg_integer() + fee: non_neg_integer(), + accumulated_fee: non_neg_integer() ) # txs time index : diff --git a/lib/ae_mdw/db/mutations/transaction_fee_mutation.ex b/lib/ae_mdw/db/mutations/transaction_fee_mutation.ex new file mode 100644 index 000000000..6149a3b8d --- /dev/null +++ b/lib/ae_mdw/db/mutations/transaction_fee_mutation.ex @@ -0,0 +1,60 @@ +defmodule AeMdw.Db.TransactionFeeMutation do + @moduledoc """ + Add + """ + + alias AeMdw.Blocks + alias AeMdw.Db.Model + alias AeMdw.Db.State + alias AeMdw.Txs + + require Model + + @derive AeMdw.Db.Mutation + defstruct [:txi, :tx_hash, :block_index, :time, :fee] + + @typep fee() :: non_neg_integer() + + @opaque t() :: %__MODULE__{ + txi: Txs.txi(), + tx_hash: Txs.tx_hash(), + block_index: Blocks.block_index(), + time: Blocks.time(), + fee: fee() + } + + @spec new(Txs.txi(), Txs.tx_hash(), Blocks.block_index(), Blocks.time(), fee()) :: t() + def new(txi, tx_hash, block_index, mb_time, fee) do + %__MODULE__{txi: txi, tx_hash: tx_hash, block_index: block_index, time: mb_time, fee: fee} + end + + @spec execute(t(), State.t()) :: State.t() + def execute( + %__MODULE__{ + txi: txi, + tx_hash: tx_hash, + block_index: block_index, + time: mb_time, + fee: fee + }, + state + ) do + accumulated_fee = + case State.get(state, Model.Tx, txi - 1) do + {:ok, Model.tx(accumulated_fee: accumulated_fee)} -> accumulated_fee + fee + :not_found -> fee + end + + m_tx = + Model.tx( + index: txi, + id: tx_hash, + block_index: block_index, + time: mb_time, + fee: fee, + accumulated_fee: accumulated_fee + ) + + State.put(state, Model.Tx, m_tx) + end +end diff --git a/lib/ae_mdw/db/sync/transaction.ex b/lib/ae_mdw/db/sync/transaction.ex index b42fea892..2e4edb1dc 100644 --- a/lib/ae_mdw/db/sync/transaction.ex +++ b/lib/ae_mdw/db/sync/transaction.ex @@ -18,6 +18,7 @@ defmodule AeMdw.Db.Sync.Transaction do alias AeMdw.Db.Sync.Name, as: SyncName alias AeMdw.Db.Sync.Oracle alias AeMdw.Db.Sync.Origin + alias AeMdw.Db.TransactionFeeMutation alias AeMdw.Db.WriteFieldsMutation alias AeMdw.Db.WriteMutation alias AeMdw.Db.Mutation @@ -85,7 +86,6 @@ defmodule AeMdw.Db.Sync.Transaction do {type, tx} = :aetx.specialize_type(:aetx_sign.tx(signed_tx)) tx_hash = :aetx_sign.hash(signed_tx) fee = Db.get_tx_fee(tx_hash) - m_tx = Model.tx(index: txi, id: tx_hash, block_index: block_index, time: mb_time, fee: fee) tx_context = TxContext.new( @@ -100,7 +100,7 @@ defmodule AeMdw.Db.Sync.Transaction do ) [ - WriteMutation.new(Model.Tx, m_tx), + TransactionFeeMutation.new(txi, tx_hash, block_index, mb_time, fee), WriteMutation.new(Model.Type, Model.type(index: {type, txi})), WriteMutation.new(Model.Time, Model.time(index: {mb_time, txi})), WriteFieldsMutation.new(type, tx, block_index, txi) diff --git a/lib/ae_mdw/stats.ex b/lib/ae_mdw/stats.ex index 4ecffcce8..4a02177fe 100644 --- a/lib/ae_mdw/stats.ex +++ b/lib/ae_mdw/stats.ex @@ -787,25 +787,47 @@ defmodule AeMdw.Stats do end defp last_24hs_txs_count_and_fee_with_trend(state) do - state - |> State.get(Model.Stat, :tx_stats) - |> case do - {:ok, - Model.stat( - payload: - {_started_at, {{txs_count_24hs, trend_str}, {average_tx_fees_24hs_str, fee_trend_str}}} - )} -> - trend = String.to_float(trend_str) - fees_trend = String.to_float(fee_trend_str) - average_tx_fees_24hs = String.to_float(average_tx_fees_24hs_str) - - {{txs_count_24hs, trend}, {average_tx_fees_24hs, fees_trend}} - - :not_found -> + time_24hs_ago = :aeu_time.now_in_msecs() - @seconds_per_day * 1_000 + + with {:ok, {_time, tx_index_24hs_ago}} <- + State.next(state, Model.Time, {time_24hs_ago, -1}), + {:ok, last_tx_index} <- State.prev(state, Model.Tx, nil), + time_48hs_ago <- time_24hs_ago - @seconds_per_day * 1_000, + {:ok, {_time, tx_index_48hs_ago}} <- State.next(state, Model.Time, {time_48hs_ago, -1}), + txs_count_24hs when txs_count_24hs > 0 <- last_tx_index - tx_index_24hs_ago + 1, + txs_count_48hs <- tx_index_24hs_ago - tx_index_48hs_ago, + trend <- Float.round((txs_count_24hs - txs_count_48hs) / txs_count_24hs, 2), + average_tx_fees_24hs when average_tx_fees_24hs > 0 <- + average_tx_fees(state, tx_index_24hs_ago, last_tx_index) do + average_tx_fees_48hs = average_tx_fees(state, tx_index_48hs_ago, tx_index_24hs_ago) + + fee_trend = + Float.round((average_tx_fees_24hs - average_tx_fees_48hs) / average_tx_fees_24hs, 2) + + {{txs_count_24hs, trend}, {average_tx_fees_24hs, fee_trend}} + else + _error -> {{0, 0.0}, {0.0, 0.0}} end end + defp average_tx_fees(state, txi, txi) do + Model.tx(fee: fee) = State.fetch!(state, Model.Tx, txi) + + fee * 1.0 + end + + defp average_tx_fees(state, start_txi, end_txi) do + txs_count = end_txi - start_txi + 1 + + Model.tx(accumulated_fee: start_accumulated_fee, fee: fee) = + State.fetch!(state, Model.Tx, start_txi) + + Model.tx(accumulated_fee: end_accumulated_fee) = State.fetch!(state, Model.Tx, end_txi) + + (end_accumulated_fee - start_accumulated_fee + fee) / txs_count + end + defp months_to_iso(months) do year = div(months, 12) month = rem(months, 12) + 1 diff --git a/lib/ae_mdw/sync/async_tasks/update_tx_stats.ex b/lib/ae_mdw/sync/async_tasks/update_tx_stats.ex index a12cb3ab5..4472cbe9e 100644 --- a/lib/ae_mdw/sync/async_tasks/update_tx_stats.ex +++ b/lib/ae_mdw/sync/async_tasks/update_tx_stats.ex @@ -1,100 +1,13 @@ defmodule AeMdw.Sync.AsyncTasks.UpdateTxStats do @moduledoc """ - Async work to update tx count, fee and stats without blocking sync. + Temporary module to get rid of pending tasks. """ @behaviour AeMdw.Sync.AsyncTasks.Work - alias AeMdw.Db.Model - alias AeMdw.Db.WriteMutation - alias AeMdw.Db.RocksDbCF - alias AeMdw.Db.State - - alias AeMdw.Sync.AsyncStoreServer - - require Model - require Logger - - @microsecs 1_000_000 - @seconds_per_day 24 * 3_600 - @spec process(args :: list(), done_fn :: fun()) :: :ok - def process([started_at], done_fn) do - state = State.mem_state() - - state - |> State.get(Model.Stat, :tx_stats) - |> case do - {:ok, Model.stat(payload: {old_started_at, _old_stats})} when old_started_at > started_at -> - done_fn.() - :ok - - {:ok, _old_stats} -> - update_stats(state, started_at, done_fn) - - :not_found -> - update_stats(state, started_at, done_fn) - end - end - - defp update_stats(state, started_at, done_fn) do - {time_delta, :ok} = - :timer.tc(fn -> - tx_stats = - calculate_fees(state, started_at) - - write_mutation = - WriteMutation.new( - Model.Stat, - Model.stat(index: :tx_stats, payload: tx_stats) - ) - - AsyncStoreServer.write_mutations( - [write_mutation], - done_fn - ) - end) - - Logger.debug("[update_tx_stats] after #{time_delta / @microsecs}s") + def process(_args, done_fn) do + _task = Task.async(done_fn) :ok end - - defp calculate_fees(state, started_at) do - time_24hs_ago = started_at - @seconds_per_day * 1_000 - - with {:ok, {_time, tx_index_24hs_ago}} <- State.next(state, Model.Time, {time_24hs_ago, -1}), - {:ok, last_tx_index} <- State.prev(state, Model.Tx, nil), - time_48hs_ago <- time_24hs_ago - @seconds_per_day * 1_000, - {:ok, {_time, tx_index_48hs_ago}} <- State.next(state, Model.Time, {time_48hs_ago, -1}), - txs_count_24hs when txs_count_24hs > 0 <- last_tx_index - tx_index_24hs_ago + 1, - txs_count_48hs <- tx_index_24hs_ago - tx_index_48hs_ago, - trend <- Float.round((txs_count_24hs - txs_count_48hs) / txs_count_24hs, 2), - average_tx_fees_24hs when average_tx_fees_24hs > 0 <- - average_tx_fees(tx_index_24hs_ago, last_tx_index), - average_tx_fees_48hs <- average_tx_fees(tx_index_48hs_ago, tx_index_24hs_ago), - fee_trend <- - Float.round((average_tx_fees_24hs - average_tx_fees_48hs) / average_tx_fees_24hs, 2) do - {started_at, - {{txs_count_24hs, Float.to_string(trend)}, - {Float.to_string(average_tx_fees_24hs), Float.to_string(fee_trend)}}} - else - _error -> - {started_at, {{0, "0.0"}, {"0.0", "0.0"}}} - end - end - - defp average_tx_fees(start_txi, end_txi) do - txs_count = end_txi - start_txi + 1 - - if txs_count != 0 do - Model.Tx - |> RocksDbCF.stream(key_boundary: {start_txi, end_txi}) - |> Enum.reduce(0, fn Model.tx(fee: fee), acc -> - acc + fee - end) - |> then(&(&1 / txs_count)) - else - 0 - end - end end diff --git a/lib/ae_mdw/sync/server.ex b/lib/ae_mdw/sync/server.ex index 008db1108..d61cfc5ef 100644 --- a/lib/ae_mdw/sync/server.ex +++ b/lib/ae_mdw/sync/server.ex @@ -329,7 +329,7 @@ defmodule AeMdw.Sync.Server do :ok = profile_sync("sync_db", height, ts, blocks_mutations) - add_tx_fees_job(new_state) + new_state end) broadcast_blocks(gens_mutations) @@ -421,9 +421,7 @@ defmodule AeMdw.Sync.Server do block_mutations end) - state - |> State.commit_mem(mutations) - |> add_tx_fees_job() + State.commit_mem(state, mutations) end) :ok = profile_sync("sync_mem", height, ts, gen_mutations) @@ -525,9 +523,4 @@ defmodule AeMdw.Sync.Server do :ok end - - defp add_tx_fees_job(state) do - now = :aeu_time.now_in_msecs() - State.enqueue(state, :update_tx_stats, [now], []) - end end diff --git a/priv/migrations/20250421120001_add_accumulated_fee_to_txs.ex b/priv/migrations/20250421120001_add_accumulated_fee_to_txs.ex new file mode 100644 index 000000000..5e569707f --- /dev/null +++ b/priv/migrations/20250421120001_add_accumulated_fee_to_txs.ex @@ -0,0 +1,60 @@ +defmodule AeMdw.Migrations.AddAccumulatedFeeToTxs do + @moduledoc """ + Adds the new accumulated fee field to tx. + """ + alias AeMdw.Db.WriteMutation + alias AeMdw.Db.Util, as: DbUtil + alias AeMdw.Db.State + alias AeMdw.Db.Model + + import Record, only: [defrecord: 2] + + require Model + require Logger + + defrecord(:tx, index: nil, id: nil, block_index: nil, time: nil, fee: nil) + + @spec run(State.t(), boolean()) :: {:ok, non_neg_integer()} + def run(state, from_start?) do + case DbUtil.last_txi(state) do + {:ok, last_txi} -> run(state, from_start?, last_txi) + :none -> {:ok, 0} + end + end + + defp run(state, _from_start?, last_txi) do + 1..last_txi + |> Stream.map(&State.fetch!(state, Model.Tx, &1)) + |> Stream.transform(0, fn + tx(index: index, id: id, block_index: block_index, time: time, fee: fee), acc_fee -> + acc_fee = acc_fee + fee + + if rem(index, 10_000) == 0 do + Logger.info("Processed #{index} out of #{last_txi}") + end + + tx = + Model.tx( + index: index, + id: id, + block_index: block_index, + time: time, + fee: fee, + accumulated_fee: acc_fee + ) + + {[WriteMutation.new(Model.Tx, tx)], acc_fee} + + Model.tx(accumulated_fee: acc_fee), _acc_fee -> + {[], acc_fee} + end) + |> Stream.chunk_every(10_000) + |> Stream.map(fn mutations -> + _state = State.commit_db(state, mutations) + + length(mutations) + end) + |> Enum.sum() + |> then(&{:ok, &1}) + end +end diff --git a/test/ae_mdw_web/controllers/stats_controller_test.exs b/test/ae_mdw_web/controllers/stats_controller_test.exs index 7bda754c3..ccedbc954 100644 --- a/test/ae_mdw_web/controllers/stats_controller_test.exs +++ b/test/ae_mdw_web/controllers/stats_controller_test.exs @@ -6,8 +6,6 @@ defmodule AeMdwWeb.StatsControllerTest do alias :aeser_api_encoder, as: Enc alias AeMdw.Db.Model alias AeMdw.Db.Store - alias AeMdw.Db.RocksDbCF - alias AeMdw.Collection require Model @@ -1102,19 +1100,22 @@ defmodule AeMdwWeb.StatsControllerTest do last_txi = 21 - fee_avg = Enum.sum((last_txi - 3)..last_txi) / Enum.count((last_txi - 3)..last_txi) - txs_count = 4 - - encoded_txs_stats = - {{txs_count, "-0.25"}, {"#{fee_avg}", "0.21"}} + fee_avg = + 0..last_txi + |> Enum.map_reduce(0, fn txi, acc_fee -> + {acc_fee + txi, acc_fee + txi} + end) + |> then(fn {accums, _acc} -> accums end) + |> then(fn accums -> + (Enum.at(accums, last_txi) - Enum.at(accums, last_txi - 4) + last_txi - 4) / 5 + end) store = store - |> add_transactions_every_5_hours(1, last_txi, now) + |> add_transactions_every_5_hours(0, last_txi, now) |> Store.put(Model.Stat, Model.stat(index: :miners_count, payload: 2)) |> Store.put(Model.Stat, Model.stat(index: :max_tps, payload: {2, <<0::256>>})) |> Store.put(Model.Stat, Model.stat(index: Stats.holders_count_key(), payload: 3)) - |> Store.put(Model.Stat, Model.stat(index: :tx_stats, payload: {now, encoded_txs_stats})) |> Store.put(Model.Block, Model.block(index: {1, -1}, hash: <<1::256>>)) |> Store.put(Model.Block, Model.block(index: {10, -1}, hash: <<2::256>>)) @@ -1131,9 +1132,9 @@ defmodule AeMdwWeb.StatsControllerTest do end} ]) do assert %{ - "last_24hs_transactions" => 4, - "transactions_trend" => -0.25, - "fees_trend" => 0.21, + "last_24hs_transactions" => 5, + "transactions_trend" => 0.0, + "fees_trend" => 0.24, "last_24hs_average_transaction_fees" => ^fee_avg, "milliseconds_per_block" => ^three_minutes, "holders_count" => 3 @@ -1152,16 +1153,12 @@ defmodule AeMdwWeb.StatsControllerTest do now = :aeu_time.now_in_msecs() three_minutes = 3 * 60 * 1_000 - endcoded_txs_stats = - {{1, "1.0"}, {"1.0", "0.0"}} - store = store |> add_transactions_every_5_hours(1, 1, now) |> Store.put(Model.Stat, Model.stat(index: :miners_count, payload: 2)) |> Store.put(Model.Stat, Model.stat(index: :max_tps, payload: {2, <<0::256>>})) |> Store.put(Model.Stat, Model.stat(index: Stats.holders_count_key(), payload: 3)) - |> Store.put(Model.Stat, Model.stat(index: :tx_stats, payload: {now, endcoded_txs_stats})) |> Store.put(Model.Block, Model.block(index: {1, -1}, hash: <<1::256>>)) |> Store.put(Model.Block, Model.block(index: {10, -1}, hash: <<2::256>>)) @@ -1175,17 +1172,6 @@ defmodule AeMdwWeb.StatsControllerTest do time_in_msecs: fn :first_block -> now - 10 * three_minutes :other_block -> now - end}, - {RocksDbCF, [], - stream: fn - Model.Tx, [key_boundary: {start_txi, end_txi}] -> - store - |> State.new() - |> Collection.stream(Model.Tx, :forward, {start_txi, end_txi}, nil) - |> Stream.map(fn index -> - {:ok, tx} = Store.get(store, Model.Tx, index) - tx - end) end} ]) do assert %{ @@ -1217,6 +1203,7 @@ defmodule AeMdwWeb.StatsControllerTest do |> Store.put(Model.Stat, Model.stat(index: Stats.holders_count_key(), payload: 3)) |> Store.put(Model.Block, Model.block(index: {1, -1}, hash: <<1::256>>)) |> Store.put(Model.Block, Model.block(index: {10, -1}, hash: <<2::256>>)) + |> Store.put(Model.Tx, Model.tx(index: <<3::256>>, id: <<2::256>>, accumulated_fee: 123)) with_mocks([ {:aec_chain, [], @@ -1542,16 +1529,23 @@ defmodule AeMdwWeb.StatsControllerTest do end defp add_transactions_every_5_hours(store, start_txi, end_txi, now) do - end_txi..start_txi - |> Enum.reduce({store, 1}, fn txi, {store, i} -> + init_time = now - :timer.hours((end_txi - start_txi) * 5) + + start_txi..end_txi + |> Enum.reduce({store, 0}, fn txi, {store, acc_fee} -> + acc_fee = acc_fee + txi + { store - |> Store.put(Model.Tx, Model.tx(index: txi, id: <>, fee: txi)) + |> Store.put( + Model.Tx, + Model.tx(index: txi, id: <>, fee: txi, accumulated_fee: acc_fee) + ) |> Store.put( Model.Time, - Model.time(index: {now - :timer.hours(i * 5), txi}) + Model.time(index: {init_time + :timer.hours(txi * 5), txi}) ), - i + 1 + acc_fee } end) |> elem(0)