diff --git a/commune/__init__.py b/commune/__init__.py index d1c4f58f9..4b8928925 100755 --- a/commune/__init__.py +++ b/commune/__init__.py @@ -1,15 +1,20 @@ +from .module import Module # the module module +M = c = Block = Agent = Module # alias c.Module as c.Block, c.Lego, c.M +from .vali.vali import Vali # the vali module +from .server.server import Server # the server module +from .server.client import Client # the client module +from .key.key import Key # the key module - -from .module import Module # the module module -M = c = Block = Agent = Module # alias c.Module as c.Block, c.Lego, c.M -from .vali import Vali # the vali module -from .server import Server, Client # the server module -from .key import Key # the key module # set the module functions as globalsw c.add_to_globals(globals()) +network = c.network + v = Vali s = Server k = Key # set the module functions as globals -key = c.get_key # override key function with file key in commune/key.py TODO: remove this line with a better solution \ No newline at end of file +key = ( + c.get_key +) # override key function with file key in commune/key.py TODO: remove this line with a better solution + diff --git a/commune/key/constants.py b/commune/key/constants.py new file mode 100644 index 000000000..af3f083ff --- /dev/null +++ b/commune/key/constants.py @@ -0,0 +1,32 @@ +from ecdsa.curves import SECP256k1 + +SS58_FORMAT = 42 + +ERROR_INVALID_KEY = "Invalid key provided." +ERROR_KEY_GENERATION_FAILED = "Key generation failed." +ERROR_KEY_VALIDATION_FAILED = "Key validation failed." + +DEV_PHRASE = 'bottom drive obey lake curtain smoke basket hold race lonely fit walk' + +JUNCTION_ID_LEN = 32 +RE_JUNCTION = r'(\/\/?)([^/]+)' + +NONCE_LENGTH = 24 +SCRYPT_LENGTH = 32 + (3 * 4) +PKCS8_DIVIDER = bytes([161, 35, 3, 33, 0]) +PKCS8_HEADER = bytes([48, 83, 2, 1, 1, 48, 5, 6, 3, 43, 101, 112, 4, 34, 4, 32]) +PUB_LENGTH = 32 +SALT_LENGTH = 32 +SEC_LENGTH = 64 +SEED_LENGTH = 32 + +SCRYPT_N = 1 << 15 +SCRYPT_P = 1 +SCRYPT_R = 8 + +BIP39_PBKDF2_ROUNDS = 2048 +BIP39_SALT_MODIFIER = "mnemonic" +BIP32_PRIVDEV = 0x80000000 +BIP32_CURVE = SECP256k1 +BIP32_SEED_MODIFIER = b"Bitcoin seed" +ETH_DERIVATION_PATH = "m/44'/60'/0'/0" \ No newline at end of file diff --git a/commune/key/key.py b/commune/key/key.py index febcb925d..55e48f6a2 100644 --- a/commune/key/key.py +++ b/commune/key/key.py @@ -13,297 +13,22 @@ from Crypto.Cipher import AES import nacl.bindings import nacl.public -from eth_keys.datatypes import PrivateKey from scalecodec.utils.ss58 import ss58_encode, ss58_decode, get_ss58_format from scalecodec.base import ScaleBytes -from bip39 import bip39_to_mini_secret, bip39_generate, bip39_validate -import sr25519 -import ed25519_zebra +from bip39 import bip39_generate, bip39_validate import commune as c -import re -from hashlib import blake2b -from math import ceil -from scalecodec.utils.ss58 import ss58_decode, ss58_encode, is_valid_ss58_address, get_ss58_format -import base64 - -import json -from os import urandom -from typing import Union -from nacl.hashlib import scrypt -from nacl.secret import SecretBox -from sr25519 import pair_from_ed25519_secret_key -from scalecodec.types import Bytes -import hashlib -import hmac -import struct -from ecdsa.curves import SECP256k1 -from eth_keys.datatypes import Signature, PrivateKey -from eth_utils import to_checksum_address, keccak as eth_utils_keccak - -BIP39_PBKDF2_ROUNDS = 2048 -BIP39_SALT_MODIFIER = "mnemonic" -BIP32_PRIVDEV = 0x80000000 -BIP32_CURVE = SECP256k1 -BIP32_SEED_MODIFIER = b'Bitcoin seed' -ETH_DERIVATION_PATH = "m/44'/60'/0'/0" - -class PublicKey: - def __init__(self, private_key): - self.point = int.from_bytes(private_key, byteorder='big') * BIP32_CURVE.generator - - def __bytes__(self): - xstr = int(self.point.x()).to_bytes(32, byteorder='big') - parity = int(self.point.y()) & 1 - return (2 + parity).to_bytes(1, byteorder='big') + xstr - - def address(self): - x = int(self.point.x()) - y = int(self.point.y()) - s = x.to_bytes(32, 'big') + y.to_bytes(32, 'big') - return to_checksum_address(eth_utils_keccak(s)[12:]) - -def mnemonic_to_bip39seed(mnemonic, passphrase): - mnemonic = bytes(mnemonic, 'utf8') - salt = bytes(BIP39_SALT_MODIFIER + passphrase, 'utf8') - return hashlib.pbkdf2_hmac('sha512', mnemonic, salt, BIP39_PBKDF2_ROUNDS) - -def bip39seed_to_bip32masternode(seed): - h = hmac.new(BIP32_SEED_MODIFIER, seed, hashlib.sha512).digest() - key, chain_code = h[:32], h[32:] - return key, chain_code - -def derive_bip32childkey(parent_key, parent_chain_code, i): - assert len(parent_key) == 32 - assert len(parent_chain_code) == 32 - k = parent_chain_code - if (i & BIP32_PRIVDEV) != 0: - key = b'\x00' + parent_key - else: - key = bytes(PublicKey(parent_key)) - d = key + struct.pack('>L', i) - while True: - h = hmac.new(k, d, hashlib.sha512).digest() - key, chain_code = h[:32], h[32:] - a = int.from_bytes(key, byteorder='big') - b = int.from_bytes(parent_key, byteorder='big') - key = (a + b) % int(BIP32_CURVE.order) - if a < BIP32_CURVE.order and key != 0: - key = key.to_bytes(32, byteorder='big') - break - d = b'\x01' + h[32:] + struct.pack('>L', i) - return key, chain_code - -def parse_derivation_path(str_derivation_path): - path = [] - if str_derivation_path[0:2] != 'm/': - raise ValueError("Can't recognize derivation path. It should look like \"m/44'/60/0'/0\".") - for i in str_derivation_path.lstrip('m/').split('/'): - if "'" in i: - path.append(BIP32_PRIVDEV + int(i[:-1])) - else: - path.append(int(i)) - return path - - -def mnemonic_to_ecdsa_private_key(mnemonic: str, str_derivation_path: str = None, passphrase: str = "") -> bytes: - - if str_derivation_path is None: - str_derivation_path = f'{ETH_DERIVATION_PATH}/0' - - derivation_path = parse_derivation_path(str_derivation_path) - bip39seed = mnemonic_to_bip39seed(mnemonic, passphrase) - master_private_key, master_chain_code = bip39seed_to_bip32masternode(bip39seed) - private_key, chain_code = master_private_key, master_chain_code - for i in derivation_path: - private_key, chain_code = derive_bip32childkey(private_key, chain_code, i) - return private_key - - -def ecdsa_sign(private_key: bytes, message: bytes) -> bytes: - signer = PrivateKey(private_key) - return signer.sign_msg(message).to_bytes() - - -def ecdsa_verify(signature: bytes, data: bytes, address: bytes) -> bool: - signature_obj = Signature(signature) - recovered_pubkey = signature_obj.recover_public_key_from_msg(data) - return recovered_pubkey.to_canonical_address() == address - -NONCE_LENGTH = 24 -SCRYPT_LENGTH = 32 + (3 * 4) -PKCS8_DIVIDER = bytes([161, 35, 3, 33, 0]) -PKCS8_HEADER = bytes([48, 83, 2, 1, 1, 48, 5, 6, 3, 43, 101, 112, 4, 34, 4, 32]) -PUB_LENGTH = 32 -SALT_LENGTH = 32 -SEC_LENGTH = 64 -SEED_LENGTH = 32 -SCRYPT_N = 1 << 15 -SCRYPT_P = 1 -SCRYPT_R = 8 - -def decode_pair_from_encrypted_json(json_data: Union[str, dict], passphrase: str) -> tuple: - """ - Decodes encrypted PKCS#8 message from PolkadotJS JSON format - - Parameters - ---------- - json_data - passphrase - - Returns - ------- - tuple containing private and public key - """ - if type(json_data) is str: - json_data = json.loads(json_data) - - # Check requirements - if json_data.get('encoding', {}).get('version') != "3": - raise ValueError("Unsupported JSON format") - - encrypted = base64.b64decode(json_data['encoded']) - - if 'scrypt' in json_data['encoding']['type']: - salt = encrypted[0:32] - n = int.from_bytes(encrypted[32:36], byteorder='little') - p = int.from_bytes(encrypted[36:40], byteorder='little') - r = int.from_bytes(encrypted[40:44], byteorder='little') - - password = scrypt(passphrase.encode(), salt, n=n, r=r, p=p, dklen=32, maxmem=2 ** 26) - encrypted = encrypted[SCRYPT_LENGTH:] - - else: - password = passphrase.encode().rjust(32, b'\x00') - - if "xsalsa20-poly1305" not in json_data['encoding']['type']: - raise ValueError("Unsupported encoding type") - - nonce = encrypted[0:NONCE_LENGTH] - message = encrypted[NONCE_LENGTH:] - - secret_box = SecretBox(key=password) - decrypted = secret_box.decrypt(message, nonce) - - # Decode PKCS8 message - secret_key, public_key = decode_pkcs8(decrypted) - - if 'sr25519' in json_data['encoding']['content']: - # Secret key from PolkadotJS is an Ed25519 expanded secret key, so has to be converted - # https://github.com/polkadot-js/wasm/blob/master/packages/wasm-crypto/src/rs/sr25519.rs#L125 - converted_public_key, secret_key = pair_from_ed25519_secret_key(secret_key) - assert(public_key == converted_public_key) - - return secret_key, public_key - - -def decode_pkcs8(ciphertext: bytes) -> tuple: - current_offset = 0 - - header = ciphertext[current_offset:len(PKCS8_HEADER)] - if header != PKCS8_HEADER: - raise ValueError("Invalid Pkcs8 header found in body") - - current_offset += len(PKCS8_HEADER) - - secret_key = ciphertext[current_offset:current_offset + SEC_LENGTH] - current_offset += SEC_LENGTH - - divider = ciphertext[current_offset:current_offset + len(PKCS8_DIVIDER)] - - if divider != PKCS8_DIVIDER: - raise ValueError("Invalid Pkcs8 divider found in body") - - current_offset += len(PKCS8_DIVIDER) - - public_key = ciphertext[current_offset: current_offset + PUB_LENGTH] - - return secret_key, public_key -def encode_pkcs8(public_key: bytes, private_key: bytes) -> bytes: - return PKCS8_HEADER + private_key + PKCS8_DIVIDER + public_key +from scalecodec.utils.ss58 import is_valid_ss58_address -def encode_pair(public_key: bytes, private_key: bytes, passphrase: str) -> bytes: - """ - Encode a public/private pair to PKCS#8 format, encrypted with provided passphrase +from commune.key.constants import * +from .utils import * - Parameters - ---------- - public_key: 32 bytes public key - private_key: 64 bytes private key - passphrase: passphrase to encrypt the PKCS#8 message - - Returns - ------- - (Encrypted) PKCS#8 message bytes - """ - message = encode_pkcs8(public_key, private_key) - - salt = urandom(SALT_LENGTH) - password = scrypt(passphrase.encode(), salt, n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P, dklen=32, maxmem=2 ** 26) - - secret_box = SecretBox(key=password) - message = secret_box.encrypt(message) - - scrypt_params = SCRYPT_N.to_bytes(4, 'little') + SCRYPT_P.to_bytes(4, 'little') + SCRYPT_R.to_bytes(4, 'little') - - return salt + scrypt_params + message.nonce + message.ciphertext - - - - -JUNCTION_ID_LEN = 32 -RE_JUNCTION = r'(\/\/?)([^/]+)' - - -class DeriveJunction: - def __init__(self, chain_code, is_hard=False): - self.chain_code = chain_code - self.is_hard = is_hard - - @classmethod - def from_derive_path(cls, path: str, is_hard=False): - - if path.isnumeric(): - byte_length = ceil(int(path).bit_length() / 8) - chain_code = int(path).to_bytes(byte_length, 'little').ljust(32, b'\x00') - - else: - path_scale = Bytes() - path_scale.encode(path) - - if len(path_scale.data) > JUNCTION_ID_LEN: - chain_code = blake2b(path_scale.data.data, digest_size=32).digest() - else: - chain_code = bytes(path_scale.data.data.ljust(32, b'\x00')) - - return cls(chain_code=chain_code, is_hard=is_hard) - - -def extract_derive_path(derive_path: str): - - path_check = '' - junctions = [] - paths = re.findall(RE_JUNCTION, derive_path) - - if paths: - path_check = ''.join(''.join(path) for path in paths) - - for path_separator, path_value in paths: - junctions.append(DeriveJunction.from_derive_path( - path=path_value, is_hard=path_separator == '//') - ) - - if path_check != derive_path: - raise ValueError('Reconstructed path "{}" does not match input'.format(path_check)) - - return junctions - -DEV_PHRASE = 'bottom drive obey lake curtain smoke basket hold race lonely fit walk' class KeyType: ED25519 = 0 SR25519 = 1 ECDSA = 2 + SOLANA = 3 KeyType.crypto_types = [k for k in KeyType.__dict__.keys() if not k.startswith('_')] KeyType.crypto_type_map = {k.lower():v for k,v in KeyType.__dict__.items() if k in KeyType.crypto_types } KeyType.crypto_types = list(KeyType.crypto_type_map.keys()) @@ -312,45 +37,55 @@ class Key(c.Module): crypto_types = KeyType.crypto_types crypto_type_map = KeyType.crypto_type_map crypto_types = list(crypto_type_map.keys()) - ss58_format = 42 - crypto_type = 'sr25519' - def __init__(self, - private_key: Union[bytes, str] = None, - ss58_format: int = ss58_format, - crypto_type: int = crypto_type, - derive_path: str = None, - path:str = None, - **kwargs): - self.set_private_key(private_key=private_key, - ss58_format=ss58_format, - crypto_type=crypto_type, - derive_path=derive_path, - path=path, **kwargs) - + + def __new__( + cls, + crypto_type: Union[str, int] = KeyType.SR25519, + **kwargs, + ): + crypto_type = cls.resolve_crypto_type(crypto_type) + if crypto_type == KeyType.SR25519: + from .types.dot.sr25519 import DotSR25519 + return super().__new__(DotSR25519) + elif crypto_type == KeyType.ED25519: + from commune.key.types.dot.ed25519 import DotED25519 + return super().__new__(DotED25519) + elif crypto_type == KeyType.ECDSA: + from .types.eth import ECDSA + return super().__new__(ECDSA) + elif crypto_type == KeyType.SOLANA: + from .types.sol import Solana + return super().__new__(Solana) + else: + raise NotImplementedError(f"unsupported crypto_type {crypto_type}") + @property def short_address(self): n = 4 - return self.ss58_address[:n] + '...' + self.ss58_address[-n:] - + return self.ss58_address[:n] + "..." + self.ss58_address[-n:] + def set_crypto_type(self, crypto_type): crypto_type = self.resolve_crypto_type(crypto_type) if crypto_type != self.crypto_type: kwargs = { - 'private_key': self.private_key, - 'ss58_format': self.ss58_format, - 'derive_path': self.derive_path, - 'path': self.path, - 'crypto_type': crypto_type # update crypto_type + "private_key": self.private_key, + "ss58_format": self.ss58_format, + "derive_path": self.derive_path, + "path": self.path, + "crypto_type": crypto_type, # update crypto_type } return self.set_private_key(**kwargs) else: - return {'success': False, 'message': f'crypto_type already set to {crypto_type}'} + return { + "success": False, + "message": f"crypto_type already set to {crypto_type}", + } def set_private_key(self, private_key: Union[bytes, str] = None, - ss58_format: int = ss58_format, - crypto_type: int = crypto_type, + ss58_format: int = SS58_FORMAT, + crypto_type: int = KeyType.SR25519, derive_path: str = None, path:str = None, **kwargs @@ -366,96 +101,64 @@ def set_private_key(self, private_key: hex string or bytes of private key ss58_format: Substrate address format, default to 42 when omitted seed_hex: hex string of seed - crypto_type: Use KeyType.SR25519 or KeyType.ED25519 cryptography for generating the Key """ - crypto_type = self.resolve_crypto_type(crypto_type) - # If no arguments are provided, generate a random keypair - if private_key == None: - private_key = self.new_key(crypto_type=crypto_type).private_key - if type(private_key) == str: - private_key = c.str2bytes(private_key) - crypto_type = self.resolve_crypto_type(crypto_type) - if crypto_type == KeyType.SR25519: - if len(private_key) != 64: - private_key = sr25519.pair_from_seed(private_key)[1] - public_key = sr25519.public_from_secret_key(private_key) - key_address = ss58_encode(public_key, ss58_format=ss58_format) - hash_type = 'ss58' - elif crypto_type == KeyType.ED25519: - private_key = private_key[:32] if len(private_key) == 64 else private_key - public_key, private_key = ed25519_zebra.ed_from_seed(private_key) - key_address = ss58_encode(public_key, ss58_format=ss58_format) - hash_type = 'ss58' - elif crypto_type == KeyType.ECDSA: - private_key = private_key[0:32] - private_key_obj = PrivateKey(private_key) - public_key = private_key_obj.public_key.to_address() - key_address = private_key_obj.public_key.to_checksum_address() - hash_type = 'h160' - else: - raise ValueError('crypto_type "{}" not supported'.format(crypto_type)) - if type(public_key) is str: - public_key = bytes.fromhex(public_key.replace('0x', '')) - - self.hash_type = hash_type - self.public_key = public_key - self.address = self.key_address = self.ss58_address = key_address - self.private_key = private_key - self.crypto_type = crypto_type - self.derive_path = derive_path - self.path = path - self.ss58_format = ss58_format - self.key_address = self.ss58_address - self.key_type = self.crypto_type2name(self.crypto_type) - return {'key_address':key_address, 'crypto_type':crypto_type} + raise NotImplementedError("set_private_key not implemented") @classmethod - def add_key(cls, path:str, mnemonic:str = None, password:str=None, refresh:bool=False, private_key=None, **kwargs): - if cls.key_exists(path) and not refresh : - c.print(f'key already exists at {path}') - key_json = cls.get(path) - if key_json != None: - return cls.from_json(cls.get(path)) - key = cls.new_key(mnemonic=mnemonic, private_key=private_key, **kwargs) + def add_key( + cls, + path: str, + mnemonic: str = None, + password: str = None, + refresh: bool = False, + private_key=None, + crypto_type: Union[str, int] = KeyType.SR25519, + **kwargs, + ): + if cls.key_exists(path) and not refresh: + c.print(f"key already exists at {path}") + return cls.get(path) + key = cls.new_key(mnemonic=mnemonic, private_key=private_key, crypto_type=crypto_type, **kwargs) key.path = path key_json = key.to_json() if password != None: key_json = cls.encrypt(data=key_json, password=password) c.print(cls.put(path, key_json)) cls.update() - return json.loads(key_json) - + return json.loads(key_json) + @classmethod - def ticket(cls , data=None, key=None, **kwargs): - return cls.get_key(key).sign({'data':data, 'time': c.time()} , to_json=True, **kwargs) + def ticket(cls, data=None, key=None, **kwargs): + return cls.get_key(key).sign( + {"data": data, "time": c.time()}, to_json=True, **kwargs + ) @classmethod - def mv_key(cls, path, new_path): - assert cls.key_exists(path), f'key does not exist at {path}' - cls.put(new_path, cls.get_key(path).to_json()) + def mv_key(cls, path, new_path, crypto_type=KeyType.SR25519): + assert cls.key_exists(path), f"key does not exist at {path}" + cls.put(new_path, cls.get_key(path, crypto_type=crypto_type).to_json()) cls.rm_key(path) - assert cls.key_exists(new_path), f'key does not exist at {new_path}' - assert not cls.key_exists(path), f'key still exists at {path}' - new_key = cls.get_key(new_path) - return {'success': True, 'from': path , 'to': new_path, 'key': new_key} - + assert cls.key_exists(new_path), f"key does not exist at {new_path}" + assert not cls.key_exists(path), f"key still exists at {path}" + new_key = cls.get_key(new_path, crypto_type=crypto_type) + return {"success": True, "from": path, "to": new_path, "key": new_key} + @classmethod def copy_key(cls, path, new_path): - assert cls.key_exists(path), f'key does not exist at {path}' + assert cls.key_exists(path), f"key does not exist at {path}" cls.put(new_path, cls.get_key(path).to_json()) - assert cls.key_exists(new_path), f'key does not exist at {new_path}' - assert cls.get_key(path) == cls.get_key(new_path), f'key does not match' + assert cls.key_exists(new_path), f"key does not exist at {new_path}" + assert cls.get_key(path) == cls.get_key(new_path), "key does not match" new_key = cls.get_key(new_path) - return {'success': True, 'from': path , 'to': new_path, 'key': new_key} - - + return {"success": True, "from": path, "to": new_path, "key": new_key} + @classmethod - def add_keys(cls, name, n=100, verbose:bool = False, **kwargs): + def add_keys(cls, name, n=100, verbose: bool = False, **kwargs): response = [] for i in range(n): - key_name = f'{name}.{i}' + key_name = f"{name}.{i}" if bool == True: - c.print(f'generating key {key_name}') + c.print(f"generating key {key_name}") response.append(cls.add_key(key_name, **kwargs)) return response @@ -466,97 +169,110 @@ def key2encrypted(self): for k in keys: key2encrypted[k] = self.is_key_encrypted(k) return key2encrypted - + def encrypted_keys(self): - return [k for k,v in self.key2encrypted().items() if v == True] - + return [k for k, v in self.key2encrypted().items() if v == True] + @classmethod - def key_info(cls, path='module', **kwargs): + def key_info(cls, path="module", **kwargs): return cls.get_key_json(path) - + @classmethod def load_key(cls, path=None): key_info = cls.get(path) key_info = c.jload(key_info) - if key_info['path'] == None: - key_info['path'] = path.replace('.json', '').split('/')[-1] + if key_info["path"] == None: + key_info["path"] = path.replace(".json", "").split("/")[-1] cls.add_key(**key_info) - return {'status': 'success', 'message': f'key loaded from {path}'} + return {"status": "success", "message": f"key loaded from {path}"} - @classmethod - def save_keys(cls, path='saved_keys.json', **kwargs): + def save_keys(cls, path="saved_keys.json", **kwargs): path = cls.resolve_path(path) - c.print(f'saving mems to {path}') + c.print(f"saving mems to {path}") key2mnemonic = cls.key2mnemonic() c.put_json(path, key2mnemonic) - return {'success': True, 'msg': 'saved keys', 'path':path, 'n': len(key2mnemonic)} - + return { + "success": True, + "msg": "saved keys", + "path": path, + "n": len(key2mnemonic), + } + @classmethod - def load_keys(cls, path='saved_keys.json', refresh=False, **kwargs): + def load_keys(cls, path="saved_keys.json", refresh=False, **kwargs): key2mnemonic = c.get_json(path) - for k,mnemonic in key2mnemonic.items(): + for k, mnemonic in key2mnemonic.items(): try: cls.add_key(k, mnemonic=mnemonic, refresh=refresh, **kwargs) - except Exception as e: + except Exception: # c.print(f'failed to load mem {k} due to {e}', color='red') pass - return {'loaded_mems':list(key2mnemonic.keys()), 'path':path} + return {"loaded_mems": list(key2mnemonic.keys()), "path": path} + loadkeys = loadmems = load_keys - + @classmethod def key2mnemonic(cls, search=None) -> dict[str, str]: """ keyname (str) --> mnemonic (str) - """ mems = {} for key in cls.keys(search): try: mems[key] = cls.get_mnemonic(key) except Exception as e: - c.print(f'failed to get mem for {key} due to {e}') + + c.print(f"failed to get mem for {key} due to {e}") if search: - mems = {k:v for k,v in mems.items() if search in k or search in v} + mems = {k: v for k, v in mems.items() if search in k or search in v} return mems @classmethod - def get_key(cls, - path:str,password:str=None, - create_if_not_exists:bool = True, - crypto_type=crypto_type, - **kwargs): - for k in ['crypto_type', 'key_type', 'type']: - if k in kwargs: - crypto_type = kwargs.pop(k) - break - if hasattr(path, 'key_address'): - key = path + def get_key( + cls, + path: str, + crypto_type: Union[str, int] = KeyType.SR25519, + password: str = None, + create_if_not_exists: bool = True, + **kwargs, + ): + crypto_type = cls.resolve_crypto_type(crypto_type) + if hasattr(path, "key_address"): + key = path return key - path = path or 'module' + path = path or "module" + # if ss58_address is provided, get key from address if cls.valid_ss58_address(path): path = cls.address2key().get(path) if not cls.key_exists(path): if create_if_not_exists: - key = cls.add_key(path, **kwargs) - c.print(f'key does not exist, generating new key -> {key["path"]}') + key = cls.add_key(path, crypto_type=crypto_type, **kwargs) + c.print(f"key does not exist, generating new key -> {key['path']}") else: print(path) - raise ValueError(f'key does not exist at --> {path}') + raise ValueError(f"key does not exist at --> {path}") + key_json = cls.get(path) # if key is encrypted, decrypt it if cls.is_encrypted(key_json): key_json = c.decrypt(data=key_json, password=password) if key_json == None: - c.print({'status': 'error', 'message': f'key is encrypted, please {path} provide password'}) + c.print( + { + "status": "error", + "message": f"key is encrypted, please {path} provide password", + } + ) return None key_json = c.jload(key_json) if isinstance(key_json, str) else key_json - key = cls.from_json(key_json, crypto_type=crypto_type) + key = cls.from_json(key_json, crypto_type=crypto_type) key.path = path return key - + + @classmethod def get_keys(cls, search=None, clean_failed_keys=False): keys = {} @@ -564,53 +280,61 @@ def get_keys(cls, search=None, clean_failed_keys=False): if str(search) in key or search == None: try: keys[key] = cls.get_key(key) - except Exception as e: + + except Exception: + continue if keys[key] == None: if clean_failed_keys: cls.rm_key(key) - keys.pop(key) + keys.pop(key) return keys - + @classmethod def key2address(cls, search=None, max_age=10, update=False, **kwargs): - path = 'key2address' + path = "key2address" key2address = cls.get(path, None, max_age=max_age, update=update) if key2address == None: - key2address = { k: v.ss58_address for k,v in cls.get_keys(search).items()} + key2address = {k: v.ss58_address for k, v in cls.get_keys(search).items()} cls.put(path, key2address) return key2address - + @classmethod def n(cls, search=None, **kwargs): return len(cls.key2address(search, **kwargs)) @classmethod - def address2key(cls, search:Optional[str]=None, update:bool=False): - address2key = { v: k for k,v in cls.key2address(update=update).items()} - if search != None : + def address2key(cls, search: Optional[str] = None, update: bool = False): + address2key = {v: k for k, v in cls.key2address(update=update).items()} + if search != None: return address2key.get(search, None) return address2key - + @classmethod def get_address(cls, key): return cls.get_key(key).ss58_address + get_addy = get_address + @classmethod def key_paths(cls): return cls.ls() - address_seperator = '_address=' + + address_seperator = "_address=" + + @classmethod def key2path(cls) -> dict: """ defines the path for each key """ - path2key_fn = lambda path: '.'.join(path.split('/')[-1].split('.')[:-1]) - key2path = {path2key_fn(path):path for path in cls.key_paths()} + path2key_fn = lambda path: ".".join(path.split("/")[-1].split(".")[:-1]) + key2path = {path2key_fn(path): path for path in cls.key_paths()} return key2path @classmethod - def keys(cls, search : str = None, **kwargs): + def keys(cls, search: str = None, **kwargs): + keys = list(cls.key2path().keys()) if search != None: keys = [key for key in keys if search in key] @@ -619,141 +343,148 @@ def keys(cls, search : str = None, **kwargs): @classmethod def n(cls, *args, **kwargs): return len(cls.key2address(*args, **kwargs)) - + @classmethod def key_exists(cls, key, **kwargs): path = cls.get_key_path(key) import os return os.path.exists(path) - + @classmethod def get_key_path(cls, key): storage_dir = cls.storage_dir() - key_path = storage_dir + '/' + key + '.json' + key_path = storage_dir + "/" + key + ".json" return key_path + @classmethod def get_key_json(cls, key): storage_dir = cls.storage_dir() - key_path = storage_dir + '/' + key + '.json' + key_path = storage_dir + "/" + key + ".json" return c.get(key_path) + @classmethod - def get_key_address(cls, key): + def get_key_address(cls, key): return cls.get_key(key).ss58_address - + + @classmethod def rm_key(cls, key=None): key2path = cls.key2path() keys = list(key2path.keys()) if key not in keys: - raise Exception(f'key {key} not found, available keys: {keys}') + raise Exception(f"key {key} not found, available keys: {keys}") c.rm(key2path[key]) - return {'deleted':[key]} - - + return {"deleted": [key]} @classmethod - def crypto_name2type(cls, name:str): + def crypto_name2type(cls, name: str): crypto_type_map = cls.crypto_type_map name = name.lower() - if not name in crypto_type_map: - raise ValueError(f'crypto_type {name} not supported {crypto_type_map}') + if name not in crypto_type_map: + raise ValueError(f"crypto_type {name} not supported {crypto_type_map}") return crypto_type_map[name] - + @classmethod - def crypto_type2name(cls, crypto_type:str): - crypto_type_map ={v:k for k,v in cls.crypto_type_map.items()} + def crypto_type2name(cls, crypto_type: str): + crypto_type_map = {v: k for k, v in cls.crypto_type_map.items()} return crypto_type_map[crypto_type] - + @classmethod def resolve_crypto_type_name(cls, crypto_type): return cls.crypto_type2name(cls.resolve_crypto_type(crypto_type)) - + @classmethod def resolve_crypto_type(cls, crypto_type): - if isinstance(crypto_type, int) or (isinstance(crypto_type, str) and c.is_int(crypto_type)): + if isinstance(crypto_type, int) or ( + isinstance(crypto_type, str) and c.is_int(crypto_type) + ): crypto_type = int(crypto_type) crypto_type_map = cls.crypto_type_map - reverse_crypto_type_map = {v:k for k,v in crypto_type_map.items()} - assert crypto_type in reverse_crypto_type_map, f'crypto_type {crypto_type} not supported {crypto_type_map}' + reverse_crypto_type_map = {v: k for k, v in crypto_type_map.items()} + assert crypto_type in reverse_crypto_type_map, ( + f"crypto_type {crypto_type} not supported {crypto_type_map}" + ) + crypto_type = reverse_crypto_type_map[crypto_type] if isinstance(crypto_type, str): crypto_type = crypto_type.lower() crypto_type = cls.crypto_name2type(crypto_type) return int(crypto_type) - + @classmethod - def new_private_key(cls, crypto_type='ecdsa'): - return cls.new_key(crypto_type=crypto_type).private_key.hex() - + def new_private_key(cls): + return cls.new_key().private_key.hex() + @classmethod - def new_key(cls, - mnemonic:str = None, - suri:str = None, - private_key: str = None, - crypto_type: Union[int,str] = 'sr25519', - verbose:bool=False, - **kwargs): - ''' + def new_key( + cls, + mnemonic: str = None, + suri: str = None, + private_key: str = None, + verbose: bool = False, + crypto_type: Union[str, int] = KeyType.SR25519, + **kwargs, + ): + """ yo rody, this is a class method you can gen keys whenever fam - ''' + """ if verbose: - c.print(f'generating {crypto_type} keypair, {suri}') - - crypto_type = cls.resolve_crypto_type(crypto_type) + c.print(f"generating polkadot keypair, {suri}") if suri: - key = cls.create_from_uri(suri, crypto_type=crypto_type) + key = cls.create_from_uri(suri, crypto_type=crypto_type) elif mnemonic: key = cls.create_from_mnemonic(mnemonic, crypto_type=crypto_type) elif private_key: - key = cls.create_from_private_key(private_key,crypto_type=crypto_type) + key = cls.create_from_private_key(private_key, crypto_type=crypto_type) else: mnemonic = cls.generate_mnemonic() key = cls.create_from_mnemonic(mnemonic, crypto_type=crypto_type) - return key - + create = gen = new_key - - def to_json(self, password: str = None ) -> dict: - state_dict = c.copy(self.__dict__) - for k,v in state_dict.items(): - if type(v) in [bytes]: - state_dict[k] = v.hex() + + def to_json(self, password: str = None) -> dict: + state_dict = c.copy(self.__dict__) + for k, v in state_dict.items(): + if type(v) in [bytes]: + state_dict[k] = v.hex() if password != None: state_dict[k] = self.encrypt(data=state_dict[k], password=password) - if '_ss58_address' in state_dict: - state_dict['ss58_address'] = state_dict.pop('_ss58_address') + if "_ss58_address" in state_dict: + state_dict["ss58_address"] = state_dict.pop("_ss58_address") state_dict = json.dumps(state_dict) - + return state_dict - + @classmethod - def from_json(cls, obj: Union[str, dict], password: str = None, crypto_type=None) -> dict: + def from_json(cls, obj: Union[str, dict], password: str = None, crypto_type: Union[str, int] = KeyType.SR25519) -> dict: if type(obj) == str: obj = json.loads(obj) if obj == None: - return None - if cls.is_encrypted(obj) and password != None: - obj = cls.decrypt(data=obj, password=password) - if 'ss58_address' in obj: - obj['_ss58_address'] = obj.pop('ss58_address') - if crypto_type != None: - obj['crypto_type'] = crypto_type - return cls(**obj) + return None + for k, v in obj.items(): + if cls.is_encrypted(obj[k]) and password != None: + obj[k] = cls.decrypt(data=obj[k], password=password) + if "ss58_address" in obj: + obj["_ss58_address"] = obj.pop("ss58_address") + obj["crypto_type"] = crypto_type + return cls(**obj) + @classmethod def generate_mnemonic(cls, words: int = 12, language_code: str = "en") -> str: """ params: words: The amount of words to generate, valid values are 12, 15, 18, 21 and 24 - language_code: The language to use, valid values are: 'en', 'zh-hans', - 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. + language_code: The language to use, valid values are: 'en', 'zh-hans', + 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` """ - mnemonic = bip39_generate(words, language_code) - assert cls.validate_mnemonic(mnemonic, language_code), 'mnemonic is invalid' + mnemonic = bip39_generate(words, language_code) + assert cls.validate_mnemonic(mnemonic, language_code), "mnemonic is invalid" + return mnemonic @classmethod @@ -774,7 +505,10 @@ def validate_mnemonic(cls, mnemonic: str, language_code: str = "en") -> bool: @classmethod - def create_from_mnemonic(cls, mnemonic: str = None, ss58_format=ss58_format, crypto_type=KeyType.SR25519, language_code: str = "en") -> 'Key': + def create_from_mnemonic( + cls, mnemonic: str, ss58_format: int = SS58_FORMAT, language_code: str = "en", crypto_type: Union[str, int] = KeyType.SR25519 + ) -> "Key": + """ Create a Key for given memonic @@ -789,30 +523,33 @@ def create_from_mnemonic(cls, mnemonic: str = None, ss58_format=ss58_format, cry ------- Key """ - if not mnemonic: - mnemonic = cls.generate_mnemonic(language_code=language_code) - - if crypto_type == KeyType.ECDSA: - if language_code != "en": - raise ValueError("ECDSA mnemonic only supports english") - private_key = mnemonic_to_ecdsa_private_key(mnemonic) - keypair = cls.create_from_private_key(private_key, ss58_format=ss58_format, crypto_type=crypto_type) + crypto_type = cls.resolve_crypto_type(crypto_type) + if crypto_type == KeyType.SR25519: + from commune.key.types.dot.sr25519 import DotSR25519 + return DotSR25519.create_from_mnemonic(mnemonic=mnemonic, ss58_format=ss58_format, language_code=language_code, crypto_type=crypto_type) + elif crypto_type == KeyType.ED25519: + from commune.key.types.dot.ed25519 import DotED25519 + return DotED25519.create_from_mnemonic(mnemonic=mnemonic, ss58_format=ss58_format, language_code=language_code, crypto_type=crypto_type) + elif crypto_type == KeyType.ECDSA: + from commune.key.types.eth import ECDSA + return ECDSA.create_from_mnemonic(mnemonic=mnemonic, ss58_format=ss58_format, language_code=language_code, crypto_type=crypto_type) + elif crypto_type == KeyType.SOLANA: + from commune.key.types.sol import Solana + return Solana.create_from_mnemonic(mnemonic=mnemonic, ss58_format=ss58_format, language_code=language_code, crypto_type=crypto_type) else: - keypair = cls.create_from_seed( - seed_hex=binascii.hexlify(bytearray(bip39_to_mini_secret(mnemonic, "", language_code))).decode("ascii"), - ss58_format=ss58_format, - crypto_type=crypto_type, - ) - - keypair.mnemonic = mnemonic - - return keypair + raise NotImplementedError("create_from_mnemonic not implemented") from_mnemonic = from_mem = create_from_mnemonic @classmethod - def create_from_seed(cls, seed_hex: Union[bytes, str], ss58_format: Optional[int] = ss58_format, crypto_type=KeyType.SR25519) -> 'Key': + def create_from_seed( + cls, + seed_hex: Union[bytes, str] = None, + ss58_format: Optional[int] = SS58_FORMAT, + crypto_type: Union[str, int] = KeyType.SR25519 + ) -> "Key": + """ Create a Key for given seed @@ -827,42 +564,38 @@ def create_from_seed(cls, seed_hex: Union[bytes, str], ss58_format: Optional[int Key """ crypto_type = cls.resolve_crypto_type(crypto_type) - if type(seed_hex) is str: - seed_hex = bytes.fromhex(seed_hex.replace('0x', '')) if crypto_type == KeyType.SR25519: - public_key, private_key = sr25519.pair_from_seed(seed_hex) + from commune.key.types.dot.sr25519 import DotSR25519 + return DotSR25519.create_from_seed(seed_hex=seed_hex, ss58_format=ss58_format, crypto_type=crypto_type) elif crypto_type == KeyType.ED25519: - private_key, public_key = ed25519_zebra.ed_from_seed(seed_hex) + from commune.key.types.dot.ed25519 import DotED25519 + return DotED25519.create_from_seed(seed_hex=seed_hex, ss58_format=ss58_format, crypto_type=crypto_type) + elif crypto_type == KeyType.ECDSA: + from commune.key.types.eth import ECDSA + return ECDSA.create_from_seed(seed_hex=seed_hex, ss58_format=ss58_format, crypto_type=crypto_type) + elif crypto_type == KeyType.SOLANA: + from commune.key.types.sol import Solana + return Solana.create_from_seed(seed_hex=seed_hex, ss58_format=ss58_format, crypto_type=crypto_type) else: - raise ValueError('crypto_type "{}" not supported'.format(crypto_type)) - - ss58_address = ss58_encode(public_key, ss58_format) + raise NotImplementedError("create_from_seed not implemented") - kwargs = dict( - ss58_address=ss58_address, - public_key=public_key, - private_key=private_key, - ss58_format=ss58_format, - crypto_type=crypto_type, - ) - - return cls(**kwargs) @classmethod - def create_from_password(cls, password:str, crypto_type=2, **kwargs): - key= cls.create_from_uri(password, crypto_type=1, **kwargs) + def create_from_password(cls, password: str, crypto_type: Union[str, int] = KeyType.SR25519, **kwargs): + key = cls.create_from_uri(password, crypto_type=crypto_type, **kwargs) key.set_crypto_type(crypto_type) - return key - + + str2key = pwd2key = password2key = from_password = create_from_password @classmethod def create_from_uri( - cls, - suri: str, - ss58_format: Optional[int] = ss58_format, - crypto_type=KeyType.SR25519, - language_code: str = "en" - ) -> 'Key': + cls, + suri: str, + ss58_format: int = SS58_FORMAT, + crypto_type: Union[str, int] = KeyType.SR25519, + language_code: str = "en", + ) -> "Key": + """ Creates Key for specified suri in following format: `[mnemonic]/[soft-path]//[hard-path]` @@ -877,78 +610,31 @@ def create_from_uri( ------- Key """ - crypto_type = cls.resolve_crypto_type(crypto_type) - suri = str(suri) - if not suri.startswith('//'): - suri = '//' + suri - - if suri and suri.startswith('/'): - suri = DEV_PHRASE + suri - - suri_regex = re.match(r'^(?P.[^/]+( .[^/]+)*)(?P(//?[^/]+)*)(///(?P.*))?$', suri) - - suri_parts = suri_regex.groupdict() - - if crypto_type == KeyType.ECDSA: - if language_code != "en": - raise ValueError("ECDSA mnemonic only supports english") - print(suri_parts) - private_key = mnemonic_to_ecdsa_private_key( - mnemonic=suri_parts['phrase'], - str_derivation_path=suri_parts['path'], - passphrase=suri_parts['password'] - ) - derived_keypair = cls.create_from_private_key(private_key, ss58_format=ss58_format, crypto_type=crypto_type) + if crypto_type == KeyType.SR25519: + from commune.key.types.dot.sr25519 import DotSR25519 + return DotSR25519.create_from_uri(suri=suri, ss58_format=ss58_format, language_code=language_code) + elif crypto_type == KeyType.ED25519: + from commune.key.types.dot.ed25519 import DotED25519 + return DotED25519.create_from_uri(suri=suri, ss58_format=ss58_format, language_code=language_code) + elif crypto_type == KeyType.ECDSA: + from commune.key.types.eth import ECDSA + return ECDSA.create_from_uri(suri=suri, ss58_format=ss58_format, language_code=language_code) + elif crypto_type == KeyType.SOLANA: + from commune.key.types.sol import Solana + return Solana.create_from_uri(suri=suri, ss58_format=ss58_format, language_code=language_code) else: + raise NotImplementedError("create_from_uri not implemented") - if suri_parts['password']: - raise NotImplementedError(f"Passwords in suri not supported for crypto_type '{crypto_type}'") - - derived_keypair = cls.create_from_mnemonic( - suri_parts['phrase'], ss58_format=ss58_format, crypto_type=crypto_type, language_code=language_code - ) - - if suri_parts['path'] != '': - - derived_keypair.derive_path = suri_parts['path'] - if crypto_type not in [KeyType.SR25519]: - raise NotImplementedError('Derivation paths for this crypto type not supported') - - derive_junctions = extract_derive_path(suri_parts['path']) - - child_pubkey = derived_keypair.public_key - child_privkey = derived_keypair.private_key - - for junction in derive_junctions: - - if junction.is_hard: - - _, child_pubkey, child_privkey = sr25519.hard_derive_keypair( - (junction.chain_code, child_pubkey, child_privkey), - b'' - ) - - else: - - _, child_pubkey, child_privkey = sr25519.derive_keypair( - (junction.chain_code, child_pubkey, child_privkey), - b'' - ) - - derived_keypair = Key(public_key=child_pubkey, private_key=child_privkey, ss58_format=ss58_format) - - return derived_keypair - from_mnem = from_mnemonic = create_from_mnemonic @classmethod def create_from_private_key( - cls, - private_key: Union[bytes, str], - public_key: Union[bytes, str] = None, - ss58_address: str = None, - ss58_format: int = ss58_format, - crypto_type: int = KeyType.SR25519 - ) -> 'Key': + cls, + private_key: Union[bytes, str], + public_key: Union[bytes, str] = None, + ss58_address: str = None, + ss58_format: int = SS58_FORMAT, + crypto_type: Union[str, int] = KeyType.SR25519, + ) -> "Key": """ Creates Key for specified public/private keys Parameters @@ -963,18 +649,20 @@ def create_from_private_key( ------- Key """ - - return cls(ss58_address=ss58_address, - public_key=public_key, - private_key=private_key, - ss58_format=ss58_format, - crypto_type=crypto_type + return cls( + private_key=private_key, + public_key=public_key, + crypto_type=crypto_type, + ss58_format=ss58_format, + ss58_address=ss58_address ) + from_private_key = create_from_private_key @classmethod - def create_from_encrypted_json(cls, json_data: Union[str, dict], passphrase: str, - ss58_format: int = None) -> 'Key': + def create_from_encrypted_json( + cls, json_data: Union[str, dict], passphrase: str, ss58_format: int = None + ) -> "Key": """ Create a Key from a PolkadotJS format encrypted JSON file @@ -989,11 +677,8 @@ def create_from_encrypted_json(cls, json_data: Union[str, dict], passphrase: str Key """ - crypto_type = cls.resolve_crypto_type(crypto_type) - if type(json_data) is str: json_data = json.loads(json_data) - private_key, public_key = decode_pair_from_encrypted_json(json_data, passphrase) if 'sr25519' in json_data['encoding']['content']: @@ -1002,8 +687,12 @@ def create_from_encrypted_json(cls, json_data: Union[str, dict], passphrase: str crypto_type = KeyType.ED25519 # Strip the nonce part of the private key private_key = private_key[0:32] + elif 'solana' in json_data['encoding']['content']: + crypto_type = KeyType.SOLANA + private_key = private_key[0:32] else: raise NotImplementedError("Unknown KeyType found in JSON") + if ss58_format is None and 'address' in json_data: ss58_format = get_ss58_format(json_data['address']) @@ -1023,33 +712,11 @@ def export_to_encrypted_json(self, passphrase: str, name: str = None) -> dict: ------- dict """ - if not name: - name = self.ss58_address - - if self.crypto_type != KeyType.SR25519: - raise NotImplementedError(f"Cannot create JSON for crypto_type '{self.crypto_type}'") - - # Secret key from PolkadotJS is an Ed25519 expanded secret key, so has to be converted - # https://github.com/polkadot-js/wasm/blob/master/packages/wasm-crypto/src/rs/sr25519.rs#L125 - converted_private_key = sr25519.convert_secret_key_to_ed25519(self.private_key) + raise NotImplementedError("export_to_encrypted_json not implemented") - encoded = encode_pair(self.public_key, converted_private_key, passphrase) - - json_data = { - "encoded": b64encode(encoded).decode(), - "encoding": {"content": ["pkcs8", "sr25519"], "type": ["scrypt", "xsalsa20-poly1305"], "version": "3"}, - "address": self.ss58_address, - "meta": { - "name": name, "tags": [], "whenCreated": int(time.time()) - } - } - - return json_data - seperator = "::signature=" - - def sign(self, data: Union[ScaleBytes, bytes, str], to_json = False) -> bytes: + def sign(self, data: Union[ScaleBytes, bytes, str], to_json=False) -> bytes: """ Creates a signature for given data Parameters @@ -1060,37 +727,12 @@ def sign(self, data: Union[ScaleBytes, bytes, str], to_json = False) -> bytes: signature in bytes """ - if not isinstance(data, str): - data = c.python2str(data) - if type(data) is ScaleBytes: - data = bytes(data.data) - elif data[0:2] == '0x': - data = bytes.fromhex(data[2:]) - elif type(data) is str: - data = data.encode() - if not self.private_key: - raise Exception('No private key set to create signatures') - if self.crypto_type == KeyType.SR25519: - signature = sr25519.sign((self.public_key, self.private_key), data) - elif self.crypto_type == KeyType.ED25519: - signature = ed25519_zebra.ed_sign(self.private_key, data) - elif self.crypto_type == KeyType.ECDSA: - signature = ecdsa_sign(self.private_key, data) - else: - raise Exception("Crypto type not supported") - - if to_json: - return {'data':data.decode(), - 'crypto_type':self.crypto_type, - 'signature':signature.hex(), - 'address': self.ss58_address,} - return signature - + raise NotImplementedError("sign not implemented") @classmethod - def bytes2str(cls, data: bytes, mode: str = 'utf-8') -> str: - - if hasattr(data, 'hex'): + def bytes2str(cls, data: bytes, mode: str = "utf-8") -> str: + if hasattr(data, "hex"): + return data.hex() else: if isinstance(data, str): @@ -1098,10 +740,10 @@ def bytes2str(cls, data: bytes, mode: str = 'utf-8') -> str: return bytes.decode(data, mode) @classmethod - def python2str(cls, input): + def python2str(cls, input): from copy import deepcopy import json - + input = deepcopy(input) input_type = type(input) if input_type == str: @@ -1116,16 +758,17 @@ def python2str(cls, input): input = str(input) return input - def verify(self, - data: Union[ScaleBytes, bytes, str, dict], - signature: Union[bytes, str] = None, - public_key:Optional[str]= None, - return_address = False, - ss58_format = ss58_format, - max_age = None, - address = None, - **kwargs - ) -> bool: + def verify( + self, + data: Union[ScaleBytes, bytes, str, dict], + signature: Union[bytes, str], + public_key: Optional[str], + return_address, + ss58_format, + max_age, + address, + **kwargs, + ) -> bool: """ Verifies data with specified signature @@ -1139,80 +782,20 @@ def verify(self, ------- True if data is signed with this Key, otherwise False """ - data = c.copy(data) - - if isinstance(data, dict): - if self.is_ticket(data): - address = data.pop('address') - signature = data.pop('signature') - elif 'data' in data and 'signature' in data and 'address' in data: - signature = data.pop('signature') - address = data.pop('address', address) - data = data.pop('data') - else: - assert signature != None, 'signature not found in data' - assert address != None, 'address not found in data' - - if max_age != None: - if isinstance(data, int): - staleness = c.timestamp() - int(data) - elif 'timestamp' in data or 'time' in data: - timestamp = data.get('timestamp', data.get('time')) - staleness = c.timestamp() - int(timestamp) - else: - raise ValueError('data should be a timestamp or a dict with a timestamp key') - assert staleness < max_age, f'data is too old, {staleness} seconds old, max_age is {max_age}' - - if not isinstance(data, str): - data = c.python2str(data) - if address != None: - if self.valid_ss58_address(address): - public_key = ss58_decode(address) - if public_key == None: - public_key = self.public_key - if isinstance(public_key, str): - public_key = bytes.fromhex(public_key.replace('0x', '')) - if type(data) is ScaleBytes: - data = bytes(data.data) - elif data[0:2] == '0x': - data = bytes.fromhex(data[2:]) - elif type(data) is str: - data = data.encode() - if type(signature) is str and signature[0:2] == '0x': - signature = bytes.fromhex(signature[2:]) - elif type(signature) is str: - signature = bytes.fromhex(signature) - if type(signature) is not bytes: - raise TypeError("Signature should be of type bytes or a hex-string") - - - if self.crypto_type == KeyType.SR25519: - crypto_verify_fn = sr25519.verify - elif self.crypto_type == KeyType.ED25519: - crypto_verify_fn = ed25519_zebra.ed_verify - elif self.crypto_type == KeyType.ECDSA: - crypto_verify_fn = ecdsa_verify - else: - raise Exception("Crypto type not supported") - verified = crypto_verify_fn(signature, data, public_key) - if not verified: - # Another attempt with the data wrapped, as discussed in https://github.com/polkadot-js/extension/pull/743 - # Note: As Python apps are trusted sources on its own, no need to wrap data when signing from this lib - verified = crypto_verify_fn(signature, b'' + data + b'', public_key) - if return_address: - return ss58_encode(public_key, ss58_format=ss58_format) - return verified + raise NotImplementedError("verify not implemented") def is_ticket(self, data): - return all([k in data for k in ['data','signature', 'address', 'crypto_type']]) and any([k in data for k in ['time', 'timestamp']]) + return all( + [k in data for k in ["data", "signature", "address", "crypto_type"]] + ) and any([k in data for k in ["time", "timestamp"]]) - def resolve_encryption_password(self, password:str=None) -> str: + def resolve_encryption_password(self, password: str = None) -> str: if password == None: password = self.private_key if isinstance(password, str): password = password.encode() return hashlib.sha256(password).digest() - + def resolve_encryption_data(self, data): if not isinstance(data, str): data = str(data) @@ -1221,25 +804,28 @@ def resolve_encryption_data(self, data): def encrypt(self, data, password=None): data = self.resolve_encryption_data(data) password = self.resolve_encryption_password(password) - data = data + (AES.block_size - len(data) % AES.block_size) * chr(AES.block_size - len(data) % AES.block_size) + data = data + (AES.block_size - len(data) % AES.block_size) * chr( + AES.block_size - len(data) % AES.block_size + ) iv = Random.new().read(AES.block_size) cipher = AES.new(password, AES.MODE_CBC, iv) encrypted_bytes = base64.b64encode(iv + cipher.encrypt(data.encode())) - return encrypted_bytes.decode() + return encrypted_bytes.decode() - def decrypt(self, data, password=None): + def decrypt(self, data, password=None): password = self.resolve_encryption_password(password) data = base64.b64decode(data) - iv = data[:AES.block_size] + iv = data[: AES.block_size] cipher = AES.new(password, AES.MODE_CBC, iv) - data = cipher.decrypt(data[AES.block_size:]) - data = data[:-ord(data[len(data)-1:])].decode('utf-8') + data = cipher.decrypt(data[AES.block_size :]) + data = data[: -ord(data[len(data) - 1 :])].decode("utf-8") return data def encrypt_message( - self, - message: Union[bytes, str], - recipient_public_key: bytes, + self, + message: Union[bytes, str], + recipient_public_key: bytes, + nonce: bytes = secrets.token_bytes(24), ) -> bytes: """ @@ -1256,17 +842,25 @@ def encrypt_message( Encrypted message """ if not self.private_key: - raise Exception('No private key set to encrypt') + raise Exception("No private key set to encrypt") if self.crypto_type != KeyType.ED25519: - raise Exception('Only ed25519 keypair type supported') - curve25519_public_key = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519(recipient_public_key) + raise Exception("Only ed25519 keypair type supported") + curve25519_public_key = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519( + recipient_public_key + ) recipient = nacl.public.PublicKey(curve25519_public_key) - private_key = nacl.bindings.crypto_sign_ed25519_sk_to_curve25519(self.private_key + self.public_key) + private_key = nacl.bindings.crypto_sign_ed25519_sk_to_curve25519( + self.private_key + self.public_key + ) sender = nacl.public.PrivateKey(private_key) box = nacl.public.Box(sender, recipient) - return box.encrypt(message if isinstance(message, bytes) else message.encode("utf-8"), nonce) + return box.encrypt( + message if isinstance(message, bytes) else message.encode("utf-8"), nonce + ) - def decrypt_message(self, encrypted_message_with_nonce: bytes, sender_public_key: bytes) -> bytes: + def decrypt_message( + self, encrypted_message_with_nonce: bytes, sender_public_key: bytes + ) -> bytes: """ Decrypts message from a specified sender @@ -1281,76 +875,85 @@ def decrypt_message(self, encrypted_message_with_nonce: bytes, sender_public_key """ if not self.private_key: - raise Exception('No private key set to decrypt') + raise Exception("No private key set to decrypt") if self.crypto_type != KeyType.ED25519: - raise Exception('Only ed25519 keypair type supported') - private_key = nacl.bindings.crypto_sign_ed25519_sk_to_curve25519(self.private_key + self.public_key) + raise Exception("Only ed25519 keypair type supported") + private_key = nacl.bindings.crypto_sign_ed25519_sk_to_curve25519( + self.private_key + self.public_key + ) recipient = nacl.public.PrivateKey(private_key) - curve25519_public_key = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519(sender_public_key) + curve25519_public_key = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519( + sender_public_key + ) sender = nacl.public.PublicKey(curve25519_public_key) return nacl.public.Box(recipient, sender).decrypt(encrypted_message_with_nonce) - encrypted_prefix = 'ENCRYPTED::' + encrypted_prefix = "ENCRYPTED::" @classmethod - def encrypt_key(cls, path = 'test.enc', password=None): - assert cls.key_exists(path), f'file {path} does not exist' - assert not cls.is_key_encrypted(path), f'{path} already encrypted' + def encrypt_key(cls, path="test.enc", password=None): + assert cls.key_exists(path), f"file {path} does not exist" + assert not cls.is_key_encrypted(path), f"{path} already encrypted" data = cls.get(path) - enc_text = {'data': c.encrypt(data, password=password), - 'encrypted': True} + enc_text = {"data": c.encrypt(data, password=password), "encrypted": True} cls.put(path, enc_text) - return {'number_of_characters_encrypted':len(enc_text), 'path':path } - + return {"number_of_characters_encrypted": len(enc_text), "path": path} + + @classmethod def is_key_encrypted(cls, key, data=None): data = data or cls.get(key) return cls.is_encrypted(data) - + @classmethod - def decrypt_key(cls, path = 'test.enc', password=None, key=None): - assert cls.key_exists(path), f'file {path} does not exist' - assert cls.is_key_encrypted(path), f'{path} not encrypted' + def decrypt_key(cls, path="test.enc", password=None, key=None): + assert cls.key_exists(path), f"file {path} does not exist" + assert cls.is_key_encrypted(path), f"{path} not encrypted" data = cls.get(path) - assert cls.is_encrypted(data), f'{path} not encrypted' - dec_text = c.decrypt(data['data'], password=password) + assert cls.is_encrypted(data), f"{path} not encrypted" + dec_text = c.decrypt(data["data"], password=password) cls.put(path, dec_text) - assert not cls.is_key_encrypted(path), f'failed to decrypt {path}' + assert not cls.is_key_encrypted(path), f"failed to decrypt {path}" loaded_key = c.get_key(path) - return { 'path':path , - 'key_address': loaded_key.ss58_address, - 'crypto_type': loaded_key.crypto_type} + return { + "path": path, + "key_address": loaded_key.ss58_address, + "crypto_type": loaded_key.crypto_type, + } @classmethod def get_mnemonic(cls, key): return cls.get_key(key).mnemonic def __str__(self): - return f'' - + return ( + f"" + ) + def save(self, path=None): if path == None: path = self.path c.put_json(path, self.to_json()) - return {'saved':path} - + return {"saved": path} + def __repr__(self): return self.__str__() - + @classmethod - def from_private_key(cls, private_key:str): + def from_private_key(cls, private_key: str): return cls(private_key=private_key) @classmethod - def valid_ss58_address(cls, address: str, ss58_format=ss58_format ) -> bool: + def valid_ss58_address(cls, address: str, ss58_format:int = SS58_FORMAT) -> bool: + """ Checks if the given address is a valid ss58 address. """ try: - return is_valid_ss58_address( address , valid_ss58_format =ss58_format ) - except Exception as e: + return is_valid_ss58_address(address, valid_ss58_format=ss58_format) + except Exception: return False - + @classmethod def is_encrypted(cls, data): if isinstance(data, str): @@ -1359,24 +962,20 @@ def is_encrypted(cls, data): else: try: data = json.loads(data) - except: + except: return False if isinstance(data, dict): - return bool(data.get('encrypted', False)) + return bool(data.get("encrypted", False)) else: return False - + @staticmethod def ss58_encode(*args, **kwargs): return ss58_encode(*args, **kwargs) - + @staticmethod def ss58_decode(*args, **kwargs): return ss58_decode(*args, **kwargs) - - @classmethod - def get_key_address(cls, key): - return cls.get_key(key).ss58_address @classmethod def resolve_key_address(cls, key): @@ -1386,37 +985,59 @@ def resolve_key_address(cls, key): else: address = key return address - + @classmethod def valid_h160_address(cls, address): # Check if it starts with '0x' - if not address.startswith('0x'): + if not address.startswith("0x"): return False - + # Remove '0x' prefix address = address[2:] - + # Check length if len(address) != 40: return False - + # Check if it contains only valid hex characters - if not re.match('^[0-9a-fA-F]{40}$', address): + if not re.match("^[0-9a-fA-F]{40}$", address): return False - + return True - def storage_migration(self): + def storage_migration(self): + key2path = self.key2path() new_key2path = {} for k_name, k_path in key2path.items(): try: key = c.get_key(k_name) - new_k_path = '/'.join(k_path.split('/')[:-1]) + '/' + f'{k_name}_address={key.ss58_address}_type={key.crypto_type}.json' + new_k_path = ( + "/".join(k_path.split("/")[:-1]) + + "/" + + f"{k_name}_address={key.ss58_address}_type={key.crypto_type}.json" + ) new_key2path[k_name] = new_k_path except Exception as e: - c.print(f'failed to migrate {k_name} due to {e}', color='red') - + c.print(f"failed to migrate {k_name} due to {e}", color="red") + + return new_key2path + + def storage_migration(self): + key2path = self.key2path() + new_key2path = {} + for k_name, k_path in key2path.items(): + try: + key = c.get_key(k_name) + new_k_path = ( + "/".join(k_path.split("/")[:-1]) + + "/" + + f"{k_name}_address={key.ss58_address}_type={key.crypto_type}.json" + ) + new_key2path[k_name] = new_k_path + except Exception as e: + c.print(f"failed to migrate {k_name} due to {e}", color="red") + return new_key2path @classmethod @@ -1424,13 +1045,3 @@ def test(cls): from .test import Test return Test().test() - -# if __name__ == "__main__": -# Key.run() - - - - - - - diff --git a/commune/key/test_key.py b/commune/key/test_key.py new file mode 100644 index 000000000..ebe2aed98 --- /dev/null +++ b/commune/key/test_key.py @@ -0,0 +1,109 @@ +import pytest +import commune as c + +crypto_types = ['ecdsa', 'solana', 'sr25519', 'ed25519'] + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_encryption(crypto_type, values = [10, 'fam', 'hello world']): + cls = c.module('key') + for value in values: + value = str(value) + key = cls.new_key(crypto_type=crypto_type) + enc = key.encrypt(value) + dec = key.decrypt(enc) + assert dec == value, f'encryption failed, {dec} != {value}' + return {'encrypted':enc, 'decrypted': dec} + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_encryption_with_password(crypto_type, value = 10, password = 'fam'): + cls = c.module('key') + value = str(value) + key = cls.new_key(crypto_type=crypto_type) + enc = key.encrypt(value, password=password) + dec = key.decrypt(enc, password=password) + assert dec == value, f'encryption failed, {dec} != {value}' + return {'encrypted':enc, 'decrypted': dec} + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_key_encryption(crypto_type, test_key='test.key'): + self = c.module('key') + key = self.add_key(test_key, refresh=True, crypto_type=crypto_type) + og_key = self.get_key(test_key, crypto_type=crypto_type) + r = self.encrypt_key(test_key) + self.decrypt_key(test_key, password=r['password']) + key = self.get_key(test_key, crypto_type=crypto_type) + assert key.ss58_address == og_key.ss58_address, f'key encryption failed, {key.ss58_address} != {self.ss58_address}' + return {'success': True, 'msg': 'test_key_encryption passed'} + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_key_management(crypto_type, key1='test.key' , key2='test2.key'): + key = c.module('key') + if key.key_exists(key1): + key.rm_key(key1) + if key.key_exists(key2): + key.rm_key(key2) + key.add_key(key1, crypto_type=crypto_type) + k1 = key.get_key(key1, crypto_type=crypto_type) + assert key.key_exists(key1), f'Key management failed, key still exists' + key.mv_key(key1, key2, crypto_type=crypto_type) + k2 = key.get_key(key2, crypto_type=crypto_type) + assert k1.ss58_address == k2.ss58_address, f'Key management failed, {k1.ss58_address} != {k2.ss58_address}' + assert key.key_exists(key2), f'Key management failed, key does not exist' + assert not key.key_exists(key1), f'Key management failed, key still exists' + key.mv_key(key2, key1, crypto_type=crypto_type) + assert key.key_exists(key1), f'Key management failed, key does not exist' + assert not key.key_exists(key2), f'Key management failed, key still exists' + key.rm_key(key1) + # key.rm_key(key2) + assert not key.key_exists(key1), f'Key management failed, key still exists' + assert not key.key_exists(key2), f'Key management failed, key still exists' + return {'success': True, 'msg': 'test_key_management passed'} + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_signing(crypto_type,): + self = c.module('key')(crypto_type=crypto_type) + sig = self.sign('test') + assert self.verify('test',sig, self.public_key) + return {'success':True} + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_key_encryption(crypto_type, password='1234'): + cls = c.module('key') + path = 'test.enc' + cls.add_key('test.enc', refresh=True, crypto_type=crypto_type) + assert cls.is_key_encrypted(path) == False, f'file {path} is encrypted' + cls.encrypt_key(path, password=password) + assert cls.is_key_encrypted(path) == True, f'file {path} is not encrypted' + cls.decrypt_key(path, password=password) + assert cls.is_key_encrypted(path) == False, f'file {path} is encrypted' + cls.rm(path) + print('file deleted', path, c.exists, 'fam') + assert not c.exists(path), f'file {path} not deleted' + return {'success': True, 'msg': 'test_key_encryption passed'} + +@pytest.mark.parametrize('crypto_type', crypto_types) +def test_move_key(crypto_type): + self = c.module('key')(crypto_type=crypto_type) + self.add_key('testfrom', crypto_type=crypto_type) + assert self.key_exists('testfrom') + og_key = self.get_key('testfrom', crypto_type=crypto_type) + self.mv_key('testfrom', 'testto', crypto_type=crypto_type) + assert self.key_exists('testto', crypto_type=crypto_type) + assert not self.key_exists('testfrom') + new_key = self.get_key('testto', crypto_type=crypto_type) + assert og_key.ss58_address == new_key.ss58_address + self.rm_key('testto') + assert not self.key_exists('testto') + return {'success':True, 'msg':'test_move_key passed', 'key':new_key.ss58_address} + + +def test_ss58_encoding(): + self = c.module('key') + keypair = self.create_from_uri('//Alice') + ss58_address = keypair.ss58_address + public_key = keypair.public_key + assert keypair.ss58_address == self.ss58_encode(public_key, ss58_format=42) + assert keypair.ss58_address == self.ss58_encode(public_key, ss58_format=42) + assert keypair.public_key.hex() == self.ss58_decode(ss58_address) + assert keypair.public_key.hex() == self.ss58_decode(ss58_address) + return {'success':True} diff --git a/commune/key/types/__init__.py b/commune/key/types/__init__.py new file mode 100644 index 000000000..fda498480 --- /dev/null +++ b/commune/key/types/__init__.py @@ -0,0 +1,6 @@ +from .dot.ed25519 import DotED25519 +from .dot.sr25519 import DotSR25519 +from .eth import ECDSA +from .sol import Solana + +__all__ = ["DotED25519", "DotSR25519", "ECDSA", "Solana"] \ No newline at end of file diff --git a/commune/key/types/dot/ed25519.py b/commune/key/types/dot/ed25519.py new file mode 100644 index 000000000..0a2e8c04a --- /dev/null +++ b/commune/key/types/dot/ed25519.py @@ -0,0 +1,348 @@ +import ed25519_zebra +import sr25519 +import commune as c + +from commune.key.constants import * +from commune.key.utils import * + +import binascii +import re +import nacl.public +from scalecodec.base import ScaleBytes +from bip39 import bip39_to_mini_secret +from scalecodec.utils.ss58 import ( + ss58_decode, + ss58_encode +) +from typing import Union, Optional +from commune.key.key import Key, KeyType +class DotED25519(Key): + def __init__( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = SS58_FORMAT, + derive_path: str = None, + path: str = None, + crypto_type: Union[str, int] = KeyType.ED25519, + **kwargs + ): + self.crypto_type = KeyType.ED25519 + self.set_private_key(private_key=private_key, ss58_format = ss58_format, derive_path=derive_path, path=path, **kwargs) + + def set_private_key( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = 42, + derive_path: str = None, + path: str = None, + **kwargs, + ): + """ + Allows generation of Keys from a variety of input combination, such as a public/private key combination, + mnemonic or URI containing soft and hard derivation paths. With these Keys data can be signed and verified + + Parameters + ---------- + ss58_address: Substrate address + public_key: hex string or bytes of public_key key + private_key: hex string or bytes of private key + ss58_format: Substrate address format, default to 42 when omitted + seed_hex: hex string of seed + """ + # If no arguments are provided, generate a random keypair + + + if private_key == None: + private_key = self.new_key(crypto_type=self.crypto_type).private_key + if type(private_key) == str: + private_key = c.str2bytes(private_key) + if self.crypto_type == KeyType.ED25519: + private_key = private_key[:32] if len(private_key) == 64 else private_key + private_key, public_key = ed25519_zebra.ed_from_seed(private_key) + key_address = ss58_encode(public_key, ss58_format=ss58_format) + hash_type = 'ss58' + else: + raise ValueError('crypto_type "{}" not supported'.format(self.crypto_type)) + if type(public_key) is str: + public_key = bytes.fromhex(public_key.replace("0x", "")) + + self.hash_type = hash_type + self.public_key = public_key + self.address = self.key_address = self.ss58_address = key_address + self.private_key = private_key + self.derive_path = derive_path + self.path = path + self.ss58_format = ss58_format + self.key_address = self.ss58_address + self.key_type = self.crypto_type2name(self.crypto_type) + return {"key_address": key_address, "crypto_type": self.crypto_type} + + + @classmethod + def create_from_mnemonic( + cls, mnemonic: str = None, ss58_format=SS58_FORMAT, language_code: str = "en", crypto_type=KeyType.ED25519 + ) -> "DotED25519": + """ + Create a Key for given memonic + + Parameters + ---------- + mnemonic: Seed phrase + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + if not mnemonic: + mnemonic = cls.generate_mnemonic(language_code=language_code) + + keypair = cls.create_from_seed( + seed_hex=binascii.hexlify( + bytearray(bip39_to_mini_secret(mnemonic, "", language_code)) + ).decode("ascii"), + ss58_format=ss58_format, + ) + + keypair.mnemonic = mnemonic + + return keypair + + @classmethod + def create_from_seed( + cls, seed_hex: Union[bytes, str], ss58_format: Optional[int] = SS58_FORMAT + ) -> "DotED25519": + """ + Create a Key for given seed + + Parameters + ---------- + seed_hex: hex string of seed + ss58_format: Substrate address format + + Returns + ------- + Key + """ + if type(seed_hex) is str: + seed_hex = bytes.fromhex(seed_hex.replace("0x", "")) + private_key, public_key = ed25519_zebra.ed_from_seed(seed_hex) + ss58_address = ss58_encode(public_key, ss58_format) + kwargs = dict( + ss58_address=ss58_address, + public_key=public_key, + private_key=private_key, + ss58_format=ss58_format, + crypto_type=KeyType.ED25519, + ) + + return cls(**kwargs) + + @classmethod + def create_from_uri( + cls, + suri: str, + ss58_format: Optional[int] = SS58_FORMAT, + language_code: str = "en", + crypto_type: Union[str, int] = KeyType.ED25519, + ) -> "DotED25519": + """ + Creates Key for specified suri in following format: `[mnemonic]/[soft-path]//[hard-path]` + + Parameters + ---------- + suri: + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + DotED25519 + """ + crypto_type = cls.resolve_crypto_type(crypto_type) + suri = str(suri) + if not suri.startswith("//"): + suri = "//" + suri + + if suri and suri.startswith("/"): + suri = DEV_PHRASE + suri + + suri_regex = re.match( + r"^(?P.[^/]+( .[^/]+)*)(?P(//?[^/]+)*)(///(?P.*))?$", + suri, + ) + + suri_parts = suri_regex.groupdict() + + if suri_parts["password"]: + raise NotImplementedError( + f"Passwords in suri not supported for crypto_type '{KeyType.ED25519}'" + ) + + derived_keypair = cls.create_from_mnemonic( + suri_parts["phrase"], ss58_format=ss58_format, language_code=language_code + ) + + if suri_parts["path"] != "": + derived_keypair.derive_path = suri_parts["path"] + + derive_junctions = extract_derive_path(suri_parts["path"]) + + child_pubkey = derived_keypair.public_key + child_privkey = derived_keypair.private_key + + for junction in derive_junctions: + if junction.is_hard: + _, child_pubkey, child_privkey = sr25519.hard_derive_keypair( + (junction.chain_code, child_pubkey, child_privkey), b"" + ) + + else: + _, child_pubkey, child_privkey = sr25519.derive_keypair( + (junction.chain_code, child_pubkey, child_privkey), b"" + ) + + derived_keypair = DotED25519( + public_key=child_pubkey, + private_key=child_privkey, + ss58_format=ss58_format, + ) + + return derived_keypair + + def export_to_encrypted_json(self, passphrase: str, name: str = None) -> dict: + """ + Export Key to PolkadotJS format encrypted JSON file + + Parameters + ---------- + passphrase: Used to encrypt the keypair + name: Display name of Key used + + Returns + ------- + dict + """ + raise NotImplementedError(f"Cannot create JSON for crypto_type '{self.crypto_type}'") + + seperator = "::signature=" + + def sign(self, data: Union[ScaleBytes, bytes, str], to_json=False) -> bytes: + """ + Creates a signature for given data + Parameters + ---------- + data: data to sign in `Scalebytes`, bytes or hex string format + Returns + ------- + signature in bytes + + """ + if not isinstance(data, str): + data = c.python2str(data) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if not self.private_key: + raise Exception("No private key set to create signatures") + signature = ed25519_zebra.ed_sign(self.private_key, data) + + if to_json: + return { + "data": data.decode(), + "crypto_type": self.crypto_type, + "signature": signature.hex(), + "address": self.ss58_address, + } + return signature + + def verify( + self, + data: Union[ScaleBytes, bytes, str, dict], + signature: Union[bytes, str] = None, + public_key: Optional[str] = None, + return_address=False, + ss58_format=SS58_FORMAT, + max_age=None, + address=None, + **kwargs, + ) -> bool: + """ + Verifies data with specified signature + + Parameters + ---------- + data: data to be verified in `Scalebytes`, bytes or hex string format + signature: signature in bytes or hex string format + public_key: public key in bytes or hex string format + + Returns + ------- + True if data is signed with this Key, otherwise False + """ + data = c.copy(data) + + if isinstance(data, dict): + if self.is_ticket(data): + address = data.pop("address") + signature = data.pop("signature") + elif "data" in data and "signature" in data and "address" in data: + signature = data.pop("signature") + address = data.pop("address", address) + data = data.pop("data") + else: + assert signature != None, "signature not found in data" + assert address != None, "address not found in data" + + if max_age != None: + if isinstance(data, int): + staleness = c.timestamp() - int(data) + elif "timestamp" in data or "time" in data: + timestamp = data.get("timestamp", data.get("time")) + staleness = c.timestamp() - int(timestamp) + else: + raise ValueError( + "data should be a timestamp or a dict with a timestamp key" + ) + assert staleness < max_age, ( + f"data is too old, {staleness} seconds old, max_age is {max_age}" + ) + + if not isinstance(data, str): + data = c.python2str(data) + if address != None: + if self.valid_ss58_address(address): + public_key = ss58_decode(address) + if public_key == None: + public_key = self.public_key + if isinstance(public_key, str): + public_key = bytes.fromhex(public_key.replace("0x", "")) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if type(signature) is str and signature[0:2] == "0x": + signature = bytes.fromhex(signature[2:]) + elif type(signature) is str: + signature = bytes.fromhex(signature) + if type(signature) is not bytes: + raise TypeError("Signature should be of type bytes or a hex-string") + + crypto_verify_fn = ed25519_zebra.ed_verify + + verified = crypto_verify_fn(signature, data, public_key) + if not verified: + # Another attempt with the data wrapped, as discussed in https://github.com/polkadot-js/extension/pull/743 + # Note: As Python apps are trusted sources on its own, no need to wrap data when signing from this lib + verified = crypto_verify_fn( + signature, b"" + data + b"", public_key + ) + if return_address: + return ss58_encode(public_key, ss58_format=ss58_format) + return verified diff --git a/commune/key/types/dot/sr25519.py b/commune/key/types/dot/sr25519.py new file mode 100644 index 000000000..eb77cd8cc --- /dev/null +++ b/commune/key/types/dot/sr25519.py @@ -0,0 +1,383 @@ +import sr25519 +import commune as c + +from commune.key.constants import * +from commune.key.utils import * + +import json + +import time +import os +import binascii +import re +import secrets +import base64 +from base64 import b64encode +import hashlib +from Crypto import Random +from Crypto.Cipher import AES +import nacl.bindings +import nacl.public +from scalecodec.base import ScaleBytes +from bip39 import bip39_to_mini_secret +from scalecodec.utils.ss58 import ( + ss58_decode, + ss58_encode, + is_valid_ss58_address, + get_ss58_format, +) +from typing import Union, Optional +from ...key import Key, KeyType +class DotSR25519(Key): + def __init__( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = SS58_FORMAT, + derive_path: str = None, + path: str = None, + crypto_type: Union[str, int] = KeyType.SR25519, + **kwargs + ): + self.crypto_type = KeyType.SR25519 + self.set_private_key(private_key=private_key, ss58_format = ss58_format, derive_path=derive_path, path=path, **kwargs) + + def set_private_key( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = 42, + derive_path: str = None, + path: str = None, + **kwargs, + ): + """ + Allows generation of Keys from a variety of input combination, such as a public/private key combination, + mnemonic or URI containing soft and hard derivation paths. With these Keys data can be signed and verified + + Parameters + ---------- + ss58_address: Substrate address + public_key: hex string or bytes of public_key key + private_key: hex string or bytes of private key + ss58_format: Substrate address format, default to 42 when omitted + seed_hex: hex string of seed + """ + # If no arguments are provided, generate a random keypair + if private_key == None: + private_key = self.new_key().private_key + if type(private_key) == str: + private_key = c.str2bytes(private_key) + if self.crypto_type == KeyType.SR25519: + if len(private_key) != 64: + private_key = sr25519.pair_from_seed(private_key)[1] + public_key = sr25519.public_from_secret_key(private_key) + key_address = ss58_encode(public_key, ss58_format=ss58_format) + hash_type = "ss58" + else: + raise ValueError('crypto_type "{}" not supported'.format(self.crypto_type)) + if type(public_key) is str: + public_key = bytes.fromhex(public_key.replace("0x", "")) + + self.hash_type = hash_type + self.public_key = public_key + self.address = self.key_address = self.ss58_address = key_address + self.private_key = private_key + self.derive_path = derive_path + self.path = path + self.ss58_format = ss58_format + self.key_address = self.ss58_address + self.key_type = self.crypto_type2name(self.crypto_type) + return {"key_address": key_address, "crypto_type": self.crypto_type} + + + @classmethod + def create_from_mnemonic( + cls, mnemonic: str = None, ss58_format=SS58_FORMAT, language_code: str = "en", crypto_type=KeyType.SR25519 + ) -> "DotSR25519": + """ + Create a Key for given memonic + + Parameters + ---------- + mnemonic: Seed phrase + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + if not mnemonic: + mnemonic = cls.generate_mnemonic(language_code=language_code) + + keypair = cls.create_from_seed( + seed_hex=binascii.hexlify( + bytearray(bip39_to_mini_secret(mnemonic, "", language_code)) + ).decode("ascii"), + ss58_format=ss58_format, + ) + + keypair.mnemonic = mnemonic + + return keypair + + from_mnemonic = from_mem = create_from_mnemonic + + @classmethod + def create_from_seed( + cls, seed_hex: Union[bytes, str], ss58_format: Optional[int] = SS58_FORMAT + ) -> "DotSR25519": + """ + Create a Key for given seed + + Parameters + ---------- + seed_hex: hex string of seed + ss58_format: Substrate address format + + Returns + ------- + Key + """ + if type(seed_hex) is str: + seed_hex = bytes.fromhex(seed_hex.replace("0x", "")) + public_key, private_key = sr25519.pair_from_seed(seed_hex) + + ss58_address = ss58_encode(public_key, ss58_format) + kwargs = dict( + ss58_address=ss58_address, + public_key=public_key, + private_key=private_key, + ss58_format=ss58_format, + crypto_type=KeyType.SR25519, + ) + + return cls(**kwargs) + + @classmethod + def create_from_uri( + cls, + suri: str, + ss58_format: Optional[int] = SS58_FORMAT, + language_code: str = "en", + crypto_type: Union[str, int] = KeyType.SR25519, + ) -> "DotSR25519": + """ + Creates Key for specified suri in following format: `[mnemonic]/[soft-path]//[hard-path]` + + Parameters + ---------- + suri: + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + crypto_type = cls.resolve_crypto_type(crypto_type) + suri = str(suri) + if not suri.startswith("//"): + suri = "//" + suri + + if suri and suri.startswith("/"): + suri = DEV_PHRASE + suri + + suri_regex = re.match( + r"^(?P.[^/]+( .[^/]+)*)(?P(//?[^/]+)*)(///(?P.*))?$", + suri, + ) + + suri_parts = suri_regex.groupdict() + + if suri_parts["password"]: + raise NotImplementedError( + f"Passwords in suri not supported for crypto_type '{KeyType.SR25519}'" + ) + + derived_keypair = cls.create_from_mnemonic( + suri_parts["phrase"], ss58_format=ss58_format, language_code=language_code + ) + + if suri_parts["path"] != "": + derived_keypair.derive_path = suri_parts["path"] + + derive_junctions = extract_derive_path(suri_parts["path"]) + + child_pubkey = derived_keypair.public_key + child_privkey = derived_keypair.private_key + + for junction in derive_junctions: + if junction.is_hard: + _, child_pubkey, child_privkey = sr25519.hard_derive_keypair( + (junction.chain_code, child_pubkey, child_privkey), b"" + ) + + else: + _, child_pubkey, child_privkey = sr25519.derive_keypair( + (junction.chain_code, child_pubkey, child_privkey), b"" + ) + + derived_keypair = DotSR25519( + public_key=child_pubkey, + private_key=child_privkey, + ss58_format=ss58_format, + ) + + return derived_keypair + + from_mnem = from_mnemonic = create_from_mnemonic + + def export_to_encrypted_json(self, passphrase: str, name: str = None) -> dict: + """ + Export Key to PolkadotJS format encrypted JSON file + + Parameters + ---------- + passphrase: Used to encrypt the keypair + name: Display name of Key used + + Returns + ------- + dict + """ + if not name: + name = self.ss58_address + + # Secret key from PolkadotJS is an Ed25519 expanded secret key, so has to be converted + # https://github.com/polkadot-js/wasm/blob/master/packages/wasm-crypto/src/rs/sr25519.rs#L125 + converted_private_key = sr25519.convert_secret_key_to_ed25519(self.private_key) + encoded = encode_pair(self.public_key, converted_private_key, passphrase) + encoding_content = ["pkcs8", "sr25519"] + + json_data = { + "encoded": b64encode(encoded).decode(), + "encoding": { + "content": encoding_content, + "type": ["scrypt", "xsalsa20-poly1305"], + "version": "3", + }, + "address": self.ss58_address, + "meta": {"name": name, "tags": [], "whenCreated": int(time.time())}, + } + return json_data + + seperator = "::signature=" + + def sign(self, data: Union[ScaleBytes, bytes, str], to_json=False) -> bytes: + """ + Creates a signature for given data + Parameters + ---------- + data: data to sign in `Scalebytes`, bytes or hex string format + Returns + ------- + signature in bytes + + """ + if not isinstance(data, str): + data = c.python2str(data) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if not self.private_key: + raise Exception("No private key set to create signatures") + signature = sr25519.sign((self.public_key, self.private_key), data) + + if to_json: + return { + "data": data.decode(), + "crypto_type": self.crypto_type, + "signature": signature.hex(), + "address": self.ss58_address, + } + return signature + + def verify( + self, + data: Union[ScaleBytes, bytes, str, dict], + signature: Union[bytes, str] = None, + public_key: Optional[str] = None, + return_address=False, + ss58_format=SS58_FORMAT, + max_age=None, + address=None, + **kwargs, + ) -> bool: + """ + Verifies data with specified signature + + Parameters + ---------- + data: data to be verified in `Scalebytes`, bytes or hex string format + signature: signature in bytes or hex string format + public_key: public key in bytes or hex string format + + Returns + ------- + True if data is signed with this Key, otherwise False + """ + data = c.copy(data) + + if isinstance(data, dict): + if self.is_ticket(data): + address = data.pop("address") + signature = data.pop("signature") + elif "data" in data and "signature" in data and "address" in data: + signature = data.pop("signature") + address = data.pop("address", address) + data = data.pop("data") + else: + assert signature != None, "signature not found in data" + assert address != None, "address not found in data" + + if max_age != None: + if isinstance(data, int): + staleness = c.timestamp() - int(data) + elif "timestamp" in data or "time" in data: + timestamp = data.get("timestamp", data.get("time")) + staleness = c.timestamp() - int(timestamp) + else: + raise ValueError( + "data should be a timestamp or a dict with a timestamp key" + ) + assert staleness < max_age, ( + f"data is too old, {staleness} seconds old, max_age is {max_age}" + ) + + if not isinstance(data, str): + data = c.python2str(data) + if address != None: + if self.valid_ss58_address(address): + public_key = ss58_decode(address) + if public_key == None: + public_key = self.public_key + if isinstance(public_key, str): + public_key = bytes.fromhex(public_key.replace("0x", "")) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if type(signature) is str and signature[0:2] == "0x": + signature = bytes.fromhex(signature[2:]) + elif type(signature) is str: + signature = bytes.fromhex(signature) + if type(signature) is not bytes: + raise TypeError("Signature should be of type bytes or a hex-string") + + crypto_verify_fn = sr25519.verify + + verified = crypto_verify_fn(signature, data, public_key) + if not verified: + # Another attempt with the data wrapped, as discussed in https://github.com/polkadot-js/extension/pull/743 + # Note: As Python apps are trusted sources on its own, no need to wrap data when signing from this lib + verified = crypto_verify_fn( + signature, b"" + data + b"", public_key + ) + if return_address: + return ss58_encode(public_key, ss58_format=ss58_format) + return verified diff --git a/commune/key/types/eth.py b/commune/key/types/eth.py new file mode 100644 index 000000000..28b9bbd60 --- /dev/null +++ b/commune/key/types/eth.py @@ -0,0 +1,331 @@ +import sr25519 +import commune as c + +from commune.key.constants import * +from commune.key.utils import * + +import json + +import time +import os +import binascii +import re +import secrets +import base64 +from base64 import b64encode +import hashlib +from Crypto import Random +from Crypto.Cipher import AES +import nacl.bindings +import nacl.public +from scalecodec.base import ScaleBytes +from bip39 import bip39_to_mini_secret +from scalecodec.utils.ss58 import ( + ss58_decode, + ss58_encode, + is_valid_ss58_address, + get_ss58_format, +) +from typing import Union, Optional +from ..key import Key, KeyType +class ECDSA(Key): + def __init__( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = SS58_FORMAT, + derive_path: str = None, + path: str = None, + crypto_type: Union[str, int] = KeyType.ECDSA, + **kwargs + ): + self.crypto_type = KeyType.ECDSA + self.set_private_key(private_key=private_key, ss58_format = ss58_format, derive_path=derive_path, path=path, **kwargs) + + def set_private_key( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = 42, + derive_path: str = None, + path: str = None, + **kwargs, + ): + """ + Allows generation of Keys from a variety of input combination, such as a public/private key combination, + mnemonic or URI containing soft and hard derivation paths. With these Keys data can be signed and verified + + Parameters + ---------- + ss58_address: Substrate address + public_key: hex string or bytes of public_key key + private_key: hex string or bytes of private key + ss58_format: Substrate address format, default to 42 when omitted + seed_hex: hex string of seed + """ + # If no arguments are provided, generate a random keypair + if private_key == None: + private_key = self.new_key(crypto_type=self.crypto_type).private_key + if type(private_key) == str: + private_key = c.str2bytes(private_key) + if self.crypto_type == KeyType.ECDSA: + private_key = private_key[0:32] + private_key_obj = PrivateKey(private_key) + public_key = private_key_obj.public_key.to_address() + key_address = private_key_obj.public_key.to_checksum_address() + hash_type = 'h160' + else: + raise ValueError('crypto_type "{}" not supported'.format(self.crypto_type)) + if type(public_key) is str: + public_key = bytes.fromhex(public_key.replace("0x", "")) + + self.hash_type = hash_type + self.public_key = public_key + self.address = self.key_address = self.ss58_address = key_address + self.private_key = private_key + self.derive_path = derive_path + self.path = path + self.ss58_format = ss58_format + self.key_address = self.ss58_address + self.key_type = self.crypto_type2name(self.crypto_type) + return {"key_address": key_address, "crypto_type": self.crypto_type} + + + @classmethod + def create_from_mnemonic( + cls, mnemonic: str = None, ss58_format=SS58_FORMAT, language_code: str = "en", crypto_type=KeyType.ECDSA + ) -> "ECDSA": + """ + Create a Key for given memonic + + Parameters + ---------- + mnemonic: Seed phrase + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + if not mnemonic: + mnemonic = cls.generate_mnemonic(language_code=language_code) + + if language_code != "en": + raise ValueError("ECDSA mnemonic only supports english") + private_key = mnemonic_to_ecdsa_private_key(mnemonic) + keypair = cls.create_from_private_key(private_key, ss58_format=ss58_format, crypto_type=crypto_type) + + keypair.mnemonic = mnemonic + + return keypair + + from_mnemonic = from_mem = create_from_mnemonic + + @classmethod + def create_from_seed( + cls, seed_hex: Union[bytes, str], ss58_format: Optional[int] = SS58_FORMAT + ) -> "ECDSA": + """ + Create a Key for given seed + + Parameters + ---------- + seed_hex: hex string of seed + ss58_format: Substrate address format + + Returns + ------- + Key + """ + raise ValueError('crypto_type "{}" not supported'.format(KeyType.ECDSA)) + + @classmethod + def create_from_uri( + cls, + suri: str, + ss58_format: Optional[int] = SS58_FORMAT, + language_code: str = "en", + crypto_type: Union[str, int] = KeyType.ECDSA, + ) -> "ECDSA": + """ + Creates Key for specified suri in following format: `[mnemonic]/[soft-path]//[hard-path]` + + Parameters + ---------- + suri: + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + crypto_type = cls.resolve_crypto_type(crypto_type) + suri = str(suri) + if not suri.startswith("//"): + suri = "//" + suri + + if suri and suri.startswith("/"): + suri = DEV_PHRASE + suri + + suri_regex = re.match( + r"^(?P.[^/]+( .[^/]+)*)(?P(//?[^/]+)*)(///(?P.*))?$", + suri, + ) + + suri_parts = suri_regex.groupdict() + + if language_code != "en": + raise ValueError("ECDSA mnemonic only supports english") + print(suri_parts) + private_key = mnemonic_to_ecdsa_private_key( + mnemonic=suri_parts['phrase'], + str_derivation_path=suri_parts['path'], + passphrase=suri_parts['password'] + ) + derived_keypair = cls.create_from_private_key(private_key, ss58_format=ss58_format, crypto_type=crypto_type) + + return derived_keypair + + from_mnem = from_mnemonic = create_from_mnemonic + + def export_to_encrypted_json(self, passphrase: str, name: str = None) -> dict: + """ + Export Key to PolkadotJS format encrypted JSON file + + Parameters + ---------- + passphrase: Used to encrypt the keypair + name: Display name of Key used + + Returns + ------- + dict + """ + if not name: + name = self.ss58_address + + # Secret key from PolkadotJS is an Ed25519 expanded secret key, so has to be converted + # https://github.com/polkadot-js/wasm/blob/master/packages/wasm-crypto/src/rs/sr25519.rs#L125 + raise NotImplementedError("export_to_encrypted_json not implemented") + + seperator = "::signature=" + + def sign(self, data: Union[ScaleBytes, bytes, str], to_json=False) -> bytes: + """ + Creates a signature for given data + Parameters + ---------- + data: data to sign in `Scalebytes`, bytes or hex string format + Returns + ------- + signature in bytes + + """ + if not isinstance(data, str): + data = c.python2str(data) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if not self.private_key: + raise Exception("No private key set to create signatures") + + signature = ecdsa_sign(self.private_key, data) + + if to_json: + return { + "data": data.decode(), + "crypto_type": self.crypto_type, + "signature": signature.hex(), + "address": self.ss58_address, + } + return signature + + def verify( + self, + data: Union[ScaleBytes, bytes, str, dict], + signature: Union[bytes, str] = None, + public_key: Optional[str] = None, + return_address=False, + ss58_format=SS58_FORMAT, + max_age=None, + address=None, + **kwargs, + ) -> bool: + """ + Verifies data with specified signature + + Parameters + ---------- + data: data to be verified in `Scalebytes`, bytes or hex string format + signature: signature in bytes or hex string format + public_key: public key in bytes or hex string format + + Returns + ------- + True if data is signed with this Key, otherwise False + """ + data = c.copy(data) + + if isinstance(data, dict): + if self.is_ticket(data): + address = data.pop("address") + signature = data.pop("signature") + elif "data" in data and "signature" in data and "address" in data: + signature = data.pop("signature") + address = data.pop("address", address) + data = data.pop("data") + else: + assert signature != None, "signature not found in data" + assert address != None, "address not found in data" + + if max_age != None: + if isinstance(data, int): + staleness = c.timestamp() - int(data) + elif "timestamp" in data or "time" in data: + timestamp = data.get("timestamp", data.get("time")) + staleness = c.timestamp() - int(timestamp) + else: + raise ValueError( + "data should be a timestamp or a dict with a timestamp key" + ) + assert staleness < max_age, ( + f"data is too old, {staleness} seconds old, max_age is {max_age}" + ) + + if not isinstance(data, str): + data = c.python2str(data) + if address != None: + if self.valid_ss58_address(address): + public_key = ss58_decode(address) + if public_key == None: + public_key = self.public_key + if isinstance(public_key, str): + public_key = bytes.fromhex(public_key.replace("0x", "")) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if type(signature) is str and signature[0:2] == "0x": + signature = bytes.fromhex(signature[2:]) + elif type(signature) is str: + signature = bytes.fromhex(signature) + if type(signature) is not bytes: + raise TypeError("Signature should be of type bytes or a hex-string") + + crypto_verify_fn = ecdsa_verify + + verified = crypto_verify_fn(signature, data, public_key) + if not verified: + # Another attempt with the data wrapped, as discussed in https://github.com/polkadot-js/extension/pull/743 + # Note: As Python apps are trusted sources on its own, no need to wrap data when signing from this lib + verified = crypto_verify_fn( + signature, b"" + data + b"", public_key + ) + if return_address: + return ss58_encode(public_key, ss58_format=ss58_format) + return verified diff --git a/commune/key/types/sol.py b/commune/key/types/sol.py new file mode 100644 index 000000000..e59b2d2f3 --- /dev/null +++ b/commune/key/types/sol.py @@ -0,0 +1,306 @@ +import sr25519 +import commune as c + +from commune.key.constants import * +from commune.key.utils import * + +import re +import nacl.public +from scalecodec.base import ScaleBytes +from scalecodec.utils.ss58 import ( + ss58_decode, +) +from base58 import b58encode +from typing import Union, Optional +from ..key import Key, KeyType +class Solana(Key): + def __init__( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = SS58_FORMAT, + derive_path: str = None, + path: str = None, + crypto_type: Union[str, int] = KeyType.SOLANA, + **kwargs + ): + self.crypto_type = KeyType.SOLANA + self.set_private_key(private_key=private_key, ss58_format = ss58_format, derive_path=derive_path, path=path, **kwargs) + + def set_private_key( + self, + private_key: Union[bytes, str] = None, + ss58_format: int = 44, + derive_path: str = None, + path: str = None, + **kwargs, + ): + """ + Allows generation of Keys from a variety of input combination, such as a public/private key combination, + mnemonic or URI containing soft and hard derivation paths. With these Keys data can be signed and verified + + Parameters + ---------- + ss58_address: Substrate address + public_key: hex string or bytes of public_key key + private_key: hex string or bytes of private key + ss58_format: Substrate address format, default to 42 when omitted + seed_hex: hex string of seed + """ + # If no arguments are provided, generate a random keypair + if private_key == None: + private_key = self.new_key(crypto_type=self.crypto_type).private_key + if type(private_key) == str: + private_key = c.str2bytes(private_key) + if self.crypto_type == KeyType.SOLANA: + private_key = private_key[0:32] + keypair = SolanaKeypair.from_seed(private_key) + public_key = keypair.pubkey().__bytes__() + private_key = keypair.secret() + key_address = b58encode(bytes(public_key)).decode('utf-8') + hash_type = 'base58' + else: + raise ValueError('crypto_type "{}" not supported'.format(self.crypto_type)) + if type(public_key) is str: + public_key = bytes.fromhex(public_key.replace("0x", "")) + + self.hash_type = hash_type + self.public_key = public_key + self.address = self.key_address = self.ss58_address = key_address + self.private_key = private_key + self.derive_path = derive_path + self.path = path + self.key_address = self.ss58_address + self.key_type = self.crypto_type2name(self.crypto_type) + return {"key_address": key_address, "crypto_type": self.crypto_type} + + + @classmethod + def create_from_mnemonic( + cls, mnemonic: str = None, ss58_format=SS58_FORMAT, language_code: str = "en", crypto_type=KeyType.SOLANA + ) -> "Solana": + """ + Create a Key for given memonic + + Parameters + ---------- + mnemonic: Seed phrase + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + if not mnemonic: + mnemonic = cls.generate_mnemonic(language_code=language_code) + + if language_code != "en": + raise ValueError("Solana mnemonic only supports english") + + private_key = SolanaKeypair.from_seed_phrase_and_passphrase(mnemonic, "").secret() + keypair = cls.create_from_private_key(private_key, ss58_format=ss58_format, crypto_type=crypto_type) + + keypair.mnemonic = mnemonic + + return keypair + + from_mnemonic = from_mem = create_from_mnemonic + + @classmethod + def create_from_seed( + cls, seed_hex: Union[bytes, str], ss58_format: Optional[int] = SS58_FORMAT + ) -> "Solana": + """ + Create a Key for given seed + + Parameters + ---------- + seed_hex: hex string of seed + ss58_format: Substrate address format + + Returns + ------- + Key + """ + raise ValueError('crypto_type "{}" not supported'.format(KeyType.SOLANA)) + + @classmethod + def create_from_uri( + cls, + suri: str, + ss58_format: Optional[int] = SS58_FORMAT, + language_code: str = "en", + crypto_type: Union[str, int] = KeyType.SOLANA, + ) -> "Solana": + """ + Creates Key for specified suri in following format: `[mnemonic]/[soft-path]//[hard-path]` + + Parameters + ---------- + suri: + ss58_format: Substrate address format + language_code: The language to use, valid values are: 'en', 'zh-hans', 'zh-hant', 'fr', 'it', 'ja', 'ko', 'es'. Defaults to `"en"` + + Returns + ------- + Key + """ + crypto_type = cls.resolve_crypto_type(crypto_type) + suri = str(suri) + if not suri.startswith("//"): + suri = "//" + suri + + if suri and suri.startswith("/"): + suri = DEV_PHRASE + suri + + suri_regex = re.match( + r"^(?P.[^/]+( .[^/]+)*)(?P(//?[^/]+)*)(///(?P.*))?$", + suri, + ) + + suri_parts = suri_regex.groupdict() + + if language_code != "en": + raise ValueError("Solana mnemonic only supports english") + private_key = SolanaKeypair.from_seed_phrase_and_passphrase(suri_parts['phrase'], passphrase=suri_parts['password']).secret() + derived_keypair = cls.create_from_private_key(private_key, ss58_format=ss58_format, crypto_type=crypto_type) + return derived_keypair + + from_mnem = from_mnemonic = create_from_mnemonic + + def export_to_encrypted_json(self, passphrase: str, name: str = None) -> dict: + """ + Export Key to PolkadotJS format encrypted JSON file + + Parameters + ---------- + passphrase: Used to encrypt the keypair + name: Display name of Key used + + Returns + ------- + dict + """ + + # Secret key from PolkadotJS is an Ed25519 expanded secret key, so has to be converted + # https://github.com/polkadot-js/wasm/blob/master/packages/wasm-crypto/src/rs/sr25519.rs#L125 + raise NotImplementedError("export_to_encrypted_json not implemented") + + seperator = "::signature=" + + def sign(self, data: Union[ScaleBytes, bytes, str], to_json=False) -> bytes: + """ + Creates a signature for given data + Parameters + ---------- + data: data to sign in `Scalebytes`, bytes or hex string format + Returns + ------- + signature in bytes + + """ + if not isinstance(data, str): + data = c.python2str(data) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if not self.private_key: + raise Exception("No private key set to create signatures") + + signature = solana_sign(self.private_key, data) + + if to_json: + return { + "data": data.decode(), + "crypto_type": self.crypto_type, + "signature": signature.hex(), + "address": self.ss58_address, + } + return signature + + def verify( + self, + data: Union[ScaleBytes, bytes, str, dict], + signature: Union[bytes, str] = None, + public_key: Optional[str] = None, + return_address=False, + ss58_format=SS58_FORMAT, + max_age=None, + address=None, + **kwargs, + ) -> bool: + """ + Verifies data with specified signature + + Parameters + ---------- + data: data to be verified in `Scalebytes`, bytes or hex string format + signature: signature in bytes or hex string format + public_key: public key in bytes or hex string format + + Returns + ------- + True if data is signed with this Key, otherwise False + """ + data = c.copy(data) + + if isinstance(data, dict): + if self.is_ticket(data): + address = data.pop("address") + signature = data.pop("signature") + elif "data" in data and "signature" in data and "address" in data: + signature = data.pop("signature") + address = data.pop("address", address) + data = data.pop("data") + else: + assert signature != None, "signature not found in data" + assert address != None, "address not found in data" + + if max_age != None: + if isinstance(data, int): + staleness = c.timestamp() - int(data) + elif "timestamp" in data or "time" in data: + timestamp = data.get("timestamp", data.get("time")) + staleness = c.timestamp() - int(timestamp) + else: + raise ValueError( + "data should be a timestamp or a dict with a timestamp key" + ) + assert staleness < max_age, ( + f"data is too old, {staleness} seconds old, max_age is {max_age}" + ) + + if not isinstance(data, str): + data = c.python2str(data) + if public_key == None: + public_key = self.public_key + if isinstance(public_key, str): + public_key = bytes.fromhex(public_key.replace("0x", "")) + if type(data) is ScaleBytes: + data = bytes(data.data) + elif data[0:2] == "0x": + data = bytes.fromhex(data[2:]) + elif type(data) is str: + data = data.encode() + if type(signature) is str and signature[0:2] == "0x": + signature = bytes.fromhex(signature[2:]) + elif type(signature) is str: + signature = bytes.fromhex(signature) + if type(signature) is not bytes: + raise TypeError("Signature should be of type bytes or a hex-string") + + crypto_verify_fn = solana_verify + + verified = crypto_verify_fn(signature, data, public_key) + if not verified: + # Another attempt with the data wrapped, as discussed in https://github.com/polkadot-js/extension/pull/743 + # Note: As Python apps are trusted sources on its own, no need to wrap data when signing from this lib + verified = crypto_verify_fn( + signature, b"" + data + b"", public_key + ) + if return_address: + return b58encode(public_key).decode('utf-8') + return verified diff --git a/commune/key/utils.py b/commune/key/utils.py new file mode 100644 index 000000000..fd2657c9e --- /dev/null +++ b/commune/key/utils.py @@ -0,0 +1,264 @@ +import re +import json +import base64 +import hashlib +import hmac +import struct +from nacl.secret import SecretBox +from nacl.pwhash import scrypt + +from typing import Union +from scalecodec.types import Bytes +from hashlib import blake2b +from math import ceil +from os import urandom +from sr25519 import pair_from_ed25519_secret_key +from eth_keys.datatypes import PrivateKey, Signature +from eth_utils import to_checksum_address, keccak as eth_utils_keccak +from solders.keypair import Keypair as SolanaKeypair +from solders.pubkey import Pubkey as SolanaPubkey +from solders.signature import Signature as SolanaSignature + +from .constants import * + +class PublicKey: + def __init__(self, private_key): + self.point = int.from_bytes(private_key, byteorder='big') * BIP32_CURVE.generator + + def __bytes__(self): + xstr = int(self.point.x()).to_bytes(32, byteorder='big') + parity = int(self.point.y()) & 1 + return (2 + parity).to_bytes(1, byteorder='big') + xstr + + def address(self): + x = int(self.point.x()) + y = int(self.point.y()) + s = x.to_bytes(32, 'big') + y.to_bytes(32, 'big') + return to_checksum_address(eth_utils_keccak(s)[12:]) + +def mnemonic_to_bip39seed(mnemonic, passphrase): + mnemonic = bytes(mnemonic, 'utf8') + salt = bytes(BIP39_SALT_MODIFIER + passphrase, 'utf8') + return hashlib.pbkdf2_hmac('sha512', mnemonic, salt, BIP39_PBKDF2_ROUNDS) + +def bip39seed_to_bip32masternode(seed): + h = hmac.new(BIP32_SEED_MODIFIER, seed, hashlib.sha512).digest() + key, chain_code = h[:32], h[32:] + return key, chain_code + +def derive_bip32childkey(parent_key, parent_chain_code, i): + assert len(parent_key) == 32 + assert len(parent_chain_code) == 32 + k = parent_chain_code + if (i & BIP32_PRIVDEV) != 0: + key = b'\x00' + parent_key + else: + key = bytes(PublicKey(parent_key)) + d = key + struct.pack('>L', i) + while True: + h = hmac.new(k, d, hashlib.sha512).digest() + key, chain_code = h[:32], h[32:] + a = int.from_bytes(key, byteorder='big') + b = int.from_bytes(parent_key, byteorder='big') + key = (a + b) % int(BIP32_CURVE.order) + if a < BIP32_CURVE.order and key != 0: + key = key.to_bytes(32, byteorder='big') + break + d = b'\x01' + h[32:] + struct.pack('>L', i) + return key, chain_code + +def parse_derivation_path(str_derivation_path): + path = [] + if str_derivation_path[0:2] != 'm/': + raise ValueError("Can't recognize derivation path. It should look like \"m/44'/60/0'/0\".") + for i in str_derivation_path.lstrip('m/').split('/'): + if "'" in i: + path.append(BIP32_PRIVDEV + int(i[:-1])) + else: + path.append(int(i)) + return path + + +def mnemonic_to_ecdsa_private_key(mnemonic: str, str_derivation_path: str = None, passphrase: str = "") -> bytes: + + if str_derivation_path is None: + str_derivation_path = f'{ETH_DERIVATION_PATH}/0' + + derivation_path = parse_derivation_path(str_derivation_path) + bip39seed = mnemonic_to_bip39seed(mnemonic, passphrase) + master_private_key, master_chain_code = bip39seed_to_bip32masternode(bip39seed) + private_key, chain_code = master_private_key, master_chain_code + for i in derivation_path: + private_key, chain_code = derive_bip32childkey(private_key, chain_code, i) + return private_key + + +def ecdsa_sign(private_key: bytes, message: bytes) -> bytes: + signer = PrivateKey(private_key) + return signer.sign_msg(message).to_bytes() + + +def ecdsa_verify(signature: bytes, data: bytes, address: bytes) -> bool: + signature_obj = Signature(signature) + recovered_pubkey = signature_obj.recover_public_key_from_msg(data) + return recovered_pubkey.to_canonical_address() == address + +def solana_sign(private_key: bytes, message: bytes) -> bytes: + keypair = SolanaKeypair.from_seed(private_key) + return bytes(keypair.sign_message(message)) + +def solana_verify(signature: bytes, message: bytes, public_key: bytes) -> bool: + signature = SolanaSignature.from_bytes(signature) + pubkey = SolanaPubkey(public_key) + return signature.verify(pubkey, message) + +class DeriveJunction: + def __init__(self, chain_code, is_hard=False): + self.chain_code = chain_code + self.is_hard = is_hard + + @classmethod + def from_derive_path(cls, path: str, is_hard=False): + + if path.isnumeric(): + byte_length = ceil(int(path).bit_length() / 8) + chain_code = int(path).to_bytes(byte_length, 'little').ljust(32, b'\x00') + + else: + path_scale = Bytes() + path_scale.encode(path) + + if len(path_scale.data) > JUNCTION_ID_LEN: + chain_code = blake2b(path_scale.data.data, digest_size=32).digest() + else: + chain_code = bytes(path_scale.data.data.ljust(32, b'\x00')) + + return cls(chain_code=chain_code, is_hard=is_hard) + +def extract_derive_path(derive_path: str): + + path_check = '' + junctions = [] + paths = re.findall(RE_JUNCTION, derive_path) + + if paths: + path_check = ''.join(''.join(path) for path in paths) + + for path_separator, path_value in paths: + junctions.append(DeriveJunction.from_derive_path( + path=path_value, is_hard=path_separator == '//') + ) + + if path_check != derive_path: + raise ValueError('Reconstructed path "{}" does not match input'.format(path_check)) + + return junctions + + +def decode_pair_from_encrypted_json(json_data: Union[str, dict], passphrase: str) -> tuple: + """ + Decodes encrypted PKCS#8 message from PolkadotJS JSON format + + Parameters + ---------- + json_data + passphrase + + Returns + ------- + tuple containing private and public key + """ + if type(json_data) is str: + json_data = json.loads(json_data) + + # Check requirements + if json_data.get('encoding', {}).get('version') != "3": + raise ValueError("Unsupported JSON format") + + encrypted = base64.b64decode(json_data['encoded']) + + if 'scrypt' in json_data['encoding']['type']: + salt = encrypted[0:32] + n = int.from_bytes(encrypted[32:36], byteorder='little') + p = int.from_bytes(encrypted[36:40], byteorder='little') + r = int.from_bytes(encrypted[40:44], byteorder='little') + + password = scrypt(passphrase.encode(), salt, n=n, r=r, p=p, dklen=32, maxmem=2 ** 26) + encrypted = encrypted[SCRYPT_LENGTH:] + + else: + password = passphrase.encode().rjust(32, b'\x00') + + if "xsalsa20-poly1305" not in json_data['encoding']['type']: + raise ValueError("Unsupported encoding type") + + nonce = encrypted[0:NONCE_LENGTH] + message = encrypted[NONCE_LENGTH:] + + secret_box = SecretBox(key=password) + decrypted = secret_box.decrypt(message, nonce) + + # Decode PKCS8 message + secret_key, public_key = decode_pkcs8(decrypted) + + if 'sr25519' in json_data['encoding']['content']: + # Secret key from PolkadotJS is an Ed25519 expanded secret key, so has to be converted + # https://github.com/polkadot-js/wasm/blob/master/packages/wasm-crypto/src/rs/sr25519.rs#L125 + converted_public_key, secret_key = pair_from_ed25519_secret_key(secret_key) + assert(public_key == converted_public_key) + + return secret_key, public_key + + +def decode_pkcs8(ciphertext: bytes) -> tuple: + current_offset = 0 + + header = ciphertext[current_offset:len(PKCS8_HEADER)] + if header != PKCS8_HEADER: + raise ValueError("Invalid Pkcs8 header found in body") + + current_offset += len(PKCS8_HEADER) + + secret_key = ciphertext[current_offset:current_offset + SEC_LENGTH] + current_offset += SEC_LENGTH + + divider = ciphertext[current_offset:current_offset + len(PKCS8_DIVIDER)] + + if divider != PKCS8_DIVIDER: + raise ValueError("Invalid Pkcs8 divider found in body") + + current_offset += len(PKCS8_DIVIDER) + + public_key = ciphertext[current_offset: current_offset + PUB_LENGTH] + + return secret_key, public_key + +def encode_pkcs8(public_key: bytes, private_key: bytes) -> bytes: + return PKCS8_HEADER + private_key + PKCS8_DIVIDER + public_key + +def encode_pair(public_key: bytes, private_key: bytes, passphrase: str) -> bytes: + """ + Encode a public/private pair to PKCS#8 format, encrypted with provided passphrase + + Parameters + ---------- + public_key: 32 bytes public key + private_key: 64 bytes private key + passphrase: passphrase to encrypt the PKCS#8 message + + Returns + ------- + (Encrypted) PKCS#8 message bytes + """ + message = encode_pkcs8(public_key, private_key) + + salt = urandom(SALT_LENGTH) + password = scrypt(passphrase.encode(), salt, n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P, dklen=32, maxmem=2 ** 26) + + secret_box = SecretBox(key=password) + message = secret_box.encrypt(message) + + scrypt_params = SCRYPT_N.to_bytes(4, 'little') + SCRYPT_P.to_bytes(4, 'little') + SCRYPT_R.to_bytes(4, 'little') + + return salt + scrypt_params + message.nonce + message.ciphertext + diff --git a/commune/module.py b/commune/module.py index 2a6e528e8..b3f1b3c0f 100755 --- a/commune/module.py +++ b/commune/module.py @@ -321,7 +321,7 @@ def is_module_folder(cls, module = None) -> bool: @classmethod def get_key(cls,key:str = None , **kwargs) -> None: - from commune.key import Key + from commune.key.key import Key if not isinstance(key, str) and hasattr(key,"module_name" ): key = key.module_name() return Key.get_key(key, **kwargs) diff --git a/commune/server/server.py b/commune/server/server.py index 63905e419..05c3518ee 100644 --- a/commune/server/server.py +++ b/commune/server/server.py @@ -25,6 +25,7 @@ async def dispatch(self, request: Request, call_next): response = await call_next(request) return response + class Server(c.Module): tag_seperator:str='::' user_data_lifetime = 3600 @@ -503,6 +504,8 @@ def start_process(cls, cmd = cmd + f' -- --fn {run_fn} --kwargs "{kwargs_str}"' stdout = c.cmd(cmd, env=env, verbose=verbose, cwd=cwd) return {'success':True, 'msg':f'Launched {module}', 'cmd': cmd, 'stdout':stdout} + remote_fn = launch = start_process + @classmethod def restart(cls, name:str): @@ -658,6 +661,7 @@ def add_endpoint(self, name, fn): assert hasattr(self, name), f'{name} not added to {self.__class__.__name__}' return {'success':True, 'message':f'Added {fn} to {self.__class__.__name__}'} + @classmethod def test(cls, **kwargs): from .test import Test diff --git a/tests/test_server.py b/commune/server/test_server.py similarity index 98% rename from tests/test_server.py rename to commune/server/test_server.py index a8e3fa1f2..140f40ed7 100644 --- a/tests/test_server.py +++ b/commune/server/test_server.py @@ -1,8 +1,8 @@ - import commune as c -def test(): + +def test_serializer(): self = c.module('serializer')() import torch, time data_list = [ diff --git a/tests/test_subspace.py b/commune/subspace/test_subspace.py similarity index 100% rename from tests/test_subspace.py rename to commune/subspace/test_subspace.py diff --git a/tests/test_vali.py b/commune/vali/test_validator.py similarity index 100% rename from tests/test_vali.py rename to commune/vali/test_validator.py diff --git a/commune/vali/vali.py b/commune/vali/vali.py index 2ab85f135..25f4db206 100644 --- a/commune/vali/vali.py +++ b/commune/vali/vali.py @@ -64,6 +64,7 @@ def set_network(self, network:str, self.search = search self.path = os.path.abspath(path or self.resolve_path(f'{network}/{subnet}' if subnet else network)) self.is_voting_network = any([v in self.network for v in self.voting_networks]) + self.set_score(score) self.sync(update=update) @@ -92,6 +93,7 @@ def get_client(self, module:dict): if module['key'] in self._clients: client = self._clients[module['key']] else: + if isinstance(module, str): address = module else: @@ -108,6 +110,7 @@ def score_module(self, module:dict, **kwargs): key: str time: int """ + if isinstance(module, str): module = self.network_module.get_module(module) module['time'] = c.time() # the timestamp @@ -134,6 +137,7 @@ def score_modules(self, modules: List[dict]): except Exception as e: c.print(f'ERROR({c.detailed_error(e)})', color='red', verbose=1) print(module_results) + return module_results def epoch(self): @@ -233,15 +237,42 @@ def module_paths(self): @classmethod def run_epoch(cls, network='local', run_loop=False, update=False, **kwargs): return cls(network=network, run_loop=run_loop, update=update, **kwargs).epoch() + @staticmethod - def test(): - from .test import Test - return Test().test() + def test( + n=2, + tag = 'vali_test_net', + miner='module', + trials = 5, + tempo = 4, + update=True, + path = '/tmp/commune/vali_test', + network='local' + ): + test_miners = [f'{miner}::{tag}{i}' for i in range(n)] + modules = test_miners + search = tag + assert len(modules) == n, f'Number of miners not equal to n {len(modules)} != {n}' + for m in modules: + c.serve(m) + namespace = c.namespace() + for m in modules: + assert m in namespace, f'Miner not in namespace {m}' + vali = Vali(network=network, search=search, path=path, update=update, tempo=tempo, run_loop=False) + print(vali.modules) + scoreboard = [] + while len(scoreboard) < n: + c.sleep(1) + scoreboard = vali.epoch() + trials -= 1 + assert trials > 0, f'Trials exhausted {trials}' + for miner in modules: + c.print(c.kill(miner)) + return {'success': True, 'msg': 'subnet test passed'} + def refresh_scoreboard(self): path = self.path c.rm(path) return {'success': True, 'msg': 'Leaderboard removed', 'path': path} - - diff --git a/tests/test_key.py b/tests/test_key.py deleted file mode 100644 index 27c211a51..000000000 --- a/tests/test_key.py +++ /dev/null @@ -1,103 +0,0 @@ - -import commune as c - -def test_encryption(values = [10, 'fam', 'hello world']): - cls = c.module('key') - for value in values: - value = str(value) - key = cls.new_key() - enc = key.encrypt(value) - dec = key.decrypt(enc) - assert dec == value, f'encryption failed, {dec} != {value}' - return {'encrypted':enc, 'decrypted': dec} - -def test_encryption_with_password(value = 10, password = 'fam'): - cls = c.module('key') - value = str(value) - key = cls.new_key() - enc = key.encrypt(value, password=password) - dec = key.decrypt(enc, password=password) - assert dec == value, f'encryption failed, {dec} != {value}' - return {'encrypted':enc, 'decrypted': dec} - -def test_key_encryption(test_key='test.key'): - self = c.module('key') - key = self.add_key(test_key, refresh=True) - og_key = self.get_key(test_key) - r = self.encrypt_key(test_key) - self.decrypt_key(test_key, password=r['password']) - key = self.get_key(test_key) - assert key.ss58_address == og_key.ss58_address, f'key encryption failed, {key.ss58_address} != {self.ss58_address}' - return {'success': True, 'msg': 'test_key_encryption passed'} - -def test_key_management(key1='test.key' , key2='test2.key'): - self = c.module('key') - if self.key_exists(key1): - self.rm_key(key1) - if self.key_exists(key2): - self.rm_key(key2) - self.add_key(key1) - k1 = self.get_key(key1) - assert self.key_exists(key1), f'Key management failed, key still exists' - self.mv_key(key1, key2) - k2 = self.get_key(key2) - assert k1.ss58_address == k2.ss58_address, f'Key management failed, {k1.ss58_address} != {k2.ss58_address}' - assert self.key_exists(key2), f'Key management failed, key does not exist' - assert not self.key_exists(key1), f'Key management failed, key still exists' - self.mv_key(key2, key1) - assert self.key_exists(key1), f'Key management failed, key does not exist' - assert not self.key_exists(key2), f'Key management failed, key still exists' - self.rm_key(key1) - # self.rm_key(key2) - assert not self.key_exists(key1), f'Key management failed, key still exists' - assert not self.key_exists(key2), f'Key management failed, key still exists' - return {'success': True, 'msg': 'test_key_management passed'} - - -def test_signing(): - self = c.module('key')() - sig = self.sign('test') - assert self.verify('test',sig, self.public_key) - return {'success':True} - -def test_key_encryption(password='1234'): - cls = c.module('key') - path = 'test.enc' - cls.add_key('test.enc', refresh=True) - assert cls.is_key_encrypted(path) == False, f'file {path} is encrypted' - cls.encrypt_key(path, password=password) - assert cls.is_key_encrypted(path) == True, f'file {path} is not encrypted' - cls.decrypt_key(path, password=password) - assert cls.is_key_encrypted(path) == False, f'file {path} is encrypted' - cls.rm(path) - print('file deleted', path, c.exists, 'fam') - assert not c.exists(path), f'file {path} not deleted' - return {'success': True, 'msg': 'test_key_encryption passed'} - -def test_move_key(): - self = c.module('key')() - self.add_key('testfrom') - assert self.key_exists('testfrom') - og_key = self.get_key('testfrom') - self.mv_key('testfrom', 'testto') - assert self.key_exists('testto') - assert not self.key_exists('testfrom') - new_key = self.get_key('testto') - assert og_key.ss58_address == new_key.ss58_address - self.rm_key('testto') - assert not self.key_exists('testto') - return {'success':True, 'msg':'test_move_key passed', 'key':new_key.ss58_address} - - -def test_ss58_encoding(): - self = c.module('key') - keypair = self.create_from_uri('//Alice') - ss58_address = keypair.ss58_address - public_key = keypair.public_key - assert keypair.ss58_address == self.ss58_encode(public_key, ss58_format=42) - assert keypair.ss58_address == self.ss58_encode(public_key, ss58_format=42) - assert keypair.public_key.hex() == self.ss58_decode(ss58_address) - assert keypair.public_key.hex() == self.ss58_decode(ss58_address) - return {'success':True} - -