Kafka Connect: Deep dive into Kafka Connectors


Apache Kafka Connect is a framework for streaming data between Apache Kafka and other data systems. Unlike writing custom Producer or Consumer code, Connect operates as a scalable, fault-tolerant service that manages the tedious details of integration, such as parallelism, offset management, and error handling. It allows developers to focus on the data transformation rather than the mechanics of connectivity.

Kafka Connect runs on a cluster of workers (in standalone or distributed mode) and leverages Connectors: pre-built plugins that define where data comes from (Source) or where it goes (Sink). This simplicity makes it a cornerstone of event-driven architectures that require robust ETL/ELT capabilities.

You can find all the connectors you need at the following link:
https://www.confluent.io/hub/
Below is a detailed description of the most useful and convenient ones, along with the package to include.
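The quickest way to try the property files shown below is to run a single standalone worker and pass the connector configurations on the command line. A sketch, assuming a local Kafka installation with the Connect scripts on your PATH and the stock worker config that ships with it:

# Start one standalone worker and load two connector configs
bin/connect-standalone.sh config/connect-standalone.properties \
  file-source.properties file-sink.properties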


Kafka Connect – FileStream Connector – File to Kafka to file

While rarely used in production for large-scale data transfer, the FileStream Connector is invaluable for testing, quick proof-of-concepts, and basic logging pipelines due to its simplicity.

FileStreamSource: Reading Local Data

The Source connector reads lines of data from a local file and publishes each line as a message to a Kafka topic. This is a great way to simulate real-time log ingestion.

Practical Configuration (file-source.properties):

name=file-source-connector
connector.class=FileStreamSource
tasks.max=1
# Path to the file to monitor
file=/var/log/app.log
topic=raw-application-logs
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
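Once this connector is running, append a line to the monitored file and confirm it arrives on the topic. A sketch, assuming a broker on localhost:9092 and the Kafka CLI tools on your PATH:

echo "test log line" >> /var/log/app.log
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic raw-application-logs --from-beginning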

FileStreamSink: Verification and Quick Output

The Sink connector takes messages from a Kafka topic and appends them to a file. This is crucial for verifying the end-to-end data flow in development before introducing a complex database or cloud target.

Practical Configuration (file-sink.properties):

name=file-sink-connector
connector.class=FileStreamSink
tasks.max=1
# Path to the output file
file=/tmp/final-output.txt
topics=transformed-data
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
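A quick end-to-end check is to produce a record to the input topic and watch the output file grow (same assumptions as above):

echo "hello from connect" | bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic transformed-data
tail -f /tmp/final-output.txt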


Kafka Connect – JDBC Connector – Bridging relational databases

The JDBC Connector is arguably the most essential connector, providing robust integration with traditional relational databases like PostgreSQL, MySQL, SQL Server, and Oracle. It enables moving data into Kafka from tables and writing data from Kafka back into tables.

JDBCSource: Database Ingestion Strategies

The JDBC Source connector supports several modes for querying database tables and detecting changes.

  • mode=bulk: Loads the entire table once or periodically. Simple but inefficient for large tables.
  • mode=timestamp: Uses a timestamp column (e.g., updated_at) to find rows added or modified since the last poll. Recommended for incremental updates.
  • mode=incrementing: Uses a strictly increasing column (e.g., an auto-incrementing primary key) to find new rows; it cannot detect updates to existing rows.
  • mode=timestamp+incrementing: Combines both columns, detecting updates via the timestamp and using the incrementing column as a tie-breaker for rows sharing the same timestamp. This is the mode used in the example below.

Practical Configuration (PostgreSQL Source):

name=postgres-source-users
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=3
connection.url=jdbc:postgresql://dbhost:5432/appdb
connection.user=kafka_user
connection.password=secure_password
topic.prefix=db-source-
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
table.whitelist=users,orders

Setting tasks.max=3 enables parallelism: Connect distributes the whitelisted tables across the tasks for faster ingestion. A single table is never split across tasks, so the effective parallelism is capped by the number of tables (two, in this example).
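In distributed mode there are no local properties files; the same configuration is submitted to the Connect REST API as JSON. A sketch, assuming the worker's REST interface listens on connect-host:8083:

curl -X POST http://connect-host:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "postgres-source-users",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "tasks.max": "3",
          "connection.url": "jdbc:postgresql://dbhost:5432/appdb",
          "connection.user": "kafka_user",
          "connection.password": "secure_password",
          "topic.prefix": "db-source-",
          "mode": "timestamp+incrementing",
          "timestamp.column.name": "updated_at",
          "incrementing.column.name": "id",
          "table.whitelist": "users,orders"
        }
      }'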

JDBCSink: Writing to Databases

The JDBC Sink connector reads records from Kafka and inserts or updates rows in the target database.

Practical Configuration (MySQL Sink):

name=mysql-sink-metrics
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=5
connection.url=jdbc:mysql://mysqlhost:3306/analytics
connection.user=metrics_writer
topics=processed-metrics
# Automatically create the target table if it doesn't exist (use with caution!)
auto.create=true
# Use UPSERT semantics to handle updates to existing rows
insert.mode=upsert
# Derive the primary key from fields in the message value
pk.mode=record_value
# The field(s) in the message value that constitute the primary key
pk.fields=user_id
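Note that the JDBC Sink must know each record's schema in order to map fields to table columns, so it is normally paired with Avro and Schema Registry, or with JSON values that embed their schema. A minimal sketch of the JSON-with-schema variant, added to the sink configuration above:

# Expect JSON values that carry an embedded schema
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter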

JDBC Error Handling (Dead Letter Queue – DLQ): When a Sink connector attempts to write a malformed record (e.g., missing a required field) to a database, the default behavior is often to fail and halt the entire task. To prevent this, implement a Dead Letter Queue (DLQ).

# Add these lines to the Sink Connector configuration
# Do not halt the task on errors
errors.tolerance=all
errors.deadletterqueue.topic.name=mysql-sink-error-dlq
errors.deadletterqueue.topic.replication.factor=3

This ensures that bad records are written to a designated error topic (mysql-sink-error-dlq) for separate analysis, allowing the healthy data pipeline to continue running.
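To see why a record was rejected, you can ask Connect to attach the failure context as record headers and then read the DLQ with the console consumer. A sketch, assuming a broker on localhost:9092 and a reasonably recent Kafka CLI (header printing requires it):

# Add to the Sink configuration: record the failure reason as headers on DLQ records
errors.deadletterqueue.context.headers.enable=true

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic mysql-sink-error-dlq --from-beginning \
  --property print.headers=true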


Kafka Connect – S3 Connector – The cloud storage bridge

For data archiving, lake building, and disaster recovery, the Amazon S3 Connector is indispensable. It manages the complex process of grouping Kafka records, formatting them (e.g., Parquet, JSON, Avro), and uploading them to S3 buckets efficiently.

S3Sink: Archiving Data

The S3 Sink connector is used for archival purposes. Its configuration controls how often files are committed to the bucket, based on record count (flush.size) and elapsed time (rotate.interval.ms).

Practical Configuration (S3 Sink):

name=s3-archiver-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=5
topics=user-events-topic
s3.region=eu-central-1
s3.bucket.name=my-kafka-data-lake
# Output format configuration
format.class=io.confluent.connect.s3.format.json.JsonFormat
# Batching strategy: commit a file after 100,000 records or after one hour, whichever comes first
flush.size=100000
rotate.interval.ms=3600000
storage.class=io.confluent.connect.s3.storage.S3Storage
# TimeBasedPartitioner lays files out by time; it also requires a duration, path format, locale and timezone
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC

S3 Problem Solving (File Commit Issues): A common issue is when the S3 Sink connector fails to commit a file and restarts repeatedly. This often happens if the IAM role or AWS credentials lack specific permissions for the bucket (e.g., s3:PutObject, s3:ListBucket). Ensure your Connect worker configuration has the correct AWS credentials and that the role associated with the worker process has the necessary permissions. The failure logs will typically show an AWS security exception.
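A quick way to rule out a permissions problem is to run the equivalent checks from the Connect worker host with the AWS CLI, using the same credentials the worker sees. A sketch, assuming the AWS CLI is installed there:

# Which identity do the worker's credentials resolve to?
aws sts get-caller-identity
# Can that identity list and write to the target bucket?
aws s3 ls s3://my-kafka-data-lake
echo test | aws s3 cp - s3://my-kafka-data-lake/connect-permission-check.txt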


Kafka Connect – General troubleshooting

Regardless of the connector used, the Connect REST API is your primary tool for monitoring and troubleshooting.

1. Checking Connector Status:

# Check overall status of a connector and its tasks
curl -s -X GET http://connect-host:8083/connectors/postgres-source-users/status | jq .

If a task is showing FAILED, you can restart it individually.

2. Restarting a Failed Task:

# Restart a specific failed task (e.g., task ID 0)
curl -X POST http://connect-host:8083/connectors/postgres-source-users/tasks/0/restart

3. Configuration Validation Errors: When submitting a connector configuration, Connect validates the properties. Look for HTTP 400 Bad Request errors, which usually pinpoint a missing mandatory property (e.g., connection.url) or an incorrect class name.
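You can also catch such problems before creating the connector by calling the validation endpoint for the plugin class; Connect returns the same per-field error messages without deploying anything. A sketch for the JDBC Sink (the request body must at least repeat the connector class):

curl -s -X PUT \
  http://connect-host:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","topics":"processed-metrics"}' \
  | jq '.error_count'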

By standardizing integrations using the scalable Kafka Connect framework and its robust connectors, you move away from brittle, custom code and build reliable, easily auditable data pipelines. Mastering the configuration and leveraging built-in features like DLQ ensures data flow continuity even when facing data errors.

That’s all.
Try it at home!
