SERVER/Kafka

[kafka] 에러 java.nio.BufferUnderflowException 해결방안

벨포트조던 2020. 8. 13.
반응형

문제발생

에러1

[ERROR], 2020-08-12 15:31:17 KafkaMessageListenerContainer$ListenerConsumer:842 - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:92) ~[spring-kafka-2.2.2.RELEASE.jar!/:2.2.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:830) [spring-kafka-2.2.2.RELEASE.jar!/:2.2.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:687) [spring-kafka-2.2.2.RELEASE.jar!/:2.2.2.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_212]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:243) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175) ~[kafka-clients-2.0.1.jar!/:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) ~[kafka-clients-2.0.1.jar!/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719) ~[spring-kafka-2.2.2.RELEASE.jar!/:2.2.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676) ~[spring-kafka-2.2.2.RELEASE.jar!/:2.2.2.RELEASE]
	... 3 more

에러2

[ERROR], 2020-08-12 16:27:43 KafkaMessageListenerContainer$ListenerConsumer:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.protocol.types.SchemaException's; no record information is available
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:193) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1110) [spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:902) [spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_212]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.3.0.jar!/:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.0.jar!/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:982) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:938) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:890) ~[spring-kafka-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
	... 3 more

 

원인발생 조건

1. 기존 (java spring)으로 topic을 잘 컨슈밍하고 있는 상황

2. 같은 코드를 활용하여 새로운 토픽에 컨슈머를 등록해야했음
 - 토픽과 컨슈머는 기존에 존재했었고, 새롭게 컨슈머 서버를 만드는 상황

 

에러 발생 프로세스 

1. 똑같은 방식으로 똑같이 진행했고, consumer group-id 만 변경하여 사용하려고 함 

2. 근데 에러1번 발생 원인 해결이 안되었음 ... 에러에  version 이라고 적혀있어서, kafka 버전을 맞추려고 함 

 - 단순히 텍스트인 version 이다 다른사람들 에러보면 다른 문자열이 적혀있다. 

https://stackoverflow.com/questions/44417033/kafka-error-reading-field-correlation-id-java-nio-bufferunderflowexception

https://stackoverrun.com/ko/q/10355351

 

3. 일단 2번처럼 생각되어서 버전을 맞추려고 했음. 최대한 설치된 kafka 버전이랑 동일하게 맞췄음

 - kafka 버전 확인 후, spring에서 참조하는 버전 맞춤 

4. 이후 에러2번이 발생. 비슷한듯하나 에러메시지가 약간 다름

https://issues.apache.org/jira/browse/KAFKA-4349

https://issues.apache.org/jira/browse/KAFKA-2768

https://github.com/apache/kafka/pull/447

5. 위와 같이 검색을 해보아도 답이 없었음 .. 

 

해결 

1. 기존 kafka 서버가 3개로 cluster 구성이 되어있었다. 나중에 5개로 증설되었음.
2. 나는 5개로 증설된걸 전달못받아서, 3개로 계속 접근을 했었음.

3. 에러1을 해결하는 와중에 발견함. 이미 여러번 컨슈머로 접근한 상황이었음.

4. 중간에 서버 2개 추가하여 5개로 변경 -> 그래도 안됨 

5. consumer groupid 새걸로 변경 ( lag이 없었음 )

6. 잘됨 .... .. 

 

결론

솔직히.. 정확한 원인은 모르겠다.. 추측일 뿐이지..

검색으로도 데이터가 많이 부족했다

정황상 보았을때, kafka 클라이언트로 3개의 서버에 접근했을 시 클러스터 셋팅이 되었고(꼬였다고 느껴짐)
이로 인해, 5개로 다시 접근시 3개로 계속 접근하는게 아닌가 싶음

 

윈도우도 뭐 해결안되면, 재설치하듯이... 기존 컨슈머 삭제하고 새로 하니까 되드라.

 

 

반응형

댓글