I was writing another blog post about a hanging error in MapR-Streams, and I ended up going into a good amount of detail on the basic architecture of the product. So, I thought it was worth breaking that out into its own blog post.
What is MapR-Streams?
MapR-Streams is basically MapR’s version of Apache Kafka. Unlike Kafka, though, it is built into the cluster automatically; it is actually part of MapR’s core file system (along with MapR-DB).
Apache Kafka and MapR-Streams are a newer form of message queuing service that is particularly well-geared toward big data and extremely high loads (read: millions of messages per second in many common use cases).
As we dive into more detail, you can refer to this diagram to get a mental picture of how the product is used. It was taken from a MapR presentation which can be found right here.
MapR-Streams Basics
- You create a “stream” in MapR.
- A stream is a container for topics.
- A topic is basically a message queue broken into partitions.
- Each partition is a unit of parallelism… realistically, each one is basically a queue within the topic.
- Each partition is replicated to multiple servers for high availability.
- Messages within each partition are guaranteed to be in the order in which they were sent.
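One practical detail that falls out of this layout: since a stream lives at a path in the MapR file system, your code addresses a topic by combining the stream’s path with the topic name. A minimal sketch (the stream path and topic name here are made up):

// MapR-Streams topics are addressed as <stream-path>:<topic-name>
String topic = "/apps/metrics-stream:cpu-metrics";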
Programming Languages
MapR-Streams can be used from a variety of languages including Java, Python, and C++. It also has a RESTful interface, which makes it available from pretty much anywhere.
Writing Messages
You use the Kafka APIs to send messages to MapR-Streams topics. There is some one-off code to configure the producer; but after that, it’s pretty much just 1-2 lines of code to send a message.
When sending the messages, you can either:
- Target the topic with no partitions specified. This will cause messages you send to be distributed across all partitions in the topic.
- Target specific partitions in a topic to send to. For example, I could make sure that all metrics messages for Server A’s CPU metric went to partition #3. This way, they would be guaranteed to be in order when read by any consumers.
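Both options map onto constructors of the standard Kafka ProducerRecord class. Here is a quick sketch of each, assuming messageText holds the payload (the topic path, key, and partition number are made up):

// Option 1: no partition specified; messages are spread across the topic's partitions
ProducerRecord<String, String> anyPartition =
    new ProducerRecord<>("/apps/metrics-stream:cpu-metrics", messageText);

// Option 2: pin related messages to one partition (e.g., #3) so they stay in order
ProducerRecord<String, String> pinned =
    new ProducerRecord<>("/apps/metrics-stream:cpu-metrics", 3, "serverA", messageText);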
Creating a Kafka Producer:
public static void configureProducer(String[] args) {
    Properties props = new Properties();
    // Note: no broker list is needed; MapR-Streams locates topics by their stream path
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);
}
Sending a Sample Message:
// topic is the full "/stream-path:topic-name" string; messageText is the payload
ProducerRecord<String, String> rec = new ProducerRecord<>(topic, messageText);
producer.send(rec);
You can find a full producer example from MapR here (I just referenced code from it): https://mapr.com/docs/52/MapR_Streams/code_for_the_sample_java_producer.html.
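One caveat: in the Kafka API, send() is asynchronous and only hands the record off to a background sender, so if your process might exit right after sending, flush first. A minimal sketch:

producer.flush();  // block until previously sent messages have been acknowledged
producer.close();  // flush and release the producer's resources at shutdown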
Consuming Messages
When you want to consume messages from a topic, you usually use the Kafka subscribe API (although there is an alternative assign API for manually picking partitions).
- When you subscribe to a topic, you provide a consumer-group name (the group.id configuration property). It’s just an arbitrary string; you could call it “woohoo!” if you felt like it.
- All consumers that subscribe to the same topic with the same consumer-group will “share” the partitions of that topic. By share, I mean that each consumer will get a subset of the partitions. A partition is only ever owned by one consumer at a time.
- MapR-Streams stores a long integer offset for each partition for each consumer group. So, if you had 2 consumer-groups reading a topic with 4 partitions, MapR-Streams would be tracking 8 offsets.
As a practical example… if I have 2 Java processes, each with 2 threads holding 1 consumer each, then I have 4 consumers running. If those consumers subscribe to a topic with 16 partitions, it is likely that each of the 4 consumers will end up with 4 partitions.
Again, only one consumer in a consumer-group can own a partition of a topic at a time. So… if the consumer dies or is shut down, it will release the partition, and another consumer will automatically pick it up (which is totally awesome). This means that you can have 4 servers reading the same topic in parallel and you can be confident that if one server dies, the other 3 will pick up its share of the load transparently without you doing anything on the client side.
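To make the sharing concrete, here is a minimal sketch of two consumers in the same group, configured like the consumer below (the group name and topic path are made up; normally each consumer would live in its own thread or process):

Properties props = new Properties();
props.put("group.id", "metrics-readers"); // hypothetical consumer-group name
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(props);
consumerA.subscribe(Collections.singletonList("/apps/metrics-stream:cpu-metrics"));
consumerB.subscribe(Collections.singletonList("/apps/metrics-stream:cpu-metrics"));
// As both consumers poll, a 16-partition topic would be split 8/8 between them;
// if consumerA dies, consumerB automatically takes over all 16.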
Creating a Kafka Consumer:
public static void configureConsumer(String[] args) {
    Properties props = new Properties();
    // The consumer-group name (any arbitrary string); consumers that share this
    // value split the subscribed topic's partitions among themselves
    props.put("group.id", "mygroup");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<>(props);
}
Consuming Messages:
// Subscribe to the topic (subscribe() takes a collection of topic names)
List<String> topics = new ArrayList<>();
topics.add(topic);
consumer.subscribe(topics);

// Poll forever, or until you have met some terminal condition
while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
    for (ConsumerRecord<String, String> record : consumerRecords) {
        System.out.println(" Consumed Record: " + record.toString());
    }
}
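When the loop ends (or the process shuts down), it is worth closing the consumer explicitly so its partitions are released to the rest of the group right away:

consumer.close();  // leave the consumer group and hand this consumer's partitions back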
You can find a full consumer example from MapR here (I just referenced code from it): https://mapr.com/docs/52/MapR_Streams/code_for_the_sample_java_consumer.html.