Earlier this year, I wrote a post about exploring Apache DataFusion as a foundation for a streaming framework.
At the time, it was mostly theoretical: I had started working on a streaming framework for a client of mine, and I only had a couple of months of Rust and DataFusion experience.
Over the course of this year, the framework became feature-complete, and it’s being integrated as a core product offering. I hope to write more about it one day; there is a plan to open-source it eventually.
At the same time, I started to work on the new Irontools extension called Iron Vector. It’s a native, columnar, vectorized, high-performance accelerator for Apache Flink SQL and Table API pipelines. I’m building it with Rust, Arrow and DataFusion. Feel free to check the announcement post with more details here.
So, this post is my attempt to summarize my actual, hands-on learnings from building several streaming products with what I call the RAD Stack: Rust, Arrow, DataFusion.
Following Up
First, let me follow up on a few key areas I identified in the previous post.
Checkpointing and Fault Tolerance
I implemented the classic Chandy–Lamport style algorithm for checkpointing. I also added a simple, pluggable state backend (with Postgres support being the target).
The main use case for this was storing Kafka consumer group offsets, but committing them only once all nodes in a pipeline have acknowledged processing up to a certain epoch. This gives you an at-least-once delivery guarantee.
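The commit rule above can be sketched in a few lines of plain Rust. This is a toy model of the idea, not the actual implementation; `EpochTracker` and its methods are hypothetical names:

```rust
use std::collections::HashMap;

/// Tracks checkpoint acknowledgements per node. An epoch is safe to commit
/// only once every node in the pipeline has acknowledged it.
struct EpochTracker {
    nodes: Vec<String>,
    acked: HashMap<String, u64>, // node -> highest acknowledged epoch
}

impl EpochTracker {
    fn new(nodes: Vec<String>) -> Self {
        Self { nodes, acked: HashMap::new() }
    }

    /// Record that `node` has acknowledged everything up to `epoch`.
    fn ack(&mut self, node: &str, epoch: u64) {
        let e = self.acked.entry(node.to_string()).or_insert(0);
        if epoch > *e {
            *e = epoch;
        }
    }

    /// The highest epoch acknowledged by *all* nodes, i.e. the point up to
    /// which consumer offsets may be committed. None if any node is behind.
    fn committable_epoch(&self) -> Option<u64> {
        self.nodes
            .iter()
            .map(|n| self.acked.get(n).copied())
            .min() // None (a node that hasn't acked) sorts below any Some
            .flatten()
    }
}
```

The real system persists the committable epoch's offsets to the state backend; here the point is just that the commit position is the minimum acknowledged epoch across all nodes.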
The implementation was pretty straightforward. It’s important to have decent test coverage to catch edge cases.
I really liked Postgres as the main state backend: it’s perfect for storing small amounts of data (like the consumer offsets). If you need to modify the consumer position, you just modify a table row in Postgres.
If I were to look at storing larger amounts of data, I’d seriously consider using SlateDB.
Scaling Out Beyond a Single Node
As you probably guessed, we ended up relying on Kafka consumer groups quite a bit, Kafka Streams style. You can spin up many pods in parallel; each gets its own share of partitions. Adding autoscaling can be pretty straightforward.
In other scenarios, we could rely on the data source characteristics to parallelize data processing. E.g., if a data source supports range scanning, it’s possible to spin up several pods in parallel and assign a different range to each. It’s definitely not a generic solution.
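For a source that supports range scanning, the partitioning scheme can be as simple as slicing the key space. A minimal sketch, assuming a numeric key space; `split_range` is a hypothetical helper, and real sources would expose their own split metadata:

```rust
/// Splits a half-open key range [start, end) into `pods` contiguous
/// sub-ranges, one per parallel reader.
fn split_range(start: u64, end: u64, pods: u64) -> Vec<(u64, u64)> {
    let total = end - start;
    let chunk = total / pods;
    let rem = total % pods;
    let mut ranges = Vec::with_capacity(pods as usize);
    let mut lo = start;
    for i in 0..pods {
        // Spread the remainder over the first `rem` pods.
        let hi = lo + chunk + if i < rem { 1 } else { 0 };
        ranges.push((lo, hi));
        lo = hi;
    }
    ranges
}
```

Each pod then runs the same pipeline against its assigned sub-range.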
Also, in many situations, having just a single pod was enough. You can go pretty far with vertical scaling.
I didn’t have a chance to explore Ballista or Ray.
Connectors: Building Your Own
I built several connectors (typically implemented as TableProviders), but there are no interesting insights to share: you take an external client, wrap it in DataFusion primitives, hook it up to the checkpointing system, and you're done. Converting to/from Arrow can be tricky, but Arrow support is getting better¹.
I do want to share my experience using datafusion-table-providers: a semi-official repository of community "connectors" (TableProviders).
My experience aligns with an observation I'll make later in this post: there is just not much awareness of streaming use cases at the moment. Look at how the Postgres sink is implemented:
It starts a transaction
It writes all input RecordBatch records as inserts. It keeps writing until the input stream is terminated:
`while let Some(batch) = data.next().await`
Finally, it commits the transaction
Can you see the problem? In a streaming environment, the input stream never ends. So this logic has to be modified to introduce triggers (e.g., record-count-based or time-based) that periodically commit the current transaction and start a new one.
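Such a trigger can be modeled with a handful of lines. A minimal sketch, assuming a count-or-age policy; `CommitTrigger` is a hypothetical name, and the real sink would commit and open a new transaction whenever `observe` returns true:

```rust
use std::time::{Duration, Instant};

/// Decides when a streaming sink should commit its current transaction:
/// after `max_rows` buffered rows or `max_age` elapsed, whichever comes first.
struct CommitTrigger {
    max_rows: usize,
    max_age: Duration,
    rows: usize,
    started: Instant,
}

impl CommitTrigger {
    fn new(max_rows: usize, max_age: Duration) -> Self {
        Self { max_rows, max_age, rows: 0, started: Instant::now() }
    }

    /// Record a written batch; returns true if it's time to commit.
    fn observe(&mut self, batch_rows: usize) -> bool {
        self.rows += batch_rows;
        self.rows >= self.max_rows || self.started.elapsed() >= self.max_age
    }

    /// Reset after the transaction has been committed.
    fn reset(&mut self) {
        self.rows = 0;
        self.started = Instant::now();
    }
}
```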
Implementing a Plugin System
If you build a connector (or any kind of library) using Java, you typically just need to:
Implement a certain interface
Package it in a JAR file
Make it available in the classpath, either by using a dependency manager or simply adding the JAR file to the classpath
This is not the case with compiled languages like Rust. With Rust, the path of least resistance is building statically linked binaries. This means recompiling your application every time you change any of your dependencies, including external libraries, which can be very painful at scale or when dealing with external contributions.
Dynamic (rather than static) linking is the typical answer to this problem. However, dynamic linking in Rust comes with its own challenges: Rust's ABI is unstable, so even a minor compiler version difference (or different compiler flags) can lead to compatibility issues.
Using a Foreign Function Interface (FFI) and crates like abi_stable is the standard workaround. But it's not for the faint of heart! There is a great post series called A Plugin System in Rust that walks you through the implementation end-to-end, covering many learnings along the way. However, even if you follow all the best practices, you can still end up with unsolved problems. For example, there is no good way to share Tokio runtimes. So, if your plugin needs to run async code (and it almost certainly does), you have a choice between creating a Tokio runtime per plugin (which becomes prohibitively expensive as plugins are added) or exposing a small subset of functionality via FFI-safe structures (thanks to async_ffi).
Whew, I know. It’s messy.
Performance Gain Is Visible
Performance is one of the reasons we're doing this in the first place, and oh boy, does it deliver. I've run hundreds of benchmarks over the past few months, and it's not uncommon to see a 2x, 3x or even 5x throughput increase after rewriting something with the RAD stack. I believe Arrow is one of the main reasons: columnar, vectorized execution can be much more efficient for streaming workloads.
This aligns with benchmarks by Arroyo (which claimed a 3x to 5x throughput increase) and RisingWave (which reports a 2x throughput increase as the norm for stateless workloads).
DataFusion doesn't rank at the very top of the ClickBench results, but that doesn't mean it's slow. There are faster engines for certain types of queries, but none of them comes close in extensibility, which is covered in the next section.
Extending DataFusion: Lessons Learned
DataFusion was initially designed as a batch query engine, and many improvements have been made over the years to make it more compatible with streaming semantics. Sometimes the batch nature is very obvious (e.g., operators with EmissionType::Final, which emit records only once all input has been processed). But sometimes it's so subtle that it's very hard to notice until you hit issues in production.
Pay Attention to Your Functions
DataFusion has a variety of standard system functions commonly found in any SQL database: comparison, string, math, etc. Most of the functions are either immutable (always return the same output when given the same input) or volatile (may change the return value from evaluation to evaluation).
There are also several stable functions. From documentation:
A stable function may return different values given the same input across different queries but must return the same value for a given input within a query.
These functions are now(), current_time() and current_date(). From a batch engine's perspective, it makes sense to keep the value of now() constant during the execution of a given query. In a streaming context, however, we only have one never-ending query! This means that, by default, now() will always return the same value (captured at the beginning of execution), even if it's called a week after the streaming query was started.
Thankfully, it's very easy to write custom implementations of these functions that are basically identical to the built-in ones but marked as Volatile. It's then trivial to override a system function with a custom function of the same name: DataFusion doesn't really differentiate between system and user-defined functions; they use exactly the same API and registration mechanism.
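The difference between the two volatility classes is easy to model in plain Rust. This is a toy illustration of the semantics, not DataFusion's actual UDF API; the names `StableNow` and `volatile_now` are made up:

```rust
use std::time::Instant;

/// "Stable" semantics: the timestamp is captured once, when the query
/// starts, and every subsequent call returns that same snapshot.
struct StableNow {
    snapshot: Instant,
}

impl StableNow {
    fn new() -> Self {
        Self { snapshot: Instant::now() }
    }
    fn eval(&self) -> Instant {
        self.snapshot
    }
}

/// "Volatile" semantics: re-evaluated on every call, which is what a
/// never-ending streaming query actually needs.
fn volatile_now() -> Instant {
    Instant::now()
}
```

In a batch query the stable behavior is a feature; in a query that runs for weeks, it's a bug.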
Stateless and Incremental Doesn’t Mean Streaming Friendly
Projections and filters form the foundation of stateless stream processing. Surely such primitive operators support streaming execution. Right? Right???
Well, kinda. Let's look at filtering (the FilterExec operator). It propagates the input's emission type, so if the input emits data incrementally, the filter operator does the same.
However, there is one implementation detail that can come back to haunt you: if the filter operator filters out all rows from the current batch, it doesn't emit an empty batch. Instead, it just waits for the next non-empty batch 🫠. This is fine for a batch engine, but a streaming system may rely on a continuous stream of batches (even empty ones) for many reasons (e.g., observability).
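The two behaviors are easy to contrast with a toy model, where a "batch" is just a vector of rows (plain Rust, not the actual FilterExec code; both function names are made up):

```rust
/// Toy stand-in for a RecordBatch: just a vector of rows.
type Batch = Vec<i32>;

/// Batch-engine style: empty results are skipped, so downstream operators
/// see fewer batches than were produced upstream.
fn filter_skipping(input: &[Batch], pred: impl Fn(&i32) -> bool) -> Vec<Batch> {
    input
        .iter()
        .map(|b| b.iter().copied().filter(|v| pred(v)).collect::<Batch>())
        .filter(|b| !b.is_empty())
        .collect()
}

/// Streaming-friendly style: one output batch per input batch, even if it
/// is empty, so downstream sees a continuous stream.
fn filter_streaming(input: &[Batch], pred: impl Fn(&i32) -> bool) -> Vec<Batch> {
    input
        .iter()
        .map(|b| b.iter().copied().filter(|v| pred(v)).collect::<Batch>())
        .collect()
}
```

A downstream operator that tracks progress per batch (watermarks, metrics, heartbeats) cares a lot about which of these it is sitting on top of.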
And again, DataFusion makes it very easy to replace the built-in filter operator with a custom one: just fork it, tweak the logic, and add a PhysicalOptimizerRule that swaps any FilterExec for your custom operator. It takes a few lines of code. Seriously, is there another query engine out there that lets you replace the operator behind such a fundamental feature (the WHERE clause) with this kind of simplicity?
Not Optimizing Is Sometimes a Good Thing
Imagine you have an Iceberg table as a data source and you want to run two queries against it. Naturally, these queries have different projections and filters. Any reasonable query engine implements optimizations like projection and predicate pushdown, which limit the amount of data returned by the source. From the query plan perspective, you query the same table twice, but you expect different outputs.
Now, take a Kafka topic as a data source. Kafka doesn't support projection or predicate pushdown². And when you issue two queries with different projections and filters, you actually DO want to scan this source once and apply the projections and filters afterwards. I believe this is called scan sharing in some literature. But that's not how DataFusion (and most query engines) is designed to behave, so you end up reading the same topic multiple times, even though one read could have served both queries.
I haven't implemented a good workaround for this yet, but I believe it'll involve disabling certain optimizer rules and broadcasting the data from one operator to many. By the way, Apache Flink has scan sharing implemented: you can find the reuse marker in its query plans.
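The broadcast part of that idea can be sketched with standard channels: one thread scans the source once and fans every record out to all consumers, each of which applies its own filter or projection. A stdlib-only toy, assuming in-memory records; `broadcast_scan` is a hypothetical name:

```rust
use std::sync::mpsc;
use std::thread;

/// Reads the source once and broadcasts every record to all downstream
/// consumers, which then apply their own filters/projections.
fn broadcast_scan(records: Vec<i32>, consumers: usize) -> Vec<mpsc::Receiver<i32>> {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..consumers).map(|_| mpsc::channel()).unzip();
    thread::spawn(move || {
        for r in records {
            for tx in &senders {
                // Ignore consumers that have already hung up.
                let _ = tx.send(r);
            }
        }
        // Senders are dropped here, closing all receiver streams.
    });
    receivers
}
```

In a real engine the fan-out would sit behind the scan operator in the physical plan, but the shape is the same: one read, many consumers.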
I could go on, but I think these findings paint a clear picture: DataFusion is extensible enough to support anything you want, but it takes some effort to get there.
Community Alignment Is Not Quite There
As you probably understand by now, streaming support is not a high priority for the DataFusion project. Sometimes, it feels like it supports certain streaming primitives almost by accident (and because doing so is actually very useful in many batch-oriented scenarios too).
However, the community is generally interested in this area.
So, I think it's just a matter of time and of the number of contributors involved. Folks from Synnada have been contributing a lot on the streaming side, and I'm very grateful for that! I hope more contributors start thinking about streaming scenarios; e.g., modifying the Postgres sink above to support streaming execution would actually benefit batch workloads too (you don't want to keep transactions open for long).
Conclusion
Rewriting Big Data in Rust is slowly happening. Arrow is becoming the standard for data exchange, and I haven't even mentioned Arrow Flight, ADBC and Substrait (which Iron Vector uses).
Overall efficiency (and cost efficiency in particular) has been quite a focus in the past few years. I hope the RAD stack is here to stay: let’s try to get more from the infrastructure we have.
Irontools
Iron Vector is a native, columnar, vectorized, high-performance accelerator for Apache Flink SQL and Table API pipelines.
It’s easy to install, requires no code changes, and can increase compute efficiency by up to 2x (as of now).
Check the announcement here.
Events
Find me at the following events next month:
Polyglot Unconference, Vancouver 🇨🇦, October 11th
Flink Forward, Barcelona 🇪🇸, October 15th - October 16th
Current, New Orleans 🇺🇸, October 29th - October 30th
¹ Also, this is one of the areas where LLMs can really shine: hand-writing boilerplate conversion logic from one format to another is no fun.
² You could filter data by partition or by the Kafka message timestamp, but that's pretty much it.