Use any Kafka Docker image to install and start Kafka.
References:
https://docs.confluent.io/current/connect/userguide.html
https://github.com/bitnami/bitnami-docker-kafka
https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html
JDBC driver download for SQLite3:
https://bitbucket.org/xerial/sqlite-jdbc/downloads/
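- (Optional) If the Connect worker cannot load the SQLite JDBC driver, place the driver jar on the worker's classpath. A minimal sketch, assuming driver version 3.28.0 and the Confluent jar directory /usr/share/java/kafka-connect-jdbc (version and path are assumptions; adjust to your install, and note that Confluent Platform typically bundles a SQLite driver with kafka-connect-jdbc already):
root@shanoj_srv1:/# wget https://bitbucket.org/xerial/sqlite-jdbc/downloads/sqlite-jdbc-3.28.0.jar
root@shanoj_srv1:/# cp sqlite-jdbc-3.28.0.jar /usr/share/java/kafka-connect-jdbc/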
- Start Kafka.
confluent start
- Install SQLite3.
apt-get update && apt-get install sqlite3
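To confirm the client installed correctly, print its version:
root@shanoj_srv1:/# sqlite3 --version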
- Create a New Database and Populate It with a Table and Some Data
Create a new database called “test.db”.
root@shanoj_srv1:/# sqlite3 test.db
- Create a new table in the SQLite database called “accounts”.
CREATE TABLE accounts (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR (255));
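If you want to confirm the table definition, sqlite3's .schema dot command echoes the stored CREATE statement back:
sqlite> .schema accounts
CREATE TABLE accounts (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR (255));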
- Insert values into the table to begin populating it.
INSERT INTO accounts(name) VALUES('sabu');
INSERT INTO accounts(name) VALUES('ronnie');
.quit
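Before wiring up the connector, it is worth confirming the rows landed. Given the two inserts above and the AUTOINCREMENT id column, a quick query should show ids 1 and 2:
root@shanoj_srv1:/# sqlite3 test.db 'SELECT * FROM accounts;'
1|sabu
2|ronnie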
- Stop Kafka Connect.
confluent stop connect
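To confirm that only the connect service stopped while the rest of the stack keeps running, the same CLI can report service status:
confluent status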
- Make the necessary changes to the files below:
root@shanoj_srv1:/# vi /etc/schema-registry/connect-avro-standalone.properties
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
root@shanoj_srv1:/# vi /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-
- Start Kafka Connect in standalone mode.
root@shanoj_srv1:/# connect-standalone -daemon /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
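The standalone worker exposes the Connect REST API on port 8083 by default. Once the worker is up, you can check that it responds and that the JDBC source plugin is on its plugin path:
root@shanoj_srv1:/# curl -s localhost:8083/
root@shanoj_srv1:/# curl -s localhost:8083/connector-plugins | grep -i jdbc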
- Verify that the connector was created.
root@shanoj_srv1:/# cat /logs/connectStandalone.out | grep -i "finished"
[2019-08-15 15:45:49,421] INFO Finished creating connector test-source-sqlite-jdbc-autoincrement (org.apache.kafka.connect.runtime.Worker:225)
[2019-08-15 15:45:49,504] INFO Source task WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2019-08-15 15:46:49,484] INFO Finished WorkerSourceTask{id=test-source-sqlite-jdbc-autoincrement-0} commitOffsets successfully in 6 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)
root@shanoj_srv1:/# curl -s localhost:8083/connectors
- Examine the Kafka topic created.
root@shanoj_srv1:/# kafka-topics --list --zookeeper localhost:2181 | grep test-sqlite-jdbc
test-sqlite-jdbc-accounts
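Because the worker uses the Avro converters, the connector also registers the record's value schema with Schema Registry under the default <topic>-value subject name; listing the subjects is a quick cross-check (expect an entry like test-sqlite-jdbc-accounts-value):
root@shanoj_srv1:/# curl -s localhost:8081/subjects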
Start a Kafka Consumer and Write New Data to the Database
- Open a Kafka consumer.
root@shanoj_srv1:/# kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
- Open a new terminal tab and start a shell inside the sqlite-test container.
root@shanoj_srv1:/# sudo docker exec -it sqlite-test /bin/bash
- Change to the /tmp directory.
root@shanoj_srv1:/# cd /tmp
- Access the SQLite database test.db.
root@shanoj_srv1:/# sqlite3 test.db
- Insert a new value into the accounts table.
root@shanoj_srv1:/tmp# sqlite3 test.db
SQLite version 3.8.7.1 2014-10-29 13:59:56
Enter ".help" for usage hints.
sqlite> INSERT INTO accounts(name) VALUES('rama');
sqlite> INSERT INTO accounts(name) VALUES('lev');
sqlite> INSERT INTO accounts(name) VALUES('sriram');
sqlite> INSERT INTO accounts(name) VALUES('joby');
sqlite> INSERT INTO accounts(name) VALUES('shanoj');
sqlite>
- Return to the previous session with the consumer and verify the data has been written.
root@ip-10-0-1-100:/# kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
{"id":3,"name":{"string":"rama"}}
{"id":4,"name":{"string":"lev"}}
{"id":5,"name":{"string":"sriram"}}
{"id":6,"name":{"string":"joby"}}
{"id":7,"name":{"string":"shanoj"}}
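If records ever stop appearing, the Connect REST API reports per-connector and per-task state, which is the first place to look:
root@ip-10-0-1-100:/# curl -s localhost:8083/connectors/test-source-sqlite-jdbc-autoincrement/status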