
[kafka] Consumer rebalancing issue when offset commits are missed for a long time

벨포트조던 2021. 10. 28.

The error looked like this:

Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1761) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:930) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:193) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1110) [spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:902) [spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	Oct 8, 2021 @ 23:28:24.867		at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820) ~[kafka-clients-2.3.0.jar!/:?]	
	Oct 8, 2021 @ 23:28:24.867		at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692) ~[kafka-clients-2.3.0.jar!/:?]	
	Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:890) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	Oct 8, 2021 @ 23:28:24.867		at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454) ~[kafka-clients-2.3.0.jar!/:?]	
	Oct 8, 2021 @ 23:28:24.867		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1938) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]	
	(The same ten frames were logged again at 23:28:09.832, 23:28:08.272, 23:28:06.816, 23:27:57.353, 23:27:51.997, 23:27:42.007, 23:23:59.955, 23:23:57.971, 23:22:49.683, 23:21:33.786 and 23:20:42.175 on Oct 8, 2021.)

 

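This is the error that kept occurring.

Rough outline of the problem

- When an API call made from the consumer hits its 3-second timeout, the design was to skip the retry and advance the offset.

- Even if every call timed out, I expected the offset to advance one record at a time and move on. It did not.

- The offset stayed where it was while the timeout errors kept coming, and then a consumer rebalance kicked in and the topic was read again from the first record that had failed.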

 

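Approach (whether it really fixes the problem will only be known if the same error shows up again)

I reached this conclusion after reading the reference blogs listed below.

The approach is to change the Kafka consumer settings.

The most important one is max.poll.records.

This is the maximum number of records fetched by a single poll. Say it is 100: if all 100 cannot be processed in time, the consumer appears to go back to the beginning of the batch.

It feels similar to a transaction.

The larger the polled batch, the more work has to finish in time; if it does not, the offsets do not appear to be committed.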
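To put rough numbers on this, here is a minimal sketch of the worst case. It assumes the records of one batch are processed one after another, that every record hits the 3-second API timeout mentioned above, and that max.poll.interval.ms is at the Kafka default of 300000 ms. The class and method names are made up for illustration and are not part of any Kafka API.

```java
import java.time.Duration;

/** Back-of-the-envelope check: can one poll() batch finish before max.poll.interval.ms expires? */
final class PollBudget {

    /** Worst-case time to work through one batch when records are handled sequentially. */
    static Duration worstCaseBatchTime(int maxPollRecords, Duration perRecordWorstCase) {
        return perRecordWorstCase.multipliedBy(maxPollRecords);
    }

    /** True when the worst-case batch time still fits inside max.poll.interval.ms. */
    static boolean fitsInPollInterval(int maxPollRecords,
                                      Duration perRecordWorstCase,
                                      Duration maxPollInterval) {
        return worstCaseBatchTime(maxPollRecords, perRecordWorstCase)
                .compareTo(maxPollInterval) <= 0;
    }

    public static void main(String[] args) {
        Duration apiTimeout = Duration.ofSeconds(3);       // per-call timeout from the post
        Duration maxPollInterval = Duration.ofMinutes(5);  // assumed: Kafka default, 300000 ms

        for (int records : new int[] {500, 100, 1}) {
            long seconds = worstCaseBatchTime(records, apiTimeout).getSeconds();
            boolean fits = fitsInPollInterval(records, apiTimeout, maxPollInterval);
            System.out.printf("max.poll.records=%d -> worst case %d s, fits in poll interval: %b%n",
                    records, seconds, fits);
        }
    }
}
```

Under these assumptions a batch of 500 needs up to 1500 seconds, far beyond a 300-second poll interval, while a batch of 1 needs at most 3 seconds. That is consistent with the symptom above, although it is only an estimate and not a confirmed root cause.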

 

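The other settings did not look as important, so I followed the other blog posts for those and tuned only the max.poll values to fit this issue.

 

The catch is that the error is hard to reproduce, so I will have to confirm it while the server is running.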

 

 

 

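Content from the reference blog

Early in our Kafka operations we ran into a strange issue.

- Symptom

Kafka messages that had already been consumed and processed were consumed again, not just once but repeatedly: a few times at the least, dozens of times at the most.

My understanding was that once a listener has processed a message, it commits to Kafka, the offset moves forward, and the next message is processed...

In other words, this was an issue I had never anticipated.

We tracked down the cause, examined the symptom closely, and resolved it by changing a few Kafka listener settings, but had we not caught it early it could have turned into a major incident.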

 

 

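- Cause

A timeout occurred while the consumer was processing messages, and that triggered a partition rebalance.

The consumer was fetching 500 messages at a time from the topic (MAX_POLL_RECORDS_CONFIG = "500"),
the session timeout between consumer and broker was 10 seconds (SESSION_TIMEOUT_MS_CONFIG = 10000),

and the maximum time to wait for a response to a request was 30 seconds (REQUEST_TIMEOUT_MS_CONFIG = 30000).

With these defaults, the consumer fetches messages from the topic 500 at a time, the session timeout is 10 seconds, and the client waits up to 30 seconds for a response to a request.

If something goes wrong (a timeout, for example) while those 500 messages are being processed, the contract between consumer and broker is broken.

That is what caused the partition rebalance.

--> Because the rebalance happened while the consumer was still working through its 500 messages, their offsets were never committed. From the broker's point of view the 500 messages were not processed, so another consumer fetches the same messages from the broker.

As this repeated, the conditions were in place for already-consumed messages to be consumed over and over.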
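The redelivery described above can be sketched with a toy model. This is not the Kafka client: it is a hypothetical in-memory class that tracks only one partition's committed offset, on the simplifying assumption that whichever consumer is assigned the partition after a rebalance starts reading from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition: only a successful commit moves the committed offset forward. */
final class RedeliverySimulation {
    private final long logEndOffset;   // offset just past the last record in the partition
    private long committedOffset = 0;  // where a newly assigned consumer starts reading

    RedeliverySimulation(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    /** Offsets handed to the first poll of whichever consumer owns the partition after assignment. */
    List<Long> pollAfterAssignment(int maxPollRecords) {
        List<Long> batch = new ArrayList<>();
        for (long o = committedOffset; o < logEndOffset && batch.size() < maxPollRecords; o++) {
            batch.add(o);
        }
        return batch;
    }

    /** Records that everything before nextOffset has been processed. */
    void commit(long nextOffset) {
        committedOffset = nextOffset;
    }

    public static void main(String[] args) {
        RedeliverySimulation partition = new RedeliverySimulation(1_000);

        // Consumer A fetches 500 records but is removed from the group before it can commit.
        List<Long> firstAttempt = partition.pollAfterAssignment(500);

        // Consumer B takes over the partition and starts from the unchanged committed offset.
        List<Long> secondAttempt = partition.pollAfterAssignment(500);
        System.out.println("same records delivered again: " + firstAttempt.equals(secondAttempt));

        // Only after a successful commit does the next assignment move on.
        partition.commit(secondAttempt.get(secondAttempt.size() - 1) + 1);
        System.out.println("next batch starts at offset " + partition.pollAfterAssignment(500).get(0));
    }
}
```

In this model a smaller batch does not prevent redelivery by itself. It shrinks how many records are repeated and how long the consumer goes between polls.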

 

 

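Solution

# Maximum time to wait for a response to a request
requestTimeoutMs: 30000
# Session timeout between the consumer and the broker
sessionTimeoutMs: 30000
# Closely tied to session.timeout.ms and must be lower than it; typically one third
heartbeatIntervalMs: 10000
# Maximum time allowed between poll() calls, i.e. the time the consumer has to process one batch
maxPollIntervalMs: 600000
# Maximum number of records returned by a single call to poll()
maxPollRecords: 1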
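For a plain Java consumer, the same values could be expressed as consumer properties, roughly as in the sketch below. The property keys are the standard Kafka consumer configuration names, and the values are copied from the settings above. The class name is hypothetical, and required settings such as bootstrap.servers, group.id and the deserializers are deliberately left out.

```java
import java.util.Properties;

/** The tuned values from the post, expressed as Kafka consumer properties. */
final class ConsumerSettings {

    static Properties tuned() {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "30000");     // wait up to 30 s for a response
        props.setProperty("session.timeout.ms", "30000");     // session timeout raised to 30 s
        props.setProperty("heartbeat.interval.ms", "10000");  // one third of the session timeout
        props.setProperty("max.poll.interval.ms", "600000");  // up to 10 min between poll() calls
        props.setProperty("max.poll.records", "1");           // one record per poll()
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the running consumer's startup log.
        tuned().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real application these properties would be merged with the connection and deserializer settings before being passed to the consumer or to Spring's consumer factory.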

 

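Change the default Kafka settings in the consumer project.

I changed the values defined in ConsumerConfig.java in a Java (Spring Boot) environment.

Searching Google for ConsumerConfig.java turns up detailed explanations, and the listener can be configured as well, so take a look.

 

First, I changed MAX_POLL_RECORDS_CONFIG, which had the consumer fetching 500 records at a time from the broker, to 1.

(The blog I referred to used 3, so I started with 3, but timeouts intermittently reproduced the same situation. Since this workload has to be exact, I changed it to 1.)

I also changed the timing settings between the Kafka broker and the listener, but I consider those situational.

I judge MAX_POLL_RECORDS_CONFIG to be the most critical setting.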

 

 

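session.timeout.ms (default: 10000, 10 seconds)
Controls how long a consumer can go without sending a heartbeat. It is closely tied to heartbeat.interval.ms, so the two are usually changed together.

heartbeat.interval.ms (default: 3000, 3 seconds)
Controls how often the consumer sends a heartbeat. It must be smaller than session.timeout.ms and is typically set to one third of it.

max.poll.interval.ms (default: 300000, 5 minutes)
If the consumer does not call poll periodically, it is considered failed and removed from the consumer group, so that it cannot hold on to its partitions indefinitely.

max.poll.records (default: 500)
Controls how much data is handled in each iteration of the polling loop.

enable.auto.commit (default: true)
Commits offsets periodically in the background.

auto.commit.interval.ms (default: 5000, 5 seconds)
The interval at which offsets are committed automatically.

auto.offset.reset (default: latest)
none: an error is raised if no previous offset is found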
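The relationship between heartbeat.interval.ms and session.timeout.ms described above is easy to check before deploying. The helper below is a hypothetical sketch, not part of Kafka: it encodes only the two rules stated in the text, the hard requirement that the heartbeat interval be smaller than the session timeout and the rule of thumb of one third.

```java
/** Sanity checks for the heartbeat/session timeout pair, based on the rules described above. */
final class HeartbeatCheck {

    /** Hard rule: the heartbeat interval must be smaller than the session timeout. */
    static boolean isValid(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs < sessionTimeoutMs;
    }

    /** Rule of thumb: the heartbeat interval is at most one third of the session timeout. */
    static boolean followsOneThirdRule(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long[][] pairs = {
            {3_000, 10_000},   // defaults listed above
            {10_000, 30_000},  // tuned values from the solution
            {5_000, 10_000},   // valid, but more than one third
        };
        for (long[] pair : pairs) {
            System.out.printf("heartbeat=%d ms, session=%d ms -> valid=%b, oneThird=%b%n",
                    pair[0], pair[1],
                    isValid(pair[0], pair[1]),
                    followsOneThirdRule(pair[0], pair[1]));
        }
    }
}
```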

 

The official Kafka documentation lists the consumer options for reference:

kafka.apache.org/documentation/#consumerconfigs

 


References -
Kafka consumer application deployment strategy (medium.com/11st-pe-techblog/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%EB%B0%B0%ED%8F%AC-%EC%A0%84%EB%9E%B5-4cb2c7550a72)
Solving problems with Kafka settings  saramin.github.io/2019-09-17-kafka/

 

 


 

References

https://medium.com/@mouli2k5/solution-for-kafka-commitfailedexception-c3a6c9baae51

https://hackernoon.com/kafka-says-it-is-likely-that-the-consumer-was-kicked-out-of-the-group-az1r34dm

https://stackoverflow.com/questions/35658171/kafka-commitfailedexception-consumer-exception

https://stackoverflow.com/questions/60924842/kafka-consumer-commitfailedexception

https://dev-jj.tistory.com/entry/Kafka-%EA%B0%99%EC%9D%80%EB%A9%94%EC%8B%9C%EC%A7%80%EB%A5%BC-%EB%B0%98%EB%B3%B5%EC%A0%81%EC%9C%BC%EB%A1%9C-%EC%86%8C%EB%B9%84%ED%96%88%EB%8D%98-%EB%A6%AC%EB%B0%B8%EB%9F%B0%EC%8B%B1-%EC%9D%B4%EC%8A%88-%ED%95%B4%EA%B2%B0-MAX-POLL-RECORDS-CONFIG-maxpollrecord

https://getto215.github.io/kafka-architecture-2/
