Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions polyglot/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,32 @@ def get_wif_securely():


def main():
    """CLI entry point: download a file from a b:/bcat:/d: URL, or upload one.

    With --download URL, fetches the URL's content into FILE and prints the
    metadata fields (except the raw payload). Otherwise prompts for a WIF
    private key, validates it, and uploads FILE to Bitcoin SV.
    """
    parser = argparse.ArgumentParser(description='Load files with Bitcoin SV.')
    parser.add_argument('file', help='filename')
    parser.add_argument('--download', action='store', dest='url', default=None,
                        help='Download from a url')
    parser.add_argument("--testnet", action="store_true", dest="testnet", default=False,
                        help="Use Testnet")
    parser.add_argument("--scaling-testnet", action="store_true", dest="scalingtestnet",
                        default=False, help="Use Scaling Testnet")
    args = parser.parse_args()

    if args.url:
        # Download mode: write the payload to args.file, then report metadata.
        downloader = polyglot.Download(network=set_network(args))
        fields = downloader.download_url(args.url, args.file)
        # NOTE(review): download_d returns None (only b/bcat report fields),
        # so guard before iterating.
        if fields:
            for key, value in fields.items():
                if key == 'data':
                    # raw payload was already written to disk; don't dump it
                    continue
                print(key, value)
    else:
        # Upload mode: a funded private key is required.
        wif = get_wif_securely()

        try:
            bitsv.format.wif_checksum_check(wif)
            bitsv.format.wif_to_bytes(wif)
        except ValueError:
            print(f"'{wif}' is not a valid WIF format private key")
            sys.exit(1)

        uploader = polyglot.Upload(wif, network=set_network(args))
        txid = uploader.upload_easy(args.file)
        print('txid', txid)
166 changes: 149 additions & 17 deletions polyglot/download.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import gzip
import os
from urllib.parse import urlparse

from bitsv.network import NetworkAPI

Expand All @@ -16,7 +18,9 @@ def binary_to_file(binary, file):

Example path (for newer users): "C://Users/username/Pictures/my_picture.jpg etc."
"""
os.makedirs(os.path.dirname(file), exist_ok=True)
subdirs = os.path.dirname(file)
if subdirs:
os.makedirs(subdirs, exist_ok=True)
with open(file, 'wb') as f:
f.write(binary)

Expand Down Expand Up @@ -123,14 +127,13 @@ def b_fields_from_txid(self, txid):
fields = {}
for script in self.scripts_from_txid(txid):
data = self.pushdata_from_script(script)
newfields = self.b_fields_from_pushdata(data)
if len(fields):
if 'extra' not in fields:
fields['extra'] = newfields
fields['extra'] = data
else:
fields['extra'].extend(newfields)
elif len(newfields):
fields = newfields
fields['extra'].extend(data)
else:
fields = self.b_fields_from_pushdata(data)
return fields

def b_file_from_txid(self, txid, file):
Expand All @@ -155,11 +158,12 @@ def bcat_part_detect_fromtxid(self, txid):
return True
return False

def bcat_part_binary_from_pushdata(self, data):
def bcat_part_binary_from_pushdata(self, data, gunzip = False):
if not self.bcat_part_detect_from_pushdata(data):
subfields = self.bcat_linker_fields_from_pushdata(data)
if subfields:
return self.bcat_binary_from_txids(subfields['parts'])
gunzip = gunzip and subfields['flag'] in ('gzip', 'nested-gzip')
return self.bcat_binary_from_txids(subfields['parts'], gunzip)
else:
return self.b_binary_from_pushdata(data)
offset = 0
Expand Down Expand Up @@ -214,24 +218,34 @@ def bcat_linker_fields_from_txid(self, txid):
break
return fields

def bcat_binary_from_txids(self, txids):
def bcat_binary_from_txids(self, txids, gunzip=False):
    """Concatenate the binary payload of every BCAT part transaction.

    When *gunzip* is true, the joined blob is decompressed before returning.
    """
    blob = b''.join(self.bcat_part_binary_from_txid(txid) for txid in txids)
    return gzip.decompress(blob) if gunzip else blob

def bcat_fields_from_txid(self, txid, gunzip = True):
fields = self.bcat_linker_fields_from_txid(txid)
if not fields:
return fields
fields['data'] = self.bcat_binary_from_txids(fields['parts'])
if gunzip and fields['flag'] in ('gzip', 'nested-gzip'):
def bcat_fields_from_linker_fields(self, fields, gunzip=True):
    """Expand BCAT linker fields into a copy that also carries 'data'.

    Concatenates all part transactions; when *gunzip* is requested and the
    linker flag marks gzip content, the data is decompressed and the flag is
    rewritten to record that mutation.
    """
    fields = fields.copy()
    unzipping = gunzip and fields['flag'] in ('gzip', 'nested-gzip')
    fields['data'] = self.bcat_binary_from_txids(fields['parts'], unzipping)
    if unzipping:
        # record in 'flag' that the payload was decompressed in transit
        fields['flag'] = fields['flag'].replace('zip', 'unzipped')
    return fields

def bcat_fields_from_pushdata(self, data, gunzip=True):
    """Parse BCAT linker pushdata and expand it to full fields with 'data'.

    Returns the empty linker dict unchanged when *data* is not a BCAT linker
    (previously an empty dict was passed on and raised KeyError on 'flag';
    sibling bcat_fields_from_txid already guards this way).
    """
    fields = self.bcat_linker_fields_from_pushdata(data)
    if not fields:
        return fields
    return self.bcat_fields_from_linker_fields(fields, gunzip)

def bcat_fields_from_txid(self, txid, gunzip=True):
    """Find the first BCAT linker output in *txid* and expand it.

    Returns None when no output script parses as a BCAT linker.
    """
    for script in self.scripts_from_txid(txid):
        pushdata = self.pushdata_from_script(script)
        linker = self.bcat_linker_fields_from_pushdata(pushdata)
        if linker:
            return self.bcat_fields_from_linker_fields(linker, gunzip)
    return None

def download_bcat(self, txid, file, gunzip = True):
fields = self.bcat_linker_fields_from_txid(txid)
if not fields:
Expand All @@ -249,3 +263,121 @@ def download_bcat(self, txid, file, gunzip = True):
f.write(data)
return fields

# D

def d_detect_from_pushdata(self, data):
    """Return True when pushdata carries the D protocol marker in slot 0 or 1."""
    if len(data) < 3:
        return False
    # first push may be an empty OP_FALSE, shifting the marker to slot 1
    return data[0].decode('utf-8') == D or data[1].decode('utf-8') == D

def d_detect_from_txid(self, txid):
    """Return True if any output script of *txid* carries D-protocol pushdata."""
    return any(
        self.d_detect_from_pushdata(self.pushdata_from_script(script))
        for script in self.scripts_from_txid(txid)
    )

def d_linker_fields_from_pushdata(self, data):
    """Parse D-protocol pushdata into a field dict; empty dict if not D."""
    if not self.d_detect_from_pushdata(data):
        return {}
    # a leading empty push (OP_FALSE OP_RETURN form) shifts every field by one
    offset = 1 if len(data[0]) == 0 else 0
    fields = {
        'key': self.binary_to_bsv_string(data[offset + 1]),
        'value': self.binary_to_bsv_string(data[offset + 2]),
        'type': self.binary_to_bsv_string(data[offset + 3]),
        'sequence': int(data[offset + 4]),
    }
    if len(data) > offset + 5:
        fields['extra'] = data[offset + 5:]
    return fields

def d_linker_fields_from_txid(self, txid):
    """Collect D-protocol fields from *txid*; later scripts accumulate in 'extra'.

    Returns an empty dict when no script parses as a D record; otherwise the
    parsed fields tagged with 'txid'.
    """
    fields = {}
    for script in self.scripts_from_txid(txid):
        data = self.pushdata_from_script(script)
        if not fields:
            fields = self.d_linker_fields_from_pushdata(data)
        elif 'extra' in fields:
            fields['extra'].extend(data)
        else:
            fields['extra'] = data
    if fields:
        fields['txid'] = txid
    return fields

def d_linker_fields_from_address(self, address):
    """Yield D-protocol linker fields for each matching tx at *address*."""
    for txid in self.get_transactions(address):
        parsed = self.d_linker_fields_from_txid(txid)
        if parsed:
            yield parsed

def d_fields_from_linker_fields(self, fields):
    """Resolve a D linker-field dict into a copy carrying a 'data' payload.

    'txt' values become UTF-8 bytes; 'tx'/'b'/'bcat' values are resolved by
    walking the referenced transaction's scripts and concatenating their
    B/BCAT payloads. Raises ValueError on unknown types or script formats.
    """
    fields = fields.copy()
    if not fields or fields['value'] is None:
        fields['data'] = None
    elif fields['type'] == 'txt':
        fields['data'] = bytes(fields['value'], 'utf-8')
    elif fields['type'] in ('tx', 'b', 'bcat'):
        fields['data'] = None
        for script in self.scripts_from_txid(fields['value']):
            pushdata = self.pushdata_from_script(script)
            if self.b_detect_from_pushdata(pushdata):
                chunk = self.b_binary_from_pushdata(pushdata)
            elif self.bcat_linker_detect_from_pushdata(pushdata):
                chunk = self.bcat_fields_from_pushdata(pushdata)['data']
            else:
                raise ValueError('unrecognised tx type id {}'.format(fields['value']))
            if chunk is not None:
                fields['data'] = chunk if fields['data'] is None else fields['data'] + chunk
    else:
        raise ValueError('unhandled d type "{}"'.format(fields['type']))
    return fields

def d_fields_from_address(self, address):
    """Yield fully-resolved D field dicts (with 'data') found at *address*."""
    yield from map(self.d_fields_from_linker_fields,
                   self.d_linker_fields_from_address(address))

def download_d_linker_fields(self, fields, file):
    """Write the content referenced by a D linker-field dict to *file*.

    Raises ValueError when the field type is not one this method handles.
    """
    kind = fields['type']
    if kind == 'txt':
        self.binary_to_file(bytes(fields['value'], 'utf-8'), file)
    elif kind in ('tx', 'b', 'bcat'):
        try:
            self.download_bcat(fields['value'], file)
        except ValueError:
            # value is not a BCAT linker tx; retry as a plain B download
            self.download_b(fields['value'], file)
    else:
        raise ValueError('unrecognised d value')

def download_d(self, address, key, file):
    """Download the first D entry at *address* whose 'key' equals *key* into *file*.

    Raises:
        ValueError: if no D transaction with a matching key is found.
    """
    for linker_fields in self.d_linker_fields_from_address(address):
        if linker_fields['key'] == key:
            # Bug fix: return after a successful download. Previously the
            # loop fell through to the raise below even when the file had
            # been written, so every call ended in ValueError.
            return self.download_d_linker_fields(linker_fields, file)
    raise ValueError('d tx not found')

# urls

def download_url(self, url, file):
    """Dispatch a b:/bcat:/d:/bit: URL to the matching download method.

    Raises ValueError for any scheme (or bit: protocol address) this
    method does not recognise.
    """
    parsed = urlparse(url)
    scheme = parsed.scheme
    if scheme == 'b':
        return self.download_b(parsed.netloc, file)
    if scheme == 'bcat':
        return self.download_bcat(parsed.netloc, file)
    if scheme == 'd':
        key = parsed.path
        # drop exactly one leading slash left by the url parser
        if key.startswith('/'):
            key = key[1:]
        return self.download_d(parsed.netloc, key, file)
    if scheme == 'bit':
        # bit://<protocol-address>/... wraps one of the other schemes
        inner_scheme = {D: 'D', B: 'B', BCAT: 'BCAT'}.get(parsed.netloc)
        if inner_scheme is not None:
            return self.download_url('{}:/{}'.format(inner_scheme, parsed.path), file)
    raise ValueError('unrecognised url scheme')