Hazelcast – IQueue

The java.util.concurrent.BlockingQueue interface lets threads in a JVM produce and consume messages at different rates. A producer blocks when the queue has no free capacity, and a consumer blocks until an element becomes available in the queue.
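The blocking behavior described above can be seen with the plain JDK, before Hazelcast is involved. The following is a minimal sketch: the class name BlockingQueueDemo, the helper method run and the capacity of 2 are illustrative choices, not part of any library.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class BlockingQueueDemo {

    // Runs one producer and one consumer over a bounded queue and
    // returns the elements in the order the consumer received them.
    static List<String> run(String[] items, int capacity) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> consumed = new CopyOnWriteArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String item : items) {
                    queue.put(item); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items.length; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] fruits = {"Mango", "Apple", "Banana", "Watermelon"};
        // prints [Mango, Apple, Banana, Watermelon]
        System.out.println(run(fruits, 2));
    }
}
```

With a single producer and a single consumer the order is preserved, because the queue is FIFO. The capacity of 2 forces the producer to wait whenever it gets two elements ahead of the consumer.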

Similarly, IQueue extends BlockingQueue and provides a distributed version of it. It offers the same functions: put, take, etc.

One important point to note about IQueue is that, unlike partitioned structures such as IMap, its data is not partitioned. All the elements of a queue are stored on a single JVM. The data is still accessible to all the JVMs in the cluster, but the queue cannot be scaled beyond a single machine/JVM. If the number of elements grows beyond the available memory, an OutOfMemoryError is thrown.

The queue supports synchronous backup as well as asynchronous backup. With a synchronous backup, a queue operation completes only after the backup copy has been updated, so even if the JVM holding the queue goes down, all the elements are preserved and available from the backup.
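Capacity and backups are set per queue through the Hazelcast configuration. The fragment below is a sketch of a programmatic setup for the "fruits" queue used later on this page; it needs the Hazelcast library on the classpath. The class name BoundedQueueConfig and the numeric values are illustrative assumptions, not recommendations.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class BoundedQueueConfig {
    public static void main(String[] args) {
        Config config = new Config();

        // Settings for the queue named "fruits" (values are illustrative)
        QueueConfig queueConfig = config.getQueueConfig("fruits");
        queueConfig.setMaxSize(1000);       // put() blocks once 1000 elements are queued
        queueConfig.setBackupCount(1);      // one synchronous backup copy
        queueConfig.setAsyncBackupCount(0); // no asynchronous backup copies

        // Start a member with this configuration
        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(config);
        System.out.println("Max size: "
            + hazelcast.getConfig().getQueueConfig("fruits").getMaxSize());
        hazelcast.shutdown();
    }
}
```

Bounding the queue is one way to keep producers from filling the memory of the member that holds it: once the limit is reached, producers wait instead of adding more elements.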

Let’s look at an example of the useful functions.

Adding elements and reading elements

Let’s execute the following code on three JVMs: the producer code on one and the consumer code on the other two.

Example

The first piece is the producer code which creates a queue and adds items to it.

public static void main(String... args) throws IOException, InterruptedException {
   // initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // get (or create) the distributed queue named "fruits"
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   String[] fruits = {"Mango", "Apple", "Banana", "Watermelon"};
   for (String fruit : fruits) {
      System.out.println("Producing: " + fruit);
      // add the element to the queue; blocks if the queue is full
      hzFruits.put(fruit);
      Thread.sleep(1000);
   }
   System.exit(0);
}

The second piece is the consumer code, which reads the elements.

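public static void main(String... args) throws IOException, InterruptedException {
   // initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // look up the same distributed queue the producer writes to
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   String fruit;
   // poll with a timeout rather than isEmpty() followed by take(): another
   // consumer may empty the queue between the two calls, and take() would block
   while ((fruit = hzFruits.poll(5, java.util.concurrent.TimeUnit.SECONDS)) != null) {
      System.out.println("Consuming: " + fruit);
      Thread.sleep(2000);
   }
   System.exit(0);
}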

Output

The producer output shows each element as it is added to the queue.

Producing: Mango
Producing: Apple
Producing: Banana
Producing: Watermelon

The output of the first consumer shows that it consumes part of the data.

Consuming: Mango
Consuming: Banana

The output of the second consumer shows that it consumes the rest of the data. Each element is delivered to exactly one consumer, and the exact split between the two may vary from run to run.

Consuming: Apple
Consuming: Watermelon

Useful Methods

Sr.No   Function Name & Description
1   add(Type element): Adds an element to the queue.
2   remove(Type element): Removes an element from the queue.
3   poll(): Returns the head of the queue, or null if the queue is empty.
4   take(): Returns the head of the queue, waiting until an element becomes available.
5   size(): Returns the number of elements in the queue.
6   contains(Type element): Returns true if the element is present in the queue.
7   getPartitionKey(): Returns the partition key that determines which partition holds the queue.
8   addItemListener(ItemListener<Type> listener, boolean includeValue): Notifies the subscriber when an element is added to or removed from the queue. The includeValue flag controls whether the event carries the element itself.
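Because IQueue extends BlockingQueue, the difference between poll(), take() and the other inherited methods can be tried out locally. The sketch below uses the JDK's LinkedBlockingQueue as a stand-in for a Hazelcast queue, so it runs without a cluster; the class name QueueMethodsDemo and the method describe are illustrative names only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueMethodsDemo {

    // Exercises the inherited queue methods and records each result.
    static String describe() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        StringBuilder log = new StringBuilder();

        log.append(queue.poll()).append(';');            // empty queue: returns null at once
        queue.add("Mango");
        queue.add("Apple");
        log.append(queue.size()).append(';');            // two elements queued
        log.append(queue.contains("Apple")).append(';'); // true
        log.append(queue.remove("Apple")).append(';');   // true, element removed
        log.append(queue.take()).append(';');            // head of the queue: Mango
        // queue is empty again: timed poll gives up after 100 ms and returns null
        log.append(queue.poll(100, TimeUnit.MILLISECONDS));
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints null;2;true;true;Mango;null
        System.out.println(describe());
    }
}
```

Calling take() on the empty queue at the end would block indefinitely, which is why the sketch uses the timed poll there.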
