Hey there,
I've been working on an assignment where I have to create a bounded buffer in Java using threads. I'll briefly state the main specs for the assignment instead of pasting the whole thing:
- It operates in a FIFO manner (First In First Out)
- We cannot use semaphores and can only use synchronized, wait, notifyAll, etc.
- Updates to the buffer must be mutually exclusive
- It must contain the following classes: BoundedBuffer, Producer, Consumer, Watcher
I have attempted most of it, but I'm having a few issues with the output. The status that the Watcher thread prints for the buffer seems to be out of sync with, or inaccurate about, what is actually happening, and everything looks a bit messy. Also, I'm not sure whether my modulo arithmetic is correctly wrapping the pointers. Can anybody tell me if I'm doing things the right way and, if not, nudge me in the right direction? I'll post my code below. I understand that it's a lot of code to look through, but I'd really be grateful for any help.
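To make the wrap-around question concrete, here is a tiny standalone check of the index arithmetic in isolation. It is only a sketch: the class name WrapCheck and the helper next are made up for illustration and are not part of the assignment. It assumes the pointer starts at 0 and the size is positive.

```java
// Hypothetical standalone check; WrapCheck is not one of the assignment classes.
class WrapCheck {
    // Advance a circular index by one slot, wrapping back to 0 after the last slot.
    static int next(int index, int size) {
        return (index + 1) % size;
    }

    public static void main(String[] args) {
        int size = 5;
        int index = 0;
        // Step through more slots than the buffer holds so the wrap is visible.
        for (int step = 0; step < 7; step++) {
            System.out.print(index + " ");
            index = next(index, size);
        }
        System.out.println(); // prints "0 1 2 3 4 0 1 "
    }
}
```

If the pointers in the real buffer follow the same pattern, the index never reaches size, so the array access should stay in bounds.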
BoundedBuffer:
public class BoundedBuffer {

    // Variables
    private int nextIn, nextOut, size, occupied, ins, outs;
    private boolean dataAvailable, roomAvailable;
    private int[] buffer;

    // Constructor
    public BoundedBuffer(int size) {
        this.size = size;
        buffer = new int[size]; // Set buffer size

        // Initialize variables
        nextIn = nextOut = occupied = ins = outs = 0;
        dataAvailable = false;
        roomAvailable = true;
    }

    public synchronized void insertItem(int item) {
        while (roomAvailable == false || occupied == size) {
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("Error inserting item.");
            }
        }

        dataAvailable = true;
        notifyAll();

        buffer[nextIn] = item;

        // For debugging purposes
        System.out.println("Item: " + item + " inserted.");

        nextIn = (nextIn + 1) % size;
        occupied++;
        ins++;
    }

    public synchronized int removeItem() {
        while (dataAvailable == false || occupied == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("Error removing item.");
            }
        }

        roomAvailable = true;
        notifyAll();

        int item = buffer[nextOut];

        // For debugging purposes
        System.out.println("Item: " + item + " removed.");

        occupied--;
        nextOut = (nextOut + 1) % size;
        outs++;

        return item;
    }

    public void printStatus() {
        System.out.println("Items inserted : " + ins);
        System.out.println("Items removed : " + outs);
        System.out.println("Spaces occupied : " + occupied);
        System.out.println("Delta : " + (ins - outs - occupied));
        System.out.println();
    }
}
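One thing that stands out in the class above is that printStatus is not synchronized, so it may read ins, outs and occupied while insertItem or removeItem is halfway through updating them. The following is a minimal sketch of one way to take a consistent snapshot, not a definitive fix. The names CountedBuffer and statusLine are hypothetical. It also assumes the occupied count alone is enough for the wait conditions, so the two boolean flags are left out.

```java
// Hypothetical sketch; CountedBuffer and statusLine are illustrative names only.
class CountedBuffer {
    private final int[] buffer;
    private int nextIn, nextOut, occupied, ins, outs;

    CountedBuffer(int size) {
        buffer = new int[size];
    }

    synchronized void insertItem(int item) throws InterruptedException {
        // Wait while the buffer is full; re-check after every wake-up.
        while (occupied == buffer.length) {
            wait();
        }
        buffer[nextIn] = item;
        nextIn = (nextIn + 1) % buffer.length;
        occupied++;
        ins++;
        notifyAll(); // wake any consumers waiting for data
    }

    synchronized int removeItem() throws InterruptedException {
        // Wait while the buffer is empty; re-check after every wake-up.
        while (occupied == 0) {
            wait();
        }
        int item = buffer[nextOut];
        nextOut = (nextOut + 1) % buffer.length;
        occupied--;
        outs++;
        notifyAll(); // wake any producers waiting for room
        return item;
    }

    // Reads all counters while holding the same lock as the updates,
    // so the values in one line come from the same moment.
    synchronized String statusLine() {
        return "ins=" + ins + " outs=" + outs + " occupied=" + occupied
                + " delta=" + (ins - outs - occupied);
    }
}
```

With this arrangement a watcher thread would print the string returned by statusLine, and the delta in each line should be 0 because the counters cannot change between the reads.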
Producer:
public class Producer extends Thread {

    private BoundedBuffer buffer;
    private int item;

    public Producer(BoundedBuffer buffer) {
        this.buffer = buffer;
    }

    public void run() {
        try {
            while (true) {
                item = (int) (Math.random() * 100);
                buffer.insertItem(item);

                // Sleep for 2 seconds
                sleep(2000);
            }
        } catch (InterruptedException e) {
            System.out.println("Error with Producer Thread.");
        }
    }
}
Consumer:
public class Consumer extends Thread {

    private BoundedBuffer buffer;

    public Consumer(BoundedBuffer buffer) {
        this.buffer = buffer;
    }

    public void run() {
        try {
            while (true) {
                buffer.removeItem();

                // Sleep for 2 seconds
                sleep(2000);
            }
        } catch (InterruptedException e) {
            System.out.println("Error with Consumer Thread.");
        }
    }
}
Watcher:
public class Watcher extends Thread {

    private BoundedBuffer buffer;

    public Watcher(BoundedBuffer buffer) {
        this.buffer = buffer;
    }

    public void run() {
        try {
            while (true) {
                buffer.printStatus();
                sleep(2000);
            }
        } catch (InterruptedException e) {
            System.out.println("Error in Watcher thread.");
        }
    }
}
Assignment1:
public class Assignment1 {

    public static void main(String[] args) {
        BoundedBuffer buffer = new BoundedBuffer(20);

        Producer prod = new Producer(buffer);
        Consumer cons = new Consumer(buffer);
        Watcher watch = new Watcher(buffer);

        prod.start();
        cons.start();
        watch.start();
    }
}