Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- master
- fix/*

env:
ECR_REPOSITORY: electrumx
Expand All @@ -20,6 +21,11 @@ jobs:
fetch-depth: 0
persist-credentials: false

- name: Set environment variables for fix branch
if: github.ref != 'refs/heads/master'
run:
echo "IMAGE_TAG=fix-$IMAGE_TAG" >> "$GITHUB_ENV"

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
Expand All @@ -38,6 +44,7 @@ jobs:

- name: Python Semantic Release
uses: relekang/python-semantic-release@master
if: github.ref == 'refs/heads/master'
with:
github_token: ${{ secrets.TS_GH_TOKEN }}
pypi_token: false
Expand All @@ -48,6 +55,9 @@ jobs:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
run: docker pull $ECR_REGISTRY/$ECR_REPOSITORY:latest

- name: Show version
run: cat electrumx/__init__.py

- name: Builder image
run: docker build -t $BUILD_NAME .

Expand Down
2 changes: 1 addition & 1 deletion electrumx/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 'ElectrumX 2.0'
version = 'ElectrumX 2.0.a'
version_short = version.split()[-1]

from electrumx.server.controller import Controller
Expand Down
45 changes: 35 additions & 10 deletions electrumx/server/block_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ async def _maybe_flush(self):
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.time() + 30
self.next_cache_check = time.time() + (60 * 5) # flush every five minutes

def check_cache_size(self):
'''Flush a cache if it gets too big.'''
Expand Down Expand Up @@ -660,14 +660,17 @@ async def fetch_and_process_blocks(self, caught_up_event):
'''
self._caught_up_event = caught_up_event
await self._first_open_dbs()
try:
async with TaskGroup() as group:
await group.spawn(self.prefetcher.main_loop(self.height))
await group.spawn(self._process_prefetched_blocks())
finally:
# Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...')
await self.flush(True)
# try:
async with TaskGroup() as group:
await group.spawn(self.prefetcher.main_loop(self.height))
await group.spawn(self._process_prefetched_blocks())
# except Exception as ex:
# print (ex)
# finally:

# Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...')
await self.flush(True)

def force_chain_reorg(self, count):
'''Force a reorg of the given number of blocks.
Expand Down Expand Up @@ -826,6 +829,7 @@ def __init__(self, env, db, daemon, notifications):
# helper counter, atx are counted also for tx_count
self.atx_count = 0
self.ratx_count = 0
self.flush_height = 0

def estimate_txs_remaining(self):
# Try to estimate how many txs there are to go
Expand All @@ -839,6 +843,7 @@ def estimate_txs_remaining(self):

def flush_data(self):
    '''Snapshot the in-memory state as a BitcoinVaultFlushData for flushing.

    Must be called with the state lock held.  Records the height being
    flushed so the tx-hash cache can later be indexed relative to it.
    '''
    assert self.state_lock.locked()
    self.flush_height = self.height
    return BitcoinVaultFlushData(
        self.height, self.tx_count, self.headers, self.tx_hashes,
        self.undo_infos, self.utxo_cache, self.db_deletes, self.tip,
        self.tx_types)
Expand All @@ -860,6 +865,13 @@ def advance_blocks(self, blocks):
self.tip = self.coin.header_hash(headers[-1])

def advance_txs_and_atxs(self, txs, atxs):
# stdout.write(f"Block: {self.height} \r'")
# stdout.flush()
if (len(atxs) != 0):
print(f'---------- advance_txs_and_atxs {self.height} ----------')
print(f'len(txs) {len(txs)}, ')
print(f'len(atxs) {len(atxs)}, ')

# Use local vars for speed in the loops
undo_info = []
tx_num = self.tx_count
Expand Down Expand Up @@ -1098,7 +1110,20 @@ def backup_txs_and_atx(self, txs, atxs):
self.ratx_count = 0

def get_tx_hash_from_cache(self, tx_num, tx_height):
    '''Return the 32-byte hash of transaction number tx_num, confirmed
    at height tx_height, from the in-memory tx_hashes cache.

    Only heights above the last flushed height are held in the cache;
    earlier heights have already been written to the DB, so tx_height
    must be greater than self.flush_height.
    '''
    # Number of txs confirmed strictly before tx_height.
    height_tx_count = self.db.tx_counts[tx_height - 1]
    # Byte offset of this tx's hash inside the concatenated 32-byte
    # hashes blob for its block.
    index = (tx_num - height_tx_count) * 32
    # The cache is indexed relative to the last flush, so the height
    # must not have been flushed away already.
    assert tx_height - self.flush_height > 0
    return self.tx_hashes[tx_height - self.flush_height][index:index + 32]
6 changes: 6 additions & 0 deletions electrumx/server/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ async def getrawtransaction(self, hex_hash, verbose=False):
return await self._send_single('getrawtransaction',
(hex_hash, int(verbose)))

async def getblock(self, hex_hash, verbose=False):
    '''Return the serialized block with the given hash.'''
    # Some older coin daemons reject a boolean verbosity flag, so it
    # is passed as an integer.
    verbosity = int(verbose)
    return await self._send_single('getblock', (hex_hash, verbosity))

async def getrawtransactions(self, hex_hashes, replace_errs=True):
'''Return the serialized raw transactions with the given hashes.

Expand Down
33 changes: 32 additions & 1 deletion electrumx/server/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,23 +737,54 @@ async def _read_tx_counts(self):
self.atx_counts = array.array('I', atx_counts)

def flush_fs(self, flush_data):
print('------------ flush_fs ------------')
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
print(f'prior_tx_count: {prior_tx_count}')

print(f'len(flush_data.block_tx_hashes) : {len(flush_data.block_tx_hashes)}')
print(f'len(flush_data.headers) : {len(flush_data.headers)}')
assert len(flush_data.block_tx_hashes) == len(flush_data.headers)

print(f'len(flush_data.block_tx_types) : {len(flush_data.block_tx_types)}')
print(f'len(flush_data.headers) : {len(flush_data.headers)}')
assert len(flush_data.block_tx_types) == len(flush_data.headers)

print(f'flush_data.height : {flush_data.height}')
print(f'self.fs_height + len(flush_data.headers) : {self.fs_height + len(flush_data.headers)}')
assert flush_data.height == self.fs_height + len(flush_data.headers)

print(f'flush_data.tx_count : {flush_data.tx_count}')
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)

print(f'len(self.tx_counts) : {len(self.tx_counts)}')
print(f'flush_data.height + 1 : {flush_data.height + 1}')
assert len(self.tx_counts) == flush_data.height + 1

print(f'len(self.atx_counts) : {len(self.atx_counts)}')
print(f'flush_data.height + 1 : {flush_data.height + 1}')
assert len(self.atx_counts) == flush_data.height + 1

print(f'len(self.ratx_counts) : {len(self.ratx_counts)}')
print(f'flush_data.height + 1 : {flush_data.height + 1}')
assert len(self.ratx_counts) == flush_data.height + 1

hashes = b''.join(flush_data.block_tx_hashes)
print(f'len(hashes) : {len(hashes)}')
flush_data.block_tx_hashes.clear()
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count

# TODO: We need to know why there is +1, and why it is working

print(f'flush_data.tx_count + 1 : {flush_data.tx_count + 1}')
print(f'prior_tx_count : {prior_tx_count}')
assert not len(hashes) // 32 == flush_data.tx_count + 1 - prior_tx_count

types = b''.join(flush_data.block_tx_types)
flush_data.block_tx_types.clear()
print(f'len(types) : {len(types) }')
print(f'flush_data.tx_count - prior_tx_count : {flush_data.tx_count - prior_tx_count}')
assert len(types) == flush_data.tx_count - prior_tx_count

# Write the headers, tx counts, and tx hashes
Expand Down
7 changes: 7 additions & 0 deletions electrumx/server/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,12 @@ async def block_header(self, height, cp_height=0):
result.update(await self._merkle_proof(cp_height, height))
return result

async def get_block(self, block_hash, verbose=False):
    '''Return the block with the given hash from the daemon.

    block_hash: hexadecimal block hash
    verbose: if true, the daemon returns a decoded block rather than
             raw serialized hex
    '''
    # Note: the original docstring here was copy-pasted from relayfee
    # and described the mempool fee floor; it was wrong for this method.
    self.bump_cost(1.0)
    return await self.daemon_request('getblock', block_hash, verbose)

async def block_headers(self, start_height, count, cp_height=0):
'''Return count concatenated block headers as hex for the main chain;
starting at start_height.
Expand Down Expand Up @@ -1401,6 +1407,7 @@ def set_request_handlers(self, ptuple):

handlers = {
'blockchain.block.header': self.block_header,
'blockchain.block.get_block': self.get_block,
'blockchain.block.headers': self.block_headers,
'blockchain.estimatefee': self.estimatefee,
'blockchain.headers.subscribe': self.headers_subscribe,
Expand Down