Export Kafka Events to an HTTP Endpoint
Goal
Export events from Kafka topics to HTTP REST endpoints using the HTTP Sink connector.
Key components involved
- Kafka - Hosted on Confluent Cloud
- Kafka Connect Cluster - To host connector(s)
- kafka-connect-http connector (HTTP Sink) - To export events from topics to the HTTP REST endpoint
- RequestBin - Mock REST endpoint to receive Kafka messages
Create Kafka topic
Create Confluent Cloud account
You can sign up for a free trial account that lasts for three months, which is more than sufficient for this exercise.
Note You can also use a local Kafka instance for this exercise. I often use this docker-compose example to get started quickly.
Grab API keys
Grab the credentials by navigating to Cluster Settings -> API Keys -> Add Key -> Copy API Key & Secret.
Note Spin up Schema Registry and grab the API credentials. (This can be done through the Cloud console.)
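If you prefer the command line, the Confluent CLI can create these keys as well. A very rough sketch (it assumes the CLI is installed and you are logged in, the cluster ID is a placeholder, and exact flags vary between CLI versions):

# Find the Kafka cluster ID (e.g. lkc-xxxxx), then create an API key scoped to it
confluent kafka cluster list
confluent api-key create --resource lkc-xxxxx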
In summary, you need to grab the following settings/configs from Confluent Cloud (one way to keep them handy is sketched just after this list):
Bootstrap host
Cloud API Key & Secret
Schema Registry Host
Schema Registry API Key & Secret
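A small shell snippet is one way to keep all of these in one place while filling in the files below; the variable names and placeholders here are purely illustrative, nothing else in this exercise requires them:

# Placeholder values - replace with your own
export BOOTSTRAP_SERVERS="<<bootstrap host>>:9092"
export CLOUD_API_KEY="<<Cloud API key>>"
export CLOUD_API_SECRET="<<Cloud API secret>>"
export SCHEMA_REGISTRY_URL="<<Schema Registry URL>>"
export SCHEMA_REGISTRY_API_KEY="<<Schema Registry API key>>"
export SCHEMA_REGISTRY_API_SECRET="<<Schema Registry API secret>>"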
Create 'payments' topic
For this exercise I am going to use the Cloud UI to create a topic with just 1 partition and default settings.
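If you would rather use the command line, a rough equivalent with the kafka-topics tool could look like the following; it assumes a client.properties file (a hypothetical name) containing your SASL_SSL credentials for Confluent Cloud:

# client.properties holds security.protocol, sasl.mechanism and sasl.jaas.config
kafka-topics --bootstrap-server <<bootstrap host>>:9092 \
  --command-config client.properties \
  --create --topic payments --partitions 1 --replication-factor 3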
Create a REST Destination endpoint
We need a mock HTTP endpoint to receive the events from Kafka topics.
RequestBin is a fantastic tool that lets you capture REST requests.
Just click 'Create RequestBin' and it will auto-generate an HTTP URL, e.g. https://enwc009xfid4f.x.pipedream.net
This URL needs to be configured in the HTTP Sink connector using the 'http.api.url' property.
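Before wiring it into the connector, you can sanity-check the endpoint with a quick curl (the URL is just my example bin):

curl -X POST -H "Content-Type: application/json" \
  -d '{"hello": "requestbin"}' \
  https://enwc009xfid4f.x.pipedream.net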
Spin up Kafka Connect cluster
I am going to spin up Kafka Connect to host the connectors (also called a 'Kafka Connect cluster' - it is essentially a dedicated runtime for hosting Kafka connectors).
Create a docker-compose.yml file
Create a new folder with the following docker-compose.yml file:
---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.4.0
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      ##
      CONNECT_BOOTSTRAP_SERVERS: "<<Kafka/Confluent Cloud bootstrap host goes here:9092>>"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      ##
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "<<Schema Registry URL goes here>>"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      ##
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "<<Schema Registry API key>>:<<Schema Registry API secret>>"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "<<Schema Registry URL goes here>>"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "<<Schema Registry API key>>:<<Schema Registry API secret>>"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      ##
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Confluent Cloud API Key>>\" password=\"<<Confluent Cloud API Secret>>\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      ##
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Confluent Cloud API Key>>\" password=\"<<Confluent Cloud API Secret>>\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      ##
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Confluent Cloud API Key>>\" password=\"<<Confluent Cloud API Secret>>\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      # External secrets config
      # See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-http:1.0.8
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
Note Not all connectors are pre-installed in Kafka Connect.
In the above example, 'confluent-hub install --no-prompt confluentinc/kafka-connect-http:1.0.8' installs the HTTP Sink connector within Kafka Connect.
Also, do not forget to update the docker-compose file with the Confluent Cloud and Schema Registry API credentials.
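Before bringing the stack up, 'docker-compose config' is a cheap way to confirm the YAML is valid and the placeholders have been filled in; it simply prints the resolved configuration:

docker-compose config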
Bring up Kafka Connect
Run:
docker-compose up
Note You should see some of these topics created in the Cloud web console: _kafka-connect-group-01-v04-configs, _kafka-connect-group-01-v04-offsets, _kafka-connect-group-01-v04-status
This is a sign that Kafka Connect has successfully connected to Confluent Cloud.
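You can also probe the worker directly from the host: the Connect REST interface answers on port 8083, and the connector-plugins endpoint should list the freshly installed HTTP Sink plugin:

# Worker is up and reports its version
curl -s http://localhost:8083/
# io.confluent.connect.http.HttpSinkConnector should appear in this list
curl -s http://localhost:8083/connector-plugins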
Deploy the connector
Kafka Connect exposes an admin REST interface to deploy and manage connectors. POST the following JSON payload to http://localhost:8083/connectors/ (you can use Postman or curl; a curl example follows the payload below).
Note More details on Connect REST Interface
{
  "name": "HttpSink",
  "config": {
    "topics": "payments",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "http.api.url": "https://enwc009xfid4f.x.pipedream.net",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Cloud API KEY>>\" password=\"<<Cloud API SECRET>>\";",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "confluent.topic.replication.factor": "3",
    "reporter.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
    "reporter.admin.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
    "reporter.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Cloud API KEY>>\" password=\"<<Cloud API SECRET>>\";",
    "reporter.admin.security.protocol": "SASL_SSL",
    "reporter.admin.sasl.mechanism": "PLAIN",
    "reporter.producer.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
    "reporter.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Cloud API KEY>>\" password=\"<<Cloud API SECRET>>\";",
    "reporter.producer.security.protocol": "SASL_SSL",
    "reporter.producer.sasl.mechanism": "PLAIN",
    "name": "HttpSink"
  }
}
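If you go the curl route, saving the payload above as http-sink.json (any filename works) and POSTing it looks roughly like this:

curl -X POST -H "Content-Type: application/json" \
  --data @http-sink.json \
  http://localhost:8083/connectors/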
You will get an HTTP 201 success response, which means the connector has been successfully deployed.
Note You can hit http://localhost:8083/connectors/ in your browser to list the deployed connectors; it should return '["HttpSink"]'.
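The status endpoint is another quick check that the connector and its task are RUNNING:

curl -s http://localhost:8083/connectors/HttpSink/status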
Test
Publish a message to 'payments' topic
You can publish a message directly from the Confluent Cloud UI: just visit the topic's Messages tab and produce a message.
Note The Confluent CLI is another powerful option to produce/consume messages:
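A rough sketch of producing a test message with the CLI (assuming you are logged in and have selected the environment/cluster; exact sub-commands vary between CLI versions):

# Type a message and press Enter; Ctrl-C to exit
confluent kafka topic produce payments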
Verify in RequestBin
Visit your RequestBin URL to verify that the message has arrived.
Some useful references:
docker-compose reference for Cloud