MapR Streams 5.2 – Consumer Group Hang; Related to Shut-Down

Some Context

I spend most of my time these days on a big-data metrics project which heavily uses the MapR platform.  All components of the project are decoupled by MapR Streams, which is basically a custom implementation of Kafka plus some other bells and whistles.

In general, MapR Streams has been very stable and I have great things to say about it.  But versions of MapR before 6.1 have a very frustrating bug that has caused me quite a lot of pain.

What’s the bug!?

  • I have a Java process which consumes off of MapR streams and writes to OpenTSDB daemons (OpenTSDB isn’t related to the bug).
  • Multiple copies of the Java process may be running at a time.
  • The processes consume messages off of 1 MapR-Streams topic.
  • The topic has 16 partitions.
  • I restart the processes once a day just to keep them running well.  I haven’t hit a specific problem that requires this; I just think it’s a good habit.
  • Sometimes when the restart happens, a subset of the partitions, or potentially all of the partitions, hang!

What do you mean hang!?

Well, by hang, I mean that when the processes come back, some or all of the 16 partitions will not be available for retrieving data.  So, the topic will just keep piling up with more and more messages, and the Java applications will just sit there, doing nothing.

Given that the system processes billions of messages a day, this is pretty problematic.

Before we can dig into why the hang happens, we need to understand some more technical detail about MapR-Streams.

MapR-Streams Basics

  • You create a “stream” in MapR.
  • A stream is a container for topics.
  • A topic is basically a message queue broken into partitions.
  • Each partition is a unit of parallelism… realistically, each one is basically a queue within the topic.
  • Each partition is replicated to multiple servers for high availability.
  • Messages within each partition are guaranteed to be in the order in which they were sent.

Consuming Messages

When you want to consume messages from a topic, usually the Kafka subscribe API is used (although there is an alternative/older assign API too).

  • When you subscribe to a topic, you provide a consumer-group name; it’s just an arbitrary string; you could call it “woohoo!” if you felt like it.
  • All consumers that subscribe to the same topic with the same consumer-group will “share” the partitions of that topic.  By share, I mean that each consumer gets a subset of the partitions.  A partition is only ever owned by one consumer at a time.
  • MapR-Streams stores a long integer offset for each partition for each consumer group.  So, if you had 2 consumer-groups reading a topic with 4 partitions, MapR-Streams would be tracking 8 offsets.

As a practical example… If I have 2 Java processes, each with 2 threads having 1 consumer each, then I have 4 consumers running.  If the consumers subscribe to a topic with 16 partitions, it is likely that each of the 4 consumers will end up with 4 partitions.

Again, only one consumer in a consumer-group can own a partition of a topic at a time.  So… if the consumer dies or is shut down, it will release the partition, and another consumer will automatically pick it up (which is totally awesome).  This means that you can have 4 servers reading the same topic in parallel and you can be confident that if one server dies, the other 3 will pick up its share of the load transparently without you doing anything on the client side.
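To make the sharing and fail-over concrete, here is a toy simulation in plain Java.  It is only a sketch: the round-robin split and the consumer names are assumptions made for illustration, and the real assignment is decided by the cluster and may be distributed differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionSharing {

    // Toy round-robin split of partitions across the consumers in one group.
    // This is NOT the cluster's actual algorithm; it only illustrates that
    // every partition has exactly one owner at a time.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String consumer : consumers) {
            owned.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size());
            owned.get(owner).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 2 processes x 2 threads x 1 consumer each = 4 consumers (names are made up).
        List<String> consumers = new ArrayList<>(
                List.of("proc1-thread1", "proc1-thread2", "proc2-thread1", "proc2-thread2"));
        System.out.println(assign(16, consumers));

        // One consumer goes away; the remaining 3 share all 16 partitions.
        consumers.remove("proc2-thread2");
        System.out.println(assign(16, consumers));
    }
}
```

With 4 consumers each one ends up with 4 partitions; after one is removed, the remaining 3 cover all 16 between them, which is the behaviour the bug below breaks.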

So, what’s the bug?

Here’s the bug…  In all versions of MapR before 6.1, there is a race condition which is very rare but very bad.  If I shut down a process with multiple consumers, each of the consumers will shut down (which is fine).

Here’s the problem though.  There is a cluster-side bug with MapR-Streams where a partition owned by a consumer that is shutting down can actually get re-assigned to another consumer that is shutting down!

If that happens, then the second consumer can shut down without releasing the partition.  As far as the cluster is concerned, the partition is now owned by a consumer that does not exist.  The cluster never notices this, so the partition is effectively unavailable to that consumer group from then on.

How can you verify this bug?

To determine if you have this bug, you can use the stream/assign/list REST API or CLI call against your MapR-Stream.  Given your consumer group name, it will tell you which processes on which hosts own which partitions.  Here’s a link for the documentation: https://mapr.com/docs/home/ReferenceGuide/stream_assign_list.html.

If you have this bug, you will notice that your partitions are owned by processes that no longer exist!
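If you want to automate that check, the comparison itself is simple.  The sketch below assumes you have already parsed the assignment listing into rows of partition, host, and process ID, and that you have a set of the processes you know are alive.  The `Assignment` class, its field names, and the `host:pid` key format are all hypothetical; they are not the actual output format of stream/assign/list.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class OrphanedPartitionCheck {

    // Hypothetical shape for one parsed row of assignment data.
    // The field names are assumptions, not the real output format.
    static final class Assignment {
        final int partition;
        final String host;
        final long pid;

        Assignment(int partition, String host, long pid) {
            this.partition = partition;
            this.host = host;
            this.pid = pid;
        }
    }

    // Returns the partitions whose recorded owner is not among the live
    // processes. liveOwners holds "host:pid" strings (an assumed convention).
    static List<Integer> orphaned(List<Assignment> assignments, Set<String> liveOwners) {
        return assignments.stream()
                .filter(a -> !liveOwners.contains(a.host + ":" + a.pid))
                .map(a -> a.partition)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

Any partition this returns is a candidate for the hang described above: the cluster still records an owner for it, but that owner is no longer running.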

Workarounds

MapR identified this bug for me when I posted it to the community forums.  One of their representatives said it is fixed in 6.1, which is being released soon.  If you’re like me, you’re working somewhere where upgrading takes a lot of time (hooray!), so you’ll need some ways to mitigate the issue in order to use streams reliably in production.

  • MapR recommends restarting Warden on the MapR-Streams nodes.  This kind of clears out the in-memory state and stops the issue (apparently).  In our case, it seemed to help, but the hang happened again a couple of weeks later.
  • I found adding shut-down hooks to our applications with large delays between each consumer shut-down helped a lot, but the problem did eventually happen again.
  • Next, I made each Java application just have one consumer, and I made sure our deployment automation left many minutes between restarting multiple copies of the application.  The applications read off of their one consumer as fast as possible, send the messages to a blocking queue, and then multiple threads process the messages off of that.  This surprisingly worked well because streams is so fast that I didn’t need multiple reader threads, even for loads of 200,000 messages a second.  As there is only one consumer per process and the processes are staggered a lot, there is not much of a chance to hit the error.  We haven’t seen it manifest again since this change (yet).
  • As an “emergency” option, I also used the Kafka APIs to make an application that can duplicate a consumer group; i.e. take its configuration and create it under another group name.  Then I can just tell my app to use that new group name and it’s fine/not-hung.  This is more of a “production failed! what does operations do!?” sort of thing though.
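The one-consumer-per-process layout from the third workaround can be sketched roughly as follows.  This is a minimal illustration using only the JDK, not the actual application code: the `source` parameter stands in for the loop that polls the single stream consumer, and the class name, method name, and queue capacity are all assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SingleReaderPipeline {

    // Unique sentinel object telling a worker to exit. It is compared by
    // identity, so a real message with the same text is never mistaken for it.
    private static final String STOP = new String("STOP");

    // One reader feeds a bounded queue; workerCount threads drain it.
    static void run(Iterable<String> source, int workerCount, int queueCapacity,
                    Consumer<String> handler) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            workers.submit(() -> {
                try {
                    while (true) {
                        String message = queue.take();
                        if (message == STOP) {
                            return;
                        }
                        handler.accept(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // The single reader. In a real application this loop would poll
            // the process's one stream consumer instead of iterating a list.
            for (String message : source) {
                queue.put(message); // blocks when workers fall behind
            }
            // One sentinel per worker so that every worker exits.
            for (int i = 0; i < workerCount; i++) {
                queue.put(STOP);
            }
            workers.shutdown();
            workers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workers.shutdownNow();
        }
    }
}
```

The bounded queue means the reader slows down when the workers fall behind instead of buffering without limit.  Because only one consumer exists per process, a process shut-down releases partitions from a single consumer, which is what keeps the race window small.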

All done!

I hope you never hit this bug, but if you do, I hope this saves you tons of time! I suffered a lot with this one :).
