8
8
#
9
9
# - ec_mult, ec_setup, aes_setup, mitm_verify
10
10
#
11
- import hid , sys , os , platform
12
- from binascii import b2a_hex , a2b_hex
11
+ import hid , os , socket , atexit
12
+ from binascii import b2a_hex
13
13
from hashlib import sha256
14
- from .protocol import CCProtocolPacker , CCProtocolUnpacker , CCProtoError , MAX_MSG_LEN , MAX_BLK_LEN
14
+ from .constants import USB_NCRY_V1 , USB_NCRY_V2
15
+ from .protocol import CCProtocolPacker , CCProtocolUnpacker , CCProtoError , MAX_MSG_LEN
15
16
from .utils import decode_xpub , get_pubkey_string
16
17
17
18
# unofficial, unpermissioned... USB numbers
18
19
COINKITE_VID = 0xd13e
19
20
CKCC_PID = 0xcc10
20
21
21
- # Unix domain socket used by the simulator
22
- CKCC_SIMULATOR_PATH = '/tmp/ckcc-simulator.sock'
22
+ DEFAULT_SIM_SOCKET = "/tmp/ckcc- simulator.sock"
23
+
23
24
24
25
class ColdcardDevice :
25
- def __init__ (self , sn = None , dev = None , encrypt = True ):
26
+ def __init__ (self , sn = None , dev = None , encrypt = True , ncry_ver = USB_NCRY_V1 , is_simulator = False ):
26
27
# Establish connection via USB (HID) or Unix Pipe
27
- self .is_simulator = False
28
+ self .is_simulator = is_simulator
28
29
29
- if not dev and sn and '/' in sn :
30
- if platform .system () == 'Windows' :
31
- raise RuntimeError ("Cannot connect to simulator. Is it running?" )
30
+ if not dev and ((sn and ('/' in sn )) or self .is_simulator ):
32
31
dev = UnixSimulatorPipe (sn )
33
32
found = 'simulator'
34
33
self .is_simulator = True
@@ -49,7 +48,7 @@ def __init__(self, sn=None, dev=None, encrypt=True):
49
48
break
50
49
51
50
if not dev :
52
- raise KeyError ("Could not find Coldcard!"
51
+ raise KeyError ("Could not find Coldcard!"
53
52
if not sn else ('Cannot find CC with serial: ' + sn ))
54
53
else :
55
54
found = dev .get_serial_number_string ()
@@ -58,6 +57,7 @@ def __init__(self, sn=None, dev=None, encrypt=True):
58
57
self .serial = found
59
58
60
59
# they will be defined after we've established a shared secret w/ device
60
+ self .ncry_ver = ncry_ver
61
61
self .session_key = None
62
62
self .encrypt_request = None
63
63
self .decrypt_response = None
@@ -67,7 +67,7 @@ def __init__(self, sn=None, dev=None, encrypt=True):
67
67
self .resync ()
68
68
69
69
if encrypt :
70
- self .start_encryption ()
70
+ self .start_encryption (version = self . ncry_ver )
71
71
72
72
def close (self ):
73
73
# close underlying HID device
@@ -101,17 +101,21 @@ def send_recv(self, msg, expect_errors=False, verbose=0, timeout=3000, encrypt=T
101
101
# first byte of each 64-byte packet encodes length or packet-offset
102
102
assert 4 <= len (msg ) <= MAX_MSG_LEN , "msg length: %d" % len (msg )
103
103
104
- if not self .encrypt_request :
104
+ if self .encrypt_request is None :
105
105
# disable encryption if not already enabled for this connection
106
106
encrypt = False
107
107
108
+ if self .encrypt_request and self .ncry_ver == USB_NCRY_V2 :
109
+ # ncry version 2 - everything needs to be encrypted
110
+ encrypt = True
111
+
108
112
if encrypt :
109
113
msg = self .encrypt_request (msg )
110
114
111
115
left = len (msg )
112
116
offset = 0
113
117
while left > 0 :
114
- # Note: first byte always zero (HID report number),
118
+ # Note: first byte always zero (HID report number),
115
119
# [1] is framing header (length+flags)
116
120
# [2:65] payload (63 bytes, perhaps including padding)
117
121
here = min (63 , left )
@@ -224,7 +228,7 @@ def aes_setup(self, session_key):
224
228
self .encrypt_request = pyaes .AESModeOfOperationCTR (session_key , pyaes .Counter (0 )).encrypt
225
229
self .decrypt_response = pyaes .AESModeOfOperationCTR (session_key , pyaes .Counter (0 )).decrypt
226
230
227
- def start_encryption (self ):
231
+ def start_encryption (self , version = USB_NCRY_V1 ):
228
232
# setup encryption on the link
229
233
# - pick our own key pair, IV for AES
230
234
# - send IV and pubkey to device
@@ -233,10 +237,12 @@ def start_encryption(self):
233
237
234
238
pubkey = self .ec_setup ()
235
239
236
- msg = CCProtocolPacker .encrypt_start (pubkey )
240
+ msg = CCProtocolPacker .encrypt_start (pubkey , version = version )
237
241
238
242
his_pubkey , fingerprint , xpub = self .send_recv (msg , encrypt = False )
239
243
244
+ self .ncry_ver = version
245
+
240
246
self .session_key = self .ec_mult (his_pubkey )
241
247
242
248
# capture some public details of remote side's master key
@@ -248,7 +254,6 @@ def start_encryption(self):
248
254
self .aes_setup (self .session_key )
249
255
250
256
def mitm_verify (self , sig , expected_xpub ):
251
- # If Pycoin is not available, do it using ecdsa
252
257
from ecdsa import BadSignatureError , SECP256k1 , VerifyingKey
253
258
# of the returned (pubkey, chaincode) tuple, chaincode is not used
254
259
pubkey , _ = decode_xpub (expected_xpub )
@@ -318,42 +323,54 @@ def download_file(self, length, checksum, blksize=1024, file_number=1):
318
323
319
324
return data
320
325
321
- def hash_password (self , text_password ):
326
+ def hash_password (self , text_password , v3 = False ):
322
327
# Turn text password into a key for use in HSM auth protocol
328
+ # - changed from pbkdf2_hmac_sha256 to pbkdf2_hmac_sha512 in version 4 of CC firmware
323
329
from hashlib import pbkdf2_hmac , sha256
324
330
from .constants import PBKDF2_ITER_COUNT
325
331
326
332
salt = sha256 (b'pepper' + self .serial .encode ('ascii' )).digest ()
327
333
328
- return pbkdf2_hmac ('sha256' , text_password , salt , PBKDF2_ITER_COUNT )
334
+ return pbkdf2_hmac ('sha256' if v3 else 'sha512' , text_password , salt , PBKDF2_ITER_COUNT )[: 32 ]
329
335
330
336
331
337
class UnixSimulatorPipe :
332
338
# Use a UNIX pipe to the simulator instead of a real USB connection.
333
339
# - emulates the API of hidapi device object.
334
340
335
- def __init__ (self , path ):
336
- import socket , atexit
341
+ def __init__ (self , socket_path = None ):
342
+ self . socket_path = socket_path or DEFAULT_SIM_SOCKET
337
343
self .pipe = socket .socket (socket .AF_UNIX , socket .SOCK_DGRAM )
338
344
try :
339
- self .pipe .connect (path )
345
+ self .pipe .connect (self . socket_path )
340
346
except Exception :
341
347
self .close ()
342
348
raise RuntimeError ("Cannot connect to simulator. Is it running?" )
343
349
344
- instance = 0
345
- while instance < 10 :
346
- pn = '/tmp/ckcc-client-%d-%d.sock' % (os .getpid (), instance )
350
+ last_err = None
351
+ for instance in range (5 ):
352
+ # if simulator has PID in socket path, client will have matching, or empty
353
+ pn = '/tmp/ckcc-client%s-%d-%d.sock' % (self .get_sim_pid (), os .getpid (), instance )
347
354
try :
348
355
self .pipe .bind (pn ) # just needs any name
349
356
break
350
- except OSError :
351
- instance += 1
357
+ except OSError as err :
358
+ last_err = err
359
+ if os .path .exists (pn ):
360
+ os .remove (pn )
352
361
continue
362
+ else :
363
+ raise last_err # raise whatever was raised last in the loop
353
364
354
365
self .pipe_name = pn
355
366
atexit .register (self .close )
356
367
368
+ def get_sim_pid (self ):
369
+ # return str PID if any in socket_path
370
+ if self .socket_path == DEFAULT_SIM_SOCKET :
371
+ return ""
372
+ return "-" + self .socket_path .split ("." )[0 ].split ("-" )[- 1 ]
373
+
357
374
def read (self , max_count , timeout_ms = None ):
358
375
import socket
359
376
if not timeout_ms :
@@ -383,7 +400,7 @@ def close(self):
383
400
pass
384
401
385
402
def get_serial_number_string (self ):
386
- return 'simulator'
403
+ return 'F1' * 6
387
404
388
405
389
- # EOF
406
+ # EOF
0 commit comments