Flink 101

A few years back I was working at a startup that had an enormous product catalogue that was constantly updated, and people needed to search through it. And I got into a debate with a colleague of mine because the data was in MySQL, and he wanted to start regularly exporting it out and shipping it across into Elasticsearch because he thought the search capabilities would be better. And I thought that's a great idea in theory, but making that run efficiently and reliably every day was just going to be an ongoing maintenance problem that our startup didn't need.

We debated it back and forth, and in the end we went ahead, and it turns out we were both right. It was painful to set up, it was never quite fast enough to be up to date, and it was reliable except on days when it wasn't, and those days just took all productivity out of the window. But it was worth it, it was a much better experience. It was painful, but it was worth it.

Could it have been less painful? When you need, as so often you do, to pull data from these places, do something with it and stick it into those places, is there a good generalised solution to that problem? And that's today's topic. We're going to look at Apache Flink, which has been getting a lot of interest in this space and is designed to solve exactly that kind of problem. So I've called in Robert Metzger, who's on the PMC for Flink, and I'm going to get him to explain what Flink does and how it does it.

In this conversation we managed to go all the way from why Flink exists and how he got involved, to how sophisticated you can make the data processing part, what options you have, what languages you can use, how it recovers when a process crashes, what its crash recovery and scalability story is, and lots more. So let's talk about Flink, let's ship some data around. I'm your host Kris Jenkins, this is Developer Voices, and today's voice is Robert Metzger.

Robert Metzger, thanks for joining us, how are you doing? Good, thank you, I'm really excited to be here on the podcast to talk about Flink today. Yeah, I'm very excited to learn more about Flink because there's a lot of buzz about it, but I've never really sat down with an expert and really understood what's going on, so you can fill me in from scratch.

it but I’ve never really sat down with an expert and really understood what’s going  on so you can fill me in from scratch. Yeah, let’s do it.  So let’s start with your credentials, why are you a Flink  expert, how did you get into it and what do you do with it?  I try to keep the story as short as possible.  So when I was studying in the Technical University in Berlin in  like 2012, 2013 or so, I was working with a bunch of PhD students  there that were building a big data system called

Stradosphere that was about competing with Apache Hadoop.  Basically they had like a database background and thought  that there’s so many findings in like 30 years of database research  that have not really been considered with Apache  Hadoop and they wanted to take these learnings and add them  to a big data system. And then a few events happened so we open

sourced this research project, donated it to the Apache Software Foundation, we  started fundraising, I became a co-founder of a company  called Data Artisans, now it’s called Averika that set out to  commercialize Apache Flink as a technology.  Back when we started the company, we still intended to build a batch  processing framework, but it was actually an outside

contribution to add a stream processing API on top of the  engine because the engine, as I mentioned, is based on database  principles and databases use this concept called pipeline  execution where multiple operators are running at the  same time. And this sounds very much like stream  processing, like operators running all the time processing  data. It was actually quite easy to add an API  on top of this engine for stream processing primitives.

And of course then we added a lot more things to the engine, like checkpointing, event time and watermark support, the state backends and so on and so forth, to make it a, let's say, real stream processor. And going back to this contribution: what happened was that we noticed that there was a lot more interest in our stream processing capabilities compared to the batch processing capabilities that Flink was offering.

And also to some extent, Apache Spark was rising very quickly at that time as an alternative to Hadoop. And I think they were, let's say, winning the race in the batch processing space. Okay. This is how I learned Flink and all of that, because I was basically working as a student on the project and then started a company around it. I helped build the open source community as part of Apache.

And we did a successful exit of the company to Alibaba. They are offering Flink in their cloud product. And like one and a half years ago, I switched to Decodable to work on making Flink more accessible as part of a stream processing platform, which is also based on Flink. So your entire career has been in Flink. Yes. Are you happy with that?

Yeah, but of course, like after, I don’t know, 10 years or so  almost in the same technology, you start to wonder if you also need to  look left and right. But of course, I mean, Flink is a really  big project, not as big as like the Linux kernel, but there are like subsystems in  Flink, completely different APIs and abstractions.  And you can go very deep in many areas. So there’s a lot of very different areas  in Flink to work on very different problems.

Yeah. And a large enough project becomes a  whole sub world, right? A little universe in itself.  Yeah. Yeah.  So there’s a lot of things we need to get into, but before we get  there, just for context, give me a typical example of how someone  uses Flink to solve some problem. So let’s say you have some relational  database like MySQL, and you’re noticing that your  full text search queries are taking too much time on that database.

So you want to offload these types of queries from your transactional database into something that is more purpose-built, something like Elasticsearch. So you would use Debezium for change data capture to follow the changes on this transactional database, then perform some joins in Flink to combine data from different tables into, let's say, one big document, and then you ingest these documents into Elasticsearch.

And you're not using Debezium as a standalone system here. You could do that and use Kafka in between, but in this case Debezium would run as part of a Flink connector. So there are connectors in Flink that are based on Debezium that allow you to follow changes on a transactional database. And then you can use, for example, Flink SQL to express this join of multiple tables and filtering, whatever, aggregations, and then you use an Elasticsearch sink in Flink to write this data to Elasticsearch. And in effect, you have something like a continuous real-time join from multiple different tables. And whenever there's an update, you basically instantly get an update in Elasticsearch, to always have your data fresh and up-to-date. Okay.
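To make that concrete, here is a minimal sketch of such a pipeline in Flink SQL, submitted through the Java TableEnvironment (the same statements could be typed into the SQL client). All table names, fields and connector options are illustrative rather than taken from the conversation, and the exact option names depend on the Flink, CDC-connector and Elasticsearch-connector versions on your classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlToElasticsearchSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Follow the `users` table in MySQL via change data capture (Debezium under the hood).
    tEnv.executeSql("""
        CREATE TABLE users (
          user_id STRING,
          name    STRING,
          PRIMARY KEY (user_id) NOT ENFORCED
        ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'mysql', 'port' = '3306',
          'username' = 'flink', 'password' = 'secret',
          'database-name' = 'shop', 'table-name' = 'users'
        )""");

    // A second CDC-backed table to join against.
    tEnv.executeSql("""
        CREATE TABLE addresses (
          user_id STRING,
          city    STRING,
          PRIMARY KEY (user_id) NOT ENFORCED
        ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'mysql', 'port' = '3306',
          'username' = 'flink', 'password' = 'secret',
          'database-name' = 'shop', 'table-name' = 'addresses'
        )""");

    // One "big document" per user, kept up to date in Elasticsearch.
    tEnv.executeSql("""
        CREATE TABLE user_documents (
          user_id STRING,
          name    STRING,
          city    STRING,
          PRIMARY KEY (user_id) NOT ENFORCED
        ) WITH (
          'connector' = 'elasticsearch-7',
          'hosts' = 'http://elasticsearch:9200',
          'index' = 'users'
        )""");

    // A continuously running join: every change on either side updates the document.
    tEnv.executeSql("""
        INSERT INTO user_documents
        SELECT u.user_id, u.name, a.city
        FROM users u
        JOIN addresses a ON u.user_id = a.user_id""");
  }
}
```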

So the main thing it’s doing for you is in combination with  DBSium, slurping that data out from one place, a certain amount of  processing to transform it as you load it into another  place. I think the core of  Flink is the processing. The core really is how Flink is able to  do this real-time join in this particular example.  There’s many other examples. You can also do machine  learning stuff with it. You can do aggregations.  But that’s the core of what Flink is

mostly about, real-time stream processing.  The connectors are more a necessity to get your data in and out of Flink.  Yeah. Yeah.  It’s not going to do much without those. Yeah.  But the essence of the project is… I mean, there are other stream processors  like Kafka Streams, for example, which are  only working with Kafka as a data source, but Flink is independent of data sources.  Okay. How many data sources  and sinks does it support? Roughly.

So I think the core project has maybe like eight-ish, and  then there’s probably like 20 or 30 that are in other repositories  or somehow available publicly. But all the important stuff is covered,  like Pulsar, Kinesis, Kafka. On the Sink side, you  can do any JDBC database. You can do writing to S3.  You can… I mean, any file system actually, you can  monitor for changes. You can write to these file systems in

various bucketing, like time-based bucketing, database  bucketing, whatever. So there’s a ton of options for getting  your data in and out. And to be honest, if there is a rare case  where you have a custom system, then it’s also not terribly hard to build your own  source or sink in Flink. Okay.  Does it ever get used for… Is it always transforming the data and  sending it to other places, or does it ever get used

just for straight reporting? I mean, you can use Flink…  So what do you mean by reporting? I guess what I mean is, could you have a  CSV or a PDF as your sink? A CSV for sure that’s  supported out of the box. So you can write CSV or more popular like  Parquet files, for example, or you can write

in the Iceberg format to S3 or something. So you can definitely use Flink also for getting your data into the shape that you want, like for loading your data warehouse, for example. Okay. I only asked that because I'm always thinking, what's the smallest possible change I could make to introduce this to a project? Yeah. Okay. In that case, I think we should dig down into how it actually works. What do I need to understand about Flink's architecture?

So Flink has a few basic building blocks that you use to express your workload.  So the core is a dataflow graph, like a set of operators, let’s  say a Kafka source and a CDC source, and then a bunch of  operators like transformations, filters, aggregations,

joins, and they can be arbitrarily complex.  Trust me, they can fill entire screens if you visualize them with  hundreds of operators, really.  I mean, hundreds is maybe an extreme case, but 30, 40  operators is really not unheard of.  And you basically use one of the APIs in Flink, like the Java or  Scala API or the SQL API or the Python API to

build this dataflow graph. So basically you define the structure of your Flink job and the logic of the operators, either using predefined operators, so Flink has, for example, windowing operators that allow you to window data into hourly buckets or something, or session windows where you basically get dynamic windows that have the size of a certain burst of user activity. And so you define the structure and then

you also define the logic of what is happening  inside these operators. And then the engine knows how to execute  this graph efficiently on a cluster of machines.  So if your workload is increasing, you can change the number of  instances, the parallelism of your job in Flink, and then you will  execute this on more machines. Okay.
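As a rough sketch of what building such a dataflow graph looks like in the Java DataStream API (the data, field positions and hourly window are made up, and a real job would read from a Kafka or CDC source rather than a handful of in-memory elements):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ClickCountSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in source: (user, click count) events. A real job would use a Kafka or CDC source.
    DataStream<Tuple2<String, Long>> clicks = env.fromElements(
        Tuple2.of("alice", 1L), Tuple2.of("bob", 1L), Tuple2.of("alice", 1L));

    clicks
        .filter(click -> click.f1 > 0)                            // a filter operator
        .keyBy(click -> click.f0)                                 // partition the stream by user
        .window(TumblingProcessingTimeWindows.of(Time.hours(1)))  // the "hourly buckets" mentioned above
        .sum(1)                                                   // aggregate inside each window
        .print();                                                 // stand-in sink

    // Submitting this builds the dataflow graph and hands it to the cluster for execution.
    env.execute("click-count-sketch");
  }
}
```

Swapping the window assigner for a session window assigner is how you would get the dynamic, activity-sized windows mentioned above.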

I’m just trying to think how that’s going to look.  So what do I do if I’m using SQL? Am I doing something like create thing  that looks like table, but is actually a slurping  from my SQL? Pretty much, yes.  So then I can go into that in my from clause later.  Exactly. So basically you’re defining a stream as  like a table in Flink SQL. So when you want to read from a number of  Kafka brokers or whatever, like Kafka clusters,

you do a create table statement for each of these data sources. So Flink SQL supports all the connectors that I've mentioned. If you have a Kafka topic called users, you basically do create table users, and then you define the fields in your Kafka topic. Let's say the data is in JSON, so you define like a JSON

de-serializer, and then Flink knows how to read the JSON data from your  Kafka topic to analyze it. So you do like a create table to expose  your Kafka topic in Flink SQL. And then if you want to do, for example,  a filter, you just do select stuff from this  table, and then in the where clause, you can define your filter.  Or you read from multiple tables and you do a join.

Right. So I’m going to make you flinch again,  because I got to mention CSV again. Do I also do like create table as  output.csv and then insert into output table select stuff  from? Exactly.  Yes. It just looks like  regular SQL type operations. Yes.  Yes. So the underlying storage, the underlying  storage is just outside of Flink. Exactly.

Flink doesn’t come with any storage. So you define S3 as a storage or a local  file system or whatever. Yeah.  Okay. Okay.  So the next question that raises, you say it will automatically distribute the jobs  across nodes. How is that working?  So there’s a central compo. So when you’re building this data flow  representation in your, let’s say Java API or in SQL, you’re

typically doing this on some kind of client instance.  So either it’s the SQL shell that you’re using, or it’s a Java application that  you’re implementing. And the Java application knows how to  connect to what is called the job manager process  of Flink. Job manager is the central component that  is coordinating the execution of a Flink job  or actually also multiple Flink jobs. So Flink supports both single or multiple

jobs on a job manager. And the job manager is creating a distributed representation of this dataflow graph and is basically splitting it up. So if you have Kafka sources, some join operator and a CSV sink, then it will create multiple instances of these operators. If you're running this on 10 machines,

then it will create 10 Kafka sources, 10 join operators and 10 CSV sinks. And the work will then get distributed from the job manager to the so-called task managers. So task managers are the workers that are running on all the cluster nodes. And as you add or remove task managers, you can scale up or scale down the resources that are available for processing.
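For a sense of what that looks like in code, the parallelism is just a setting on the execution environment (the value 10 here mirrors the example above); it can also be overridden per operator or with the -p flag of flink run when submitting a job:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Ten parallel instances of every operator in the job:
    // ten Kafka sources, ten join operators, ten sinks,
    // spread across the available task manager slots.
    env.setParallelism(10);

    // ... define sources, operators and sinks here, then call env.execute(...)
  }
}
```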

Yeah, that’s a feature that we’ve added in Flink, I don’t know,  like 113 or 14, I don’t know, maybe earlier.  So it is a feature that has been added to Flink, let’s say, recently.  For a given value of recent. Two years ago or so.  That allows you, it’s called the reactive mode in Flink.  And it basically allows you to add or remove machines and Flink will  dynamically grow and shrink.

And that’s pretty neat if you’re using something like Kubernetes  horizontal pod auto scalars. So Kubernetes will monitor CPU usage.  And if it goes too high, it will just add more machines.  You can do the same with like EC2 auto scaling groups or something.  Okay, that makes sense. And is there, I mean, I’m just trying to  figure out this SQL interface. Is this something like the  notion of a temporary table? Can I join two things, stick them into

And is there, I mean, I'm just trying to figure out this SQL interface. Is this something like the notion of a temporary table? Can I join two things, stick them into one logical table while I think about how I want to select that data out? So you don't have to worry too much about the representation, like how Flink is representing a table or a join in your SQL statement. Internally, it really depends on what you're doing. So let's go back to the example that we had earlier, some user table.

When you do like a select star, and let’s say you’re reading from  Kafka, you’re filtering and you’re writing to Kafka.  Then it’s really data is just flowing through the system.  It’s never materialized in the system. Just passing through and we are filtering  out some, like if you do like insert into target, select star from users where X,

smaller than 10 or whatever, then it will filter out all the elements based on that predicate. But of course, if you do something like a join and your input, so let's say you have this user table and you want to enrich some information in this user table from a database table. So you have this stream of user data and you have a MySQL

database that contains, let’s say the address of each user.  You could do something like a database query for every user,  like a query on MySQL for every user that comes.  But of course you would bombard your MySQL server with queries  for every incoming record in the stream processor.  The stream processor would actually be slowed down to the  speed of your MySQL server. And that’s not what you want for a

massively parallel system like Flink. So what you do instead is that you do  change data capture on this MySQL table. So you do an initial snapshot of all the  data in the MySQL table, load it into Flink state,  and then you subscribe to further updates from that table.  So whenever a user address is changing, we are updating this data in Flink state.  And Flink can then do the lookup locally in its state backend.
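A rough illustration of that idea in the Java DataStream API, with made-up types and toy inputs: the address side is kept in keyed state, so enriching each incoming event is a purely local lookup. This is a hand-rolled sketch of the pattern, not what Flink SQL literally generates under the hood:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class LocalLookupSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-ins: user events (userId, action) and CDC address updates (userId, address).
    DataStream<Tuple2<String, String>> events =
        env.fromElements(Tuple2.of("alice", "search"), Tuple2.of("alice", "checkout"));
    DataStream<Tuple2<String, String>> addressUpdates =
        env.fromElements(Tuple2.of("alice", "Berlin"));

    events.keyBy(e -> e.f0)
        .connect(addressUpdates.keyBy(a -> a.f0))
        .process(new KeyedCoProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, String>() {
          private transient ValueState<String> address;

          @Override
          public void open(Configuration parameters) {
            // Per-user state held in the task manager's state backend.
            address = getRuntimeContext().getState(
                new ValueStateDescriptor<>("address", String.class));
          }

          @Override
          public void processElement1(Tuple2<String, String> event, Context ctx,
                                      Collector<String> out) throws Exception {
            // Enrich the event with a purely local state lookup; no round trip to MySQL.
            out.collect(event.f1 + " by a user in " + address.value());
          }

          @Override
          public void processElement2(Tuple2<String, String> update, Context ctx,
                                      Collector<String> out) throws Exception {
            // A change captured from the MySQL table: refresh the locally held copy.
            address.update(update.f1);
          }
        })
        .print();

    env.execute("local-lookup-sketch");
  }
}
```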

So you don’t have to do any network access for this particular state lookup.  It will just be in the local ROCCP instance of the task manager.  Okay, yeah. So that’s a lookup join in Flink SQL.  And that’s where Flink SQL will decide that one side of the join,  the side with the address information from MySQL needs

to be stored in Flink state. But the user data that is flowing will not be persisted. But there are cases where you're joining two tables and you're materializing both sides of the join. And whenever either of the two inputs is updated, you produce an output event that will be sent downstream, and then the sink has to decide what to do with it. Okay, that leads naturally to two questions. One's about time and windowing, but first, you're saying that, like,

for the address thing, you’re maintaining what is essentially a  local read-only replica of MySQL’s address table.  Yes. How is that state  management working in Flink? Because you said Flink  doesn’t do any storage. So what’s going on?  It is okay. I was lying to you.  Let’s just say it was for the sake of making it easy to understand.  We can separate between logical and physical storage.  Sure. So Flink, the project itself doesn’t

provide any permanent storage like a MySQL server  to persist your data in your database tables.  But Flink needs a lot of temporary storage and that’s what  we generally call state. So state can be as small and simple as  the current Kafka offset that you’re reading from  and it can be as big as your entire address table from MySQL.

We have customers that are using Flink with like 250 gigs of state in their joins, and even that is small. Like if you talk to Apple or Netflix, which are using Flink in production, they're really talking about terabytes of state that they're maintaining across their task managers. Okay. So a lot of things that we've implemented

in Flink are about the state management. Because I think that's also why we say on the website that Flink is doing stateful stream processing. So this ability to maintain state inside your operators is what makes Flink really interesting. Because without state, Flink would just be a system that allows you to move data from left to right, but it cannot really maintain

or build up any knowledge about the outside world.  So with Flink you can basically, you have memory.  You know what happened before and you can make some decisions  based on what happened before.  Like in the case of this lookup join, you know what is in the  MySQL table and you know that Flink is always making sure that  you’re not losing any data. So even if you kill one of those task  managers or multiple of these task managers, Flink

is able to recover the state and restore it for you and you will never notice that  there was any loss of state temporarily because the task manager has died.  Okay. This presumably is also the same  mechanism you use to calculate aggregates that are  live updating, right? Exactly.  So you can store aggregates in state. Yes.  Yeah. Let’s probably talk a litt…