Web/SpringBoot

[kafka] kafka 수동 commit, ENABLE_AUTO_COMMIT_CONFIG

벨포트조던 2019. 12. 10.
반응형

1. [kafka] spring boot consumer config 설정, client 입력

2. [kafka] kafka 수동 commit, ENABLE_AUTO_COMMIT_CONFIG

3. [kafka] 실패시 retry, 실패시 처리방법, setRecoveryCallback, setErrorHandler

 

springboot 카프카 컨슈머 개발 중, 특정 값만 commit 해야하는 경우가 생겼다.

 

내 경우에는 로직 성공시에만 commit 하려고한다.

 --- 문제가 약간있음. 마지막에 설명함 

 

https://stackoverflow.com/questions/47427948/how-to-acknowledge-current-offset-in-spring-kafka-for-manual-commit?cf_lbyyhhwhyjj5l3rs65cb3w=le79rqactxj0g9f97b4gzyh

이거 하나면 끝남 .. 

 

잘 읽으면 바로 알겠지만 추가로 설명하자면

 

1
2
3
4
5
6
7
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
 

위 경우는 내 코드이다. 팩토리에 등록시에 설정 추가한다.

약간 다른데 .. 

AckMode.MANUAL_IMMEDIATE); 이부분이 나랑 좀 다르다. 

 

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

공식 문서보고 바꿨다.. 차이는 써있는데, 정확히는 모르겠다. 알면 댓글좀 ..  나는 비슷하게 동작한다.

 

이렇게 설정하면

원하는 부분에서 

acknowledgment.acknowledge(); 로 commit 할수 있다. 

이 코드를 안쓰면 offset이 갱신이 안된다.

 

요까지는... 깔끔햇다 .. .

 

테스트 중 문제를 발견했는데.... 주위사람들이 잘못알고있는 부분이 있었음.

글로 설명하긴 좀 힘든데.. 해보자면

결론부터 말하자면 kafka에 성공,실패 마킹기능은 없다. 

 

kafka data

1번     성공    commit    offset :1

2번     성공    commit    offset :2

3번     실패        -         offset :2

 

이런 상황이면, 서버 재시작시에 3번부터 불러온다. - ok

또는, 다른 consumer 가 붙어서 처리한다 -ok

 

문제는 저 위에서 추가로 4번이 성공으로 처리되면

kafka data

1번     성공    commit    offset :1

2번     성공    commit    offset :2

3번     실패        -         offset :2

4번     성공    commit    offset :4

 

4번이 성공나면 3번의 실패가 따로 저장되거나, 처리하지 않고 지나가 버린다.

( 내가 잘 못 알고있는거면 댓글 바란다. 근데 아마 맞을거다. 몇칠을 확인함)

 

카프카 사용시 이런 문제가 있는데, 다들 저 상황에서 3번을 처리할수 있다고 생각하고 있더라고.. 

 

아무튼 이런 문제가 있다. 

이 문제를 해결할라고 또 몇칠을 찾아봤다.

 

https://stackoverflow.com/questions/55568856/how-can-i-retry-failure-messages-from-kafka

https://stackoverflow.com/questions/46532116/spring-kafka-consumer-retry

 

여러 설명을 찾와봤을때 해결 방안은

1. retry 재시도

2. 다른 topic에 넣어서 재시도 

이정도이다.. 

실패시 마킹이 됬으면 좋았으련만, 아마도 없는것같다. 

 

다른 방안은 난 포기했고, log와 retry로 그냥 쇼부봤다.. 

 

다음에는 retry 작성한다.

 

- ack 모드 뭐뭐 있는지 설정값 설명

https://justin-g.tistory.com/44

 

반응형

댓글