A few years back I was working at a startup that had an enormous product catalogue that was constantly updated and people needed to search through it. And I got into a debate with a colleague of mine because the data was in MySQL and he wanted to start regularly exporting it out and shipping it across into Elasticsearch because he thought the search capabilities would be better. And I thought that’s a great idea in theory but making that run efficiently and reliably
every day was just going to be an ongoing maintenance problem that our startup didn’t need. We debated it back and forth and in the end we went ahead and it turns out we were both right. It was painful to set up, it was never quite fast enough to be up to date and it was reliable except on days when it wasn’t and those days just took all productivity out of the window. But it was worth it, it was a much better experience, it was painful but it was worth
https://www.youtube.com/watch?v=avaOFCgrtfg&ab_channel=VivekRamaswamy
it. Could it have been less painful? When you need, as so often you do, to pull data from these places, do something with it and stick it into those places, is there a good generalised solution to that problem? And that’s today’s topic. We’re going to look at Apache Flink which has been getting a lot of interest in this space and it’s designed to solve exactly that kind of problem. So I’ve called in Robert Metzger who’s on the PMC for Flink and I’m going to get him to explain what Flink
does and how it does it. And in this conversation we managed to go all the way from why does Flink exist and how did he get involved to how sophisticated can you make the data processing part, what options do you have, what languages can you use and how do you recover when the process crashes, what’s its crash recovery and scalability story and lots more. So let’s talk about Flink, let’s ship some data around. I’m your host Kris Jenkins, this is
Developer Voices and today’s voice is Robert Metzger. Robert Metzger, thanks for joining us, how are you doing? Good thank you, I’m really excited to be here on the podcast to talk about Flink today. Yeah, I’m very excited to learn more about Flink because there’s a lot of buzz about
it but I’ve never really sat down with an expert and really understood what’s going on so you can fill me in from scratch. Yeah, let’s do it. So let’s start with your credentials, why are you a Flink expert, how did you get into it and what do you do with it? I try to keep the story as short as possible. So when I was studying in the Technical University in Berlin in like 2012, 2013 or so, I was working with a bunch of PhD students there that were building a big data system called
Stradosphere that was about competing with Apache Hadoop. Basically they had like a database background and thought that there’s so many findings in like 30 years of database research that have not really been considered with Apache Hadoop and they wanted to take these learnings and add them to a big data system. And then a few events happened so we open
sourced this research project, donated it to the Apache Software Foundation, we started fundraising, I became a co-founder of a company called Data Artisans, now it’s called Averika that set out to commercialize Apache Flink as a technology. Back when we started the company, we still intended to build a batch processing framework, but it was actually an outside
contribution to add a stream processing API on top of the engine because the engine, as I mentioned, is based on database principles and databases use this concept called pipeline execution where multiple operators are running at the same time. And this sounds very much like stream processing, like operators running all the time processing data. It was actually quite easy to add an API on top of this engine for stream processing primitives.
And of course then we added a lot more things to the engine like checkpointing, event time and watermark support, the state backends and so on and so forth to make it a, let’s say, real stream processor. And so going back to this contribution, sorry, was that we noticed that there was a lot more interest in our stream processing capabilities compared to the batch processing capabilities that Flink was offering.
And also to some extent, Apache Spark was rising very quickly at that time as an alternative to Hadoop. And I think they were, let’s say, winning the race in the batch processing space. Okay. This is how I learned Flink and all of that because I was basically working as a student on the staff and started a company around it. I helped building the open source community as part of Apache.
And we did a successful exit of the company to Alibaba. They are offering Flink in their cloud product. And like one and a half years ago, I switched to the codable to work on making Flink more accessible as part of a stream processing platform, basically, which is also based on Flink. So your entire career has been in Flink. Yes. Are you happy with that?
Yeah, but of course, like after, I don’t know, 10 years or so almost in the same technology, you start to wonder if you also need to look left and right. But of course, I mean, Flink is a really big project, not as big as like the Linux kernel, but there are like subsystems in Flink, completely different APIs and abstractions. And you can go very deep in many areas. So there’s a lot of very different areas in Flink to work on very different problems.
Yeah. And a large enough project becomes a whole sub world, right? A little universe in itself. Yeah. Yeah. So there’s a lot of things we need to get into, but before we get there, just for context, give me a typical example of how someone uses Flink to solve some problem. So let’s say you have some relational database like MySQL, and you’re noticing that your full text search queries are taking too much time on that database.
So you want to offload these types of queries from your transactional database into something that is more purpose-built, something like Elasticsearch. So you would use DBSium for change data capture to follow the changes on this transactional database, then perform some joins in Flink to combine data from different tables into let’s say one big document, and then you
ingest these documents into Elasticsearch. So basically you’re using, and you’re not using DBSium as a standalone system. You could do that and use Kafka in between, but in this case, DBSium would run as part of a Flink connector. So there’s connectors in Flink that are based on DBSium that allow you to follow changes on a transactional database. And then you can use, for example, Flink SQL to express this
join of multiple tables and filtering, whatever, aggregations, and then you use a Elasticsearch sync in Flink to write this data to Elasticsearch. And in effect, you have something like a continuous real-time join from multiple different tables. And whenever there’s an update, you basically instantly get an update in Elasticsearch to always have your data fresh and up-to-date. Okay.
So the main thing it’s doing for you is in combination with DBSium, slurping that data out from one place, a certain amount of processing to transform it as you load it into another place. I think the core of Flink is the processing. The core really is how Flink is able to do this real-time join in this particular example. There’s many other examples. You can also do machine learning stuff with it. You can do aggregations. But that’s the core of what Flink is
mostly about, real-time stream processing. The connectors are more a necessity to get your data in and out of Flink. Yeah. Yeah. It’s not going to do much without those. Yeah. But the essence of the project is… I mean, there are other stream processors like Kafka Streams, for example, which are only working with Kafka as a data source, but Flink is independent of data sources. Okay. How many data sources and sinks does it support? Roughly.
So I think the core project has maybe like eight-ish, and then there’s probably like 20 or 30 that are in other repositories or somehow available publicly. But all the important stuff is covered, like Pulsar, Kinesis, Kafka. On the Sink side, you can do any JDBC database. You can do writing to S3. You can… I mean, any file system actually, you can monitor for changes. You can write to these file systems in
various bucketing, like time-based bucketing, database bucketing, whatever. So there’s a ton of options for getting your data in and out. And to be honest, if there is a rare case where you have a custom system, then it’s also not terribly hard to build your own source or sink in Flink. Okay. Does it ever get used for… Is it always transforming the data and sending it to other places, or does it ever get used
just for straight reporting? I mean, you can use Flink… So what do you mean by reporting? I guess what I mean is, could you have a CSV or a PDF as your sink? A CSV for sure that’s supported out of the box. So you can write CSV or more popular like Parquet files, for example, or you can write
in the iceberg formats to S3 or something. So you can definitely use Flink also for getting your data in the shape that you want, like for loading your data warehouse, for example. Okay. I only asked that because I’m always thinking, what’s the smallest possible change I could make to introduce this to a project? Yeah. Okay. In that case, I think we should dig down into how it actually works. What do I need to understand about Flink’s architecture?
So Flink has a few basic building blocks that you use to express your workload. So the core is a dataflow graph, like a set of operators, let’s say a Kafka source and a CDC source, and then a bunch of operators like transformations, filters, aggregations,
joins, and they can be arbitrarily complex. Trust me, they can fill entire screens if you visualize them with hundreds of operators, really. I mean, hundreds is maybe an extreme case, but 30, 40 operators is really not unheard of. And you basically use one of the APIs in Flink, like the Java or Scala API or the SQL API or the Python API to
build this dataflow graph. So basically to define the structure of your Flink job and to define either the logic of predefined operators, so Flink has, for example, windowing operators that allow you to window data into hourly buckets or something, or session windows where you want to analyze like basically dynamic windows that just have the size of a certain user activity. And so you define the structure and then
you also define the logic of what is happening inside these operators. And then the engine knows how to execute this graph efficiently on a cluster of machines. So if your workload is increasing, you can change the number of instances, the parallelism of your job in Flink, and then you will execute this on more machines. Okay.
I’m just trying to think how that’s going to look. So what do I do if I’m using SQL? Am I doing something like create thing that looks like table, but is actually a slurping from my SQL? Pretty much, yes. So then I can go into that in my from clause later. Exactly. So basically you’re defining a stream as like a table in Flink SQL. So when you want to read from a number of Kafka brokers or whatever, like Kafka clusters,
you do a create table statement for each of these data sources. So Flink SQL supports all the data, all the connectors that I’ve mentioned. So you can create, they do like whatever. If you have a Kafka topic called users, you do basically create table users, and then you define the fields in your Kafka topic. Let’s say the data is in JSON, so you define like a JSON
de-serializer, and then Flink knows how to read the JSON data from your Kafka topic to analyze it. So you do like a create table to expose your Kafka topic in Flink SQL. And then if you want to do, for example, a filter, you just do select stuff from this table, and then in the where clause, you can define your filter. Or you read from multiple tables and you do a join.
Right. So I’m going to make you flinch again, because I got to mention CSV again. Do I also do like create table as output.csv and then insert into output table select stuff from? Exactly. Yes. It just looks like regular SQL type operations. Yes. Yes. So the underlying storage, the underlying storage is just outside of Flink. Exactly.
Flink doesn’t come with any storage. So you define S3 as a storage or a local file system or whatever. Yeah. Okay. Okay. So the next question that raises, you say it will automatically distribute the jobs across nodes. How is that working? So there’s a central compo. So when you’re building this data flow representation in your, let’s say Java API or in SQL, you’re
typically doing this on some kind of client instance. So either it’s the SQL shell that you’re using, or it’s a Java application that you’re implementing. And the Java application knows how to connect to what is called the job manager process of Flink. Job manager is the central component that is coordinating the execution of a Flink job or actually also multiple Flink jobs. So Flink supports both single or multiple
jobs on a job manager. And the job manager is creating a distributed representation of this data flow graph and is basically splitting out. So if you have Kafka sources, some join operator and a CSV sync, then it will create multiple instances of these operators. If you’re running this on 10 machines,
then it will create 10 Kafka sources, 10 join operators and 10 CSV syncs. And the work will then get distributed from the job manager to the so-called task manager. So task managers are the workers that are running on all the cluster nodes. And as you add or remove task managers, you can scale up or scale down the resources that are available for processing. Can you do that dynamically? Can you just throw more machines at it?
Yeah, that’s a feature that we’ve added in Flink, I don’t know, like 113 or 14, I don’t know, maybe earlier. So it is a feature that has been added to Flink, let’s say, recently. For a given value of recent. Two years ago or so. That allows you, it’s called the reactive mode in Flink. And it basically allows you to add or remove machines and Flink will dynamically grow and shrink.
And that’s pretty neat if you’re using something like Kubernetes horizontal pod auto scalars. So Kubernetes will monitor CPU usage. And if it goes too high, it will just add more machines. You can do the same with like EC2 auto scaling groups or something. Okay, that makes sense. And is there, I mean, I’m just trying to figure out this SQL interface. Is this something like the notion of a temporary table? Can I join two things, stick them into
one logical table while I think about how I want to select that data out? So you don’t have to worry too much about the representation, like how Flink is representing a table or a join in your SQL statement. So Flink will, of course, internally, so it really depends on what you’re doing. So if you do a, let’s go back to the example that we had earlier, some user table.
When you do like a select star, and let’s say you’re reading from Kafka, you’re filtering and you’re writing to Kafka. Then it’s really data is just flowing through the system. It’s never materialized in the system. Just passing through and we are filtering out some, like if you do like insert into target, select star from users where X,
smaller 10 or whatever, then it will filter out all the elements based on that predicate. But of course, if you do something like a join and your input, so let’s say you have this user table and you want to enrich some information in this user table from a database table. So you have this stream of user data and you have a MySQL
database that contains, let’s say the address of each user. You could do something like a database query for every user, like a query on MySQL for every user that comes. But of course you would bombard your MySQL server with queries for every incoming record in the stream processor. The stream processor would actually be slowed down to the speed of your MySQL server. And that’s not what you want for a
massively parallel system like Flink. So what you do instead is that you do change data capture on this MySQL table. So you do an initial snapshot of all the data in the MySQL table, load it into Flink state, and then you subscribe to further updates from that table. So whenever a user address is changing, we are updating this data in Flink state. And Flink can then do the lookup locally in its state backend.
So you don’t have to do any network access for this particular state lookup. It will just be in the local ROCCP instance of the task manager. Okay, yeah. So that’s a lookup join in Flink SQL. And that’s where Flink SQL will decide that one side of the join, the side with the address information from MySQL needs
to be stored in Flink state. But the user data that is flowing will not be persisted. But there are cases where you’re joining two tables and you’re materializing both sides of the join. And whenever any of the two inputs are updating, you produce an output event that will send downstream and then the sink has to decide what to do with it. Okay, that leads naturally to two questions. One’s about time and windowing, but first you’re saying that like
for the address thing, you’re maintaining what is essentially a local read-only replica of MySQL’s address table. Yes. How is that state management working in Flink? Because you said Flink doesn’t do any storage. So what’s going on? It is okay. I was lying to you. Let’s just say it was for the sake of making it easy to understand. We can separate between logical and physical storage. Sure. So Flink, the project itself doesn’t
provide any permanent storage like a MySQL server to persist your data in your database tables. But Flink needs a lot of temporary storage and that’s what we generally call state. So state can be as small and simple as the current Kafka offset that you’re reading from and it can be as big as your entire address table from MySQL.
We have customers that are using Flink with like 250 gigs of state in their joints and even that is small. Like if you talk to Apple or Netflix, which are using Flink in production, they’re really talking about terabytes of state that they’re maintaining across their task managers. Okay. So a lot of things that we’ve implemented
in Flink are about the state management. Because I think that’s also why I think we say on the website, Flink is doing stateful stream processing. So this ability to maintain state inside your operators is what makes Flink really interesting. Because if you just, without state, Flink would just be a system that allows you to move data from left to right, but it cannot really maintain
or build up any knowledge about the outside world. So with Flink you can basically, you have memory. You know what happened before and you can make some decisions based on what happened before. Like in the case of this lookup join, you know what is in the MySQL table and you know that Flink is always making sure that you’re not losing any data. So even if you kill one of those task managers or multiple of these task managers, Flink
is able to recover the state and restore it for you and you will never notice that there was any loss of state temporarily because the task manager has died. Okay. This presumably is also the same mechanism you use to calculate aggregates that are live updating, right? Exactly. So you can store aggregates in state. Yes. Yeah. Let’s probably talk a litt…