이 글에서 얻는 것
- @Async로 비동기 메서드를 만들 수 있습니다.
- CompletableFuture로 비동기 작업을 조합할 수 있습니다.
- ThreadPoolTaskExecutor로 스레드 풀을 설정할 수 있습니다.
- 비동기 예외 처리를 구현할 수 있습니다.
0) 비동기 처리는 “대기 시간을 활용"한다
문제 상황: 동기 처리
@Service
public class OrderService {
public OrderResult processOrder(Order order) {
// 1. 결제 처리 (500ms)
paymentService.processPayment(order);
// 2. 이메일 발송 (300ms)
emailService.sendConfirmation(order);
// 3. SMS 발송 (200ms)
smsService.sendNotification(order);
// 총 소요 시간: 500 + 300 + 200 = 1000ms
return new OrderResult(order);
}
}
문제점:
- 순차 실행 → 총 1초 소요
- 이메일/SMS는 결제와 독립적인데 기다려야 함
- 사용자 응답 시간 증가
해결: 비동기 처리
@Service
public class OrderService {
@Async
public CompletableFuture<Void> sendEmailAsync(Order order) {
emailService.sendConfirmation(order);
return CompletableFuture.completedFuture(null);
}
@Async
public CompletableFuture<Void> sendSmsAsync(Order order) {
smsService.sendNotification(order);
return CompletableFuture.completedFuture(null);
}
public OrderResult processOrder(Order order) {
// 1. 결제 처리 (동기, 필수)
paymentService.processPayment(order); // 500ms
// 2. 이메일/SMS 비동기 발송 (병렬)
sendEmailAsync(order); // 백그라운드에서 실행
sendSmsAsync(order); // 백그라운드에서 실행
// 총 소요 시간: 500ms (이메일/SMS는 백그라운드)
return new OrderResult(order);
}
}
1) @Async 기본
1-1) @EnableAsync 설정
@Configuration
@EnableAsync // Async 기능 활성화
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 기본 스레드 수
executor.setMaxPoolSize(10); // 최대 스레드 수
executor.setQueueCapacity(100); // 큐 크기
executor.setThreadNamePrefix("async-");
executor.initialize();
return executor;
}
}
1-2) @Async 사용
@Service
public class EmailService {
@Async
public void sendEmail(String to, String subject, String body) {
log.info("Sending email to {} - Thread: {}", to, Thread.currentThread().getName());
// 이메일 발송 로직
// ...
log.info("Email sent to {}", to);
}
}
호출:
@RestController
public class UserController {
@Autowired
private EmailService emailService;
@PostMapping("/users")
public ResponseEntity<User> createUser(@RequestBody User user) {
User saved = userService.save(user);
// 비동기로 이메일 발송 (즉시 반환)
emailService.sendEmail(user.getEmail(), "Welcome!", "환영합니다!");
return ResponseEntity.ok(saved);
}
}
1-3) 반환값이 있는 @Async
@Service
public class UserService {
@Async
public CompletableFuture<User> findByIdAsync(Long id) {
log.info("Finding user {} - Thread: {}", id, Thread.currentThread().getName());
User user = userRepository.findById(id)
.orElseThrow(() -> new UserNotFoundException(id));
return CompletableFuture.completedFuture(user);
}
@Async
public CompletableFuture<List<Order>> findOrdersAsync(Long userId) {
log.info("Finding orders for user {} - Thread: {}", userId, Thread.currentThread().getName());
List<Order> orders = orderRepository.findByUserId(userId);
return CompletableFuture.completedFuture(orders);
}
}
병렬 실행:
@GetMapping("/users/{id}/details")
public ResponseEntity<UserDetails> getUserDetails(@PathVariable Long id) {
// 두 작업을 병렬로 실행
CompletableFuture<User> userFuture = userService.findByIdAsync(id);
CompletableFuture<List<Order>> ordersFuture = userService.findOrdersAsync(id);
// 두 작업이 모두 완료될 때까지 대기
CompletableFuture.allOf(userFuture, ordersFuture).join();
// 결과 조합
User user = userFuture.join();
List<Order> orders = ordersFuture.join();
return ResponseEntity.ok(new UserDetails(user, orders));
}
2) ThreadPoolTaskExecutor 설정
2-1) 스레드 풀 파라미터
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 기본 스레드 수 (항상 유지)
executor.setCorePoolSize(5);
// 최대 스레드 수
executor.setMaxPoolSize(10);
// 큐 크기 (대기열)
executor.setQueueCapacity(100);
// 유휴 스레드 유지 시간
executor.setKeepAliveSeconds(60);
// 스레드 이름 접두사
executor.setThreadNamePrefix("async-");
// 거부 정책 (큐가 가득 찼을 때)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 종료 대기
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
동작 방식:
1. 작업 요청
↓
2. CorePoolSize 미만? → 새 스레드 생성
↓
3. CorePoolSize 이상? → 큐에 추가
↓
4. 큐 가득? → MaxPoolSize까지 스레드 생성
↓
5. MaxPoolSize 도달? → RejectedExecutionHandler 실행
2-2) 여러 Executor 설정
@Configuration
@EnableAsync
public class AsyncConfig {
// 이메일 발송용
@Bean(name = "emailExecutor")
public Executor emailExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("email-");
executor.initialize();
return executor;
}
// 파일 처리용
@Bean(name = "fileExecutor")
public Executor fileExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("file-");
executor.initialize();
return executor;
}
}
사용:
@Service
public class EmailService {
@Async("emailExecutor") // 특정 Executor 지정
public void sendEmail(String to, String subject, String body) {
// ...
}
}
@Service
public class FileService {
@Async("fileExecutor") // 다른 Executor 사용
public void processFile(File file) {
// ...
}
}
3) CompletableFuture 활용
3-1) 기본 사용
// 비동기 작업 생성
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 백그라운드 스레드에서 실행
Thread.sleep(1000);
return "Hello";
});
// 결과 대기 (블로킹)
String result = future.get(); // "Hello"
// 타임아웃 설정
String result = future.get(2, TimeUnit.SECONDS);
// 논블로킹 대기
future.join(); // get()과 유사하지만 Checked Exception 없음
3-2) 작업 체이닝
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "user123";
})
.thenApply(userId -> {
// 이전 결과를 받아서 변환
return userRepository.findById(userId);
})
.thenApply(user -> {
// User → UserDTO 변환
return new UserDTO(user);
})
.thenAccept(userDto -> {
// 최종 결과 사용 (반환값 없음)
log.info("User loaded: {}", userDto);
});
3-3) 병렬 실행
@GetMapping("/dashboard")
public ResponseEntity<Dashboard> getDashboard() {
// 3개 작업을 병렬로 실행
CompletableFuture<Long> userCountFuture = CompletableFuture.supplyAsync(() ->
userRepository.count()
);
CompletableFuture<Long> orderCountFuture = CompletableFuture.supplyAsync(() ->
orderRepository.count()
);
CompletableFuture<BigDecimal> totalRevenueFuture = CompletableFuture.supplyAsync(() ->
orderRepository.calculateTotalRevenue()
);
// 모든 작업이 완료될 때까지 대기
CompletableFuture.allOf(userCountFuture, orderCountFuture, totalRevenueFuture).join();
// 결과 조합
Dashboard dashboard = new Dashboard(
userCountFuture.join(),
orderCountFuture.join(),
totalRevenueFuture.join()
);
return ResponseEntity.ok(dashboard);
}
3-4) 예외 처리
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Error!");
}
return "Success";
})
.exceptionally(ex -> {
// 예외 발생 시 기본값 반환
log.error("Error occurred", ex);
return "Default Value";
})
.thenApply(result -> {
return result.toUpperCase();
});
또는 handle():
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Error!");
})
.handle((result, ex) -> {
if (ex != null) {
// 예외 처리
log.error("Error occurred", ex);
return "Error: " + ex.getMessage();
} else {
// 정상 처리
return result;
}
});
3-5) 여러 작업 조합
// 둘 중 먼저 완료된 것 사용
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Result from DB";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(500);
return "Result from Cache";
});
CompletableFuture<Object> fastest = CompletableFuture.anyOf(future1, future2);
String result = (String) fastest.join(); // "Result from Cache" (더 빠름)
// 두 작업이 모두 완료될 때까지 대기
CompletableFuture<Void> allComplete = CompletableFuture.allOf(future1, future2);
allComplete.join();
// 두 결과를 조합
CompletableFuture<String> combined = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " + " + result2;
});
4) 비동기 예외 처리
4-1) @Async 예외 처리
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("Async exception in method: {} with params: {}",
method.getName(), Arrays.toString(params), ex);
// 알림 발송 (선택)
// slackNotifier.sendError(...);
}
}
4-2) CompletableFuture 예외 처리
@Service
public class UserService {
@Async
public CompletableFuture<User> findByIdAsync(Long id) {
return CompletableFuture.supplyAsync(() -> {
return userRepository.findById(id)
.orElseThrow(() -> new UserNotFoundException(id));
})
.exceptionally(ex -> {
log.error("Error finding user: {}", id, ex);
return null; // 또는 기본값
});
}
}
5) 실전 패턴
5-1) 이메일 발송
@Service
@Slf4j
public class EmailService {
@Async("emailExecutor")
public CompletableFuture<Void> sendWelcomeEmail(User user) {
log.info("Sending welcome email to {}", user.getEmail());
try {
// 이메일 발송
mailSender.send(createWelcomeEmail(user));
log.info("Welcome email sent to {}", user.getEmail());
} catch (Exception e) {
log.error("Failed to send welcome email to {}", user.getEmail(), e);
throw e;
}
return CompletableFuture.completedFuture(null);
}
}
5-2) 파일 처리
@Service
public class FileProcessingService {
@Async("fileExecutor")
public CompletableFuture<ProcessResult> processFileAsync(File file) {
log.info("Processing file: {}", file.getName());
try {
// 파일 읽기
List<String> lines = Files.readAllLines(file.toPath());
// 처리
ProcessResult result = processLines(lines);
log.info("File processed: {} - {} lines", file.getName(), lines.size());
return CompletableFuture.completedFuture(result);
} catch (IOException e) {
log.error("Failed to process file: {}", file.getName(), e);
return CompletableFuture.failedFuture(e);
}
}
}
5-3) 외부 API 호출
@Service
public class ExternalApiService {
@Async
public CompletableFuture<ApiResponse> callExternalApi(String url) {
return CompletableFuture.supplyAsync(() -> {
return restTemplate.getForObject(url, ApiResponse.class);
})
.orTimeout(3, TimeUnit.SECONDS) // 타임아웃
.exceptionally(ex -> {
log.error("External API call failed: {}", url, ex);
return ApiResponse.error("External API unavailable");
});
}
}
6) 주의사항
⚠️ 1. @Async는 Proxy 기반
// ❌ 나쁜 예: 같은 클래스 내 @Async 메서드 호출
@Service
public class UserService {
public void registerUser(User user) {
userRepository.save(user);
sendWelcomeEmail(user); // @Async 동작 안 함! (프록시 우회)
}
@Async
public void sendWelcomeEmail(User user) {
// ...
}
}
// ✅ 좋은 예: 다른 Bean에서 호출
@Service
public class UserService {
@Autowired
private EmailService emailService;
public void registerUser(User user) {
userRepository.save(user);
emailService.sendWelcomeEmail(user); // @Async 동작!
}
}
⚠️ 2. 트랜잭션 주의
// ❌ 나쁜 예: @Transactional과 @Async 함께 사용
@Transactional
@Async
public void updateUser(User user) {
// 트랜잭션이 다른 스레드에서 실행됨!
}
// ✅ 좋은 예: 분리
@Transactional
public void updateUser(User user) {
userRepository.save(user);
// 트랜잭션 커밋 후 비동기 작업
}
@Async
public void sendNotification(User user) {
// 비동기 작업
}
⚠️ 3. 스레드 풀 고갈
// ❌ 나쁜 예: 무한정 비동기 작업 생성
for (int i = 0; i < 10000; i++) {
asyncService.process(i); // 스레드 풀 고갈!
}
// ✅ 좋은 예: 배치 처리 또는 스로틀링
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < items.size(); i += 100) {
List<Item> batch = items.subList(i, Math.min(i + 100, items.size()));
futures.add(asyncService.processBatch(batch));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
연습 (추천)
@Async 구현
- @EnableAsync 설정
- 비동기 이메일 발송
- 반환값 있는 @Async
CompletableFuture 활용
- 병렬 실행 (allOf, anyOf)
- 작업 체이닝 (thenApply, thenCompose)
- 예외 처리
스레드 풀 튜닝
- 적절한 CorePoolSize/MaxPoolSize 설정
- 작업 특성에 맞는 Executor 설계
요약: 스스로 점검할 것
- @Async로 비동기 메서드를 만들 수 있다
- CompletableFuture로 비동기 작업을 조합할 수 있다
- ThreadPoolTaskExecutor를 설정할 수 있다
- 비동기 예외를 처리할 수 있다
- Proxy 기반 동작을 이해한다
다음 단계
- Spring WebFlux:
/learning/deep-dive/deep-dive-spring-webflux-vs-mvc/ - Reactive Programming:
/learning/deep-dive/deep-dive-reactive-streams/ - 동시성 제어:
/learning/deep-dive/deep-dive-java-concurrency-basics/
💬 댓글