Web/SpringBoot

[kafka] 실패시 retry, 실패시 처리방법, setRecoveryCallback, setErrorHandler

벨포트조던 2019. 12. 17.
반응형

1. [kafka] spring boot consumer config 설정, client 입력

2. [kafka] kafka 수동 commit, ENABLE_AUTO_COMMIT_CONFIG

3. [kafka] 실패시 retry, 실패시 처리방법, setRecoveryCallback, setErrorHandler



필요사항

2번에서 설명한 문제점이 생김.

그래서 선택한 방법은 재시도를 시도하는거다.

재시도 후 실패는 DB 저장

 

단계

재시도를 그냥 코드로도 할 수 있지만, consumer에서 제공하는 방법으로 할라니까.. 생각보다 어려웠음.

해당 방법은 아래 문서를 참고하여 작성하였음.

setRecoveryCallback 으로 해결 가능하였음

 

[주관적]재시도 관련 참고된 문서 - 설명은 자세하지만 너무 양이 많아서 대충 결론만 읽음

https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

(번역본)https://blog.leocat.kr/notes/2018/10/10/translation-retrying-consumer-architecture-in-the-apache-kafka

 

 

stackover - 간단한 실패시 대처 방법 

https://stackoverflow.com/questions/55568856/how-can-i-retry-failure-messages-from-kafka

 

-여기서 재시도를 해야된다고 확신함. 안에 링크보고 도움

https://stackoverflow.com/questions/46532116/spring-kafka-consumer-retry  

 

-consumer factory doc

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.html#setRecoveryCallback-org.springframework.retry.RecoveryCallback-

 

- setRecoveryCallback 이거 기본 형식. 맨아래에 있음. 일단 이걸로 테스트함 . (중간코드도 retryTemplate 이거 참조함)

https://objectpartners.com/2018/11/21/building-resilient-kafka-consumers-with-spring-retry/


- setErrorHandler 이거 ... 뭔지는 모르겠는데.. 두개보고 조합해서 코드짬

https://docs.spring.io/spring-kafka/reference/html/

https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han

 
이 글에서 내가 조합한거
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    @Bean
    RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
 
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000l);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
 
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
 
        return retryTemplate;
    }
 
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<StringString>> kafkaListenerContainerFactory(
            RetryTemplate retryTemplate) {
        ConcurrentKafkaListenerContainerFactory<StringString> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.setRetryTemplate(retryTemplate);
        factory.setRecoveryCallback(context -> {
            log.info("consumer retry -" + context.toString());
            return null;
        });
        factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<< ?? 뭔지모름 
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
 

이렇게하면 일단 에러시에 로그확인했음. 

 

 

---------------------- 추가 -------------------------------------------------------

수정조건

위 코드에서는 setRecoveryCallback에서 아무 처리를 하지않는다. 로그만 남김다.

3번째 시도 후 DB에 insert 필요한상황이 생겼음.

 

https://stackoverflow.com/questions/46593270/kafka-consumer-fetch-parameters-received-by-listener-in-recover-method

이거 참조해서 callback에서 kafka data 가지고올수있게 함.

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)

 

뭐 이정도면 설명이면 대충 알아서 잘쓰겟지... ?? 그러츄 ?

 

 

 

 

 

 

예외 참고

-카프카 exception 모음

https://springboot.cloud/35?cf_lbyyhhwhyjj5l3rs65cb3w=u5yuofnbv6hym8g1r79tlq

 

-심화 Stateful Retry  대충뭔소린지 알겠는데 .. 코드적용이 어려워보임

https://gunju-ko.github.io/kafka/spring-kafka/2018/04/16/Spring-Kafka-Retry.html

반응형

댓글