Saturday, September 16, 2023

How to use SynchronousQueue in Java? Producer Consumer Example

SynchronousQueue is a special kind of BlockingQueue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. When you call the put() method on a SynchronousQueue, it blocks until another thread is there to take that element out of the queue. Similarly, if a thread tries to remove an element with take() and no element is currently present, that thread is blocked until another thread puts an element into the queue. You can compare SynchronousQueue to athletes (threads) running with the Olympic torch: one runs with the torch (the object to be passed) and hands it to another athlete waiting at the other end.

If you pay attention to the name, you will also understand that it is named SynchronousQueue for a reason: it passes data synchronously to another thread. It waits for the other party to take the data instead of just putting the data and returning (an asynchronous operation).
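To make the "waits for the other party" part concrete, here is a minimal sketch. The class name HandOffDemo and the helper run() are made up for illustration; the only assumption is that a thread parked inside put() reports the WAITING state, which the loop uses to know the producer has arrived.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class HandOffDemo {

    // Returns "<put returned before take>/<element received>/<put returned after take>".
    static String run() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        AtomicBoolean putReturned = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.put("torch");           // blocks until another thread takes
                putReturned.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "PRODUCER");
        producer.start();

        try {
            // Wait until the producer is parked inside put().
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            boolean before = putReturned.get();   // false: nobody has taken yet
            String received = queue.take();       // the hand-off happens here
            producer.join();
            boolean after = putReturned.get();    // true: put() has returned
            return before + "/" + received + "/" + after;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints false/torch/true
    }
}
```

The producer stays inside put() for as long as nobody calls take(), which is exactly the synchronous behaviour the name hints at.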

If you are familiar with CSP and Ada, then you know that synchronous queues are similar to rendezvous channels. They are well suited for hand-off designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.

In earlier multi-threading tutorials, we learned how to solve the producer-consumer problem using wait and notify, and using BlockingQueue. In this tutorial, we will learn how to implement the producer-consumer design pattern using a SynchronousQueue.

This class also supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.
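The fairness policy can be observed with a small sketch like the one below. FairHandOffDemo and run() are hypothetical names; the sketch starts three producers one at a time, waiting for each to be parked in put() (state WAITING) before starting the next, so that their arrival order is well defined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairHandOffDemo {

    // Returns the elements in the order the consumer received them.
    static List<String> run() {
        SynchronousQueue<String> queue = new SynchronousQueue<>(true); // fair mode
        List<String> received = new ArrayList<>();
        try {
            for (String item : new String[] {"first", "second", "third"}) {
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(item);   // waits for a consumer
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, "PRODUCER-" + item);
                producer.start();
                // Start the next producer only once this one is parked in put().
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            // With fairness enabled, the longest-waiting producer is served first.
            for (int i = 0; i < 3; i++) {
                received.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first, second, third]
    }
}
```

With the default constructor (fairness off), the same program is free to hand over the elements in a different order.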

And, if you want to level up your Java multi-threading and concurrency skills, then I also suggest you take a look at the Java Multithreading, Concurrency, and Performance Optimization course by Michael Pogrebinsky on Udemy. It's an advanced course to become an expert in multithreading, concurrency, and parallel programming in Java, with a strong emphasis on high performance.




Producer-Consumer Example using SynchronousQueue in Java

As I have said before, nothing is better than a producer-consumer problem to understand inter-thread communication in any programming language. In the producer-consumer problem, one thread acts as a producer, which produces an event or task, and the other thread acts as a consumer. A shared buffer is used to transfer data from producer to consumer.

The difficulty in solving producer-consumer problems comes from edge cases: the producer must wait if the buffer is full, and the consumer must wait if the buffer is empty.

Of the earlier solutions, the BlockingQueue one was quite easy, because a blocking queue provides not only a buffer to store data but also flow control: it blocks the thread calling put() (the PRODUCER) if the buffer is full, and blocks the thread calling take() (the CONSUMER) if the buffer is empty.

In this tutorial, we will solve the same problem using SynchronousQueue, a special kind of concurrent collection that has zero capacity.
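As a rough illustration of what "zero capacity" means in practice, the sketch below compares a one-slot ArrayBlockingQueue with a SynchronousQueue using the non-blocking offer() method. The class ZeroCapacityDemo and its helper are hypothetical names used only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

class ZeroCapacityDemo {

    // offer() never blocks: it returns true only if the element was accepted.
    static boolean offerWithoutConsumer(BlockingQueue<String> queue) {
        return queue.offer("FOUR");
    }

    public static void main(String[] args) {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
        BlockingQueue<String> handOff = new SynchronousQueue<>();

        // The bounded queue has one free slot, so the element is stored.
        System.out.println(offerWithoutConsumer(bounded));   // true
        // The synchronous queue has no slot and no waiting consumer.
        System.out.println(offerWithoutConsumer(handOff));   // false
        System.out.println(handOff.remainingCapacity());     // 0
    }
}
```

A buffer-based queue can accept an element with nobody listening; a SynchronousQueue can only accept it if a consumer is already waiting.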

Producer Consumer Solution using SynchronousQueue in Java


In the following example, we have two threads which are named PRODUCER and CONSUMER (you should always name your threads, this is one of the best practices of writing concurrent applications).

The first thread publishes a cricket score, and the second thread consumes it. Cricket scores are nothing but String objects here.

If you run the program as it is, you won't notice anything different. In order to understand how SynchronousQueue works and how it solves the producer-consumer problem, you either need to debug this program in Eclipse or start only the producer thread by commenting out consumer.start();. If the consumer thread is not running, the producer will block at the queue.put(event); call, and you won't see [PRODUCER] published event : FOUR.

This happens because of the special behavior of SynchronousQueue, which guarantees that the thread inserting data will block until there is a thread to remove that data, or vice versa. You can test the other half of the code by commenting out producer.start(); and starting only the consumer thread.
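If blocking forever is not acceptable when the other side is missing, the timed variants of offer() and poll() can be used instead of put() and take(). The following is only a sketch; TimedOfferDemo, publish() and consume() are hypothetical helper names, and the timeouts are arbitrary.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {

    // Tries to hand the event to a consumer; gives up after the timeout.
    static boolean publish(SynchronousQueue<String> queue, String event, long timeoutMillis) {
        try {
            return queue.offer(event, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Waits for an event from a producer; returns null if none arrives in time.
    static String consume(SynchronousQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // No consumer is running, so this gives up after 200 ms instead of blocking forever.
        boolean delivered = publish(queue, "FOUR", 200);
        System.out.println("delivered = " + delivered); // prints delivered = false
    }
}
```

The producer can then decide what to do with an undelivered event, for example retry, log it, or drop it.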

import java.util.concurrent.SynchronousQueue;

/**
 * Java Program to solve Producer Consumer problem using SynchronousQueue. A
 * call to put() will block until there is a corresponding thread to take() that
 * element.
 *
 * @author Javin Paul
 */
public class SynchronousQueueDemo{

    public static void main(String[] args) {

        final SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread("PRODUCER") {
            public void run() {
                String event = "FOUR";
                try {
                    queue.put(event); // thread will block here
                    System.out.printf("[%s] published event : %s %n", Thread
                            .currentThread().getName(), event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        producer.start(); // starting producer thread

        Thread consumer = new Thread("CONSUMER") {
            public void run() {
                try {
                    String event = queue.take(); // thread will block here
                    System.out.printf("[%s] consumed event : %s %n", Thread
                            .currentThread().getName(), event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        consumer.start(); // starting consumer thread

    }

}

Output:
[CONSUMER] consumed event : FOUR 
[PRODUCER] published event : FOUR 

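If you have looked at the output carefully, you will have noticed that the order of events is reversed. It seems the [CONSUMER] thread is consuming data even before the [PRODUCER] thread has produced it.

This happens because the producer prints its message only after put() returns, and put() returns only once the consumer has taken the element. After the hand-off, both threads race to print, so either line can come first and the order may differ from run to run. The fairness policy is a separate matter: by default SynchronousQueue doesn't guarantee any order among waiting threads, but with fairness set to true, waiting producers and consumers are served in FIFO order. You can enable this policy by passing true to the overloaded constructor of SynchronousQueue, i.e. new SynchronousQueue(boolean fair).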
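If you want the log to read in cause-and-effect order, one option is to record the producer's message before calling put() rather than after it. The sketch below assumes a hypothetical class OrderedLogDemo and collects the messages in a thread-safe list instead of printing them directly, so the order can be inspected.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;

class OrderedLogDemo {

    // Returns the log lines in the order they were recorded.
    static List<String> run() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        List<String> log = new CopyOnWriteArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                log.add("[PRODUCER] publishing event : FOUR"); // recorded BEFORE the hand-off
                queue.put("FOUR");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "PRODUCER");

        Thread consumer = new Thread(() -> {
            try {
                String event = queue.take();
                log.add("[CONSUMER] consumed event : " + event); // recorded AFTER the hand-off
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "CONSUMER");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because take() cannot return before the producer has reached put(), the producer's line is always recorded first in this version.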



Things to remember about SynchronousQueue in Java

Here are some of the important properties of this special blocking queue in Java. It's very useful to transfer data from one thread to another thread synchronously. It doesn't have any capacity and blocks until there is a thread on the other end.

1. A call to put() on a SynchronousQueue blocks until another thread is ready to take the element.

2. SynchronousQueue has zero capacity.

3. SynchronousQueue is used to implement the direct hand-off queuing strategy in thread pools (for example in ThreadPoolExecutor): a task is handed off to a waiting thread; otherwise a new thread is created if allowed; otherwise the task is rejected.

4. This queue does not permit null elements; adding a null element results in a NullPointerException.

5. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection.

6. You cannot peek at a synchronous queue because an element is only present when you try to remove it (peek() always returns null); similarly, you cannot insert an element (using any method) unless another thread is trying to remove it.

7. You cannot iterate over a SynchronousQueue as there is nothing to iterate; its iterator() returns an empty iterator.

8. A SynchronousQueue constructed with fairness policy set to true grants threads access in FIFO order.
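The direct hand-off strategy from point 3 can be sketched with a ThreadPoolExecutor that uses a SynchronousQueue as its work queue. The pool sizes below (zero core threads, at most one worker) are chosen only to make the rejection easy to trigger, and DirectHandOffPoolDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandOffPoolDemo {

    // Returns true if the second task was rejected.
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1,                        // no core threads, at most one worker
                1, TimeUnit.SECONDS,
                new SynchronousQueue<>());   // direct hand-off: tasks are never queued
        CountDownLatch release = new CountDownLatch(1);
        try {
            // No idle worker is waiting, so a new thread is created for this task.
            pool.execute(() -> {
                try {
                    release.await();         // keep the only worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                // The only worker is busy and the maximum is reached: rejected.
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected()); // true
    }
}
```

Executors.newCachedThreadPool() uses the same hand-off queue but allows an effectively unlimited number of threads, so there a new thread is created instead of rejecting the task.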

That's all about SynchronousQueue in Java. We have seen some special properties of this special concurrent collection and learned how to solve the classical producer-consumer problem using SynchronousQueue in Java.  

By the way, calling it a Queue is a bit confusing because it doesn't have any capacity to hold your element. A call to put() will not complete until another thread calls take().

It's better thought of as a rendezvous point between threads for sharing objects. In other words, it is a utility for synchronously handing data from one thread to another in Java, and probably a safer alternative to the wait and notify methods.
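To see why "queue" is a slightly misleading name, you can poke at the Collection view of a SynchronousQueue. This is a small sketch with hypothetical names (EmptyCollectionViewDemo, describe(), rejectsNull()); it only calls standard queue methods.

```java
import java.util.concurrent.SynchronousQueue;

class EmptyCollectionViewDemo {

    // Summarises what the Collection methods report for the queue.
    static String describe(SynchronousQueue<String> queue) {
        return "size=" + queue.size()
                + " isEmpty=" + queue.isEmpty()
                + " peek=" + queue.peek()
                + " contains=" + queue.contains("FOUR")
                + " hasNext=" + queue.iterator().hasNext()
                + " remainingCapacity=" + queue.remainingCapacity();
    }

    // Returns true if inserting null fails with a NullPointerException.
    static boolean rejectsNull(SynchronousQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        System.out.println(describe(queue));
        // prints size=0 isEmpty=true peek=null contains=false hasNext=false remainingCapacity=0
        System.out.println(rejectsNull(queue)); // prints true
    }
}
```

From the outside it always looks like an empty collection; the only way an element ever passes through is a direct hand-off between two threads.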

Other Java Concurrency Articles you may like
  • The  Java Developer RoadMap (roadmap)
  • How to use Executor Framework in Java? (Executor example)
  • 10 Java Multithreading and Concurrency Best Practices (article)
  • Top 50 Multithreading and Concurrency Questions in Java (questions)
  • How to use Futures in Java? (futures example)
  • 10 Courses to learn Java for Beginners (courses)
  • Difference between CyclicBarrier and CountDownLatch in Java? (answer)
  • How to avoid deadlock in Java? (answer)
  • Difference between volatile, atomic, and synchronized (difference)
  • How to do inter-thread communication in Java using wait-notify? (answer)
  • What is happens before in Java? (Example)
  • 5 Essential Skills to Crack Java Interviews (skills)
  • How does Exchanger works in Java Multithreading (tutorial)
  • Top 5 Books to Master Concurrency in Java (books)

Thanks for reading this article so far. If you like this Java Multithreading and Concurrency tutorial about SynchronousQueue and Producer-Consumer Pattern then please share it with your friends and colleagues. If you have any questions or feedback then please drop a note.

P. S. - If you are new to the Java Programming arena and looking for some free courses to kick-start your Java programmer journey then you can also check out this list of top 10 free Java courses for beginners to start with.

And lastly, one question for you: what is the best way to solve the producer-consumer problem in Java? The wait and notify methods, BlockingQueue, or SynchronousQueue? If you know the answer, let us know in the comments.

11 comments:

  1. Excellent. You explain things the best understandable way. Code is too neat.
    Thanks

  2. First of all very nice article.
    I have a query on this. In case of one producer and multiple consumer scenario, how can i ensure that only one of the consumer consumes the message and not all of them.
    Thanks

  3. @Gaurav by removing that element from Queue, but in case of multiple consumer having separate queue for each consumer is better design because it will reduce contention and prevent performance penalty due to locking and synchronization.

  4. Thanks Javin for reply. I am using the Synchronous Queue and by testing I realised only one consumer is able to consume the message. So that's what I was looking for.
    Earlier I thought of having a separate queue for each consumer, however that would defeat the purpose I want to achieve. The producer in my case is producing about a million messages per hour and I want to process those messages in parallel. For this purpose, I need multiple workers/consumers and at the same time I want to make sure no two consumers receive the same message.
    I hope I am making sense here.

  5. Given your producer is very fast, there is a risk of OutOfMemoryError when using an unbounded blocking queue if the consumer happens to be slow. I would suggest using a two-tier approach where the producer inserts into a BlockingQueue and one consumer takes messages and distributes them into different queues; then you can use a thread pool executor to assign each thread its own bounded queue. The intermediate thread will be very fast because it will just be taking an element and putting it into the right queue, so that BlockingQueue will not grow, and if your consumers are fast enough then the individual queues will also be empty by then. I hope this helps.

  6. Thanks Javin. That's really a good suggestion. I could try the two tier approach. I think i need to develop some sort of fairness logic for the first consumer who distributes the messages to different queues such that second tier queues have fair distribution of messages.
    Also, can you throw some light on how to deal with the clean-up operation in case the producer has died? I guess I need to kill the single consumer thread in the first tier as it is sharing the blocking queue reference with the producer.

  7. Hello Gaurav, you can develop that logic based upon the type of message, or the content of the message if no two messages are related to each other; otherwise you need to ensure that related messages are processed by one particular thread and are put in one queue. For example, in an Electronic Trading System, we usually process all messages for one symbol in one queue to avoid processing a Cancel/Modification request before the New Order. Regarding what happens if the producer dies: if you use a BlockingQueue, the consumer will wait until your producer comes up again; in fact, that's the main reason why you use a BlockingQueue.

  8. How are these two threads not leading to deadlock? I mean there's nothing to put and remove from the SynchronousQueue as it has 0 capacity. Shouldn't it lead to a deadlock ?

  9. @Sunny: When a thread puts an element on a SynchronousQueue, that thread waits inside the queue, and when another thread takes the element, the waiting thread is woken up. So it will not lead to deadlock. You can think of SynchronousQueue as an advanced alternative to Object's wait() and notify() methods, although internally it parks and unparks threads rather than using the object's monitor lock.

  10. Very helpful article. Thanks a tonn.

  11. Hi, is it possible to have multiple producers and one consumer per SynchronousQueue, or does it work only for one producer and one consumer?
