Henson operates on messages through a series of asyncio.coroutine() callback functions. Each callback type serves a unique purpose.


This is the only one of the callback settings that is required. Its purpose is to process the incoming message. If desired, it should return the result(s) of processing the message as an iterable.

async def callback(application, message):
    return ['spam']

Application('name', callback=callback)


There can only be one function registered as callback.


These callbacks are called when an exception is raised while processing a message.

app = Application('name')

async def log_error(application, message, exception):


Exceptions raised while postprocessing a result will not be processed through these callbacks.


These callbacks are intended to acknowledge that a message has been received and should not be made available to other consumers. They run after a message and its result(s) have been fully processed.

app = Application('name')

async def acknowledge_message(application, original_message):
    await original_message.acknowledge()


These callbacks are called as each message is first received. Any modifications they make to the message will be reflected in what is passed to callback for processing.

app = Application('name')

async def add_process_id(application, message):
    message['pid'] = os.getpid()
    return message


These callbacks will operate on the result(s) of callback. Each callback is applied to each result.

app = Application('name')

async def store_result(application, result):
    with open('/tmp/result', 'w') as f:


These callbacks will run as an application is starting.

app = Application('name')

async def connect_to_database(application):
    await db.connect(application.settings['DB_HOST'])


These callbacks will run as an application is shutting down.

app = Application('name')

async def disconnect_from_database(application):
    await db.close()