Understanding the use-cases and how Faust works, as well as some of the more nifty features.

Introduction

Since writing the last post, I have changed workplaces from Deliveroo to ComplyAdvantage. Here I’m working in one of the Media Insights teams that deals with processing articles from the web. This all sits in a mahoosive event pipeline which is linked by Kafka.

I say linked by Kafka - but that is more the aim, rather than the current design! As such we are now experiment with a few tools available to us and the team made a decision on using faust for this.

Annotations?!

Coming from Rubyland, seeing annotations on top of functions made me suspicious at first, but after reading a bit more about them, turns out they are just decorators or wrappers around your method. Annotations is the primary way that Faust works like.

This would look something like

@app.task
def my_async_task():
  print("Doing work")

Lets get into Faust

So how would an app look like working with Faust? Well the common patter is to use Faust for when we want to stream events, ie: we have an input Kafka queue, we modify the message, we then push this on an output queue. This would look something like this: Basic Faust app

Faust makes this use-case very simple and is basically the bread and butter of its offering. The code to get something like that up and running is about 20LOC, even less if you remove the DLQ handling, but I find this to be actually quite important for almost any application.

app = faust.App(
    app_config.name,
    broker=f"{KAFKA_BROKER}",
)

input_topic = app.topic("input_topic", partitions=1)
dead_letter_topic = app.topic("dlq")
output_topic = app.topic("output_topic", partitons=3)


@app.agent(input_topic)
async def input_stream(stream):
    async for doc in stream:
        try:
            result = BusinessLogic(doc).process
            output_topic.send(result)
        except FailedDocumentProcessing as error:
            await dead_letter_topic.send(value=failed_doc)
            raise error
        finally:
            yield None

How about some extras?

Some things come to mind fairly quickly like, how do I add monitoring to this? Faust comes built in with some sensors and you can add your own if you feel the default metrics don’t tell you enough. I’ve not experimented with that just yet - the default metrics are plenty to go on!

To do that you’d need to install Faust like this: pip3 install faust[datadog] and update your app initialization code like so:

from faust.sensors.datadog import DatadogMonitor

app = faust.App(
    app_config.name,
    broker=f"{KAFKA_BROKER}",
    monitor=DatadogMonitor(
        host=statsd_config.host, port=statsd_config.port, prefix=app_config.name
    ),
)

The default metrics provide a lot of information - so far the main one we used was committed vs read offsets, which are provided at partition level. Faust deals well with crashing services and offsets are not skipped which is great!

The fork

The main faust repository is not seeing any new development, however the community is now supporting a fork here which has quite a lot of improvements over the latest version of the original library. One of the main gains we saw right off the bat was the performance - for our particular use-case we saw almost a 2x speed increase going from the original library to the fork.

Where it kind-of lacks

Even considering the fork and the development happening there, faust is lacking in some areas. First of all - it would be around documentation - the fork has not updated any documentation, and the main repo covers only really the main use cases, if you are looking for anything non-standard - tough luck! Such an example was around monitoring, which I had to dig from the community Slack channel, which shouldn’t really be an off the beaten path type feature.

From what I’ve uncovered so far, there isn’t an easy way for us to control options for the underlying aiokafka client, such as the polling frequency, number of messages, byte sized pulled etc - which is a dis-advantage especially when trying to performance tune this in production.

Writing tests for the library was definitely challenging and we hit a few roadblocks, such as it requiring that yield None or else obscure error messages would come up. I’ll do a bit more writing on this once we figure out how to get any tests running using the fork, as that has proven very uncooperative.

Conclusion

We’ve still to deploy this into production at full-blown scale, and I’m sure there’ll be a fair few more learnings along the way - so definitely stay tuned for that - an update post will be happening for sure!