aiokatcp package
Subpackages
Submodules
aiokatcp.adjtimex module
Python wrapper for the Linux-specific adjtimex()
system call.
- class aiokatcp.adjtimex.Timeval
Bases:
Structure
See https://man7.org/linux/man-pages/man3/adjtime.3.html.
- tv_sec
Structure/Union member
- tv_usec
Structure/Union member
- class aiokatcp.adjtimex.Timex
Bases:
Structure
See https://man7.org/linux/man-pages/man2/adjtimex.2.html.
- calcnt
Structure/Union member
- constant
Structure/Union member
- errcnt
Structure/Union member
- esterror
Structure/Union member
- freq
Structure/Union member
- jitcnt
Structure/Union member
- jitter
Structure/Union member
- maxerror
Structure/Union member
- modes
Structure/Union member
- offset
Structure/Union member
- ppsfreq
Structure/Union member
- precision
Structure/Union member
- shift
Structure/Union member
- stabil
Structure/Union member
- status
Structure/Union member
- stbcnt
Structure/Union member
- tai
Structure/Union member
- tick
Structure/Union member
- time
Structure/Union member
- tolerance
Structure/Union member
aiokatcp.client module
- class aiokatcp.client.ClientMeta(name, bases, namespace, **kwds)
Bases:
type
- exception aiokatcp.client.ProtocolError(msg, version)
Bases:
ValueError
The server does not implement the required protocol version
- class aiokatcp.client.Client(host: str, port: int, *, auto_reconnect: bool = True, limit: int = 16777216, loop: Optional[AbstractEventLoop] = None)
Bases:
object
Client that connects to a katcp server.
The client will automatically connect to the server, and reconnect if the connection is lost. If you want to wait for the initial connection to complete up front, the
connect()
factory may be preferable to the constructor.- Parameters:
host – Server hostname
port – Server port number
limit – Maximum line length in a message from the server
loop – Event loop on which the client will run, defaulting to
asyncio.get_event_loop()
.
- is_connected
Whether the connection is currently established.
- Type:
bool
- last_exc
An exception object associated with the last connection attempt. It is always
None
ifis_connected
is True.- Type:
Exception
- async handle_message(conn: Connection, msg: Message) None
Called by
Connection
for each incoming message.
- unhandled_inform(msg: Message) None
Called if an inform is received for which no handler is registered.
The default simply logs a debug message if there are no inform callbacks registered for the message. Subclasses may override this to provide other behaviour for unknown informs.
- inform_version_connect(api: str, version: str, build_state: Optional[str] = None) None
- inform_disconnect(reason: str) None
- add_connected_callback(callback: Callable[[], None]) None
Register a handler that is called when a connection is established.
The handler is called without arguments. Use a lambda or
functools.partial()
if you need arguments. Handlers are called in the order they are registered.
- remove_connected_callback(callback: Callable[[], None]) None
Remove a callback registered with
add_connected_callback()
.
- add_disconnected_callback(callback: Callable[[], None]) None
Register a handler that is called when a connection is lost.
The handler is called without arguments. Use a lambda or
functools.partial()
if you need arguments. Handlers are called in the reverse of order of registration.
- remove_disconnected_callback(callback: Callable[[], None]) None
Remove a callback registered with
add_disconnected_callback()
.
- add_failed_connect_callback(callback: Callable[[Exception], None]) None
Register a handler that is called when a connection attempt fails.
The handler is passed an exception object. Handlers are called in the order of registration.
- remove_failed_connect_callback(callback: Callable[[Exception], None]) None
Remove a callback registered with
add_failed_connect_callback()
.
- add_inform_callback(name: str, callback: Callable[[...], None]) None
Add a callback called on every asynchronous inform.
The message arguments are unpacked according to the type annotations on the arguments of the callback. Callbacks are called in the order registered, after any handlers defined by methods in the class.
- remove_inform_callback(name: str, callback: Callable[[...], None]) None
Remove a callback registered with
add_inform_callback()
.
- close() None
Start closing the connection.
Closing completes asynchronously. Use
wait_closed()
to wait for it to be fully complete.
- async wait_connected() None
Wait until a connection is established.
If construct with
auto_reconnect=False
, then this will raise an exception if the single connection attempt failed. Otherwise, it will block indefinitely until a connection is successful.Note
On return, it is possible that
is_connected
is false, because the connection may fail immediately after waking up the waiter.
- async wait_disconnected() None
Wait until there is no connection
- async classmethod connect(host: str, port: int, *, auto_reconnect: bool = True, limit: int = 16777216, loop: Optional[AbstractEventLoop] = None) Client
Factory function that creates a client and waits until it is connected.
Refer to the constructor documentation for details of the parameters.
- async request_raw(name: str, *args: Any) Tuple[Message, List[Message]]
Make a request to the server and await the reply, without decoding it.
- Parameters:
name – Message name
args – Message arguments, which will be encoded by
Message
.
- Returns:
reply – Reply message
informs – List of synchronous informs received
- Raises:
BrokenPipeError – if not connected at the time the request was made
ConnectionError – if the connection was lost before the reply was received
- async request(name: str, *args: Any) Tuple[List[bytes], List[Message]]
Make a request to the server and await the reply.
It expects the first argument of the reply to be
ok
,fail
orinvalid
, and raises exceptions in the latter two cases. If this is undesirable, userequest_raw()
instead.- Parameters:
name – Message name
args – Message arguments, which will be encoded by
Message
.
- Returns:
reply – Reply message arguments, excluding the
ok
informs – List of synchronous informs received
- Raises:
FailReply – if the server replied with
fail
InvalidReply – if the server replied anything except
ok
orfail
BrokenPipeError – if not connected at the time the request was made
ConnectionError – if the connection was lost before the reply was received
- async sensor_reading(sensor_name: str, sensor_type: None = None) Reading
- async sensor_reading(sensor_name: str, sensor_type: Type[_T]) Reading[_T]
Request the reading of a single sensor from the server.
This is a wrapper around a
?sensor-value
request that decodes the result. If you know the type of the sensor, it can be passed as a parameter; if it is not specified,?sensor-list
is used to determine it. Note that this introduces a race condition (but an unlikely one) where the sensor could be replaced by one of a different type between the two requests.If sensor_type is not given and the sensor has a discrete type, the returned reading will contain a byte string rather than an enum. Similarly, string sensors are returned as byte strings, but sensor_type can be passed as str to override this.
This is not a high-performance interface. If you need to sample a large number of sensors, better performance can be obtained with hand-coded implementations, such as by pipelining multiple requests.
- Raises:
FailReply – If any of the requests fails e.g., because the sensor does not exist.
InvalidReply – If any of the requests is invalid. This generally indicates a bug, either in this function or in the server.
- async sensor_value(sensor_name: str, sensor_type: None = None) Any
- async sensor_value(sensor_name: str, sensor_type: Type[_T]) _T
Request the value of a single sensor from the server.
See
sensor_reading()
for more information. This is a thin wrapper that just returns the value from the reading.- Raises:
ValueError – if the sensor status indicates that the value is invalid.
- add_sensor_watcher(watcher: AbstractSensorWatcher) None
- remove_sensor_watcher(watcher: AbstractSensorWatcher) None
- class aiokatcp.client.SyncState(value)
Bases:
Enum
State of synchronisation of an
AbstractSensorWatcher
- DISCONNECTED = 1
Not currently connected to the server
- SYNCING = 2
Connected to the server, but still subscribing to sensors
- SYNCED = 3
Connected to the server and sensor list is up to date
- CLOSED = 4
Client object has been closed (
Client.close()
)
- class aiokatcp.client.AbstractSensorWatcher
Bases:
object
Base class for receiving notifications about sensor changes.
This class is intended to be subclassed to implement any of the notification callbacks.
- sensor_added(name: str, description: str, units: str, type_name: str, *args: bytes) None
A sensor was added on the remote server.
This is also called if a sensor changed its properties. In that case there is no call to
sensor_removed()
.
- sensor_removed(name: str) None
A sensor disappeared from the remote server.
- sensor_updated(name: str, value: bytes, status: Status, timestamp: float) None
The value of a sensor changed on the remote server.
- batch_start() None
Called at the start of a batch of back-to-back updates.
Calls to
sensor_added()
,sensor_removed()
andsensor_updated()
will always be bracketed bybatch_start()
andbatch_stop()
. This does not apply tostate_updated()
.
- batch_stop() None
Called at the end of a batch of back-to-back updates.
- state_updated(state: SyncState) None
Indicates the state of the synchronisation state machine.
Implementations should assume the initial state is
SyncState.DISCONNECTED
.
- class aiokatcp.client.SensorWatcher(client: Client, enum_types: Sequence[Type[Enum]] = ())
Bases:
AbstractSensorWatcher
Sensor watcher that mirrors sensors into a
SensorSet
.- Parameters:
client – Client to which this watcher will be attached. It is currently only used to get the correct logger and event loop.
enum_types – Enum types to be used for discrete sensors. An enum type is used if it has the same legal values in the same order as the remote sensor. If a discrete sensor has no matching enum type, one is synthesized on the fly.
- sensors
The mirrored sensors
- Type:
SensorSet
- synced
Event that is set whenever the state is
SyncState.SYNCED
- Type:
asyncio.Event
- SENSOR_TYPES = {'address': <class 'aiokatcp.core.Address'>, 'boolean': <class 'bool'>, 'discrete': <enum 'Enum'>, 'float': <class 'float'>, 'integer': <class 'int'>, 'string': <class 'bytes'>, 'timestamp': <class 'aiokatcp.core.Timestamp'>}
- rewrite_name(name: str) Union[str, Sequence[str]]
Convert name of incoming sensor to name to use in the sensor set.
This defaults to the identity, but can be overridden to provide name mangling. It may also return a sequence of names, in which case the original sensor is replicated under each of those names.
- make_type(type_name: str, parameters: Sequence[bytes]) type
Get the sensor type for a given type name
- sensor_added(name: str, description: str, units: str, type_name: str, *args: bytes) None
A sensor was added on the remote server.
This is also called if a sensor changed its properties. In that case there is no call to
sensor_removed()
.
- sensor_removed(name: str) None
A sensor disappeared from the remote server.
- sensor_updated(name: str, value: bytes, status: Status, timestamp: float) None
The value of a sensor changed on the remote server.
- state_updated(state: SyncState) None
Indicates the state of the synchronisation state machine.
Implementations should assume the initial state is
SyncState.DISCONNECTED
.
aiokatcp.connection module
- class aiokatcp.connection.ConvertCRProtocol(stream_reader, client_connected_cb=None, loop=None)
Bases:
StreamReaderProtocol
Protocol that converts incoming carriage returns to newlines.
This simplifies extracting the data with
asyncio.StreamReader
, whosereaduntil()
method is limited to a single separator.- data_received(data: bytes) None
Called when some data is received.
The argument is a bytes object.
- async aiokatcp.connection.read_message(stream: StreamReader) Optional[Message]
Read a single message from an asynchronous stream.
If EOF is reached before reading the newline, returns
None
if there was no data, otherwise raisesaiokatcp.core.KatcpSyntaxError
.- Parameters:
stream – Input stream
- Raises:
aiokatcp.core.KatcpSyntaxError – if the line was too long or malformed.
- exception aiokatcp.connection.FailReply
Bases:
Exception
Indicate to the remote end that a request failed, without backtrace
- exception aiokatcp.connection.InvalidReply
Bases:
Exception
Indicate to the remote end that a request was unrecognised
- class aiokatcp.connection.ConnectionLoggerAdapter(logger, extra)
Bases:
LoggerAdapter
- process(msg, kwargs)
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.
- class aiokatcp.connection.Connection(owner: _ConnectionOwner[_C], reader: StreamReader, writer: StreamWriter, is_server: bool)
Bases:
object
- write_messages(msgs: Iterable[Message]) None
Write a stream of messages to the connection.
Connection errors are logged and swallowed.
- write_message(msg: Message) None
Write a message to the connection.
Connection errors are logged and swallowed.
- async drain() None
Block until the outgoing write buffer is small enough.
- close() None
Start closing the connection.
Any currently running message handler will be cancelled. The closing process completes asynchronously. Use
wait_closed()
to wait for things to be completely closed off.
- aiokatcp.connection.wrap_handler(name: str, handler: Callable, fixed: int) Callable
Convert a handler that takes a sequence of typed arguments into one that takes a message.
The message is unpacked to the types given by the signature. If it could not be unpacked, the wrapper raises
FailReply
.- Parameters:
name – Name of the message (only used to form error messages).
handler – The callable to wrap (may be a coroutine).
fixed – Number of leading parameters in handler that do not correspond to message arguments.
aiokatcp.core module
- class aiokatcp.core.Address(host: Union[IPv4Address, IPv6Address], port: Optional[int] = None)
Bases:
object
A katcp address.
- Parameters:
host – Host address
port – Port number
- property host: Union[IPv4Address, IPv6Address]
Host address
- property port: Optional[int]
Port number
- class aiokatcp.core.Timestamp(x=0, /)
Bases:
float
A katcp timestamp.
This is just a thin wrapper around
float
to allow the type to be distinguished. It represents time in seconds as a UNIX timestamp.
- class aiokatcp.core.Now(value)
Bases:
Enum
Singleton for representing a timestamp specified as
now
in the protocol.- NOW = 0
- class aiokatcp.core.LogLevel(value)
Bases:
IntEnum
katcp log level, with values matching Python log levels
- ALL = 0
- TRACE = 0
- DEBUG = 10
- INFO = 20
- WARN = 30
- ERROR = 40
- FATAL = 50
- OFF = 60
- class aiokatcp.core.DeviceStatus(value)
Bases:
Enum
Discrete device-status readings.
- OK = 1
- DEGRADED = 2
- FAIL = 3
- class aiokatcp.core.TypeInfo(type_: Type[_T], name: str, encode: Callable[[_T], bytes], get_decoder: Callable[[Type[_T]], Callable[[bytes], _T]], default: Callable[[Type[_T]], _T])
Bases:
Generic
[_T
]Type database entry. Refer to
register_type()
for details.- decode(cls: Type[_T], value: bytes) _T
- aiokatcp.core.register_type(type_: Type[_T], name: str, encode: Callable[[_T], bytes], get_decoder: Callable[[Type[_T]], Callable[[bytes], _T]], default: Optional[Callable[[Type[_T]], _T]] = None) None
Register a type for encoding and decoding in messages.
The registration is also used for subclasses of type_ if no more specific registration has been made. This is particularly used for the registration for
enum.Enum
, which is used for all enum types.- Parameters:
type – Python class.
encode – Function to encode values of this type to bytes
get_decoder – Function to that takes the actual derived class and returns a decoder that converts instances of
bytes
to that class.default – Function to generate a default value of this type (used by the sensor framework). It is given the actual derived class as the first argument.
- aiokatcp.core.encode(value: Any) bytes
Encode a value to raw bytes for katcp.
- Parameters:
value – Value to encode
- Raises:
TypeError – if the type of value has not been registered
See also
- aiokatcp.core.get_type(type_: Type[_T]) TypeInfo[_T]
Retrieve the type information previously registered with
register_type()
.It returns the last type info registered that is a superclass of type_ (according to
issubclass
.- Raises:
TypeError – if none of the registrations match type_
- aiokatcp.core.get_decoder(cls: Type[_T]) Callable[[bytes], _T]
Get a decoder function.
See
decode()
for more details. This function is useful for efficiency: the decoder for a class can be looked up once then used many times.
- aiokatcp.core.decode(cls: Any, value: bytes) Any
Decode value in katcp message to a type.
If a union type is provided, the value must decode successfully (i.e., without raising
ValueError
) for exactly one of the types in the union, otherwise aValueError
is raised.- Parameters:
cls – The target type, or a
typing.Union
of types.value – Raw (but unescaped) value in katcp message
- Raises:
ValueError – if value does not have a valid value for cls
TypeError – if cls is not a registered type or union of registered types.
See also
- exception aiokatcp.core.KatcpSyntaxError(message: str, raw: Optional[bytes] = None)
Bases:
ValueError
Raised by parsers when encountering a syntax error.
- class aiokatcp.core.Message(mtype: MessageType, name: str, *arguments: Any, mid: Optional[int] = None)
Bases:
object
- Type
alias of
MessageType
- OK = b'ok'
- FAIL = b'fail'
- INVALID = b'invalid'
- mtype
- name
- arguments
- mid
- classmethod escape_argument(arg: bytes) bytes
Escape special bytes in an argument
- classmethod unescape_argument(arg: bytes) bytes
Reverse of
escape_argument()
- classmethod parse(raw) Message
Create a
Message
from encoded representation.- Parameters:
raw – Bytes from the wire, including the trailing newline
- Raises:
KatcpSyntaxError – If raw is not validly encoded.
- reply_ok() bool
Return True if this is a reply and its first argument is ‘ok’.
aiokatcp.sensor module
- class aiokatcp.sensor.DeltaObserver(*args, **kwargs)
Bases:
Protocol
[_T
]
- class aiokatcp.sensor.ClassicObserver(*args, **kwargs)
Bases:
Protocol
[_T
]
- class aiokatcp.sensor.Reading(timestamp: float, status: Status, value: _T)
Bases:
Generic
[_T
]Sensor reading
- Parameters:
timestamp (float) – The UNIX timestamp at which the sensor value was determined
status (aiokatcp.sensor.Sensor.Status) – Sensor status at timestamp
value (aiokatcp.sensor._T) – Sensor value at timestamp
- timestamp: float
- value: _T
- class aiokatcp.sensor.Sensor(sensor_type: ~typing.Type[~aiokatcp.sensor._T], name: str, description: str = '', units: str = '', default: ~typing.Optional[~aiokatcp.sensor._T] = None, initial_status: ~aiokatcp.sensor.Sensor.Status = Status.UNKNOWN, *, status_func: ~typing.Callable[[~aiokatcp.sensor._T], ~aiokatcp.sensor.Sensor.Status] = <function _default_status_func>, auto_strategy: ~typing.Optional[~aiokatcp.sensor.SensorSampler.Strategy] = None, auto_strategy_parameters: ~typing.Iterable[~typing.Any] = ())
Bases:
Generic
[_T
]A sensor in a
DeviceServer
.A sensor has some static configuration (name, description, units etc) and dynamic state consisting of a
Reading
(value, status and timestamp). Other code can attach observers to the sensor to be informed of updates.- Parameters:
sensor_type – The type of the sensor.
name – Sensor name
description – More detailed explanation of the sensor
units – Physical units of the sensor
default – Initial value of the sensor. When setting this, it may be desirable to specify initial_status too.
initial_status – Initial status of the sensor
status_func – Function that maps a value to a status in
set_value()
if none is given. The default is a function that always returns NOMINAL.auto_strategy – Sampling strategy to use when a client requests the
auto
strategy. The default is to send all updates of the value to the client immediately.auto_strategy_parameters – Parameters to use with auto_strategy. They must be already-decoded values.
- class Status(value)
Bases:
IntEnum
An enumeration.
- UNKNOWN = 0
- NOMINAL = 1
- WARN = 2
- ERROR = 3
- FAILURE = 4
- UNREACHABLE = 5
- INACTIVE = 6
- valid_value() bool
True if this state is one where the value provided is valid.
- notify(reading: Reading[_T], old_reading: Reading[_T]) None
Notify all observers of changes to this sensor.
Users should not usually call this directly. It is called automatically by
set_value()
.
- set_value(value: _T, status: Optional[Status] = None, timestamp: Optional[float] = None) None
Set the current value of the sensor.
- Parameters:
value – The value of the sensor (the type should be appropriate to the sensor’s type).
status – Whether the value represents an error condition or not. If not given, the status_func given to the constructor is used to determine the status from the value.
timestamp – The time at which the sensor value was determined (seconds). If not given, it defaults to
time.time()
.
- property value: _T
The current value of the sensor.
Modifying it invokes
set_value()
.
- property timestamp: float
- property params: List[bytes]
- attach(observer: Union[ClassicObserver[_T], DeltaObserver[_T]]) None
- detach(observer: Union[ClassicObserver[_T], DeltaObserver[_T]]) None
- class aiokatcp.sensor.SensorSampler(sensor: Sensor[_T], observer: Optional[Callable[[Sensor[_T], Reading[_T]], None]], loop: AbstractEventLoop, difference: Optional[_T] = None, shortest: Timestamp = 0.0, longest: Optional[Timestamp] = None, *, always_update: bool = False, is_auto: bool = False)
Bases:
Generic
[_T
]Implement the strategies defined by the
sensor-sampling
request.This is an abstract base class. Instances should be constructed by calling
factory()
.It takes an “observer”, which is a callback function that is called when a sensor update should be sent to the subscribed client. When the sampler is constructed, the observer is called immediately, and then again when appropriate to the strategy.
It is possible to construct this class without an observer, and set it later. This is used by the
sensor-sampling
implementation to first validate the parameters before sending any readings.- Parameters:
sensor – The sensor to observe
observer – Callback function to invoke
loop – Asyncio event loop
difference – Minimum change in value before sending an update
shortest – Minimum time between updates
longest – Maximum time between updates (or None for no maximum)
always_update – If true, update on every sensor value assignment
is_auto – True if this sampler was created from the “auto” strategy
- class Strategy(value)
Bases:
Enum
An enumeration.
- NONE = 0
- AUTO = 1
- PERIOD = 2
- EVENT = 3
- DIFFERENTIAL = 4
- EVENT_RATE = 5
- DIFFERENTIAL_RATE = 6
- close() None
Stop monitoring the sensor.
This should be called when the sampler is no longer needed. It is not valid to call any methods on the sampler after this.
- parameters() tuple
Return the parameters with which the sensor was created.
- class aiokatcp.sensor.SensorSet
Bases:
Mapping
[str
,Sensor
]A dict-like and set-like collection of sensors.
It is possible to monitor for removal of sensors using
add_remove_callback()
.- add_remove_callback(callback: Callable[[Sensor], None]) None
Add a callback that will be passed any sensor removed from the set.
- add_add_callback(callback: Callable[[Sensor], None]) None
Add a callback that will be passed any sensor added to the set.
- remove_remove_callback(callback: Callable[[Sensor], None]) None
Remove a callback registered with
add_remove_callback()
.
- remove_add_callback(callback: Callable[[Sensor], None]) None
Remove a callback registered with
add_add_callback()
.
- add(elem: Sensor) None
Add an element to a set.
This has no effect if the element is already present.
- remove(elem: Sensor) None
Remove an element from a set; it must be a member.
If the element is not a member, raise a KeyError.
- discard(elem: Sensor) None
Remove an element from a set if it is a member.
If the element is not a member, do nothing.
- clear() None. Remove all items from D.
- popitem() Tuple[str, Sensor]
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised
- get(name: str) Optional[Sensor]
- get(name: str, default: Union[Sensor, _T]) Union[Sensor, _T]
Return the value for key if key is in the dictionary, else default.
- keys() a set-like object providing a view on D's keys
- values() an object providing a view on D's values
- items() a set-like object providing a view on D's items
- copy() a shallow copy of D
- class aiokatcp.sensor.AggregateSensor(target: SensorSet, sensor_type: Type[_T], name: str, description: str = '', units: str = '', *, auto_strategy: Optional[Strategy] = None, auto_strategy_parameters: Iterable[Any] = ())
Bases:
Sensor
[_T
]A Sensor with its reading determined by several other Sensors.
This is an abstract class: the user must implement
update_aggregate()
. This method is called whenever the targetSensorSet
has a sensor added, removed, or one of the sensors changes its reading. The user-definedupdate_aggregate()
returns the new reading for the AggregateSensor.Parameters are all as per
Sensor
, with the exception of target, which is theSensorSet
from which the aggregated sensor will determine its own reading.- target
The set of sensors which will determine the reading of the aggregate one. The aggregate sensor may be included in the set (e.g. in self.sensors of a server) but it will not affect its own value.
- Type:
- abstract update_aggregate(updated_sensor: Optional[Sensor[_U]], reading: Optional[Reading[_U]], old_reading: Optional[Reading[_U]]) Optional[Reading[_T]]
Update the aggregated sensor.
The user is required to override this function, which must return the updated
Reading
(i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.- Parameters:
updated_sensor – The sensor in the target
SensorSet
which has changed in some way.reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.
old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.
- Returns:
The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.
- Return type:
Optional[Reading]
- filter_aggregate(sensor: Sensor) bool
Decide whether another sensor is part of the aggregation.
Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.
- Returns:
True if sensor should be included in calculation of the aggregate, False if not.
- Return type:
bool
- class aiokatcp.sensor.SimpleAggregateSensor(target: SensorSet, sensor_type: Type[_T], name: str, description: str = '', units: str = '', *, auto_strategy: Optional[Strategy] = None, auto_strategy_parameters: Iterable[Any] = ())
Bases:
AggregateSensor
[_T
]A simplified version of
AggregateSensor
for common use cases.This class is suitable when:
You don’t need direct control of the returned timestamp. It will be determined automatically.
The aggregate value and status can be determined from an internal state which is updated by adding or removing readings.
Subclasses must override
aggregate_add()
,aggregate_remove()
,aggregate_compute()
and optionallyfilter_aggregate()
.Currently, the timestamp will be set to the larger of the last sensor update time (using the sensor timestamp) and the last addition or removal time (wallclock time). This may be changed in future releases based on implementation experience. This class is best suited to cases where child sensors are only added on startup and removed on teardown, rather than continuously added and removed.
- abstract aggregate_add(sensor: Sensor[_U], reading: Reading[_U]) bool
Update internal state with an additional reading.
- Returns:
True if the new reading should result in a state update.
- Return type:
bool
- abstract aggregate_remove(sensor: Sensor[_U], reading: Reading[_U]) bool
Update internal state by removing a reading.
- Returns:
True if removing the reading should result in a state update.
- Return type:
bool
- abstract aggregate_compute() Tuple[Status, _T]
Compute aggregate status and value from the internal state.
- update_aggregate(updated_sensor: Optional[Sensor[_U]], reading: Optional[Reading[_U]], old_reading: Optional[Reading[_U]]) Optional[Reading[_T]]
Update the aggregated sensor.
The user is required to override this function, which must return the updated
Reading
(i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.- Parameters:
updated_sensor – The sensor in the target
SensorSet
which has changed in some way.reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.
old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.
- Returns:
The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.
- Return type:
Optional[Reading]
aiokatcp.server module
- class aiokatcp.server.ClientConnection(owner: DeviceServer, reader: StreamReader, writer: StreamWriter)
Bases:
Connection
Server’s view of the connection from a single client.
- samplers_lock
Protects against concurrent request_sensor_sampling (but not sensor removal)
- close() None
Start closing the connection.
Any currently running message handler will be cancelled. The closing process completes asynchronously. Use
wait_closed()
to wait for things to be completely closed off.
- set_sampler(s: Sensor, sampler: Optional[SensorSampler]) None
Set or clear the sampler for a sensor.
- get_sampler(s: Sensor) Optional[SensorSampler]
Retrieve the sampler for a sensor
- class aiokatcp.server.RequestContext(conn: ClientConnection, req: Message)
Bases:
object
Interface for informs and replies to a request.
- Parameters:
conn – Client connection from which the request originated
req – The request itself
- property replied: bool
Whether a reply has currently been sent
- reply(*args: Any) None
Send a reply to the request.
- Parameters:
*args – The fields of the reply
- Raises:
RuntimeError – If the request has already been replied to.
- inform(*args: Any) None
Send an inform in response to the request.
- Parameters:
*args – The fields of the inform
- Raises:
RuntimeError – If the request has already been replied to.
- informs(informs: Iterable[Iterable], *, send_reply=True) None
Write a sequence of informs and send an
ok
reply with the count.- Parameters:
informs – Each element is an iterable of fields in the inform.
- Raises:
RuntimeError – If the request has already been replied to.
- async drain() None
Wait for the outgoing queue to be below a threshold.
- class aiokatcp.server.DeviceServerMeta(name, bases, namespace, **kwds)
Bases:
type
- class aiokatcp.server.DeviceServer(host: str, port: int, *, limit: int = 16777216, max_pending: int = 100, max_backlog: Optional[int] = None, loop: Optional[AbstractEventLoop] = None)
Bases:
object
Server that handles katcp.
- Parameters:
host – Hostname to listen on (empty for all interfaces)
port – Port number to listen on
limit – Maximum line length in a request
max_pending – Maximum number of asynchronous requests that can be in progress. Once this number of reached, new requests are blocked.
loop – Event loop on which the server will run, defaulting to
asyncio.get_event_loop()
.max_backlog –
Maximum backlog in the write queue to a client.
If a message is to be sent to a client and it has more than this many bytes in its backlog, it is disconnected instead to prevent the server running out of memory. At present this is only applied to asynchronous informs, but it may be applied to other messages in future.
If not specified it defaults to twice limit.
- VERSION: str = None
- BUILD_STATE: str = None
- class LogHandler(server: DeviceServer)
Bases:
Handler
Log handler that issues log messages as
#log
informs.It is automatically initialised with a filter that discard log messages from the aiokatcp module, because otherwise one may get into an infinite recursion where log messages are communications with a client are communicated to the client.
It is also initialised with a default formatter that emits only the the log message itself.
Typical usage to inform clients about all log messages in the application is
logging.getLogger().addHandler(DeviceServer.LogHandler(server))
- emit(record: LogRecord) None
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
- async start() None
Start the server running on the event loop.
- Raises:
RuntimeError – if the server is already running
- async on_stop() None
Extension point for subclasses to run shutdown code.
This is called after the TCP server has been shut down and all in-flight requests have been completed or cancelled, but before service tasks are cancelled. Subclasses should override this function rather than
stop()
to run late shutdown code because this is called before the flag is set to wake upjoin()
.It is only called if the server was running when
stop()
was called.
- async stop(cancel: bool = True) None
Shut down the server.
- Parameters:
cancel – If true (default), cancel any pending asynchronous requests.
- halt(cancel: bool = True) Task
Begin server shutdown, but do not wait for it to complete.
- Parameters:
cancel – If true (default), cancel any pending asynchronous requests.
- Returns:
Task that is performing the stop.
- Return type:
task
- async join() None
Block until the server has stopped.
This will re-raise any exception raised by
on_stop()
or a service task.
- property server: Optional[Server]
Return the underlying TCP server
- property sockets: Tuple[socket, ...]
Sockets associated with the underlying server.
If
start()
has not yet been called, this will be empty.
- property service_tasks: Tuple[Task, ...]
- add_service_task(task: Task) None
Register an asynchronous task that runs as part of the server.
The task will be cancelled when the server is stopped. If it throws an exception (other than
asyncio.CancelledError
), the server will be halted and the exception will be rethrown fromstop()
orjoin()
.Note
The task must actually be an instance of
asyncio.Task
rather than a coroutine.
- send_version_info(ctx: RequestContext, *, send_reply=True) None
Send version information informs to the client.
This is used for asynchronous #version-connect informs when a client connects and in response to a ?version-list request.
- Returns:
Number of informs sent
- Return type:
num_informs
- async unhandled_request(ctx: RequestContext, req: Message) None
Called when a request is received for which no handler is registered.
Subclasses may override this to do dynamic handling.
- async handle_message(conn: ClientConnection, msg: Message) None
Called by
ClientConnection
for each incoming message.
- mass_inform(name: str, *args: Any) None
Send an asynchronous inform to all clients.
- Parameters:
name – Inform name
*args – Fields for the inform
- async request_help(ctx: RequestContext, name: Optional[str] = None) None
Return help on the available requests.
Return a description of the available requests using a sequence of #help informs.
- Parameters:
request (str, optional) – The name of the request to return help for (the default is to return help for all requests).
- Informs:
request (str) – The name of a request.
description (str) – Documentation for the named request.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the help succeeded.
informs (int) – Number of #help inform messages sent.
Examples
?help #help halt ...description... #help help ...description... ... !help ok 5 ?help halt #help halt ...description... !help ok 1
- async request_halt(ctx: RequestContext) None
Halt the device server.
- Returns:
success – Whether scheduling the halt succeeded.
- Return type:
{‘ok’, ‘fail’}
Examples
?halt !halt ok
- async request_watchdog(ctx: RequestContext) None
Check that the server is still alive.
- Returns:
success
- Return type:
{‘ok’}
Examples
?watchdog !watchdog ok
- async request_version_list(ctx: RequestContext) None
Request the list of versions of roles and subcomponents.
- Informs:
name (str) – Name of the role or component.
version (str) – A string identifying the version of the component. Individual components may define the structure of this argument as they choose. In the absence of other information clients should treat it as an opaque string.
build_state_or_serial_number (str) – A unique identifier for a particular instance of a component. This should change whenever the component is replaced or updated.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the version list succeeded.
informs (int) – Number of #version-list inform messages sent.
Examples
?version-list #version-list katcp-protocol 5.1-MIB #version-list katcp-library katcp-python-0.4 katcp-python-0.4.1-py2 #version-list katcp-device foodevice-1.0 foodevice-1.0.0rc1 !version-list ok 3
- async request_sensor_list(ctx: RequestContext, name: Optional[str] = None) int
Request the list of sensors.
The list of sensors is sent as a sequence of #sensor-list informs.
- Parameters:
name (str, optional) – Name of the sensor to list (the default is to list all sensors). If name starts and ends with ‘/’ it is treated as a regular expression and all sensors whose names contain the regular expression are returned.
- Informs:
name (str) – The name of the sensor being described.
description (str) – Description of the named sensor.
units (str) – Units for the value of the named sensor.
type (str) – Type of the named sensor.
params (list of str, optional) – Additional sensor parameters (type dependent). For discrete sensors the additional parameters are the allowed values. For all other types no additional parameters are sent.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the sensor list succeeded.
informs (int) – Number of #sensor-list inform messages sent.
Examples
?sensor-list #sensor-list psu.voltage PSU\_voltage. V float #sensor-list cpu.status CPU\_status. \@ discrete on off error ... !sensor-list ok 5 ?sensor-list cpu.power.on #sensor-list cpu.power.on Whether\_CPU\_has\_power. \@ boolean !sensor-list ok 1 ?sensor-list /voltage/ #sensor-list psu.voltage PSU\_voltage. V float #sensor-list cpu.voltage CPU\_voltage. V float !sensor-list ok 2
- async request_sensor_value(ctx: RequestContext, name: Optional[str] = None) None
Request the value of a sensor or sensors.
A list of sensor values as a sequence of #sensor-value informs.
- Parameters:
name (str, optional) – Name of the sensor to poll (the default is to send values for all sensors). If name starts and ends with ‘/’ it is treated as a regular expression and all sensors whose names contain the regular expression are returned.
- Informs:
timestamp (float) – Timestamp of the sensor reading in seconds since the Unix epoch, or milliseconds for katcp versions <= 4.
count ({1}) – Number of sensors described in this #sensor-value inform. Will always be one. It exists to keep this inform compatible with #sensor-status.
name (str) – Name of the sensor whose value is being reported.
status (Sensor.Status) – Sensor status (see Sensor.Status enum)
value (object) – Value of the named sensor. Type depends on the type of the sensor.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the list of values succeeded.
informs (int) – Number of #sensor-value inform messages sent.
Examples
?sensor-value #sensor-value 1244631611.415231 1 psu.voltage nominal 4.5 #sensor-value 1244631611.415200 1 cpu.status warn off ... !sensor-value ok 5 ?sensor-value cpu.power.on #sensor-value 1244631611.415231 1 cpu.power.on error 0 !sensor-value ok 1
- async request_sensor_sampling(ctx: RequestContext, name: str, strategy: Optional[Strategy] = None, *args: bytes) tuple
Configure or query the way a sensor is sampled.
Sampled values are reported asynchronously using the #sensor-status message.
- Parameters:
name – Name of the sensor whose sampling strategy to query or configure. When configuring it may be a comma-separated list of names.
strategy – Type of strategy to use to report the sensor value. The differential strategy type may only be used with integer or float sensors. If this parameter is supplied, it sets the new strategy.
*args – Additional strategy parameters (dependent on the strategy type). For the differential strategy, the parameter is an integer or float giving the amount by which the sensor value may change before an updated value is sent. For the period strategy, the parameter is the sampling period in float seconds. The event strategy has no parameters. Note that this has changed from KATCPv4. For the event-rate strategy, a minimum period between updates and a maximum period between updates (both in float seconds) must be given. If the event occurs more than once within the minimum period, only one update will occur. Whether or not the event occurs, the sensor value will be updated at least once per maximum period.
- Returns:
success ({‘ok’, ‘fail’}) – Whether the sensor-sampling request succeeded.
name (str) – Name of the sensor queried or configured.
strategy (
SensorSampler.Strategy
) – Name of the new or current sampling strategy for the sensor.params (list of str) – Additional strategy parameters (see description under Parameters).
Examples
?sensor-sampling cpu.power.on !sensor-sampling ok cpu.power.on none ?sensor-sampling cpu.power.on period 500 !sensor-sampling ok cpu.power.on period 500
- async request_client_list(ctx: RequestContext) None
Request the list of connected clients.
The list of clients is sent as a sequence of #client-list informs.
- Informs:
addr (str) – The address of the client as host:port with host in dotted quad notation. If the address of the client could not be determined (because, for example, the client disconnected suddenly) then a unique string representing the client is sent instead.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the client list succeeded.
informs (int) – Number of #client-list inform messages sent.
Examples
?client-list #client-list 127.0.0.1:53600 !client-list ok 1
- async request_log_level(ctx: RequestContext, level: Optional[LogLevel] = None) LogLevel
Query or set the current logging level.
- Parameters:
level ({'all', 'trace', 'debug', 'info', 'warn', 'error', 'fatal', 'off'}, optional) – Name of the logging level to set the device server to (the default is to leave the log level unchanged).
- Returns:
success ({‘ok’, ‘fail’}) – Whether the request succeeded.
level ({‘all’, ‘trace’, ‘debug’, ‘info’, ‘warn’, ‘error’, ‘fatal’, ‘off’}) – The log level after processing the request.
Examples
?log-level !log-level ok warn ?log-level info !log-level ok info
aiokatcp.time_sync module
Utilities for creating time synchronisation sensors.
These use the Linux-specific adjtimex()
system call to get information
about time synchronisation. This does not give access to all the statistics
that an NTP or PTP server would provide, but should be sufficient to check that
time is being kept synchronised, and (at least on Linux) is accessible from
inside a container with no privileges required.
- class aiokatcp.time_sync.ClockState(value)
Bases:
Enum
An enumeration.
- OK = 0
- INS = 1
- DEL = 2
- OOP = 3
- WAIT = 4
- ERROR = 5
- class aiokatcp.time_sync.TimeSyncUpdater(sensor_map: Mapping[str, Sensor])
Bases:
object
Maps raw adjtimex(2) fields to sensor values.
The sensors to populate are specified by a mapping, whose keys can be any subset of:
- state
Return value of the
adjtimex()
call (as aClockState
)- maxerror
Maximum error, in seconds
- esterror
Estimated error, in seconds
- update() None
Update the sensors now.
Module contents
- aiokatcp.minor_version()