A low-latency, multi-tenant Change Data Capture (CDC) pipeline that continuously replicates data from OLTP (MySQL) to OLAP (NoSQL) systems with no impact on the source.
- Capture changes from many data sources and data types.
- Feed data to many client types (real-time, slow/catch-up, full bootstrap).
- Multi-tenant: carries data from many different databases and supports multiple consumers.
- Non-intrusive architecture for change capture.
- Both batch and near real time delivery.
- Isolate fast consumers from slow consumers.
- Isolate sources from consumers:
  - Schema changes
  - Physical layout changes
  - Speed mismatch
- Change filtering:
  - Filtering of database changes at the database level, schema level, table level, and row/column level.
- Buffer change records in Kafka so consumers can start from an arbitrary point in the change stream, including a full bootstrap of the entire data set (see the consumer sketch after this list).
- Guaranteed in-commit-order and at-least-once delivery with high availability (at-least-once vs. exactly-once)
- Resilience and Recoverability
- Schema-awareness
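Because the change stream is buffered in Kafka, a slow, catch-up, or bootstrap consumer can rewind to an arbitrary point and re-read it at its own pace, independently of fast consumers. Below is a minimal sketch of such a consumer; the topic name maxwell, the broker address, and the group id are assumptions for illustration, not part of this project. Committing offsets only after the records are processed is what gives at-least-once delivery.

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class CatchUpConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", "cdc-catchup");                // hypothetical consumer group
        props.put("enable.auto.commit", "false");            // commit manually after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign partitions explicitly so we can seek to an arbitrary point in the change stream.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("maxwell")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(partitions);

            // Rewind to the offsets that were current one hour ago
            // (or use consumer.seekToBeginning(partitions) for a full bootstrap).
            long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition tp : partitions) query.put(tp, oneHourAgo);
            consumer.offsetsForTimes(query).forEach((tp, ot) -> {
                if (ot != null) consumer.seek(tp, ot.offset());
            });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
                }
                consumer.commitSync();   // committing after processing gives at-least-once delivery
            }
        }
    }
}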
Install the source MySQL database and configure it with row-based replication as per the instructions, then follow the steps below.
cd oltp-cdc-olap/maxwell
curl -L -0 https://github.com/zendesk/maxwell/releases/download/v0.13.1/maxwell-0.13.1.tar.gz | tar --strip-components=1 -zx -C .
cd maxwell
- Run with stdout producer
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' --producer=stdout
- Run with kafka producer (point --kafka.bootstrap.servers at your Kafka brokers)
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092
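Maxwell publishes change records to a Kafka topic, by default named maxwell. If your broker does not auto-create topics, create it before starting Maxwell with the Kafka producer. A minimal sketch using the Kafka AdminClient, assuming a single local broker and Maxwell's default topic name:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateMaxwellTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker

        try (AdminClient admin = AdminClient.create(props)) {
            // A single partition keeps the change stream totally ordered; add partitions for throughput.
            NewTopic topic = new NewTopic("maxwell", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}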
If all goes well, you'll see Maxwell replaying your inserts:
mysql> CREATE TABLE test.guests (
id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
firstname VARCHAR(30) NOT NULL,
lastname VARCHAR(30) NOT NULL,
email VARCHAR(50),
reg_date TIMESTAMP
);
mysql> INSERT INTO test.guests SET firstname='sumo', lastname='demo';
Query OK, 1 row affected (0.04 sec)
(maxwell)
{"database":"test","table":"guests","type":"insert","ts":1446422524,"xid":1800,"commit":true,"data":{"reg_date":"2015-11-02 00:02:04","firstname":"sumo","id":1,"lastname":"demo"}}
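Each change record carries the database, table, change type, transaction id, and full row image, so a downstream consumer can apply it idempotently to the OLAP store. Below is a hedged sketch of such an applier, using Jackson for parsing and an in-memory map as a stand-in for the NoSQL sink; the class name and key scheme are illustrative only, not part of this project.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LcrApplier {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Stand-in for the OLAP/NoSQL sink: rows keyed by "database.table:id".
    private static final Map<String, JsonNode> SINK = new ConcurrentHashMap<>();

    public static void apply(String json) throws Exception {
        JsonNode lcr = MAPPER.readTree(json);
        String key = lcr.get("database").asText() + "." + lcr.get("table").asText()
                + ":" + lcr.get("data").get("id").asText();

        switch (lcr.get("type").asText()) {
            case "insert":
            case "update":
                SINK.put(key, lcr.get("data"));   // idempotent upsert: replayed records are harmless
                break;
            case "delete":
                SINK.remove(key);
                break;
            default:
                // ignore other event types in this sketch
        }
    }

    public static void main(String[] args) throws Exception {
        // The change record emitted by Maxwell in the example above.
        apply("{\"database\":\"test\",\"table\":\"guests\",\"type\":\"insert\",\"ts\":1446422524,"
            + "\"xid\":1800,\"commit\":true,\"data\":{\"reg_date\":\"2015-11-02 00:02:04\","
            + "\"firstname\":\"sumo\",\"id\":1,\"lastname\":\"demo\"}}");
        System.out.println(SINK);
    }
}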
This flow depends on the nifi-scripting module; download the NAR and copy it to $NIFI_HOME/lib.
ExecuteJavaScript's JSON transformation logic:
// Read the incoming FlowFile content: one Maxwell logical change record (LCR) serialized as JSON.
var lcr = util.flowFileToString(flowFile, session);
lcr = JSON.parse(lcr);

// Promote the LCR metadata to FlowFile attributes so downstream processors can act on them.
var attMap = new java.util.HashMap();
attMap.put('commit', lcr.commit.toString());
attMap.put('database', lcr.database);
attMap.put('table', lcr.table);
attMap.put('ts', lcr.ts.toString());
attMap.put('id', lcr.data.id.toString());
attMap.put('type', lcr.type);
attMap.put('xid', lcr.xid.toString());

// Replace the original FlowFile with one whose content is just the row image (lcr.data),
// then attach the metadata attributes.
session.remove(flowFile);
flowFile = util.stringToFlowFile(JSON.stringify(lcr.data), session, flowFile);
flowFile = session.putAllAttributes(flowFile, attMap);
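Promoting the LCR metadata to FlowFile attributes is a deliberate design choice: downstream processors can route, filter, or partition on database, table, type, and so on without re-parsing the content (for example, RouteOnAttribute with an expression such as ${type:equals('insert')}), while the FlowFile content itself is reduced to just the row image the sink needs.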