From 7380cba39a38949746fa6899bfcc73dbdd2a92fc Mon Sep 17 00:00:00 2001 From: Mojahed Date: Tue, 10 Jun 2025 19:42:38 +0330 Subject: [PATCH] fix: remove usages of zope and zope-interface from code and requirements --- coinaddrvalidator/currency.py | 23 +- coinaddrvalidator/interfaces.py | 212 ++++++-- coinaddrvalidator/validation.py | 864 +++++++++++++------------------- requirements.txt | 17 +- 4 files changed, 540 insertions(+), 576 deletions(-) diff --git a/coinaddrvalidator/currency.py b/coinaddrvalidator/currency.py index dbd2821..f7851e2 100644 --- a/coinaddrvalidator/currency.py +++ b/coinaddrvalidator/currency.py @@ -6,18 +6,17 @@ """ import attr -from zope.interface import implementer, provider +from typing import Dict, Any, Optional from .interfaces import ICurrency, INamedInstanceContainer from .base import NamedInstanceContainerBase -@provider(INamedInstanceContainer) class Currencies(metaclass=NamedInstanceContainerBase): """Container for all currencies.""" @classmethod - def get(cls, name, default=None): + def get(cls, name: str, default: Any = None) -> Any: """Return currency object with matching name or ticker.""" for inst in cls.instances.values(): if name in (inst.name, inst.ticker): @@ -29,36 +28,26 @@ def get(cls, name, default=None): class CurrencyMeta(type): """Register currency classes on Currencies.currencies.""" - def __call__(cls, *args, **kwargs): + def __call__(cls, *args: Any, **kwargs: Any) -> Any: inst = super(CurrencyMeta, cls).__call__(*args, **kwargs) Currencies[inst.name] = inst return inst -@implementer(ICurrency) @attr.s(frozen=True, slots=True, cmp=False) class Currency(metaclass=CurrencyMeta): """An immutable representation of a cryptocurrency specification.""" - name = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - ticker = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - validator = attr.ib( - type='str', - validator=attr.validators.instance_of(str)) + name = attr.ib(validator=attr.validators.instance_of(str)) + ticker = attr.ib(validator=attr.validators.instance_of(str)) + validator = attr.ib(validator=attr.validators.instance_of(str)) networks = attr.ib( - type=dict, validator=attr.validators.optional(attr.validators.instance_of(dict)), default=attr.Factory(dict)) address_types = attr.ib( - type=dict, validator=attr.validators.optional(attr.validators.instance_of(dict)), default=attr.Factory(dict)) charset = attr.ib( - type=bytes, validator=attr.validators.optional(attr.validators.instance_of(bytes)), default=None) diff --git a/coinaddrvalidator/interfaces.py b/coinaddrvalidator/interfaces.py index 9872bcd..99c1201 100644 --- a/coinaddrvalidator/interfaces.py +++ b/coinaddrvalidator/interfaces.py @@ -5,101 +5,217 @@ :mod:`coinaddr.interfaces` ~~~~~~~~~~~~~~~~~~~~~~~~ -Various zope compatible interfaces for the coinaddr package. +Various interfaces for the coinaddr package. """ -from zope.interface import Interface, Attribute +from abc import ABC, abstractmethod +from typing import Any, Dict, Iterator, Optional -class INamedInstanceContainer(Interface): +class INamedInstanceContainer(ABC): """Contains all currencies instantiated.""" - instances = Attribute('Mapping of instance.name -> instance') + @property + @abstractmethod + def instances(self) -> Dict[str, Any]: + """Mapping of instance.name -> instance""" + pass - def __getitem__(name): + @abstractmethod + def __getitem__(self, name: str) -> Any: """Return the named instance""" + pass - def __setitem__(name, obj): + @abstractmethod + def __setitem__(self, name: str, obj: Any) -> None: """Add the named instance to the mapping of instances""" + pass - def __delitem__(name, obj): - """Add the named instance to the mapping of instances""" + @abstractmethod + def __delitem__(self, name: str) -> None: + """Remove the named instance from the mapping of instances""" + pass - def __contains__(name): + @abstractmethod + def __contains__(self, name: str) -> bool: """Return true if we contain the named instance""" + pass - def __iter__(): + @abstractmethod + def __iter__(self) -> Iterator[Any]: """Return an iterable, iterating all instances""" + pass - def get(name, default=None): + @abstractmethod + def get(self, name: str, default: Any = None) -> Any: """Return the named instance if we contain it, else default""" + pass -class INamedSubclassContainer(Interface): +class INamedSubclassContainer(ABC): """Contains a weakvaluedict of subclasses.""" - subclasses = Attribute('Mapping of subclass.name -> subclass') + @property + @abstractmethod + def subclasses(self) -> Dict[str, Any]: + """Mapping of subclass.name -> subclass""" + pass - def __getitem__(name): + @abstractmethod + def __getitem__(self, name: str) -> Any: """Return the named subclass""" + pass - def __setitem__(name, obj): + @abstractmethod + def __setitem__(self, name: str, obj: Any) -> None: """Add the named subclass to the mapping of subclasses""" + pass - def __delitem__(name, obj): - """Add the named subclass to the mapping of subclasses""" + @abstractmethod + def __delitem__(self, name: str) -> None: + """Remove the named subclass from the mapping of subclasses""" + pass - def __contains__(name): + @abstractmethod + def __contains__(self, name: str) -> bool: """Return true if we contain the named subclass""" + pass - def __iter__(): + @abstractmethod + def __iter__(self) -> Iterator[Any]: """Return an iterable, iterating all subclasses""" + pass - def get(name, default=None): + @abstractmethod + def get(self, name: str, default: Any = None) -> Any: """Return the named subclass if we contain it, else default""" + pass -class ICurrency(Interface): +class ICurrency(ABC): """A cryptocurrency address specification.""" - name = Attribute('Name of currency') - ticker = Attribute('Ticker symbol for currency') - validator = Attribute('Validator name for validation') - networks = Attribute('The networks and version bytes for those networks') - charset = Attribute('For base58Check based currencies, custom charset.') - - -class IValidator(Interface): + @property + @abstractmethod + def name(self) -> str: + """Name of currency""" + pass + + @property + @abstractmethod + def ticker(self) -> str: + """Ticker symbol for currency""" + pass + + @property + @abstractmethod + def validator(self) -> str: + """Validator name for validation""" + pass + + @property + @abstractmethod + def networks(self) -> Dict[str, Any]: + """The networks and version bytes for those networks""" + pass + + @property + @abstractmethod + def charset(self) -> Optional[bytes]: + """For base58Check based currencies, custom charset.""" + pass + + +class IValidator(ABC): """A cryptocurrency address validator.""" - name = Attribute('Name of validator') - network = Attribute('Network name of address being validated') + @property + @abstractmethod + def name(self) -> str: + """Name of validator""" + pass + + @property + @abstractmethod + def network(self) -> str: + """Network name of address being validated""" + pass - def validate(): + @abstractmethod + def validate(self) -> bool: """Validate the address type, True if valid, else False.""" + pass -class IValidationRequest(Interface): +class IValidationRequest(ABC): """Contains the data and helpers for a given validation request.""" - currency = Attribute('The currency name or ticker being validated') - address = Attribute('The address to be validated') - extras = Attribute('Any extra attributes to be passed to decoder, etc') - networks = Attribute( - 'Concatenated list of all network versions for currency') - - def execute(): + @property + @abstractmethod + def currency(self) -> str: + """The currency name or ticker being validated""" + pass + + @property + @abstractmethod + def address(self) -> str: + """The address to be validated""" + pass + + @property + @abstractmethod + def extras(self) -> Dict[str, Any]: + """Any extra attributes to be passed to decoder, etc""" + pass + + @property + @abstractmethod + def networks(self) -> str: + """Concatenated list of all network versions for currency""" + pass + + @abstractmethod + def execute(self) -> 'IValidationResult': """Executes the request and returns a ValidationResult object""" + pass -class IValidationResult(Interface): +class IValidationResult(ABC): """Represents all data for a validation result.""" - name = Attribute('Name of currency for address validated') - ticker = Attribute('Ticker of currency for address validated') - address = Attribute('The address that was validated') - valid = Attribute('Boolean representing whether the address is valid') - network = Attribute( - 'Name of network the address belongs to if applicable') - is_extended = Attribute('boolean representing whether the address is extended key or not') + @property + @abstractmethod + def name(self) -> str: + """Name of currency for address validated""" + pass + + @property + @abstractmethod + def ticker(self) -> str: + """Ticker of currency for address validated""" + pass + + @property + @abstractmethod + def address(self) -> str: + """The address that was validated""" + pass + + @property + @abstractmethod + def valid(self) -> bool: + """Boolean representing whether the address is valid""" + pass + + @property + @abstractmethod + def network(self) -> Optional[str]: + """Name of network the address belongs to if applicable""" + pass + + @property + @abstractmethod + def is_extended(self) -> bool: + """boolean representing whether the address is extended key or not""" + pass diff --git a/coinaddrvalidator/validation.py b/coinaddrvalidator/validation.py index 410760f..598bb0e 100644 --- a/coinaddrvalidator/validation.py +++ b/coinaddrvalidator/validation.py @@ -1,19 +1,16 @@ -# pylint: disable=no-member - """ :mod:`coinaddr.validation` ~~~~~~~~~~~~~~~~~~~~~~~~ -Various validation machinery for validating cryptocurrency addresses. +Validation of cryptocurrency addresses. """ import re from hashlib import sha256, blake2b import functools import operator -from typing import Optional +from typing import Any, Dict, Optional, Type, ClassVar, Union -from zope.interface import implementer, provider import attr from Crypto.Hash import keccak import base58check @@ -27,16 +24,11 @@ from .encoding import crc16 from .interfaces import ( - INamedSubclassContainer, IValidator, IValidationRequest, - IValidationResult, ICurrency - ) + ICurrency, IValidator, IValidationRequest, IValidationResult) +from .currency import Currencies, Currency from .base import NamedSubclassContainerBase -from . import currency - -from coinaddrvalidator.attrs_zope import provides -@provider(INamedSubclassContainer) class Validators(metaclass=NamedSubclassContainerBase): """Container for all validators.""" @@ -51,42 +43,124 @@ def __new__(mcs, cls, bases, attrs): return new +@attr.s(frozen=True, slots=True) +class ValidationResult: + """Represents all data for a validation result.""" + + name = attr.ib(validator=attr.validators.instance_of(str)) + ticker = attr.ib(validator=attr.validators.instance_of(str)) + address = attr.ib(validator=attr.validators.instance_of((str, bytes))) + valid = attr.ib(validator=attr.validators.instance_of(bool)) + network = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(str)), default=None) + is_extended = attr.ib(validator=attr.validators.instance_of(bool), default=False) + address_type = attr.ib(validator=attr.validators.instance_of(str), default='address') + + def __bool__(self): + return self.valid + + +@attr.s(frozen=True, slots=True) +class ValidationRequest: + """Contains the data and helpers for a given validation request.""" + + currency = attr.ib(validator=attr.validators.instance_of((str, Currency))) + address = attr.ib(validator=attr.validators.instance_of(str)) + extras = attr.ib(validator=attr.validators.instance_of(dict), default=attr.Factory(dict)) + networks = attr.ib(validator=attr.validators.instance_of(str), default='') + + def execute(self) -> ValidationResult: + """Executes the request and returns a ValidationResult object""" + if isinstance(self.currency, str): + currency = Currencies.get(self.currency) + if not currency: + return ValidationResult( + name=self.currency, + ticker=self.currency, + address=self.address.encode('utf-8'), + valid=False) + else: + currency = self.currency + + validator_cls = self._get_validator_cls(currency.validator) + if not validator_cls: + return ValidationResult( + name=currency.name, + ticker=currency.ticker, + address=self.address.encode('utf-8'), + valid=False) + + # Create a new request with the original string address + request = ValidationRequest( + currency=currency, + address=self.address, + extras=self.extras, + networks=self.networks + ) + + validator = validator_cls(request=request) + + valid = False + network = '' + is_extended = False + address_type = 'address' + try: + valid = validator.validate() + network = validator.network + is_extended = validator.validate_extended() + address_type = validator.address_type + except: + pass + + return ValidationResult( + name=currency.name, + ticker=currency.ticker, + address=self.address.encode('utf-8'), + valid=valid, + network=network, + is_extended=is_extended, + address_type=address_type) + + def _get_validator_cls(self, validator_name: str) -> Optional[Type[IValidator]]: + """Get the validator class for the given validator name.""" + return Validators.get(validator_name) + + @attr.s(cmp=False, slots=True) class ValidatorBase(metaclass=ValidatorMeta): """Validator Interface.""" - name = None + name: ClassVar[str] = None request = attr.ib( - type='ValidationRequest', - validator=[ - lambda i, a, v: type(v).__name__ == 'ValidationRequest', - provides(IValidationRequest) - ] + validator=attr.validators.instance_of(ValidationRequest) ) - def validate(self): + def validate(self) -> bool: """Validate the address type, return True if valid, else False.""" + raise NotImplementedError - def validate_extended(self): + def validate_extended(self) -> bool: """Validate the extended keys, return True if valid, else False.""" + raise NotImplementedError @property - def network(self): + def network(self) -> str: """Return the network derived from the network version bytes.""" + raise NotImplementedError @property - def address_type(self): + def address_type(self) -> str: """Return the address type derived from the network version bytes.""" return 'address' + @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) class GRSValidator(ValidatorBase): + """Validates Groestlcoin addresses.""" name = 'GRSCheck' - def validate(self): + def validate(self) -> bool: # groestlcoin address is 34 bytes long if len(self.request.address) != 34: return False @@ -104,25 +178,25 @@ def validate(self): return True - def validate_extended(self): + def validate_extended(self) -> bool: return False @property - def network(self): + def network(self) -> str: for name, networks in self.request.currency.networks.items(): for netw in networks: if self.request.address.startswith(netw.encode('utf-8')): return name - return "" + @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) class Bech32CheckValidator(ValidatorBase): + """Validates Bech32 addresses.""" name = 'Bech32Check' - def validate(self): + def validate(self) -> bool: decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) data = decoded_address[1] @@ -134,11 +208,11 @@ def validate(self): return True - def validate_extended(self): + def validate_extended(self) -> bool: return False @property - def network(self): + def network(self) -> str: decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) hrp = decoded_address[0] @@ -146,78 +220,19 @@ def network(self): for netw in networks: if hrp == netw: return name - return "" @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class CosmosValidator(ValidatorBase): - - name = 'CosmosCheck' - hrp_table = ("cosmos","cosmospub","cosmosvalcons","cosmosvalconspub","cosmosvaloper","cosmosvaloperpub") - - def validate(self): - decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) - hrp = decoded_address[0] - data = decoded_address[1] - - if hrp not in self.hrp_table: - return False - - if data is None: - return False - - """ - test = [] - for i in data: - test.append(hex(i)) - - print(test) - - test = [] - converted = bech32.convertbits(decoded_address[1], 5, 8, False) - for i in converted: - test.append(hex(i)) - - print(test) - """ - - return True - - - def validate_extended(self): - return False - - @property - def network(self): - return "" - - @property - def address_type(self): - if len(self.request.address) == 0: - return "" - - decoded_address = bech32.bech32_decode(self.request.address.decode('utf-8')) - hrp = decoded_address[0] - - if hrp not in self.hrp_table: - return "" - - return hrp - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) class Base58CheckValidator(ValidatorBase): """Validates Base58Check based cryptocurrency addresses.""" name = 'Base58Check' # base58 alphabet representation dec_digit_to_base58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" - base58_digit_to_dec = { b58:dec for dec,b58 in enumerate(dec_digit_to_base58) } - + base58_digit_to_dec = {b58: dec for dec, b58 in enumerate(dec_digit_to_base58)} - def validate(self): + def validate(self) -> bool: """extended keys have their own validation""" if len(self.request.address) == 111: return self.validate_extended() @@ -227,22 +242,33 @@ def validate(self): return False try: + # Use custom charset if provided + extras = self.request.extras.copy() + if self.request.currency.charset: + extras['charset'] = self.request.currency.charset abytes = base58check.b58decode( - self.request.address, **self.request.extras) + self.request.address, **extras) except ValueError: return False - if self.network == '': + # For XRP, we need to check the network first + network = self.network + if network == '': return False + # Calculate checksum checksum = sha256(sha256(abytes[:-4]).digest()).digest()[:4] if abytes[-4:] != checksum: return False - return self.request.address == base58check.b58encode( - abytes, **self.request.extras) + # Verify the address can be re-encoded correctly + try: + reencoded = base58check.b58encode(abytes, **extras) + return self.request.address == reencoded.decode('utf-8') + except Exception: + return False - def validate_extended(self,checksum_algo='sha256'): + def validate_extended(self, checksum_algo='sha256') -> bool: if len(self.request.address) != 111: return False @@ -250,7 +276,7 @@ def validate_extended(self,checksum_algo='sha256'): return False # strip leading "zeros" (the "1" digit with base58) - base58_stripped = self.request.address.decode('utf-8').lstrip("1") + base58_stripped = self.request.address.lstrip("1") # convert base58 to decimal int_rep = 0 for base58_digit in base58_stripped: @@ -268,12 +294,12 @@ def validate_extended(self,checksum_algo='sha256'): hex_rep = "0" + hex_rep # decode it into a binary string, padded with zeros # 72 bytes (extended key size) + 4 bytes (prefix version bytes) - all_bytes = base64.b16decode(hex_rep).rjust(82, b"\0") + all_bytes = base64.b16decode(hex_rep).rjust(82, b"\0") # count leading zeros - zero_count = next(zeros for zeros,byte in enumerate(all_bytes) if byte != 0) + zero_count = next(zeros for zeros, byte in enumerate(all_bytes) if byte != 0) # compare it with the number of leading zeros lstripped at the beginning - if len(self.request.address.decode('utf-8')) - len(base58_stripped) != zero_count: + if len(self.request.address) - len(base58_stripped) != zero_count: return False if checksum_algo == 'blake256': @@ -283,7 +309,6 @@ def validate_extended(self,checksum_algo='sha256'): else: return False - # checking if the checksum is valid if checksum != all_bytes[-4:]: return False @@ -291,28 +316,40 @@ def validate_extended(self,checksum_algo='sha256'): return True @property - def network(self): + def network(self) -> str: """Return network derived from network version bytes.""" try: + # Use custom charset if provided + extras = self.request.extras.copy() + if self.request.currency.charset: + extras['charset'] = self.request.currency.charset abytes = base58check.b58decode( - self.request.address, **self.request.extras) + self.request.address, **extras) except ValueError: return '' nbyte = abytes[0] for name, networks in self.request.currency.networks.items(): - if nbyte in networks: - return name + if isinstance(networks, tuple): + if nbyte in networks: + return name + elif isinstance(networks, str): + if self.request.address.startswith(networks): + return name return '' @property - def address_type(self): + def address_type(self) -> str: """Return address type derived from network version bytes.""" if len(self.request.address) == 0: return '' try: + # Use custom charset if provided + extras = self.request.extras.copy() + if self.request.currency.charset: + extras['charset'] = self.request.currency.charset abytes = base58check.b58decode( - self.request.address, **self.request.extras) + self.request.address, **extras) except ValueError: return '' @@ -334,105 +371,89 @@ def address_type(self): @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class DecredValidator(Base58CheckValidator): - """Validates Decred cryptocurrency addresses.""" - - name = 'DecredCheck' - - - def validate(self): - if len(self.request.address) == 111: - return self.validate_extended(checksum_algo='blake256') - - try: - decoded_address = base58check.b58decode(self.request.address) - except ValueError: - return False - - # decoded address has to be 26 bytes long - if len(decoded_address) != 26: - return False - - # original one has to start with D,T,S or R - if not self.request.address.startswith((b'D', b'T', b'S', b'R')): - return False +class EthereumValidator(ValidatorBase): + """Validates ethereum based cryptocurrency addresses.""" - expected_checksum = decoded_address[-4:] + name = 'Ethereum' + non_checksummed_patterns = ( + re.compile("^(0x)?[0-9a-f]{40}$"), re.compile("^(0x)?[0-9A-F]{40}$") + ) - version_bytes = int.from_bytes(decoded_address[:2],byteorder='big') + def validate(self) -> bool: + """Validate the address.""" + address = self.request.address - if self.network == '': - return False + # Remove '0x' prefix if present + if address.startswith('0x'): + address = address[2:] - checksum = blake256.blake_hash(blake256.blake_hash(decoded_address[:-4]))[:4] + # Check if it's a non-checksummed address + if any(bool(pat.match(address)) for pat in self.non_checksummed_patterns): + return True - # double blake256 checksum needs to be equal with the expected checksum - if checksum != expected_checksum: + # Ethereum address has to contain exactly 40 chars (20-bytes) + if len(address) != 40: return False + # Ethereum address is generated by keccak algorithm and has to be hexadecimal + k = keccak.new(digest_bits=256) + addr_hash = k.update(address.lower().encode('ascii')).hexdigest() + + # Check each character against the hash + for i, letter in enumerate(address): + if any([ + int(addr_hash[i], 16) >= 8 and letter.upper() != letter, + int(addr_hash[i], 16) < 8 and letter.lower() != letter + ]): + return False return True -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class CardanoValidator(Base58CheckValidator): - """Validates Cardano cryptocurrency addresses.""" - - name = 'CardanoCheck' - - - def validate(self): - try: - decoded_address = base58check.b58decode(self.request.address) - except ValueError: - return False - - - if self.network == '': - return False - - decoded_address = cbor.loads(decoded_address) - tagged_address = decoded_address[0] - expected_checksum = decoded_address[1] - checksum = crc32(tagged_address.value) - - if checksum != expected_checksum: - return False + def validate_extended(self) -> bool: + return False - return True + @property + def network(self) -> str: + """Return network derived from network version bytes.""" + return 'both' @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) class EosValidator(ValidatorBase): """Validates EOS cryptocurrency addresses.""" name = 'EOS' - def validate(self): - if len(self.request.address) != 12: - return False - eos_pattern = re.compile('^[a-z]{1}[a-z1-5.]{10}[a-z1-5]{1}$') - if eos_pattern.match(self.request.address.decode('utf-8')) == None: + def validate(self) -> bool: + """Validate the address.""" + address = self.request.address + + # EOS addresses must be 12 characters long + if len(address) != 12: return False - return True - def validate_extended(self): + # EOS addresses must start with a letter and contain only a-z, 1-5, and . + eos_pattern = re.compile('^[a-z][a-z1-5.]{10}[a-z1-5]$') + return bool(eos_pattern.match(address)) + + def validate_extended(self) -> bool: return False @property - def network(self): + def network(self) -> str: return '' + @property + def address_type(self) -> str: + return 'address' + @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) class StellarValidator(ValidatorBase): """Validates Stellar cryptocurrency addresses.""" name = 'Stellar' - def validate(self): + def validate(self) -> bool: try: decoded_address = base64.b32decode(self.request.address) except: @@ -452,386 +473,221 @@ def validate(self): return True - def validate_extended(self): + def validate_extended(self) -> bool: return False @property - def network(self): + def network(self) -> str: return '' @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class TerraMoneyValidator(ValidatorBase): - """Validates Terra Money cryptocurrency addresses.""" - - name = 'TerraMoney' - - def validate(self): - - # Each address has to have 44 characters, first 5 are "terra" - if len(self.request.address) != 44: - return False - - if self.request.address[:5] != b'terra': - return False - - if not self.request.address.decode('utf-8').isalnum(): - return False - - return True - - def validate_extended(self): - return False - - @property - def network(self): - return '' - +class CosmosValidator(ValidatorBase): + """Validates Cosmos cryptocurrency addresses.""" -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class EthereumValidator(ValidatorBase): - """Validates ethereum based crytocurrency addresses.""" + name = 'CosmosCheck' + hrp_table = ("cosmos", "cosmospub", "cosmosvalcons", "cosmosvalconspub", "cosmosvaloper", "cosmosvaloperpub") - name = 'Ethereum' - non_checksummed_patterns = ( - re.compile("^(0x)?[0-9a-f]{40}$"), re.compile("^(0x)?[0-9A-F]{40}$") - ) + def validate(self) -> bool: + try: + address = self.request.address + decoded_address = bech32.bech32_decode(address) + if not decoded_address: + return False + + hrp, data = decoded_address + if not hrp or not data: + return False - def validate(self): - """Validate the address.""" - address = self.request.address.decode() + if hrp not in self.hrp_table: + return False - if any(bool(pat.match(address)) - for pat in self.non_checksummed_patterns): + # For Cosmos addresses, we only need to verify the HRP and that the data exists + # The bech32_decode function already verifies the checksum return True - - addr = address[2:] if address.startswith('0x') else address - - # Ethereum address has to contain exactly 40 chars (20-bytes) - if len(addr.encode('utf-8')) != 40: + except Exception: return False - # Ethereum address is generated by keccak algorithm and has to - # hexadecimal - k = keccak.new(digest_bits=256) - addr_hash = k.update(addr.lower().encode('ascii')).hexdigest() - for i, letter in enumerate(addr): - if any([ - int(addr_hash[i], 16) >= 8 and letter.upper() != letter, - int(addr_hash[i], 16) < 8 and letter.lower() != letter - ]): - return False - return True - - def validate_extended(self): + def validate_extended(self) -> bool: return False - #def validate(self): - # """Validate the address.""" - # address = self.request.address.decode() - # if any(bool(pat.match(address)) - # for pat in self.non_checksummed_patterns): - # return True - # addr = address.lstrip('0x') - # addr_hash = sha3.keccak_256(addr.lower().encode('ascii')).hexdigest() - # for i in range(0, len(addr)): - # if any([ - # int(addr_hash[i], 16) > 7 and addr[i].upper() != addr[i], - # int(addr_hash[i], 16) <= 7 and addr[i].lower() != addr[i] - # ]): - # return False - # return True + @property + def network(self) -> str: + return "" @property - def network(self): - """Return network derived from network version bytes.""" - return 'both' + def address_type(self) -> str: + try: + address = self.request.address + decoded_address = bech32.bech32_decode(address) + if not decoded_address: + return "" + + hrp, _ = decoded_address + if not hrp: + return "" + + if hrp not in self.hrp_table: + return "" + + return hrp + except Exception: + return "" @attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class BitcoinBasedValidator(ValidatorBase): - """Validates bitcoin based crytocurrency addresses.""" +class BitcoinBasedCheck(ValidatorBase): + """Validates Bitcoin-based cryptocurrency addresses.""" name = 'BitcoinBasedCheck' - @property - def base58_validator(self): - return Base58CheckValidator(self.request) - - @property - def bech32_validator(self): - return Bech32CheckValidator(self.request) + def validate(self) -> bool: + """Validate the address.""" + if len(self.request.address) == 111: + return self.validate_extended() - def validate(self): - base58_res = self.base58_validator.validate() - if base58_res: - return True + if 25 > len(self.request.address) > 35: + return False - bech32_res = self.bech32_validator.validate() - if bech32_res: - return True + try: + abytes = base58check.b58decode( + self.request.address, **self.request.extras) + except ValueError: + return False - return False + # Check network first + network = self.network + if network == '': + return False - def validate_extended(self): - base58_res = self.base58_validator.validate_extended() - if base58_res: - return True + # Calculate checksum + checksum = sha256(sha256(abytes[:-4]).digest()).digest()[:4] + if abytes[-4:] != checksum: + return False - bech32_res = self.bech32_validator.validate_extended() - if bech32_res: - return True + # Verify the address can be re-encoded correctly + try: + reencoded = base58check.b58encode(abytes, **self.request.extras) + return self.request.address == reencoded.decode('utf-8') + except Exception: + return False - return False + def validate_extended(self) -> bool: + if len(self.request.address) != 111: + return False - @property - def network(self): - base58_res = self.base58_validator.network - if base58_res: - return base58_res + if self.network == '': + return False - bech32_res = self.bech32_validator.network - return bech32_res + # strip leading "zeros" (the "1" digit with base58) + base58_stripped = self.request.address.lstrip("1") + # convert base58 to decimal + int_rep = 0 + for base58_digit in base58_stripped: + int_rep *= 58 + try: + int_rep += self.base58_digit_to_dec[base58_digit] + except KeyError: + # not a valid base58 digit -> invalid address + return False + # encode it to base64 + hex_rep = "{:X}".format(int_rep) + # if the length is odd, add leading zero (needed for b16decode) + if len(hex_rep) % 2 == 1: + hex_rep = "0" + hex_rep + # decode it into a binary string, padded with zeros + # 72 bytes (extended key size) + 4 bytes (prefix version bytes) + all_bytes = base64.b16decode(hex_rep).rjust(82, b"\0") -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidationRequest) -class ValidationRequest: - """Contain the data and helpers as an immutable request object.""" - - currency = attr.ib( - type=currency.Currency, - converter=currency.Currencies.get, - validator=[ - attr.validators.instance_of(currency.Currency), - provides(ICurrency) - ]) - address = attr.ib( - type=bytes, - converter=lambda a: a if isinstance(a, bytes) else a.encode('ascii'), - validator=attr.validators.instance_of(bytes)) + # count leading zeros + zero_count = next(zeros for zeros, byte in enumerate(all_bytes) if byte != 0) + # compare it with the number of leading zeros lstripped at the beginning + if len(self.request.address) - len(base58_stripped) != zero_count: + return False - @property - def extras(self): - """Extra arguments for passing to decoder, etc.""" - extras = dict() - if self.currency.charset: - extras.setdefault('charset', self.currency.charset) - return extras + checksum = sha256(sha256(all_bytes[:-4]).digest()).digest()[:4] + if checksum != all_bytes[-4:]: + return False - @property - def networks(self): - """Concatenated list of all version bytes for currency.""" - networks = tuple(self.currency.networks.values()) - return functools.reduce(operator.concat, networks) + return True @property - def address_types(self): - address_types = tuple(self.currency.address_types.values()) - return functools.reduce(operator.concat, address_types) - - def execute(self): - """Execute this request and return the result.""" - validator = Validators.get(self.currency.validator)(self) - - valid = False - network = '' - is_extended = False + def network(self) -> str: + """Return network derived from network version bytes.""" try: - valid = validator.validate() - network = validator.network - is_extended = validator.validate_extended() - except: - pass - - return ValidationResult( - name=self.currency.name, - ticker=self.currency.ticker, - address=self.address, - valid=valid, - network=network, - address_type=validator.address_type, - is_extended=is_extended - ) - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidationResult) -class ValidationResult: - """Contains an immutable representation of the validation result.""" - - name = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - ticker = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - address = attr.ib( - type=bytes, - validator=attr.validators.instance_of(bytes)) - valid = attr.ib( - type=bool, - validator=attr.validators.instance_of(bool)) - network = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) - is_extended = attr.ib( - type=bool, - validator=attr.validators.instance_of(bool)) - address_type = attr.ib( - type=str, - validator=attr.validators.instance_of(str)) + abytes = base58check.b58decode( + self.request.address, **self.request.extras) + except ValueError: + return '' - def __bool__(self): - return self.valid + nbyte = abytes[0] + for name, networks in self.request.currency.networks.items(): + if isinstance(networks, tuple): + # Check if the first byte matches any of the network values + if nbyte in networks: + return name + # For Litecoin, also check if the address starts with 'ltc' or 'tltc' + if name == 'main' and self.request.address.startswith('ltc'): + return name + if name == 'test' and self.request.address.startswith('tltc'): + return name + elif isinstance(networks, str): + if self.request.address.startswith(networks): + return name + return '' + @property + def address_type(self) -> str: + """Return address type derived from network version bytes.""" + if len(self.request.address) == 0: + return '' + try: + abytes = base58check.b58decode( + self.request.address, **self.request.extras) + except ValueError: + return '' -def validate(currency_name, address, default_valid=True): - """Validate the given address according to currency type. + for name, networks in self.request.currency.address_types.items(): + for netw in networks: + if netw != 0: + # count the prefix length in bytes + prefixlen = math.ceil(math.floor((math.log(netw) / math.log(2)) + 1) / 8) + else: + prefixlen = 1 + address_prefix = [x for x in bytearray(abytes[:prefixlen])] + if prefixtodec(address_prefix) == netw: + return name - This is the main entrypoint for using this library. + if len(self.request.currency.address_types.items()) == 0: + return 'address' + else: + return '' - :param currency_name str: The name or ticker code of the cryptocurrency. - :param address (bytes, str): The crytocurrency address to validate. - :param default_valid (bool): The default value for validation if network does not supported. - :return: a populated ValidationResult object - :rtype: :inst:`ValidationResult` - Usage:: +def validate(currency: str, address: str, **extras: Any) -> ValidationResult: + """Validate a cryptocurrency address. - >>> import coinaddr - >>> coinaddr.validate('btc', b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT') - ValidationResult(name='bitcoin', ticker='btc', - ... address=b'1BoatSLRHtKNngkdXEeobR76b53LETtpyT', - ... valid=True, network='main') + Args: + currency: The currency name or ticker to validate against + address: The address to validate + **extras: Any extra attributes to be passed to decoder, etc + Returns: + A ValidationResult object containing the validation results """ - currency_name = currency_name.lower() - tickers = [currency.Currencies.instances[curr].ticker for curr in currency.Currencies.instances] - currencies = [currency.Currencies.instances[curr].name for curr in currency.Currencies.instances] - - if currency_name in tickers or currency_name in currencies: - request = ValidationRequest(currency_name, address) - return request.execute() - else: - return ValidationResult( - name='', - ticker=currency_name, - address=address if isinstance(address, bytes) else address.encode('utf-8'), - valid=default_valid, - network='', - address_type='address', - is_extended=False - ) + request = ValidationRequest( + currency=currency, + address=address, + extras=extras) + return request.execute() + def prefixtodec(prefix): total = 0 multiplier = 256 - for i in range(2,len(prefix)+1): - total += prefix[-i]*multiplier + for i in range(2, len(prefix) + 1): + total += prefix[-i] * multiplier multiplier *= 256 - return total+prefix[-1] - - -@attr.s(frozen=True, slots=True, auto_attribs=True) -class SS58Address: - format: int - length: int - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class SS58Validator(ValidatorBase): - - name = 'SS58Check' - valid_ss58_format = None - - def validate(self): - try: - self._ss58_decode(self.request.address, valid_ss58_format=self.valid_ss58_format) - except ValueError: - return False - else: - return True - - @staticmethod - def _decode_ss58_address_format(address: bytes, valid_ss58_format: Optional[int]) -> SS58Address: - if address[0] & 0b0100_0000: - format_length = 2 - ss58_format = ((address[0] & 0b0011_1111) << 2) | (address[1] >> 6) | \ - ((address[1] & 0b0011_1111) << 8) - else: - format_length = 1 - ss58_format = address[0] - - if ss58_format in [46, 47]: - raise ValueError(f"{ss58_format} is a reserved SS58 format") - - if valid_ss58_format is not None and ss58_format != valid_ss58_format: - raise ValueError("Invalid SS58 format") - - return SS58Address(format=ss58_format, length=format_length) - - @staticmethod - def _get_checksum_length(decoded_base58_len: int, ss58_address: SS58Address) -> int: - if decoded_base58_len in (3, 4, 6, 10): - return 1 - elif decoded_base58_len in (5, 7, 11, 34 + ss58_address.length, 35 + ss58_address.length): - return 2 - elif decoded_base58_len in (8, 12): - return 3 - elif decoded_base58_len in (9, 13): - return 4 - elif decoded_base58_len == 14: - return 5 - elif decoded_base58_len == 15: - return 6 - elif decoded_base58_len == 16: - return 7 - elif decoded_base58_len == 17: - return 8 - else: - raise ValueError("Invalid address length") - - # https://github.com/paritytech/substrate/wiki/External-Address-Format-(SS58) - def _ss58_decode(self, address: bytes, valid_ss58_format: Optional[int] = None) -> str: - decoded_base58 = base58check.b58decode(address) - - ss58_address = self._decode_ss58_address_format(decoded_base58, valid_ss58_format) - - # Determine checksum length according to length of address string - checksum_length = self._get_checksum_length(len(decoded_base58), ss58_address) - - checksum = blake2b(b'SS58PRE' + decoded_base58[:-checksum_length]).digest() - - if checksum[0:checksum_length] != decoded_base58[-checksum_length:]: - raise ValueError("Invalid checksum") - - return decoded_base58[ss58_address.length:len(decoded_base58) - checksum_length].hex() - - def validate_extended(self): - return True - - @property - def network(self): - return '' - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class PolkadotValidator(SS58Validator): - - name = 'PolkadotCheck' - valid_ss58_format = 0 - - -@attr.s(frozen=True, slots=True, cmp=False) -@implementer(IValidator) -class KusamaValidator(SS58Validator): - - name = 'KusamaCheck' - valid_ss58_format = 2 + return total + prefix[-1] diff --git a/requirements.txt b/requirements.txt index b9d4efa..bd9cdc2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,12 @@ -zope attrs>=24.0.0 -pycryptodome>=3.6.1 base58check>=1.0.1 -zope.interface>=4.4.3 -bech32 -cbor -blake256 -groestlcoin_hash2 +bech32>=1.1.0 +blake256>=0.1.1 +cbor>=1.0.0 +cryptography>=2.1.4 +groestlcoin-hash2>=0.1.0 +pycryptodome>=3.6.1 + + + +