forked from crypto-org-chain/chain-main
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_maxsupply.py
More file actions
195 lines (154 loc) · 6.3 KB
/
test_maxsupply.py
File metadata and controls
195 lines (154 loc) · 6.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import time
from pathlib import Path
import pytest
from .utils import (
approve_proposal,
check_proposal_exist,
cluster_fixture,
find_event_proposal_id,
module_address,
query_command,
wait_for_new_blocks,
)
MAXSUPPLY = "maxsupply"
TOTALSUPPLY = "total-supply-of"
BANK_MODULE = "bank"
PARAM = "max-supply"
DENOM = "basecro"
AMOUNT = "amount"
MSG = "/chainmain.maxsupply.v1.MsgUpdateParams"
ERROR = "failed to apply block; error the total supply has exceeded the maximum supply"
pytestmark = pytest.mark.normal
@pytest.fixture(scope="module")
def cluster(worker_index, tmp_path_factory):
"override cluster fixture for this test module"
yield from cluster_fixture(
Path(__file__).parent / "configs/default.jsonnet",
worker_index,
tmp_path_factory.mktemp("data"),
)
def _create_max_supply_proposal(params):
"""Create a governance proposal to update max supply parameters"""
authority = module_address("gov")
proposal_src = {
"messages": [
{
"@type": MSG,
"authority": authority,
"params": params,
}
],
"deposit": "100000000basecro",
"title": "Update Max Supply",
"summary": "Increase maximum supply limit",
}
return proposal_src
def test_max_supply_cli_query(cluster):
"""Test querying max supply parameters"""
# Query max supply parameters
rsp = query_command(cluster, MAXSUPPLY, PARAM)
assert "max_supply" in rsp
assert int(rsp["max_supply"]) == 0 # the max supply is 0 by default
def test_max_supply_persistence(cluster):
"""Test that max supply persists across chain restarts"""
# Get initial max supply
initial_max_supply_rsp = query_command(cluster, MAXSUPPLY, PARAM)
initial_max_supply = initial_max_supply_rsp["max_supply"]
# Restart the chain (this would require cluster restart functionality)
# For now, just verify the value is consistent
wait_for_new_blocks(cluster, 1)
# Query again after some blocks
final_max_supply_rsp = query_command(cluster, MAXSUPPLY, PARAM)
final_max_supply = final_max_supply_rsp["max_supply"]
assert (
initial_max_supply == final_max_supply
), "Max supply should persist across blocks"
def test_max_supply_update_via_governance(cluster):
"""Test updating max supply through governance proposal"""
# Get current max supply
rsp = query_command(cluster, MAXSUPPLY, PARAM)
current_max_supply = int(rsp["max_supply"])
# Prepare new max supply (increase by 2000000000000)
new_max_supply = current_max_supply + 2000000000000
rsp["max_supply"] = str(new_max_supply)
proposal_src = _create_max_supply_proposal(rsp)
rsp = cluster.gov_propose_since_cosmos_sdk_v0_50(
"community", "submit-proposal", proposal_src
)
assert rsp["code"] == 0, rsp["raw_log"]
# Extract proposal ID from the response
proposal_id = find_event_proposal_id(rsp["events"])
# Wait for proposal to be available
wait_for_new_blocks(cluster, 1)
# Check if proposal exists before voting
check_proposal_exist(cluster, proposal_id)
# Vote on proposal
approve_proposal(cluster, rsp, msg=f",{MSG}")
print("check params have been updated now")
# Verify max supply has been updated
updated_max_supply_rsp = query_command(cluster, MAXSUPPLY, PARAM)
updated_max_supply = int(updated_max_supply_rsp["max_supply"])
assert (
updated_max_supply == new_max_supply
), f"Max supply should be updated to {new_max_supply}"
def test_begin_blocker_halt_on_excess_supply(cluster):
"""Test that chain halts when total supply exceeds max supply"""
total_supply_rsp = query_command(cluster, BANK_MODULE, TOTALSUPPLY, DENOM)
current_total_supply = int(total_supply_rsp[AMOUNT][AMOUNT])
print("current_total_supply:", current_total_supply)
# Get current max supply
rsp = query_command(cluster, MAXSUPPLY, PARAM)
assert "max_supply" in rsp
# Prepare new max supply (increase by 450000) and submit a proposal
# Around 13 blocks should pass before total supply exceeds max supply
new_max_supply = current_total_supply + 450000
rsp["max_supply"] = str(new_max_supply)
proposal_src = _create_max_supply_proposal(rsp)
rsp = cluster.gov_propose_since_cosmos_sdk_v0_50(
"community", "submit-proposal", proposal_src
)
assert rsp["code"] == 0, rsp["raw_log"]
# Extract proposal ID from the response
proposal_id = find_event_proposal_id(rsp["events"])
# Wait for proposal to be available
wait_for_new_blocks(cluster, 1)
# Check if proposal exists before voting
check_proposal_exist(cluster, proposal_id)
# Vote on proposal
approve_proposal(cluster, rsp, msg=f",{MSG}")
# Verify max supply has been updated
updated_max_supply_rsp = query_command(cluster, MAXSUPPLY, PARAM)
updated_max_supply = int(updated_max_supply_rsp["max_supply"])
assert (
updated_max_supply == new_max_supply
), f"Max supply should be updated to {new_max_supply}"
def _halted_chain():
print(" halting chain...")
node0_info = cluster.supervisor.getProcessInfo(f"{cluster.chain_id}-node0")
halted0 = node0_info["state"] != "RUNNING"
node1_info = cluster.supervisor.getProcessInfo(f"{cluster.chain_id}-node1")
halted1 = node1_info["state"] != "RUNNING"
return halted0 and halted1
# Wait new blocks in order to reach the halt
timeout_seconds = 120
start_time = time.time()
try:
while not _halted_chain():
if time.time() - start_time > timeout_seconds:
timeout_msg = (
f"Timeout after {timeout_seconds} seconds "
f"waiting for chain to halt"
)
assert False, timeout_msg
time.sleep(1)
print("Chain has been halted")
time.sleep(1)
# Check the node's log for errors matches the expected message
log_file = f"{cluster.home(0)}/../node0.log"
with open(log_file, "r") as f:
log_content = f.read()
print("log_content:", log_content)
assert ERROR in log_content, "Expected error message not found in log"
pass
except Exception as e:
assert False, f"Test case failed due to exception: {e}."