Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions hwilib/psbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ class PartiallySignedInput:
PSBT_IN_TAP_BIP32_DERIVATION = 0x16
PSBT_IN_TAP_INTERNAL_KEY = 0x17
PSBT_IN_TAP_MERKLE_ROOT = 0x18
PSBT_IN_MUSIG2_PARTICIPANT_PUBKEYS = 0x1a
PSBT_IN_MUSIG2_PUB_NONCE = 0x1b
PSBT_IN_MUSIG2_PARTIAL_SIG = 0x1c

def __init__(self, version: int) -> None:
self.non_witness_utxo: Optional[CTransaction] = None
Expand All @@ -125,6 +128,9 @@ def __init__(self, version: int) -> None:
self.tap_bip32_paths: Dict[bytes, Tuple[Set[bytes], KeyOriginInfo]] = {}
self.tap_internal_key = b""
self.tap_merkle_root = b""
self.musig2_participant_pubkeys: Dict[bytes, List[bytes]] = {}
self.musig2_pub_nonces: Dict[Tuple[bytes, bytes, Optional[bytes]], bytes] = {}
self.musig2_partial_sigs: Dict[Tuple[bytes, bytes, Optional[bytes]], bytes] = {}
self.unknown: Dict[bytes, bytes] = {}

self.version: int = version
Expand Down Expand Up @@ -153,6 +159,9 @@ def set_null(self) -> None:
self.sequence = None
self.time_locktime = None
self.height_locktime = None
self.musig2_participant_pubkeys.clear()
self.musig2_pub_nonces.clear()
self.musig2_partial_sigs.clear()
self.unknown.clear()

def deserialize(self, f: Readable) -> None:
Expand Down Expand Up @@ -351,6 +360,51 @@ def deserialize(self, f: Readable) -> None:
self.tap_merkle_root = deser_string(f)
if len(self.tap_merkle_root) != 32:
raise PSBTSerializationError("Input Taproot merkle root is not 32 bytes")
elif key_type == PartiallySignedInput.PSBT_IN_MUSIG2_PARTICIPANT_PUBKEYS:
if key in key_lookup:
raise PSBTSerializationError("Duplicate key, input Musig2 participant pubkeys already provided")
elif len(key) != 1 + 33:
raise PSBTSerializationError("Input Musig2 aggregate compressed pubkey is not 33 bytes")

pubkeys_cat = deser_string(f)
if len(pubkeys_cat) == 0:
raise PSBTSerializationError("The list of compressed pubkeys for Musig2 cannot be empty")
if (len(pubkeys_cat) % 33) != 0:
raise PSBTSerializationError("The compressed pubkeys for Musig2 must be exactly 33 bytes long")
pubkeys = []
for i in range(0, len(pubkeys_cat), 33):
pubkeys.append(pubkeys_cat[i: i + 33])

self.musig2_participant_pubkeys[key[1:]] = pubkeys
elif key_type == PartiallySignedInput.PSBT_IN_MUSIG2_PUB_NONCE:
if key in key_lookup:
raise PSBTSerializationError("Duplicate key, Musig2 public nonce already provided")
elif len(key) not in [1 + 33 + 33, 1 + 33 + 33 + 32]:
raise PSBTSerializationError("Invalid key length for Musig2 public nonce")

providing_pubkey = key[1:1 + 33]
aggregate_pubkey = key[1 + 33:1 + 33 + 33]
tapleaf_hash = None if len(key) == 1 + 33 + 33 else key[1 + 33 + 33:]

public_nonces = deser_string(f)
if len(public_nonces) != 66:
raise PSBTSerializationError("The length of the public nonces in Musig2 must be exactly 66 bytes")

self.musig2_pub_nonces[(providing_pubkey, aggregate_pubkey, tapleaf_hash)] = public_nonces
elif key_type == PartiallySignedInput.PSBT_IN_MUSIG2_PARTIAL_SIG:
if key in key_lookup:
raise PSBTSerializationError("Duplicate key, Musig2 partial signature already provided")
elif len(key) not in [1 + 33 + 33, 1 + 33 + 33 + 32]:
raise PSBTSerializationError("Invalid key length for Musig2 partial signature")

providing_pubkey = key[1:1 + 33]
aggregate_pubkey = key[1 + 33:1 + 33 + 33]
tapleaf_hash = None if len(key) == 1 + 33 + 33 else key[1 + 33 + 33:]

partial_sig = deser_string(f)
if len(partial_sig) != 32:
raise PSBTSerializationError("The length of the partial signature in Musig2 must be exactly 32 bytes")
self.musig2_partial_sigs[(providing_pubkey, aggregate_pubkey, tapleaf_hash)] = partial_sig
else:
if key in self.unknown:
raise PSBTSerializationError("Duplicate key, key for unknown value already provided")
Expand Down Expand Up @@ -441,6 +495,20 @@ def serialize(self) -> bytes:
witstack = self.final_script_witness.serialize()
r += ser_string(witstack)

for pk, pubkeys in self.musig2_participant_pubkeys.items():
r += ser_string(ser_compact_size(PartiallySignedInput.PSBT_IN_MUSIG2_PARTICIPANT_PUBKEYS) + pk)
r += ser_string(b''.join(pubkeys))

for (pk, aggpk, hash), pubnonce in self.musig2_pub_nonces.items():
key_value = pk + aggpk + (hash or b'')
r += ser_string(ser_compact_size(PartiallySignedInput.PSBT_IN_MUSIG2_PUB_NONCE) + key_value)
r += ser_string(pubnonce)

for (pk, aggpk, hash), partial_sig in self.musig2_partial_sigs.items():
key_value = pk + aggpk + (hash or b'')
r += ser_string(ser_compact_size(PartiallySignedInput.PSBT_IN_MUSIG2_PARTIAL_SIG) + key_value)
r += ser_string(partial_sig)

if self.version >= 2:
if len(self.prev_txid) != 0:
r += ser_string(ser_compact_size(PartiallySignedInput.PSBT_IN_PREVIOUS_TXID))
Expand Down Expand Up @@ -483,6 +551,7 @@ class PartiallySignedOutput:
PSBT_OUT_TAP_INTERNAL_KEY = 0x05
PSBT_OUT_TAP_TREE = 0x06
PSBT_OUT_TAP_BIP32_DERIVATION = 0x07
PSBT_OUT_MUSIG2_PARTICIPANT_PUBKEYS = 0x08

def __init__(self, version: int) -> None:
self.redeem_script = b""
Expand All @@ -493,6 +562,7 @@ def __init__(self, version: int) -> None:
self.tap_internal_key = b""
self.tap_tree = b""
self.tap_bip32_paths: Dict[bytes, Tuple[Set[bytes], KeyOriginInfo]] = {}
self.musig2_participant_pubkeys: Dict[bytes, List[bytes]] = {}
self.unknown: Dict[bytes, bytes] = {}

self.version: int = version
Expand All @@ -509,6 +579,7 @@ def set_null(self) -> None:
self.tap_bip32_paths.clear()
self.amount = None
self.script = b""
self.musig2_participant_pubkeys = {}
self.unknown.clear()

def deserialize(self, f: Readable) -> None:
Expand Down Expand Up @@ -589,6 +660,22 @@ def deserialize(self, f: Readable) -> None:
for i in range(0, num_hashes):
leaf_hashes.add(vs.read(32))
self.tap_bip32_paths[xonly] = (leaf_hashes, KeyOriginInfo.deserialize(vs.read()))
elif key_type == PartiallySignedOutput.PSBT_OUT_MUSIG2_PARTICIPANT_PUBKEYS:
if key in key_lookup:
raise PSBTSerializationError("Duplicate key, output Musig2 participant pubkeys already provided")
elif len(key) != 1 + 33:
raise PSBTSerializationError("Output Musig2 aggregate compressed pubkey is not 33 bytes")

pubkeys_cat = deser_string(f)
if len(pubkeys_cat) == 0:
raise PSBTSerializationError("The list of compressed pubkeys for Musig2 cannot be empty")
if (len(pubkeys_cat) % 33) != 0:
raise PSBTSerializationError("The compressed pubkeys for Musig2 must be exactly 33 bytes long")
pubkeys = []
for i in range(0, len(pubkeys_cat), 33):
pubkeys.append(pubkeys_cat[i: i + 33])

self.musig2_participant_pubkeys[key[1:]] = pubkeys
else:
if key in self.unknown:
raise PSBTSerializationError("Duplicate key, key for unknown value already provided")
Expand Down Expand Up @@ -646,6 +733,11 @@ def serialize(self) -> bytes:
value += origin.serialize()
r += ser_string(value)

for pk, pubkeys in self.musig2_participant_pubkeys.items():
r += ser_string(ser_compact_size(
PartiallySignedOutput.PSBT_OUT_MUSIG2_PARTICIPANT_PUBKEYS) + pk)
r += ser_string(b''.join(pubkeys))

for key, value in sorted(self.unknown.items()):
r += ser_string(key)
r += ser_string(value)
Expand Down
Loading
Loading