1. [kafka] spring boot consumer config 설정, client 입력
2. [kafka] kafka 수동 commit, ENABLE_AUTO_COMMIT_CONFIG
3. [kafka] 실패시 retry, 실패시 처리방법, setRecoveryCallback, setErrorHandler
필요사항
2번에서 설명한 문제점이 생김.
그래서 선택한 방법은 재시도를 시도하는거다.
재시도 후 실패는 DB 저장
단계
재시도를 그냥 코드로도 할 수 있지만, consumer에서 제공하는 방법으로 할라니까.. 생각보다 어려웠음.
해당 방법은 아래 문서를 참고하여 작성하였음.
setRecoveryCallback 으로 해결 가능하였음
[주관적]재시도 관련 참고된 문서 - 설명은 자세하지만 너무 양이 많아서 대충 결론만 읽음
https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
stackover - 간단한 실패시 대처 방법
https://stackoverflow.com/questions/55568856/how-can-i-retry-failure-messages-from-kafka
-여기서 재시도를 해야된다고 확신함. 안에 링크보고 도움
https://stackoverflow.com/questions/46532116/spring-kafka-consumer-retry
-consumer factory doc
- setRecoveryCallback 이거 기본 형식. 맨아래에 있음. 일단 이걸로 테스트함 . (중간코드도 retryTemplate 이거 참조함)
https://objectpartners.com/2018/11/21/building-resilient-kafka-consumers-with-spring-retry/
- setErrorHandler 이거 ... 뭔지는 모르겠는데.. 두개보고 조합해서 코드짬
https://docs.spring.io/spring-kafka/reference/html/
https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Bean
RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
return null;
});
factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<< ?? 뭔지모름
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
|
이렇게하면 일단 에러시에 로그확인했음.
---------------------- 추가 -------------------------------------------------------
수정조건
위 코드에서는 setRecoveryCallback에서 아무 처리를 하지않는다. 로그만 남김다.
3번째 시도 후 DB에 insert 필요한상황이 생겼음.
이거 참조해서 callback에서 kafka data 가지고올수있게 함.
(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)
뭐 이정도면 설명이면 대충 알아서 잘쓰겟지... ?? 그러츄 ?
예외 참고
-카프카 exception 모음
https://springboot.cloud/35?cf_lbyyhhwhyjj5l3rs65cb3w=u5yuofnbv6hym8g1r79tlq
-심화 Stateful Retry 대충뭔소린지 알겠는데 .. 코드적용이 어려워보임
https://gunju-ko.github.io/kafka/spring-kafka/2018/04/16/Spring-Kafka-Retry.html
'Web > SpringBoot' 카테고리의 다른 글
[JSON] json data java 모델(VO) 로 생성 (0) | 2019.12.27 |
---|---|
[java]JSON, GSON, JSONObject, JsonObject 내가 한데까지 정리 (0) | 2019.12.27 |
[kafka] kafka 수동 commit, ENABLE_AUTO_COMMIT_CONFIG (4) | 2019.12.10 |
[kafka] spring boot consumer config 설정, client 입력 (0) | 2019.11.27 |
Configure Pod readiness with Spring Actuator (0) | 2019.11.20 |
댓글