diff --git a/hwilib/devices/ckcc/README.md b/hwilib/devices/ckcc/README.md index 8db249b74..0d99aea3d 100644 --- a/hwilib/devices/ckcc/README.md +++ b/hwilib/devices/ckcc/README.md @@ -2,7 +2,7 @@ This is a stripped down and modified version of the official [ckcc-protocol](https://github.com/Coldcard/ckcc-protocol) library. -This stripped down version was made at commit [ca8d2b7808784a9f4927f3250bf52d2623a4e15b](https://github.com/Coldcard/ckcc-protocol/tree/ca8d2b7808784a9f4927f3250bf52d2623a4e15b). +This stripped down version was made at commit [f87d30f220cb6334eb3c4ace93c1b62e04942022](https://github.com/Coldcard/ckcc-protocol/commit/f87d30f220cb6334eb3c4ace93c1b62e04942022). ## Changes diff --git a/hwilib/devices/ckcc/__init__.py b/hwilib/devices/ckcc/__init__.py index b2f0b70ea..91f9ba615 100644 --- a/hwilib/devices/ckcc/__init__.py +++ b/hwilib/devices/ckcc/__init__.py @@ -1,6 +1,4 @@ -__version__ = '1.0.2' - -__all__ = [ "client", "protocol", "constants" ] - +__version__ = '1.4.0' +__all__ = [ "client", "protocol", "constants" ] \ No newline at end of file diff --git a/hwilib/devices/ckcc/client.py b/hwilib/devices/ckcc/client.py index 1f265fb25..ed4065bfb 100644 --- a/hwilib/devices/ckcc/client.py +++ b/hwilib/devices/ckcc/client.py @@ -8,27 +8,26 @@ # # - ec_mult, ec_setup, aes_setup, mitm_verify # -import hid, sys, os, platform -from binascii import b2a_hex, a2b_hex +import hid, os, socket, atexit +from binascii import b2a_hex from hashlib import sha256 -from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN, MAX_BLK_LEN +from .constants import USB_NCRY_V1, USB_NCRY_V2 +from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN from .utils import decode_xpub, get_pubkey_string # unofficial, unpermissioned... USB numbers COINKITE_VID = 0xd13e CKCC_PID = 0xcc10 -# Unix domain socket used by the simulator -CKCC_SIMULATOR_PATH = '/tmp/ckcc-simulator.sock' +DEFAULT_SIM_SOCKET = "/tmp/ckcc-simulator.sock" + class ColdcardDevice: - def __init__(self, sn=None, dev=None, encrypt=True): + def __init__(self, sn=None, dev=None, encrypt=True, ncry_ver=USB_NCRY_V1, is_simulator=False): # Establish connection via USB (HID) or Unix Pipe - self.is_simulator = False + self.is_simulator = is_simulator - if not dev and sn and '/' in sn: - if platform.system() == 'Windows': - raise RuntimeError("Cannot connect to simulator. Is it running?") + if not dev and ((sn and ('/' in sn)) or self.is_simulator): dev = UnixSimulatorPipe(sn) found = 'simulator' self.is_simulator = True @@ -49,7 +48,7 @@ def __init__(self, sn=None, dev=None, encrypt=True): break if not dev: - raise KeyError("Could not find Coldcard!" + raise KeyError("Could not find Coldcard!" if not sn else ('Cannot find CC with serial: '+sn)) else: found = dev.get_serial_number_string() @@ -58,6 +57,7 @@ def __init__(self, sn=None, dev=None, encrypt=True): self.serial = found # they will be defined after we've established a shared secret w/ device + self.ncry_ver = ncry_ver self.session_key = None self.encrypt_request = None self.decrypt_response = None @@ -67,7 +67,7 @@ def __init__(self, sn=None, dev=None, encrypt=True): self.resync() if encrypt: - self.start_encryption() + self.start_encryption(version=self.ncry_ver) def close(self): # close underlying HID device @@ -101,17 +101,21 @@ def send_recv(self, msg, expect_errors=False, verbose=0, timeout=3000, encrypt=T # first byte of each 64-byte packet encodes length or packet-offset assert 4 <= len(msg) <= MAX_MSG_LEN, "msg length: %d" % len(msg) - if not self.encrypt_request: + if self.encrypt_request is None: # disable encryption if not already enabled for this connection encrypt = False + if self.encrypt_request and self.ncry_ver == USB_NCRY_V2: + # ncry version 2 - everything needs to be encrypted + encrypt = True + if encrypt: msg = self.encrypt_request(msg) left = len(msg) offset = 0 while left > 0: - # Note: first byte always zero (HID report number), + # Note: first byte always zero (HID report number), # [1] is framing header (length+flags) # [2:65] payload (63 bytes, perhaps including padding) here = min(63, left) @@ -224,7 +228,7 @@ def aes_setup(self, session_key): self.encrypt_request = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).encrypt self.decrypt_response = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).decrypt - def start_encryption(self): + def start_encryption(self, version=USB_NCRY_V1): # setup encryption on the link # - pick our own key pair, IV for AES # - send IV and pubkey to device @@ -233,10 +237,12 @@ def start_encryption(self): pubkey = self.ec_setup() - msg = CCProtocolPacker.encrypt_start(pubkey) + msg = CCProtocolPacker.encrypt_start(pubkey, version=version) his_pubkey, fingerprint, xpub = self.send_recv(msg, encrypt=False) + self.ncry_ver = version + self.session_key = self.ec_mult(his_pubkey) # capture some public details of remote side's master key @@ -248,7 +254,6 @@ def start_encryption(self): self.aes_setup(self.session_key) def mitm_verify(self, sig, expected_xpub): - # If Pycoin is not available, do it using ecdsa from ecdsa import BadSignatureError, SECP256k1, VerifyingKey # of the returned (pubkey, chaincode) tuple, chaincode is not used pubkey, _ = decode_xpub(expected_xpub) @@ -318,42 +323,66 @@ def download_file(self, length, checksum, blksize=1024, file_number=1): return data - def hash_password(self, text_password): + def hash_password(self, text_password, v3=False): # Turn text password into a key for use in HSM auth protocol + # - changed from pbkdf2_hmac_sha256 to pbkdf2_hmac_sha512 in version 4 of CC firmware from hashlib import pbkdf2_hmac, sha256 from .constants import PBKDF2_ITER_COUNT salt = sha256(b'pepper' + self.serial.encode('ascii')).digest() - return pbkdf2_hmac('sha256', text_password, salt, PBKDF2_ITER_COUNT) + return pbkdf2_hmac('sha256' if v3 else 'sha512', text_password, salt, PBKDF2_ITER_COUNT)[:32] + + def firmware_version(self): + return self.send_recv(CCProtocolPacker.version()).split("\n") + + def is_edge(self): + # returns True if device is running EDGE firmware version + if self.is_simulator: + cmd = "import version; RV.write(str(int(getattr(version, 'is_edge', 0))))" + rv = self.send_recv(b'EXEC' + cmd.encode('utf-8'), timeout=60000, encrypt=False) + return rv == b"1" + + return self.firmware_version()[1][-1] == "X" class UnixSimulatorPipe: # Use a UNIX pipe to the simulator instead of a real USB connection. # - emulates the API of hidapi device object. - def __init__(self, path): - import socket, atexit + def __init__(self, socket_path=None): + self.socket_path = socket_path or DEFAULT_SIM_SOCKET self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) try: - self.pipe.connect(path) + self.pipe.connect(self.socket_path) except Exception: self.close() raise RuntimeError("Cannot connect to simulator. Is it running?") - instance = 0 - while instance < 10: - pn = '/tmp/ckcc-client-%d-%d.sock' % (os.getpid(), instance) + last_err = None + for instance in range(5): + # if simulator has PID in socket path, client will have matching, or empty + pn = '/tmp/ckcc-client%s-%d-%d.sock' % (self.get_sim_pid(), os.getpid(), instance) try: self.pipe.bind(pn) # just needs any name break - except OSError: - instance += 1 + except OSError as err: + last_err = err + if os.path.exists(pn): + os.remove(pn) continue + else: + raise last_err # raise whatever was raised last in the loop self.pipe_name = pn atexit.register(self.close) + def get_sim_pid(self): + # return str PID if any in socket_path + if self.socket_path == DEFAULT_SIM_SOCKET: + return "" + return "-" + self.socket_path.split(".")[0].split("-")[-1] + def read(self, max_count, timeout_ms=None): import socket if not timeout_ms: @@ -383,7 +412,7 @@ def close(self): pass def get_serial_number_string(self): - return 'simulator' + return 'F1'*6 -# EOF +# EOF \ No newline at end of file diff --git a/hwilib/devices/ckcc/constants.py b/hwilib/devices/ckcc/constants.py index ebbd93770..2218011c2 100644 --- a/hwilib/devices/ckcc/constants.py +++ b/hwilib/devices/ckcc/constants.py @@ -6,6 +6,22 @@ except ImportError: const = int +# USB encryption versions (default USB_NCRY_V1) +# +# This introduces a new ncry version to close a potential attack vector: +# +# A malicious program may re-initialize the connection encryption by sending the ncry command a second time during USB operation. +# This may prove particularly harmful in HSM mode. +# +# Sending version USB_NCRY_V2 changes the behavior in two ways: +# * All future commands must be encrypted +# * Returns an error if the ncry command is sent again for the duration of the power cycle +# +# USB_NCRY_V2 is most suitable for HSM mode as in case of any communication issue or simply by closing `ColdcardDevice` +# Coldcard will need to reboot to recover USB operation if USB_NCRY_V2. +USB_NCRY_V1 = const(0x01) +USB_NCRY_V2 = const(0x02) + # For upload/download this is the max size of the data block. MAX_BLK_LEN = const(2048) @@ -17,17 +33,29 @@ # - the max on the wire for mainnet is 100k # - but a PSBT might contain a full txn for each input MAX_TXN_LEN = const(384*1024) +MAX_TXN_LEN_MK4 = const(2*1024*1024) # Max size of any upload (firmware.dfu files in particular) MAX_UPLOAD_LEN = const(2*MAX_TXN_LEN) +MAX_UPLOAD_LEN_MK4 = const(2*MAX_TXN_LEN_MK4) # Max length of text messages for signing MSG_SIGNING_MAX_LENGTH = const(240) +# Bitcoin limitation: max number of signatures in P2SH redeem script (non-segwit) +# - 520 byte redeem script limit <= 15*34 bytes per pubkey == 510 bytes +# - serializations of M/N in redeem scripts assume this range +MAX_SIGNERS = const(15) +# taproot artificial multisig limit +MAX_TR_SIGNERS = const(34) + +TAPROOT_LEAF_MASK = 0xfe +TAPROOT_LEAF_TAPSCRIPT = 0xc0 + # Types of user auth we support USER_AUTH_TOTP = const(1) # RFC6238 USER_AUTH_HOTP = const(2) # RFC4226 -USER_AUTH_HMAC = const(3) # PBKDF2('hmac-sha256', secret, sha256(psbt), PBKDF2_ITER_COUNT) +USER_AUTH_HMAC = const(3) # PBKDF2('hmac-sha512', scrt, sha256(psbt), PBKDF2_ITER_COUNT)[:32] USER_AUTH_SHOW_QR = const(0x80) # show secret on Coldcard screen (best for TOTP enroll) MAX_USERNAME_LEN = 16 @@ -48,6 +76,7 @@ AFC_BECH32 = const(0x04) # just how we're encoding it? AFC_SCRIPT = const(0x08) # paying into a script AFC_WRAPPED = const(0x10) # for transition/compat types for segwit vs. old +AFC_BECH32M = const(0x20) # no difference between script/key path in taproot # Numeric codes for specific address types AF_CLASSIC = AFC_PUBKEY # 1addr @@ -56,11 +85,13 @@ AF_P2WSH = AFC_SCRIPT | AFC_SEGWIT | AFC_BECH32 # segwit multisig AF_P2WPKH_P2SH = AFC_WRAPPED | AFC_PUBKEY | AFC_SEGWIT # looks classic P2SH, but p2wpkh inside AF_P2WSH_P2SH = AFC_WRAPPED | AFC_SCRIPT | AFC_SEGWIT # looks classic P2SH, segwit multisig +AF_P2TR = AFC_PUBKEY | AFC_SEGWIT | AFC_BECH32M # bc1p SUPPORTED_ADDR_FORMATS = frozenset([ AF_CLASSIC, AF_P2SH, AF_P2WPKH, + AF_P2TR, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH, @@ -68,21 +99,66 @@ # BIP-174 aka PSBT defined values # -PSBT_GLOBAL_UNSIGNED_TX = const(0) -PSBT_GLOBAL_XPUB = const(1) - -PSBT_IN_NON_WITNESS_UTXO = const(0) -PSBT_IN_WITNESS_UTXO = const(1) -PSBT_IN_PARTIAL_SIG = const(2) -PSBT_IN_SIGHASH_TYPE = const(3) -PSBT_IN_REDEEM_SCRIPT = const(4) -PSBT_IN_WITNESS_SCRIPT = const(5) -PSBT_IN_BIP32_DERIVATION = const(6) -PSBT_IN_FINAL_SCRIPTSIG = const(7) -PSBT_IN_FINAL_SCRIPTWITNESS = const(8) - -PSBT_OUT_REDEEM_SCRIPT = const(0) -PSBT_OUT_WITNESS_SCRIPT = const(1) -PSBT_OUT_BIP32_DERIVATION = const(2) - -# EOF +# GLOBAL === +PSBT_GLOBAL_UNSIGNED_TX = const(0x00) +PSBT_GLOBAL_XPUB = const(0x01) +PSBT_GLOBAL_VERSION = const(0xfb) +PSBT_GLOBAL_PROPRIETARY = const(0xfc) +# BIP-370 +PSBT_GLOBAL_TX_VERSION = const(0x02) +PSBT_GLOBAL_FALLBACK_LOCKTIME = const(0x03) +PSBT_GLOBAL_INPUT_COUNT = const(0x04) +PSBT_GLOBAL_OUTPUT_COUNT = const(0x05) +PSBT_GLOBAL_TX_MODIFIABLE = const(0x06) + +# INPUTS === +PSBT_IN_NON_WITNESS_UTXO = const(0x00) +PSBT_IN_WITNESS_UTXO = const(0x01) +PSBT_IN_PARTIAL_SIG = const(0x02) +PSBT_IN_SIGHASH_TYPE = const(0x03) +PSBT_IN_REDEEM_SCRIPT = const(0x04) +PSBT_IN_WITNESS_SCRIPT = const(0x05) +PSBT_IN_BIP32_DERIVATION = const(0x06) +PSBT_IN_FINAL_SCRIPTSIG = const(0x07) +PSBT_IN_FINAL_SCRIPTWITNESS = const(0x08) +PSBT_IN_POR_COMMITMENT = const(0x09) +PSBT_IN_RIPEMD160 = const(0x0a) +PSBT_IN_SHA256 = const(0x0b) +PSBT_IN_HASH160 = const(0x0c) +PSBT_IN_HASH256 = const(0x0d) +# BIP-370 +PSBT_IN_PREVIOUS_TXID = const(0x0e) +PSBT_IN_OUTPUT_INDEX = const(0x0f) +PSBT_IN_SEQUENCE = const(0x10) +PSBT_IN_REQUIRED_TIME_LOCKTIME = const(0x11) +PSBT_IN_REQUIRED_HEIGHT_LOCKTIME = const(0x12) +# BIP-371 +PSBT_IN_TAP_KEY_SIG = const(0x13) +PSBT_IN_TAP_SCRIPT_SIG = const(0x14) +PSBT_IN_TAP_LEAF_SCRIPT = const(0x15) +PSBT_IN_TAP_BIP32_DERIVATION = const(0x16) +PSBT_IN_TAP_INTERNAL_KEY = const(0x17) +PSBT_IN_TAP_MERKLE_ROOT = const(0x18) + +# OUTPUTS === +PSBT_OUT_REDEEM_SCRIPT = const(0x00) +PSBT_OUT_WITNESS_SCRIPT = const(0x01) +PSBT_OUT_BIP32_DERIVATION = const(0x02) +# BIP-370 +PSBT_OUT_AMOUNT = const(0x03) +PSBT_OUT_SCRIPT = const(0x04) +# BIP-371 +PSBT_OUT_TAP_INTERNAL_KEY = const(0x05) +PSBT_OUT_TAP_TREE = const(0x06) +PSBT_OUT_TAP_BIP32_DERIVATION = const(0x07) + +RFC_SIGNATURE_TEMPLATE = '''\ +-----BEGIN BITCOIN SIGNED MESSAGE----- +{msg} +-----BEGIN BITCOIN SIGNATURE----- +{addr} +{sig} +-----END BITCOIN SIGNATURE----- +''' + +# EOF \ No newline at end of file diff --git a/hwilib/devices/ckcc/protocol.py b/hwilib/devices/ckcc/protocol.py index 92431910e..54c744e6d 100644 --- a/hwilib/devices/ckcc/protocol.py +++ b/hwilib/devices/ckcc/protocol.py @@ -63,11 +63,15 @@ def check_mitm(): @staticmethod def start_backup(): - # prompts user with password for encrytped backup + # prompts user with password for encrypted backup return b'back' @staticmethod - def encrypt_start(device_pubkey, version=0x1): + def encrypt_start(device_pubkey, version=USB_NCRY_V1): + supported_versions = [USB_NCRY_V1, USB_NCRY_V2] + if version not in supported_versions: + raise ValueError("Unsupported USB encryption version. " + "Supported versions: %s" % (supported_versions)) assert len(device_pubkey) == 64, "want uncompressed 64-byte pubkey, no prefix byte" return pack('<4sI64s', b'ncry', version, device_pubkey) @@ -120,6 +124,36 @@ def multisig_enroll(length, file_sha): assert len(file_sha) == 32 return pack('<4sI32s', b'enrl', length, file_sha) + @staticmethod + def miniscript_ls(): + # list registered miniscript wallet names + return b'msls' + + @staticmethod + def miniscript_delete(name): + # delete registered miniscript wallet by name + assert 2 <= len(name) <= 40, "name len" + return b'msdl' + name.encode('ascii') + + @staticmethod + def miniscript_get(name): + # get registered miniscript wallet object by name + assert 2 <= len(name) <= 40, "name len" + return b'msgt' + name.encode('ascii') + + @staticmethod + def miniscript_address(name, change=False, idx=0): + # get miniscript address from internal or external chain by id + assert 2 <= len(name) <= 40, "name len" + assert 0 <= idx < (2**31), "child idx" + return pack('<4sII', b'msas', int(change), idx) + name.encode('ascii') + + @staticmethod + def miniscript_enroll(length, file_sha): + # miniscript details must already be uploaded as a text file, this starts approval process. + assert len(file_sha) == 32 + return pack('<4sI32s', b'mins', length, file_sha) + @staticmethod def multisig_check(M, N, xfp_xor): # do we have a wallet already that matches M+N and xor(*xfps)? @@ -140,7 +174,7 @@ def show_address(subpath, addr_fmt=AF_CLASSIC): @staticmethod def show_p2sh_address(M, xfp_paths, witdeem_script, addr_fmt=AF_P2SH): # For multisig (aka) P2SH cases, you will need all the info required to build - # the redeem script, and the Coldcard must already have been enrolled + # the redeem script, and the Coldcard must already have been enrolled # into the wallet. # - redeem script must be provided # - full subkey paths for each involved key is required in a list of lists of ints, where @@ -224,76 +258,89 @@ class CCProtocolUnpacker: # - given full rx message to work from # - this is done after un-framing - @classmethod - def decode(cls, msg): + @staticmethod + def decode(msg): assert len(msg) >= 4 sign = str(msg[0:4], 'utf8', 'ignore') - d = getattr(cls, sign, cls) - if d is cls: + d = getattr(CCProtocolUnpacker, sign, None) + if d is None: raise CCFramingError('Unknown response signature: ' + repr(sign)) return d(msg) - # struct info for each response - + + @staticmethod def okay(msg): # trivial response, w/ no content assert len(msg) == 4 return None # low-level errors + @staticmethod def fram(msg): - raise CCFramingError("Framing Error", str(msg[4:], 'utf8')) + raise CCFramingError("Framing Error: " + str(msg[4:], 'utf8')) + + @staticmethod def err_(msg): raise CCProtoError("Coldcard Error: " + str(msg[4:], 'utf8', 'ignore'), msg[4:]) + @staticmethod def refu(msg): # user didn't want to approve something raise CCUserRefused() + @staticmethod def busy(msg): # user didn't want to approve something raise CCBusyError() + @staticmethod def biny(msg): # binary string: length implied by msg framing return msg[4:] + @staticmethod def int1(msg): return unpack_from('= 15 assert len(psbt_sha) == 32 - digest = hmac.new(key, psbt_sha, sha256).digest() + digest = hmac.new(key, psbt_sha, hashlib.sha256).digest() num = struct.unpack('>I', digest[-4:])[0] & 0x7fffffff return '%06d' % (num % 1000000) -# EOF +# EOF \ No newline at end of file diff --git a/hwilib/devices/coldcard.py b/hwilib/devices/coldcard.py index e64f3619c..831fcc01c 100644 --- a/hwilib/devices/coldcard.py +++ b/hwilib/devices/coldcard.py @@ -100,6 +100,23 @@ def __init__(self, path: str, password: Optional[str] = None, expert: bool = Fal device.open_path(path.encode()) self.device = ColdcardDevice(dev=device) + self._is_edge = None + + @property + def is_edge(self): + """ + Cached property, no need to ask device more than once + :return: bool + """ + if self._is_edge is None: + try: + self._is_edge = self.device.is_edge() + except: + # silent fail, normal firmware is implied + pass + + return self._is_edge + @coldcard_exception def get_pubkey_at_path(self, path: str) -> ExtendedKey: self.device.check_mitm() @@ -132,14 +149,15 @@ def sign_tx(self, tx: PSBT) -> PSBT: # For multisigs, we may need to do multiple passes if we appear in an input multiple times passes = 1 - for psbt_in in tx.inputs: - our_keys = 0 - for key in psbt_in.hd_keypaths.keys(): - keypath = psbt_in.hd_keypaths[key] - if keypath.fingerprint == master_fp and key not in psbt_in.partial_sigs: - our_keys += 1 - if our_keys > passes: - passes = our_keys + if not self.is_edge: + for psbt_in in tx.inputs: + our_keys = 0 + for key in psbt_in.hd_keypaths.keys(): + keypath = psbt_in.hd_keypaths[key] + if keypath.fingerprint == master_fp and key not in psbt_in.partial_sigs: + our_keys += 1 + if our_keys > passes: + passes = our_keys for _ in range(passes): # Get psbt in hex and then make binary @@ -392,11 +410,10 @@ def toggle_passphrase(self) -> bool: def can_sign_taproot(self) -> bool: """ - The Coldard does not support Taproot yet. - - :returns: False, always + Only COLDCARD EDGE support taproot. + :returns: Whether Taproot is supported """ - return False + return self.is_edge def enumerate(password: Optional[str] = None, expert: bool = False, chain: Chain = Chain.MAIN, allow_emulators: bool = True) -> List[Dict[str, Any]]: @@ -422,6 +439,9 @@ def enumerate(password: Optional[str] = None, expert: bool = False, chain: Chain with handle_errors(common_err_msgs["enumerate"], d_data): try: client = ColdcardClient(path) + if client.is_edge: + d_data['label'] = 'edge' + d_data['model'] = 'edge_' + d_data['model'] d_data['fingerprint'] = client.get_master_fingerprint().hex() except RuntimeError as e: # Skip the simulator if it's not there