1. [kafka] spring boot consumer config 설정, client 입력
2. [kafka] kafka 수동 commit, ENABLE_AUTO_COMMIT_CONFIG
3. [kafka] 실패시 retry, 실패시 처리방법, setRecoveryCallback, setErrorHandler
springboot 카프카 컨슈머 개발 중, 특정 값만 commit 해야하는 경우가 생겼다.
내 경우에는 로직 성공시에만 commit 하려고한다.
--- 문제가 약간있음. 마지막에 설명함
이거 하나면 끝남 ..
잘 읽으면 바로 알겠지만 추가로 설명하자면
1
2
3
4
5
6
7
|
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
|
위 경우는 내 코드이다. 팩토리에 등록시에 설정 추가한다.
약간 다른데 ..
AckMode.MANUAL_IMMEDIATE); 이부분이 나랑 좀 다르다.
공식 문서보고 바꿨다.. 차이는 써있는데, 정확히는 모르겠다. 알면 댓글좀 .. 나는 비슷하게 동작한다.
이렇게 설정하면
원하는 부분에서
acknowledgment.acknowledge(); 로 commit 할수 있다.
이 코드를 안쓰면 offset이 갱신이 안된다.
요까지는... 깔끔햇다 .. .
테스트 중 문제를 발견했는데.... 주위사람들이 잘못알고있는 부분이 있었음.
글로 설명하긴 좀 힘든데.. 해보자면
결론부터 말하자면 kafka에 성공,실패 마킹기능은 없다.
kafka data
1번 성공 commit offset :1
2번 성공 commit offset :2
3번 실패 - offset :2
이런 상황이면, 서버 재시작시에 3번부터 불러온다. - ok
또는, 다른 consumer 가 붙어서 처리한다 -ok
문제는 저 위에서 추가로 4번이 성공으로 처리되면
kafka data
1번 성공 commit offset :1
2번 성공 commit offset :2
3번 실패 - offset :2
4번 성공 commit offset :4
4번이 성공나면 3번의 실패가 따로 저장되거나, 처리하지 않고 지나가 버린다.
( 내가 잘 못 알고있는거면 댓글 바란다. 근데 아마 맞을거다. 몇칠을 확인함)
카프카 사용시 이런 문제가 있는데, 다들 저 상황에서 3번을 처리할수 있다고 생각하고 있더라고..
아무튼 이런 문제가 있다.
이 문제를 해결할라고 또 몇칠을 찾아봤다.
https://stackoverflow.com/questions/55568856/how-can-i-retry-failure-messages-from-kafka
https://stackoverflow.com/questions/46532116/spring-kafka-consumer-retry
여러 설명을 찾와봤을때 해결 방안은
1. retry 재시도
2. 다른 topic에 넣어서 재시도
이정도이다..
실패시 마킹이 됬으면 좋았으련만, 아마도 없는것같다.
다른 방안은 난 포기했고, log와 retry로 그냥 쇼부봤다..
다음에는 retry 작성한다.
- ack 모드 뭐뭐 있는지 설정값 설명
https://justin-g.tistory.com/44
'Web > SpringBoot' 카테고리의 다른 글
[java]JSON, GSON, JSONObject, JsonObject 내가 한데까지 정리 (0) | 2019.12.27 |
---|---|
[kafka] 실패시 retry, 실패시 처리방법, setRecoveryCallback, setErrorHandler (0) | 2019.12.17 |
[kafka] spring boot consumer config 설정, client 입력 (0) | 2019.11.27 |
Configure Pod readiness with Spring Actuator (0) | 2019.11.20 |
[springboot] healthcheck ,actuator, kubernetes 헬스 체크 하여 pod 롤링 업데이트 403 에러 - 2/2 (0) | 2019.11.18 |
댓글