영권's

CompletableFuture 본문

자바/JAVA

CompletableFuture

ykkkk 2021. 8. 9. 01:11

자바 Concurrent 프로그래밍

 

Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어
  • 예) 웹 브라우저로 유튜브를 보면서 키보드로 타이핑을 할 수 있다.
  • 예) 녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다.

자바에서 지원하는 Concurrent 프로그래밍

  • 멀티 프로세싱(ProcessBuilder)
  • 멀티 쓰레드

자바 멀티쓰레드 프로그래밍

  • Thread / Runnable

기존 자바에서 멀티 쓰레드 사용법

 

// Thread 상속
public static void main(String[] args) {
	MyThread myThread = new MyThread();
	MyThread.start();
	System.out.println("Hello : " + Thread.currentThread().getName());
}
static class MyThread extends Thread {
	@Override
	public void run() {
		System.out.println("Thread : " + Thread.currentThread().getName());
	}
}

 

// Runnable 익명 구현 또는 람다
Thread thread = new Thread(() -> System.out.println("Thread : " + Thread.currentThread().getName()));
thread.start();
System.out.println("Hello : " + Thread.currentThread().getName());

쓰레드 실행 결과

쓰레드의 순서는 보장할 수 없다.

코드상에서는 MyThread가 먼저 실행했지만 메인 쓰레드의 내용이 먼저 출력된 것을 알 수 있다.

 

쓰레드 주요 기능

  • 현재 쓰레드 멈춰두기 (sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 발생)
  • 다른 쓰레드 깨우기 (interupt): 다른 쓰레드를 깨워서 interruptedExeption을 발생 시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료 시킬 수도 있고 계속 하던 일 할 수도 있고.
  • 다른 쓰레드 기다리기 (join): 다른 쓰레드가 끝날 때까지 기다린다.
    • 기다리는 중에 대기하는 해당 쓰레드를 또 다른 쓰레드에서 interrupt 할 수 있음 

문제점 : 만약 굉장히 많은 쓰레드가 있는 경우 프로그래머가 사실상 직접 관리하기 어렵다.

그렇기 때문에 Executors가 등장.

 

Executors

executors는 Thread나 Runnable처럼 Low 레벨 API를 직접 다루는게 아니라 쓰레드를 만들고 관리하는 작업을 고수준(HIGH-LEVEL) API에 이를 위임한다.

 

고수준(HIGH-LEVEL) Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리
  • 그런 기능을 Executors에게 위임
public class Main {

    public static void main(String[] args) {

        // newSingleThreadExecutor() : 쓰레드를 하나만 사용하는 Executor
        //ExecutorService executorService = Executors.newSingleThreadExecutor();

		// 쓰레드 풀 안에 쓰레드를 최대 2개로 고정적으로 만들어 놓고 사용.
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("Youngkwon"));
        executorService.submit(getRunnable("Java"));
        executorService.submit(getRunnable("Thread"));
        executorService.submit(getRunnable("Runnable"));

        //executorService.execute(() -> System.out.println("Thread : " + Thread.currentThread().getName()) );

        /* 
        executorService submit()으로 작업을 실행 후에 작업을 실행하고 나면 
        다음 작업이 들어올 때까지 계속해서 대기한다.
        (프로세스가 끝나지 않음) 그래서 명시적으로 shutdown을 해줘야한다.
        */

        // graceful Shutdown : 끝까지 작업을 마치고 executor를 끝냄
        executorService.shutdown();

        // 바로 작업을 끝냄
        //executorService.shutdownNow();

    }

    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message+ " " + Thread.currentThread().getName());
    }
}

newFixedThreadPool(2)에 대한 결과

 

쓰레드 풀에 쓰레드가 생성되고 작업들이 처리되는 과정

newFixedThreadPool(int nThread)은 ThreadPool 내에 인자로 주어진 int값만큼 Thread를 생성해서 사용할 수 있다.

이 쓰레드 풀은 쓰레드 개수보다 작업 개수가 많으면 최대 쓰레드 수까지 쓰레드를 새로 생성하여 작업을 처리한다.

만약 일 없이 놀고 있어도 쓰레드를 제거하지 않고 내비둔다. 

그리고 쓰레드보다 많은 작업들이 들어오게 되면 Blocking Queue에 쌓아놓고 작업을 끝낸 쓰레드에게 순서대로 작업이 주어진다.

 

        // 스케줄대로 실행할 수 있도록 ScheduledExecutorService 사용
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

        // 3초뒤에 실행
        //executorService.schedule(getRunnable("Hello"),3, TimeUnit.SECONDS);

        // 1초 뒤에 실행하고 2초마다 반복 실행
        executorService.scheduleAtFixedRate(getRunnable("Hello"),1,2,TimeUnit.SECONDS);

스케줄대로 실행하고 싶다면 ScheduledExecutorService 사용하면 된다.

 

 

위에서는 계속 Runnable을 사용했다.

하지만 Runnable의 run은 리턴타입이 void라서 만약 어떤 쓰레드에서 작업을 실행 후에 어떤 결과를 리턴 한다면 Runnable를 사용할 수가 없다.

그래서 Callable를 사용하면 리턴을 할 수 있다. 그리고 Callable를 사용해서 리턴을 했을 때 받아올 수 있는 무언가가 필요한데 그것이 Future이다.

Callable과 Future

Callable

  • Runnable과 비슷하지만 작업의 결과를 받을 수 있다.

Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

executorService로 submit 하게되면 Future를 리턴값으로 받을 수 있다.

 

이후 Future에 대하여 get()을 실행하면 멈춰서 결과값을 가져올 때 까지 기다린다. (블록킹 콜)

ExecutorService executorService = Executors.newSingleThreadExecutor();

Callable<String> hello = () -> {
	Thread.sleep(5000L);
	return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);

System.out.println(helloFuture.isDone());
System.out.println("Started");
helloFuture.get(); // 블록킹

// 작업 취소 
// true == 현재 진행중인 작업 인터럽트 하면서 종료
// false == 현재 진행중인 작업 기다리고 종료 (기다리지만 cancle하면 get해서 가져올 수 없음)
helloFuture.cancel(true);

System.out.println(helloFuture.isDone());
System.out.println("End");

 

그래서 마냥 기다리기만 해야하는가?

상태를 알 수 있는 방법이 없나?

isDone()으로 상태를 알 수 있습니다

아직 작업이 끝나지 않았다면 false 끝났다면 true

 

작업을 취소하는 방법은 cancle(boolean)을 사용하면 된다.

주의할 점은 false값을 파라미터로 사용해도 get을 해서 가져올 수 없습니다.

그리고 cancle을 하면 isDone의 값이 true가 되는데 이는 작업이 종료됐으니 결과를 가져갈 수 있다가 아닌 cancle이 됐으니 작업이 끝났다 이다.

그래서 cancle 후에 값을 가져오려고 하면 에러가 난다.

 

 

여러개의 작업을 한번에 호출할 수 있는 방법은 invokeAll 메서드를 사용하면 된다.

	ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(3000L);
            return "Hello";
        };
        Callable<String> java = () -> {
            Thread.sleep(5000L);
            return "java";
        };
        Callable<String> cyk = () -> {
            Thread.sleep(1000L);
            return "cyk";
        };


        List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, cyk));
        for (Future<String> future : futures) {
            System.out.println(future.get());
        }
        // graceful Shutdown : 끝까지 작업을 마치고 executor를 끝냄
        executorService.shutdown();

 

 

invokeAll

invokeAll은 한번에 호출한 작업을 모두 기다려야한다.

그래서 hello, cyk가 끝나더라도 java의 작업이 끝날때까지 기다리게 된다.

만약 예를 들어 현재 내가 가진 주가들을 모두 볼 때 이런식으로 처리할 수도 있다.

하지만 서버가 3대인 경우 같은 파일을 복사 하고 해당 파일을 가져오라고 했을 때 굳이 기다릴 필요없이 가장 먼저 가져온 자료를 사용하면 된다.

3개의 서버에서 같은 파일을 가져오는 경우

이러한 상황에서 사용하는 것이 invokeAny이다.

  • 동시에 실행한 작업 중 제일 짧게 걸리는 작업만큼 시간이 걸린다.
  • 블록킹 콜이다.
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Callable<String> hello = () -> {
            Thread.sleep(5000L);
            return "Hello";
        };
        Callable<String> java = () -> {
            Thread.sleep(3000L);
            return "java";
        };
        Callable<String> cyk = () -> {
            Thread.sleep(1000L);
            return "cyk";
        };


        String futures = executorService.invokeAny(Arrays.asList(hello, java, cyk));
        System.out.println(futures);

주의할 점은 invokeAny를 사용해도 쓰레드가 한개이거나 작업보다 적은 쓰레드를 사용하면 해당 작업 안에서 가장 빠른 작업의 결과가 리턴된다.

 

CompletableFuture

자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스

  • Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.

Future로는 하기 어렵던 작업들

  • Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있음
  • 블록킹 코드(get())를 사용하지 않고는 작업이 끝났을 때 콜백을 실행할 수 없다.
  • 여러 Future를 조합할 수 없다. Ex) Event 정보를 가저온 다음 Event에 참석하는 회원 목록 가져오기
  • 예외 처리용 API를 제공하지 않는다.

CompletableFuture 

  • Implements Future
  • Implements CompletionStage

Completable이 붙은 이유는 외부에서 명시적으로 Complet을 시킬 수 있다.

예를 들어 몇초이내 답이 없으면 미리 정해놓은 값을 리턴 시킬 수 있다.

 

CompletableFuture를 사용하면 명시적으로 executors를 만들어서 쓸 필요없고 CompletableFuture를 사용해서 비동기적인 작업을 실행할 수 있다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = new CompletableFuture<>();
        
        // future의 기본값을 youngkwon으로 정해주고 작업을 끝낸 것이다.
        future.complete("youngkwon");

	// 작업의 결과를 얻어와서 출력하면 기본 값인 youngkwon이 출력된다.
        String s = future.get();
        System.out.println(s);
        
        // 혹은 static factory 메서드를 사용해서 기본값 cyk를 주고 작업의 결과를 얻어 올 수 있음
        CompletableFuture<String> future2 = CompletableFuture.completedFuture("cyk");
        System.out.println(future.get());
        
    }

위의 코드는 명시적으로 값을 준것이고 실제로 어떤 작업에 대해서 실행하고 싶을 수 있다.

  • 리턴이 없을 때 : runAsync()를 사용
  • 리턴이 있을 때 : supplyAsync()를 사용
  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다.(기본은 ForkJoinPool.commonPool())
// 리턴 값이 없는 작업의 경우
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
	System.out.println("Hello : "  + Thread.currentThread().getName());
});

future.get();

/* 
join 을 사용할 수도 있음
join은 작업 처리중 예외가 발생하면 내부에서 interrupt 후 예외에 대한 처리를 uncaughtException 으로 처리한다.
uncaughtException은 기본적으로 자바의 쓰레드 클래스에 있는 uncaughtexceptionhandler가 처리한다.
*/
    
    // 리턴값이 있는 경우
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        });
        System.out.println(future.get());
    }

리턴 값이 있는 경우의 코드에서 future.get()을 실행 했을 때 해당 작업이 실행되면서

hello 현재 쓰레드명 이 출력된 후 리턴 값으로 Hello를 받아 출력한다.

 

콜백 제공하기

  • thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백
    • supplyAsync로 받은 결과 값을 변경할 수 있다.
    • thenApply 안에 function이 들어가고 get은 여전히 호출해야함.
    • 차이점은 Future만 썼을 때는 콜백을 get 호출하기 전에 정의하는게 불가능 했다.
    // thenApply 예시
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply(s -> {
            System.out.println(Thread.currentThread().getName());
            return s.toUpperCase();
        });
        System.out.println(future.get());
    }

 

 

  • thenAccept(Consumer): 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept(s -> { 
        	리턴 값 없음
            System.out.println(Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        });

        future.get();
    }

 

  • thenRun(Runnable): 리턴 값을 받지 않고 다른 작업을 처리하는 콜백 
	// thenRun()은 리턴 값을 사용하지 않고 작업만 처리
    public static void main(String[] args) throws ExecutionException, InterruptedException {
	// supplyAsync()의 리턴 값을 사용하지 않음.
	// runAsync()를 사용 해도 됨
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenRun(() -> {
        
        // thenRun()의 파라미터로 Runnable이 온다.
            System.out.println(Thread.currentThread().getName());
        });

        future.get();
    }

결과

  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다

비동기적으로 작업을 처리했는데 쓰레드 풀을 만들지도 않고 별도의 쓰레드에서 동작을 할 수 있는지?

ForkJoinPool 때문에 가능하다.

ForkJoinPool은 work stealing 알고리즘을 구현한 것으로 dequeue를 사용한다.

원래 queue는 먼저 들어온 작업을 먼저 처리하는데  dequeue를 사용해서 마지막에 들어온 작업이 먼저 나간다.

만약 자기 쓰레드가 할 일이 없다면 쓰레드가 직접 dequeue에서 작업을 가져와서 처리하는 방식의 프레임워크다.

작업을 잘게 나눌 수 있을 때까지 split 하고 작업 큐(dequeue)에 있는 tail task를 다른 쓰레드가 나누어 병렬처리한 후, join하여 합산하는 방식이다.

 

출처 : https://velog.io/@vies00/Java-work-stealing-fork-join-xljtjnflly

결론적으로 별다른 executor를 사용하지 않아도 내부적으로 Forkjoinpool에 있는 commonPool을 사용한다.

 

하지만 원한다면 얼마든지 만들어서 쓰레드를 줄 수도 있다.

runAsync 혹은 supplyAsync의 두번째 인자로 줄 수 있습니다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
            
            // runAsync 혹은 supplyAsync의 두번째 인자로 줄 수 있다.
        }, executorService).thenRun(() -> {
            System.out.println(Thread.currentThread().getName());
        });

        future.get();
    }

결과

결과를 보면 위에 결과(pool 이름 ForkJoinPool.commonPool)와 pool의 이름이 달라진 것을 볼 수 있다.(pool-1-thread)

thenRun, thenAccept, thenApply 도 마찬가지로 콜백을 실행할 Pool를 다른 곳에서 실행하고 싶다면 thenRunAsync, thenAcceptAsync, thenApplyAsync를 사용해서 생성한 쓰레드를 사용하게 할 수 있다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        }, executorService).thenRunAsync(() -> {
            System.out.println(Thread.currentThread().getName());
        },executorService);

        future.get();

        executorService.shutdown();
    }

결과 : 생성한 쓰레드 사용

 

 

여러 작업들을 조합

기존 Future를 사용한 방법에서는 여러 작업을 이어서 처리하는 것이 힘들었다.

비동기적인 작업 두개를 연결하는 것이 힘들었다.

왜냐하면 콜백을 주는 방법이 없었기 때문이다.

 

thenCompose

만약 두개의 Future작업의 의존성이 있는 경우 작업1 실행 후 작업2를 실행하는 경우 다음과 같이 사용할 수 있다.

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = hello.thenCompose(CompletableFutureExample::getWorld);

        System.out.println(future.get());
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

thenCompose 사용 결과

만약 서로 연관관계가 없는 경우

비동기적으로 실행하는 방법 thenCombine

hello에서 받은 리턴값 s, world에서 받은 리턴 값 s2 를 조합해서 get()을 통해 결과를 얻는다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("world " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<String> future = hello.thenCombine(world, (s, s2) -> s + " " + s2);
        System.out.println(future.get());

    }

thenCombine 결과

 

 

만약 2개 이상일 때 모든 작업을 합쳐서 실행하는 방법 allof(CompletableFuture<?>... cfs)를 사용하면 된다.

하지만 모든 작업의 결과가 동일한 타입이라는 보장도 없고 그 중 어떤 작업은 에러가 났을 수 있다

그래서 결과가 null이 나온다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("world " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(hello, world)
                .thenAccept(System.out::println);

        System.out.println(voidCompletableFuture.get());

    }

allof에 작업들을 넣고 수행한 결과

 

그래서 제대로 모든 작업을 합쳐서 실행하는 방법은 다음과 같다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("world " + Thread.currentThread().getName());
            return "World";
        });

        List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

        CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
                .thenApply(v ->
                     futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));

        listCompletableFuture.get().forEach(System.out::println);
    }

결과

null 이 아닌 제대로 된 결과를 볼 수 있다.

 

 


참고 :

 

https://www.inflearn.com/course/the-java-java8?inst=6fcc1e30 

 

https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.htm

https://docs.oracle.com/javase/tutorial/essential/concurrency/ https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt--

ForkJoinPool (Java Platform SE 8 ) (oracle.com)

CompletableFuture (Java Platform SE 8 ) (oracle.com)

'자바 > JAVA' 카테고리의 다른 글

Java16 - record  (0) 2021.09.19
Java - Stream  (0) 2021.09.19
자바 (Optional)  (0) 2021.08.01
JAVA 인터뷰 대비  (0) 2020.10.25
JVM  (0) 2020.10.14
Comments