aiokatcp package

Subpackages

Submodules

aiokatcp.adjtimex module

Python wrapper for the Linux-specific adjtimex() system call.

class aiokatcp.adjtimex.Timeval

Bases: Structure

See https://man7.org/linux/man-pages/man3/adjtime.3.html.

tv_sec

Structure/Union member

tv_usec

Structure/Union member

class aiokatcp.adjtimex.Timex

Bases: Structure

See https://man7.org/linux/man-pages/man2/adjtimex.2.html.

calcnt

Structure/Union member

constant

Structure/Union member

errcnt

Structure/Union member

esterror

Structure/Union member

freq

Structure/Union member

jitcnt

Structure/Union member

jitter

Structure/Union member

maxerror

Structure/Union member

modes

Structure/Union member

offset

Structure/Union member

ppsfreq

Structure/Union member

precision

Structure/Union member

shift

Structure/Union member

stabil

Structure/Union member

status

Structure/Union member

stbcnt

Structure/Union member

tai

Structure/Union member

tick

Structure/Union member

time

Structure/Union member

tolerance

Structure/Union member

aiokatcp.adjtimex.get_adjtimex() Tuple[int, Timex]

Read-only adjtimex call.

Returns:

  • int – Clock state (one of the TIME_* constants)

  • Timex – Clock information

aiokatcp.client module

class aiokatcp.client.ClientMeta(name, bases, namespace, **kwds)

Bases: type

exception aiokatcp.client.ProtocolError(msg, version)

Bases: ValueError

The server does not implement the required protocol version

class aiokatcp.client.Client(host: str, port: int, *, auto_reconnect: bool = True, limit: int = 16777216, loop: Optional[AbstractEventLoop] = None)

Bases: object

Client that connects to a katcp server.

The client will automatically connect to the server, and reconnect if the connection is lost. If you want to wait for the initial connection to complete up front, the connect() factory may be preferable to the constructor.

Parameters:
  • host – Server hostname

  • port – Server port number

  • limit – Maximum line length in a message from the server

  • loop – Event loop on which the client will run, defaulting to asyncio.get_event_loop().

is_connected

Whether the connection is currently established.

Type:

bool

last_exc

An exception object associated with the last connection attempt. It is always None if is_connected is True.

Type:

Exception

async handle_message(conn: Connection, msg: Message) None

Called by Connection for each incoming message.

handle_inform(msg: Message) None
unhandled_inform(msg: Message) None

Called if an inform is received for which no handler is registered.

The default simply logs a debug message if there are no inform callbacks registered for the message. Subclasses may override this to provide other behaviour for unknown informs.

inform_version_connect(api: str, version: str, build_state: Optional[str] = None) None
inform_disconnect(reason: str) None
add_connected_callback(callback: Callable[[], None]) None

Register a handler that is called when a connection is established.

The handler is called without arguments. Use a lambda or functools.partial() if you need arguments. Handlers are called in the order they are registered.

remove_connected_callback(callback: Callable[[], None]) None

Remove a callback registered with add_connected_callback().

add_disconnected_callback(callback: Callable[[], None]) None

Register a handler that is called when a connection is lost.

The handler is called without arguments. Use a lambda or functools.partial() if you need arguments. Handlers are called in the reverse of order of registration.

remove_disconnected_callback(callback: Callable[[], None]) None

Remove a callback registered with add_disconnected_callback().

add_failed_connect_callback(callback: Callable[[Exception], None]) None

Register a handler that is called when a connection attempt fails.

The handler is passed an exception object. Handlers are called in the order of registration.

remove_failed_connect_callback(callback: Callable[[Exception], None]) None

Remove a callback registered with add_failed_connect_callback().

add_inform_callback(name: str, callback: Callable[[...], None]) None

Add a callback called on every asynchronous inform.

The message arguments are unpacked according to the type annotations on the arguments of the callback. Callbacks are called in the order registered, after any handlers defined by methods in the class.

remove_inform_callback(name: str, callback: Callable[[...], None]) None

Remove a callback registered with add_inform_callback().

close() None

Start closing the connection.

Closing completes asynchronously. Use wait_closed() to wait for it to be fully complete.

async wait_closed() None

Wait for the process started by close() to complete.

async wait_connected() None

Wait until a connection is established.

If construct with auto_reconnect=False, then this will raise an exception if the single connection attempt failed. Otherwise, it will block indefinitely until a connection is successful.

Note

On return, it is possible that is_connected is false, because the connection may fail immediately after waking up the waiter.

async wait_disconnected() None

Wait until there is no connection

async classmethod connect(host: str, port: int, *, auto_reconnect: bool = True, limit: int = 16777216, loop: Optional[AbstractEventLoop] = None) Client

Factory function that creates a client and waits until it is connected.

Refer to the constructor documentation for details of the parameters.

async request_raw(name: str, *args: Any) Tuple[Message, List[Message]]

Make a request to the server and await the reply, without decoding it.

Parameters:
  • name – Message name

  • args – Message arguments, which will be encoded by Message.

Returns:

  • reply – Reply message

  • informs – List of synchronous informs received

Raises:
  • BrokenPipeError – if not connected at the time the request was made

  • ConnectionError – if the connection was lost before the reply was received

async request(name: str, *args: Any) Tuple[List[bytes], List[Message]]

Make a request to the server and await the reply.

It expects the first argument of the reply to be ok, fail or invalid, and raises exceptions in the latter two cases. If this is undesirable, use request_raw() instead.

Parameters:
  • name – Message name

  • args – Message arguments, which will be encoded by Message.

Returns:

  • reply – Reply message arguments, excluding the ok

  • informs – List of synchronous informs received

Raises:
  • FailReply – if the server replied with fail

  • InvalidReply – if the server replied anything except ok or fail

  • BrokenPipeError – if not connected at the time the request was made

  • ConnectionError – if the connection was lost before the reply was received

async sensor_reading(sensor_name: str, sensor_type: None = None) Reading
async sensor_reading(sensor_name: str, sensor_type: Type[_T]) Reading[_T]

Request the reading of a single sensor from the server.

This is a wrapper around a ?sensor-value request that decodes the result. If you know the type of the sensor, it can be passed as a parameter; if it is not specified, ?sensor-list is used to determine it. Note that this introduces a race condition (but an unlikely one) where the sensor could be replaced by one of a different type between the two requests.

If sensor_type is not given and the sensor has a discrete type, the returned reading will contain a byte string rather than an enum. Similarly, string sensors are returned as byte strings, but sensor_type can be passed as str to override this.

This is not a high-performance interface. If you need to sample a large number of sensors, better performance can be obtained with hand-coded implementations, such as by pipelining multiple requests.

Raises:
  • FailReply – If any of the requests fails e.g., because the sensor does not exist.

  • InvalidReply – If any of the requests is invalid. This generally indicates a bug, either in this function or in the server.

async sensor_value(sensor_name: str, sensor_type: None = None) Any
async sensor_value(sensor_name: str, sensor_type: Type[_T]) _T

Request the value of a single sensor from the server.

See sensor_reading() for more information. This is a thin wrapper that just returns the value from the reading.

Raises:

ValueError – if the sensor status indicates that the value is invalid.

add_sensor_watcher(watcher: AbstractSensorWatcher) None
remove_sensor_watcher(watcher: AbstractSensorWatcher) None
class aiokatcp.client.SyncState(value)

Bases: Enum

State of synchronisation of an AbstractSensorWatcher

DISCONNECTED = 1

Not currently connected to the server

SYNCING = 2

Connected to the server, but still subscribing to sensors

SYNCED = 3

Connected to the server and sensor list is up to date

CLOSED = 4

Client object has been closed (Client.close())

class aiokatcp.client.AbstractSensorWatcher

Bases: object

Base class for receiving notifications about sensor changes.

This class is intended to be subclassed to implement any of the notification callbacks.

sensor_added(name: str, description: str, units: str, type_name: str, *args: bytes) None

A sensor was added on the remote server.

This is also called if a sensor changed its properties. In that case there is no call to sensor_removed().

sensor_removed(name: str) None

A sensor disappeared from the remote server.

sensor_updated(name: str, value: bytes, status: Status, timestamp: float) None

The value of a sensor changed on the remote server.

batch_start() None

Called at the start of a batch of back-to-back updates.

Calls to sensor_added(), sensor_removed() and sensor_updated() will always be bracketed by batch_start() and batch_stop(). This does not apply to state_updated().

batch_stop() None

Called at the end of a batch of back-to-back updates.

state_updated(state: SyncState) None

Indicates the state of the synchronisation state machine.

Implementations should assume the initial state is SyncState.DISCONNECTED.

class aiokatcp.client.DiscreteMixin

Bases: object

property katcp_value
class aiokatcp.client.SensorWatcher(client: Client, enum_types: Sequence[Type[Enum]] = ())

Bases: AbstractSensorWatcher

Sensor watcher that mirrors sensors into a SensorSet.

Parameters:
  • client – Client to which this watcher will be attached. It is currently only used to get the correct logger and event loop.

  • enum_types – Enum types to be used for discrete sensors. An enum type is used if it has the same legal values in the same order as the remote sensor. If a discrete sensor has no matching enum type, one is synthesized on the fly.

sensors

The mirrored sensors

Type:

SensorSet

synced

Event that is set whenever the state is SyncState.SYNCED

Type:

asyncio.Event

SENSOR_TYPES = {'address': <class 'aiokatcp.core.Address'>, 'boolean': <class 'bool'>, 'discrete': <enum 'Enum'>, 'float': <class 'float'>, 'integer': <class 'int'>, 'string': <class 'bytes'>, 'timestamp': <class 'aiokatcp.core.Timestamp'>}
rewrite_name(name: str) Union[str, Sequence[str]]

Convert name of incoming sensor to name to use in the sensor set.

This defaults to the identity, but can be overridden to provide name mangling. It may also return a sequence of names, in which case the original sensor is replicated under each of those names.

make_type(type_name: str, parameters: Sequence[bytes]) type

Get the sensor type for a given type name

sensor_added(name: str, description: str, units: str, type_name: str, *args: bytes) None

A sensor was added on the remote server.

This is also called if a sensor changed its properties. In that case there is no call to sensor_removed().

sensor_removed(name: str) None

A sensor disappeared from the remote server.

sensor_updated(name: str, value: bytes, status: Status, timestamp: float) None

The value of a sensor changed on the remote server.

state_updated(state: SyncState) None

Indicates the state of the synchronisation state machine.

Implementations should assume the initial state is SyncState.DISCONNECTED.

aiokatcp.connection module

class aiokatcp.connection.ConvertCRProtocol(stream_reader, client_connected_cb=None, loop=None)

Bases: StreamReaderProtocol

Protocol that converts incoming carriage returns to newlines.

This simplifies extracting the data with asyncio.StreamReader, whose readuntil() method is limited to a single separator.

data_received(data: bytes) None

Called when some data is received.

The argument is a bytes object.

async aiokatcp.connection.read_message(stream: StreamReader) Optional[Message]

Read a single message from an asynchronous stream.

If EOF is reached before reading the newline, returns None if there was no data, otherwise raises aiokatcp.core.KatcpSyntaxError.

Parameters:

stream – Input stream

Raises:

aiokatcp.core.KatcpSyntaxError – if the line was too long or malformed.

exception aiokatcp.connection.FailReply

Bases: Exception

Indicate to the remote end that a request failed, without backtrace

exception aiokatcp.connection.InvalidReply

Bases: Exception

Indicate to the remote end that a request was unrecognised

class aiokatcp.connection.ConnectionLoggerAdapter(logger, extra)

Bases: LoggerAdapter

process(msg, kwargs)

Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.

Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.

class aiokatcp.connection.Connection(owner: _ConnectionOwner[_C], reader: StreamReader, writer: StreamWriter, is_server: bool)

Bases: object

write_messages(msgs: Iterable[Message]) None

Write a stream of messages to the connection.

Connection errors are logged and swallowed.

write_message(msg: Message) None

Write a message to the connection.

Connection errors are logged and swallowed.

async drain() None

Block until the outgoing write buffer is small enough.

close() None

Start closing the connection.

Any currently running message handler will be cancelled. The closing process completes asynchronously. Use wait_closed() to wait for things to be completely closed off.

async wait_closed() None

Wait until the connection is closed.

This can be used either after close(), or without close() to wait for the remote end to close the connection.

aiokatcp.connection.wrap_handler(name: str, handler: Callable, fixed: int) Callable

Convert a handler that takes a sequence of typed arguments into one that takes a message.

The message is unpacked to the types given by the signature. If it could not be unpacked, the wrapper raises FailReply.

Parameters:
  • name – Name of the message (only used to form error messages).

  • handler – The callable to wrap (may be a coroutine).

  • fixed – Number of leading parameters in handler that do not correspond to message arguments.

aiokatcp.core module

class aiokatcp.core.Address(host: Union[IPv4Address, IPv6Address], port: Optional[int] = None)

Bases: object

A katcp address.

Parameters:
  • host – Host address

  • port – Port number

property host: Union[IPv4Address, IPv6Address]

Host address

property port: Optional[int]

Port number

classmethod parse(raw: bytes) Address

Construct an Address from a katcp message argument

Parameters:

raw – Unescaped value in katcp message argument

Raises:

ValueError – If raw does not represent a valid address

class aiokatcp.core.Timestamp(x=0, /)

Bases: float

A katcp timestamp.

This is just a thin wrapper around float to allow the type to be distinguished. It represents time in seconds as a UNIX timestamp.

class aiokatcp.core.Now(value)

Bases: Enum

Singleton for representing a timestamp specified as now in the protocol.

NOW = 0
class aiokatcp.core.LogLevel(value)

Bases: IntEnum

katcp log level, with values matching Python log levels

ALL = 0
TRACE = 0
DEBUG = 10
INFO = 20
WARN = 30
ERROR = 40
FATAL = 50
OFF = 60
classmethod from_python(level: int) LogLevel

Map Python log level to katcp log level

class aiokatcp.core.DeviceStatus(value)

Bases: Enum

Discrete device-status readings.

OK = 1
DEGRADED = 2
FAIL = 3
class aiokatcp.core.TypeInfo(type_: Type[_T], name: str, encode: Callable[[_T], bytes], get_decoder: Callable[[Type[_T]], Callable[[bytes], _T]], default: Callable[[Type[_T]], _T])

Bases: Generic[_T]

Type database entry. Refer to register_type() for details.

decode(cls: Type[_T], value: bytes) _T
aiokatcp.core.register_type(type_: Type[_T], name: str, encode: Callable[[_T], bytes], get_decoder: Callable[[Type[_T]], Callable[[bytes], _T]], default: Optional[Callable[[Type[_T]], _T]] = None) None

Register a type for encoding and decoding in messages.

The registration is also used for subclasses of type_ if no more specific registration has been made. This is particularly used for the registration for enum.Enum, which is used for all enum types.

Parameters:
  • type – Python class.

  • encode – Function to encode values of this type to bytes

  • get_decoder – Function to that takes the actual derived class and returns a decoder that converts instances of bytes to that class.

  • default – Function to generate a default value of this type (used by the sensor framework). It is given the actual derived class as the first argument.

aiokatcp.core.encode(value: Any) bytes

Encode a value to raw bytes for katcp.

Parameters:

value – Value to encode

Raises:

TypeError – if the type of value has not been registered

See also

register_type()

aiokatcp.core.get_type(type_: Type[_T]) TypeInfo[_T]

Retrieve the type information previously registered with register_type().

It returns the last type info registered that is a superclass of type_ (according to issubclass.

Raises:

TypeError – if none of the registrations match type_

aiokatcp.core.get_decoder(cls: Type[_T]) Callable[[bytes], _T]

Get a decoder function.

See decode() for more details. This function is useful for efficiency: the decoder for a class can be looked up once then used many times.

aiokatcp.core.decode(cls: Any, value: bytes) Any

Decode value in katcp message to a type.

If a union type is provided, the value must decode successfully (i.e., without raising ValueError) for exactly one of the types in the union, otherwise a ValueError is raised.

Parameters:
  • cls – The target type, or a typing.Union of types.

  • value – Raw (but unescaped) value in katcp message

Raises:
  • ValueError – if value does not have a valid value for cls

  • TypeError – if cls is not a registered type or union of registered types.

See also

register_type()

exception aiokatcp.core.KatcpSyntaxError(message: str, raw: Optional[bytes] = None)

Bases: ValueError

Raised by parsers when encountering a syntax error.

class aiokatcp.core.Message(mtype: MessageType, name: str, *arguments: Any, mid: Optional[int] = None)

Bases: object

Type

alias of MessageType

OK = b'ok'
FAIL = b'fail'
INVALID = b'invalid'
mtype
name
arguments
mid
classmethod request(name: str, *arguments: Any, mid: Optional[int] = None) Message
classmethod reply(name: str, *arguments: Any, mid: Optional[int] = None) Message
classmethod inform(name: str, *arguments: Any, mid: Optional[int] = None) Message
classmethod reply_to_request(msg: Message, *arguments: Any) Message
classmethod inform_reply(msg: Message, *arguments: Any) Message
classmethod escape_argument(arg: bytes) bytes

Escape special bytes in an argument

classmethod unescape_argument(arg: bytes) bytes

Reverse of escape_argument()

classmethod parse(raw) Message

Create a Message from encoded representation.

Parameters:

raw – Bytes from the wire, including the trailing newline

Raises:

KatcpSyntaxError – If raw is not validly encoded.

reply_ok() bool

Return True if this is a reply and its first argument is ‘ok’.

aiokatcp.sensor module

class aiokatcp.sensor.DeltaObserver(*args, **kwargs)

Bases: Protocol[_T]

class aiokatcp.sensor.ClassicObserver(*args, **kwargs)

Bases: Protocol[_T]

class aiokatcp.sensor.Reading(timestamp: float, status: Status, value: _T)

Bases: Generic[_T]

Sensor reading

Parameters:
  • timestamp (float) – The UNIX timestamp at which the sensor value was determined

  • status (aiokatcp.sensor.Sensor.Status) – Sensor status at timestamp

  • value (aiokatcp.sensor._T) – Sensor value at timestamp

timestamp: float
status: Status
value: _T
class aiokatcp.sensor.Sensor(sensor_type: ~typing.Type[~aiokatcp.sensor._T], name: str, description: str = '', units: str = '', default: ~typing.Optional[~aiokatcp.sensor._T] = None, initial_status: ~aiokatcp.sensor.Sensor.Status = Status.UNKNOWN, *, status_func: ~typing.Callable[[~aiokatcp.sensor._T], ~aiokatcp.sensor.Sensor.Status] = <function _default_status_func>, auto_strategy: ~typing.Optional[~aiokatcp.sensor.SensorSampler.Strategy] = None, auto_strategy_parameters: ~typing.Iterable[~typing.Any] = ())

Bases: Generic[_T]

A sensor in a DeviceServer.

A sensor has some static configuration (name, description, units etc) and dynamic state consisting of a Reading (value, status and timestamp). Other code can attach observers to the sensor to be informed of updates.

Parameters:
  • sensor_type – The type of the sensor.

  • name – Sensor name

  • description – More detailed explanation of the sensor

  • units – Physical units of the sensor

  • default – Initial value of the sensor. When setting this, it may be desirable to specify initial_status too.

  • initial_status – Initial status of the sensor

  • status_func – Function that maps a value to a status in set_value() if none is given. The default is a function that always returns NOMINAL.

  • auto_strategy – Sampling strategy to use when a client requests the auto strategy. The default is to send all updates of the value to the client immediately.

  • auto_strategy_parameters – Parameters to use with auto_strategy. They must be already-decoded values.

class Status(value)

Bases: IntEnum

An enumeration.

UNKNOWN = 0
NOMINAL = 1
WARN = 2
ERROR = 3
FAILURE = 4
UNREACHABLE = 5
INACTIVE = 6
valid_value() bool

True if this state is one where the value provided is valid.

notify(reading: Reading[_T], old_reading: Reading[_T]) None

Notify all observers of changes to this sensor.

Users should not usually call this directly. It is called automatically by set_value().

set_value(value: _T, status: Optional[Status] = None, timestamp: Optional[float] = None) None

Set the current value of the sensor.

Parameters:
  • value – The value of the sensor (the type should be appropriate to the sensor’s type).

  • status – Whether the value represents an error condition or not. If not given, the status_func given to the constructor is used to determine the status from the value.

  • timestamp – The time at which the sensor value was determined (seconds). If not given, it defaults to time.time().

property value: _T

The current value of the sensor.

Modifying it invokes set_value().

property timestamp: float
property status: Status
property reading: Reading[_T]
property params: List[bytes]
attach(observer: Union[ClassicObserver[_T], DeltaObserver[_T]]) None
detach(observer: Union[ClassicObserver[_T], DeltaObserver[_T]]) None
class aiokatcp.sensor.SensorSampler(sensor: Sensor[_T], observer: Optional[Callable[[Sensor[_T], Reading[_T]], None]], loop: AbstractEventLoop, difference: Optional[_T] = None, shortest: Timestamp = 0.0, longest: Optional[Timestamp] = None, *, always_update: bool = False, is_auto: bool = False)

Bases: Generic[_T]

Implement the strategies defined by the sensor-sampling request.

This is an abstract base class. Instances should be constructed by calling factory().

It takes an “observer”, which is a callback function that is called when a sensor update should be sent to the subscribed client. When the sampler is constructed, the observer is called immediately, and then again when appropriate to the strategy.

It is possible to construct this class without an observer, and set it later. This is used by the sensor-sampling implementation to first validate the parameters before sending any readings.

Parameters:
  • sensor – The sensor to observe

  • observer – Callback function to invoke

  • loop – Asyncio event loop

  • difference – Minimum change in value before sending an update

  • shortest – Minimum time between updates

  • longest – Maximum time between updates (or None for no maximum)

  • always_update – If true, update on every sensor value assignment

  • is_auto – True if this sampler was created from the “auto” strategy

class Strategy(value)

Bases: Enum

An enumeration.

NONE = 0
AUTO = 1
PERIOD = 2
EVENT = 3
DIFFERENTIAL = 4
EVENT_RATE = 5
DIFFERENTIAL_RATE = 6
property observer: Optional[Callable[[Sensor[_T], Reading[_T]], None]]
close() None

Stop monitoring the sensor.

This should be called when the sampler is no longer needed. It is not valid to call any methods on the sampler after this.

parameters() tuple

Return the parameters with which the sensor was created.

classmethod factory(sensor: Sensor[_T], observer: Optional[Callable[[Sensor[_T], Reading[_T]], None]], loop: AbstractEventLoop, strategy: Strategy, *args: bytes) Optional[SensorSampler[_T]]
class aiokatcp.sensor.SensorSet

Bases: Mapping[str, Sensor]

A dict-like and set-like collection of sensors.

It is possible to monitor for removal of sensors using add_remove_callback().

add_remove_callback(callback: Callable[[Sensor], None]) None

Add a callback that will be passed any sensor removed from the set.

add_add_callback(callback: Callable[[Sensor], None]) None

Add a callback that will be passed any sensor added to the set.

remove_remove_callback(callback: Callable[[Sensor], None]) None

Remove a callback registered with add_remove_callback().

remove_add_callback(callback: Callable[[Sensor], None]) None

Remove a callback registered with add_add_callback().

add(elem: Sensor) None

Add an element to a set.

This has no effect if the element is already present.

remove(elem: Sensor) None

Remove an element from a set; it must be a member.

If the element is not a member, raise a KeyError.

discard(elem: Sensor) None

Remove an element from a set if it is a member.

If the element is not a member, do nothing.

clear() None.  Remove all items from D.
popitem() Tuple[str, Sensor]

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised

get(name: str) Optional[Sensor]
get(name: str, default: Union[Sensor, _T]) Union[Sensor, _T]

Return the value for key if key is in the dictionary, else default.

keys() a set-like object providing a view on D's keys
values() an object providing a view on D's values
items() a set-like object providing a view on D's items
copy() a shallow copy of D
class aiokatcp.sensor.AggregateSensor(target: SensorSet, sensor_type: Type[_T], name: str, description: str = '', units: str = '', *, auto_strategy: Optional[Strategy] = None, auto_strategy_parameters: Iterable[Any] = ())

Bases: Sensor[_T]

A Sensor with its reading determined by several other Sensors.

This is an abstract class: the user must implement update_aggregate(). This method is called whenever the target SensorSet has a sensor added, removed, or one of the sensors changes its reading. The user-defined update_aggregate() returns the new reading for the AggregateSensor.

Parameters are all as per Sensor, with the exception of target, which is the SensorSet from which the aggregated sensor will determine its own reading.

target

The set of sensors which will determine the reading of the aggregate one. The aggregate sensor may be included in the set (e.g. in self.sensors of a server) but it will not affect its own value.

Type:

SensorSet

abstract update_aggregate(updated_sensor: Optional[Sensor[_U]], reading: Optional[Reading[_U]], old_reading: Optional[Reading[_U]]) Optional[Reading[_T]]

Update the aggregated sensor.

The user is required to override this function, which must return the updated Reading (i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.

Parameters:
  • updated_sensor – The sensor in the target SensorSet which has changed in some way.

  • reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.

  • old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.

Returns:

The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.

Return type:

Optional[Reading]

filter_aggregate(sensor: Sensor) bool

Decide whether another sensor is part of the aggregation.

Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.

Returns:

True if sensor should be included in calculation of the aggregate, False if not.

Return type:

bool

class aiokatcp.sensor.SimpleAggregateSensor(target: SensorSet, sensor_type: Type[_T], name: str, description: str = '', units: str = '', *, auto_strategy: Optional[Strategy] = None, auto_strategy_parameters: Iterable[Any] = ())

Bases: AggregateSensor[_T]

A simplified version of AggregateSensor for common use cases.

This class is suitable when:

  • You don’t need direct control of the returned timestamp. It will be determined automatically.

  • The aggregate value and status can be determined from an internal state which is updated by adding or removing readings.

Subclasses must override aggregate_add(), aggregate_remove(), aggregate_compute() and optionally filter_aggregate().

Currently, the timestamp will be set to the larger of the last sensor update time (using the sensor timestamp) and the last addition or removal time (wallclock time). This may be changed in future releases based on implementation experience. This class is best suited to cases where child sensors are only added on startup and removed on teardown, rather than continuously added and removed.

abstract aggregate_add(sensor: Sensor[_U], reading: Reading[_U]) bool

Update internal state with an additional reading.

Returns:

True if the new reading should result in a state update.

Return type:

bool

abstract aggregate_remove(sensor: Sensor[_U], reading: Reading[_U]) bool

Update internal state by removing a reading.

Returns:

True if removing the reading should result in a state update.

Return type:

bool

abstract aggregate_compute() Tuple[Status, _T]

Compute aggregate status and value from the internal state.

update_aggregate(updated_sensor: Optional[Sensor[_U]], reading: Optional[Reading[_U]], old_reading: Optional[Reading[_U]]) Optional[Reading[_T]]

Update the aggregated sensor.

The user is required to override this function, which must return the updated Reading (i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.

Parameters:
  • updated_sensor – The sensor in the target SensorSet which has changed in some way.

  • reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.

  • old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.

Returns:

The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.

Return type:

Optional[Reading]

aiokatcp.server module

class aiokatcp.server.ClientConnection(owner: DeviceServer, reader: StreamReader, writer: StreamWriter)

Bases: Connection

Server’s view of the connection from a single client.

samplers_lock

Protects against concurrent request_sensor_sampling (but not sensor removal)

close() None

Start closing the connection.

Any currently running message handler will be cancelled. The closing process completes asynchronously. Use wait_closed() to wait for things to be completely closed off.

set_sampler(s: Sensor, sampler: Optional[SensorSampler]) None

Set or clear the sampler for a sensor.

get_sampler(s: Sensor) Optional[SensorSampler]

Retrieve the sampler for a sensor

sensor_update(s: Sensor, reading: Reading) None

Report a new sensor value. This is used as the callback for the sampler.

class aiokatcp.server.RequestContext(conn: ClientConnection, req: Message)

Bases: object

Interface for informs and replies to a request.

Parameters:
  • conn – Client connection from which the request originated

  • req – The request itself

property replied: bool

Whether a reply has currently been sent

reply(*args: Any) None

Send a reply to the request.

Parameters:

*args – The fields of the reply

Raises:

RuntimeError – If the request has already been replied to.

inform(*args: Any) None

Send an inform in response to the request.

Parameters:

*args – The fields of the inform

Raises:

RuntimeError – If the request has already been replied to.

informs(informs: Iterable[Iterable], *, send_reply=True) None

Write a sequence of informs and send an ok reply with the count.

Parameters:

informs – Each element is an iterable of fields in the inform.

Raises:

RuntimeError – If the request has already been replied to.

async drain() None

Wait for the outgoing queue to be below a threshold.

class aiokatcp.server.DeviceServerMeta(name, bases, namespace, **kwds)

Bases: type

class aiokatcp.server.DeviceServer(host: str, port: int, *, limit: int = 16777216, max_pending: int = 100, max_backlog: Optional[int] = None, loop: Optional[AbstractEventLoop] = None)

Bases: object

Server that handles katcp.

Parameters:
  • host – Hostname to listen on (empty for all interfaces)

  • port – Port number to listen on

  • limit – Maximum line length in a request

  • max_pending – Maximum number of asynchronous requests that can be in progress. Once this number of reached, new requests are blocked.

  • loop – Event loop on which the server will run, defaulting to asyncio.get_event_loop().

  • max_backlog

    Maximum backlog in the write queue to a client.

    If a message is to be sent to a client and it has more than this many bytes in its backlog, it is disconnected instead to prevent the server running out of memory. At present this is only applied to asynchronous informs, but it may be applied to other messages in future.

    If not specified it defaults to twice limit.

VERSION: str = None
BUILD_STATE: str = None
class LogHandler(server: DeviceServer)

Bases: Handler

Log handler that issues log messages as #log informs.

It is automatically initialised with a filter that discard log messages from the aiokatcp module, because otherwise one may get into an infinite recursion where log messages are communications with a client are communicated to the client.

It is also initialised with a default formatter that emits only the the log message itself.

Typical usage to inform clients about all log messages in the application is

logging.getLogger().addHandler(DeviceServer.LogHandler(server))
emit(record: LogRecord) None

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

async start() None

Start the server running on the event loop.

Raises:

RuntimeError – if the server is already running

async on_stop() None

Extension point for subclasses to run shutdown code.

This is called after the TCP server has been shut down and all in-flight requests have been completed or cancelled, but before service tasks are cancelled. Subclasses should override this function rather than stop() to run late shutdown code because this is called before the flag is set to wake up join().

It is only called if the server was running when stop() was called.

async stop(cancel: bool = True) None

Shut down the server.

Parameters:

cancel – If true (default), cancel any pending asynchronous requests.

halt(cancel: bool = True) Task

Begin server shutdown, but do not wait for it to complete.

Parameters:

cancel – If true (default), cancel any pending asynchronous requests.

Returns:

Task that is performing the stop.

Return type:

task

async join() None

Block until the server has stopped.

This will re-raise any exception raised by on_stop() or a service task.

property server: Optional[Server]

Return the underlying TCP server

property sockets: Tuple[socket, ...]

Sockets associated with the underlying server.

If start() has not yet been called, this will be empty.

property service_tasks: Tuple[Task, ...]
add_service_task(task: Task) None

Register an asynchronous task that runs as part of the server.

The task will be cancelled when the server is stopped. If it throws an exception (other than asyncio.CancelledError), the server will be halted and the exception will be rethrown from stop() or join().

Note

The task must actually be an instance of asyncio.Task rather than a coroutine.

send_version_info(ctx: RequestContext, *, send_reply=True) None

Send version information informs to the client.

This is used for asynchronous #version-connect informs when a client connects and in response to a ?version-list request.

Returns:

Number of informs sent

Return type:

num_informs

async unhandled_request(ctx: RequestContext, req: Message) None

Called when a request is received for which no handler is registered.

Subclasses may override this to do dynamic handling.

async handle_message(conn: ClientConnection, msg: Message) None

Called by ClientConnection for each incoming message.

mass_inform(name: str, *args: Any) None

Send an asynchronous inform to all clients.

Parameters:
  • name – Inform name

  • *args – Fields for the inform

async request_help(ctx: RequestContext, name: Optional[str] = None) None

Return help on the available requests.

Return a description of the available requests using a sequence of #help informs.

Parameters:

request (str, optional) – The name of the request to return help for (the default is to return help for all requests).

Informs:
  • request (str) – The name of a request.

  • description (str) – Documentation for the named request.

Returns:

  • success ({‘ok’, ‘fail’}) – Whether sending the help succeeded.

  • informs (int) – Number of #help inform messages sent.

Examples

?help
#help halt ...description...
#help help ...description...
...
!help ok 5

?help halt
#help halt ...description...
!help ok 1
async request_halt(ctx: RequestContext) None

Halt the device server.

Returns:

success – Whether scheduling the halt succeeded.

Return type:

{‘ok’, ‘fail’}

Examples

?halt
!halt ok
async request_watchdog(ctx: RequestContext) None

Check that the server is still alive.

Returns:

success

Return type:

{‘ok’}

Examples

?watchdog
!watchdog ok
async request_version_list(ctx: RequestContext) None

Request the list of versions of roles and subcomponents.

Informs:
  • name (str) – Name of the role or component.

  • version (str) – A string identifying the version of the component. Individual components may define the structure of this argument as they choose. In the absence of other information clients should treat it as an opaque string.

  • build_state_or_serial_number (str) – A unique identifier for a particular instance of a component. This should change whenever the component is replaced or updated.

Returns:

  • success ({‘ok’, ‘fail’}) – Whether sending the version list succeeded.

  • informs (int) – Number of #version-list inform messages sent.

Examples

?version-list
#version-list katcp-protocol 5.1-MIB
#version-list katcp-library katcp-python-0.4 katcp-python-0.4.1-py2
#version-list katcp-device foodevice-1.0 foodevice-1.0.0rc1
!version-list ok 3
async request_sensor_list(ctx: RequestContext, name: Optional[str] = None) int

Request the list of sensors.

The list of sensors is sent as a sequence of #sensor-list informs.

Parameters:

name (str, optional) – Name of the sensor to list (the default is to list all sensors). If name starts and ends with ‘/’ it is treated as a regular expression and all sensors whose names contain the regular expression are returned.

Informs:
  • name (str) – The name of the sensor being described.

  • description (str) – Description of the named sensor.

  • units (str) – Units for the value of the named sensor.

  • type (str) – Type of the named sensor.

  • params (list of str, optional) – Additional sensor parameters (type dependent). For discrete sensors the additional parameters are the allowed values. For all other types no additional parameters are sent.

Returns:

  • success ({‘ok’, ‘fail’}) – Whether sending the sensor list succeeded.

  • informs (int) – Number of #sensor-list inform messages sent.

Examples

?sensor-list
#sensor-list psu.voltage PSU\_voltage. V float
#sensor-list cpu.status CPU\_status. \@ discrete on off error
...
!sensor-list ok 5

?sensor-list cpu.power.on
#sensor-list cpu.power.on Whether\_CPU\_has\_power. \@ boolean
!sensor-list ok 1

?sensor-list /voltage/
#sensor-list psu.voltage PSU\_voltage. V float
#sensor-list cpu.voltage CPU\_voltage. V float
!sensor-list ok 2
async request_sensor_value(ctx: RequestContext, name: Optional[str] = None) None

Request the value of a sensor or sensors.

A list of sensor values as a sequence of #sensor-value informs.

Parameters:

name (str, optional) – Name of the sensor to poll (the default is to send values for all sensors). If name starts and ends with ‘/’ it is treated as a regular expression and all sensors whose names contain the regular expression are returned.

Informs:
  • timestamp (float) – Timestamp of the sensor reading in seconds since the Unix epoch, or milliseconds for katcp versions <= 4.

  • count ({1}) – Number of sensors described in this #sensor-value inform. Will always be one. It exists to keep this inform compatible with #sensor-status.

  • name (str) – Name of the sensor whose value is being reported.

  • status (Sensor.Status) – Sensor status (see Sensor.Status enum)

  • value (object) – Value of the named sensor. Type depends on the type of the sensor.

Returns:

  • success ({‘ok’, ‘fail’}) – Whether sending the list of values succeeded.

  • informs (int) – Number of #sensor-value inform messages sent.

Examples

?sensor-value
#sensor-value 1244631611.415231 1 psu.voltage nominal 4.5
#sensor-value 1244631611.415200 1 cpu.status warn off
...
!sensor-value ok 5

?sensor-value cpu.power.on
#sensor-value 1244631611.415231 1 cpu.power.on error 0
!sensor-value ok 1
async request_sensor_sampling(ctx: RequestContext, name: str, strategy: Optional[Strategy] = None, *args: bytes) tuple

Configure or query the way a sensor is sampled.

Sampled values are reported asynchronously using the #sensor-status message.

Parameters:
  • name – Name of the sensor whose sampling strategy to query or configure. When configuring it may be a comma-separated list of names.

  • strategy – Type of strategy to use to report the sensor value. The differential strategy type may only be used with integer or float sensors. If this parameter is supplied, it sets the new strategy.

  • *args – Additional strategy parameters (dependent on the strategy type). For the differential strategy, the parameter is an integer or float giving the amount by which the sensor value may change before an updated value is sent. For the period strategy, the parameter is the sampling period in float seconds. The event strategy has no parameters. Note that this has changed from KATCPv4. For the event-rate strategy, a minimum period between updates and a maximum period between updates (both in float seconds) must be given. If the event occurs more than once within the minimum period, only one update will occur. Whether or not the event occurs, the sensor value will be updated at least once per maximum period.

Returns:

  • success ({‘ok’, ‘fail’}) – Whether the sensor-sampling request succeeded.

  • name (str) – Name of the sensor queried or configured.

  • strategy (SensorSampler.Strategy) – Name of the new or current sampling strategy for the sensor.

  • params (list of str) – Additional strategy parameters (see description under Parameters).

Examples

?sensor-sampling cpu.power.on
!sensor-sampling ok cpu.power.on none

?sensor-sampling cpu.power.on period 500
!sensor-sampling ok cpu.power.on period 500
async request_client_list(ctx: RequestContext) None

Request the list of connected clients.

The list of clients is sent as a sequence of #client-list informs.

Informs:

addr (str) – The address of the client as host:port with host in dotted quad notation. If the address of the client could not be determined (because, for example, the client disconnected suddenly) then a unique string representing the client is sent instead.

Returns:

  • success ({‘ok’, ‘fail’}) – Whether sending the client list succeeded.

  • informs (int) – Number of #client-list inform messages sent.

Examples

?client-list
#client-list 127.0.0.1:53600
!client-list ok 1
async request_log_level(ctx: RequestContext, level: Optional[LogLevel] = None) LogLevel

Query or set the current logging level.

Parameters:

level ({'all', 'trace', 'debug', 'info', 'warn', 'error', 'fatal', 'off'}, optional) – Name of the logging level to set the device server to (the default is to leave the log level unchanged).

Returns:

  • success ({‘ok’, ‘fail’}) – Whether the request succeeded.

  • level ({‘all’, ‘trace’, ‘debug’, ‘info’, ‘warn’, ‘error’, ‘fatal’, ‘off’}) – The log level after processing the request.

Examples

?log-level
!log-level ok warn

?log-level info
!log-level ok info

aiokatcp.time_sync module

Utilities for creating time synchronisation sensors.

These use the Linux-specific adjtimex() system call to get information about time synchronisation. This does not give access to all the statistics that an NTP or PTP server would provide, but should be sufficient to check that time is being kept synchronised, and (at least on Linux) is accessible from inside a container with no privileges required.

class aiokatcp.time_sync.ClockState(value)

Bases: Enum

An enumeration.

OK = 0
INS = 1
DEL = 2
OOP = 3
WAIT = 4
ERROR = 5
class aiokatcp.time_sync.TimeSyncUpdater(sensor_map: Mapping[str, Sensor])

Bases: object

Maps raw adjtimex(2) fields to sensor values.

The sensors to populate are specified by a mapping, whose keys can be any subset of:

state

Return value of the adjtimex() call (as a ClockState)

maxerror

Maximum error, in seconds

esterror

Estimated error, in seconds

update() None

Update the sensors now.

Module contents

aiokatcp.minor_version()