Henson operates on messages through a series of
callback functions. Each callback type serves a unique purpose.
The message-processing callback is the only one of the callback settings that is required. Its purpose is to process the incoming message. If processing produces any results, the callback should return them as an iterable.
    from henson import Application

    async def callback(application, message):
        return ['spam']

    Application('name', callback=callback)
Only one function can be registered as the message-processing callback.
These callbacks are called when an exception is raised while processing a message.
    import logging

    logger = logging.getLogger(__name__)

    app = Application('name')

    @app.error
    async def log_error(application, message, exception):
        logger.error('spam')
Exceptions raised while postprocessing a result will not be processed through these callbacks.
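The scope of the error callbacks can be pictured with a small sketch in plain asyncio. It does not use Henson; `handle` is a hypothetical helper, and the choice to return an empty list after an error is an assumption of the sketch, not documented behavior.

```python
import asyncio

seen = []


async def log_error(application, message, exception):
    # Record which message failed and with what kind of exception.
    seen.append((message, type(exception).__name__))


async def failing_callback(application, message):
    raise ValueError('cannot process')


async def handle(application, message, callback, error_callbacks):
    # Hypothetical helper: only the processing step is guarded.
    try:
        results = await callback(application, message)
    except Exception as exc:
        for error_callback in error_callbacks:
            await error_callback(application, message, exc)
        return []
    # Postprocessing would run here, outside the try block, so an
    # exception raised by it would not reach the error callbacks.
    return list(results)


results = asyncio.run(handle(None, 'spam', failing_callback, [log_error]))
```

After the run, `seen` holds one entry for the failed message and `results` is empty.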
These callbacks are intended to acknowledge that a message has been received and should not be made available to other consumers. They run after a message and its result(s) have been fully processed.
    app = Application('name')

    @app.message_acknowledgement
    async def acknowledge_message(application, original_message):
        await original_message.acknowledge()
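The ordering described above (process first, acknowledge last) can be sketched without Henson. `FakeMessage` and `handle` are hypothetical stand-ins; a real consumer's message object may expose a different acknowledgement method.

```python
import asyncio

events = []


class FakeMessage:
    """Hypothetical stand-in for a consumer's message object."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    async def acknowledge(self):
        self.acknowledged = True
        events.append('acknowledged')


async def callback(application, message):
    events.append('processed')
    return [message.body]


async def acknowledge_message(application, original_message):
    await original_message.acknowledge()


async def handle(application, message):
    # Hypothetical helper: acknowledge only after processing has finished.
    results = await callback(application, message)
    # Result postprocessing would happen here, before acknowledgement.
    await acknowledge_message(application, message)
    return results


message = FakeMessage('spam')
results = asyncio.run(handle(None, message))
```

Acknowledging last means a message whose processing fails partway is not marked as handled in this sketch.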
These callbacks are called as each message is first received. Any modifications
they make to the message will be reflected in what is passed to the message-processing callback.
    import os

    app = Application('name')

    @app.message_preprocessor
    async def add_process_id(application, message):
        message['pid'] = os.getpid()
        return message
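To illustrate how preprocessor modifications accumulate, here is a minimal sketch in plain asyncio. It does not use Henson: `run_preprocessors` and `add_source` are hypothetical, and the sketch assumes preprocessors run in registration order with each one receiving the previous one's return value.

```python
import asyncio
import os


async def add_process_id(application, message):
    message['pid'] = os.getpid()
    return message


async def add_source(application, message):
    # Hypothetical second preprocessor, for illustration only.
    message['source'] = 'sketch'
    return message


async def run_preprocessors(application, message, preprocessors):
    # Hypothetical helper: feed each preprocessor's return value to the next.
    for preprocessor in preprocessors:
        message = await preprocessor(application, message)
    return message


async def callback(application, message):
    # The processing callback sees the message after every preprocessor ran.
    return [sorted(message)]


async def main():
    message = await run_preprocessors(
        None, {'body': 'spam'}, [add_process_id, add_source]
    )
    return await callback(None, message)


results = asyncio.run(main())
```

The single result lists the keys the callback saw, including the two added by the preprocessors.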
These callbacks operate on the result(s) returned by the message-processing
callback. Each postprocessing callback is
applied to each result.
    app = Application('name')

    @app.result_postprocessor
    async def store_result(application, result):
        with open('/tmp/result', 'w') as f:
            f.write(result)
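The "each callback is applied to each result" rule amounts to a nested loop. The sketch below models it in plain asyncio without Henson; `postprocess_all` and `count_result` are hypothetical, and the sketch ignores whatever a postprocessor returns, which is an assumption.

```python
import asyncio

stored = []
counts = {'seen': 0}


async def callback(application, message):
    # Two results for one message.
    return [message.upper(), message.lower()]


async def store_result(application, result):
    stored.append(result)


async def count_result(application, result):
    counts['seen'] += 1


async def postprocess_all(application, results, postprocessors):
    # Hypothetical helper: every postprocessor sees every result.
    for result in results:
        for postprocessor in postprocessors:
            await postprocessor(application, result)


async def main():
    results = await callback(None, 'Spam')
    await postprocess_all(None, results, [store_result, count_result])


asyncio.run(main())
```

With two results and two postprocessors, four postprocessor calls are made in total.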
These callbacks will run as an application is starting.
    app = Application('name')

    @app.startup
    async def connect_to_database(application):
        await db.connect(application.settings['DB_HOST'])
These callbacks will run as an application is shutting down.
    app = Application('name')

    @app.teardown
    async def disconnect_from_database(application):
        await db.close()
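The startup and teardown callbacks bracket the application's lifetime. A rough model in plain asyncio, without Henson, might look like the following; `run` and `consume` are hypothetical, and running teardown from a `finally` block is an assumption of the sketch.

```python
import asyncio

events = []


async def connect_to_database(application):
    events.append('startup')


async def disconnect_from_database(application):
    events.append('teardown')


async def consume(application):
    # Stand-in for the application's message loop.
    events.append('consuming')


async def run(application, startup, teardown, work):
    # Hypothetical helper: startup callbacks, then work, then teardown.
    for startup_callback in startup:
        await startup_callback(application)
    try:
        await work(application)
    finally:
        for teardown_callback in teardown:
            await teardown_callback(application)


asyncio.run(run(None, [connect_to_database], [disconnect_from_database], consume))
```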