Профиль
Инженерные заметки

Тонкости Kafka Connect и Debezium

Практические наблюдения по работе Kafka Connect и Debezium с PostgreSQL: от настройки репликации до мониторинга и бэкфиллинга.

Kafka Connect Debezium PostgreSQL CDC

Kafka Connect

Kafka Connect — open-source инструмент, который позволяет переносить данные между Kafka1 и другими системами в формате real-streaming2.

В эпоху BigData real-streaming стал очень востребованным для аналитики, благодаря чему Kafka и Kafka Connect появились и стали набирать популярность. Сейчас крайне редко можно увидеть вакансию на специалиста, работающего с данными, где в требуемых технологиях не будет Kafka.

Kafka Connect состоит из двух видов коннекторов:

  1. Source Connector — коннектор, который пишет в Kafka из внешнего источника данных.
  2. Sink Connector — коннектор, который пишет во внешний источник данных из Kafka.
Схема Kafka Connect
Упрощенная схема потоков в Kafka Connect.

Debezium

CDC на практике
Debezium на практике

Debezium (в разговоре “дебез”) — коннекторы, позволяющие читать изменения данных (CDC)3 в режиме real-stream. Как правило, это источники под протоколом JDBC4.

PostgreSQL

Ниже пункты будут касаться работы СУБД PostgreSQL с Debezium. Решения были выявлены опытным путем и необязательно являются стопроцентными.

Настройка источника перед репликацией

Перед началом репликации из PostgreSQL нужно выполнить следующие действия:

  • max_slot_wal_keep_size задаёт лимит WAL, который держится для репликационных слотов (в МБ). Если лимит превышен, слот могут выключить, а старые WAL удалятся. Значение -1 — без лимита, и при отсутствии читателя WAL может разрастись до заполнения диска. Отдельно проверь max_wal_size: это порог для чекпойнта, а не жёсткий лимит WAL. Он не остановит рост WAL при сильном отставании слота, но влияет на частоту чекпойнтов и переработку сегментов.
  • Создать публикацию и добавить туда необходимые таблицы.
CREATE PUBLICATION <publication_name> FOR TABLE
<table1>,
<table2>,
…
<tableN>;
  • (опционально) Настроить Replica Identity — оно отвечает за то, какие события отправлять и насколько глубоко.

Replica Identity:

DEFAULT — отправляет create, update, delete, но в UPDATE только новое значение (без old state).

FULL — как DEFAULT, но включает предыдущее состояние (старые значения строк).

INDEX — использует уникальный индекс, указанный для репликации (минимальный набор ключей).

NOTHING — в UPDATE/DELETE не передает ключи строки (обычно не подходит для CDC).

ALTER TABLE <table> REPLICA IDENTITY FULL;
  • Создать heartbeat таблицу.
CREATE TABLE IF NOT EXISTS debezium.heartbeat(
  id INT,
  last_update TIMESTAMPTZ DEFAULT current_timestamp,
  CHECK (id=1), PRIMARY KEY (id)
  );

В этом случае всегда одна запись, у которой будет меняться поле last_update.

  • Создать сигнальную таблицу.
CREATE TABLE IF NOT EXISTS debezium.signal(
  id VARCHAR(42) PRIMARY KEY,
  type VARCHAR(32) NOT NULL,
  data VARCHAR(2048) NULL
  );

Потеря сетевой связности

Это проблема номер один: Debezium-коннектор чаще всего падает из-за этого. Чтобы это предотвратить, достаточно перезагрузить коннектор при условии, что сетевая связность восстановилась. По умолчанию коннектор не перезапускается после падения, чтобы он пытался перезапускаться, нужно настроить следующие параметры:

"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.max.retries": 15,
"errors.retry.delay.initial.ms": 1000,
"errors.retry.delay.max.ms": 10000,
"errors.retry.timeout": -1,
"errors.tolerance": "none",

TOAST столбец

Если вы работали с репликацией таблиц, в которых есть столбец с очень большим значением (чаще всего JSON), скорее всего встречались со значением debezium_unavailable_value.

Если коротко, то решается просто:

ALTER TABLE <schema>.<table> REPLICA IDENTITY FULL

Но в этом случае размер WAL увеличится в два раза, перед этим проверьте параметры max_slot_wal_keep_size и max_wal_size.

Если что, решение с этой статьи у меня не взлетело — привело к росту WAL и потере репликационного слота.

Гарантия корректной работы коннектора

Чтобы гарантировать корректную работу коннектора, статуса “Running” недостаточно, так как может быть так: статус зеленый, но репликация не работает. Гарантом здесь выступает heartbeat-топик, который N раз в минуту пишет в топик, то есть нужно проверять свежесть сообщений (можно использовать Grafana).

Параметры, которые отвечают за настройку heartbeat:

"heartbeat.action.query": "INSERT INTO debezium.heartbeat(id) VALUES(1) ON CONFLICT(id) DO UPDATE SET last_update=now();",
"heartbeat.interval.ms": 10000

В этом случае каждые 10 секунд в heartbeat-топик будет записываться сообщение, при отсутствии сообщений больше чем 5 минут — можно бить тревогу.

Бэкфиллинг

Часто бывает, что появилась возможность перезагрузить таблицу или какой-то интервал. Самый легкий и быстрый способ — использовать сигналы. Чтобы использовать их, нужно создать таблицу (DDL выше). И указать следующие параметры:

{
  "signal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='secret' password='secret';",
  "signal.consumer.sasl.mechanism": "SCRAM-SHA-512",
  "signal.consumer.security.protocol": "SASL_SSL",
  "signal.consumer.ssl.truststore.location": "path_to_certificate",
  "signal.consumer.ssl.truststore.type": "PEM",
  "signal.data.collection": "debezium.signal",
  "signal.enabled.channels": "source,kafka",
  "signal.kafka.bootstrap.servers": "server1:9090,server2:9090,server3:9090",
  "signal.kafka.topic": "<service_name>.signal_topic"
}

После этого в сигнальный топик <service_name>.signal_topic отправить сообщение:

{
  "type": "execute-snapshot",
  "data": {
    "data-collections": [
      "public.table1",
      "public.table2"
    ],
    "type": "INCREMENTAL",
    "additional-conditions": [
      {
        "data-collection": "public.table1",
        "filter": "updated_at BETWEEN '2026-03-20 16:00:00' AND '2026-03-20 18:00:00'"
      }
    ]
  }
}

БОНУС: CLI для управления коннекторами

В качестве крутой находки хочу поделиться инструментом, с которым можно легко управлять коннекторами Kafka Connect — kcctl.

kcctl CLI
kcctl позволяет управлять конфигами и статусами коннекторов быстрее, чем через REST API.

В kcctl можно менять конфиги сразу в нескольких коннекторах с помощью регулярных выражений в названии коннектора, смотреть статусы коннекторов, создавать коннекторы. Мне кажется, это удобнее, чем использовать REST API или UI (если это касается каких-то манипуляций, а не просмотра).

Сноски

  1. Kafka — open-source распределенная платформа, предназначенная для обмена сообщениями в реальном времени.
  2. real-streaming — потоковая передача данных в реальном времени.
  3. CDC — Change Data Capture — захват изменений данных, в каждом источнике реализован по-разному, например, PostgreSQL пишет изменения данных в WAL-лог. Основные плюсы CDC — чтение не напрямую из таблицы, что сильно снижает нагрузку на базу и повышает консистентность, так как в качестве изменения пишутся все операции CRUD (create, update, delete).
  4. JDBC — Java Database Connectivity — единый интерфейс для взаимодействия с источниками данных, такими как MySQL, PostgreSQL и т. д.