###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import struct
from binascii import a2b_hex, b2a_hex
from typing import Dict, List, Union
import web3
[docs]
def make_w3(gateway_config=None):
"""
Create a Web3 instance configured and ready-to-use gateway to the blockchain.
:param gateway_config: Blockchain gateway configuration.
:type gateway_config: dict
:return: Configured Web3 instance.
:rtype: :class:`web3.Web3`
"""
if gateway_config is None or gateway_config["type"] == "auto":
w3 = web3.Web3()
elif gateway_config["type"] == "user":
request_kwargs = gateway_config.get("http_options", {})
w3 = web3.Web3(web3.Web3.HTTPProvider(gateway_config["http"], request_kwargs=request_kwargs))
elif gateway_config["type"] == "infura":
request_kwargs = gateway_config.get("http_options", {})
project_id = gateway_config["key"]
# project_secret = gateway_config['secret']
http_url = "https://{}.infura.io/v3/{}".format(gateway_config["network"], project_id)
w3 = web3.Web3(web3.Web3.HTTPProvider(http_url, request_kwargs=request_kwargs))
# https://web3py.readthedocs.io/en/stable/middleware.html#geth-style-proof-of-authority
if gateway_config.get("network", None) == "rinkeby":
# This middleware is required to connect to geth --dev or the Rinkeby public network.
from web3.middleware import geth_poa_middleware
# inject the poa compatibility middleware to the innermost layer
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
# FIXME
elif gateway_config["type"] == "cloudflare":
# https://developers.cloudflare.com/web3/ethereum-gateway/reference/supported-networks/
raise NotImplementedError()
# FIXME
elif gateway_config["type"] == "zksync":
# https://v2-docs.zksync.io/dev/testnet/important-links.html
raise NotImplementedError()
else:
raise RuntimeError('invalid blockchain gateway type "{}"'.format(gateway_config["type"]))
return w3
[docs]
def unpack_uint128(data):
assert data is None or type(data) == bytes, "data must by bytes, was {}".format(type(data))
if data and type(data) == bytes:
assert len(data) == 16, "data must be bytes[16], but was bytes[{}]".format(len(data))
if data:
return web3.Web3.toInt(data)
else:
return 0
[docs]
def pack_uint128(value):
assert value is None or (type(value) == int and value >= 0 and value < 2**128)
if value:
data = web3.Web3.toBytes(value)
return b"\x00" * (16 - len(data)) + data
else:
return b"\x00" * 16
[docs]
def unpack_uint256(data):
assert data is None or type(data) == bytes, "data must by bytes, was {}".format(type(data))
if data and type(data) == bytes:
assert len(data) == 32, "data must be bytes[32], but was bytes[{}]".format(len(data))
if data:
return int(web3.Web3.toInt(data))
else:
return 0
[docs]
def pack_uint256(value):
assert value is None or (type(value) == int and value >= 0 and value < 2**256), (
"value must be uint256, but was {}".format(value)
)
if value:
data = web3.Web3.toBytes(value)
return b"\x00" * (32 - len(data)) + data
else:
return b"\x00" * 32
[docs]
def pack_ethadr(value: Union[bytes, str], return_dict: bool = False) -> Union[List[int], Dict[str, int]]:
"""
:param value:
:param return_dict:
:return:
"""
if type(value) == str:
if value.startswith("0x"):
value_bytes = a2b_hex(value[2:])
else:
value_bytes = a2b_hex(value)
elif type(value) == bytes:
value_bytes = value
else:
assert False, "invalid type {} for value".format(type(value))
assert len(value_bytes) == 20
w = []
for i in range(5):
w.append(struct.unpack("<I", value_bytes[0 + i * 4 : 4 + i * 4])[0])
if return_dict:
packed_value = {"w0": w[0], "w1": w[1], "w2": w[2], "w3": w[3], "w4": w[4]}
else:
packed_value = w
return packed_value
[docs]
def unpack_ethadr(packed_value: Union[List[int], Dict[str, int]], return_str=False) -> Union[bytes, str]:
"""
:param packed_value:
:param return_str:
:return:
"""
w = []
if type(packed_value) == dict:
for i in range(5):
w.append(struct.pack("<I", packed_value["w{}".format(i)]))
elif type(packed_value) == list:
for i in range(5):
w.append(struct.pack("<I", packed_value[i]))
else:
assert False, "should not arrive here"
if return_str:
return web3.Web3.toChecksumAddress("0x" + b2a_hex(b"".join(w)).decode())
else:
return b"".join(w)