Server tutorial
Your first server
To start with, import the aiokatcp
module.
import aiokatcp
All the functionality in aiokatcp is available in this module namespace.
Your server needs to derive from DeviceServer
. At a
minimum, it needs to define VERSION
and BUILD_STATE
class
attributes, which are strings sent in the #version-connect
informs and in
response to the ?version-info
query.
class MyServer(aiokatcp.DeviceServer):
VERSION = 'myserver-api-1.0'
BUILD_STATE = 'myserver-1.0.1.dev0'
To run your server, you will need to call
DeviceServer.start()
, which is a coroutine. You can
wait for it to be shut down (either via a ?halt
request or by a call to
DeviceServer.stop()
) by using
DeviceServer.join()
). A basic program that will run the
server looks like this:
async def main():
server = Server('localhost', 4444)
await server.start()
await server.join()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
asyncio.get_event_loop().close()
That is sufficient to run a server that will listen on port 4444, and answer
standard requests like ?help
. But of course, you will want to extend the
functionality by adding new requests and sensors.
Requests
Requests are added by writing methods in your class whose name starts with
request_
. A metaclass detects these methods and automatically routes
requests to them. The rest of the method name becomes the request name, with
underscores changed to dashes to match katcp conventions. For example, a
?hello-world
request would be mapped to a call to a
request_hello_world
method.
The first two arguments to the request are fixed: self (the server object)
and ctx (an instance of RequestContext
, which
provides information needed to send informs back as part of the response). The
remaining positional arguments are filled with the arguments passed in the
request. They should be annotated with Python types, which are used to convert
from the encoded form in the message to the appropriate Python type.
See Type conversions below for details of which Python types you can
use and how the conversions work. You can even use argument defaults or a
*args
parameter for variadic requests.
If you need access to the original Message
object, it can be accessed
as RequestContext.msg
.
Your function needs to return an awaitable — the simplest way to do
that is to make it a coroutine i.e., declare it with async def
. If
the awaitable returns nothing, then aiokatcp takes care of automatically
returning an ok
response to the client. To provide extra arguments to the
response, provide them to the awaitable (either as a single scalar or as a
tuple). The same types that you can use as parameter annotations are also
available here.
That’s a lot to take in, so let’s see a simple example:
class MyServer(aiokatcp.DeviceServer):
...
async def request_greet(self, ctx, name: str):
"""Take a person's name and greet them"""
return 'hello', name
Note that the function has a docstring: this is required, as it is used to
generate the response to the ?help
request. The first line is used when
calling ?help
with no arguments, and the full docstring when asking for
help on this specific request.
What about if something goes wrong and the request fails? For that you should
raise the FailReply
exception with a message, which
will be sent to the client according to the katcp protocol. The same will
happen if any other exception is thrown, with the difference being that other
exceptions will include the full traceback and also make an entry in the
server’s logs. Thus, FailReply
should be used when
the error is “expected” (such as due to the client’s mistake) while other
exceptions should generally indicate a serious problem in the server.
There are a few other ways to make replies. You can set a reply by calling
RequestContext.reply()
— in this case your coroutine must return
None
. A convention used by several katcp requests is to respond with a
sequence of informs, followed by an ok
reply with the number of informs.
Such a response can be produced by calling RequestContext.informs()
, and
this may be more efficient than sending informs individually.
Type conversions
The katcp specification documents a number of standard types for use in
messages and sensors. The integer, float and boolean types are used when the
type annotation is int
, float
or bool
respectively,
and are largely self-explanatory. The string type does not specify any
particular encoding. When using str
as the type annotation in
Python, UTF-8 is assumed, and the request will fail if the string is not valid
UTF-8. On return, strings will similarly be encoded to UTF-8. If you need to
process binary data or apply another encoding, use bytes
instead,
which does no conversion (this is also the default if no type annotation is
provided for an argument).
There are also Timestamp
and
Address
classes corresponding to the timestamp and
address types. The former is a trivial subclass of float
and only
really useful in the sensor framework. The latter contains a host IP address
and a port number.
You can use enum types (subclasses of enum.Enum
). The string value in
the message is matched to the name of the enum value, but converted from upper
to lower case and with underscores changed to dashes to match katcp
conversions (e.g. MY_VALUE
in Python becomes my-value
on the wire). If
necessary, you can override this behaviour by defining a katcp_value
attribute on all members of the enum containing the wire representation (as a
bytes
).
It is also possible to register additional type conversions (see
register_type()
). For example, one could register
dict
to convert to and from JSON. The functions
encode()
and decode()
provide access
to the type conversions outside of the request handler machinery.
Informs
As part of replying to a request, you may need to send synchronous informs to
the client. The can be done by calling RequestContext.inform()
(a
coroutine) with the arguments. It automatically includes the appropriate
message ID and name.
A convention used in a number of the standard katcp requests is to reply by
sending a sequence of informs, followed by a reply containing the number of
such informs. This can be achieved using RequestContext.informs()
. Note
that this sends the reply as well, so if you use this your handler must not
return a value. Using this function may be more efficient that sending the
messages individually, because the messages are passed to the TCP socket as a
unit. On the other hand, this does require memory to hold the entire reply.
Informs can also be asynchronous, to inform clients about events occurring in
the server. An asynchronous inform can be sent to all clients using
DeviceServer.mass_inform()
.
Sensors
To provide a sensor from your server, create a
Sensor
and attach it to your server by calling
self.sensors.add
. It is a good idea to do this
from your __init__
method so that it is present before any client
connects, but it is also possible to dynamically modify the sensors (although
at present it is up to the user to send appropriate #interface-changed
informs to clients).
An example:
sensor = aiokatcp.Sensor(
int, 'counter-queries', 'number times ?counter was called',
default=0, initial_status=aiokatcp.Sensor.Status.NOMINAL)
self.sensors.add(sensor)
The first argument is the sensor type, which again is one of the types described under Type conversions. If it is an enum type, the default value is the first enum value, otherwise the default is whatever is appropriate to the type.
To update the sensor value, use Sensor.set_value()
, or simply assign to
the value
attribute.
Note
There is currently no support for providing extra parameters, such as the nominal range for numeric sensors, since these values are marked as deprecated. For discrete sensors, the parameters are automatically computed as the possible values of the enumeration.
The DeviceServer.sensors
attribute implements both a dictionary-like
and a set-like interface to allow sensors to be added and removed. Sensor
metadata (such as the name or type) should not be mutated after creation.
Automatic status
In many cases the status of a sensor (nominal, warn or error) is determined
entirely by the value e.g. an error counter may be nominal when zero and in
warning state when non-zero. Rather than needing to explicitly pass the status
each time, one can provide a status_func callback to the constructor that
takes the sensor value and returns the status. The callback can be overridden
by passing an explicit status to Sensor.set_value()
.
Auto strategy
If a client requests the auto
strategy for sampling a sensor, the default
is to send it all updates as they happen (and unlike the event
strategy,
assignments that do not change the value will still be reported). This default
can be changed by passing auto_strategy and auto_strategy_parameters when
constructing the sensor.
Aggregate sensors
To provide a sensor which has its reading derived from that of a set of other
sensors, such as a total, average or general “device status” sensor, subclass
SimpleAggregateSensor
, and implement
aggregate_add()
,
aggregate_remove()
and aggregate_compute()
to compute the aggregate
value and status from changes in sensor values. Alternatively, if you need more
control (particularly over the timestamp of the aggregate), subclass
AggregateSensor
and implement
update_aggregate()
.
In order to avoid circular references, a
filter_aggregate()
method is provided which excludes the
aggregate sensor itself from operations that happen on the target sensor set.
However, the user may wish to include more complex logic than this, such as to
include only integer datatypes or to exclude other aggregate sensors. In this
case, the method can be overridden. The method should only consider the
identity of the sensor and not its status or value.
Here is an example of a sensor whose value is the sum of all the other
integer sensors. Note that it does not consider that the other sensors might
have statuses such as FAILURE
that make the value invalid. The
subclass does not need to distinguish between existing sensors, added sensors,
removals, and updates. It simply responds to readings being added to or removed
from a set of currently-active readings. Note that the internal state
(_total
) needs to be initialised before calling the superclass
constructor, as that constructor will call
aggregate_add()
for sensors that already exist in
the target set.
class Total(SimpleAggregateSensor):
def __init__(self, target):
self._total = 0
super().__init__(target=target, sensor_type=int, name="total")
def aggregate_add(self, updated_sensor, reading):
self._total += reading.value
return True
def aggregate_remove(self, updated_sensor, reading):
self._total -= reading.value
return True
def aggregate_compute(self):
return (Sensor.Status.NOMINAL, self._total)
def filter_aggregate(self, sensor):
"""Return true for int sensors which aren't self."""
return sensor.stype is int and sensor is not self
In the __init__()
method of the DeviceServer
subclass being
created, you’d include a few lines like this:
self.total_sensor = Total(self.sensors)
self.sensors.add(self.total_sensor)
Cancellation
It is important that request handlers operate gracefully if cancelled (refer
to the asyncio documentation). When DeviceServer.stop()
is called, any
pending requests are cancelled and the client is sent a failure response to
indicate that this occurred. It is not necessary (nor desirable) to swallow
the asyncio.CancelledError
— but it should not leave the server in a
state that will cause it to deadlock or crash.
Keep in mind that asyncio cancellation can only occur at a yield point
(await
or yield from
). Thus, if your handler is completely synchronous
then you do not need to worry.
Graceful shutdown
Apart from the ?halt
request, the server can be stopped from code with
DeviceServer.stop()
or DeviceServer.halt()
. The former is a
coroutine that completes when the server has shut down; the latter is a thin
wrapper which schedules the former as an asyncio task and returns immediately.
The latter is useful as a callback for a signal handler, e.g.
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, server.halt)
The ?halt
request is implemented in terms of halt()
and hence stop()
.
One gotcha is that join()
returns as soon as
stop()
completes. This is a problem if you override
stop()
to shut down other parts of your system after the
katcp server has stopped, because this code may only run after
join()
returns (particularly if it is asynchronous
code). Instead, one can override on_stop()
, which
is a coroutine that does nothing and is intended specifically for this
purpose. It is called by stop()
after it has
completed its work, but before it signals join()
to
wake up.