[Java] Multithreading - Producer & Consumer

반응형

이번 글에서는 thread를 사용해 producer와 consumer를 구현하는 방법에 대해 알아보도록 하겠습니다.

1. Producer & Consumer

producer & consumer problem은 최소 1개씩의 producer & consumer thread가 있다고 가정합니다.

예를 들어 위와 같이 producer & consumer thread가 존재할 때, producer thread는 message를 생성해 queue에 발행합니다.

consumer thread는 queue에 발행된 message를 FIFO 순서대로 처리해 작업을 수행하도록 구현하고자 합니다.

이때 producer는 queue가 꽉 차있을 경우 더이상 message를 발생하지 않고, consumer는 반대로 queue가 빌때까지 message를 순서대로 소비해 작업을 수행해야 합니다.

위의 내용을 java로 구현해보면 다음과 같습니다.

1-1) producer

Producer Thread

public class Producer extends Thread {
    private static final int MAX_SIZE = 3;
    private final List<String> queue = new ArrayList<>();

    @Override
    public void run() {
        try {
            while (true){
                produce();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    // producer message
    private synchronized void produce() throws Exception {
        while (queue.size() ==  MAX_SIZE){
            System.out.println("Queue limit reached. Waiting for consumer");
            wait();
            System.out.println("Producer got notification from consumer");
        }
        String data = LocalDateTime.now().toString();
        queue.add(data);
        System.out.println("Producer produced data");
        notify();
    }

    // consume message
    public synchronized String consume() throws Exception {
        notify();
        while (queue.isEmpty()){
            wait();
        }
        String data = queue.get(0);
        queue.remove(data);
        return data;
    }
}

• message produce와 consume을 동기화 처리하기위해 위와 같이 Producer Thread 내부에 produce & consume 메서드를 작성합니다.

• producer는 queue의 사이즈가 MAX_SIZE 인 경우에 wait을 호출해 대기합니다.

• consumer는 queue가 비어있을 경우 wait을 호출해 대기합니다.

• producer는 message를 생성한 후 notify를 호출해 대기중인 consumer thread를 깨웁니다.

• consumer는 메세지를 message를 소비 후 notify를 호출해 대기중인 producer thread를 깨웁니다.

1-2) consumer

Consumer Thread

public class Consumer extends Thread {
    private Producer producer;

    public Consumer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String data = producer.consume();
                System.out.println("Consumed by : " + Thread.currentThread().getName() + " data : " + data);
            }
        } catch (Exception e) {

        }
    }
}

consumer thread는 생성자로 producer thread를 주입받아, producer thread에 작성된 consume 메서드를 호출합니다.

주입받은 producer thread instance는 동기화 객체이므로, message의 발행 & 소비는 thread-safe 하게 동작합니다.

1-3) main

위의 producer & consumer thread를 사용하는 main 함수는 다음과 같이 작성할 수 있습니다.

public static void main(String[] args) {
    Producer producer = new Producer();
    producer.setName("Producer-1");
    producer.start();

    Consumer consumer1 = new Consumer(producer);
    consumer1.setName("Consumer-1");
    consumer1.start();

    Consumer consumer2 = new Consumer(producer);
    consumer2.setName("Consumer-2");
    consumer2.start();

    Consumer consumer3 = new Consumer(producer);
    consumer3.setName("Consumer-3");
    consumer3.start();
}

message를 발행하는 producer thread instance가 동기화 인스턴스 이므로, 1개 이상의 consumer를 사용해 message를 소비하더라도 thread-safe하게 작동하게 됩니다.


추천서적

 

이것이 자바다:신용권의 Java 프로그래밍 정복

COUPANG

www.coupang.com

파트너스 활동을 통해 일정액의 수수료를 제공받을 수 있음


반응형

댓글

Designed by JB FACTORY