Locker Number W/O Repeats

A common programming interview problem is to generate all of the possible locker numbers without repeated digits.

This is a little silly in the physical world… but the basic idea is:
– You have the numbers 0-9 (so, 10 numbers).
– You can choose any of the 10 for digit 1.
– You can choose any of the remaining 9 for digit 2.
– And the pattern continues.

So, this is really a permutations problem (order matters, which is why the code below is named accordingly)… how many ordered ways can you pick N digits out of 10 without reusing any?

Calculating it Directly

  • Basically, it's a partial (falling) factorial: 10! / (10 − N)!.
  • If N = 4, we have 10 options for #1, 9 for #2, 8 for #3 and 7 for #4 = 10 * 9 * 8 * 7 = 5,040 permutations (a quick sketch of this direct computation follows).
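
If you only need the count and not the listing, you can compute it directly. Here is a minimal sketch; the class and method names are mine, not from any library:

public class FallingFactorial {

    //Multiplies 'length' decreasing factors starting at 'digits';
    //e.g. 10 * 9 * 8 * 7 for a 4-digit locker number with no repeats.
    public static long countNoRepeats(int digits, int length) {
        long result = 1;
        for (int i = 0; i < length; ++i) {
            result *= (digits - i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(countNoRepeats(10, 4)); //Prints 5040.
    }
}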

Listing and Counting Them in Java

This problem can be expressed very easily and directly in a functional manner. In this case, that means that we continually build up our solutions immutably, as opposed to, say, trying to keep a character array that we modify as we go.

public class LockerNumberNoRepeats {
    public static void main(String[] args) {
        System.out.println("Total combinations = " +
            permutations(4));
    }

    public static int permutations(int targetLength) {
        return permutations("", "0123456789", targetLength);
    }

    private static int permutations(String c, String r,
            int targetLength) {

        //Print the result if we've reached a full locker
        //combo of the target length.  Also, return 1 so we
        //can sum up all found combinations.
        if (c.length() == targetLength) {
            System.out.println(c);
            return 1;
        }

        //Use sum to count all the permutations we see.  Also
        //cycle through all remaining letters (r) and add each
        //to our current string (c).  Then recursively call this
        //function to continue.
        int sum = 0;
        for (int i = 0; i < r.length(); ++i) {
            sum += permutations(c + r.charAt(i),
                r.substring(0,i) + r.substring(i + 1),
                targetLength);
        }
        return sum;
    }
}

Output (abbreviated; the program prints all 5,040 permutations, ending with the lines shown)

9874
9875
9876
Total combinations = 5040

Interleaving Iterator

Creating an interleaving iterator is a popular interview question in the Java world.

What is an Interleaving Iterator?

An interleaving iterator is an iterator of iterators: it takes several iterators and yields one element from each in turn, round-robin. An iterator is a class that implements the following methods (others exist but are not mandatory):

boolean hasNext();
T next();

Iterators are designed to “iterate” (cycle through) a sequence of elements of unknown length. You don’t know how many elements there are in advance; you just know that while hasNext() returns true, more elements exist (and once it returns false, you’re done with the iterator).

So, an iterator of iterators is an iterator that iterates over iterators which iterate over sequences of elements of another type.

Iterators can even be potentially infinite; they may never end!
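
For example, here is a minimal sketch of an infinite iterator over the natural numbers; hasNext() always returns true, so the caller decides when to stop:

import java.util.Iterator;

public class NaturalNumbers implements Iterator<Integer> {

    private int n = 0;

    @Override
    public boolean hasNext() {
        return true; //Never runs out of elements.
    }

    @Override
    public Integer next() {
        return n++;
    }
}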

Solution Details

There are quite a few different solutions for this… but the easiest one I’ve seen leverages a queue. The basic approach is:

  • Create a member variable in the class that is a Queue of iterators.
  • On construction, populate the queue with all of the provided iterators.
  • Create a utility method (getNext() in the solution below).
    • This method is designed to get the next element if it hasn’t already been found.
    • It stores it in a member variable of the class so that we know if it has already been found.
    • The methods next() and hasNext() both defer to this method to test if there is a next element and/or retrieve the next element.
    • This is important because when using the iterator, callers will call hasNext() to see if there is a next element (and hasNext() must actually fetch that element to know whether it exists).  When next() is then called, we want to return that already-found element instead of accidentally skipping ahead to yet another element.
    • Also, if I (badly) made an iterator of iterators that I knew I put 10 elements in overall, I should technically be able to just call next() 10 times without calling hasNext(), even though it is bad practice.
    • IMPORTANT – the method must basically consist of:
    • while the next element isn’t found and some iterators remain in the queue:
      • Take the next iterator from the queue.
      • See if it has more elements.
      • If it does, take its next element, then add the iterator back to the queue only if it still has more elements after that.
      • If it doesn’t have more elements, discard it (do not add it back).
  • The next() method just gets the next element using the utility method and returns it, or throws a NoSuchElementException if there wasn’t one.
  • The hasNext() method just returns whether or not the utility method says there are more elements.

Implementation (Java)

import java.util.*;

public class InterleavingIterator<T> implements Iterator<T> {

    private Queue<Iterator<T>> iteratorQueue;
    private T next;

    public InterleavingIterator(Iterator<Iterator<T>> masterIterator) {
        if (masterIterator == null) {
            throw new IllegalArgumentException("Iterator cannot be null.");
        }
        iteratorQueue = new ArrayDeque<>();
        masterIterator.forEachRemaining(iteratorQueue::add);
    }

    //Finds the next element and caches it in 'next' (if not already
    //cached).  Note: this approach assumes the underlying iterators
    //never yield null elements.
    private T getNext() {
        while (next == null && !iteratorQueue.isEmpty()) {
            Iterator<T> iterator = iteratorQueue.poll();
            if (iterator.hasNext()) {
                next = iterator.next();
                if (iterator.hasNext()) {
                    iteratorQueue.add(iterator);
                }
                return next;
            }
        }
        return next;
    }

    @Override
    public boolean hasNext() {
        return getNext() != null;
    }

    @Override
    public T next() {
        T n = getNext();
        next = null;
        if (n == null) throw new NoSuchElementException();
        return n;
    }

    //Test it out!
    public static void main(String[] args) {

        //Create 4 lists with a range of test cases.
        List<String> a = Arrays.asList("a", "d", "f");
        List<String> b = Arrays.asList("b", "e", "g", "h");
        List<String> c = Collections.singletonList("c");
        List<String> d = Collections.emptyList();

        Iterator<Iterator<String>> masterIterator = Arrays.asList(
                a.iterator(),
                b.iterator(),
                c.iterator(),
                d.iterator()).iterator();

        InterleavingIterator<String> interleaving =
                new InterleavingIterator<>(masterIterator);

        while (interleaving.hasNext()) {
            System.out.println(interleaving.next());
        }
    }
}

Output

a
b
c
d
e
f
g
h

The Heap Data Structure

What is a Heap?

A heap is a specialized data structure designed to always yield the smallest element efficiently: peeking at the minimum is O(1), and removing it is O(lg(n)) (assuming it is a min-heap; a max-heap does the opposite with the largest element).

It is logically represented as a complete binary tree: every level is full except possibly the last, which is filled from the left. Practically, it is represented as an array, because it is very easy to map such a left-filled binary tree into an array. The left child of the node at index N is at index 2 * N + 1, and the right child is at index 2 * N + 2 (so, for example, the node at index 1 has children at indices 3 and 4).

The O(lg(n)) removal time of the min (or max) element makes heaps ideal for use as a priority queue.
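
In fact, Java's built-in java.util.PriorityQueue is backed by a binary min-heap, so you can see this behavior without writing any code of your own:

import java.util.PriorityQueue;

public class PriorityQueueDemo {
    public static void main(String[] args) {
        PriorityQueue<Integer> pq = new PriorityQueue<>();
        pq.add(7);
        pq.add(2);
        pq.add(9);
        System.out.println(pq.peek()); //2 - the smallest element, O(1).
        System.out.println(pq.poll()); //2 - removal percolates, O(lg(n)).
        System.out.println(pq.poll()); //7
    }
}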

How Does it Work?

The key to this performance is that the elements are stored such that every element is guaranteed to be less than or equal to both of its children (so, logically, this means the root is the smallest element). If it were a max-heap, the opposite would be true.

It accomplishes this in a pretty simple way. Let’s say we’re using a min-heap:

  • The smallest element will be at the root of the tree.
  • So, to get the next smallest element, we just:
    • Take the root of the tree.
    • Move the last element in the tree (the rightmost leaf node)
      up to where the root element was.
    • Repeatedly swap this node with the smaller of its children, as
      long as a child is less than it (this is called “percolating down”).
    • Taking the root takes O(1) time, and percolating down at most
      takes the height of the tree, which for a binary tree is O(lg(n)).
      So, this is how we get O(lg(n)) time for taking each element.
  • If we want to insert new elements, we basically do the opposite:
    • Add the new element at the last position of the tree.
    • Repeatedly switch it upwards if it is less than its parent.
    • This is called “percolating up” and also is O(log(n)) time since
      it is also dependent on the height of the tree.

Creating a Heap

Surprisingly, a heap can actually be constructed in linear time O(n) from an existing array of objects.

This is a little time-consuming to prove, so please refer to the method here: https://www.geeksforgeeks.org/time-complexity-of-building-a-heap/.
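
The construction itself (often called Floyd's heapify) is short even if the proof isn't. Here is a minimal sketch on a plain int array, separate from the class below:

import java.util.Arrays;

public class BuildHeap {

    //Sift down every non-leaf node, starting from the last one; this
    //is the O(n) bottom-up construction.
    public static void buildMinHeap(int[] a) {
        for (int i = a.length / 2 - 1; i >= 0; --i) {
            siftDown(a, i);
        }
    }

    private static void siftDown(int[] a, int i) {
        while (2 * i + 1 < a.length) {
            int child = 2 * i + 1;
            //Pick the smaller of the two children, if a right child exists.
            if (child + 1 < a.length && a[child + 1] < a[child]) ++child;
            if (a[i] <= a[child]) break;
            int temp = a[i]; a[i] = a[child]; a[child] = temp;
            i = child;
        }
    }

    public static void main(String[] args) {
        int[] a = {7, 6, 2, 9, 8, 17, 46, 12, 13, 1};
        buildMinHeap(a);
        System.out.println(Arrays.toString(a)); //a[0] is now the minimum.
    }
}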

Implementation

Here is a full sample implementation of a heap (minus the construction of a heap from an existing array; in this case we just build it up using add and remove).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

public class MinHeap<T extends Comparable<T>> {

    private List<T> heap;

    public MinHeap() {
        heap = new ArrayList<>();
    }

    //With 0-based indexing where children are at 2i + 1 and 2i + 2,
    //the parent of node i is (i - 1) / 2.
    private static int getParent(int i) { return (i - 1) / 2; }

    //Add to end, percolate up to correct position.
    public void add(T value) {
        heap.add(value);

        int index = heap.size() - 1;
        while (index > 0) {
            int parent = getParent(index);
            if (heap.get(index).compareTo(heap.get(parent)) < 0) {
                T temp = heap.get(index);
                heap.set(index, heap.get(parent));
                heap.set(parent, temp);
                index = parent;
            }
            else break;
        }
    }

    private static int getLeft(int i) { return i * 2 + 1; }
    private static int getRight(int i) { return i * 2 + 2; }

    //Take from top, move last element to root, percolate down.
    public T remove() {
        if (heap.size() == 0) throw new NoSuchElementException();
        T result = heap.get(0);

        //Remove the last element.  If this makes the heap empty,
        //then just return the result.
        T last = heap.remove(heap.size() - 1);
        if (heap.size() == 0) return result;

        //If we got here, the heap still has elements, so add the last
        //element to the root (0) and percolate down.  Percolating down
        //means swapping the root with its children recursively until
        //it is smaller than both of its children.
        heap.set(0, last);
        int i = 0;
        while (getLeft(i) < heap.size()) {
            //Assume the left child is the smaller one, then check whether
            //a right child exists and is smaller still.
            int minChildIndex = getLeft(i);
            if (getRight(i) < heap.size()) {
                T leftChild  = heap.get(getLeft(i));
                T rightChild = heap.get(getRight(i));
                if (leftChild.compareTo(rightChild) > 0) {
                    minChildIndex = getRight(i);
                }
            }
            if (heap.get(i).compareTo(heap.get(minChildIndex)) > 0) {
                T temp = heap.get(i);
                heap.set(i, heap.get(minChildIndex));
                heap.set(minChildIndex, temp);
                i = minChildIndex;
            }
            else break;
        }

        return result;
    }

    public boolean isEmpty() { return heap.isEmpty(); }

    public static void main(String[] args) {
        MinHeap<Integer> h = new MinHeap<>();
        Arrays.asList(7, 6, 2, 9, 8, 17, 46, 12, 13, 1, 7)
            .forEach(h::add);
        while(!h.isEmpty()) { System.out.print(h.remove() + " "); }
        System.out.println();

        Arrays.asList(7, 6, 2, 9, 8, 17, 46, 12, 13, 1, 7, 51)
            .forEach(h::add);
        while(!h.isEmpty()) { System.out.print(h.remove() + " "); }

        System.out.println();
        h.add(1);
        while(!h.isEmpty()) { System.out.print(h.remove() + " "); }
    }
}
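
Output

Each drain of the heap should print its contents in ascending order:

1 2 6 7 7 8 9 12 13 17 46
1 2 6 7 7 8 9 12 13 17 46 51
1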


MapR-Streams Crash Course

I was writing another blog post focusing on a hanging error in MapR-Streams, and I ended up going into some good detail on the basic architecture of the product.  So, I thought it was worth breaking that material out into its own blog post.

What is MapR-Streams?

MapR-Streams is basically MapR’s version of Apache Kafka.  It is built into their cluster automatically though; it is actually a part of their core file system (along with MapR-DB).

Apache Kafka and MapR-Streams are basically a newer form of message-queuing service that is particularly well-geared for big data and extremely high loads (read: millions of messages per second in many common use cases).

As we dive into more detail, you can refer to this diagram to get a mental picture of how the product is used.  It was taken from a MapR presentation which can be found right here.

[Figure: MapR-Streams architecture diagram]

MapR-Streams Basics

  • You create a “stream” in MapR.
  • A stream is a container for topics.
  • A topic is basically a message queue broken into partitions.
  • Each partition is a unit of parallelism… realistically, each one is basically a queue within the topic.
  • Each partition is replicated to multiple servers for high availability.
  • Messages within each partition are guaranteed to be in the order in which they were sent.

Programming Languages

MapR-Streams can be used with a variety of languages including Java, Python, and C++.  It also has a RESTful interface, which makes it available from pretty much anywhere.

Writing Messages

You use the Kafka APIs to send messages to MapR-Streams topics.  There is some one-off code to configure the producer; but after that, it’s pretty much just 1-2 lines of code to send each message.

When sending the messages, you can either:

  1. Target the topic with no partitions specified.  This will cause messages you send to be distributed across all partitions in the topic.
  2. Target specific partitions in a topic to send to.  For example, I could make sure that all metrics messages for Server A’s CPU metric went to partition #3.  This way, they would be guaranteed to be in order when read by any consumers.

Creating a Kafka Producer:

public static void configureProducer(String[] args) {
    Properties props = new Properties();
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer(props);
}

Sending a Sample Message:

ProducerRecord rec = new ProducerRecord(topic, messageText);
producer.send(rec);
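
If you want to target a specific partition instead (option 2 above), the standard Kafka ProducerRecord constructor that also takes a partition number and a key works here too.  A hypothetical sketch, assuming the same producer and topic setup; the partition number and key are made up for illustration:

//Pin all of Server A's CPU metrics to partition #3 so that consumers
//always read them in order.
ProducerRecord rec = new ProducerRecord(topic, 3, "serverA.cpu", messageText);
producer.send(rec);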

You can find a full producer example from MapR here (I just referenced code from it): https://mapr.com/docs/52/MapR_Streams/code_for_the_sample_java_producer.html.

Consuming Messages

When you want to consume messages from a topic, usually the Kafka subscribe API is used (although there is an alternative/older assign API too).

  • When you subscribe to a topic, you provide a consumer-group name; it’s just an arbitrary string; you could call it “woohoo!” if you felt like it.
  • All consumers that subscribe to the same topic with the same consumer-group will “share” the partitions of that topic.  By share, I mean that the consumers will each get a subset of the partitions.  A partition is only ever owned by one consumer at a time.
  • MapR-Streams stores a long integer offset for each partition for each consumer group.  So, if you had 2 consumer-groups reading a topic with 4 partitions, MapR-Streams would be tracking 8 offsets.

As a practical example… if I have 2 Java processes, each with 2 threads holding 1 consumer each, then I have 4 consumers running.  If those consumers subscribe to a topic with 16 partitions, it is likely that each of the 4 consumers will end up with 4 partitions.

Again, only one consumer in a consumer-group can own a partition of a topic at a time.  So… if the consumer dies or is shut down, it will release the partition, and another consumer will automatically pick it up (which is totally awesome).  This means that you can have 4 servers reading the same topic in parallel and you can be confident that if one server dies, the other 3 will pick up its share of the load transparently without you doing anything on the client side.

Creating a Kafka Consumer:

public static void configureConsumer(String[] args) {
    Properties props = new Properties();
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    consumer = new KafkaConsumer(props);
}

Consuming Messages:

List topics = new ArrayList();
topics.add(topic);
consumer.subscribe(topics);

//You should wrap this line in a loop so it happens forever, or until you
//have met some terminal condition.
ConsumerRecords consumerRecords = consumer.poll(pollTimeout);

Iterator iterator = consumerRecords.iterator();

while (iterator.hasNext()) {
    ConsumerRecord record = iterator.next();
    System.out.println(" Consumed Record: " + record.toString());
}

You can find a full consumer example from MapR here (I just referenced code from it): https://mapr.com/docs/52/MapR_Streams/code_for_the_sample_java_consumer.html.

MapR Streams 5.2 – Consumer Group Hang; Related to Shut-Down

Some Context

I spend most of my time these days on a big-data metrics project which heavily uses the MapR platform.  All components of the project are decoupled by MapR Streams, which is basically a custom implementation of Kafka plus some other bells and whistles.

In general, MapR-Streams has been very stable and I have great things to say about it.  But in versions of MapR before 6.1, there is a very frustrating bug which has caused me quite a lot of pain.

What’s the bug!?

  • I have a Java process which consumes off of MapR streams and writes to OpenTSDB daemons (OpenTSDB isn’t related to the bug).
  • Multiple copies of the Java process may be running at a time.
  • The processes consume messages off of 1 MapR-Streams topic.
  • The topic has 16 partitions.
  • I restart the processes once a day just to keep them running well.  I haven’t encountered a reason that makes this necessary; I just think it’s a good habit.
  • Sometimes when the restart happens, a subset of the partitions, or potentially all of the partitions, hang!

What do you mean hang!?

Well, by hang, I mean that when the processes come back, some or all of the 16 partitions will not be available for retrieving data.  So, the topic will just keep piling up with more and more messages, and the Java applications will just sit there, doing nothing.

Given the system processes billions of messages a day, this is pretty problematic.

Before we can dig into why the hang happens, we need some more technical detail about MapR-Streams; for the essentials, refer back to the “MapR-Streams Basics” and “Consuming Messages” sections in the crash course above.

So, what’s the bug?

Here’s the bug…  In all versions of MapR before 6.1, there is a race condition which is very rare but very bad.  If I shut down a process with multiple consumers, each of the consumers will shut down (which is fine on its own).

Here’s the problem though.  There is a cluster-side bug with MapR-Streams where a partition owned by a consumer that is shutting down can actually get re-assigned to another consumer that is shutting down!

If that happens, then the second consumer can shut down without releasing the partition.  As far as the cluster is concerned, the partition is now owned by a consumer that does not exist.  The cluster never notices this, so the partition is basically unavailable to the consumer group forever after.

How can you verify this bug?

To determine if you have this bug, you can use the stream/assign/list REST API or CLI call against your MapR-Stream.  Given your consumer group name, it will tell you which processes on which hosts own which partitions.  Here’s a link for the documentation: https://mapr.com/docs/home/ReferenceGuide/stream_assign_list.html.

If you have this bug, you will notice that your partitions are owned by processes that no longer exist!

Workarounds

MapR identified this bug for me when I posted it to the community forums.  One of their representatives said it is fixed in 6.1 which is being released soon.  If you’re like me, you’re working somewhere where upgrading takes a lot of time (hooray!), so you’ll need some ways to mitigate the issue to use streams in production reliably.

  • MapR recommends restarting Warden on the MapR-Streams nodes.  This kind of clears out the in-memory state and stops the issue (apparently).   In our case, it seemed to help but it happened again a couple of weeks later.
  • I found adding shut-down hooks to our applications with large delays between each consumer shut-down helped a lot, but the problem did eventually happen again.
  • Next, I made each Java application have just one consumer, and I made sure our deployment automation left many minutes between restarting multiple copies of the application.  Each application reads from its one consumer as fast as possible, pushes the messages onto a blocking queue, and multiple threads then process the messages off of that (a sketch of this pattern follows this list).  This surprisingly worked well: streams is so fast that I didn’t need multiple reader threads, even for loads of 200,000 messages a second.  As there is only one consumer per process and the processes are staggered a lot, there is not much of a chance to hit the error.  We haven’t seen it manifest again since this change (yet).
  • As an “emergency” option, I also used the Kafka APIs to make an application that can duplicate a consumer group; i.e. take its configuration and create it under another group name.  Then I can just tell my app to use that new group name and it’s fine/not hung.  This is more of a “production failed! what does operations do!?” sort of thing though.
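
Here is a minimal sketch of that single-consumer/blocking-queue pattern, assuming the Kafka 0.9-era consumer API that ships with MapR 5.2.  The class name, queue size, and thread count are illustrative, not taken from the real project:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SingleConsumerPipeline {

    private static final BlockingQueue<ConsumerRecord<String, String>> work =
            new ArrayBlockingQueue<>(10000);

    public static void run(KafkaConsumer<String, String> consumer) {

        //Worker threads drain the queue and do the real processing.
        for (int i = 0; i < 4; ++i) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        process(work.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        //The single reader thread: the only consumer in this process.
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    work.put(record); //Blocks if the workers fall behind.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        //Hypothetical handler - do the real work here (e.g. write to OpenTSDB).
    }
}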

All done!

I hope you never hit this bug, but if you do, I hope this saves you tons of time! I suffered a lot with this one :).