
Feature engineering in real-time with Kafka, Docker and Python

Mar 01, 2024

Let me show you step-by-step how to do feature engineering in real-time using Apache Kafka, Docker and Python.

 

The problem

Feature engineering is about transforming raw data into predictive signals for your Machine Learning model.

And for many real-world problems, you need to run this transformation in real time. Otherwise, your features will be stale by the time the model sees them.

For example 💁

Imagine you build a Machine Learning model that predicts stock price changes over the next minute, but the input features the model needs are computed with a 1-hour delay.

No matter how good your model is, the whole system will NOT work 😔

So, as a professional ML engineer, you need to go beyond static CSV files and local Jupyter notebooks, and learn the tools that help you build production-ready real-time systems.

Let’s go through a step-by-step example.

 


 

Hands-on example 👩‍💻🧑🏽‍💻

Let me show you how to build a real-time feature-engineering service that

  • reads raw crypto-trade data from a Kafka topic (input topic),

  • transforms this raw data into Open-High-Low-Close features using stateful window aggregations, and

  • saves the final features into another Kafka topic (output topic), so the data is accessible to downstream services.

 

These are the tools we will use to build this:

  • Python Poetry to package our Python code professionally 😎

  • Quix Streams to do window aggregations in real-time ⚡

  • Docker to containerize our service, and ease the deployment to a production-environment 📦

  • Docker Compose to test things out locally.

  • A Kafka message broker, in this case Redpanda, to enable communication between our service and the rest of the infrastructure.

 

Steps

You can find all the source code in this repository
→ Give it a star ⭐ on Github to support my work 🙏

These are the 4 steps to build a real-time feature-engineering service.

 

Step 1. Create the project structure

We will use Python Poetry to create our ML project structure. You can install it in your system with a one-liner.

$ curl -sSL https://install.python-poetry.org | python3 -

Once installed, go to the command line and type

$ poetry new trade_to_ohlc --name src

to generate the following folder structure

trade_to_ohlc
├── README.md
├── src
│   └── __init__.py
├── pyproject.toml
└── tests
    └── __init__.py

Then type

$ cd trade_to_ohlc && poetry install

to create your virtual environment and install your local package src in editable mode.
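For reference, the generated pyproject.toml looks roughly like this (the Python version and metadata are illustrative, not taken from the repo):

```toml
[tool.poetry]
name = "src"
version = "0.1.0"
description = "Real-time trade-to-OHLC feature engineering"
authors = ["Your Name <you@example.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.10"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
```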

 

Step 2. Start a local Kafka cluster with Docker Compose

To develop and run our feature engineering service locally we first need to spin up a local lightweight Kafka cluster.

And the simplest, most straightforward way to do so is to use Docker and Redpanda.

What is Redpanda? 🟥🐼
Redpanda is a Kafka API-compatible data streaming platform, written in C++, that eliminates most of the complexities of the original Apache Kafka, while improving performance.

To spin up a minimal Redpanda cluster with one broker you add this docker-compose.yml

version: '3.7'
name: ohlc-data-pipeline
services:

  redpanda:
    container_name: redpanda
    image: docker.redpanda.com/redpandadata/redpanda:v23.2.19
    ...

  console:
    container_name: redpanda-console
    image: docker.redpanda.com/redpandadata/console:v2.3.8
    ...
    depends_on:
      - redpanda

Now go to the command line and start your local Redpanda cluster by running

$ docker compose up -d

Congratulations! You have a Redpanda cluster up and running.

It is time to focus on the feature engineering logic.

 

Step 3. Feature engineering script

Our script needs to do 3 things:

  1. Read input data from a Kafka topic,

  2. Transform this data into OHLC features, and

  3. Save the final data into another Kafka topic.

And the good news is, you can do all 3 things in pure Python using the Quix Streams library.

Install the Quix Streams library inside your virtual environment

$ poetry add quixstreams

Then create a new main.py file at the root of your project to hold your feature engineering logic:

trade_to_ohlc
├── README.md
├── main.py
├── src
│   └── __init__.py
├── pyproject.toml
└── tests
    └── __init__.py

Inside this main.py file you

  1. Create a Quix Application, which will handle all the low-level communication with Kafka for you.

    from quixstreams import Application
    app = Application(
        broker_address=os.environ["KAFKA_BROKER_ADDRESS"],
        consumer_group="json__trade_to_ohlc_consumer_group"
    )
  2. Define the input and output Kafka topics of our application

    input_topic = app.topic('raw_trade', value_deserializer="json")
    output_topic = app.topic('ohlc', value_serializer="json")
  3. Define your feature engineering logic. In this case, we use 10-second tumbling-window aggregations, written with a Pandas-like API.

    # Create your Streaming data frame
    sdf = app.dataframe(input_topic)
    
    # 10-second window aggregations
    sdf = sdf.tumbling_window(timedelta(seconds=WINDOW_SECONDS), 0) \
        .reduce(reduce_price, init_reduce_price) \
        .final()
  4. Produce the result to the output topic

    sdf = sdf.to_topic(output_topic)
  5. Start processing incoming messages with

    app.run(sdf)
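The snippet above references reduce_price and init_reduce_price without defining them. Here is a minimal sketch of what these two reducer functions could look like for OHLC candles; the exact trade-message schema (a dict with a "price" field) is an assumption:

```python
def init_reduce_price(trade: dict) -> dict:
    """Initialize the OHLC candle from the first trade in the window."""
    return {
        "open": trade["price"],
        "high": trade["price"],
        "low": trade["price"],
        "close": trade["price"],
    }


def reduce_price(candle: dict, trade: dict) -> dict:
    """Fold each subsequent trade in the window into the running candle."""
    candle["high"] = max(candle["high"], trade["price"])
    candle["low"] = min(candle["low"], trade["price"])
    candle["close"] = trade["price"]
    return candle
```

The initializer seeds every OHLC field with the first price; the reducer then only ever raises the high, lowers the low, and overwrites the close.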

If you now run your feature engineering script

$ poetry run python main.py
... 
INFO:quixstreams.app:Waiting for incoming messages

your script will just hang, waiting for incoming messages to arrive from Kafka.

 

Why are there no incoming messages? 🤔

Simply because there is no data yet in the input Kafka topic “raw_trade”.

To fix this you can either

  1. Write a second script that dumps mock raw data into the topic, or

  2. Use the actual service that will generate production raw data.

In this case, I opted for 2, and here you can find the complete implementation of the trade producer service.
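If you prefer option 1, here is a hedged sketch of a minimal mock producer. The message schema, the "BTC-USD" key, and the default broker port are assumptions, not taken from the repo:

```python
import json
import os
import random
import time


def generate_mock_trade(product_id: str = "BTC-USD") -> dict:
    """Build one fake trade message with a random price around $100."""
    return {
        "product_id": product_id,
        "price": round(100 + random.uniform(-1.0, 1.0), 2),
        "timestamp_ms": int(time.time() * 1000),
    }


def main() -> None:
    # Import kept local so the helper above stays stdlib-only
    from quixstreams import Application

    app = Application(
        broker_address=os.environ.get("KAFKA_BROKER_ADDRESS", "localhost:19092")
    )

    with app.get_producer() as producer:
        while True:
            trade = generate_mock_trade()
            producer.produce(
                topic="raw_trade",
                key=trade["product_id"],
                value=json.dumps(trade),
            )
            time.sleep(1)


if __name__ == "__main__":
    main()
```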

 

 

Step 4. Dockerization of our service

So far, you have a feature engineering service that works locally. However, to make sure your app will also work once deployed to a production environment, like a Kubernetes cluster, you need to dockerize it.

For that, you need to add a Dockerfile to your repo

trade_to_ohlc
├── Dockerfile
├── README.md
├── main.py
├── poetry.lock
├── pyproject.toml
├── src
│   └── ...
└── tests
    └── __init__.py

with the following layered instructions
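A minimal sketch of such a Dockerfile; the base image and the exact Poetry flags are assumptions:

```dockerfile
FROM python:3.10-slim

# Install Poetry
RUN pip install poetry

WORKDIR /app

# Copy only the dependency files first, so this layer is cached
# across builds as long as dependencies do not change
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false && poetry install --no-root

# Copy the source code and the entrypoint script
COPY src ./src
COPY main.py .

CMD ["python", "main.py"]
```

Copying pyproject.toml and poetry.lock before the source code keeps the slow dependency-install layer cached between builds.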

 

From this Dockerfile you can build your Docker image

$ docker build -t ${DOCKER_IMAGE_NAME} .

and run the container locally

$ docker run ${DOCKER_IMAGE_NAME}

Another way to spin up your service is to add it to your docker-compose.yml file, together with the trade producer service.

version: '3.7'
name: ohlc-data-pipeline
services:

  redpanda:
    container_name: redpanda
    image: docker.redpanda.com/redpandadata/redpanda:v23.2.19
    ...

  console:
    container_name: redpanda-console
    image: docker.redpanda.com/redpandadata/console:v2.3.8
    ...
    depends_on:
      - redpanda

  trade-producer:
    container_name: trade-producer
    restart: always
    build:
      context: "./trade_producer"
    ...
    depends_on:
      - redpanda

  trade-to-ohlc:
    container_name: trade-to-ohlc
    restart: always
    build:
      context: "./trade_to_ohlc"
    ...
    depends_on:
      - redpanda

Now go to the command line and run everything with

$ docker compose up -d

Congratulations! You have a production-ready real-time feature engineering script up and running.

 

Now it’s your turn 🫵🏻

The only way to learn ML is to get your hands dirty.

So, go to the Github repo I created, git clone it, and adjust it for your problem.

And remember,

No pain. No gain.

Let me know in the comments if something is not clear.