Managing Auto-Generated Kafka Topics in an Unpredictable World
Kafka has hit a place of maturity across so many Big Data stacks that at this point we’re all becoming far too familiar with its hang-ups. Don’t get me wrong at all, it’s the messaging framework we would choose in nearly every situation (apart for gRPC) and it handles data at any scale like an absolute beast. For this reason, it’s important to work around things like number of topics limitations (which, by the way, will be much less of a concern in the newest versions). One situation we encountered was deeply illustrative of how not to interact with Kafka and what the repercussions can be, and in rethinking that interaction we may have discovered some useful best practices.
It began with the need to split data streams into a different topic per action that was using that stream. Imagine you’ve got a query engine running on each event coming through your main pipeline and for each query it split matching events off into their own topics– one topic per query. As that number of queries grows with the maturity of your product, so does the number of topics linearly. Pretty common situation and honestly how many topics could that create when they’re all user generated? Well on top of that, and what really got us, is the shutdown and cleanup of those topics.
Imagine that you’re done with a query (which has its own topic associated with it) and it’s now time to stop writing/reading to/from that topic and hopefully delete it. Well it’s not so easy, usually the services writing are not the same services that are reading. Kafka’s APIs certainly have a deletion method, which will clean up a topic (eventually) if two things never occur again under default settings: another write on that topic, or another read on that topic. This is because under default settings Kafka automatically will create (or in this case re-create) a topic as soon as it is interacted with. This begs the question, which side should be responsible for the topic deletion? Let’s go over a few possibilities for how we can handle this very common situation.
Option 1. Turn off Auto Creation of Topics
This one is the easiest to enforce (you simply have to set the field ‘auto.create.topics.enable’ in server.properties for Kafka to ‘false’, as described here), but creates an additional burden on the services that interact with Kafka. Even if the service is using only a single topic it’s unlikely that it is anticipating having to explicitly create that topic. Here is a snippet that one can use to create topics if using this route:
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.jdk.CollectionConverters._
import java.util.Properties
import org.apache.kafka.common.errors.TopicExistsException
import java.util.concurrent.ExecutionException
/**
* @param kafkaBootstrap The kafka bootstrap.servers value for your cluster,
* e.g. kafka01.dev:9091,kafka02.dev:9091,kafka03.dev:9091
* @param replication Should be less than the number of kafka servers in cluster
* @param partitions We default to 8 for most cases
*/
def createTopic(topic: String, kafkaBootstrap: String, replication: Int, partitions: Int): Unit = {
val adminSettings = new Properties()
adminSettings.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap)
val adminClient = AdminClient.create(adminSettings)
try {
val replication = Math.min(kafkaBootstrap.split(",").length, replicationFactor).toShort
val toCreate = List(new NewTopic(topic, partitions, replication)).asJava
val result = adminClient.createTopics(toCreate)
result.all().get()
} catch {
case ex: ExecutionException => ex.getCause match {
case _: TopicExistsException => // We've already created this topic, ignore exception
case inner: Throwable =>
log.error(s"CT400: Failed to create topic '$topic'", inner)
}
} finally {
adminClient.close()
}
log.info(s"CT100: Created topic '$topic'")
}
If you build your Kafka interactions from the ground up expecting to have to create topics programmatically in call cases then this would be a pretty solid approach. It prevents stale topics from being recreated later due to references to that topic being re-instantiated or failing to close down entirely. Once you delete a topic it will stay deleted until this create code is hit again, unlikely in the stale case.
Drawbacks
The big pitfall of this approach is that it can be a recipe for disaster if one changes that ‘auto.create.topics.enable’ setting in an established stack. We’ve tried this before and the result was tons of lost functionality in our legacy services. If you’ve got a service that was often ‘creating’ (not explicitly) new topics then it will immediately cease to function. Even more nefariously, when Ops goes to spin up a completely new pod, you’re bound to run into many surprises as services that only used one or two static topics ever will not be able to create those topics, often leading to failures to initialize at all. In our view it’s not a fair solution in an existing stack to change the common Kafka cluster to not create topics automatically, in a large product suite it’s nearly impossible to predict all the places that would be affected at that time or in the future.
Option 2. Spinning up a Second Kafka Cluster
For teams with the bandwidth, it might make the most sense to spin up a second Kafka cluster which newer services (and problem services that are creating many topics) can use instead of the current one. This new pod would have our ‘auto.create.topics.enable’ field set to ‘false’ such that we don’t need to worry about topics being recreated, similar to Option 1. However, the advantage of this approach is that we won’t break legacy services using the original cluster and we can slowly move things over as they are upgraded.
In our case this was the eventual solution we went with. In practice, what we did is spun up this second pod and moved all of our data streaming onto that new cluster. The main issue that could be encountered here is if you’ve got a processor spinning data off onto extra topics to be consumed by some legacy service. Obviously, if this legacy service is expecting their topics to show up on the old cluster this would be a problem. Our solution was to change only the data processor (which in this case was a Samza job) that was responsible for sending data to by default create and push topics onto the older Kafka cluster. This way anything that wanted to use the new cluster could tell our Samza streaming job to write to there specifically, while legacy services didn’t need to change at all. This is the approach I would recommend if it’s achievable from a resources standpoint.
Option 3. Reapers Everywhere
This was the approach that we originally attempted to solve the issue of stale Kafka topics. We couldn’t isolate which process was holding on to our Kafka topics and recreating them after we’d deleted them. This would often happen around 15 minutes later, so a simple retry at time of deletion wouldn’t have worked. It was our hypothesis that creating a Reaper, a program that runs on a loop and attempts to delete stale objects after a certain interval, with a time interval of 30 minutes would be able to delete the left over topics after they’d long been dereferenced.
Writing the Reaper was relatively simple, the below snippet shows a generic example of how one could go about it. In your case the challenge will be to determine which topics should still be there, most likely with a reference to your DB or by forwarding a true-up list to this class. Note that we use fs2 library to get a KafkaAdmin class but there are many ways to do this:
import java.util.concurrent._
import fs2.kafka.{AdminClientSettings, KafkaAdminClient}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.jdk.CollectionConverters._
class TopicReaper(kafkaBootstrap: String) {
private val reapingGroup: String = "reaping-group"
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = reapTopics()
}
val f = ex.scheduleAtFixedRate(task, 1, 30, TimeUnit.MINUTES)
f.cancel(false)
def reapTopics(): Unit = {
val zkTopics = activeTopics()
val kafkaTopics = allTopics()
val adminSettings = AdminClientSettings.apply[IO].withBootstrapServers(kafkaBootstrap)
val kafkaAdmin = KafkaAdminClient.resource(adminSettings)
val toDelete = kafkaTopics.filter { topicToCheck =>
!zkTopics.contains(topicToCheck)
}
if (toDelete.nonEmpty) {
kafkaAdmin.map(_.deleteTopics(toDelete))
}
}
private def activeTopics(): List[String] = {
// Fill this in with code to return what topics should still exist
}
// Will list all topics in Kafka
private def allTopics(): List[String] = {
val props = new Properties()
props.put("bootstrap.servers", kafkaBootstrap)
props.put("group.id", reapingGroup)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer(props)
consumer.listTopics.keySet().asScala.toList
}
}
Drawbacks
The issue here is that even if one attempts another deletion of the topics it doesn’t solve the issue of some code somewhere still referencing that topic. In our case we saw a large reduction in left over topics with this code but eventually some would still return. This doesn’t solve the issue of consumers/producers recreating those topics later. Also, Kafka seemed to suffer when we would do mass deletes in this way.
Lessons Learned
This was a messy problem and our solution of a second Kafka cluster was far from convenient. Looking back, it would have been prescient to have spun up our original Kafka cluster with automatic topic creation disabled, and to have spent extra time ensuring that topics were disconnected from and deleted completely with each new interaction. Kafka (at least before the 2.0.0 upgrade) would become interminably slow until finally locking entirely as topics leaked over months, at which point it became a top-priority escalation. Don’t make the same mistakes we did, design around this from the start.