Subscribe

Real-time Feature Pipelines in Python ⚡

Jan 22, 2024

Today you will learn step-by-step how to build production-ready real-time feature pipelines with Python using the open-source Quix Streams library.

Full source code implementation + FREE video lecture

→ All the code I show today is in this Github repository.
→ And if you prefer to watch, here is the video on Youtube

 

The final features are plotted in real-time on this dashboard 📈

 

The problem

 

Imagine you want to build a trading bot for crypto currencies using ML.

Before you even get to work on your ML model, you need to develop a real-time feature pipeline that produces the data your model need both at training time, and at inference time.

Pipeline design

 

A real-time feature pipeline has 3 steps:

  • Ingestion step → connects to an external real-time data source, for example, raw trades from the Kraken Websocket API

  • Transformation step → transforms this raw data into meaningful signals for your ML model, for example, Open-High-Low-Close (OHLC) candles.

  • Storing step → these features are saved in a feature store, from where your ML models can retrieve them.

     

In a real-world setting, each of these steps is implemented as a separate service and containerised with Docker. Communication between these services is implemented with a message broker like Apache Kafka or Redpanda.

This way you make your system scalable, by spinning up more containers as needed, and leveraging Kafka consumer groups.

This is all great, but

How do you implement this in practice? 🤔

 

Development

 

We will use an open-source Python library called Quix Streams to develop our real-time feature pipeline.

Quix Streams ❤️

is a cloud native library for processing data in Kafka using pure Python.

With Quix Streams you get the best from both worlds, low-level scalability and resiliency from Kafka, plus an easy-to-use Python interface.

 

In our example, we have 3 pipeline steps, each of them implemented in pure Python in their own subdirectory:

  • trade_producer (ingestion) → reads data from Kraken and pushes it into a Kafka topic.

  • trade_to_ohlc (transformation) → transforms these raw trades into OHLC candles, which are typically used to derived trading indicators, and push them into another Kafka topic.

  • ohlc_to_feature_store (destination) → saves the final features in the Hopsworks Feature Store.

Plus, a containerised Streamlit dashboard that fetches and plots these features in real-time.

 

Let’s take a closer look at the transformation step trade_to_ohlc

 

Real-time data transformation with Quix

 

Transforming real-time data in Quix is as easy as using a standard Pandas-like API, thanks to the Quix Streaming Data Frame object.

Streaming Data Frames ⚡

offer a Pandas-like API to work with real-time data. Which means, you can add, remove and transform columns as you do in Pandas, but for real-time data ⚡

To transform raw trades into OHLC 10-seconds candles, we use stateful window operators. More precisely, we build 10-second tumbling windows to bucket trades, and then compute bucket-level metrics, like

  • Open → first price in the window

  • High → highest price in the window

  • Low → lowest price in the window

  • Close → last price in the window

You can easily implement tumbling windows with Quix using the newly added tumbling_window API

sdf = sdf
    .tumbling_window(timedelta(seconds=10), 0)
    .reduce(reduce_price, init_reduce_price)
    .final()

 

Deployment to the Quix Cloud ❤️☁️

 

The pipeline we built is based on open-source tools like Kafka and Docker containers, which means it is ready to be deployed to a Kubernetes cluster.

So If you are already managing a Kubernetes cluster and Kafka message broker for your microservices, then go for it.

Otherwise, if you do not want to manage any infrastructure, I recommend you use Quix Cloud.

 

‍Quix Cloud ❤️☁️

provides fully managed containers, Kafka and observability tools to run your applications in production.

To deploy your pipeline to Quix Cloud you need to

→ Sign up for FREE
→ Create a Quix Cloud Project and an environment, and
→ Fork this repository and link it to your newly created Quix Cloud environment.

 

Now it is your turn 🫵

 

If you wanna learn real-time ML, you need to get your hands dirty.

→ Go to the Quix Streams repository
→ Give it a star on Github ⭐, and
→ Start building your own pipeline 👩🏽‍💻👨🏻‍💻

And let the fun begin.

Happy building! Happy Coding!