Listen to Postgres Changes with Apache Kafka

Josip Vojak · Geek Culture · Aug 25, 2021

If you have ever had a situation in which different applications and sources insert data into the same database, and you want to take action based on the stored, updated, or deleted data, this article may help you.

Consider a case from the image below:

Different applications using the same database

The system uses microservices, where the processing of data is broken up into smaller tasks. Each application writes its data into the database, and you want to react to certain changes in the database, for example when one of the records is updated. You don't want to complicate life for yourself and the rest of the team, so you want to avoid a situation in which each application carries its own piece of code for sending a notification whenever something changes in the database.

This is where Change Data Capture (CDC) comes into play: a software design pattern used to monitor changes in data and perform certain actions based on those changes, mostly insert, update, or delete operations. These changes typically come from another system (or, in this case, from multiple services).

What are the advantages of such a method?
The original database (which we listen to) remains intact: we do not add any triggers or log tables. This helps a lot, because such additions degrade the performance of the database and can ultimately affect the entire system.

To implement the CDC software design pattern, we will need to add two items in our pipeline:

  • a service that listens for row-level changes in the database
  • a service that streams those changes

For data streaming, Apache Kafka has proven to be an ideal choice for many major players in the industry: Pinterest, Airbnb, Cisco, Cloudflare, Goldman Sachs, LinkedIn, Mozilla, Oracle, PayPal, Spotify, Shopify, Tencent, Twitter, and many, many more.
Along with Apache Kafka, Debezium has proven itself as one of the native CDC connectors for Postgres (and other databases). For Apache Kafka and Debezium to communicate, Kafka Connect is used: an out-of-the-box solution that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems.

What are Apache Kafka and Debezium?

Here’s a brief introduction to the technologies that will be used, their main functionalities, and their advantages.

Apache Kafka
In short, Apache Kafka is an open-source distributed event streaming platform. It is a distributed publish-subscribe messaging system that was created as a highly fast, scalable, and durable alternative to existing solutions. It is also reliable as it automatically balances consumers in the event of failure.

There are a total of 5 Kafka Core APIs, some of which send data to Kafka topics, while others read and consume data from them:

Apache Kafka Producer, Connect Source, Streams, Consumer and Connect Sink API
  • Kafka Producer API: allows applications to send streams of data to topics in the Kafka cluster. (logs, IoT, streams).
  • Kafka Connect Source API: allows implementing connectors that continually pull data from some source system or application into Kafka (REST APIs, CDC, MySQL, Postgres, MongoDB, Twitter, Slack).
  • Kafka Streams API / KSQL: allows transforming streams of data from input topics to output topics.
  • Kafka Consumer API: allows applications to read streams of data from topics in the Kafka cluster. (send an email, request, store to file)
  • Kafka Connect Sink API: allows applications to read a stream and store it into a target store (Kafka to S3, HDFS, PostgreSQL, MongoDB, Telegram)
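To make the Producer API a bit more concrete, here is a minimal sketch using the kafkajs client (the same client we will use later for the consumer app); the broker address and topic name are placeholders:

// producer.js - a minimal Producer API sketch (broker address and topic name are placeholders)
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'demo-producer', brokers: ['localhost:9092'] })
const producer = kafka.producer()

const run = async () => {
  await producer.connect()
  // Send a single message to the demo topic
  await producer.send({
    topic: 'demo-topic',
    messages: [{ key: 'greeting', value: 'hello from the Producer API' }],
  })
  await producer.disconnect()
}

run().catch(console.error)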

Debezium

Debezium is built upon the Apache Kafka project and uses Kafka to transport changes from one system to another. It implements the change data capture pattern: it ensures that all data changes are captured, with very low delay and without changes to your data model. It can capture deletes, transaction IDs, and additional metadata. Some of the main features are:

  • Snapshots: optionally, the connector can take a snapshot of the database's current state if it is started when not all of the change logs still exist.
  • Filters: you can configure the set of captured schemas, tables, and columns with include/exclude list filters (see the snippet after this list).
  • Masking: the values from specific columns can be masked, for example when they contain sensitive data.
  • Monitoring
  • Ready-to-use message transformations
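As an illustration, filters and masking are set through connector configuration properties. A minimal sketch (the table and column names are just examples; double-check the property names against the Debezium version you run):

{
  "table.include.list": "public.customers",
  "column.exclude.list": "public.customers.internal_notes",
  "column.mask.with.12.chars": "public.customers.credit_card"
}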

Finally, messages published by Apache Kafka are consumed by an end consumer or sink, which can in fact be all sorts of things, depending on the use case.

If you want to stream your data into a well-known service, such as another database (PostgreSQL, MariaDB, MongoDB), Elasticsearch, Hadoop, or an AWS S3 bucket, you will use out-of-the-box configurable connectors: sinks.

If you want custom logic, you will use the Consumer API.

Proof Of Concept (Example)

In this example, I will show the whole workflow using Postgres, Debezium, Apache Connect + Apache Kafka, and a NodeJS app that will act as a consumer.

Everything except the NodeJS app will be inside docker containers, so I created a single docker-compose.yml file that includes everything you need to know.

Architectural overview of the system:

Apache Kafka + Connect (Debezium) ensure that all database changes will be reported to the Consumer Application
  1. Applications 1, 2 & 3 will each store their own data in the Postgres database. Since this is already something that is widely known, I won't spend time on it; we will make changes directly in psql (simple INSERT/UPDATE statements).
  2. Once there is a data change in Postgres, Debezium will detect it and send changes to Kafka
  3. Kafka will publish the data into a specific topic (servername.schema.table)
  4. Previously subscribed NodeJS Consumer will receive the data published by Apache Kafka

I managed to put everything except the Consumer app inside a single docker-compose.yml, so you can follow along simply by copy/pasting the configuration below:
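A minimal sketch of what such a docker-compose.yml can look like, assuming the Debezium 1.3 example images and the ports visible in the docker ps output further below (environment variable names follow the Debezium images; adjust versions, ports, and credentials to your environment):

version: "3"
services:
  zookeeper:
    image: debezium/zookeeper:1.3
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: debezium/kafka:1.3
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - ADVERTISED_HOST_NAME=${HOST_IP}
  postgres:
    image: debezium/example-postgres:1.3
    container_name: postgres
    ports:
      - "5433:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  postgres-connector:
    image: debezium/connect:1.3
    container_name: postgres-connector
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=pg_connect_configs
      - OFFSET_STORAGE_TOPIC=pg_connect_offsets
      - STATUS_STORAGE_TOPIC=pg_connect_statuses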

It consists of four services:

  1. zookeeper — Zookeeper keeps track of the status of the Kafka cluster nodes and it also keeps track of Kafka topics, partitions, etc. Zookeeper allows multiple clients to perform simultaneous reads and writes and acts as a shared configuration service within the system.
  2. kafka — Apache Kafka server
  3. postgres — Postgres database
  4. postgres-connector — Debezium connect instance used for Change Data Capture

Before running all services, note that we're using a ${HOST_IP} environment variable. If you want to learn more about the ${HOST_IP} usage, take a look at Kafka Connectivity.

Follow this step-by-step example on how to put everything to work together.

Run docker containers:

  • Open a terminal and create a new folder
  • Copy/paste docker-compose.yml into a file and save it
  • Set the HOST_IP address (environment variable):

export HOST_IP=$(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{print $2}' | cut -f2 -d: |head -n1)

  • Run docker-compose up -d → This will create four containers and initialization may take a while (couple of seconds).
  • OPTIONAL: You can check the state of your docker containers by executing docker ps.
b1b37bf623db   debezium/example-postgres:1.3   "docker-entrypoint.s…"   About a minute ago   Up About a minute   0.0.0.0:5433->5432/tcp, :::5433->5432/tcp                                            postgres
1b9a3be29c09   debezium/zookeeper:1.3          "/docker-entrypoint.…"   About a minute ago   Up 58 seconds       2888/tcp, 3888/tcp, 8778/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 9779/tcp   zookeeper

Create postgres database:

  • Open a shell inside the postgres container by executing: docker exec -it postgres bash
  • Login to your psql server: psql -U postgres
  • Enter postgres password (if you followed the example, the password is ‘postgres’)
  • Create a new table:
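A minimal sketch of the table and a few test rows, matching the output below (the id column is assumed to be auto-generated):

CREATE TABLE customers (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255)
);

INSERT INTO customers (name) VALUES ('john'), ('jack'), ('jane');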

You can check if newly inserted records are stored:

postgres=# select * from customers;
 id | name
----+------
  1 | john
  2 | jack
  3 | jane
(3 rows)
  • Exit the psql server and postgres container

Configure Kafka Connect (Debezium) to connect to the database:

  • Kafka Connect has a REST endpoint which we can use to see which connectors are enabled in the container (and to test the connection):
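For example (assuming Kafka Connect exposes its REST API on the default port 8083):

curl -H "Accept:application/json" localhost:8083/connectors/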

This will return an empty array [ ] if everything is good.

  • Create a configuration file for the Kafka connector (Debezium), so that it is able to connect to the database and listen for changes.
Debezium configuration file: pg-source-config.json
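A minimal sketch of such a configuration, matching the compose sketch above (the connector name is arbitrary, and options such as the logical decoding plugin may need adjusting for your setup):

{
  "name": "pg-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "myserver",
    "table.include.list": "public.customers"
  }
}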

Copy this into a file and save the file under pg-source-config.json.

The following will start a connector that reads the customers table out of the source postgres database:
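For example:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @pg-source-config.json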

This should return a response with the same JSON object you sent.

Now you should have an up and running pipeline: each change in the customers table will be streamed to Kafka and published to the myserver.public.customers topic.

Why that name? Kafka will automatically create a topic based on the database.server.name attribute (myserver) from pg-source-config.json, followed by the schema and table name (public and customers, respectively).

Create Consumer application

We're left with creating a Consumer app that will consume the changes published by Apache Kafka. For that, I created a simple NodeJS app. You can do the same by following the next steps:

  • Create a new NodeJS folder and project with npm init
  • Install dependencies: npm i --save dotenv ip kafkajs
  • Create an environment file (.env):
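A minimal sketch of the .env file; TOPIC comes from the article, while the other variable names are just the ones used in the sketches below:

TOPIC=myserver.public.customers
CLIENT_ID=node-consumer
GROUP_ID=nodejs-consumer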

TOPIC is the topic that our KafkaJS consumer app will be listening to.

  • Create a kafka.js file inside the NodeJS folder that serves the purpose of configuring the Kafka client used by the consumer.
  • Create an index.js file inside the NodeJS project that will connect the consumer to the Kafka broker, subscribe to the 'myserver.public.customers' topic, and set up a handler for each received message (sketches of both files follow):
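Minimal sketches of both files, assuming the kafkajs, dotenv, and ip dependencies installed above and the variables from the .env sketch (the broker address is derived from the machine's IP so that it matches the HOST_IP used in docker-compose):

// kafka.js - configures the shared KafkaJS client
require('dotenv').config()
const ip = require('ip')
const { Kafka, logLevel } = require('kafkajs')

// Resolve the broker address: use HOST_IP if set, otherwise the machine's IP
const host = process.env.HOST_IP || ip.address()

const kafka = new Kafka({
  logLevel: logLevel.INFO,
  brokers: [`${host}:9092`],
  clientId: process.env.CLIENT_ID || 'node-consumer',
})

module.exports = kafka

// index.js - lists topics, subscribes the consumer, and logs every received change event
require('dotenv').config()
const kafka = require('./kafka')

const consumer = kafka.consumer({ groupId: process.env.GROUP_ID || 'nodejs-consumer' })
const admin = kafka.admin()

const run = async () => {
  // List the existing topics, just to verify the connection to the broker
  await admin.connect()
  console.log(await admin.listTopics())
  await admin.disconnect()

  await consumer.connect()
  await consumer.subscribe({ topic: process.env.TOPIC, fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // Debezium wraps the change event in a { schema, payload } envelope by default
      const event = JSON.parse(message.value.toString())
      console.log(event.payload || event)
    },
  })
}

run().catch(console.error)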

Run the app by executing node index.js.

This should output a response into the console:

[
  'pg_connect_statuses',
  'myserver.public.customers',
  '__consumer_offsets',
  'pg_connect_offsets',
  'pg_connect_configs'
]
{"level":"INFO","timestamp":"2021-08-25T07:44:20.562Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"nodejs-consumer"}
{"level":"INFO","timestamp":"2021-08-25T07:44:44.686Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"nodejs-consumer","memberId":"node-consumer-5e4e7ecc-2330-4038-9d28-1dc089c7af31","leaderId":"node-consumer-5e4e7ecc-2330-4038-9d28-1dc089c7af31","isLeader":true,"memberAssignment":{"myserver.public.customers":[0]},"groupProtocol":"RoundRobinAssigner","duration":24124}

At this moment, you should have the whole pipeline connected. When there’s a data change in the postgres database, Debezium will detect it and stream it to Kafka. Kafka will publish it to a specific topic, based on server name, schema and table.

Insert record in the database

Now it's time to test everything. We will insert a single record into our previously created customers table. Connect to the psql database inside the postgres container and run the following command:
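For example (matching the change event shown below):

INSERT INTO customers (name) VALUES ('josip');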

Shortly, you should see the NodeJS app console log:

{
  before: null,
  after: { id: 4, name: 'josip' },
  source: {
    version: '1.3.1.Final',
    connector: 'postgresql',
    name: 'myserver',
    ts_ms: 1629878089981,
    snapshot: 'false',
    db: 'postgres',
    schema: 'public',
    table: 'customers',
    txId: 606,
    lsn: 34250448,
    xmin: null
  },
  op: 'c',
  ts_ms: 1629878090452,
  transaction: null
}

Final Words

Although a similar outcome can be created using simple database triggers, this solution offers much more:

  • you will not get into a situation where your database has degraded performance
  • the technologies used (debezium, kafka) can be easily adjusted via configuration files
  • containerisation allows you to easily, quickly, and efficiently run this pipeline on any server
  • Kafka connectors are ready-made products for which you do not need to write any code; they work out of the box
  • you are working with technologies used by well-established companies and backed by a large community
  • technologies are fast, scalable, durable, and reliable
  • if necessary, you can easily change each component

Overall, I think this solution, although a little more complex to set up at first since it works with the low-level transaction log, is very effective.
