Kafka-Flink Integration with Cassandra
Hello, all data engineers! In this blog we are going to learn how to integrate Flink streaming with Kafka and push the data into Cassandra as a data sink. We are going to use the tools and SDKs below to perform the execution in a Linux environment.
Components
- Apache Flink: a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
- Kafka: a scalable, high-performance, low-latency platform that allows reading and writing streams of data like a messaging system.
- Cassandra: a distributed, wide-column NoSQL data store.
Basic Data Pipeline
Minimum SDK Requirements
To start the application, you will need Kafka and Cassandra installed locally on your machine. The minimum requirements for the application are:
Java 1.8+, Scala 2.12.13, Flink 1.9.0, sbt 1.6, Kafka 2.3.0, Cassandra 3.10, the IntelliJ IDE, and a Linux terminal.
Dependencies
libraryDependencies ++= Seq(
  // Kafka Flink connector
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.0",
  // Flink Scala streaming
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.0",
  // JSON parsing library
  "org.json4s" %% "json4s-native" % "3.6.10",
  // Cassandra connector
  "org.apache.flink" %% "flink-connector-cassandra" % "1.9.0"
)
Connecting to Kafka and Reading Streams of Data
def kafkaStreamConsumer(environment: StreamExecutionEnvironment): DataStream[Car] = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
  val properties: Properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "KafkaCassandra")
  // Consume raw JSON strings from the "carevents" topic
  val kafkaConsumer = new FlinkKafkaConsumer[String]("carevents", new SimpleStringSchema(), properties)
  // Parse each record with json4s and extract it into a Car
  environment.addSource(kafkaConsumer)
    .flatMap(raw => JsonMethods.parse(raw).toSome)
    .map(_.extract[Car])
}
Modelling Car Data
case class Car(
  Name: String,
  Cylinders: Option[Long],
  Horsepower: Option[Long]
)
The above code snippet shows how to establish a connection with the local Kafka server and create a Kafka consumer that reads JSON car data, deserializing each record with SimpleStringSchema and parsing it with the json4s Scala library.
By reading the JSON data from the Kafka topic through the Flink streaming engine, we get a DataStream[Car] of Car objects. We can also apply transformations to the Car DataStream before sinking the resultant DataStream to the Cassandra database.
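As a sketch of such a transformation, the snippet below uses a plain Scala List to stand in for the DataStream[Car] (Flink's DataStream exposes filter and map combinators of the same shape); the sample values mirror the records produced later in this post, and the threshold of 80 horsepower is just an illustration.

```scala
// Stand-in for the streaming job: List supports the same filter/map
// combinators we would call on DataStream[Car].
case class Car(
  Name: String,
  Cylinders: Option[Long],
  Horsepower: Option[Long]
)

val cars = List(
  Car("Toyota", Some(4L), Some(96L)),
  Car("Tata", Some(2L), Some(65L))
)

// Example transformation: keep only cars with more than 80 horsepower.
val powerful = cars.filter(_.Horsepower.exists(_ > 80L))

println(powerful.map(_.Name)) // prints List(Toyota)
```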
Writing Flink Data to Cassandra DB
def sinkToCassandraDB(sinkCarStream: DataStream[Car]): Unit = {
  createTypeInformation[(String, Long, Long)]
  // Flatten the optional fields so the tuple matches the three-column insert
  val sinkCarDataStream = sinkCarStream.map(car =>
    (car.Name, car.Cylinders.getOrElse(0L), car.Horsepower.getOrElse(0L)))
  CassandraSink.addSink(sinkCarDataStream)
    .setHost("127.0.0.1")
    .setQuery("INSERT INTO example.car(name, cylinders, horsepower) values (?, ?, ?);")
    .build()
}
You can find the complete code here.
Install Cassandra on Ubuntu
- sudo apt update
- sudo apt install openjdk-8-jdk -y
- sudo apt install apt-transport-https
- sudo sh -c 'echo "deb http://www.apache.org/dist/cassandra/debian 40x main" > /etc/apt/sources.list.d/cassandra.list'
- wget -q -O - https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
- sudo apt update
- sudo apt install cassandra
- nodetool status
- sudo cp /etc/cassandra/cassandra.yaml /etc/cassandra/cassandra.yaml.backup
- sudo vim /etc/cassandra/cassandra.yaml
- Update the cluster_name and seeds per your needs
- sudo systemctl restart cassandra
- Finally, launch cqlsh
At the cqlsh prompt, run the commands below in order to create the keyspace and table
- CREATE KEYSPACE example WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
- CREATE TABLE example.car("name" text PRIMARY KEY, "cylinders" bigint, "horsepower" bigint);
Install Kafka on Ubuntu
- sudo apt update
- sudo apt install default-jdk
- wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
- tar xzf kafka_2.13-3.2.0.tgz
- sudo mv kafka_2.13-3.2.0 ~/kafka
- Start ZooKeeper (run the scripts from ~/kafka/bin)
- ./zookeeper-server-start.sh ../config/zookeeper.properties
- Start the Kafka broker
- ./kafka-server-start.sh ../config/server.properties
- Create topic
- ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic carevents
- Start the Kafka producer and send some records
- ./kafka-console-producer.sh --topic carevents --bootstrap-server localhost:9092
- {"Name":"Toyota","Cylinders":"4","Horsepower":"96"}
- {"Name":"Chev","Cylinders":"4","Horsepower":"86"}
- {"Name":"Honda","Cylinders":"4","Horsepower":"90"}
- {"Name":"Hyndai","Cylinders":"4","Horsepower":"89"}
- {"Name":"Tata","Cylinders":"2","Horsepower":"65"}
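The same payloads can be built programmatically; the sketch below uses a hypothetical carEventJson helper (not part of the blog's code, plain Scala only) to produce the sample records, which could later feed the producer class mentioned in the next steps.

```scala
// Hypothetical helper: builds a car event payload matching the sample
// records, where Cylinders and Horsepower are sent as JSON strings.
def carEventJson(name: String, cylinders: Int, horsepower: Int): String =
  s"""{"Name":"$name","Cylinders":"$cylinders","Horsepower":"$horsepower"}"""

val events = List(("Toyota", 4, 96), ("Tata", 2, 65)).map {
  case (name, cyl, hp) => carEventJson(name, cyl, hp)
}

events.foreach(println)
```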
Execute the Flink application
def main(args: Array[String]): Unit = {
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  val carStream = new KafkaService().kafkaStreamConsumer(environment)
  val cassandraService = new CassandraService()
  cassandraService.sinkToCassandraDB(carStream)
  environment.execute("Flink Kafka to cassandra app")
}
- Method 1
- sbt clean compile
- sbt run
- Method 2
- In IntelliJ, right-click the StartJob object and run it
- Method 3
- Submit the jar to the Flink cluster
- ./flink run kafka-flink-cassandra-sbt-assembly-fatjar-1.0.jar
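Method 3 assumes the project produces a fat jar; one common way to do that (an assumption, not shown in the original post) is the sbt-assembly plugin, added in project/plugins.sbt:

```scala
// project/plugins.sbt — sbt-assembly builds the fat jar used by Method 3;
// the plugin version here is illustrative.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
```

With the plugin in place, `sbt assembly` writes the fat jar under the target directory, ready to submit with `./flink run`.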
Results
Go to the Cassandra shell cqlsh and execute: select * from example.car;
This is a very basic example of setting up a simple data pipeline; feel free to take this code and take it to the next level.
Next Steps
- Writing the producer class to automatically produce the car event records to a Kafka topic
- Adding more car attributes to the model class
- Avro Support
- Reading the Cassandra data and writing to another Kafka topic.