Web/스프링

kafka 컨슈머 실패시 Exception 으로 조건 구분하기 Exception체크

벨포트조던 2020. 8. 26.
반응형

목표

Exception을 다르게 발생시켜서 받는곳에서 익셉션에따라 다른 작동하게 하기

카프카에서 했던 내용이지 익셉션이 주된 내용이다

spring의 Exception핸들러라면 이런식으로 안하고 어노테이션으로 해도 자동으로 읽어갈텐데, 컨슈머는 익셉션 발생시 재시도(RecoveryCallback)에서 익셉션을 구분했어야 했다. ( commit 할지 안할지 선택해야해서 )

 

이 경우 익셉션을 구분하려고 여러문서를 찾아봄

 

전체코드중 ... 몇가지만 가능한 부분이었음 

처음에 정리를 안해놨더니.... 정확한건 기억이안남.

 

컨슈머에서

throw new BusinessException("http status not 200"); 발생 -> 재시도코드에서 받는다.

 

카프카 컨슈머 에러시 재시도 코드

@Bean
	public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
			RetryTemplate retryTemplate) {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(2);
		factory.setRetryTemplate(retryTemplate);
		factory.setRecoveryCallback(context -> {
			
//			Class cls = BusinessException.class;
			log.info("consumer retry -" + context.toString());
			log.info("consumer retry -" + context.getLastThrowable().getLocalizedMessage());
			log.info("consumer retry -" + context.getLastThrowable().getCause().getClass() );
			log.info("consumer retry -" + context.getLastThrowable().getCause().getClass().isAssignableFrom(BusinessException.class)  );
			log.info("consumer retry -" + (context.getLastThrowable().getCause().getClass() == BusinessException.class )   );
			log.info("consumer retry -" + ( BusinessException.class.isAssignableFrom(context.getLastThrowable().getCause().getClass()) )  );
			log.info("consumer retry -" + context.getLastThrowable().getClass());
			log.info("consumer retry -" + context.getLastThrowable().getClass().getName());
			log.info("consumer retry -" + context.getLastThrowable().getClass().getTypeName());
			log.info("consumer retry -" + context.getLastThrowable().getClass().getSimpleName());
			log.info("consumer retry -" + ( context.getLastThrowable() instanceof BusinessException  )   );
			
			
			
			ConsumerRecord record = (ConsumerRecord)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
			Acknowledgment acknowledgment = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
//			MongoCollection<Document> collection = permissionMongo.getDatabase("logs").getCollection("pusherror");
//			
//			Document doc = Document.parse(record.value().toString())
//					.append("registerdate", new Date());
//    
//            collection.insertOne(doc);
			
			// status code 200 아닐시 ( 원시그널 장애 )
			if (context.getLastThrowable().getCause().getClass().isAssignableFrom(BusinessException.class) ) {
				acknowledgment.acknowledge();
			}
			
			return null;
		});
		factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<< ?? 뭔지모름 
		factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
		return factory;
	}

 

 

적용 가능했던 코드 (기억상 )

log.info("consumer retry -" + context.getLastThrowable().getCause().getClass().isAssignableFrom(BusinessException.class)  );
log.info("consumer retry -" + (context.getLastThrowable().getCause().getClass() == BusinessException.class )   );
log.info("consumer retry -" + ( BusinessException.class.isAssignableFrom(context.getLastThrowable().getCause().getClass()) )  );

이 세개만 true로 나옴 

 

익셉션을

throw new BusinessException("http status not 200")

이걸로 발생시켰지만 다른 코드를 실행해보면 결과값이 다르다

기대값 : BusinessException

결과값 : Exception

 

 

 

 

참고문서

 

 

반응형

댓글