Thursday, March 2, 2023

Using Kafka with Spring Boot

 KafkaConsumerConfig

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, BureauDataDTO> emiPaymentConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(), new JsonDeserializer(BureauDataDTO.class));
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, BureauDataDTO> kafkaEmiPaymentListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, BureauDataDTO> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(emiPaymentConsumerFactory());
containerFactory.setConcurrency(3);
containerFactory.getContainerProperties().setPollTimeout(3000);
containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return containerFactory;
}

KafkaProducerConfig

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

Producer:

@Autowired
private KafkaProducerService kafkaProducerService;
private void pushCreditReportMessageToKafka(Long userId, String reportId, BureauType bureauType) {
try {
sendKafkaMessage(new BureauDataDTO(userId, reportId, bureauType));
} catch (Exception exception) {
log.error("Exception occurred in pushCreditReportMessageToKafka ", exception);
}
}

@Async
private void sendKafkaMessage(BureauDataDTO dto) {
log.debug("Inside sendKafkaMessage");
String s = getObjectAsString(dto);
if (StringUtils.isNotEmpty(s)) {
kafkaProducerService.sendMessage(s, bureauTransfer);
}
}

Consumer:

@KafkaListener(topics = "${kafka.topics.bureau-transfer}", groupId = "bureau-transfer-group", containerFactory = "kafkaEmiPaymentListenerContainerFactory")
public void processBureauTransfer(@Payload BureauDataDTO dto) {
log.info("Inside processBureauTransfer {}", dto);
process(dto);
}








No comments:

Post a Comment