Source code for xbr._eip712_delegate_certificate

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

import os
import pprint
from binascii import a2b_hex, b2a_hex
from typing import Any, Dict, List, Optional

import cbor2
import web3
from autobahn.wamp.message import _URI_PAT_REALM_NAME_ETH
from py_eth_sig_utils.eip712 import encode_typed_data

from xbr._secmod import EthereumKey

from ._eip712_base import (
    is_address,
    is_block_number,
    is_chain_id,
    is_cs_pubkey,
    is_eth_privkey,
    is_signature,
    recover,
    sign,
)
from ._eip712_certificate import EIP712Certificate


[docs] def create_eip712_delegate_certificate( chainId: int, verifyingContract: bytes, validFrom: int, delegate: bytes, csPubKey: bytes, bootedAt: int, meta: str ) -> dict: """ Delegate certificate: dynamic/one-time, off-chain. :param chainId: :param verifyingContract: :param validFrom: :param delegate: :param csPubKey: :param bootedAt: :param meta: :return: """ assert is_chain_id(chainId) assert is_address(verifyingContract) assert is_block_number(validFrom) assert is_address(delegate) assert is_cs_pubkey(csPubKey) assert type(bootedAt) == int assert meta is None or type(meta) == str data = { "types": { "EIP712Domain": [ {"name": "name", "type": "string"}, {"name": "version", "type": "string"}, ], "EIP712DelegateCertificate": [ {"name": "chainId", "type": "uint256"}, {"name": "verifyingContract", "type": "address"}, {"name": "validFrom", "type": "uint256"}, {"name": "delegate", "type": "address"}, {"name": "csPubKey", "type": "bytes32"}, {"name": "bootedAt", "type": "uint64"}, {"name": "meta", "type": "string"}, ], }, "primaryType": "EIP712DelegateCertificate", "domain": { "name": "WMP", "version": "1", }, "message": { "chainId": chainId, "verifyingContract": verifyingContract, "validFrom": validFrom, "delegate": delegate, "csPubKey": csPubKey, "bootedAt": bootedAt, "meta": meta or "", }, } return data
[docs] def sign_eip712_delegate_certificate( eth_privkey: bytes, chainId: int, verifyingContract: bytes, validFrom: int, delegate: bytes, csPubKey: bytes, bootedAt: int, meta: str, ) -> bytes: """ Sign the given data using a EIP712 based signature with the provided private key. :param eth_privkey: Signing key. :param chainId: :param verifyingContract: :param validFrom: :param delegate: :param csPubKey: :param bootedAt: :param meta: :return: The signature according to EIP712 (32+32+1 raw bytes). """ assert is_eth_privkey(eth_privkey) data = create_eip712_delegate_certificate( chainId, verifyingContract, validFrom, delegate, csPubKey, bootedAt, meta ) return sign(eth_privkey, data)
[docs] def recover_eip712_delegate_certificate( chainId: int, verifyingContract: bytes, validFrom: int, delegate: bytes, csPubKey: bytes, bootedAt: int, meta: str, signature: bytes, ) -> bytes: """ Recover the signer address the given EIP712 signature was signed with. :param chainId: :param verifyingContract: :param validFrom: :param delegate: :param csPubKey: :param bootedAt: :param signature: :param meta: :return: The (computed) signer address the signature was signed with. """ assert is_signature(signature) data = create_eip712_delegate_certificate( chainId, verifyingContract, validFrom, delegate, csPubKey, bootedAt, meta ) return recover(data, signature)
[docs] class EIP712DelegateCertificate(EIP712Certificate):
[docs] __slots__ = ( # EIP712 attributes "chainId", "verifyingContract", "validFrom", "delegate", "csPubKey", "bootedAt", "meta", # additional attributes "signatures", "hash", )
def __init__( self, chainId: int, verifyingContract: bytes, validFrom: int, delegate: bytes, csPubKey: bytes, bootedAt: int, meta: str, signatures: Optional[List[bytes]] = None, ): super().__init__(chainId, verifyingContract, validFrom)
[docs] self.delegate = delegate
[docs] self.csPubKey = csPubKey
[docs] self.bootedAt = bootedAt
[docs] self.meta = meta
[docs] self.signatures = signatures
eip712 = create_eip712_delegate_certificate( chainId, verifyingContract, validFrom, delegate, csPubKey, bootedAt, meta )
[docs] self.hash = encode_typed_data(eip712)
[docs] def __eq__(self, other: Any) -> bool: if not isinstance(other, self.__class__): return False if not EIP712DelegateCertificate.__eq__(self, other): return False if other.chainId != self.chainId: return False if other.verifyingContract != self.verifyingContract: return False if other.validFrom != self.validFrom: return False if other.delegate != self.delegate: return False if other.csPubKey != self.csPubKey: return False if other.bootedAt != self.bootedAt: return False if other.meta != self.meta: return False if other.signatures != self.signatures: return False if other.hash != self.hash: return False return True
[docs] def __ne__(self, other: Any) -> bool: return not self.__eq__(other)
[docs] def __str__(self) -> str: return pprint.pformat(self.marshal())
[docs] def sign(self, key: EthereumKey, binary: bool = False) -> bytes: eip712 = create_eip712_delegate_certificate( self.chainId, self.verifyingContract, self.validFrom, self.delegate, self.csPubKey, self.bootedAt, self.meta, ) return key.sign_typed_data(eip712, binary=binary)
[docs] def recover(self, signature: bytes) -> bytes: return recover_eip712_delegate_certificate( self.chainId, self.verifyingContract, self.validFrom, self.delegate, self.csPubKey, self.bootedAt, self.meta, signature, )
[docs] def marshal(self, binary: bool = False) -> Dict[str, Any]: obj = create_eip712_delegate_certificate( chainId=self.chainId, verifyingContract=self.verifyingContract, validFrom=self.validFrom, delegate=self.delegate, csPubKey=self.csPubKey, bootedAt=self.bootedAt, meta=self.meta, ) if not binary: obj["message"]["verifyingContract"] = ( web3.Web3.toChecksumAddress(obj["message"]["verifyingContract"]) if obj["message"]["verifyingContract"] else None ) obj["message"]["delegate"] = ( web3.Web3.toChecksumAddress(obj["message"]["delegate"]) if obj["message"]["delegate"] else None ) obj["message"]["csPubKey"] = ( b2a_hex(obj["message"]["csPubKey"]).decode() if obj["message"]["csPubKey"] else None ) return obj
@staticmethod
[docs] def parse(obj) -> "EIP712DelegateCertificate": if type(obj) != dict: raise ValueError("invalid type {} for object in EIP712DelegateCertificate.parse".format(type(obj))) primaryType = obj.get("primaryType", None) if primaryType != "EIP712DelegateCertificate": raise ValueError('invalid primaryType "{}" - expected "EIP712DelegateCertificate"'.format(primaryType)) # FIXME: check EIP712 types, domain data = obj.get("message", None) if type(data) != dict: raise ValueError("invalid type {} for EIP712DelegateCertificate".format(type(data))) for k in data: if k not in [ "type", "chainId", "verifyingContract", "delegate", "validFrom", "csPubKey", "bootedAt", "meta", ]: raise ValueError('invalid attribute "{}" in EIP712DelegateCertificate'.format(k)) _type = data.get("type", None) if _type and _type != "EIP712DelegateCertificate": raise ValueError('unexpected type "{}" in EIP712DelegateCertificate'.format(_type)) chainId = data.get("chainId", None) if chainId is None: raise ValueError("missing chainId in EIP712DelegateCertificate") if type(chainId) != int: raise ValueError("invalid type {} for chainId in EIP712DelegateCertificate".format(type(chainId))) verifyingContract = data.get("verifyingContract", None) if verifyingContract is None: raise ValueError("missing verifyingContract in EIP712DelegateCertificate") if type(verifyingContract) != str: raise ValueError( "invalid type {} for verifyingContract in EIP712DelegateCertificate".format(type(verifyingContract)) ) if not _URI_PAT_REALM_NAME_ETH.match(verifyingContract): raise ValueError( 'invalid value "{}" for verifyingContract in EIP712DelegateCertificate'.format(verifyingContract) ) verifyingContract = a2b_hex(verifyingContract[2:]) validFrom = data.get("validFrom", None) if validFrom is None: raise ValueError("missing validFrom in EIP712DelegateCertificate") if type(validFrom) != int: raise ValueError("invalid type {} for validFrom in EIP712DelegateCertificate".format(type(validFrom))) delegate = data.get("delegate", None) if delegate is None: raise ValueError("missing delegate in EIP712DelegateCertificate") if type(delegate) != str: raise ValueError("invalid type {} for delegate in EIP712DelegateCertificate".format(type(delegate))) if not _URI_PAT_REALM_NAME_ETH.match(delegate): raise ValueError('invalid value "{}" for verifyingContract in EIP712DelegateCertificate'.format(delegate)) delegate = a2b_hex(delegate[2:]) csPubKey = data.get("csPubKey", None) if csPubKey is None: raise ValueError("missing csPubKey in EIP712DelegateCertificate") if type(csPubKey) != str: raise ValueError("invalid type {} for csPubKey in EIP712DelegateCertificate".format(type(csPubKey))) if len(csPubKey) != 64: raise ValueError('invalid value "{}" for csPubKey in EIP712DelegateCertificate'.format(csPubKey)) csPubKey = a2b_hex(csPubKey) bootedAt = data.get("bootedAt", None) if bootedAt is None: raise ValueError("missing bootedAt in EIP712DelegateCertificate") if type(bootedAt) != int: raise ValueError("invalid type {} for bootedAt in EIP712DelegateCertificate".format(type(bootedAt))) meta = data.get("meta", None) if meta is None: raise ValueError("missing meta in EIP712DelegateCertificate") if type(meta) != str: raise ValueError("invalid type {} for meta in EIP712DelegateCertificate".format(type(meta))) obj = EIP712DelegateCertificate( chainId=chainId, verifyingContract=verifyingContract, validFrom=validFrom, delegate=delegate, csPubKey=csPubKey, bootedAt=bootedAt, meta=meta, ) return obj
[docs] def save(self, filename: str) -> int: """ Save certificate to file. File format (serialized as CBOR): [cert_hash: bytes, cert_eip712: Dict[str, Any], cert_signatures: List[bytes]] :param filename: :return: """ cert_obj = [self.hash, self.marshal(binary=True), self.signatures or []] with open(filename, "wb") as f: data = cbor2.dumps(cert_obj) f.write(data) return len(data)
@staticmethod
[docs] def load(filename) -> "EIP712DelegateCertificate": if not os.path.isfile(filename): raise RuntimeError( 'cannot create EIP712DelegateCertificate from filename "{}": not a file'.format(filename) ) with open(filename, "rb") as f: cert_hash, cert_eip712, cert_signatures = cbor2.loads(f.read()) cert = EIP712DelegateCertificate.parse(cert_eip712, binary=True) assert cert_hash == cert.hash cert.signatures = cert_signatures return cert