Tag Archives: afka Connect to Capture Data from a Relational Database

Using Kafka Connect to Capture Data from a Relational Database (sqlite3)

Use any Kafka docker images to install and start kafka.

reference:

https://docs.confluent.io/current/connect/userguide.html
https://github.com/bitnami/bitnami-docker-kafka
https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html

JDBC driver download for SQLlite3:
https://bitbucket.org/xerial/sqlite-jdbc/downloads/

  • Start Kafka.
confluent start
  • Install SQLite3.
apt-get update
apt-get install sqlite3
  • Create a New Database and Populate It with a Table and Some Data
    Create a new database called “test.db”.
root@shanoj_srv1:/# sqlite3 test.db
  • Create a new table in the SQLite database called “accounts”.
CREATE TABLE accounts (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, 
name VARCHAR (255));
  • Insert values into the table to begin populating it.
INSERT INTO accounts(name) VALUES('sabu');
INSERT INTO accounts(name) VALUES('ronnie');
.quit
  • Stop Kafka Connect.
confluent stop connect
  • Make necessary changes to below files:
root@shanoj_srv1:/# vi /etc/schema-registry/connect-avro-standalone.properties
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
root@shanoj_srv1:/# vi etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
  • Start Kafka Connect in standalone mode.
root@shanoj_srv1:/#connect-standalone -daemon /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
  • Verify that the connector was created.
root@shanoj_srv1:/# cat /logs/connectStandalone.out | grep -i "finished"
[2019-08-15 15:45:49,421] INFO Finished creating connector test-source-sqlite-jdbc-autoincrement (org.apache.kafka.connect.runtime.Worker:225)
[2019-08-15 15:45:49,504] INFO Source task WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2019-08-15 15:46:49,484] INFO Finished WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} commitOffsets successfully in 6 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)
root@shanoj_srv1:/# curl -s localhost:8083/connectors
  • Examine the Kafka topic created.
root@shanoj_srv1:/# kafka-topics --list --zookeeper localhost:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts

Start a Kafka Consumer and Write New Data to the Database

  • Open a Kafka consumer.
root@shanoj_srv1:/# kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning

Open a new tab to a new terminal session.
Open a new shell in this session.

root@shanoj_srv1:/# sudo docker exec -it sqlite-test //bin//bash
  •  Transfer to the tmp directory.
root@shanoj_srv1:/# cd /tmp
  • Access the SQLite database test.db.
root@shanoj_srv1:/# sqlite3 test.db
  • Insert a new value into the accounts table.
root@shanoj_srv1:/tmp# sqlite3 test.db
SQLite version 3.8.7.1 2014-10-29 13:59:56
Enter ".help" for usage hints.
sqlite> INSERT INTO accounts(name) VALUES('rama');
sqlite> INSERT INTO accounts(name) VALUES('lev');
sqlite> INSERT INTO accounts(name) VALUES('sriram');
sqlite> INSERT INTO accounts(name) VALUES('joby');
sqlite> INSERT INTO accounts(name) VALUES('shanoj');
sqlite>
  • Return to the previous session with the consumer and verify the data has been written.
root@ip-10-0-1-100:/# kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
{"id":3,"name":{"string":"rama"}}
{"id":4,"name":{"string":"lev"}}
{"id":5,"name":{"string":"sriram"}}
{"id":6,"name":{"string":"joby"}}
{"id":7,"name":{"string":"shanoj"}}