Source code for xbr._config

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

import binascii
import configparser
import io
import os
import struct
import sys
import uuid
from time import time_ns
from typing import Dict, List, Optional

import click
import nacl
import numpy as np
import web3
from autobahn.websocket.util import parse_url
from eth_utils.conversions import hexstr_if_str, to_hex

from xbr._wallet import pkm_from_argon2_secret

# Set to True once colorama has been imported and initialised successfully.
_HAS_COLOR_TERM = False

try:
    import colorama

    # https://github.com/tartley/colorama/issues/48
    # On Windows, hide TERM from the environment while colorama initialises
    # and put it back afterwards (a missing TERM yields None here).
    term = os.environ.pop("TERM", None) if sys.platform == "win32" else None

    colorama.init()
    _HAS_COLOR_TERM = True

    if term:
        os.environ["TERM"] = term
except ImportError:
    # colorama is optional: without it, styled output stays disabled.
    pass
class Profile(object):
    """
    User profile, stored as named section in ``${HOME}/.xbrnetwork/config.ini``:

    .. code-block:: INI

        [default]
        # username used with this profile
        username=joedoe

        # user email used with the profile (e.g. for verification emails)
        email=joe.doe@example.com

        # XBR network node used as a directory server and gateway to XBR smart contracts
        network_url=ws://localhost:8090/ws

        # WAMP realm on network node, usually "xbrnetwork"
        network_realm=xbrnetwork

        # user private WAMP-cryptosign key (for client authentication)
        cskey=0xb18bbe88ca0e189689e99f87b19addfb179d46aab3d59ec5d93a15286b949eb6

        # user private Ethereum key (for signing transactions and e2e data encryption)
        ethkey=0xfbada363e724d4db2faa2eeaa7d7aca37637b1076dd8cf6fefde13983abaa2ef
    """

    def __init__(
        self,
        path: Optional[str] = None,
        name: Optional[str] = None,
        member_adr: Optional[str] = None,
        ethkey: Optional[bytes] = None,
        cskey: Optional[bytes] = None,
        username: Optional[str] = None,
        email: Optional[str] = None,
        network_url: Optional[str] = None,
        network_realm: Optional[str] = None,
        member_oid: Optional[uuid.UUID] = None,
        vaction_oid: Optional[uuid.UUID] = None,
        vaction_requested: Optional[np.datetime64] = None,
        vaction_verified: Optional[np.datetime64] = None,
        data_url: Optional[str] = None,
        data_realm: Optional[str] = None,
        infura_url: Optional[str] = None,
        infura_network: Optional[str] = None,
        infura_key: Optional[str] = None,
        infura_secret: Optional[str] = None,
    ):
        """
        All arguments are stored unchanged as attributes of the same name.

        :param path: Path of the configuration file the profile belongs to.
        :param name: Name of the profile (the section name in the file).
        :param member_adr: Member address as a string (presumably an Ethereum
            address - TODO confirm against callers).
        :param ethkey: User private Ethereum key (raw bytes).
        :param cskey: User private WAMP-cryptosign key (raw bytes).
        :param username: Username used with this profile.
        :param email: User email used with this profile.
        :param network_url: WAMP transport URL of the network node.
        :param network_realm: WAMP realm on the network node.
        :param member_oid: Member object ID.
        :param vaction_oid: Object ID of a verification action (presumably the
            pending one - TODO confirm).
        :param vaction_requested: Timestamp the verification was requested.
        :param vaction_verified: Timestamp the verification was completed.
        :param data_url: URL stored verbatim (presumably a data market node).
        :param data_realm: Realm stored verbatim (presumably on ``data_url``).
        :param infura_url: Infura setting, stored verbatim.
        :param infura_network: Infura setting, stored verbatim.
        :param infura_key: Infura setting, stored verbatim.
        :param infura_secret: Infura setting, stored verbatim.
        """
        # imported lazily so that merely importing this module does not pull in txaio
        from txaio import make_logger

        self.log = make_logger()
        self.path = path
        self.name = name
        self.member_adr = member_adr
        self.ethkey = ethkey
        self.cskey = cskey
        self.username = username
        self.email = email
        self.network_url = network_url
        self.network_realm = network_realm
        self.member_oid = member_oid
        self.vaction_oid = vaction_oid
        self.vaction_requested = vaction_requested
        self.vaction_verified = vaction_verified
        self.data_url = data_url
        self.data_realm = data_realm
        self.infura_url = infura_url
        self.infura_network = infura_network
        self.infura_key = infura_key
        self.infura_secret = infura_secret

    def marshal(self):
        """
        Serialize the profile into a flat dictionary of strings.

        Unset (falsy) attributes are written as empty strings. Binary keys are
        HEX encoded with a ``0x`` prefix; UUIDs and timestamps are written via
        ``str()``. ``path`` and ``name`` are not part of the result.

        :return: Mapping of option name to string value, in a fixed order.
        """

        def hex_or_empty(key):
            return "0x{}".format(binascii.b2a_hex(key).decode()) if key else ""

        def str_or_empty(value):
            return str(value) if value else ""

        obj = {}
        obj["member_adr"] = self.member_adr or ""
        obj["ethkey"] = hex_or_empty(self.ethkey)
        obj["cskey"] = hex_or_empty(self.cskey)
        obj["username"] = self.username or ""
        obj["email"] = self.email or ""
        obj["network_url"] = self.network_url or ""
        obj["network_realm"] = self.network_realm or ""
        obj["member_oid"] = str_or_empty(self.member_oid)
        obj["vaction_oid"] = str_or_empty(self.vaction_oid)
        obj["vaction_requested"] = str_or_empty(self.vaction_requested)
        obj["vaction_verified"] = str_or_empty(self.vaction_verified)
        obj["data_url"] = self.data_url or ""
        obj["data_realm"] = self.data_realm or ""
        obj["infura_url"] = self.infura_url or ""
        obj["infura_network"] = self.infura_network or ""
        obj["infura_key"] = self.infura_key or ""
        obj["infura_secret"] = self.infura_secret or ""
        return obj

    @staticmethod
    def _parse_key(value):
        """
        Decode a HEX encoded key, with or without a ``0x`` / ``\\x`` prefix.

        The prefix is only stripped when actually present, so a key stored
        without prefix no longer loses its first byte. An empty string decodes
        to ``b""``.
        """
        if value[:2] in ("0x", "0X", "\\x"):
            value = value[2:]
        return binascii.a2b_hex(value)

    @staticmethod
    def _parse_uuid(value):
        """Return ``uuid.UUID(value)`` for a non-empty string, else ``None``."""
        if isinstance(value, str) and value != "":
            return uuid.UUID(value)
        return None

    @staticmethod
    def _parse_timestamp(value):
        """
        Convert a stored timestamp into ``np.datetime64`` (ns resolution).

        Accepts what :meth:`marshal` writes (an ISO 8601 string, or ``""`` for
        "not set"), a string of decimal digits or an ``int`` (both taken as
        nanoseconds since epoch). Empty and zero values map to ``None``; any
        other type is returned unchanged.

        :raises ValueError: If a non-empty string is not a valid timestamp.
        """
        if isinstance(value, str):
            value = value.strip()
            if value == "":
                return None
            if value.isdigit():
                return np.datetime64(int(value), "ns")
            return np.datetime64(value, "ns")
        if isinstance(value, int):
            return np.datetime64(value, "ns") if value else None
        return value

    @staticmethod
    def parse(path, name, items):
        """
        Create a profile from the options of one configuration section.

        :param path: Path of the configuration file, passed to the profile.
        :param name: Name of the profile, passed to the profile.
        :param items: Iterable of ``(option, value)`` pairs, e.g. as returned
            by ``configparser.ConfigParser.items(section)``.
        :return: The parsed :class:`Profile`.
        """
        text_options = (
            "network_url",
            "network_realm",
            "data_url",
            "data_realm",
            "username",
            "email",
            "infura_network",
            "infura_key",
            "infura_secret",
            "infura_url",
        )
        values = {
            "member_adr": None,
            "ethkey": None,
            "cskey": None,
            "username": None,
            "email": None,
            "network_url": None,
            "network_realm": None,
            "member_oid": None,
            "vaction_oid": None,
            "vaction_requested": None,
            "vaction_verified": None,
            "data_url": None,
            "data_realm": None,
            "infura_url": None,
            "infura_network": None,
            "infura_key": None,
            "infura_secret": None,
        }

        for k, v in items:
            if k in text_options:
                values[k] = str(v)
            elif k in ("member_oid", "vaction_oid"):
                values[k] = Profile._parse_uuid(v)
            elif k == "member_adr":
                values[k] = v if isinstance(v, str) and v != "" else None
            elif k in ("vaction_requested", "vaction_verified"):
                values[k] = Profile._parse_timestamp(v)
            elif k in ("ethkey", "cskey"):
                values[k] = Profile._parse_key(v)
            elif k in ("path", "name"):
                # supplied by the caller, never read from the section itself
                pass
            else:
                # skip unknown attribute
                print('unprocessed config attribute "{}"'.format(k))

        return Profile(path=path, name=name, **values)
class UserConfig(object):
    """
    Local user configuration file.

    The data is either a plain text (unencrypted) .ini file, or such a file
    encrypted with XSalsa20-Poly1305, and with a binary file header of 48 octets.
    """

    # First 8 octets of an encrypted configuration file.
    _MAGIC = b"\xde\xad\xbe\xef\x00\x00\x06\x66"

    # Total length of the binary file header in octets.
    _HEADER_LEN = 48

    # Context string passed to the key derivation function.
    _KDF_CONTEXT = "xbrnetwork-config"

    def __init__(self, config_path):
        """

        :param config_path: The user configuration file path.
        """
        # imported lazily so that merely importing this module does not pull in txaio
        from txaio import make_logger

        self.log = make_logger()
        self._config_path = os.path.abspath(config_path)
        self._profiles = {}

    @property
    def config_path(self) -> str:
        """
        Return the path to the user configuration file exposed by this object.

        :return: Local filesystem path.
        """
        return self._config_path

    @property
    def profiles(self) -> Dict[str, object]:
        """
        Access to a map of user profiles in this user configuration.

        :return: Map of user profiles.
        """
        return self._profiles

    def save(self, password: Optional[str] = None) -> int:
        """
        Save this user configuration to the underlying configuration file.

        Without ``password`` the file is written as plain text. With a
        ``password`` the serialized configuration is encrypted with a
        ``nacl.secret.SecretBox`` whose key is derived from the password and a
        fresh random salt via ``pkm_from_argon2_secret``, and a 48 octet
        binary header is prepended.

        :param password: The optional password to encrypt the file with.
        :return: Number of octets written to the user configuration file.
        """
        written = 0
        config = configparser.ConfigParser()
        for profile_name, profile in self._profiles.items():
            if profile_name not in config.sections():
                config.add_section(profile_name)
                written += 1
            for option, value in profile.marshal().items():
                config.set(profile_name, option, value)

        with io.StringIO() as fp1:
            config.write(fp1)
            config_data = fp1.getvalue().encode("utf8")

        if password:
            # ``import nacl`` alone does not guarantee the submodule is loaded
            from nacl.secret import SecretBox

            # binary file format header (48 bytes):
            #
            # * 8 bytes: 0xdeadbeef 0x00000666 magic number (big endian)
            # * 4 bytes: 0x00000001 encryption type 1 for "argon2id"
            # * 4 bytes data length (big endian)
            # * 8 bytes created timestamp ns (big endian)
            # * 8 bytes unused (filled 0x00 currently)
            # * 16 bytes salt
            #
            salt = os.urandom(16)
            priv_key = pkm_from_argon2_secret(
                email="", password=password, context=self._KDF_CONTEXT, salt=salt
            )
            config_data_ciphertext = SecretBox(priv_key).encrypt(config_data)
            data = b"".join(
                [
                    self._MAGIC,
                    b"\x00\x00\x00\x01",
                    struct.pack(">L", len(config_data_ciphertext)),
                    struct.pack(">Q", time_ns()),
                    b"\x00" * 8,
                    salt,
                    config_data_ciphertext,
                ]
            )
        else:
            data = config_data

        with open(self._config_path, "wb") as fp2:
            fp2.write(data)

        self.log.debug(
            "configuration with {sections} sections, {bytes_written} bytes written to {written_to}",
            sections=written,
            bytes_written=len(data),
            written_to=self._config_path,
        )

        return len(data)

    def load(self, cb_get_password=None) -> List[str]:
        """
        Load this object from the underlying user configuration file.
        When the file is encrypted, call back into ``cb_get_password`` to get
        the user password (an empty password is used when no callback is given).

        :param cb_get_password: Callback called when password is needed.
        :return: Sorted list of the names of the profiles loaded.
        :raises RuntimeError: If the file does not exist, or if the header of
            an encrypted file is inconsistent.
        """
        if not os.path.isfile(self._config_path):
            raise RuntimeError('config path "{}" cannot be loaded: no such file'.format(self._config_path))

        with open(self._config_path, "rb") as fp:
            data = fp.read()

        if len(data) >= self._HEADER_LEN and data[:8] == self._MAGIC:
            # binary format detected
            header = data[: self._HEADER_LEN]
            body = data[self._HEADER_LEN :]

            algo, data_len, created = struct.unpack(">LLQ", header[8:24])

            # Explicit checks rather than ``assert``, which is stripped when
            # Python runs with -O.
            if algo not in (0, 1):
                raise RuntimeError(
                    'config path "{}" cannot be loaded: unknown encryption type {}'.format(self._config_path, algo)
                )
            if data_len != len(body):
                raise RuntimeError(
                    'config path "{}" cannot be loaded: header announces {} bytes of data, but found {}'.format(
                        self._config_path, data_len, len(body)
                    )
                )
            if created >= time_ns():
                raise RuntimeError(
                    'config path "{}" cannot be loaded: creation timestamp lies in the future'.format(
                        self._config_path
                    )
                )

            salt = header[32:48]
            password = cb_get_password() if cb_get_password else ""

            # ``import nacl`` alone does not guarantee the submodule is loaded
            from nacl.secret import SecretBox

            priv_key = pkm_from_argon2_secret(
                email="", password=password, context=self._KDF_CONTEXT, salt=salt
            )
            body = SecretBox(priv_key).decrypt(body)
        else:
            body = data

        config = configparser.ConfigParser()
        config.read_string(body.decode("utf8"))

        profiles = {}
        for profile_name in config.sections():
            citems = config.items(profile_name)
            profiles[profile_name] = Profile.parse(self._config_path, profile_name, citems)
        self._profiles = profiles

        return sorted(self.profiles.keys())
# Default URL; the CROSSBAR_FABRIC_URL environment variable takes precedence
# over the built-in value when it is set.
_DEFAULT_CFC_URL = os.environ.get("CROSSBAR_FABRIC_URL", "wss://master.xbr.network/ws")
def style_error(text):
    """
    Style ``text`` as an error (bold red) when a color terminal is available,
    otherwise return it unchanged.
    """
    if not _HAS_COLOR_TERM:
        return text
    return click.style(text, fg="red", bold=True)
def style_ok(text):
    """
    Style ``text`` as a success message (bold green) when a color terminal is
    available, otherwise return it unchanged.
    """
    if not _HAS_COLOR_TERM:
        return text
    return click.style(text, fg="green", bold=True)
class WampUrl(click.ParamType):
    """
    Click parameter type validating a WAMP transport URL.
    """

    name = "WAMP transport URL"

    def __init__(self):
        super().__init__()

    def convert(self, value, param, ctx):
        """
        Check that ``value`` is accepted by ``parse_url`` and return it
        unchanged; any parsing error is reported through ``self.fail``.
        """
        try:
            parse_url(value)
        except Exception as exc:
            return self.fail(style_error(str(exc)))
        return value
def prompt_for_wamp_url(msg, default=None):
    """
    Ask the user for a WAMP transport URL (eg "wss://planet.xbr.network/ws"),
    validated by :class:`WampUrl`, and return the entered value.
    """
    return click.prompt(msg, type=WampUrl(), default=default)
class EthereumAddress(click.ParamType):
    """
    Ethereum address validator.
    """

    name = "Ethereum address"

    def __init__(self):
        click.ParamType.__init__(self)

    def convert(self, value, param, ctx):
        """
        Validate ``value`` as an Ethereum address.

        :param value: The address as entered by the user.
        :param param: The click parameter being converted (unused).
        :param ctx: The click context (unused).
        :return: The address in checksum format.

        Any validation error is reported through ``self.fail``.
        """
        try:
            # newer web3 releases expose the snake_case name only, older ones
            # the camelCase name only
            to_checksum_address = getattr(web3.Web3, "to_checksum_address", None) or web3.Web3.toChecksumAddress
            value = to_checksum_address(value)
            adr = binascii.a2b_hex(value[2:])
            # check the decoded bytes, not the 42 character HEX string
            if len(adr) != 20:
                raise ValueError("Ethereum addres must be 20 bytes (160 bit), but was {} bytes".format(len(adr)))
        except Exception as e:
            self.fail(style_error(str(e)))
        else:
            return value
def prompt_for_ethereum_address(msg):
    """
    Ask the user for an Ethereum (public) address, validated by
    :class:`EthereumAddress`, and return the entered value.
    """
    return click.prompt(msg, type=EthereumAddress())
class PrivateKey(click.ParamType):
    """
    Click parameter type validating a HEX encoded private key of a given
    length in bytes.
    """

    name = "Private key"

    def __init__(self, key_len):
        """

        :param key_len: Required length of the decoded key in bytes.
        """
        super().__init__()
        self._key_len = key_len

    def convert(self, value, param, ctx):
        """
        Pass ``value`` through ``hexstr_if_str(to_hex, ...)``, decode it and
        check the decoded length. Returns the converted HEX string; any error
        is reported through ``self.fail``.
        """
        try:
            value = hexstr_if_str(to_hex, value)
            # drop a leading "0x" or "\x" marker before decoding
            hex_digits = value[2:] if value[:2] in ["0x", "\\x"] else value
            key = binascii.a2b_hex(hex_digits)
            if len(key) != self._key_len:
                raise ValueError("key length must be {} bytes, but was {} bytes".format(self._key_len, len(key)))
        except Exception as exc:
            return self.fail(style_error(str(exc)))
        return value
def prompt_for_key(msg, key_len, default=None):
    """
    Ask the user for a binary key of ``key_len`` bytes (entered in HEX),
    validated by :class:`PrivateKey`, and return the entered value.
    """
    return click.prompt(msg, type=PrivateKey(key_len), default=default)
# default configuration stored in $HOME/.xbrnetwork/config.ini
#
# Template for a new configuration file, filled in with ``str.format``. Each
# comment and each option sits on its own line so that the result is a valid
# INI document with a single ``[default]`` section.
_DEFAULT_CONFIG = """[default]
# username used with this profile
username={username}

# user email used with the profile (e.g. for verification emails)
email={email}

# XBR network node used as a directory server and gateway to XBR smart contracts
network_url={network_url}

# WAMP realm on network node, usually "xbrnetwork"
network_realm={network_realm}

# user private WAMP-cryptosign key (for client authentication)
cskey={cskey}

# user private Ethereum key (for signing transactions and e2e data encryption)
ethkey={ethkey}
"""

# Options that are not part of the template (kept for reference):
#
# # default XBR market URL to connect to
# market_url={market_url}
# market_realm={market_realm}
#
# # Infura blockchain gateway configuration
# infura_url={infura_url}
# infura_network={infura_network}
# infura_key={infura_key}
# infura_secret={infura_secret}
def load_or_create_profile(
    dotdir=None, profile=None, default_url=None, default_realm=None, default_email=None, default_username=None
):
    """
    Load a user profile from ``<dotdir>/config.ini``, interactively creating
    the directory and the configuration file first when they do not exist.

    :param dotdir: User directory, defaults to ``~/.xbrnetwork``.
    :param profile: Name of the profile to load, defaults to ``default``.
    :param default_url: Default offered when prompting for the WAMP router
        URL, defaults to ``wss://planet.xbr.network/ws``.
    :param default_realm: Default offered when prompting for the WAMP realm,
        defaults to ``xbrnetwork``.
    :param default_email: Default offered when prompting for the user email.
    :param default_username: Default offered when prompting for the user name.
    :return: The loaded profile object.
    :raises click.ClickException: If the configuration has no such profile.
    """
    dotdir = dotdir or "~/.xbrnetwork"
    profile = profile or "default"
    default_url = default_url or "wss://planet.xbr.network/ws"
    default_realm = default_realm or "xbrnetwork"

    config_dir = os.path.expanduser(dotdir)
    if not os.path.isdir(config_dir):
        # makedirs also creates missing parent directories of a nested dotdir
        os.makedirs(config_dir, exist_ok=True)
        click.echo("created new local user directory {}".format(style_ok(config_dir)))

    config_path = os.path.join(config_dir, "config.ini")
    if not os.path.isfile(config_path):
        click.echo('creating new user profile "{}"'.format(style_ok(profile)))

        # Collect all answers BEFORE touching the file: if the user aborts a
        # prompt, no empty config.ini is left behind that would make the next
        # run skip the creation and fail with "no such profile".
        network_url = prompt_for_wamp_url(
            "enter the WAMP router URL of the network directory node", default=default_url
        )
        network_realm = click.prompt(
            "enter the WAMP realm to join on the network directory node", type=str, default=default_realm
        )
        cskey = prompt_for_key(
            "your private WAMP client key", 32, default="0x" + binascii.b2a_hex(os.urandom(32)).decode()
        )
        ethkey = prompt_for_key(
            "your private Etherum key", 32, default="0x" + binascii.b2a_hex(os.urandom(32)).decode()
        )
        email = click.prompt("user email used for with profile", type=str, default=default_email)
        username = click.prompt("user name used with this profile", type=str, default=default_username)

        # NOTE(review): the template always creates a "[default]" section, so
        # asking for a different ``profile`` on first run ends in "no such
        # profile" below - confirm whether that is intended.
        config_text = _DEFAULT_CONFIG.format(
            network_url=network_url,
            network_realm=network_realm,
            ethkey=ethkey,
            cskey=cskey,
            email=email,
            username=username,
        )

        # NOTE(review): the file holds private keys but is created with the
        # process' default permissions - consider restricting them.
        # UTF-8 matches the decoding used when the file is loaded.
        with open(config_path, "w", encoding="utf8") as f:
            f.write(config_text)
        click.echo("created new local user configuration {}".format(style_ok(config_path)))

    config_obj = UserConfig(config_path)
    config_obj.load()
    profile_obj = config_obj.profiles.get(profile, None)

    if not profile_obj:
        raise click.ClickException('no such profile: "{}"'.format(profile))

    return profile_obj