Setting Up Change Data Capture with Postgres, Debezium, and Kafka


Change Data Capture (CDC) is a technique for capturing changes made to data in a database and propagating them to other systems.
In this article, I will guide you through setting up CDC with Postgres, Debezium, and Kafka. This combination lets you stream real-time data changes from your Postgres database to Kafka topics, enabling downstream applications such as real-time analytics, data synchronization, and event-driven architectures.

You can find the Ktor consumer implementation code on GitHub: https://github.com/UchePhilz/cdc-consumer

Requirements:

  • Postgres
  • Docker
  • REST client (cURL / Postman)

Configuration

Postgres

1). Edit the Postgres configuration file:

  • Locate the Postgres configuration file and set the wal_level property to logical:
wal_level = logical
  • Restart the Postgres service after making the change.

ℹ️ TIP: To find the location of the configuration file, run the following SQL command:

SHOW config_file;
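After restarting, you can confirm the setting took effect. A quick check, assuming a local postgres superuser:

# Should print "logical" once the restart has picked up the new setting
psql -U postgres -c "SHOW wal_level;"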

2). Set the replica identity on each table you want to capture changes from by executing the following SQL command:

ALTER TABLE table_name REPLICA IDENTITY FULL;

ℹ️ TIP: The replica identity can be DEFAULT, NOTHING, FULL, or USING INDEX index_name; FULL makes update and delete events carry the previous values of all columns.
ℹ️ TIP: To view the current replica identity of a specific table:

SELECT CASE relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class
WHERE oid = 'table_name'::regclass;

ℹ️ TIP: The Postgres user Debezium connects as should have the REPLICATION attribute as well as CREATE and SELECT privileges.
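As a sketch, assuming a dedicated debezium role and a database named inventory (both hypothetical names), the grants could look like this:

# Role with replication rights for Debezium (role, password, and database
# names are placeholders)
psql -U postgres -c "CREATE ROLE debezium WITH LOGIN PASSWORD 'dbz' REPLICATION;"
# CREATE on the database allows Debezium to create the publication it needs
psql -U postgres -c "GRANT CREATE ON DATABASE inventory TO debezium;"
# SELECT on the captured tables is required for the initial snapshot
psql -U postgres -d inventory -c "GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;"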

Docker

Run the following Docker Compose file to set up Zookeeper, Kafka, and Debezium:

version: "3.7"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
depends_on:
- zookeeper

ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_JMX_PORT: 9991

debezium:
image: debezium/connect:1.4
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
depends_on: [kafka]
ports:
- 8083:8083
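Bring the stack up and confirm that the Kafka Connect REST API is reachable before registering any connectors:

docker compose up -d
# The root endpoint returns the Connect worker's version info as JSON
curl --location 'localhost:8083/'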

Access Connectors

With the containers up and running, you can manage Debezium connectors through the Kafka Connect REST API.

List registered connectors

curl --location 'localhost:8083/connectors'

Delete a connector

curl --location --request DELETE 'localhost:8083/connectors/{connector-name}'

Create a new connector

curl --location 'localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "connector-name",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "postgres port",
    "database.user": "postgres user",
    "database.password": "postgres password",
    "database.dbname": "database",
    "plugin.name": "pgoutput",
    "database.server.name": "source",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.include.list": "public.tablename1,public.tablename2",
    "slot.name": "a_unique_slot_name"
  }
}'
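After creating the connector, check that it is running and watch change events arrive. With the settings above, Debezium 1.x names each topic {database.server.name}.{schema}.{table}, so changes to public.tablename1 appear on the topic source.public.tablename1:

# The connector and its task should both report RUNNING
curl --location 'localhost:8083/connectors/connector-name/status'

# Tail the change events for one of the captured tables
docker compose exec kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic source.public.tablename1 \
--from-beginning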
