Friday, September 15, 2023

Producer Consumer Design Pattern with Blocking Queue Example in Java

The Producer-Consumer Design pattern is a classic concurrency or threading pattern which reduces coupling between Producer and Consumer by separating Identification of work with Execution of Work. In producer-consumer design pattern, a shared queue is used to control the flow and this separation allows you to code producer and consumer separately. It also addresses the issue of different timing requirements to produce items or consuming items. by using producer-consumer patterns both Producer and Consumer Thread can work with different speeds.


In this article, we will see What is producer-consumer problem which is a very popular multi-threading interview question, How to solve the producer-consumer problem using Blocking Queue and the Benefits of using the Producer-Consumer design pattern.

 

Real-World Example of Producer-Consumer Design Pattern

Producer consumer pattern is everywhere in real life and depicts coordination and collaboration. Like one person is preparing food (Producer) while the other one is serving food (Consumer), both will use a shared table for putting food plates and taking food plates. 

The product which is the person preparing food will wait if the table is full and Consumer (Person who is serving food) will wait if the table is empty. the table is a shared object here. On Java library, the Executor framework itself implement Producer Consumer design pattern be separating responsibility of addition and execution of the task.




The benefit of the Producer-Consumer Pattern

Its indeed a useful design pattern and used most commonly while writing multi-threaded or concurrent code. here
is few of its benefit:

1) Producer Consumer Pattern simple development. you can Code Producer and Consumer independently and Concurrently, they just need to know shared object.

2) Producer doesn't need to know about who is consumer or how many consumers are there. Same is true with Consumer.

3) Producer and Consumer can work with different speed. There is no risk of Consumer consuming half-baked item.
In fact by monitoring consumer speed one can introduce more consumer for better utilization.

4) Separating producer and Consumer functionality result in more clean, readable and manageable code.

Producer Consumer Design Pattern with Blocking Queue Example in Java


 

Producer-Consumer Problem in Multi-threading

Producer Consumer design pattern BlockingQueue example JavaProducer-Consumer Problem is also a popular java interview question where interviewer ask to implement producer consumer design pattern so that Producer should wait if Queue or bucket is full and Consumer should wait if queue orbucket is empty. 

This problem can be implemented or solved by different ways in Java, classical way is using wait and notify method to communicate between Producer and Consumer thread and blocking each of them on individual condition like full queue and empty queue.

With the introduction of BlockingQueue Data Structure in Java 5 Its now much simpler because BlockingQueue provides this control implicitly by introducing blocking methods put() and take().

Now you don't require to use wait and notify to communicate between Producer and Consumer. BlockingQueue put() method will block if Queue is full in case of Bounded Queue and take() will block if Queue is empty. 

In the next section, we will see a code example of the Producer-Consumer design pattern.

 

Using Blocking Queue to implement Producer Consumer Pattern

BlockingQueue amazingly simplifies implementation of Producer-Consumer design pattern by providing outofbox support of blocking on put() and take(). The developer doesn't need to write confusing and critical piece of wait-notify code to implement communication. BlockingQuue is an interface and Java 5 provides different implantation like ArrayBlockingQueue and LinkedBlockingQueue , both implement FIFO order or elements, while ArrayLinkedQueue is bounded in nature LinkedBlockingQueue is optionally bounded. 

here is a complete code example of Producer Consumer pattern with BlockingQueue. Compare it with classic wait notify code, its much simpler and easy to understand.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){
  
     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();
 
     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }
 
}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
  
    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
  
  
}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9

You see Producer Thread produced number and Consumer thread consumes it in FIFO order because the blocking queue allows elements to be accessed in FIFO.

That’s all on How to use a Blocking Queue to solve Producer Consumer problem or example of Producer consumer design pattern. I am sure its much better than wait notify example but be prepare with both if you are going for any Java Interview as Interview may ask you both way.


Other Java threading tutorial you may like:

And lastly,  What is your favorite way to implement Producer Consumer pattern in Java? Wait and notify method or BlockingQueue in Java?

26 comments:

  1. Recently I got this question in interview with different scenario. How to resolve the producer and consumer problem so that my CPU cycle can be used to 100%. For ex if producer is producing less and consumer is consuming fast then your CPU cycle is getting wasted which is associated with cost. So what would be strategy to resolve this. Any suggestion?

    ReplyDelete
  2. Well i see something is missing, how to define the size of queue ?

    ReplyDelete
  3. Hi Javin, ..gr8 article few things that I want to add in this is...

    BlockingQueue Code Example

    Here is an example of how to use a BlockingQueue. The example uses the ArrayBlockingQueue implementation of the BlockingQueue interface.

    First, the BlockingQueueExample class which starts a Producer and a Consumer in separate threads. The Producer inserts strings into a shared BlockingQueue, and the Consumer takes them out.

    public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

    BlockingQueue queue = new ArrayBlockingQueue(1024);

    Producer producer = new Producer(queue);
    Consumer consumer = new Consumer(queue);

    new Thread(producer).start();
    new Thread(consumer).start();

    Thread.sleep(4000);
    }
    }
    Here is the Producer class. Notice how it sleeps a second between each put() call. This will cause the Consumer to block, while waiting for objects in the queue.

    public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
    this.queue = queue;
    }

    public void run() {
    try {
    queue.put("1");
    Thread.sleep(1000);
    queue.put("2");
    Thread.sleep(1000);
    queue.put("3");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    Here is the Consumer class. It just takes out the objects from the queue, and prints them to System.out.

    public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
    this.queue = queue;
    }

    public void run() {
    try {
    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    ReplyDelete
  4. The above example is good but too simple. Would you minding providing a example with multiple producers and consumers and stopping the program once producers have done their jobs?

    ReplyDelete
  5. I tried out following implementation of Producer-Consumer and it doesn't list out all the elements from the list. Few of them are missing.

    I wanted to have one producer & multiple consumers and have the consumers take data out of the list in round robin fashion, but not working as expected. Would you be able to point me out the error in the code? Thanks.

    public class ProdConController {
    public static void main(String[] args) throws Exception {
    List stringList = new ArrayList();
    SignalObject signalObject = new SignalObject();
    ProdWorker prodWorker = new ProdWorker(stringList, signalObject);
    ConWorker conWorker1 = new ConWorker(stringList, signalObject, "worker1");
    ConWorker conWorker2 = new ConWorker(stringList, signalObject, "worker2");
    Thread prodThrd = new Thread(prodWorker);
    Thread conThrd1 = new Thread(conWorker1);
    Thread conThrd2 = new Thread(conWorker2);
    prodThrd.start();
    conThrd1.start();
    conThrd2.start();

    prodThrd.join();
    conThrd1.join();
    conThrd2.join();
    }
    }

    public class SignalObject {
    private boolean isDataAvailable = false;

    public synchronized boolean isDataAvailable() {
    return isDataAvailable;
    }

    public synchronized void setDataAvailable(boolean dataAvailable) {
    isDataAvailable = dataAvailable;
    }
    }

    public class ProdWorker implements Runnable {
    private List stringList;
    private SignalObject signalObject;

    public ProdWorker(List stringList, SignalObject signalObject) {
    this.stringList = stringList;
    this.signalObject = signalObject;
    }

    @Override
    public void run() {
    for (int i = 1; i <= 10; i++) {
    System.out.println("Adding " + i + " to queue");
    stringList.add(String.valueOf(i));
    }

    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    synchronized (signalObject) {
    signalObject.setDataAvailable(true);
    signalObject.notifyAll();
    }
    }
    }

    public class ConWorker implements Runnable {
    private final List stringList;
    private String name;
    private SignalObject signalObject;

    public ConWorker(List stringList, SignalObject signalObject, String name) {
    this.stringList = stringList;
    this.name = name;
    this.signalObject = signalObject;
    }

    @Override
    public void run() {
    while (!signalObject.isDataAvailable()) {
    try {
    synchronized (signalObject) {
    signalObject.wait();
    }
    } catch (InterruptedException ex) {
    System.out.println("Received interrupt");
    ex.printStackTrace();
    }
    }

    synchronized (stringList) {
    for (int i = 0; i < stringList.size(); i++) {
    System.out.println("Received:" + stringList.get(i) + " by worker:" + name);
    stringList.remove(i);
    // if (i % 2 == 0 && this.name.equals("worker1")) {
    // System.out.println("Received:" + stringList.get(i) + " by worker:" + name);
    // }
    // else {
    // System.out.println("Received:" + stringList.get(i) + " by worker:" + name);
    // }
    }
    }

    System.out.println("Finished consuming all data");
    }
    }

    ReplyDelete
  6. Recently I got this question in interview with different scenario. How to resolve the producer and consumer problem so that my CPU cycle can be used to 100%. For ex if producer is producing less and consumer is consuming fast then your CPU cycle is getting wasted which is associated with cost. So what would be strategy to resolve this. Any suggestion?
    ====================
    (Count of Producers == Count of Consumers ) >= max CPU threads (cores / hyper threads) While consumers are waiting, producers will allocate all CPU resources and vice versa. Also the queue size >= max CPU threads.

    ReplyDelete
  7. Can you please share solution of Producer Consumer problem using Semaphore? I know it can be solved using multiple way including BlockingQueue, wait and notify as shown above, but I am really interested in using Semaphore. Thanks

    ReplyDelete
  8. package Thread;

    import java.util.concurrent.Semaphore;

    class SharedResource
    {
    int n;
    static Semaphore semCons = new Semaphore(0);
    static Semaphore semProd = new Semaphore(1);

    void put(int n)
    {
    try {
    semProd.acquire();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    this.n=n;
    System.out.println("Put : " + n);
    semCons.release();
    }

    void get()
    {
    try {
    semCons.acquire();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    System.out.println("Got : "+n);
    semProd.release();
    }
    }

    class Consumer implements Runnable
    {
    SharedResource sr;
    public Consumer(SharedResource sr)
    {
    this.sr = sr;
    new Thread(this, "Consumer").start();
    }
    @Override
    public void run()
    {
    for(int i=0;i<10;i++)
    {
    sr.get();
    }
    }
    }

    class Producer implements Runnable
    {
    SharedResource sr;
    public Producer(SharedResource sr)
    {
    this.sr= sr;
    new Thread(this,"Producer").start();
    }

    @Override
    public void run()
    {
    for(int i=0;i<10;i++)
    {
    sr.put(i);
    }
    }
    }
    public class ProducerConsumer {
    public static void main(String[] args) {
    SharedResource sr = new SharedResource();

    new Consumer(sr);
    new Producer(sr);

    }
    }

    ReplyDelete
  9. seems like your method ain't right.
    how about this son:
    import java.util.concurrent.*;

    class Producer implements Runnable {

    private BlockingQueue queue;

    Producer(BlockingQueue q){
    this.queue=q;
    }

    public void run() {
    //produce messages
    for(int i=1; i<11; i++){
    try {
    Thread.sleep(i);
    queue.put(i);
    System.out.println("Produced "+i);
    } catch (InterruptedException e) {

    }
    }

    }

    }


    class Consumer implements Runnable{

    private BlockingQueue queue;

    Consumer(BlockingQueue q){
    this.queue=q;
    }


    public void run() {
    try{
    int i;
    //consuming messages until exit message is received
    while(true){
    Thread.sleep(10);
    i = queue.take();
    System.out.println("Consumed- "+i);
    if(i==10) break;
    }
    }catch(InterruptedException e) {

    }
    }
    }

    public class ProducerConsumerService {

    public static void main(String[] args) {
    //Creating BlockingQueue of size 10
    BlockingQueue queue = new ArrayBlockingQueue<>(10);
    Producer producer = new Producer(queue);
    Consumer consumer = new Consumer(queue);
    //starting producer to produce messages in queue
    new Thread(producer).start();
    //starting consumer to consume messages from queue
    new Thread(consumer).start();
    System.out.println("Producer and Consumer has been started");
    }

    }

    ReplyDelete
  10. @Gaurav, what was the problem and how did you fixed it?

    ReplyDelete
  11. Javin,

    In your example the program will never exit and @Gaurav is trying to break the program after 10 iteration.
    please let us know your thoughts.

    Thanks,

    ReplyDelete
  12. With the given code, the Consumer seems to be in an infinite loop

    ReplyDelete
  13. @Anonymous, yes, consumer is running in infinite loop, as is mostly case in real world and I kept producer down to 10 messages just for demo.

    ReplyDelete
  14. it give wrong result .If you print vector to know status of vector each time after consume or produce method then we can see than vector grow above given limit 10.

    ReplyDelete
  15. Hello
    In the above example in the producer code we have to r put the following after put otherwise ordering is not guaranteed.

    Thread.sleep(1000);

    ReplyDelete
  16. The simplest way to solve the producer consumer problem is by using Blocking-queue, as explained in this tutorial, but if you really want to understand the core concept behind producer consumer pattern i.e. inter-thread communication in Java, then you must solve this problem using wait and notify method in Java. when you solve it hard way, you learn more.

    ReplyDelete
  17. Can someone solve this?

    Consider a single producer and 10 consumers as threads. The consumers has to subscribe (Ad themselves) to the producer in-order to get any messages that produces sends.

    The expected functionality is that, the producer should add a message to a data structure/collection (you choose of your choice) every 2 minutes and notify all consumers. When the consumers receives a notification, it consumes the message, write the details to a common file.

    Message format in that file should be ,

    [Consumer Name][<>][Message][Message length]

    When you are sure that all the consumers consumed the new message, the message has to be removed from the data structure.

    The program should redirect the log messages (util logging) to a log file (say prod-consumer.log) too other than the common file where you write details and log the following actions
    When procedure sends a message and notify all consumers
    Consumer consumed the message
    Number of consumers consumed the new message
    When message is removed from the data structure/collection

    Please note that you will have to handle parallel updates (by consumers) to the same file and also provide good error handling part in-case of any issues. Ensure that you log errors to the log file

    ReplyDelete
  18. An simple example with two consumers.
    http://simpleapphub.blogspot.in/2016/02/simple-producer-consumer-framework-in.html

    ReplyDelete

  19. I am trying to implement producer consumer problem using wait and notify. The problem is that, the producer is first producing everything and then consumer starts consuming. I am new to multithreaded environment.

    Please find below my code:

    package javaExamples;

    import java.util.ArrayList;
    import java.util.List;

    public class producerConsumerWaitNotifyMain {

    public static void main(String[] args) {
    List list = new ArrayList();
    int size = 50;
    ProducerWait prod = new ProducerWait(list, size);
    ConsumerWait cons = new ConsumerWait(list, size);
    Thread pro = new Thread(prod);
    Thread con = new Thread(cons);
    pro.start();
    con.start();
    }

    }

    *****************************

    package javaExamples;

    import java.util.List;

    public class ProducerWait implements Runnable{
    List list;
    int size;

    public ProducerWait(List list, int size) {
    this.list = list;
    this.size = size;
    }

    @Override
    public void run() {
    while(true){
    synchronized(list){

    if(list.size() == size){
    try {
    System.out.println("Producer waiting");
    System.out.println(list);
    list.wait();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    else{
    System.out.println("producer will produce");
    list.add(1);
    System.out.println(list);
    list.notifyAll();
    }
    }
    }

    }

    }

    ***********************************


    package javaExamples;

    import java.util.List;

    public class ConsumerWait implements Runnable{
    List list;
    int size;

    public ConsumerWait(List list, int size) {
    this.list = list;
    this.size = size;
    }

    @Override
    public void run() {
    while(true){
    synchronized(list){

    if(list.isEmpty()){
    try {
    System.out.println("Consumer waiting");
    System.out.println(list);
    list.wait();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    else{
    System.out.println("consumer will consume");
    list.remove(0);
    System.out.println(list);
    list.notifyAll();
    }
    }
    }

    }

    }

    ReplyDelete
  20. @New, your syntax of using wait and notify is incorrect. You should check the condition like queue.isEmpty() or queue.size() == size on while loop, not in if block due to reasons mentioned here. You can also see my implementation of producer consumer using wait notify here.

    ReplyDelete
  21. @Javin Paul: Thanks, it worked.

    ReplyDelete
  22. Hi Javin, Thank you very much for this wonderful article and explaining it in a simple way.

    I just want to point out that there is no "ArrayLinkedQueue" in java and it should be replaced with "ArrayListQueue", please see "ArrayLinkedQueue is bounded in nature" text in an article.

    /Chirag

    ReplyDelete
  23. Hi Javin, Thank you very much for this wonderful article and explaining it in a simple way.

    I just want to point out that there is no "ArrayLinkedQueue" in java and it should be replaced with "ArrayBlockingQueue", please see "ArrayLinkedQueue is bounded in nature" text in an article.

    /Chirag

    ReplyDelete
  24. //package threadexample;
    class ProdConsu implements Runnable{
    public static int count=0;
    Thread t1=new Thread(this);
    Thread t2=new Thread(this);
    Thread t3=new Thread(this);
    int i=0;
    public ProdConsu ()
    {
    t1.start();
    t2.start();
    t3.start();
    }
    public void run(){
    try{
    //ProdConsu tar=new ProdConsu();
    synchronized (this){
    this.wait(1000);
    for(i=1;i<=5;i++){
    produce();

    consume();
    }
    }
    }catch(Exception e){e.printStackTrace();}
    }
    synchronized public void produce() throws Exception{
    //if(i<10)
    count++;
    t1.sleep(1000);
    System.out.println("prouduce is producing "+i);
    }
    synchronized public void consume(){
    //public void consume(){
    //if(count>0)
    count--;
    System.out.println("Consumer is consuming "+i);
    }
    }
    public class ProdCons extends ProdConsu implements Runnable{
    public static void main(String[] args) {
    ProdConsu p=new ProdConsu();

    }

    }

    ReplyDelete
  25. Hello @Amol, could you please care to explain your example for beginners, and what is difference between original solution and this one? Cheers

    ReplyDelete
  26. Hi,
    Is this implementation of producer consumer is thread safe?
    What if there are multiple producer and multiple consumer and if we are using linkedblockingqueue for sharing of the data , will it not cause data consistency issue ?

    ReplyDelete