core package

Submodules

core.intercom module

class core.intercom.MQTTMessageBroker(host: str = '127.0.0.1', port: int = 1883)

Bases: object

Wrapper class for the HBMQTT message broker.

run(loop: asyncio.events.AbstractEventLoop) → None

Runs the HBMQTT broker asynchronously.

start() → None
class core.intercom.MQTTMessenger(manager: Any, client_id: str)

Bases: object

MQTTMessenger connects to an MQTT message broker and exchanges messages.

property client
connect() → None

Connect to the message broker.

disconnect() → None

Disconnect from the message broker.

property host
property is_connected
property port
publish(topic: str, message: str, qos: int = 0, retain: bool = False) → None

Send message to the message broker.

Parameters
  • topic – Topic to publish to.

  • message – Message to publish.

  • qos – Quality of Service (0, 1, or 2).

  • retain – Retained message or not.

subscribe(topic) → None

Sets the topic the client should subscribe to on the message broker.

property topic

core.logging module

Various data structures, filters, formatters, and handlers for logging.

class core.logging.RingBuffer(max_length: int)

Bases: object

RingBuffer stores elements in a deque. It is a FIFO list with fixed size to cache a number of elements, like log messages. The oldest elements get removed when the number of elements is greater than the maximum length.

append(x: Any) → None

Appends an element to the deque.

Parameters

x – Element to append.

list() → Any
pop() → Any

Pops an element.

Returns

String on the left side of the deque.

to_string() → str

Returns the whole deque as a string.

Returns

String containing all string elements in the deque.
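The ring buffer behaviour described above can be approximated with collections.deque. The following is a minimal sketch, not the actual implementation: the class name RingBufferSketch is hypothetical, and the use of deque(maxlen=...), popleft(), and separator-free joining in to_string() are assumptions.

```python
from collections import deque
from typing import Any


class RingBufferSketch:
    """Hypothetical stand-in for core.logging.RingBuffer."""

    def __init__(self, max_length: int) -> None:
        # A deque with maxlen drops the oldest element automatically
        # once the maximum length is exceeded.
        self._deque = deque(maxlen=max_length)

    def append(self, x: Any) -> None:
        self._deque.append(x)

    def pop(self) -> Any:
        # Assumption: pop() removes the oldest (left-most) element.
        return self._deque.popleft()

    def to_string(self) -> str:
        # Assumption: elements are joined without a separator.
        return ''.join(str(x) for x in self._deque)


buffer = RingBufferSketch(max_length=2)
for line in ('a', 'b', 'c'):
    buffer.append(line)
```

With a maximum length of 2, appending a third element silently discards the first one.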

class core.logging.RingBufferLogHandler(level: int, size: int)

Bases: logging.Handler

RingBufferLogHandler stores a number of log messages in a RingBuffer.

property buffer
emit(record: logging.LogRecord) → None

Adds a log record to the internal ring buffer.

Parameters

record – The log record.

get_logs() → str

Returns all log messages as a concatenated string.

Returns

All log messages.

property size
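A log handler of this kind can be sketched with the standard logging module. This is an illustration under assumptions, not the OpenADMS Node code: RingBufferHandlerSketch is a hypothetical name, and storing formatted strings joined by newlines is a guess at the behaviour of get_logs().

```python
import logging
from collections import deque


class RingBufferHandlerSketch(logging.Handler):
    """Hypothetical stand-in for core.logging.RingBufferLogHandler."""

    def __init__(self, level: int, size: int) -> None:
        super().__init__(level)
        self._buffer = deque(maxlen=size)

    def emit(self, record: logging.LogRecord) -> None:
        # Store the formatted record; the oldest entry is dropped when full.
        self._buffer.append(self.format(record))

    def get_logs(self) -> str:
        # Assumption: one log message per line.
        return '\n'.join(self._buffer)


logger = logging.getLogger('ring_buffer_example')
logger.setLevel(logging.DEBUG)
logger.propagate = False

handler = RingBufferHandlerSketch(logging.INFO, size=2)
handler.setFormatter(logging.Formatter('%(levelname)s %(message)s'))
logger.addHandler(handler)

logger.debug('ignored, below handler level')
logger.info('first')
logger.warning('second')
logger.error('third')
```

Only the two most recent messages at or above the handler level remain in the buffer.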
class core.logging.RootFilter(name='')

Bases: logging.Filter

RootFilter is a helper class to filter unwanted log messages from external Python modules.

filter(record: logging.LogRecord) → bool

Returns whether a logging.LogRecord should be logged or not. Log messages from selected Python modules will be discarded.
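Filtering by logger name can be sketched as follows. The class ModuleFilterSketch, the helper make_record(), and the blocked name noisy_library are hypothetical; which modules the real RootFilter discards is not documented here.

```python
import logging


class ModuleFilterSketch(logging.Filter):
    """Hypothetical filter that drops records from selected loggers."""

    # Assumption: prefixes of logger names to silence.
    blocked = ('noisy_library',)

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False discards the record.
        return not record.name.startswith(self.blocked)


def make_record(name: str) -> logging.LogRecord:
    """Builds a minimal log record for demonstration purposes."""
    return logging.LogRecord(name, logging.INFO, 'example.py', 0, 'msg', None, None)


root_filter = ModuleFilterSketch()
```

A filter like this is attached to a handler or logger with addFilter().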

class core.logging.StringFormatter

Bases: logging.Formatter

StringFormatter simply returns a formatted string of a log record.

format(record: logging.LogRecord) → str

Return formatted string of log record.

Parameters

record – The log record.

Returns

Formatted string of log record.

core.manager module

Collection of manager classes.

class core.manager.ConfigManager(path: str, schema_manager)

Bases: object

ConfigManager loads and stores the OpenADMS Node configuration.

property config
get(key: str) → Dict[str, Any]

Returns a single configuration.

Parameters

key – The name of the configuration.

Returns

A dictionary with the configuration.

get_valid_config(schema_name: str, *args) → Dict[str, Any]

Returns the validated configuration of a module. Raises a ValueError exception if the configuration is invalid.

Parameters
  • schema_name – Name of the JSON schema.

  • *args – Key names to the module’s configuration.

Returns

A dictionary with the module’s configuration.

Raises

ValueError – If module configuration is invalid.

load_all()

Loads the configuration.

load_config_from_file(config_path: str) → bool

Loads configuration from a JSON file.

Parameters

config_path – The path to the JSON file.

Returns

True if file has been loaded, False if not.
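Loading a JSON configuration with a boolean result might look like the sketch below. The function load_json_config() and its target parameter are hypothetical; how ConfigManager stores the loaded data internally is an assumption.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def load_json_config(config_path: str, target: Dict[str, Any]) -> bool:
    """Hypothetical helper: loads a JSON file into `target`.

    Returns True on success, False if the file is missing or malformed.
    """
    path = Path(config_path)
    if not path.is_file():
        return False
    try:
        with path.open(encoding='utf-8') as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError.
        return False
    if not isinstance(data, dict):
        return False
    target.update(data)
    return True


# Usage: write a small configuration file and load it again.
config: Dict[str, Any] = {}
with tempfile.TemporaryDirectory() as tmp_dir:
    config_file = Path(tmp_dir) / 'config.json'
    config_file.write_text('{"project": {"name": "Example"}}', encoding='utf-8')
    loaded = load_json_config(str(config_file), config)
    missing = load_json_config(str(Path(tmp_dir) / 'missing.json'), config)
```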

property path
remove_all() → None

Clears everything.

class core.manager.Manager

Bases: object

Manager is a container class for all the managers.

property config
property module
property node
property project
property schema
property sensor
class core.manager.ModuleManager(manager: core.manager.Manager)

Bases: object

ModuleManager loads and manages OpenADMS Node modules.

add(name: str, class_path: str)

Instantiates a worker and a messenger, and bundles both into a module. The module is then added to the modules dictionary.

Parameters
  • name – The name of the module.

  • class_path – The path to the Python class.

Returns

True if module has been added, False if not.

Raises

ValueError – If the module file does not exist.

get(name: str) → core.module.Module

Returns a specific module.

Parameters

name – The name of the module.

get_modules_list() → KeysView

Returns a list with the names of all modules.

Returns

List of module names.

get_worker_instance(module_name: str, class_path: str) → core.prototype.Prototype

Loads a Python class from a given path and returns the instance.

Parameters
  • module_name – Name of the module.

  • class_path – Path to the Python class.

Returns

Instance of Python class or None.
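Resolving a dotted class path to a class is commonly done with importlib. The sketch below shows the general technique only; load_class() is a hypothetical helper, and the constructor arguments the real method passes to the worker are not shown.

```python
import importlib
from typing import Optional


def load_class(class_path: str) -> Optional[type]:
    """Hypothetical helper: resolves 'package.module.ClassName' to a class."""
    module_path, _, class_name = class_path.rpartition('.')
    if not module_path:
        return None
    try:
        module = importlib.import_module(module_path)
    except ImportError:
        return None
    return getattr(module, class_name, None)


# Demonstrated with a standard-library class, because OpenADMS Node
# worker classes are not available in this example.
cls = load_class('collections.OrderedDict')
instance = cls() if cls is not None else None
```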

has_module(name: str) → bool

Returns whether or not module is found.

Parameters

name – The name of the module.

Returns

True if module is found, False if not.

kill(name: str) → None

Kills a module (stops worker and messenger).

Parameters

name – The name of the module.

kill_all() → None

Kills all modules (stops all workers and messengers).

load_all() → None

Loads all modules.

module_exists(class_path: str) → bool

Returns whether or not an OpenADMS Node module exists at the given class path.

Parameters

class_path – The path to the class.

Returns

True if module exists, False if not.

property modules
remove(name: str) → None

Removes a module.

Parameters

name – The name of the module.

remove_all() → None

Removes all modules.

start(name: str) → None

Starts a module.

Parameters

name – The name of the module.

start_all() → None

Starts all modules.

stop(name: str) → None

Stops a module.

Parameters

name – The name of the module.

stop_all() → None

Stops all modules.

class core.manager.Node(name: str, id: str, description: str)

Bases: object

Node stores name, description, and ID of the sensor node.

property description
property id
property name
class core.manager.NodeManager(manager: core.manager.Manager)

Bases: object

NodeManager loads and stores the node configuration.

load_all() → None

Loads node configuration.

property node
remove_all() → None

Clears everything.

class core.manager.Project(name: str, id: str, description: str)

Bases: object

Project stores name, description, and ID of the monitoring project.

property description
property id
property name
class core.manager.ProjectManager(manager: core.manager.Manager)

Bases: object

ProjectManager loads and stores the project configuration.

load_all() → None

Loads the project configuration and meta information.

property project
remove_all() → None

Clears everything.

class core.manager.SchemaManager(schemas_root_path: str = 'schemas')

Bases: object

SchemaManager stores JSON schemas and validates given data with them.

add_schema(data_type: str, path: str) → bool

Reads a JSON schema file from the given path and stores it in the internal dictionary.

Parameters
  • data_type – The name of the data type (e.g., ‘observation’).

  • path – The path to the JSON schema file.

Returns

True if schema has been added, False if not.

get_schema_path(class_path: str) → pathlib.Path

Uses the class path of a module to generate the path to the configuration schema file.

For instance, the given class path modules.schedule.Scheduler will be converted to the file path modules/schedule/scheduler.json.

Parameters

class_path – The class path of a module.

Returns

The path to the JSON schema of the module’s configuration.
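The conversion from class path to schema path can be sketched with pathlib. The helper name class_path_to_schema_path() is hypothetical, and lower-casing every component (rather than only the class name) is an assumption; the real method may also prepend the schemas root path.

```python
from pathlib import Path


def class_path_to_schema_path(class_path: str) -> Path:
    """Hypothetical helper mapping a class path to a schema file path."""
    # Assumption: each dotted component becomes a lower-case path segment.
    parts = class_path.lower().split('.')
    return Path(*parts).with_suffix('.json')


schema_path = class_path_to_schema_path('modules.schedule.Scheduler')
```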

has_schema(name: str) → bool

Returns whether or not a JSON schema for the given name exists.

Parameters

name – Name of the schema (e.g., ‘observation’).

Returns

True if schema exists, False if not.

is_valid(data: Dict, schema_name: str) → bool

Validates data with JSON schema and returns result.

Parameters
  • data – The data.

  • schema_name – The name of the schema used for validation.

Returns

True if data is valid, False if not.

load_all() → None

Initialises the schemas dictionary.

remove(name: str) → None

Removes a schema.

Parameters

name – The name of the schema.

remove_all() → None

Removes all schemas.

class core.manager.SensorManager(config_manager: core.manager.ConfigManager)

Bases: object

SensorManager stores and manages objects of type Sensor.

add_sensor(name: str, sensor: core.sensor.Sensor) → None

Adds a sensor to the sensors dictionary.

Parameters
  • name – The name of the sensor.

  • sensor – The sensor object.

get(name: str) → core.sensor.Sensor

Returns the sensor object with the given name.

get_sensors_names() → KeysView

Returns a list with all sensor names.

load_all() → None

Creates the sensors defined in the configuration.

remove(name: str) → None

Removes a sensor from the sensors dictionary.

remove_all() → None

Removes all sensors.

property sensors

core.module module

The module class to bundle an OpenADMS Node worker with a messenger.

class core.module.Module(messenger: core.intercom.MQTTMessenger, worker: core.prototype.Prototype)

Bases: threading.Thread

Module bundles a worker with a messenger and manages the communication between them.

property messenger
publish(target: str, message: str, qos: int = 0, retain: bool = False) → None

Sends an Observation object to the next receiver by using the messenger.

Parameters
  • target – Name of the topic.

  • message – Message in JSON format.

  • qos – Quality of Service (0, 1, or 2).

  • retain – Retained message or not.

retrieve(message: List[Dict]) → None

Callback function for the messenger. New data from the message broker lands here.

Parameters

message – Header and payload of the message, both Dict.

run() → None

Checks the inbox for new messages and calls the handle() method of the worker for further processing. Runs within a thread.
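The inbox loop described above can be sketched with a thread and a queue. ModuleSketch, its attribute names, and the polling timeout are hypothetical; the actual Module class may organise its inbox differently.

```python
import queue
import threading
from typing import Callable, Dict, List


class ModuleSketch(threading.Thread):
    """Hypothetical thread that forwards inbox messages to a worker callback."""

    def __init__(self, on_message: Callable[[List[Dict]], None]) -> None:
        super().__init__(daemon=True)
        self.on_message = on_message
        self.inbox: queue.Queue = queue.Queue()
        self.stop_event = threading.Event()

    def retrieve(self, message: List[Dict]) -> None:
        # Called by the messenger when new data arrives.
        self.inbox.put(message)

    def run(self) -> None:
        while not self.stop_event.is_set():
            try:
                message = self.inbox.get(timeout=0.05)
            except queue.Empty:
                continue
            self.on_message(message)
            self.inbox.task_done()

    def stop(self) -> None:
        self.stop_event.set()


handled: List[List[Dict]] = []
module = ModuleSketch(handled.append)
module.start()
module.retrieve([{'type': 'observation'}, {'name': 'getDistance'}])
module.inbox.join()  # wait until the message has been processed
module.stop()
module.join()
```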

start_worker() → None

Starts the worker.

stop() → None

Stops the thread.

stop_worker() → None

Stops the worker.

property topic
property worker

core.monitor module

Main monitoring module. Everything starts here.

class core.monitor.Monitor(config_file_path: str)

Bases: object

Monitor manages the monitoring process by creating a schema manager, a configuration manager, a sensor manager, and a module manager.

kill_all() → None

Kills all modules.

load_all() → None

Calls managers to load and initialise everything.

remove_all() → None

Clears all managers.

restart() → None

Clears and restarts everything.

start() → None

Starts all modules.

stop() → None

Stops all modules.

core.observation module

Structures to store observation data from sensors.

class core.observation.Observation(data=None)

Bases: object

Observation stores all information regarding a request to and a response by a sensor in a dictionary. The dictionary is filled with initial information from the configuration file and later supplemented with data from the processing modules. It can easily be converted to JSON format.

static create_response_set(type: str, unit: str, value: ResponseType) → Dict[str, ResponseType]

Creates a response set containing type, unit, and value.

Parameters
  • type – Type of the response (e.g., ‘float’).

  • unit – Unit of the response (e.g., ‘m’).

  • value – The value of the response (e.g., ‘17.53’).

Returns

Dictionary with type, unit, and value.
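A response set is a small dictionary. The sketch below assumes the keys are named type, unit, and value, as the description suggests; the alias ResponseType is defined here only for illustration.

```python
from typing import Dict, Union

# Assumption: the kinds of values a response may carry.
ResponseType = Union[int, float, str]


def create_response_set(type: str, unit: str, value: ResponseType) -> Dict[str, ResponseType]:
    """Sketch of a response set builder with assumed key names."""
    return {'type': type, 'unit': unit, 'value': value}


response_set = create_response_set('float', 'm', 17.53)
```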

property data
get(key: str, default: Any = None) → ValueType

Returns the value to a given key.

Parameters
  • key – The key of the value.

  • default – Default return value.

Returns

Single value from the observation data.

static get_header() → Dict[str, str]

Returns the header of an observation message.

Returns

Dictionary with header information.

static get_new_id() → str

Returns a new observation id.

Returns

New UUID4.

get_response_type(name: str) → Optional[str]

Returns the type of a given response set.

Parameters

name – Name of the response set.

Returns

Type of the response set.

get_response_unit(name: str) → Optional[str]

Returns the unit of a given response set.

Parameters

name – Name of the response set.

Returns

Unit of the response set.

get_response_value(name: str) → Optional[ResponseType]

Returns the value of a given response set.

Parameters

name – Name of the response set.

Returns

Value of the response set.

get_value(*args: str) → Optional[ResponseType]

Returns the value of a set of keys.

Parameters

*args – The keys.

Returns

Single value from the observation data.
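Looking up a value by a sequence of keys amounts to walking nested dictionaries. The helper get_nested_value() and the sample data, including the key name responseSets, are hypothetical.

```python
from typing import Any, Dict, Optional


def get_nested_value(data: Dict[str, Any], *keys: str) -> Optional[Any]:
    """Hypothetical helper: follows `keys` through nested dictionaries."""
    current: Any = data
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


observation_data = {
    'name': 'getDistance',
    'responseSets': {  # key name is an assumption
        'slopeDist': {'type': 'float', 'unit': 'm', 'value': 17.53},
    },
}

value = get_nested_value(observation_data, 'responseSets', 'slopeDist', 'value')
```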

has_response_type(name: str) → bool

Returns whether the type of a given response set exists or not.

Parameters

name – Name of the response set.

Returns

True if type exists, False if not.

has_response_unit(name: str) → bool

Returns whether the unit of a given response set exists or not.

Parameters

name – Name of the response set.

Returns

True if unit exists, False if not.

has_response_value(name: str) → bool

Returns whether the value of a given response set exists or not.

Parameters

name – Name of the response set.

Returns

True if value exists, False if not.

set(key: str, value: Any) → None

Sets key and value in the data set.

Parameters
  • key – The key of the data set value.

  • value – The data set value.

to_json() → str

Returns a dump of the data set in JSON format.

Returns

Data in JSON format.

Return type

str

core.prototype module

Prototype class which can be used as a blueprint for other OpenADMS modules.

class core.prototype.Prototype(module_name: str, module_type: str, manager: Any)

Bases: object

Prototype is used as a blueprint for OpenADMS workers.

add_handler(data_type: str, func: Callable[[Dict, Dict], None]) → None

Registers a callback function for handling of messages.

Parameters
  • data_type – Name of the data type (observation, service, …).

  • func – Callback function for handling the message.

do_handle_observation(header: Dict, payload: Dict) → None

Handles an observation by forwarding it to the processing method and prepares the result for publishing.

Parameters
  • header – Message header.

  • payload – Message payload.

do_handle_service(header: Dict, payload: Dict) → None

Processes service messages and starts or stops the receiving module.

Parameters
  • header – Message header.

  • payload – Message payload.

Example

A message to stop a module can be, for instance:

{
    "header": {
        "type": "service",
        "from": "foo"
    },
    "payload": {
        "action": "stop"
    }
}

Set action to start to start the module.

get_module_config(*args)

Returns the validated configuration of the module. If no JSON schema is available, the function just returns an unchecked configuration.

Parameters

*args – Key names to the configuration in the dictionary.

Returns

A dictionary with the module’s configuration.

handle(message: List[Dict]) → None

Processes messages by calling callback functions for data handling.

Parameters

message – Header and payload of the message.
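Dispatching messages to registered callbacks can be sketched with a dictionary keyed by data type. WorkerSketch is hypothetical; treating the message as a header and payload pair follows the List[Dict] type hint, and selecting the handler by the header's type field is an assumption.

```python
from typing import Callable, Dict, List

Handler = Callable[[Dict, Dict], None]


class WorkerSketch:
    """Hypothetical worker with a handler registry keyed by data type."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def add_handler(self, data_type: str, func: Handler) -> None:
        self._handlers[data_type] = func

    def handle(self, message: List[Dict]) -> None:
        # Assumption: the message is a [header, payload] pair and the
        # header's "type" field selects the handler.
        header, payload = message
        func = self._handlers.get(header.get('type', ''))
        if func is not None:
            func(header, payload)


actions: List[str] = []
worker = WorkerSketch()
worker.add_handler('service', lambda header, payload: actions.append(payload['action']))

worker.handle([{'type': 'service', 'from': 'foo'}, {'action': 'stop'}])
worker.handle([{'type': 'unknown'}, {}])  # no handler registered, ignored
```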

property is_running
is_sequence(arg: Any) → bool

Checks whether the argument is a list or a tuple.

Returns

True if argument is a sequence, False if not.

is_valid(data: Dict, data_type: str) → bool

Returns whether or not given data is valid by checking against the JSON schemas.

Parameters
  • data – Data to check.

  • data_type – Name of the data type.

Returns

True if data is valid, False if not.

property name
process_observation(obs: core.observation.Observation) → core.observation.Observation

Processes an observation object. Will be overridden by actual worker.

Parameters

obs – Observation object.

Returns

The processed observation object.

publish(target: str, header: Dict, payload: Dict, qos: int = 0, retain: bool = False) → None

Appends header and payload to a list, converts the list to a JSON string and sends it to the designated target by using the callback function _uplink(). The JSON string has the format:

{
  "header": <header>,
  "payload": <payload>
}
Parameters
  • target – Name of the target.

  • header – Header of the message.

  • payload – Payload of the message.

  • qos – Quality of Service (0, 1, or 2).

  • retain – Retained message or not.
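Serialising a message in the header and payload format shown above takes a single json.dumps() call. The helper build_message() is hypothetical, and the hand-off to the messenger is omitted.

```python
import json
from typing import Dict


def build_message(header: Dict, payload: Dict) -> str:
    """Hypothetical helper: serialises header and payload to a JSON string."""
    return json.dumps({'header': header, 'payload': payload})


message = build_message({'type': 'service', 'from': 'foo'}, {'action': 'stop'})
decoded = json.loads(message)
```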

publish_observation(obs: core.observation.Observation) → None

Prepares the observation for publishing and forwards it to the messenger.

Parameters

obs – Observation object.

start() → None

Starts the worker.

stop() → None

Stops the worker.

property type

core.sensor module

Structures to store sensor information.

class core.sensor.Sensor(name: str, config: Dict[str, Any])

Bases: object

Sensor stores the configuration of a sensor, in particular all defined observations.

create_observation(data: Dict[str, Any]) → core.observation.Observation

Creates an observation object.

Parameters

data – The observation data.

property description
get_observation(name: str) → None

Returns a single observation.

Parameters

name – The name of the observation.

get_observations() → Dict[str, core.observation.Observation]

Returns all observations.

Returns

Dictionary with observation objects.

property name
property type
class core.sensor.SensorType

Bases: object

SensorType is a static class used to determine the type of a sensor.

static is_total_station(name: str) → bool

Returns whether or not the given name is a total station.

Returns

True if name is a total station, False if not.

static is_weather_station(name: str) → bool

Returns whether or not the given name is a weather station.

Returns

True if name is a weather station, False if not.

total_stations = ['rts', 'tachymeter', 'totalstation', 'tps', 'tst']
weather_stations = ['meteo', 'meteorologicalsensor', 'meteorologicalstation', 'weatherstation']
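The type checks can be sketched as membership tests against the two lists above. Case-insensitive exact matching is an assumption; the real methods may compare names differently.

```python
TOTAL_STATIONS = ['rts', 'tachymeter', 'totalstation', 'tps', 'tst']
WEATHER_STATIONS = ['meteo', 'meteorologicalsensor', 'meteorologicalstation',
                    'weatherstation']


def is_total_station(name: str) -> bool:
    # Assumption: the name is compared case-insensitively.
    return name.lower() in TOTAL_STATIONS


def is_weather_station(name: str) -> bool:
    # Assumption: the name is compared case-insensitively.
    return name.lower() in WEATHER_STATIONS
```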

core.system module

Methods for getting system information.

class core.system.System

Bases: object

System is a helper class to retrieve information about the used platform and system resources.

static get_current_year() → int

Returns the current year.

Returns

Current year.

static get_date_time() → str

Returns local date and time.

Returns

String containing date and time.

static get_host_name() → str

Returns the host name of the system.

Returns

String containing the host name.

static get_machine() → str

Returns the hardware architecture.

Returns

String with the hardware architecture.

static get_openadms_string() → str

Returns a string with the OpenADMS Node version and version name.

Returns

Complete OpenADMS Node version string.

static get_openadms_version() → float

Returns the current version of OpenADMS.

Returns

Version number.

static get_openadms_version_name() → str

Returns the code name of the current OpenADMS Node version.

Returns

String with the version name.

static get_os_name() → str

Returns the name of the operating system.

Returns

String with the OS name.

static get_os_version() → str

Returns the version of the operating system.

Returns

Release number of the OS.

static get_python_version() → str

Returns Python implementation and version (e.g., ‘CPython 3.5.1’).

Returns

String with name and version number.

static get_root_dir() → pathlib.Path

Returns the root directory of OpenADMS.

Returns

Path object.

static get_software_uptime_string() → str

Returns the software uptime as a formatted string (days, hours, minutes, seconds).

Returns

String with the software uptime.
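Splitting a number of seconds into days, hours, minutes, and seconds can be done with divmod(). The helper format_uptime() is hypothetical and the exact output format of the real method is an assumption.

```python
def format_uptime(seconds: float) -> str:
    """Hypothetical helper; the output format is an assumption."""
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    return '{}d {}h {}m {}s'.format(days, hours, minutes, secs)


uptime = format_uptime(93784)  # 1 day, 2 hours, 3 minutes, 4 seconds
```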

static get_system_string() → str

Returns a string containing operating system and hardware architecture (e.g., ‘Windows 7 (AMD64)’).

Returns

String with OS name and architecture.
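Strings of this kind can be assembled from the standard platform module. The function names below are hypothetical and the exact formatting used by System is an assumption.

```python
import platform


def python_version_string() -> str:
    # For example 'CPython 3.5.1'.
    return '{} {}'.format(platform.python_implementation(),
                          platform.python_version())


def system_string() -> str:
    # For example 'Windows 7 (AMD64)'.
    return '{} {} ({})'.format(platform.system(),
                               platform.release(),
                               platform.machine())
```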

static get_system_uptime_string() → str

Returns the system uptime as a formatted string (days, hours, minutes, seconds).

Returns

String with the uptime.

static get_uptime() → float

Returns the system uptime in seconds.

Returns

Uptime in seconds.

static is_windows() → bool

Returns whether the current operating system is a version of Microsoft Windows or not.

Returns

True if Windows, False if not.

start_time = <Arrow [2020-03-23T13:42:05.602281+01:00]>

core.util module

Static utility routines.

core.util.gon_to_rad(angle: float) → float

Converts from gon (grad) to radians.

Parameters

angle – Angle in gon.

Returns

Converted angle in rad.

core.util.rad_to_gon(angle: float) → float

Converts from radians to gon (grad).

Parameters

angle – Angle in rad.

Returns

Converted angle in gon.
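Both conversions follow from the definition that a full circle is 400 gon or 2π rad. The sketch below restates that relation and is not copied from the OpenADMS Node source.

```python
import math


def gon_to_rad(angle: float) -> float:
    # 200 gon correspond to pi rad.
    return angle * math.pi / 200.0


def rad_to_gon(angle: float) -> float:
    # pi rad correspond to 200 gon.
    return angle * 200.0 / math.pi
```

A right angle is 100 gon, so rad_to_gon(math.pi / 2) yields 100 up to floating-point rounding.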

core.version module

The current version number and name of OpenADMS.

Module contents