Securing a Containerized Kafka Connect Cluster

A Journey to a Faraway Land

Building a data pipeline can feel like embarking on an adventurous journey into an unknown, mysterious world, full of potential treasures but also dangers: wild animals and treacherous weather.

In Data Engineering at Enova, one such journey was to build a data pipeline from Postgres into Snowflake.  Postgres is a robust and performant relational database platform, and is our application database of choice.  Snowflake is a cloud data warehouse which is able to quickly aggregate and analyze enormous amounts of data for insightful analytics.  We needed the same data available to both platforms in order to meet all of our goals.

At first glance, this problem appears simple.  After all, both Postgres and Snowflake have tables with columns, rows, and data types.  Why not simply copy the data directly?  There are at least two major problems with this approach:

  1. The language barrier: Postgres and Snowflake have completely different systems for data types, among other things, and so a translation layer is necessary.
  2. Fragility: directly copying data from Postgres to Snowflake tightly couples the two systems, which means that a failure or change in either one can easily break the other.

Enter Kafka, a highly scalable messaging platform.  If we use Kafka as a middle transport layer and send messages from Postgres through it into Snowflake, we have decoupled Postgres and Snowflake.  We can also use another Kafka technology, Schema Registry, to address the translation problem.  These technologies are great, but something is still missing: how do they all talk to each other and integrate?  Is there perhaps some wise wizard who knows how to mediate between all such parties?  Is there a technology which mediates between Kafka and these data platforms?

Kafka Connect as the Guide on our Journey

Yes, there is such a thing.  It’s called Kafka Connect, a highly extensible technology that moves data to and from Kafka, with many data platforms currently supported.

To use Kafka Connect, we deploy what is known as a “connector”, which moves data either to or from Kafka.  “To” and “from” in this context have special terms in the Kafka Connect world:

  • To Kafka – “Source connector” – meaning, it pulls data from a “source” data platform into Kafka
  • From Kafka – “Sink connector” – meaning, it pulls data from Kafka and “sinks” it into another data platform

That is a rather simple idea with a lot of power.  At present we have two technologies built on Kafka Connect running in production:

  • Debezium: It primarily provides source connectors into Kafka.  Debezium is an awesome open source project, which is also in the rare category of a performant Postgres ETL technology that uses Postgres’ native logical replication.
  • Snowflake’s Kafka Connector: to move data from Kafka into Snowflake (sink).  Also open source, this connector is maintained by Snowflake, and is being actively improved upon.

Because both of these connectors are built on Kafka Connect, we can deploy them in the same way.  Once you are using Kafka Connect, it is a short step to other connectors, such as MySQL to Kafka, or Kafka to Elasticsearch, Neo4j, S3, etc.

We have now been running containerized Kafka Connect in production for several months.  However, we chose not to use a clustered (multi-node) deployment just yet, so in production we are running only one node.  Although we were able to get multiple nodes running in staging, it was not feasible to automate that deployment for production.

We found in our journey that there are many good articles on Kafka Connect, but not so many with technical detail on multi-node deployments. In the following sections, we describe in detail how we got a multi-node cluster working in staging and the difficulties we ran into, in the hopes that someone might have some insights on solving the multi-node deployment problem (perhaps using a container orchestration tool).

Warning — a lot of technical details follow — the casual reader is welcome to skip to the final section.

Secured: The REST API

The “Secure” part of this project applies mainly to Kafka Connect’s REST API.  The key is the following: when you have a “connector” running in Kafka Connect, one of the workers will pick up the work for that connector.  That worker is then called the leader.  If you make a REST API call to a different worker, it will forward the request to the leader.  Herein was our greatest challenge: the correct SSL configuration for secure communication between nodes was baffling (see below).
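As a concrete illustration (a sketch, not from our setup docs, assuming a worker reachable on the default REST port 8083), any worker can report which node actually owns a connector and its tasks:

# A sketch: ask any Connect worker for a connector's status; the worker_id
# fields in the JSON response (host:port) show which node picked up the work.
# <connector-name> is a placeholder for a registered connector.
curl -s http://localhost:8083/connectors/<connector-name>/status

Requests that only the leader can serve are forwarded to it internally, and that forwarding hop is exactly the path that needs the SSL configuration described below.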

Containerized: Easy

Running Kafka Connect as a container makes management especially easy.  In a cluster of Kafka Connect containers, if one fails, we just spin up the same container image anywhere and it “just works”.  All of its state is persisted in Kafka, so there is no state in the container we cannot live without.
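Recovery can be as simple as re-running the same Compose service on a healthy host (a minimal sketch, assuming the docker-compose.yml shown later in this post is present there):

# A sketch: recreate the worker anywhere; connector configs, offsets, and
# statuses live in Kafka topics, so the new container rejoins the group and
# picks up where the failed one left off.
docker-compose up -d connect
docker-compose logs -f connect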

Cluster: Scalable and Resilient

By a cluster here, we mean multiple unique host systems, running one container each.  This provides both scalability and resilience.  Kafka Connect workers automatically form a cluster with very little configuration.  In the case of a node failure, Kafka Connect automatically handles redistributing the work to other available nodes.  This process is called rebalancing.

The Journey of a REST API Call

The following diagram traces a single REST API call in order to show how it works with a distributed Kafka Connect cluster.  Consider the diagram and its explanation below:

Suppose we have a Postgres adventure novels database, with data on novel names, characters, and publication dates.  We also have a connector which moves data from that Postgres database into Kafka.  This connector is registered to the Kafka Connect worker running on Host 2 in the diagram above (we will call it Worker 2), which is thus known as the leader for that connector.  Here is what happens when a REST API call is made to update the configuration for this connector (for example, to add a new table):

  1. Client makes a REST API call to the load balancer for the Kafka Connect cluster to update configuration.
  2. The load balancer chooses Host 1 to serve this request and contacts nginx on port 443.  HTTP Basic auth configured in nginx validates the client.
  3. On Host 1, nginx forwards the request to Kafka Connect Worker 1 on port 8083.  Worker 1 discovers that Worker 2 is the leader for the Postgres connector, not Worker 1.  Worker 1 is therefore known as a follower.  It needs to forward the request to Worker 2.
  4. Host 1 / Worker 1 contacts Host 2 / Worker 2 through nginx.
  5. On Host 2, nginx forwards the request to Kafka Connect Worker 2 on port 8083.  Because Worker 2 is the leader, it can serve the request!
  6. Worker 2 communicates directly with Kafka to update the configuration for the Postgres connector.
  7. Worker 2, having received a response from Kafka, forwards the REST response back through nginx to the load balancer.
  8. The load balancer forwards the response to the client.
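As a quick sanity check of this whole path (a sketch using the load balancer hostname and Basic auth credentials introduced in the sections below), the REST API's root endpoint and connector list can be queried through the load balancer by any client:

# A sketch: both calls go through the load balancer and nginx, and any worker
# can answer them without forwarding.
curl -s -u 'user:pass' https://my-postgres-staging.node.com/            # worker version info
curl -s -u 'user:pass' https://my-postgres-staging.node.com/connectors  # registered connector names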

Docker Compose

Given the above diagram and scenario, the docker-compose below should make more sense:

version: '3'
services:
  connect:
    image: debezium/connect:1.2.1.Final
    ports:
     - "127.0.0.1:8083:8083"
    volumes:
     - /etc/kafka/secrets:/etc/kafka/secrets
     - /var/ssl:/var/ssl
    environment:
     - BOOTSTRAP_SERVERS=pkc-woot.region.aws.confluent.cloud:9092
     - GROUP_ID=debezium-postgres-staging
     - ADVERTISED_HOST_NAME=my-postgres-staging-01.node.com
     - ADVERTISED_PORT=443
     - CONNECT_REST_ADVERTISED_LISTENER=https
     - CONNECT_LISTENERS_HTTPS_SSL_CLIENT_AUTH=requested
     - CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_LOCATION=/var/ssl/truststore/kafka.truststore.jks
     - CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_PASSWORD=foo
     - CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_LOCATION=/var/ssl/keystore/kafka.keystore.jks
     - CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_PASSWORD=foo
     - CONFIG_STORAGE_TOPIC=dbz_pg_connect_configs
     - OFFSET_STORAGE_TOPIC=dbz_pg_connect_offsets
     - STATUS_STORAGE_TOPIC=dbz_pg_connect_statuses
     - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
     - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
     - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
     - OFFSET_FLUSH_INTERVAL_MS=8000
     - OFFSET_FLUSH_TIMEOUT_MS=60000
     - CONNECT_SECURITY_PROTOCOL=SASL_SSL
     - CONNECT_SASL_MECHANISM=PLAIN
     - CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="foo" password="bar";
     - CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
     - CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="foo" password="bar";
     - CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
     - CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
     - CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="foo" password="bar";
     - CONNECT_CONSUMER_SASL_MECHANISM=PLAIN

Notice the following, as noted in our diagram:

  1. The mapping 127.0.0.1:8083:8083 means that we are only allowing clients on 127.0.0.1 (localhost) to talk to the Kafka Connect worker on port 8083 (the default); nginx talks to it over localhost.  If we merely specified 8083:8083, the port would be listening on all interfaces, meaning someone could connect to it without going through nginx (see the quick check after this list).
  2. We are running one container per host; that is why ports only shows one port: 8083.
  3. The GROUP_ID field is how Kafka Connect identifies other workers to form a cluster.  Thus, setting it to debezium-postgres-staging for all 3 workers causes Kafka Connect to form a cluster from those 3 workers, each of which resides on a separate node.  The *_STORAGE_TOPIC variables should also be the same across all nodes of the cluster.
  4. ADVERTISED_PORT is set to 443, which is the port exposed via nginx.  This is the port on which communication will happen for the REST API.  This port 443 maps to port 8083 via our nginx config.
  5. ADVERTISED_HOST_NAME is set to the host on which the container resides.  This is the one property that will differ between nodes in the docker-compose.  On node 2 it would be my-postgres-staging-02.node.com, and on node 3 it would be my-postgres-staging-03.node.com.
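To confirm the port binding described in item 1, a quick check on any of the hosts (a sketch; netstat -tlnp would work equally well):

# A sketch: the Connect worker should be bound only to loopback, with nginx
# the only listener on a public interface.
ss -tlnp | grep -E ':(443|8083)'
# Expect 127.0.0.1:8083 for the worker and 0.0.0.0:443 (or the host IP) for nginx.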

A few other important details:

  1. The REST endpoint for the load balancer is my-postgres-staging.node.com (without 01/02/etc. which correspond to individual nodes).  We will see how this is used later in the nginx ssl config.
  2. We are using Debezium’s Kafka Connect docker images. The usage of ADVERTISED_PORT and ADVERTISED_HOST_NAME is slightly different compared to Confluent’s Kafka Connect docker images.
  3. Although node 3 is not used in this particular scenario, it is shown as an available worker that provides redundancy.

Abundance of SSL

When we said above that we ran into difficulties solving the multi-node deployment problem and decided not to take it to production, the SSL configuration for the REST endpoint is what we meant.  Although we show here how we configured the SSL layers, it would have been a lot of work to automate something we were not sure was the right solution.  It also would not have solved the problems of discovering and configuring other nodes in a cluster, especially in AWS with transient IP addresses.

In order to get the communication to happen between nodes with SSL encryption, we had to configure Java’s SSL certs and standard openssl certs as well, on all nodes.  How does one automate such a deployment?  Perhaps there is some hope with something like TLS bootstrapping offered by Kubernetes, but we have not attempted that and we don’t see that it would handle the generation of the Java keystore and truststore.  In any case, we got it to work in staging, and here is how we did it!

Configuring the Java SSL layer

Notice these four variables from our docker-compose:

    - CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_LOCATION=/var/ssl/truststore/kafka.truststore.jks
    - CONNECT_LISTENERS_HTTPS_SSL_TRUSTSTORE_PASSWORD=foo
    - CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_LOCATION=/var/ssl/keystore/kafka.keystore.jks
    - CONNECT_LISTENERS_HTTPS_SSL_KEYSTORE_PASSWORD=foo

We had to create this SSL keystore/truststore manually.  We ran the kafka-generate-ssl-automatic.sh script provided by Confluent on each node, as per the README instructions.  Here is what we ran:

mkdir -p /var/ssl
chown kafka /var/ssl
cd /var/ssl
apt-get install default-jre
export COUNTRY=US
export STATE=IL
export ORGANIZATION_UNIT=SE
export CITY=Chicago
export PASSWORD=secret
bash ./kafka-generate-ssl-automatic.sh

After execution, we saw:

root@my-postgres-staging-01:~# ls -l /var/ssl/*
/var/ssl/keystore:
total 8
-rw-r--r-- 1 kafka root 5181 May 13 09:39 kafka.keystore.jks

/var/ssl/truststore:
total 12
-rw------- 1 kafka root 1854 May 13 09:31 ca-key
-rw-r--r-- 1 kafka root 7509 May 14 08:35 kafka.truststore.jks

Now add the keystore to the other node(s).  These commands have to be run for each node:

root@my-postgres-staging-01:~$ scp /var/ssl/keystore/kafka.keystore.jks my-postgres-staging-02.node.com:/tmp
root@my-postgres-staging-01:~$ ssh my-postgres-staging-02.node.com
root@my-postgres-staging-02:~# cd /var/ssl
root@my-postgres-staging-02:/var/ssl# keytool -importkeystore -srckeystore /tmp/kafka.keystore.jks -destkeystore truststore/kafka.truststore.jks
Importing keystore /tmp/kafka.keystore.jks to truststore/kafka.truststore.jks...
Enter destination keystore password:
Enter source keystore password:
Entry for alias localhost successfully imported.
Existing entry alias caroot exists, overwrite? [no]:  no
Enter new alias name    (RETURN to cancel import for this entry):  stg01
Entry for alias caroot successfully imported.
Import command completed:  2 entries successfully imported, 0 entries failed or cancelled
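To confirm the import on the destination node, we can list the truststore (a sketch; the store password is whatever was chosen when the truststore was generated, PASSWORD=secret in our example above):

# A sketch: the truststore should now hold the imported localhost entry and the
# renamed stg01 alias, alongside the node's own caroot.
keytool -list -keystore /var/ssl/truststore/kafka.truststore.jks -storepass secret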

Configuring the OpenSSL layer

Now we also have to use openssl to generate a self-signed CA certificate and CA-signed server certificates, then add the CA cert to the truststore and use the server cert/key in our nginx config.

Here again is where we are not clear on how to automate.  This should also work on AWS EC2 instances, but we are not sure how each Kafka Connect node would know what other nodes it would need to communicate with (hostnames/changing AWS IP addresses).  In our case, of course, we simply configured the host names we already knew about.

The following openssl commands can be run on any node/host/laptop that has openssl installed.  Then the certificates and keys should be copied as needed after they have been generated.  For this POC, we made a directory /var/ssl/test in which to save the SSL certs.

  1. Generate a CA key. It is extremely important to keep this key secure since anything it signs will be trusted:
    openssl genrsa -des3 -out myCA.key 2048
  2. Self-sign a CA certificate with the CA key from step 1:
    openssl req -x509 -new -nodes -key myCA.key -sha256 -days 1825 -out myCA.pem
  3. Generate server key:
    openssl genrsa -out my-postgres-staging-01.node.com.key 2048
  4. Generate a certificate signing request (CSR) for a server certificate – be sure to use the server name as the common name (we still have to determine how to add IP addresses as SSL subject-alternative-names for this to work in AWS; see the sketch after this list):
    openssl req -new -key my-postgres-staging-01.node.com.key -out my-postgres-staging-01.node.com.csr
  5. Use the CA key to sign the CSR, which generates the server certificate:
    openssl x509 -req -in my-postgres-staging-01.node.com.csr -CA myCA.pem -CAkey myCA.key -CAcreateserial -out my-postgres-staging-01.node.com.crt -days 1825 -sha256
  6. On the same node that you built the CA, repeat for other server names:
    openssl genrsa -out my-postgres-staging-02.node.com.key 2048
    openssl req -new -key my-postgres-staging-02.node.com.key -out my-postgres-staging-02.node.com.csr
    openssl x509 -req -in my-postgres-staging-02.node.com.csr -CA myCA.pem -CAkey myCA.key -CAcreateserial -out my-postgres-staging-02.node.com.crt -days 1825 -sha256
  7. Copy the myCA.pem certificate (NOT the key), and the certificates/keys you generated for the other nodes, to the other nodes, then add myCA.pem to the Java truststore.  Again, this has to be run on each node.  Example:
    scp /var/ssl/test/myCA.pem my-postgres-staging-01.node.com:/var/ssl/test/
    ssh my-postgres-staging-01.node.com
    root@my-postgres-staging-01:/var/ssl/test# keytool -import -alias testCA -file /var/ssl/test/myCA.pem -storetype JKS -keystore /var/ssl/truststore/kafka.truststore.jks 
    Enter keystore password: ......   
    Trust this certificate? [no]:  Y   
    Certificate was added to keystore
  8. Now we have to update the site’s nginx configuration to use the new server certificate and key.  We updated the ssl_certificate and ssl_certificate_key lines in /etc/nginx/sites-available/my-postgres-staging.node.com to use the certificates/keys we created with the CA that we added to the truststore.  Here is what it looks like on each node:
    • my-postgres-staging-01.node.com
      ################
      # HTTPS Server #
      ################
      server {
        listen 443 default_server deferred;
        ssl on;
        ssl_certificate /var/ssl/test/my-postgres-staging-01.node.com.crt;
        ssl_certificate_key /var/ssl/test/my-postgres-staging-01.node.com.key;
    • my-postgres-staging-02.node.com
      ################
      # HTTPS Server #
      ################
      server {
        listen 443 default_server deferred;
        ssl on;
        ssl_certificate /var/ssl/test/my-postgres-staging-02.node.com.crt;
        ssl_certificate_key /var/ssl/test/my-postgres-staging-02.node.com.key;
  9. Finally, we reload nginx:
    sudo systemctl reload-or-restart nginx
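On the subject-alternative-name question raised in step 4, one possible approach (a sketch we have not automated or run in our environment, assuming OpenSSL 1.1.1+ and using a hypothetical private IP of 10.0.0.11) is to inject the SAN extension when the CA signs the CSR, and then verify the result:

# A sketch: generate the server key and CSR non-interactively, reusing the
# subject fields from the keystore script above.
openssl req -new -newkey rsa:2048 -nodes \
  -keyout my-postgres-staging-01.node.com.key \
  -out my-postgres-staging-01.node.com.csr \
  -subj "/C=US/ST=IL/L=Chicago/OU=SE/CN=my-postgres-staging-01.node.com"

# Sign with the CA, adding DNS and IP subject-alternative-names at signing time.
# The IP address below is a hypothetical placeholder.
openssl x509 -req -in my-postgres-staging-01.node.com.csr \
  -CA myCA.pem -CAkey myCA.key -CAcreateserial -days 1825 -sha256 \
  -extfile <(printf "subjectAltName=DNS:my-postgres-staging-01.node.com,IP:10.0.0.11") \
  -out my-postgres-staging-01.node.com.crt

# Verify the chain and confirm the SANs made it into the certificate.
openssl verify -CAfile myCA.pem my-postgres-staging-01.node.com.crt
openssl x509 -in my-postgres-staging-01.node.com.crt -noout -text | grep -A1 "Subject Alternative Name"

Even with SANs in place, something would still have to feed the correct DNS names and IPs to whatever provisions the nodes, which is the automation gap we have not closed.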

Going Along For the Journey

If you have stayed with us this long, we commend you.  We are almost done.  Thankfully, what remains to complete our setup is much more straightforward.  If we have started our containers as defined above using docker-compose, configured SSL, and set up nginx and the load balancer, we now have a running Kafka Connect cluster.  We can register the Postgres connector by sending the following REST request to it:

curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
https://my-postgres-staging.node.com/connectors/ \
-d @adventure_novels.json \
-u 'user:pass'

where adventure_novels.json contains:
{
  "name": "adventure_novels",
  "config": {
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.port": "5432",
    "datatype.propagate.source.type": ".+\\.numeric,.+\\.bytea",
    "decimal.handling.mode": "string",
    "include.unknown.datatypes": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "INSERT INTO logical_ticker.tick (tick_time) VALUES (now()) ON CONFLICT (db) DO UPDATE SET tick_time = now();",
    "slot.name": "debezium",
    "database.hostname": "adventure-novels-primarydb.example.com",
    "binary.handling.mode": "base64",
    "table.whitelist": "^logical_ticker.tick$,^novels$,^authors$,^characters$",
    "name": "adventure_novels:debezium_postgres",
    "database.dbname": "adventure_novels",
    "database.server.name": "adventure_novels",
    "database.user": "debezium",
    "database.password": "$pass",
    "key.converter.schema.registry.url": "https://psrc-happyschemas.region.aws.confluent.cloud",
    "value.converter.schema.registry.url": "https://psrc-happyschemas.region.aws.confluent.cloud",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.basic.auth.user.info": "$key:$secret",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "$key:$secret"
  }
}

If this POST request succeeds, we now have data flowing from Postgres to Kafka!  As our diagram above shows, this connector is assigned to Worker 2, the leader.  If we now make a request to update this connector, all 8 steps of our diagram happen and, because we have configured everything correctly, they succeed.
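Such an update might look like the following (a sketch, not the exact request we made; adventure_novels_config.json is a hypothetical file holding just the contents of the "config" object from the registration payload above, with the new table added to table.whitelist):

# A sketch: adventure_novels_config.json is a hypothetical file containing the
# updated flat key/value "config" map for this connector.
curl -i -X PUT \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
https://my-postgres-staging.node.com/connectors/adventure_novels:debezium_postgres/config \
-d @adventure_novels_config.json \
-u 'user:pass'

It does not matter which worker the load balancer picks; a follower forwards the request to the leader, exactly as in the diagram.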

Have a Safe Journey

In the end, we were finally able to see the desired outcome of our diagram.  The client issued a request to restart the connector:
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
https://my-postgres-staging.node.com/connectors/adventure_novels:debezium_postgres/restart \
-u 'user:pass'

Not only did we receive a successful API response, but we saw an SSL handshake message in our logs indicating that in fact the request was forwarded from the follower to the leader!

{"log":"2020-05-14 16:38:02,396 INFO   ||  x509=X509@62b3665d(localhost,h=[my-postgres-staging-01.node.com],w=[]) for Client@7ca864a1[provider=null,keyStore=file:///var/ssl/keystore/kafka.keystore.jks,trustStore=file:///var/ssl/truststore/kafka.truststore.jks]   [org.eclipse.jetty.util.ssl.SslContextFactory]\n","stream":"stdout","time":"2020-05-14T16:38:02.396851738Z"}
{"log":"2020-05-14 16:38:02,397 INFO   ||  x509=X509@69cbe9e2(caroot,h=[my-postgres-staging-01.node.com],w=[]) for Client@7ca864a1[provider=null,keyStore=file:///var/ssl/keystore/kafka.keystore.jks,trustStore=file:///var/ssl/truststore/kafka.truststore.jks]   [org.eclipse.jetty.util.ssl.SslContextFactory]\n","stream":"stdout","time":"2020-05-14T16:38:02.39731858Z"}

Where to now?

The complexity of the SSL configuration, which still left several problems unsolved, made us decide not to take this multi-node staging deployment to production.  We are sharing it anyway because we believe we are very close, and we hope that community feedback can fill in the remaining gaps.  It was indeed a lot of work just to get the REST API working, and that effort reflects how promising this technology is to us.  We are hopeful that there will be an easier way to do it, at least in the near future.

Kafka Connect has massive potential to aid us on our data pipeline journey, especially if containerized and with the benefits of a clustered deployment.  It is my hope that this post at the very least can be a point of reference in making clustered Kafka Connect deployments easier in the future.

The journey is never over.  At Enova, we are encouraged to embark on exciting journeys like this when it aligns with our goals.  May you be wise also in your data pipeline journey.