"""Sign an Ethereum transaction with a Google Cloud KMS key and broadcast it."""

import base64

import asn1
import sha3
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from eth_account._utils.legacy_transactions import (
    encode_transaction,
    serializable_unsigned_transaction_from_dict,
)
from eth_typing import ChecksumAddress
from google.cloud import kms
from web3 import Web3

w3 = Web3(Web3.HTTPProvider("https://1rpc.io/sepolia"))
client = kms.KeyManagementServiceClient()
key_version_name = client.crypto_key_version_path(
    "keymanagertest-449901", "asia1", "evm_test_1", "test1", "1"
)
chain_id = 11155111  # Sepolia Testnet

# Order of the secp256k1 group; used to map a signature's s value into the
# lower half of the range, as EIP-2 requires for Ethereum transactions.
_SECP256K1_N = (
    0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
)


def _decode_der_signature(der_sig: bytes) -> tuple:
    """Return the ``(r, s)`` integers stored in a DER-encoded ECDSA signature.

    Raises:
        ValueError: if the two values read from the sequence are not integers.
    """
    decoder = asn1.Decoder()
    decoder.start(der_sig)
    decoder.enter()  # step into the outer SEQUENCE
    _, r = decoder.read()
    _, s = decoder.read()
    if not isinstance(r, int) or not isinstance(s, int):
        raise ValueError("Signature is not a DER sequence of two integers")
    return r, s


def _to_low_s(s: int) -> int:
    """Return the canonical (lower-half) form of the signature value ``s``.

    ``(r, s)`` and ``(r, n - s)`` are both valid ECDSA signatures for the same
    message, but only the one with ``s <= n / 2`` is accepted in Ethereum
    transactions. Switching between them flips the recovery id, which the
    caller handles by trying both ``v`` candidates.
    """
    if s > _SECP256K1_N // 2:
        return _SECP256K1_N - s
    return s


def run():
    """Build, sign via KMS and broadcast a self-transfer of 0.01 ether.

    The transaction hash is signed by the KMS key, the DER signature is turned
    into ``(r, s)``, and the matching ``v`` is found by trying both EIP-155
    candidates and comparing the recovered sender with the key's address.

    Raises:
        ValueError: if neither ``v`` candidate recovers the key's address.
    """
    address = get_address_from_kms(key_version_name)
    print("Address from PEM key: ", address)

    tx_dict = {
        "nonce": w3.eth.get_transaction_count(address),
        "gasPrice": w3.to_wei("20", "gwei"),
        "gas": 21000,
        "to": address,
        "value": w3.to_wei("0.01", "ether"),
        "data": b"",
        "chainId": chain_id,
    }
    unsigned_tx = serializable_unsigned_transaction_from_dict(tx_dict)
    signing_hash = unsigned_tx.hash()

    # The 32-byte transaction hash is handed to KMS in the "sha256" digest
    # slot, so KMS signs it as-is without hashing again.
    # NOTE(review): the digest is passed base64-encoded as a str, exactly as
    # before; the field is a bytes field, so passing `signing_hash` directly
    # may be simpler -- confirm against the client library.
    resp = syn(digest={"sha256": base64.b64encode(signing_hash).decode("utf-8")})

    r, s = _decode_der_signature(resp.signature)
    s = _to_low_s(s)

    # EIP-155: v = chain_id * 2 + 35 + recovery_id, with recovery_id in {0, 1}.
    # KMS does not return the recovery id, so try both and keep the one that
    # recovers our own address.
    for recovery_add in (chain_id * 2 + 35, chain_id * 2 + 36):
        encoded_raw = encode_transaction(unsigned_tx, (recovery_add, r, s))
        recovered = w3.eth.account.recover_transaction(encoded_raw)
        if recovered.lower() == address.lower():
            print(f"Signing matched with v={recovery_add}, from={recovered}")
            print(f"r: {r}, s: {s}")
            tx_hash = w3.eth.send_raw_transaction(encoded_raw).to_0x_hex()
            print(f"Transaction hash: {tx_hash}")
            break
    else:
        raise ValueError("Failed to find valid v for the signature")


def syn(digest) -> kms.AsymmetricSignResponse:
    """Ask KMS to sign ``digest`` with the configured key version.

    Args:
        digest: digest mapping forwarded unchanged as the request's "digest".

    Returns:
        The response of ``client.asymmetric_sign``.
    """
    sign_response = client.asymmetric_sign(
        request={
            "name": key_version_name,
            "digest": digest,
        }
    )
    return sign_response


def get_address_from_kms(key_version_path) -> ChecksumAddress:
    """Fetch the public key of ``key_version_path`` and return its address."""
    public_key_resp = client.get_public_key(name=key_version_path)
    return pem_to_address(public_key_resp.pem)


def pem_to_address(pem_key: str) -> ChecksumAddress:
    """Derive the checksummed Ethereum address from a PEM public key.

    The address is the last 20 bytes of the Keccak-256 hash of the 64-byte
    ``x || y`` encoding of the public point (the uncompressed encoding without
    its leading 0x04 byte).

    Raises:
        ValueError: if the PEM does not hold an elliptic-curve public key, or
            the key is not on the secp256k1 curve.
    """
    public_key = serialization.load_pem_public_key(
        pem_key.encode("utf-8"), backend=default_backend()
    )
    if not isinstance(public_key, ec.EllipticCurvePublicKey):
        raise ValueError("Invalid PEM public key format")
    if not isinstance(public_key.curve, ec.SECP256K1):
        # Any other curve would silently produce an address that cannot sign
        # Ethereum transactions.
        raise ValueError("Public key is not on the secp256k1 curve")

    numbers = public_key.public_numbers()
    x_bytes = numbers.x.to_bytes(32, byteorder="big")
    y_bytes = numbers.y.to_bytes(32, byteorder="big")

    keccak = sha3.keccak_256()
    keccak.update(x_bytes + y_bytes)
    address = keccak.digest()[-20:]
    return w3.to_checksum_address(address)


if __name__ == "__main__":
    run()