xbr._schema

Attributes

Classes

FbsAttribute

FbsEnum

FlatBuffers enum type.

FbsEnumValue

FbsField

FbsObject

FbsRPCCall

FbsRepository

crossbar.interfaces.IInventory

FbsSchema

FbsService

FbsType

Flatbuffers type.

Functions

parse_attr(obj)

parse_calls(repository, schema, svc_obj[, objs_lst])

parse_docs(obj)

parse_fields(repository, schema, obj[, objs_lst])

validate_scalar(field, value)

Module Contents

class FbsAttribute[source]

Bases: object

__str__()[source]
class FbsEnum(repository: FbsRepository, schema: FbsSchema, declaration_file: str, name: str, id: int, values: Dict[str, FbsEnumValue], values_by_id: List[FbsEnumValue], is_union: bool, underlying_type: int, attrs: Dict[str, FbsAttribute], docs: str)[source]

Bases: object

FlatBuffers enum type.

See https://github.com/google/flatbuffers/blob/11a19887053534c43f73e74786b46a615ecbf28e/reflection/reflection.fbs#L61

__str__()[source]
_attrs[source]
_declaration_file[source]
_docs[source]
_id[source]
_is_union[source]
_name[source]
_repository[source]
_schema[source]
_underlying_type[source]
_values[source]
_values_by_id[source]
property attrs[source]
property declaration_file[source]
property docs[source]
property id[source]
property is_union[source]
marshal()[source]
property name[source]
property repository[source]
property schema[source]
property underlying_type[source]
property values[source]
property values_by_id[source]
class FbsEnumValue(repository: FbsRepository, schema: FbsSchema, name: str, id: int, value, docs)[source]

Bases: object

__str__()[source]
_attrs[source]
_docs[source]
_id[source]
_name[source]
_repository[source]
_schema[source]
_value[source]
property attrs[source]
property docs[source]
property id[source]
marshal()[source]
property name[source]
property repository[source]
property schema[source]
property value[source]
class FbsField(repository: FbsRepository, schema: FbsSchema, name: str, type: FbsType, id: int, offset: int, default_int: int, default_real: float, deprecated: bool, required: bool, attrs: Dict[str, FbsAttribute], docs: str)[source]

Bases: object

__slots__ = ('_repository', '_schema', '_name', '_type', '_id', '_offset', '_default_int', '_default_real',...[source]
__str__() str[source]
_attrs[source]
_default_int[source]
_default_real[source]
_deprecated[source]
_docs[source]
_id[source]
_name[source]
_offset[source]
_repository[source]
_required[source]
_schema[source]
_type[source]
property attrs: Dict[str, FbsAttribute][source]
property default_int: int[source]
property default_real: float[source]
property deprecated: bool[source]
property docs: str[source]
property id: int[source]
marshal() Dict[str, Any][source]
property name: str[source]
property offset: int[source]
property repository: FbsRepository[source]
property required: bool[source]
property schema: FbsSchema[source]
property type: FbsType[source]
class FbsObject(repository: FbsRepository, schema: FbsSchema, declaration_file: str, name: str, fields: Dict[str, FbsField], fields_by_id: List[FbsField], is_struct: bool, min_align: int, bytesize: int, attrs: Dict[str, FbsAttribute], docs: str)[source]

Bases: object

__slots__ = ('_repository', '_schema', '_declaration_file', '_name', '_fields', '_fields_by_id',...[source]
__str__() str[source]
_attrs[source]
_bytesize[source]
_declaration_file[source]
_docs[source]
_fields[source]
_fields_by_id[source]
_is_struct[source]
_min_align[source]
_name[source]
_repository[source]
_schema[source]
property attrs: Dict[str, FbsAttribute][source]
property bytesize: int[source]
property declaration_file: str[source]
property docs: str[source]
property fields: Dict[str, FbsField][source]
property fields_by_id: List[FbsField][source]
property is_struct: bool[source]
map(language: str, required: bool | None = True, objtype_as_string: bool = False) str[source]
map_import(language: str) str[source]
marshal() Dict[str, Any][source]
property min_align: int[source]
property name: str[source]
static parse(repository, schema, fbs_obj, objs_lst=None)[source]
property repository: FbsRepository[source]
property schema: FbsSchema[source]
class FbsRPCCall(repository: FbsRepository, schema: FbsSchema, name: str, id: int, request: FbsObject, response: FbsObject, docs: str, attrs: Dict[str, FbsAttribute])[source]

Bases: object

__str__()[source]
_attrs[source]
_docs[source]
_id[source]
_name[source]
_repository[source]
_request[source]
_response[source]
_schema[source]
property attrs[source]
property docs[source]
property id[source]
marshal()[source]
property name[source]
property repository[source]
property request[source]
property response[source]
property schema[source]
class FbsRepository(basemodule: str)[source]

Bases: object

crossbar.interfaces.IInventory
  • add: FbsRepository[] - load: FbsSchema[]

https://github.com/google/flatbuffers/blob/master/reflection/reflection.fbs

_basemodule[source]
_enums: Dict[str, FbsEnum][source]
_objs: Dict[str, FbsObject][source]
_schemata: Dict[str, FbsSchema][source]
_services: Dict[str, FbsService][source]
property basemodule: str[source]
property enums: Dict[str, FbsEnum][source]
static from_address(address: str) FbsRepository[source]
static from_archive(filename: str) FbsRepository[source]
load(filename: str) Tuple[int, int][source]

Load and add all schemata from Flatbuffers binary schema files (*.bfbs) found in the given directory. Alternatively, a path to a single schema file can be provided.

Parameters:

filename – Filesystem path of a directory or single file from which to load and add Flatbuffers schemata.

log[source]
property objs: Dict[str, FbsObject][source]
print_summary()[source]
render(jinja2_env, output_dir, output_lang)[source]
Parameters:
  • jinja2_env

  • output_dir

  • output_lang

Returns:

property schemas: Dict[str, FbsSchema][source]
property services: Dict[str, FbsService][source]
summary(keys=False)[source]
property total_count[source]
validate(validation_type: str, args: List[Any], kwargs: Dict[str, Any]) FbsObject | None[source]

Validate the WAMP application payload provided in positional argument in args and in keyword-based arguments in kwargs against the FlatBuffers table type validation_type from this repository.

If the application payload does not validate against the provided type, an autobahn.wamp.exception.InvalidPayload is raised.

Parameters:
  • validation_type – Flatbuffers type (fully qualified) against to validate application payload.

  • args – The application payload WAMP positional arguments.

  • kwargs – The application payload WAMP keyword-based arguments.

Returns:

The validation type object from this repository (reference in validation_type) which has been used for validation.

validate_obj(validation_type: str | None, value: Any | None)[source]

Validate value against the validation type given.

If the application payload does not validate against the provided type, an autobahn.wamp.exception.InvalidPayload is raised.

Parameters:
  • validation_type – Flatbuffers type (fully qualified) against to validate application payload.

  • value – Value to validate.

Returns:

class FbsSchema(repository: FbsRepository, file_name: str, file_sha256: str, file_size: int, file_ident: str, file_ext: str, fbs_files: List[Dict[str, str]], root_table: FbsObject, root: zlmdb.flatbuffers.reflection.Schema.Schema, objs: Dict[str, FbsObject] | None = None, objs_by_id: List[FbsObject] | None = None, enums: Dict[str, FbsEnum] | None = None, enums_by_id: List[FbsEnum] | None = None, services: Dict[str, FbsService] | None = None, services_by_id: List[FbsService] | None = None)[source]

Bases: object

__str__()[source]
_enums = None[source]
_enums_by_id = None[source]
_fbs_files[source]
_file_ext[source]
_file_ident[source]
_file_name[source]
_file_sha256[source]
_file_size[source]
_objs = None[source]
_objs_by_id = None[source]
_repository[source]
_root[source]
_root_table[source]
_services = None[source]
_services_by_id = None[source]
property enums[source]
property enums_by_id[source]
property fbs_files[source]
property file_ext[source]
property file_ident[source]
property file_name[source]
property file_sha256[source]
property file_size[source]
static load(repository: FbsRepository, sfile: str | io.RawIOBase | IO[bytes], filename: str | None = None) FbsSchema[source]
Parameters:
  • repository

  • sfile

  • filename

Returns:

marshal() Dict[str, object][source]
Returns:

property objs[source]
property objs_by_id[source]
property repository[source]
property root[source]
property root_table[source]
property services[source]
property services_by_id[source]
class FbsService(repository: FbsRepository, schema: FbsSchema, declaration_file: str, name: str, calls: Dict[str, FbsRPCCall], calls_by_id: List[FbsRPCCall], attrs: Dict[str, FbsAttribute], docs: str)[source]

Bases: object

__str__()[source]
_attrs[source]
_calls[source]
_calls_by_id[source]
_declaration_file[source]
_docs[source]
_name[source]
_repository[source]
_schema[source]
property attrs[source]
property calls[source]
property calls_by_id[source]
property declaration_file[source]
property docs[source]
marshal()[source]
property name[source]
property repository[source]
property schema[source]
class FbsType(repository: FbsRepository, schema: FbsSchema, basetype: int, element: int, index: int, objtype: str | None = None, elementtype: str | None = None)[source]

Bases: object

Flatbuffers type.

See: https://github.com/google/flatbuffers/blob/11a19887053534c43f73e74786b46a615ecbf28e/reflection/reflection.fbs#L33

Bool = 2[source]
Byte = 3[source]
Double = 12[source]
FBS2FLAGS[source]
FBS2PREPEND[source]
FBS2PY[source]
FBS2PY_TYPE[source]
FBS2STR[source]
Float = 11[source]
Int = 7[source]
Long = 9[source]
Obj = 15[source]
SCALAR_TYPES[source]
STR2FBS[source]
STRUCTURED_TYPES[source]
Short = 5[source]
String = 13[source]
UByte = 4[source]
UInt = 8[source]
ULong = 10[source]
UShort = 6[source]
UType = 1[source]
Union = 16[source]
Vector = 14[source]
__slots__ = ('_repository', '_schema', '_basetype', '_element', '_index', '_objtype', '_elementtype')[source]
__str__() str[source]
_basetype[source]
_element[source]
_elementtype = None[source]
_index[source]
_objtype = None[source]
_repository[source]
_schema[source]
property basetype: int[source]

Flatbuffers base type.

Returns:

property element: int[source]

Only if basetype == Vector

Returns:

property elementtype: str | None[source]

If basetype == Vector, fully qualified element type name.

Returns:

property index: int[source]

If basetype == Object, index into “objects”. If base_type == Union, UnionType, or integral derived from an enum, index into “enums”. If base_type == Vector && element == Union or UnionType.

Returns:

map(language: str, attrs: Dict | None = None, required: bool | None = True, objtype_as_string: bool = False) str[source]
Parameters:
  • language

  • attrs

  • required

  • objtype_as_string

Returns:

marshal() Dict[str, Any][source]
property objtype: str | None[source]

If basetype == Object, fully qualified object type name.

Returns:

property repository: FbsRepository[source]
property schema: FbsSchema[source]
FormatCode = None[source]
parse_attr(obj)[source]
parse_calls(repository, schema, svc_obj, objs_lst=None)[source]
parse_docs(obj)[source]
parse_fields(repository, schema, obj, objs_lst=None)[source]
validate_scalar(field, value: Any | None)[source]