PORT is deprecated. Please use SCHEMA_REGISTRY_LISTENERS instead.

I was trying to launch a schema-registry within a kubernetes cluster and every time I wanted to expose the pod’s port through a service, I was greeted by the nice title message:

if [[ -n "${SCHEMA_REGISTRY_PORT-}" ]]
then
  echo "PORT is deprecated. Please use SCHEMA_REGISTRY_LISTENERS instead."
  exit 1
fi

This happened because I had named my service schema-registry also (which was kind of not negotiable at the time) and kubernetes happily sets the SCHEMA_REGISTRY_PORT environment variable to the value of the port you want to expose. But it turns out that this very named variable has special meaning within the container.

Fortunately, I was not the only one bitten by this error, albeit for a different variable name, but I also used the same ugly hack:

$ kubectl -n kafka-tests get deployment schema-registry -o yaml
:
    spec:
      containers:
      - command:
        - bash
        - -c
        - unset SCHEMA_REGISTRY_PORT; /etc/confluent/docker/run
        env:
        - name: SCHEMA_REGISTRY_LISTENERS
          value: http://0.0.0.0:8081/
:

Kafka, dotnet and SASL_SSL

This is similar to my previous post, only now the question is, how do you connect to a Kafka server using dotnet and SASL_SSL? This is how:

// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/v1.0.0/examples/Producer/Program.cs

using Confluent.Kafka;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;


namespace Confluent.Kafka.Examples.ProducerExample
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            string topicName = "test-topic";

            var config = new ProducerConfig {
                BootstrapServers = "kafka-server.example.com:19094",
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = "ca-cert",
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "USERNAME",
                SaslPassword = "PASSWORD",
                Acks = Acks.Leader,
                CompressionType = CompressionType.Lz4,
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                for (int i = 0; i < 1000000; i++)
                {
                    var message = $"Event {i}";

                    try
                    {
                        // Note: Awaiting the asynchronous produce request
                        // below prevents flow of execution from proceeding
                        // until the acknowledgement from the broker is
                        // received (at the expense of low throughput).

                        var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = null, Value = message } );
                        // Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");

                        // Let's not await then
                        // producer.ProduceAsync(topicName, new Message<string, string> { Key = null, Value = message } );
                        // Console.WriteLine($"Event {i} sent.");
                    }
                    catch (ProduceException<string, string> e)
                    {
                        Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                    }
                }

                // producer.Flush(TimeSpan.FromSeconds(120));

                // Since we are producing synchronously, at this point there will be no messages
                // in-flight and no delivery reports waiting to be acknowledged, so there is no
                // need to call producer.Flush before disposing the producer.
            }
        }
    }
}

Since I am a total .NET newbie, I usually docker run -it --rm microsoft/dotnet and experiment from there.

Kafka, PHP and SASL_SSL

When you want to connect to a Kafka cluster from PHP there are numerous examples showing how to use php-rdkafka, but unauthenticated. But what happens when you need to let a customer connect to a Kafka setup and IP whitelisting is not enough? Not much easily locatable information is out there.

Why not correct this by combing through various web pages and the librdkafka source code:

<?php

$conf = new RdKafka\Conf();
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', 'USERNAME_HERE');
$conf->set('sasl.password', 'PASSWORD_HERE');
$conf->set('ssl.ca.location', '/usr/local/etc/ca-cert.pem');
$conf->set('ssl.cipher.suites', 'TLSv1.2');

$rk = new RdKafka\Producer($conf);
$rk->addBrokers("SASL_SSL://kafka-1.example.com:19094");
$rk->addBrokers("SASL_SSL://kafka-2.example.com:19094");
$rk->addBrokers("SASL_SSL://kafka-3.example.com:19094");

$topic = $rk->newTopic("kafka-test-topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $rk->poll(0);
}

while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}

?>

Still this may not be enough if it is the case that your Kafka server is on OpenSSL-1.0.2 (CentOS 7 for example) and your php client is on OpenSSL-1.1.0 (like the php:7.2-cli docker image). In such a case you need to alter your client’s openssl.cnf to comment out the following line:

;CipherString = DEFAULT@SECLEVEL=2