aiokatcp package
Subpackages
Submodules
aiokatcp.adjtimex module
Python wrapper for the Linux-specific adjtimex() system call.
- class aiokatcp.adjtimex.Timeval
Bases:
Structure
See https://man7.org/linux/man-pages/man3/adjtime.3.html.
- tv_sec
Structure/Union member
- tv_usec
Structure/Union member
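For orientation, a ctypes structure with these two members could be declared roughly as in the sketch below. The class name TimevalSketch and the helper to_seconds() are hypothetical, and the field types (c_long for both) are an assumption based on the usual Linux struct timeval layout; the actual definition in aiokatcp.adjtimex may differ.

```python
import ctypes


class TimevalSketch(ctypes.Structure):
    """Stand-in for a struct timeval: whole seconds plus microseconds."""

    # Assumption: both members are C longs, as on typical Linux ABIs.
    _fields_ = [
        ("tv_sec", ctypes.c_long),
        ("tv_usec", ctypes.c_long),
    ]


def to_seconds(tv: TimevalSketch) -> float:
    """Combine the two members into fractional seconds."""
    return tv.tv_sec + tv.tv_usec / 1_000_000


tv = TimevalSketch(tv_sec=12, tv_usec=500_000)
print(to_seconds(tv))
```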
- class aiokatcp.adjtimex.Timex
Bases:
Structure
See https://man7.org/linux/man-pages/man2/adjtimex.2.html.
- calcnt
Structure/Union member
- constant
Structure/Union member
- errcnt
Structure/Union member
- esterror
Structure/Union member
- freq
Structure/Union member
- jitcnt
Structure/Union member
- jitter
Structure/Union member
- maxerror
Structure/Union member
- modes
Structure/Union member
- offset
Structure/Union member
- ppsfreq
Structure/Union member
- precision
Structure/Union member
- shift
Structure/Union member
- stabil
Structure/Union member
- status
Structure/Union member
- stbcnt
Structure/Union member
- tai
Structure/Union member
- tick
Structure/Union member
- time
Structure/Union member
- tolerance
Structure/Union member
aiokatcp.client module
- exception aiokatcp.client.ProtocolError(msg, version)
Bases:
ValueError
The server does not implement the required protocol version.
- class aiokatcp.client.Client(host: str, port: int, *, auto_reconnect: bool = True, limit: int = 16777216, loop: AbstractEventLoop | None = None)
Bases:
object
Client that connects to a katcp server.
The client will automatically connect to the server, and reconnect if the connection is lost. If you want to wait for the initial connection to complete up front, the connect() factory may be preferable to the constructor.
- Parameters:
host – Server hostname
port – Server port number
limit – Maximum line length in a message from the server
loop – Event loop on which the client will run, defaulting to asyncio.get_event_loop().
- last_exc
An exception object associated with the last connection attempt. It is always None if is_connected is True.
- Type:
- handle_message(conn: Connection, msg: Message) None
Called by Connection for each incoming message.
- unhandled_inform(msg: Message) None
Called if an inform is received for which no handler is registered.
The default simply logs a debug message if there are no inform callbacks registered for the message. Subclasses may override this to provide other behaviour for unknown informs.
- add_connected_callback(callback: Callable[[], None]) None
Register a handler that is called when a connection is established.
The handler is called without arguments. Use a lambda or functools.partial() if you need arguments. Handlers are called in the order they are registered.
- remove_connected_callback(callback: Callable[[], None]) None
Remove a callback registered with add_connected_callback().
- add_disconnected_callback(callback: Callable[[], None]) None
Register a handler that is called when a connection is lost.
The handler is called without arguments. Use a lambda or functools.partial() if you need arguments. Handlers are called in the reverse order of registration.
- remove_disconnected_callback(callback: Callable[[], None]) None
Remove a callback registered with add_disconnected_callback().
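The call order described for the connected and disconnected handlers can be illustrated with a small standard-library sketch. CallbackOrderSketch and its fire_* methods are hypothetical stand-ins written for this illustration, not part of aiokatcp; only the ordering rules and the use of functools.partial() are taken from the text above.

```python
import functools
from typing import Callable, List


class CallbackOrderSketch:
    """Hypothetical stand-in that reproduces only the documented call order."""

    def __init__(self) -> None:
        self._connected: List[Callable[[], None]] = []
        self._disconnected: List[Callable[[], None]] = []

    def add_connected_callback(self, callback: Callable[[], None]) -> None:
        self._connected.append(callback)

    def add_disconnected_callback(self, callback: Callable[[], None]) -> None:
        self._disconnected.append(callback)

    def fire_connected(self) -> None:
        # Connected handlers run in registration order.
        for callback in list(self._connected):
            callback()

    def fire_disconnected(self) -> None:
        # Disconnected handlers run in reverse registration order.
        for callback in reversed(self._disconnected):
            callback()


log: List[str] = []
sketch = CallbackOrderSketch()
for label in ("first", "second"):
    # Handlers receive no arguments, so functools.partial binds them up front.
    sketch.add_connected_callback(functools.partial(log.append, f"up:{label}"))
    sketch.add_disconnected_callback(functools.partial(log.append, f"down:{label}"))

sketch.fire_connected()
sketch.fire_disconnected()
print(log)
```

Reverse ordering on disconnect lets handlers tear down in the opposite order to which they were set up.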
- add_failed_connect_callback(callback: Callable[[Exception], None]) None
Register a handler that is called when a connection attempt fails.
The handler is passed an exception object. Handlers are called in the order of registration.
- remove_failed_connect_callback(callback: Callable[[Exception], None]) None
Remove a callback registered with add_failed_connect_callback().
- add_inform_callback(name: str, callback: Callable[[...], None]) None
Add a callback called on every asynchronous inform.
The message arguments are unpacked according to the type annotations on the arguments of the callback. Callbacks are called in the order registered, after any handlers defined by methods in the class.
- remove_inform_callback(name: str, callback: Callable[[...], None]) None
Remove a callback registered with add_inform_callback().
- close() None
Start closing the connection.
Closing completes asynchronously. Use wait_closed() to wait for it to be fully complete.
- async wait_connected() None
Wait until a connection is established.
If the client is closed (either manually or because auto_reconnect is false and the single connection attempt failed) this will raise an exception.
Note
On return, it is possible that is_connected is false, because the connection may fail immediately after waking up the waiter.
- async classmethod connect(host: str, port: int, *, auto_reconnect: bool = True, limit: int = 16777216, loop: AbstractEventLoop | None = None) Client
Factory function that creates a client and waits until it is connected.
Refer to the constructor documentation for details of the parameters.
- async request_raw(name: str, *args: Any) Tuple[Message, List[Message]]
Make a request to the server and await the reply, without decoding it.
- Parameters:
name – Message name
args – Message arguments, which will be encoded by Message.
- Returns:
reply – Reply message
informs – List of synchronous informs received
- Raises:
BrokenPipeError – if not connected at the time the request was made
ConnectionError – if the connection was lost before the reply was received
- async request(name: str, *args: Any) Tuple[List[bytes], List[Message]]
Make a request to the server and await the reply.
It expects the first argument of the reply to be ok, fail or invalid, and raises exceptions in the latter two cases. If this is undesirable, use request_raw() instead.
- Parameters:
name – Message name
args – Message arguments, which will be encoded by Message.
- Returns:
reply – Reply message arguments, excluding the ok
informs – List of synchronous informs received
- Raises:
FailReply – if the server replied with fail
InvalidReply – if the server replied anything except ok or fail
BrokenPipeError – if not connected at the time the request was made
ConnectionError – if the connection was lost before the reply was received
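The way request() interprets the first reply argument can be sketched without a server. The exception classes and check_reply() below are hypothetical local stand-ins, not the aiokatcp implementation; treating an empty reply as invalid and joining the remaining arguments into the error text are assumptions made for the sketch.

```python
from typing import List, Sequence


class FailReplySketch(Exception):
    """Local stand-in for FailReply."""


class InvalidReplySketch(Exception):
    """Local stand-in for InvalidReply."""


def check_reply(arguments: Sequence[bytes]) -> List[bytes]:
    """Return the reply arguments after 'ok', or raise for anything else."""
    if not arguments:
        # Assumption: a reply with no status argument counts as invalid.
        raise InvalidReplySketch("reply has no status argument")
    status, rest = arguments[0], list(arguments[1:])
    if status == b"ok":
        return rest
    error = b" ".join(rest).decode("utf-8", errors="replace")
    if status == b"fail":
        raise FailReplySketch(error)
    # Anything other than ok or fail is reported as invalid.
    raise InvalidReplySketch(error)


print(check_reply([b"ok", b"3"]))

try:
    check_reply([b"fail", b"unknown sensor"])
except FailReplySketch as exc:
    failure = str(exc)
    print("failed:", failure)
```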
- async sensor_reading(sensor_name: str, sensor_type: None = None) Reading
- async sensor_reading(sensor_name: str, sensor_type: Type[_T]) Reading[_T]
Request the reading of a single sensor from the server.
This is a wrapper around a ?sensor-value request that decodes the result. If you know the type of the sensor, it can be passed as a parameter; if it is not specified, ?sensor-list is used to determine it. Note that this introduces a race condition (but an unlikely one) where the sensor could be replaced by one of a different type between the two requests.
If sensor_type is not given and the sensor has a discrete type, the returned reading will contain a byte string rather than an enum. Similarly, string sensors are returned as byte strings, but sensor_type can be passed as str to override this.
This is not a high-performance interface. If you need to sample a large number of sensors, better performance can be obtained with hand-coded implementations, such as by pipelining multiple requests.
- Raises:
FailReply – If any of the requests fails e.g., because the sensor does not exist.
InvalidReply – If any of the requests is invalid. This generally indicates a bug, either in this function or in the server.
- async sensor_value(sensor_name: str, sensor_type: None = None) Any
- async sensor_value(sensor_name: str, sensor_type: Type[_T]) _T
Request the value of a single sensor from the server.
See sensor_reading() for more information. This is a thin wrapper that just returns the value from the reading.
- Raises:
ValueError – if the sensor status indicates that the value is invalid.
- add_sensor_watcher(watcher: AbstractSensorWatcher) None
- remove_sensor_watcher(watcher: AbstractSensorWatcher) None
- class aiokatcp.client.SyncState(*values)
Bases:
Enum
State of synchronisation of an AbstractSensorWatcher
- DISCONNECTED = 1
Not currently connected to the server
- SYNCING = 2
Connected to the server, but still subscribing to sensors
- SYNCED = 3
Connected to the server and sensor list is up to date
- CLOSED = 4
Client object has been closed (Client.close())
- class aiokatcp.client.AbstractSensorWatcher
Bases:
object
Base class for receiving notifications about sensor changes.
This class is intended to be subclassed to implement any of the notification callbacks.
Subclasses must not override __hash__() or __eq__(): the implementation depends on being able to store the watchers in a dictionary keyed by identity.
- filter(name: str, description: str, units: str, type_name: str, *args: bytes) bool
Query whether this watcher is interested in this sensor.
If it returns False, this watcher will not receive any of the other callbacks for this sensor.
By default, all sensors are used.
- sensor_added(name: str, description: str, units: str, type_name: str, *args: bytes) None
A sensor was added on the remote server.
This is also called if a sensor changed its properties. In that case there is no call to sensor_removed(), unless filter() returned False for the new version.
- sensor_removed(name: str) None
A sensor disappeared from the remote server.
This will also be called if the sensor changed properties and filter() returned False for the new version.
- sensor_updated(name: str, value: bytes, status: Status, timestamp: float) None
The value of a sensor changed on the remote server.
- batch_start() None
Called at the start of a batch of back-to-back updates.
Calls to sensor_added(), sensor_removed() and sensor_updated() will always be bracketed by batch_start() and batch_stop(). This does not apply to state_updated().
- state_updated(state: SyncState) None
Indicates the state of the synchronisation state machine.
Implementations should assume the initial state is SyncState.DISCONNECTED.
- class aiokatcp.client.SensorWatcher(client: Client, enum_types: Sequence[Type[Enum]] = ())
Bases:
AbstractSensorWatcher
Sensor watcher that mirrors sensors into a SensorSet.
- Parameters:
client – Client to which this watcher will be attached. It is currently only used to get the correct logger and event loop.
enum_types – Enum types to be used for discrete sensors. An enum type is used if it has the same legal values in the same order as the remote sensor. If a discrete sensor has no matching enum type, one is synthesized on the fly.
- sensors
The mirrored sensors
- Type:
SensorSet
- synced
Event that is set whenever the state is SyncState.SYNCED
- Type:
- SENSOR_TYPES = {'address': <class 'aiokatcp.core.Address'>, 'boolean': <class 'bool'>, 'discrete': <enum 'Enum'>, 'float': <class 'float'>, 'integer': <class 'int'>, 'string': <class 'bytes'>, 'timestamp': <class 'aiokatcp.core.Timestamp'>}
- rewrite_name(name: str) str | Sequence[str]
Convert name of incoming sensor to name to use in the sensor set.
This defaults to the identity, but can be overridden to provide name mangling. It may also return a sequence of names, in which case the original sensor is replicated under each of those names.
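Because rewrite_name() may return either a single name or a sequence of names, code consuming its result has to normalise both shapes. The sketch below shows one way to do that with plain functions; rewrite_name_sketch(), target_names() and the "device." prefix are hypothetical and only illustrate the documented return types.

```python
from typing import List, Sequence, Union


def rewrite_name_sketch(name: str) -> Union[str, Sequence[str]]:
    """Hypothetical mangling: also expose 'device.' sensors under a short alias."""
    if name.startswith("device."):
        return [name, name[len("device."):]]
    # Identity mapping, matching the documented default behaviour.
    return name


def target_names(name: str) -> List[str]:
    """Normalise the result to a list of names to create in the sensor set."""
    result = rewrite_name_sketch(name)
    # Check for str first, because a str is itself a Sequence of characters.
    if isinstance(result, str):
        return [result]
    return list(result)


print(target_names("device.temperature"))
print(target_names("uptime"))
```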
- make_type(type_name: str, parameters: Sequence[bytes]) type
Get the sensor type for a given type name
- sensor_added(name: str, description: str, units: str, type_name: str, *args: bytes) None
A sensor was added on the remote server.
This is also called if a sensor changed its properties. In that case there is no call to sensor_removed(), unless filter() returned False for the new version.
- sensor_removed(name: str) None
A sensor disappeared from the remote server.
This will also be called if the sensor changed properties and filter() returned False for the new version.
- sensor_updated(name: str, value: bytes, status: Status, timestamp: float) None
The value of a sensor changed on the remote server.
- state_updated(state: SyncState) None
Indicates the state of the synchronisation state machine.
Implementations should assume the initial state is SyncState.DISCONNECTED.
aiokatcp.connection module
- exception aiokatcp.connection.FailReply
Bases:
Exception
Indicate to the remote end that a request failed, without backtrace
- exception aiokatcp.connection.InvalidReply
Bases:
Exception
Indicate to the remote end that a request was unrecognised
- class aiokatcp.connection.ConnectionLoggerAdapter(logger, extra=None)
Bases:
LoggerAdapter
- process(msg, kwargs)
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.
- class aiokatcp.connection.Connection(owner: _ConnectionOwner[Self], is_server: bool, limit: int)
Bases:
BufferedProtocol
- logger: LoggerAdapter
- connection_made(transport: BaseTransport) None
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
- connection_lost(exc: Exception | None) None
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- eof_received() bool
Called when the other end calls write_eof() or equivalent.
If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.
- pause_writing() None
Called when the transport’s buffer goes over the high-water mark.
Pause and resume calls are paired – pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.
Note that if the buffer size equals the high-water mark, pause_writing() is not called – it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.
NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() – if it were, it would have no effect when it’s most needed (when the app keeps writing without yielding until pause_writing() is called).
- resume_writing() None
Called when the transport’s buffer drains below the low-water mark.
See pause_writing() for details.
- get_buffer(sizehint: int) memoryview
Called to allocate a new receive buffer.
sizehint is a recommended minimal size for the returned buffer. When set to -1, the buffer size can be arbitrary.
Must return an object that implements the buffer protocol. It is an error to return a zero-sized buffer.
- buffer_updated(nbytes: int) None
Called when the buffer was updated with the received data.
nbytes is the total number of bytes that were written to the buffer.
- write_messages(msgs: Iterable[Message]) None
Write an iterable of messages to the connection.
Connection errors are logged and swallowed.
- write_message(msg: Message) None
Write a message to the connection.
Connection errors are logged and swallowed.
- close() None
Start closing the connection.
The closing process completes asynchronously. Use wait_closed() to wait for things to be completely closed off.
- aiokatcp.connection.wrap_handler(name: str, handler: Callable, fixed: int) Callable
Convert a handler that takes a sequence of typed arguments into one that takes a message.
The message is unpacked to the types given by the signature. If it could not be unpacked, the wrapper raises FailReply.
- Parameters:
name – Name of the message (only used to form error messages).
handler – The callable to wrap (may be a coroutine).
fixed – Number of leading parameters in handler that do not correspond to message arguments.
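The idea behind wrap_handler() can be sketched with inspect.signature(). This is a simplified illustration rather than the library's implementation: wrap_handler_sketch(), FailReplySketch and set_gain() are hypothetical names, raw arguments are passed as a keyword argument instead of a message object, each annotation is assumed to be a callable that parses text, and coroutines, defaults and variadic parameters are not handled.

```python
import inspect
from typing import Any, Callable, Sequence


class FailReplySketch(Exception):
    """Local stand-in for the FailReply exception."""


def wrap_handler_sketch(
    name: str, handler: Callable[..., Any], fixed: int
) -> Callable[..., Any]:
    # Parameters after the first `fixed` ones correspond to message arguments.
    params = list(inspect.signature(handler).parameters.values())[fixed:]

    def wrapper(*leading: Any, arguments: Sequence[bytes]) -> Any:
        if len(arguments) != len(params):
            raise FailReplySketch(
                f"{name}: expected {len(params)} arguments, got {len(arguments)}"
            )
        decoded = []
        for param, raw in zip(params, arguments):
            try:
                # Assumption: each annotation is a callable that parses text.
                decoded.append(param.annotation(raw.decode("utf-8")))
            except ValueError as exc:
                raise FailReplySketch(f"{name}: bad value for {param.name}") from exc
        return handler(*leading, *decoded)

    return wrapper


def set_gain(context: str, channel: int, gain: float) -> str:
    return f"{context}: channel {channel} gain {gain}"


# One leading parameter (context) is not taken from the message.
wrapped = wrap_handler_sketch("set-gain", set_gain, fixed=1)
print(wrapped("ctx", arguments=[b"2", b"1.5"]))

try:
    wrapped("ctx", arguments=[b"two", b"1.5"])
except FailReplySketch as exc:
    failure = str(exc)
    print(failure)
```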
aiokatcp.core module
- class aiokatcp.core.Address(host: IPv4Address | IPv6Address, port: int | None = None)
Bases:
object
A katcp address.
- Parameters:
host – Host address
port – Port number
- property host: IPv4Address | IPv6Address
Host address
- classmethod parse(raw: bytes) Address
Construct an Address from a katcp message argument
- Parameters:
raw – Unescaped value in katcp message argument
- Raises:
ValueError – If raw does not represent a valid address
- class aiokatcp.core.Timestamp(x=0, /)
Bases:
float
A katcp timestamp.
This is just a thin wrapper around float to allow the type to be distinguished. It represents time in seconds as a UNIX timestamp.
- class aiokatcp.core.Now(*values)
Bases:
Enum
Singleton for representing a timestamp specified as now in the protocol.
- NOW = 0
- class aiokatcp.core.LogLevel(*values)
Bases:
IntEnum
katcp log level, with values matching Python log levels
- ALL = 0
- TRACE = 0
- DEBUG = 10
- INFO = 20
- WARN = 30
- ERROR = 40
- FATAL = 50
- OFF = 60
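The correspondence with Python log levels can be checked with a local copy of the values listed above. LogLevelSketch is a hypothetical re-declaration for illustration only; it copies the numbers from this listing rather than importing aiokatcp.

```python
import enum
import logging


class LogLevelSketch(enum.IntEnum):
    """Local copy of the values listed above, for comparison with logging."""

    ALL = 0
    TRACE = 0  # same value as ALL, so Python treats it as an alias of ALL
    DEBUG = 10
    INFO = 20
    WARN = 30
    ERROR = 40
    FATAL = 50
    OFF = 60


# Because the members are ints, they can be passed straight to logging.
logger = logging.getLogger("loglevel_sketch")
logger.setLevel(LogLevelSketch.WARN)

print(LogLevelSketch.DEBUG == logging.DEBUG)
print(logger.isEnabledFor(logging.ERROR), logger.isEnabledFor(logging.INFO))
```

OFF has no direct counterpart in the standard logging module; it simply sits above logging.CRITICAL.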
- class aiokatcp.core.DeviceStatus(*values)
Bases:
Enum
Discrete device-status readings.
- OK = 1
- DEGRADED = 2
- FAIL = 3
- class aiokatcp.core.TypeInfo(type_: Type[_T], name: str, encode: Callable[[_T], bytes], get_decoder: Callable[[Type[_T]], Callable[[bytes], _T]], default: Callable[[Type[_T]], _T])
Bases:
Generic[_T]
Type database entry. Refer to register_type() for details.
- aiokatcp.core.register_type(type_: Type[_T], name: str, encode: Callable[[_T], bytes], get_decoder: Callable[[Type[_T]], Callable[[bytes], _T]], default: Callable[[Type[_T]], _T] | None = None) None
Register a type for encoding and decoding in messages.
The registration is also used for subclasses of type_ if no more specific registration has been made. This is particularly used for the registration for enum.Enum, which is used for all enum types.
- Parameters:
type_ – Python class.
encode – Function to encode values of this type to bytes
get_decoder – Function that takes the actual derived class and returns a decoder that converts instances of bytes to that class.
default – Function to generate a default value of this type (used by the sensor framework). It is given the actual derived class as the first argument.
- aiokatcp.core.encode(value: Any) bytes
Encode a value to raw bytes for katcp.
- Parameters:
value – Value to encode
- Raises:
TypeError – if the type of value has not been registered
See also
- aiokatcp.core.get_type(type_: Type[_T]) TypeInfo[_T]
Retrieve the type information previously registered with register_type().
It returns the last type info registered that is a superclass of type_ (according to issubclass).
- Raises:
TypeError – if none of the registrations match type_
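The lookup rule (the last registration whose type is a superclass of the requested type) can be illustrated with a small stand-alone registry. The names register_sketch(), lookup_sketch() and Celsius are hypothetical, and the registry stores only an encoder rather than a full TypeInfo; the encoders are illustrative choices for this sketch.

```python
from typing import Any, Callable, List, Tuple

# Hypothetical registry: (type, encoder) pairs in registration order.
_registry: List[Tuple[type, Callable[[Any], bytes]]] = []


def register_sketch(type_: type, encode: Callable[[Any], bytes]) -> None:
    _registry.append((type_, encode))


def lookup_sketch(type_: type) -> Callable[[Any], bytes]:
    # Search newest-first: the last matching registration wins.
    for registered, encode in reversed(_registry):
        if issubclass(type_, registered):
            return encode
    raise TypeError(f"{type_!r} has no registration")


class Celsius(int):
    """Subclass with no registration of its own."""


register_sketch(int, lambda value: str(int(value)).encode("ascii"))
# bool is a subclass of int; registering it later gives it its own encoder.
register_sketch(bool, lambda value: b"1" if value else b"0")

print(lookup_sketch(bool)(True))
print(lookup_sketch(Celsius)(Celsius(21)))
```

Celsius falls back to the int registration because it is not a subclass of bool, while a type with no registered superclass raises TypeError.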
- aiokatcp.core.get_decoder(cls: Type[_T]) Callable[[bytes], _T]
Get a decoder function.
See decode() for more details. This function is useful for efficiency: the decoder for a class can be looked up once then used many times.
- aiokatcp.core.decode(cls: Any, value: bytes) Any
Decode value in katcp message to a type.
If a union type is provided, the value must decode successfully (i.e., without raising ValueError) for exactly one of the types in the union, otherwise a ValueError is raised.
- Parameters:
cls – The target type, or a typing.Union of types.
value – Raw (but unescaped) value in katcp message
- Raises:
ValueError – if value does not have a valid value for cls
TypeError – if cls is not a registered type or union of registered types.
See also
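The exactly-one rule that decode() applies to union types can be sketched with typing.get_origin() and typing.get_args(). decode_sketch() and the _DECODERS table are hypothetical stand-ins for the registered types; the real decoders may accept or reject different inputs.

```python
from typing import Any, Callable, Dict, Union, get_args, get_origin

# Hypothetical decoder table standing in for the registered types.
_DECODERS: Dict[type, Callable[[bytes], Any]] = {
    int: lambda raw: int(raw.decode("ascii")),
    float: lambda raw: float(raw.decode("ascii")),
}


def decode_sketch(cls: Any, value: bytes) -> Any:
    if get_origin(cls) is Union:
        successes = []
        for member in get_args(cls):
            try:
                successes.append(decode_sketch(member, value))
            except ValueError:
                continue
        # Zero matches and multiple matches are both rejected.
        if len(successes) != 1:
            raise ValueError(
                f"{value!r} decoded for {len(successes)} union members, need exactly 1"
            )
        return successes[0]
    if cls not in _DECODERS:
        raise TypeError(f"{cls!r} is not registered")
    return _DECODERS[cls](value)


# Only float accepts b"1.5", so the union decodes unambiguously.
print(decode_sketch(Union[int, float], b"1.5"))

try:
    # b"3" is valid for both int and float, so the union is ambiguous.
    decode_sketch(Union[int, float], b"3")
except ValueError as exc:
    ambiguous = str(exc)
    print(ambiguous)
```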
- exception aiokatcp.core.KatcpSyntaxError(message: str, raw: bytes | None = None)
Bases:
ValueError
Raised by parsers when encountering a syntax error.
- class aiokatcp.core.Message(mtype: MessageType, name: str, *arguments: Any, mid: int | None = None)
Bases:
object
- Type
alias of MessageType
- OK = b'ok'
- FAIL = b'fail'
- INVALID = b'invalid'
- mtype
- name
- arguments
- mid
- classmethod unescape_argument(arg: bytes) bytes
Reverse of escape_argument()
- classmethod parse(raw) Message
Create a Message from encoded representation.
- Parameters:
raw – Bytes from the wire, including the trailing newline
- Raises:
KatcpSyntaxError – If raw is not validly encoded.
aiokatcp.sensor module
- class aiokatcp.sensor.DeltaObserver(*args, **kwargs)
Bases:
Protocol[_T]
- class aiokatcp.sensor.ClassicObserver(*args, **kwargs)
Bases:
Protocol[_T]
- class aiokatcp.sensor.Reading(timestamp: float, status: Status, value: _T)
Bases:
Generic[_T]
Sensor reading
- Parameters:
timestamp (float) – The UNIX timestamp at which the sensor value was determined
status (aiokatcp.sensor.Sensor.Status) – Sensor status at timestamp
value (aiokatcp.sensor._T) – Sensor value at timestamp
- value: _T
- class aiokatcp.sensor.Sensor(sensor_type: Type[_T], name: str, description: str = '', units: str = '', default: _T | None = None, initial_status: Status = Status.UNKNOWN, *, status_func: Callable[[_T], Status] = <function _default_status_func>, auto_strategy: Strategy | None = None, auto_strategy_parameters: Iterable[Any] = ())
Bases:
Generic[_T]
A sensor in a DeviceServer.
A sensor has some static configuration (name, description, units etc) and dynamic state consisting of a Reading (value, status and timestamp). Other code can attach observers to the sensor to be informed of updates.
- Parameters:
sensor_type – The type of the sensor.
name – Sensor name
description – More detailed explanation of the sensor
units – Physical units of the sensor
default – Initial value of the sensor. When setting this, it may be desirable to specify initial_status too.
initial_status – Initial status of the sensor
status_func – Function that maps a value to a status in set_value() if none is given. The default is a function that always returns NOMINAL.
auto_strategy – Sampling strategy to use when a client requests the auto strategy. The default is to send all updates of the value to the client immediately.
auto_strategy_parameters – Parameters to use with auto_strategy. They must be already-decoded values.
- class Status(*values)
Bases:
IntEnum
- UNKNOWN = 0
- NOMINAL = 1
- WARN = 2
- ERROR = 3
- FAILURE = 4
- UNREACHABLE = 5
- INACTIVE = 6
- notify(reading: Reading[_T], old_reading: Reading[_T]) None
Notify all observers of changes to this sensor.
Users should not usually call this directly. It is called automatically by set_value().
- set_value(value: Any, status: Status | None = None, timestamp: float | None = None) None
Set the current value of the sensor.
Also validate that the incoming value type is compatible with the core type of this sensor. If compatible, coerce it to an instance of the stype.
- Parameters:
value – The value of the sensor (the type should be appropriate to the sensor’s type).
status – Whether the value represents an error condition or not. If not given, the status_func given to the constructor is used to determine the status from the value.
timestamp – The time at which the sensor value was determined (seconds). If not given, it defaults to time.time().
- Raises:
TypeError – If the incoming value type is not compatible with the sensor’s core type.
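The defaulting rules for status and timestamp in set_value() can be illustrated with a stand-alone sketch. StatusSketch, temperature_status() and resolve_reading() are hypothetical; the thresholds are invented for the example, and the type validation and observer notification that the real method performs are omitted.

```python
import enum
import time
from typing import Callable, Optional, Tuple


class StatusSketch(enum.IntEnum):
    """Subset of the Status values listed above."""

    UNKNOWN = 0
    NOMINAL = 1
    WARN = 2
    ERROR = 3


def temperature_status(value: float) -> StatusSketch:
    """Example status_func: thresholds here are purely illustrative."""
    if value < 70.0:
        return StatusSketch.NOMINAL
    if value < 90.0:
        return StatusSketch.WARN
    return StatusSketch.ERROR


def resolve_reading(
    value: float,
    status: Optional[StatusSketch] = None,
    timestamp: Optional[float] = None,
    status_func: Callable[[float], StatusSketch] = temperature_status,
) -> Tuple[float, StatusSketch, float]:
    # status_func is consulted only when no explicit status is given.
    if status is None:
        status = status_func(value)
    # The timestamp defaults to the current time.
    if timestamp is None:
        timestamp = time.time()
    return (timestamp, status, value)


print(resolve_reading(95.0, timestamp=1.0))
print(resolve_reading(95.0, StatusSketch.NOMINAL, 2.0))
```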
- property value: _T
The current value of the sensor.
Modifying it invokes set_value().
- attach(observer: ClassicObserver[_T] | DeltaObserver[_T]) None
- detach(observer: ClassicObserver[_T] | DeltaObserver[_T]) None
- class aiokatcp.sensor.SensorSampler(sensor: Sensor[_T], observer: Callable[[Sensor[_T], Reading[_T]], None] | None, loop: AbstractEventLoop, difference: _T | None = None, shortest: Timestamp = 0.0, longest: Timestamp | None = None, *, always_update: bool = False, is_auto: bool = False)
Bases:
Generic[_T]
Implement the strategies defined by the sensor-sampling request.
This is an abstract base class. Instances should be constructed by calling factory().
It takes an “observer”, which is a callback function that is called when a sensor update should be sent to the subscribed client. When the sampler is constructed, the observer is called immediately, and then again when appropriate to the strategy.
It is possible to construct this class without an observer, and set it later. This is used by the sensor-sampling implementation to first validate the parameters before sending any readings.
- Parameters:
sensor – The sensor to observe
observer – Callback function to invoke
loop – Asyncio event loop
difference – Minimum change in value before sending an update
shortest – Minimum time between updates
longest – Maximum time between updates (or None for no maximum)
always_update – If true, update on every sensor value assignment
is_auto – True if this sampler was created from the “auto” strategy
- class Strategy(*values)
Bases:
Enum
- NONE = 0
- AUTO = 1
- PERIOD = 2
- EVENT = 3
- DIFFERENTIAL = 4
- EVENT_RATE = 5
- DIFFERENTIAL_RATE = 6
- class aiokatcp.sensor.SensorSet
A dict-like and set-like collection of sensors.
It is possible to monitor for removal of sensors using add_remove_callback().
- add_remove_callback(callback: Callable[[Sensor], None]) None
Add a callback that will be passed any sensor removed from the set.
- add_add_callback(callback: Callable[[Sensor], None]) None
Add a callback that will be passed any sensor added to the set.
- remove_remove_callback(callback: Callable[[Sensor], None]) None
Remove a callback registered with add_remove_callback().
- remove_add_callback(callback: Callable[[Sensor], None]) None
Remove a callback registered with add_add_callback().
- add(elem: Sensor) None
Add an element to a set.
This has no effect if the element is already present.
- remove(elem: Sensor) None
Remove an element from a set; it must be a member.
If the element is not a member, raise a KeyError.
- discard(elem: Sensor) None
Remove an element from a set if it is a member.
Unlike set.remove(), the discard() method does not raise an exception when an element is missing from the set.
- clear() None
Remove all sensors from the collection.
- popitem() Tuple[str, Sensor]
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- pop(k[, d])
Remove the specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- get(name: str) Sensor | None
- get(name: str, default: Sensor | _T) Sensor | _T
Return the value for key if key is in the dictionary, else default.
- keys()
Return a set-like object providing a view on the collection's keys.
- values()
Return an object providing a view on the collection's values.
- items()
Return a set-like object providing a view on the collection's items.
- copy()
Return a shallow copy of the collection.
- class aiokatcp.sensor.AggregateSensor(target: SensorSet, sensor_type: Type[_T], name: str, description: str = '', units: str = '', *, auto_strategy: Strategy | None = None, auto_strategy_parameters: Iterable[Any] = ())
Bases:
Sensor[_T]
A Sensor with its reading determined by several other Sensors.
This is an abstract class: the user must implement update_aggregate(). This method is called whenever the target SensorSet has a sensor added, removed, or one of the sensors changes its reading. The user-defined update_aggregate() returns the new reading for the AggregateSensor.
Parameters are all as per Sensor, with the exception of target, which is the SensorSet from which the aggregated sensor will determine its own reading.
- target
The set of sensors which will determine the reading of the aggregate one. The aggregate sensor may be included in the set (e.g. in self.sensors of a server) but it will not affect its own value.
- Type:
- abstractmethod update_aggregate(updated_sensor: Sensor[_U] | None, reading: Reading[_U] | None, old_reading: Reading[_U] | None) Reading[_T] | None
Update the aggregated sensor.
The user is required to override this function, which must return the updated Reading (i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.
- Parameters:
updated_sensor – The sensor in the target SensorSet which has changed in some way.
reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.
old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.
- Returns:
The reading (value, status, timestamp) that should be shown by the AggregateSensor as a result of the change. If None is returned, the sensor’s reading is not modified.
- Return type:
Optional[Reading]
- filter_aggregate(sensor: Sensor) bool
Decide whether another sensor is part of the aggregation.
Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.
- Returns:
True if sensor should be included in calculation of the aggregate, False if not.
- Return type:
- class aiokatcp.sensor.SimpleAggregateSensor(target: SensorSet, sensor_type: Type[_T], name: str, description: str = '', units: str = '', *, auto_strategy: Strategy | None = None, auto_strategy_parameters: Iterable[Any] = ())
Bases:
AggregateSensor[_T]
A simplified version of AggregateSensor for common use cases.
This class is suitable when:
- You don’t need direct control of the returned timestamp. It will be determined automatically.
- The aggregate value and status can be determined from an internal state which is updated by adding or removing readings.
Subclasses must override aggregate_add(), aggregate_remove(), aggregate_compute() and optionally filter_aggregate().
Currently, the timestamp will be set to the larger of the last sensor update time (using the sensor timestamp) and the last addition or removal time (wallclock time). This may be changed in future releases based on implementation experience. This class is best suited to cases where child sensors are only added on startup and removed on teardown, rather than continuously added and removed.
- abstractmethod aggregate_add(sensor: Sensor[_U], reading: Reading[_U]) bool
Update internal state with an additional reading.
- Returns:
True if the new reading should result in a state update.
- Return type:
- abstractmethod aggregate_remove(sensor: Sensor[_U], reading: Reading[_U]) bool
Update internal state by removing a reading.
- Returns:
True if removing the reading should result in a state update.
- Return type:
- abstractmethod aggregate_compute() Tuple[Status, _T]
Compute aggregate status and value from the internal state.
- update_aggregate(updated_sensor: Sensor[_U] | None, reading: Reading[_U] | None, old_reading: Reading[_U] | None) Reading[_T] | None
Update the aggregated sensor.
The user is required to override this function, which must return the updated Reading (i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.
- Parameters:
updated_sensor – The sensor in the target SensorSet which has changed in some way.
reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.
old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.
- Returns:
The reading (value, status, timestamp) that should be shown by the AggregateSensor as a result of the change. If None is returned, the sensor’s reading is not modified.
- Return type:
Optional[Reading]
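The add/remove/compute pattern that SimpleAggregateSensor asks subclasses to implement can be illustrated with a stand-alone class that counts child readings in an error state. ErrorCountSketch, StatusSketch and ReadingSketch are hypothetical and do not derive from the aiokatcp classes; the methods take a sensor name instead of a Sensor object, and modelling a value change as a removal followed by an addition is an assumption of this sketch.

```python
import enum
from typing import NamedTuple, Tuple


class StatusSketch(enum.IntEnum):
    NOMINAL = 1
    ERROR = 3


class ReadingSketch(NamedTuple):
    timestamp: float
    status: StatusSketch
    value: float


class ErrorCountSketch:
    """Keeps a running count of child readings whose status is ERROR."""

    def __init__(self) -> None:
        self._errors = 0

    def aggregate_add(self, sensor_name: str, reading: ReadingSketch) -> bool:
        # Only readings in ERROR change the internal state.
        if reading.status == StatusSketch.ERROR:
            self._errors += 1
            return True
        return False

    def aggregate_remove(self, sensor_name: str, reading: ReadingSketch) -> bool:
        if reading.status == StatusSketch.ERROR:
            self._errors -= 1
            return True
        return False

    def aggregate_compute(self) -> Tuple[StatusSketch, int]:
        status = StatusSketch.ERROR if self._errors else StatusSketch.NOMINAL
        return status, self._errors


agg = ErrorCountSketch()
ok = ReadingSketch(1.0, StatusSketch.NOMINAL, 20.0)
bad = ReadingSketch(2.0, StatusSketch.ERROR, 99.0)

changed_on_add = agg.aggregate_add("temp", ok)
# Assumption: a value change is handled as remove(old) followed by add(new).
removed = agg.aggregate_remove("temp", ok)
added = agg.aggregate_add("temp", bad)
changed_on_update = removed or added
print(changed_on_add, changed_on_update, agg.aggregate_compute())
```

Keeping only a counter means each update costs constant time, regardless of how many child sensors are in the set.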
aiokatcp.server module
- class aiokatcp.server.ClientConnection(owner: DeviceServer, limit: int)
Bases:
Connection
Server’s view of the connection from a single client.
- owner: DeviceServer
- samplers_lock
Protects against concurrent request_sensor_sampling (but not sensor removal)
- connection_lost(exc: Exception | None) None
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- set_sampler(s: Sensor, sampler: SensorSampler | None) None
Set or clear the sampler for a sensor.
- get_sampler(s: Sensor) SensorSampler | None
Retrieve the sampler for a sensor
- class aiokatcp.server.RequestContext(conn: ClientConnection, req: Message)
Bases:
object
Interface for informs and replies to a request.
- Parameters:
conn – Client connection from which the request originated
req – The request itself
- reply(*args: Any) None
Send a reply to the request.
- Parameters:
*args – The fields of the reply
- Raises:
RuntimeError – If the request has already been replied to.
- inform(*args: Any) None
Send an inform in response to the request.
- Parameters:
*args – The fields of the inform
- Raises:
RuntimeError – If the request has already been replied to.
- informs(informs: Iterable[Iterable], *, send_reply=True) None
Write a sequence of informs and send an
ok reply with the count.
- Parameters:
informs – Each element is an iterable of fields in the inform.
- Raises:
RuntimeError – If the request has already been replied to.
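To make the ordering of reply, inform and informs concrete, here is a minimal sketch built on a hypothetical RecordingContext class. It is not the aiokatcp implementation: it only records what would be sent, and it ignores the send_reply keyword. It shows each inform being written, the ok reply carrying the count, and the RuntimeError once a reply has gone out.

```python
from typing import Any, Iterable, List, Tuple


class RecordingContext:
    """Hypothetical stand-in for a request context that records messages."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sent: List[Tuple[str, ...]] = []
        self.replied = False

    def inform(self, *args: Any) -> None:
        if self.replied:
            raise RuntimeError("request has already been replied to")
        self.sent.append(("#" + self.name, *map(str, args)))

    def reply(self, *args: Any) -> None:
        if self.replied:
            raise RuntimeError("request has already been replied to")
        self.sent.append(("!" + self.name, *map(str, args)))
        self.replied = True

    def informs(self, informs: Iterable[Iterable[Any]]) -> None:
        count = 0
        for fields in informs:
            self.inform(*fields)  # one inform per element
            count += 1
        self.reply("ok", count)  # final reply carries the number of informs
```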
- class aiokatcp.server.DeviceServer(host: str, port: int, *, limit: int = 16777216, max_pending: int = 100, max_backlog: int | None = None, loop: AbstractEventLoop | None = None)
Bases:
object
Server that handles katcp.
- Parameters:
host – Hostname to listen on (empty for all interfaces)
port – Port number to listen on
limit – Maximum line length in a request
max_pending – Maximum number of asynchronous requests that can be in progress. Once this number is reached, new requests are blocked.
loop – Event loop on which the server will run, defaulting to
asyncio.get_event_loop().
max_backlog –
Maximum backlog in the write queue to a client.
If a message is to be sent to a client and it has more than this many bytes in its backlog, it is disconnected instead to prevent the server running out of memory. At present this is only applied to asynchronous informs, but it may be applied to other messages in future.
If not specified it defaults to twice limit.
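The max_backlog default and the disconnect rule can be written out as a small calculation. The helper names below are hypothetical and the functions only restate the rule in the parameter description; they are not taken from the aiokatcp source.

```python
from typing import Optional


def effective_max_backlog(limit: int, max_backlog: Optional[int] = None) -> int:
    """Resolve the documented default: twice ``limit`` when unspecified."""
    return 2 * limit if max_backlog is None else max_backlog


def should_disconnect(
    backlog_bytes: int, limit: int, max_backlog: Optional[int] = None
) -> bool:
    """Hypothetical check: True when the backlog exceeds the allowed size."""
    return backlog_bytes > effective_max_backlog(limit, max_backlog)
```

With the default limit of 16777216 bytes, the resulting backlog threshold is 33554432 bytes.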
- class LogHandler(server: DeviceServer)
Bases:
Handler
Log handler that issues log messages as
#log informs.
It is automatically initialised with a filter that discards log messages from the aiokatcp module, because otherwise one may get into an infinite recursion where log messages about communications with a client are communicated to the client.
It is also initialised with a default formatter that emits only the log message itself.
Typical usage to inform clients about all log messages in the application is
logging.getLogger().addHandler(DeviceServer.LogHandler(server))
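The filter and formatter behaviour described for LogHandler can be imitated with the standard logging module alone. The sketch below uses a hypothetical ListHandler that appends to a list instead of sending #log informs, and the excluded logger name is a parameter rather than being fixed to aiokatcp. It is an illustration of the idea, not the library's code.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Hypothetical handler that collects formatted messages in a list."""

    def __init__(self, excluded_prefix: str) -> None:
        super().__init__()
        self.messages: List[str] = []
        self._excluded_prefix = excluded_prefix
        # Drop records from the excluded logger and its children.
        self.addFilter(self._not_excluded)
        # Emit only the log message itself.
        self.setFormatter(logging.Formatter("%(message)s"))

    def _not_excluded(self, record: logging.LogRecord) -> bool:
        name = record.name
        prefix = self._excluded_prefix
        return not (name == prefix or name.startswith(prefix + "."))

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))
```

Without the filter, a handler whose emit itself triggered logging in the excluded namespace would feed its own output back into itself, which is the recursion the description warns about.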
- async start() None
Start the server running on the event loop.
- Raises:
RuntimeError – if the server is already running
- async on_stop() None
Extension point for subclasses to run shutdown code.
This is called after the TCP server has been shut down and all in-flight requests have been completed or cancelled, but before service tasks are cancelled. Subclasses should override this function rather than
stop() to run late shutdown code because this is called before the flag is set to wake up join().
It is only called if the server was running when
stop() was called.
- async stop(cancel: bool = True) None
Shut down the server.
- Parameters:
cancel – If true (default), cancel any pending asynchronous requests.
- halt(cancel: bool = True) Task
Begin server shutdown, but do not wait for it to complete.
- Parameters:
cancel – If true (default), cancel any pending asynchronous requests.
- Returns:
Task that is performing the stop.
- Return type:
task
- async join() None
Block until the server has stopped.
This will re-raise any exception raised by
on_stop() or a service task.
- property sockets: Tuple[socket, ...]
Sockets associated with the underlying server.
If
start() has not yet been called, this will be empty.
- add_service_task(task: Task) None
Register an asynchronous task that runs as part of the server.
The task will be cancelled when the server is stopped. If it throws an exception (other than
asyncio.CancelledError), the server will be halted and the exception will be rethrown from stop() or join().
Note
The task must actually be an instance of
asyncio.Task rather than a coroutine.
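The distinction between a coroutine and an asyncio.Task can be shown with asyncio alone. In this sketch the heartbeat coroutine is hypothetical and no server is involved. The cancellation at the end stands in for what happens to a service task when the server is stopped.

```python
import asyncio


async def heartbeat(ticks: list) -> None:
    """Hypothetical background job that runs until it is cancelled."""
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def main() -> bool:
    ticks: list = []
    coro = heartbeat(ticks)  # a coroutine object, not yet scheduled
    task = asyncio.create_task(coro)  # wrapped in an asyncio.Task and scheduled
    is_task = isinstance(task, asyncio.Task)
    await asyncio.sleep(0.05)
    task.cancel()  # stands in for cancellation at server shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return is_task and task.cancelled() and len(ticks) > 0
```

It is the object returned by asyncio.create_task, rather than the bare coroutine, that has the type add_service_task expects.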
- send_version_info(ctx: RequestContext, *, send_reply=True) None
Send version information informs to the client.
This is used for asynchronous #version-connect informs when a client connects and in response to a ?version-list request.
- Returns:
Number of informs sent
- Return type:
num_informs
- async unhandled_request(ctx: RequestContext, req: Message) None
Called when a request is received for which no handler is registered.
Subclasses may override this to do dynamic handling.
- handle_message(conn: ClientConnection, msg: Message) None
Called by
ClientConnection for each incoming message.
- mass_inform(name: str, *args: Any) None
Send an asynchronous inform to all clients.
- Parameters:
name – Inform name
*args – Fields for the inform
- async request_help(ctx: RequestContext, name: str | None = None) None
Return help on the available requests.
Return a description of the available requests using a sequence of #help informs.
- Parameters:
request (str, optional) – The name of the request to return help for (the default is to return help for all requests).
- Informs:
request (str) – The name of a request.
description (str) – Documentation for the named request.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the help succeeded.
informs (int) – Number of #help inform messages sent.
Examples
?help
#help halt ...description...
#help help ...description...
...
!help ok 5
?help halt
#help halt ...description...
!help ok 1
- async request_halt(ctx: RequestContext) None
Halt the device server.
- Returns:
success – Whether scheduling the halt succeeded.
- Return type:
{‘ok’, ‘fail’}
Examples
?halt
!halt ok
- async request_watchdog(ctx: RequestContext) None
Check that the server is still alive.
- Returns:
success
- Return type:
{‘ok’}
Examples
?watchdog
!watchdog ok
- async request_version_list(ctx: RequestContext) None
Request the list of versions of roles and subcomponents.
- Informs:
name (str) – Name of the role or component.
version (str) – A string identifying the version of the component. Individual components may define the structure of this argument as they choose. In the absence of other information clients should treat it as an opaque string.
build_state_or_serial_number (str) – A unique identifier for a particular instance of a component. This should change whenever the component is replaced or updated.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the version list succeeded.
informs (int) – Number of #version-list inform messages sent.
Examples
?version-list
#version-list katcp-protocol 5.1-MIB
#version-list katcp-library katcp-python-0.4 katcp-python-0.4.1-py2
#version-list katcp-device foodevice-1.0 foodevice-1.0.0rc1
!version-list ok 3
- async request_sensor_list(ctx: RequestContext, name: str | None = None) int
Request the list of sensors.
The list of sensors is sent as a sequence of #sensor-list informs.
- Parameters:
name (str, optional) – Name of the sensor to list (the default is to list all sensors). If name starts and ends with ‘/’ it is treated as a regular expression and all sensors whose names contain the regular expression are returned.
- Informs:
name (str) – The name of the sensor being described.
description (str) – Description of the named sensor.
units (str) – Units for the value of the named sensor.
type (str) – Type of the named sensor.
params (list of str, optional) – Additional sensor parameters (type dependent). For discrete sensors the additional parameters are the allowed values. For all other types no additional parameters are sent.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the sensor list succeeded.
informs (int) – Number of #sensor-list inform messages sent.
Examples
?sensor-list
#sensor-list psu.voltage PSU\_voltage. V float
#sensor-list cpu.status CPU\_status. \@ discrete on off error
...
!sensor-list ok 5
?sensor-list cpu.power.on
#sensor-list cpu.power.on Whether\_CPU\_has\_power. \@ boolean
!sensor-list ok 1
?sensor-list /voltage/
#sensor-list psu.voltage PSU\_voltage. V float
#sensor-list cpu.voltage CPU\_voltage. V float
!sensor-list ok 2
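The name-matching rule for ?sensor-list (exact name, or a regular expression when wrapped in '/') can be sketched as follows. The helper match_sensor_names is hypothetical and is only an interpretation of the parameter description. In particular, it assumes that "contain the regular expression" means a search anywhere in the name.

```python
import re
from typing import Iterable, List


def match_sensor_names(pattern: str, names: Iterable[str]) -> List[str]:
    """Hypothetical helper mirroring the name-matching rule described above."""
    if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
        regex = re.compile(pattern[1:-1])
        # Assumption: a match anywhere in the name counts (search, not fullmatch).
        return [name for name in names if regex.search(name)]
    # Otherwise the pattern is treated as an exact sensor name.
    return [name for name in names if name == pattern]
```

Applied to the sensor names in the examples, "/voltage/" selects psu.voltage and cpu.voltage, while "cpu.power.on" selects only that sensor.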
- async request_sensor_value(ctx: RequestContext, name: str | None = None) None
Request the value of a sensor or sensors.
The sensor values are sent as a sequence of #sensor-value informs.
- Parameters:
name (str, optional) – Name of the sensor to poll (the default is to send values for all sensors). If name starts and ends with ‘/’ it is treated as a regular expression and all sensors whose names contain the regular expression are returned.
- Informs:
timestamp (float) – Timestamp of the sensor reading in seconds since the Unix epoch, or milliseconds for katcp versions <= 4.
count ({1}) – Number of sensors described in this #sensor-value inform. Will always be one. It exists to keep this inform compatible with #sensor-status.
name (str) – Name of the sensor whose value is being reported.
status (Sensor.Status) – Sensor status (see Sensor.Status enum)
value (object) – Value of the named sensor. Type depends on the type of the sensor.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the list of values succeeded.
informs (int) – Number of #sensor-value inform messages sent.
Examples
?sensor-value
#sensor-value 1244631611.415231 1 psu.voltage nominal 4.5
#sensor-value 1244631611.415200 1 cpu.status warn off
...
!sensor-value ok 5
?sensor-value cpu.power.on
#sensor-value 1244631611.415231 1 cpu.power.on error 0
!sensor-value ok 1
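The field layout of a #sensor-value inform (timestamp, count, name, status, value) can be illustrated with a small text parser. SensorValue and parse_sensor_value are hypothetical names. The sketch assumes the inform is a single space-separated line with no escaped characters, which a real katcp parser would have to handle. The timestamps_in_ms flag covers the millisecond timestamps used by katcp versions <= 4.

```python
from typing import NamedTuple


class SensorValue(NamedTuple):
    timestamp: float  # seconds since the Unix epoch
    name: str
    status: str
    value: str  # left undecoded; the real type depends on the sensor


def parse_sensor_value(line: str, timestamps_in_ms: bool = False) -> SensorValue:
    """Hypothetical parser; assumes no escaped spaces in any field."""
    fields = line.split(" ")
    if len(fields) != 6 or fields[0] != "#sensor-value":
        raise ValueError(f"not a #sensor-value inform: {line!r}")
    if fields[2] != "1":
        raise ValueError("count field is expected to be 1")
    timestamp = float(fields[1])
    if timestamps_in_ms:
        timestamp /= 1000.0  # older protocol versions report milliseconds
    return SensorValue(timestamp, fields[3], fields[4], fields[5])
```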
- async request_sensor_sampling(ctx: RequestContext, name: str, strategy: Strategy | None = None, *args: bytes) tuple
Configure or query the way a sensor is sampled.
Sampled values are reported asynchronously using the #sensor-status message.
- Parameters:
name – Name of the sensor whose sampling strategy to query or configure. When configuring it may be a comma-separated list of names.
strategy – Type of strategy to use to report the sensor value. The differential strategy type may only be used with integer or float sensors. If this parameter is supplied, it sets the new strategy.
*args – Additional strategy parameters (dependent on the strategy type). For the differential strategy, the parameter is an integer or float giving the amount by which the sensor value may change before an updated value is sent. For the period strategy, the parameter is the sampling period in float seconds. The event strategy has no parameters. Note that this has changed from KATCPv4. For the event-rate strategy, a minimum period between updates and a maximum period between updates (both in float seconds) must be given. If the event occurs more than once within the minimum period, only one update will occur. Whether or not the event occurs, the sensor value will be updated at least once per maximum period.
- Returns:
success ({‘ok’, ‘fail’}) – Whether the sensor-sampling request succeeded.
name (str) – Name of the sensor queried or configured.
strategy (
SensorSampler.Strategy) – Name of the new or current sampling strategy for the sensor.
params (list of str) – Additional strategy parameters (see description under Parameters).
Examples
?sensor-sampling cpu.power.on
!sensor-sampling ok cpu.power.on none
?sensor-sampling cpu.power.on period 500
!sensor-sampling ok cpu.power.on period 500
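The event-rate rule (at most one update per minimum period, at least one per maximum period) can be modelled with a small gate object. EventRateGate is hypothetical and deliberately simplified. It drops events that arrive inside the minimum period, whereas a real sampler would more likely defer them, and it assumes the first reading is always reported.

```python
from typing import Optional


class EventRateGate:
    """Hypothetical model of the event-rate rule (not aiokatcp's implementation)."""

    def __init__(self, shortest: float, longest: float) -> None:
        self.shortest = shortest  # minimum period between updates, in seconds
        self.longest = longest  # maximum period between updates, in seconds
        self.last_sent: Optional[float] = None

    def should_send(self, now: float, changed: bool) -> bool:
        if self.last_sent is None:
            send = True  # assumption: the first reading is always reported
        else:
            elapsed = now - self.last_sent
            if elapsed >= self.longest:
                send = True  # periodic refresh even without an event
            else:
                send = changed and elapsed >= self.shortest
        if send:
            self.last_sent = now
        return send
```

With a gate of EventRateGate(1.0, 10.0), a change 0.5 seconds after the previous update is suppressed, and an unchanged value is still reported once 10 seconds have passed.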
- async request_client_list(ctx: RequestContext) None
Request the list of connected clients.
The list of clients is sent as a sequence of #client-list informs.
- Informs:
addr (str) – The address of the client as host:port with host in dotted quad notation. If the address of the client could not be determined (because, for example, the client disconnected suddenly) then a unique string representing the client is sent instead.
- Returns:
success ({‘ok’, ‘fail’}) – Whether sending the client list succeeded.
informs (int) – Number of #client-list inform messages sent.
Examples
?client-list
#client-list 127.0.0.1:53600
!client-list ok 1
- async request_log_level(ctx: RequestContext, level: LogLevel | None = None) LogLevel
Query or set the current logging level.
- Parameters:
level ({'all', 'trace', 'debug', 'info', 'warn', 'error', 'fatal', 'off'}, optional) – Name of the logging level to set the device server to (the default is to leave the log level unchanged).
- Returns:
success ({‘ok’, ‘fail’}) – Whether the request succeeded.
level ({‘all’, ‘trace’, ‘debug’, ‘info’, ‘warn’, ‘error’, ‘fatal’, ‘off’}) – The log level after processing the request.
Examples
?log-level
!log-level ok warn
?log-level info
!log-level ok info
aiokatcp.time_sync module
Utilities for creating time synchronisation sensors.
These use the Linux-specific adjtimex() system call to get information
about time synchronisation. This does not give access to all the statistics
that an NTP or PTP server would provide, but should be sufficient to check that
time is being kept synchronised, and (at least on Linux) is accessible from
inside a container with no privileges required.
- class aiokatcp.time_sync.ClockState(*values)
Bases:
Enum
- OK = 0
- INS = 1
- DEL = 2
- OOP = 3
- WAIT = 4
- ERROR = 5
- class aiokatcp.time_sync.TimeSyncUpdater(sensor_map: Mapping[str, Sensor])
Bases:
object
Maps raw adjtimex(2) fields to sensor values.
The sensors to populate are specified by a mapping, whose keys can be any subset of:
- state
Return value of the
adjtimex() call (as a ClockState)
- maxerror
Maximum error, in seconds
- esterror
Estimated error, in seconds
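The mapping from raw adjtimex(2) results to these sensor values can be sketched without calling the system call. The kernel reports maxerror and esterror in microseconds, so a conversion to seconds is needed. ClockStateSketch and to_sensor_values are hypothetical stand-ins, and whether TimeSyncUpdater performs the conversion in exactly this way is an assumption.

```python
import enum
from typing import Dict, Union


class ClockStateSketch(enum.Enum):
    """Stand-in with the same members and values as ClockState above."""

    OK = 0
    INS = 1
    DEL = 2
    OOP = 3
    WAIT = 4
    ERROR = 5


def to_sensor_values(
    state: int, maxerror_us: int, esterror_us: int
) -> Dict[str, Union[ClockStateSketch, float]]:
    """Convert an adjtimex() return value and microsecond fields to sensor values."""
    return {
        "state": ClockStateSketch(state),
        "maxerror": maxerror_us / 1e6,  # microseconds to seconds
        "esterror": esterror_us / 1e6,  # microseconds to seconds
    }
```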
Module contents
- aiokatcp.minor_version()