Zookeeper as a Messaging Service?

The Surprising Use Case for Everyone’s Favorite State Service

Zookeeper has a special place in all our hearts, mainly because it's so dang reliable for storing configuration or state in a way that services anywhere can see almost immediately. The applications are limited only by the imagination, from keeping track of which dependent servers are UP in a cluster to the very common use case of finding Kafka cluster brokers. I'll cover topics like these in future posts. Today I'd like to discuss a way to take Zookeeper beyond its classic use of distributing configuration that facilitates communication between servers, and into a paradigm where Zookeeper itself does the communicating.

The key feature of Zookeeper that allows this kind of interaction is its ability to 'watch' its internal 'filesystem' for changes. Setting up something like this is blessedly simple. First, let's look at a familiar-looking class that writes to Zookeeper. Imagine that what it's writing is a query for our data pipeline, which we'd like another service somewhere to see and act on by, for instance, splitting off matching data to another Kafka topic:

import java.nio.charset.StandardCharsets

import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode

class ZkQueryRegister(curator: CuratorFramework) {

  // Writes a query to be picked up by our 'QueryService'
  // @param queryId Arbitrary GUID used to identify this query
  // @param query The query to run on our data stream, might look like 'SELECT data.value'
  def registerInZk(queryId: String, query: String): Unit = {
    val path = s"/registry/queries/$queryId"

    // Zookeeper stores raw bytes, so encode the query with an explicit charset
    curator.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.PERSISTENT)
      .forPath(path, query.getBytes(StandardCharsets.UTF_8))
  }
}
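
One way to confirm that a write like this landed is to read the node back. The following is only a sketch under a few assumptions: `ZkQueryInspect`, `queryPath` and `readQuery` are hypothetical names that are not part of the original code, and the connection string `localhost:2181` and the ID `some-query-id` are placeholders for whatever your environment uses.

```scala
import java.nio.charset.StandardCharsets

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.KeeperException

// Hypothetical helper, not part of the original post's code
object ZkQueryInspect {

  // Same path layout that ZkQueryRegister writes to
  def queryPath(queryId: String): String = s"/registry/queries/$queryId"

  // Returns the stored query text, or None if nothing is registered under this ID
  def readQuery(curator: CuratorFramework, queryId: String): Option[String] =
    try Some(new String(curator.getData().forPath(queryPath(queryId)), StandardCharsets.UTF_8))
    catch { case _: KeeperException.NoNodeException => None }

  def main(args: Array[String]): Unit = {
    // Assumption: a Zookeeper instance is reachable at localhost:2181
    val curator = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    curator.start()
    try println(readQuery(curator, "some-query-id"))
    finally curator.close()
  }
}
```

Catching `NoNodeException` rather than checking for existence first avoids a race where the node disappears between the check and the read.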

Easy enough: some server somewhere has now registered this query in ZK's filesystem. On its own this does nothing unless another service is listening for changes on this exact path. It's slightly trickier to set that up, but honestly not that bad compared to many messaging services out there. Let's take a look at a simple example that would do the trick:

import java.nio.charset.StandardCharsets

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.utils.ZKPaths

// The extension of PathChildrenCacheListener allows us to listen to Zk changes
class ZkQueryRegistryWatcher(curator: CuratorFramework) extends PathChildrenCacheListener {
  // The Zk directory we want to watch for new additions
  val queryDirectory = "/registry/queries"
  // Setting up the Zk cache which lets us register this path
  // (the 'true' flag tells the cache to keep each node's data, not just its path)
  val cache = new PathChildrenCache(curator, queryDirectory, true)
  // Registers 'this' as the listener, which tells the cache to call
  // 'childEvent(...)' below when it sees a change
  cache.getListenable.addListener(this)
  // Begin listening for changes; with this setting any children that already
  // exist in this Zk directory will be fired as events as well
  cache.start(StartMode.POST_INITIALIZED_EVENT)

  // This method is part of the Curator API and will be called due to our setup above
  override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
    // INITIALIZED and connection-state events carry no node data, so skip them
    if (event != null && event.getData != null) {
      // The last path segment is the query ID
      val queryId = ZKPaths.getNodeFromPath(event.getData.getPath)
      val query = new String(event.getData.getData, StandardCharsets.UTF_8)

      event.getType match {
        case PathChildrenCacheEvent.Type.CHILD_ADDED =>
          // Up to you to decide what to do when a new query is detected
          queryRegistered(queryId, query)
        case PathChildrenCacheEvent.Type.CHILD_REMOVED =>
          // Up to you to figure out how to stop those queries from running
          queryUnregistered(queryId, query)
        case PathChildrenCacheEvent.Type.CHILD_UPDATED =>
          // Up to you to decide what to do when a query with the same ID is updated
          queryUpdated(queryId, query)
        case _ =>
          // Any other event type is ignored
      }
    }
  }

  // Placeholder hooks so the class compiles; replace the bodies with your own handling
  def queryRegistered(queryId: String, query: String): Unit = ()
  def queryUnregistered(queryId: String, query: String): Unit = ()
  def queryUpdated(queryId: String, query: String): Unit = ()
}
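
The CHILD_UPDATED and CHILD_REMOVED branches only fire if some writer changes or deletes a node under the watched directory. A minimal sketch of those writer-side counterparts might look like the following. The names `ZkQueryUpdater`, `queryPath`, `updateInZk` and `unregisterFromZk` are hypothetical and not part of the original code; the sketch assumes the same `/registry/queries` layout used above.

```scala
import java.nio.charset.StandardCharsets

import org.apache.curator.framework.CuratorFramework

// Hypothetical companion to ZkQueryRegister, not part of the original post's code
object ZkQueryUpdater {
  // Same path layout the watcher listens on
  def queryPath(queryId: String): String = s"/registry/queries/$queryId"
}

class ZkQueryUpdater(curator: CuratorFramework) {
  import ZkQueryUpdater.queryPath

  // Overwrites the query text of an existing node; listeners see CHILD_UPDATED.
  // Throws KeeperException.NoNodeException if the query was never registered.
  def updateInZk(queryId: String, query: String): Unit = {
    curator.setData().forPath(queryPath(queryId), query.getBytes(StandardCharsets.UTF_8))
    ()
  }

  // Deletes the node; listeners see CHILD_REMOVED.
  // Also throws KeeperException.NoNodeException if the node does not exist.
  def unregisterFromZk(queryId: String): Unit = {
    curator.delete().forPath(queryPath(queryId))
    ()
  }
}
```

Whether a missing node should be treated as an error or silently ignored is a design choice; this sketch lets the exception propagate so the caller can decide.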

So as you can see, the listening part is a bit trickier than the writing, but not too confusing. Once you've got this set up, the magical part is that these two classes can live on entirely different services. It's even possible to have multiple listeners on different services acting on these events, or multiple writers publishing to the same directory; there's no limitation in that regard. Essentially, what you've made is a messaging service that also has the advantage of persistent state, which lets a listener easily true-up with what's in the Zk filesystem. We've used this approach in a variety of cases, and I imagine it will continue to be a very relevant approach to problems I encounter throughout my career. Let me know if you can think of any other interesting applications of this feature!
