Producer-Consumer programowanie wielowątkowe - program nie może się zakończyć.

0

Cześć

Próbuje zaimplementować prostą relacje gdzie Producer tworzy wartość int a Consumer je pobiera i wszystko ma działać współbieżnie.
Program niby wykonuje się poprawnie natomiast nie wiem jak go zakończyć. Gdy producer wyprodukuje już wszystkie wartość i kończy swoją metode run() consumer nadal oczekuje na nowe wartości. Ktoś byłby miły pomóc to poprawnie zaimplementować i ew. wskazać jakie błędy popęłniłem?

class Buffor {
    
    private int[] buff;
    private int readIdx = 0;
    private int writeIdx = 0;
    private int totalCap = 0;
    
    { 
      buff = new int[5];
      Arrays.fill(buff,-1);
    }
    
    public void push(int val) {
        try {
            synchronized(this) {
                while(totalCap == buff.length) {
                    System.out.println("przepelnienie - czekanie");
                    this.wait();
                }
      
                buff[writeIdx % buff.length] = val;
                writeIdx++;
                totalCap++;
                this.notify();
            }
        }
        catch(InterruptedException e) {
            System.out.println("przerwanie [push]: " + e);
        }
    }
    
     public int pop() {
        try {
            synchronized(this) {        //proba pozyskania blokady monitora obiektu
                while(totalCap == 0) {
                    System.out.println("pusto - czekanie");
                    this.wait();
                }
      
                int val = buff[readIdx % buff.length];
                buff[readIdx % buff.length] = -1;
                readIdx++;
                totalCap--;
                this.notify();
                return val;
            }
        }
        catch(InterruptedException e) {
            System.out.println("przerwanie [push]: " + e);
        }
        return -666;
    }
    
    public synchronized void show() {
        System.out.println(Arrays.toString(buff));
    }
}










class Producer implements Runnable {
    int[] products;
    private Buffor buff;
    private final int numOfProd = 10;
    private int produceCounter = 0;
    public Producer(Buffor buff) {
        this.buff = buff;
        SecureRandom rand = new SecureRandom();
        products = (int[])IntStream.iterate(0, x->x+1).limit(numOfProd).toArray();
        
        System.out.println("products: " + Arrays.toString(products));
    }
    
    @Override public void run() {
        for(int i=0; i<numOfProd; ++i) {
            buff.push(products[i]);
            produceCounter++;
            buff.show();
        }
        System.out.println("Koniec producer!!!!!!!");
    }
       
}






class Consumer implements Runnable {
     private Buffor buff;
     public Consumer(Buffor buff) {
         this.buff = buff;
     }
    
     @Override public void run() {
         while(!Thread.interrupted()) {
            int val = buff.pop();
            System.out.println("consume val:" + val);
            buff.show();
         }
     }
}







public class ProducentConsumentNotifyWait {
    public static void main(String[] args) throws Exception {
       
        
       Buffor buff = new Buffor();
       Producer prod = new Producer(buff);
       Consumer con = new Consumer(buff);
       
       ExecutorService exec = Executors.newFixedThreadPool(2);
       
       Future<?> f1 = exec.submit(prod);
       Future<?> f2 = exec.submit(con);
       
       if(f1.isDone()) {
          System.out.println("Robota zrobiona");
          f2.cancel(true);
       }
      
    }
    
}
0

Tak na szybko to myślę, że

f1.isDone()

w twoim ifie będzie false, bo to nie czeka, aż task się zrobi, tylko w danym momencie zwraca false/true w zależności czy task się już wykonał czy nie.

Może jakbyś zamiast

if(f1.isDone()) {
          System.out.println("Robota zrobiona");
          f2.cancel(true);
       }

dał

f1.get();
System.out.println("Robota zrobiona");
f2.cancel(true);

?

1

Zrób pop i push synchronizowane, będzie przejrzyściej., lepiej też nie przechwytywać wyjątków w tych metodach tylko zgłaszać wątkom wywołującym obie metody. Zakończenie działania konsumenta robi się różnie, możesz wstawiać jakąś specjalną wartość lub ciąg wartości np. 0 jako sygnał zakończenia działania. Możesz też dodać metody do bufora, które służą do przekazywania informacji między producentem i konsumentem np.:

class Buffor {

  private int[] buff;
  private int readIdx = 0;
  private int writeIdx = 0;
  private int totalCap = 0;
  private boolean stop= false;

  {
    buff = new int[5];
    Arrays.fill(buff,-1);
  }

  synchronized public void push(int val) throws InterruptedException {
        while(totalCap == buff.length) {
          System.out.println("przepelnienie - czekanie");
          this.wait();
        }

        buff[writeIdx % buff.length] = val;
        writeIdx++;
        totalCap++;
        this.notify();
    }

  synchronized public int pop() throws InterruptedException {
        while(totalCap == 0) {
          System.out.println("pusto - czekanie");
          this.wait();
        }

        int val = buff[readIdx % buff.length];
        buff[readIdx % buff.length] = -1;
        readIdx++;
        totalCap--;
        this.notify();
        return val;
  }

  synchronized public void stop(){
    stop = true;
  }

  synchronized public boolean shouldStop(){
    return stop && totalCap == 1;
  }

  public synchronized void show() {
    System.out.println(Arrays.toString(buff));
  }
}

Producent po zakończeniu wywołuje na buforze stop() a konsument przed pobranie testuje :

if (buff.shouldStop()) {
    Thread.currentThread().interrupt();
}

Wtedy w main wystarczy:

Buffor buff = new Buffor();
    Producer prod = new Producer(buff);
    Consumer con = new Consumer(buff);

    ExecutorService exec = Executors.newFixedThreadPool(2);

    exec.submit(prod);
    exec.submit(con);
    exec.shutdown();

1 użytkowników online, w tym zalogowanych: 0, gości: 1