Export Kafka Events to an HTTP Endpoint

Gnanaguru

Goal

Export events from Kafka topics to an HTTP REST endpoint using the HTTP Sink connector.

Key components involved

  • Kafka - Hosted on Confluent Cloud
  • Kafka Connect cluster - To host the connector(s)
  • kafka-connect-http connector (HTTP Sink) - To export events from topics to an HTTP REST endpoint
  • RequestBin - Mock REST endpoint to receive the Kafka messages

Create Kafka topic

Create Confluent Cloud account

You can sign up for a free trial account that lasts for three months, which is more than sufficient for this exercise.

Note You can also use a local Kafka instance for this exercise. I often use this docker-compose example to get started quickly.

Grab API keys

Grab the credentials by navigating to Cluster Settings -> API Keys -> Add Key -> Copy API Key & Secret.

Cluster Settings

Note Spin up Schema Registry and grab the API credentials. (This can be done through the Cloud console.)

In summary, you need to grab the following settings/configs from Confluent Cloud:

  • Bootstrap host
  • Cloud API Key & Secret
  • Schema Registry host
  • Schema Registry API Key & Secret
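
If it helps, you can collect these into a small env file to copy from later. A minimal sketch; the file name and variable names below are my own illustrations, not anything Confluent mandates:

# cloud.env - hypothetical scratch file; replace every value with your own
BOOTSTRAP_HOST=<<Confluent Cloud bootstrap host>>:9092
CLOUD_API_KEY=<<Confluent Cloud API Key>>
CLOUD_API_SECRET=<<Confluent Cloud API Secret>>
SCHEMA_REGISTRY_URL=<<Schema Registry URL>>
SCHEMA_REGISTRY_API_KEY=<<Schema Registry API Key>>
SCHEMA_REGISTRY_API_SECRET=<<Schema Registry API Secret>>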

Create 'payments' topic

For this exercise, I am going to use the Cloud UI to create the topic with just 1 partition and default settings. (If you prefer the command line, see the sketch below.)
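
A CLI alternative, assuming a recent Confluent CLI that is installed and already logged in to your cluster (exact subcommands vary by CLI version):

# Create the 'payments' topic with a single partition
confluent kafka topic create payments --partitions 1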

Create a REST Destination endpoint

We need a mock HTTP endpoint to receive the events from Kafka topics.

RequestBin is a fantastic tool that lets you capture REST requests.

Just click 'Create RequestBin' and it will auto-generate an HTTP URL. Eg: https://enwc009xfid4f.x.pipedream.net

RequestBin

This URL needs to be configured in the HTTP Sink connector using the 'http.api.url' property.
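
You can sanity-check the bin before wiring up the connector; any request you send should show up in the RequestBin UI (the JSON body here is just an arbitrary test payload):

# Send a test request to the bin; it should appear in the RequestBin UI
curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"test": "hello from curl"}' \
  https://enwc009xfid4f.x.pipedream.net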

Spin up Kafka Connect cluster

I am going to spin up Kafka Connect to host the connectors. (Also called a 'Kafka Connect cluster', it is essentially a runtime purpose-built to host Kafka connectors.)

Create a docker-compose.yml file

Create a new folder with the following docker-compose.yml file:

---
version: '3'
services:

  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.4.0
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      ##
      CONNECT_BOOTSTRAP_SERVERS: "<<Kafka/Confluent Cloud bootstrap host goes here:9092>>"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      ##
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "<< Schema registry URL goes here>>"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      ##
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "<<Schema registry API key>>:<<Schema registry API secret>>"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "<< Schema registry URL goes here>>"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "<<Schema registry API key>>:<<Schema registry API secret>>"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      ##
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Confluent Cloud API Key>>\" password=\"<<Confluent Cloud API Secret>>\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      ##
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Confluent Cloud API Key>>\" password=\"<<Confluent Cloud API Secret>>\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      ##
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Confluent Cloud API Key>>\" password=\"<<Confluent Cloud API Secret>>\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      # External secrets config
      # See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    command: 
      - bash 
      - -c 
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-http:1.0.8
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run & 
        #
        sleep infinity

Note Not all connectors come pre-installed in Kafka Connect.

In the above example, 'confluent-hub install --no-prompt confluentinc/kafka-connect-http:1.0.8' installs the HTTP Sink connector into Kafka Connect.

Also, do not forget to update the docker-compose file with your Confluent Cloud and Schema Registry API credentials.

Bring up Kafka Connect

Run:

docker-compose up
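
If you would rather run it in the background, you can detach and follow the worker logs instead (using the container name from the compose file above):

# Start in detached mode and tail the Connect worker logs
docker-compose up -d
docker logs -f kafka-connect-01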

Note You should see some of these topics created in the cloud web console: _kafka-connect-group-01-v04-configs, _kafka-connect-group-01-v04-offsets, _kafka-connect-group-01-v04-status

This is a sign that Kafka Connect has successfully connected to Confluent Cloud.
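
You can also verify the worker from its REST interface: the root endpoint returns version information, and /connector-plugins should list the freshly installed HTTP Sink plugin:

# Check that the worker is up (returns version info)
curl -s http://localhost:8083/

# Confirm the HTTP Sink plugin was installed by confluent-hub
curl -s http://localhost:8083/connector-plugins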

Deploy the connector

Kafka Connect exposes an admin REST interface to deploy and manage connectors. POST the following JSON payload to http://localhost:8083/connectors/ (you can use either Postman or curl).

Note More details on Connect REST Interface

{
    "name": "HttpSink",
    "config": {
        "topics": "payments",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "http.api.url": "https://enwc009xfid4f.x.pipedream.net",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
        "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Cloud API KEY>>\" password=\"<<Cloud API SECRET>>\";",
        "confluent.topic.security.protocol": "SASL_SSL",
        "confluent.topic.sasl.mechanism": "PLAIN",
        "confluent.topic.replication.factor": "3",
        "reporter.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
        "reporter.admin.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
        "reporter.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Cloud API KEY>>\" password=\"<<Cloud API SECRET>>\";",
        "reporter.admin.security.protocol": "SASL_SSL",
        "reporter.admin.sasl.mechanism": "PLAIN",
        "reporter.producer.bootstrap.servers": "<<CLOUD BOOTSTRAP HOST>>:9092",
        "reporter.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<<Cloud API KEY>>\" password=\"<<Cloud API SECRET>>\";",
        "reporter.producer.security.protocol": "SASL_SSL",
        "reporter.producer.sasl.mechanism": "PLAIN",
        "name": "HttpSink"
    }
}
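
For example, with curl (assuming you saved the payload above as http-sink.json; the filename is just for illustration):

# Deploy the connector by POSTing the JSON payload to the Connect REST API
curl -X POST \
  -H "Content-Type: application/json" \
  --data @http-sink.json \
  http://localhost:8083/connectors/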

Note Connector-specific config reference

You will get an HTTP 201 success response, which means the connector is now successfully deployed.

Connector created

Note You can hit http://localhost:8083/connectors/ in your browser to list the deployed connectors; it should return '["HttpSink"]'.
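
To dig deeper than the list, the status endpoint reports whether the connector and its task are actually running:

# Check connector and task state (look for "state": "RUNNING")
curl -s http://localhost:8083/connectors/HttpSink/status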

Test

Publish a message to 'payments' topic

You can publish a message directly from the Confluent Cloud UI: just visit the topic's Messages tab and add a message.

Send a new message

Note The Confluent CLI is another powerful option to produce/consume messages, for example:
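
A sketch using a recent Confluent CLI (exact subcommands vary by CLI version; older releases shipped a separate 'ccloud' binary instead):

# Produce a test message to the 'payments' topic, then type a message
# and press Enter, e.g. {"id": 1, "amount": 42.50}
confluent kafka topic produce payments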

Verify in RequestBin

Visit RequestBin to verify that the message arrived. Eg:

Verify events

Some useful references:

  • docker-compose reference for Cloud
  • Connect to Cloud config reference