Produce and Consume Events in Java Basic service
To integrate your service with Kafka, you publish events to a Kafka topic.
Description
This How-To shows you how to produce an event to a Kafka topic and consume an event from it in a Java Basic service project. The instructions are based on the Java classes generated by IBM DevOps Solution Workbench.
Production of an event
Preconditions
- You created a Java Basic service project with the Solution Designer.
- You have already implemented a Data Transfer Object (DTO) that contains the information you want to transfer in a Kafka message.
- You have a service class that provides the DTO to be sent as an event.
Steps
Convert the DTO to a JsonNode
The generated EventService class expects the payload to be a JsonNode from FasterXML Jackson. To map your Data Transfer Object to a JsonNode, you can use an instance of ObjectMapper, which is also provided by FasterXML Jackson.
- In your custom service, import the required classes and inject instances of ObjectMapper and EventService:
MyService.java
package myproject.service;
import myproject.dto.OrdersEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;
@Service
public class MyService {
private EventService eventService;
private ObjectMapper mapper;
public MyService (EventService eventService, ObjectMapper mapper) {
this.eventService = eventService;
this.mapper = mapper;
}
// Class body ...
}
- Create your event payload:
MyService.java
public JsonNode createPayload() throws JsonProcessingException {
OrdersEvent dto = new OrdersEvent();
// Assign attribute values ...
String str = mapper.writeValueAsString(dto);
JsonNode payload = mapper.readTree(str);
return payload;
}
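As an alternative to serializing the DTO to a string and parsing it again, Jackson's ObjectMapper.valueToTree converts an object to a JsonNode in one step, and convertValue maps it back. The following is a minimal sketch, assuming Jackson Databind is on the classpath (as it is for the generated EventService). The class name PayloadSketch and the helper methods toPayload and fromPayload are hypothetical, and the nested OrdersEvent is a simplified stand-in for your own DTO.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PayloadSketch {

    // Simplified stand-in for your DTO. Jackson uses the getters to write it
    // and the no-argument constructor plus setters to read it.
    public static class OrdersEvent {
        private String orderId;
        private String orderStatus;

        public OrdersEvent() { }

        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
        public String getOrderStatus() { return orderStatus; }
        public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; }
    }

    // Hypothetical helper: DTO to JsonNode without an intermediate JSON string.
    public static JsonNode toPayload(ObjectMapper mapper, OrdersEvent dto) {
        return mapper.valueToTree(dto);
    }

    // Hypothetical helper: JsonNode back to the DTO (the consuming side).
    public static OrdersEvent fromPayload(ObjectMapper mapper, JsonNode payload) {
        return mapper.convertValue(payload, OrdersEvent.class);
    }

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        OrdersEvent dto = new OrdersEvent();
        dto.setOrderId("A-1");
        dto.setOrderStatus("SHIPPED");

        JsonNode payload = toPayload(mapper, dto);
        System.out.println(payload.get("orderStatus").asText()); // prints SHIPPED

        OrdersEvent copy = fromPayload(mapper, payload);
        System.out.println(copy.getOrderId()); // prints A-1
    }
}
```

Unlike writeValueAsString, valueToTree does not declare a checked exception; it reports conversion problems as an IllegalArgumentException.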
Send the event
Finally, send your event by calling the EventService. Provide your desired topic name and your payload:
public void sendMyEvent() {
try{
JsonNode payload = this.createPayload();
eventService.sendEvent("myEventTopic", payload);
} catch (Exception e) {
// Exception handling
}
}
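What belongs in the exception handling block depends on your service. The following sketch uses a plain CompletableFuture from the JDK as a stand-in for the Kafka send result and shows one way to distinguish the three checked exceptions that a blocking get call with a timeout can raise. SendErrorHandlingSketch, SendOutcome, and awaitSend are hypothetical names used for illustration only; they are not part of the generated code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendErrorHandlingSketch {

    // Hypothetical result type, only used in this sketch.
    public enum SendOutcome { SENT, TIMED_OUT, FAILED, INTERRUPTED }

    // Waits for the future and maps each failure mode to a distinct outcome.
    public static SendOutcome awaitSend(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return SendOutcome.SENT;
        } catch (TimeoutException e) {
            // No result within the timeout.
            return SendOutcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; e.getCause() holds the reason.
            return SendOutcome.FAILED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still react to it.
            Thread.currentThread().interrupt();
            return SendOutcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("payload");
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(awaitSend(ok, 100, TimeUnit.MILLISECONDS));      // prints SENT
        System.out.println(awaitSend(pending, 100, TimeUnit.MILLISECONDS)); // prints TIMED_OUT
        System.out.println(awaitSend(broken, 100, TimeUnit.MILLISECONDS));  // prints FAILED
    }
}
```

In a real service you would typically log the cause and decide per case whether to retry, rather than return an enum.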
The EventService uses a KafkaTopicsHelper internally, which changes the topic name to something like my-deployment-target.project-acronym.my-topic. To change this behaviour, you need to adjust the EventService.
Just comment out the line that defines the String fullyQualifiedTopicName and replace it with your own value:
public JsonNode sendEvent(String topic, JsonNode payload) throws InterruptedException, ExecutionException, TimeoutException {
log.info("send event to topic {}", topic);
//String fullyQualifiedTopicName = topicsHelper.getFullyQualifiedTopicName(topic);
String fullyQualifiedTopicName = topic;
ListenableFuture<SendResult<String, JsonNode>> future = kafkaTemplate.send(fullyQualifiedTopicName, payload);
SendResult<String, JsonNode> sendResult = future.get(10, TimeUnit.SECONDS);
return sendResult.getProducerRecord().value();
}
If you do not want to hardcode the topic name, you can read it from a custom environment variable.
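One way to do this, sketched with JDK classes only, is to look up the variable and fall back to a default when it is missing or blank. The variable name ORDERS_EVENT_TOPIC, the class name TopicNameSketch, and the method resolveTopic are assumptions for illustration; use whatever your deployment defines.

```java
import java.util.Map;

public class TopicNameSketch {

    // Hypothetical variable name; it is not defined by the generated project.
    static final String TOPIC_ENV_VAR = "ORDERS_EVENT_TOPIC";

    // Takes the environment as a Map so the lookup can be tested without real variables.
    public static String resolveTopic(Map<String, String> env, String defaultTopic) {
        String value = env.get(TOPIC_ENV_VAR);
        if (value == null || value.trim().isEmpty()) {
            return defaultTopic;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // System.getenv() returns the process environment as a Map.
        String topic = resolveTopic(System.getenv(), "myEventTopic");
        System.out.println("Using topic " + topic);
    }
}
```

The resolved name can then be passed as the topic argument when you call eventService.sendEvent.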
A topic is created automatically when its name is used for the first time. There is no need to create it manually.
Consumption of an event
Please note that the links to the workbench tools in this tutorial only apply to the IBM Education Environment we provide. If you are using a different environment, e.g. your own installation, you will need to navigate directly to the required tools.
Preconditions
- You created a Java Basic service project with the Solution Designer.
- You have another service that publishes the event.
Steps
Remodel the event payload
Create a simple class that represents the attributes of the event's payload as a Data Transfer Object. Example:
public class OrdersEvent {
private String orderId;
private String orderStatus;
// ...
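public OrdersEvent() {
// No-argument constructor, needed for "new OrdersEvent()" and for Jackson when it maps a JsonNode to this class
}
public OrdersEvent(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}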
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getOrderId() {
return orderId;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
public String getOrderStatus() {
return orderStatus;
}
}
Provide the topic name and start the listener
Edit the sampleListener method in the file src/java/myproject/config/KafkaConfiguration.java. Change the String "myEventTopic" to your desired topic name and start the listener by uncommenting the listener.start() call:
// ...
@Configuration
@EnableKafka
public class KafkaConfiguration {
// ...
@Bean
@Autowired
public EventListenerImpl sampleListener(KafkaTopicsHelper topicsHelper, ConsumerFactory<String, JsonNode> consumerFactory) {
String fullyQualifiedTopicName = topicsHelper.getFullyQualifiedTopicName("myEventTopic");
//...
// listener.start();
return listener;
}
}
The topicsHelper will change the topic name to something like my-deployment-target.project-acronym.my-topic. To change this behaviour, just assign the fullyQualifiedTopicName directly, without using the topicsHelper.
If you do not want to hardcode the topic name, you can read it from a custom environment variable here.
A topic is created automatically when its name is used for the first time. There is no need to create it manually.
Edit the onMessage() method
- Have a look at the EventListenerImpl nested inside the file src/java/myproject/service/EventService.java. It has an onMessage() method. Your Kafka event arrives here.
EventService.java
// ...
public class EventService {
// ...
public static class EventListenerImpl implements AcknowledgingMessageListener<String, JsonNode>, ErrorHandler, Lifecycle {
// ...
@Override
public void onMessage(ConsumerRecord<String, JsonNode> data, Acknowledgment acknowledgment) {
log.info("received payload='{}'", data);
lastEvent = data.value();
acknowledgment.acknowledge();
}
// ...
}
}
- The property lastEvent contains the attributes for your Data Transfer Object as a JsonNode from FasterXML Jackson. This library also contains an ObjectMapper to convert the JsonNode to a native Java object (your DTO). Edit onMessage() to map the data to your DTO accordingly:
@Override
public void onMessage(ConsumerRecord<String, JsonNode> data, Acknowledgment acknowledgment) {
log.info("received payload='{}'", data);
lastEvent = data.value();
acknowledgment.acknowledge();
ObjectMapper mapper = new ObjectMapper();
OrdersEvent eventPayload = mapper.convertValue(lastEvent, OrdersEvent.class);
}
- You should create a separate service class to make use of the data you just received, so that you only need to add a method call to onMessage(). First, inject an instance of your service into the EventListenerImpl. Make sure that your custom service has the @Service annotation so it can be auto-injected by Spring Boot.
EventService.java
// ...
public static class EventListenerImpl {
// ...
@Autowired
private MyService service;
// ...
@Override
public void onMessage(ConsumerRecord<String, JsonNode> data, Acknowledgment acknowledgment) {
// ...
OrdersEvent payload = mapper.convertValue(lastEvent, OrdersEvent.class);
service.useEvent(payload);
}
// ...
}
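The useEvent method is yours to define. As a minimal sketch, assuming you only want to remember the latest status per order, the service could look like the following. The class is shown without Spring annotations so that it compiles on its own; in your project it would carry @Service. The nested OrdersEvent is a reduced stand-in for the DTO above, and the names MyServiceSketch, latestStatus, and statusOf are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class MyServiceSketch {

    // Reduced stand-in for the OrdersEvent DTO.
    public static class OrdersEvent {
        private final String orderId;
        private final String orderStatus;

        public OrdersEvent(String orderId, String orderStatus) {
            this.orderId = orderId;
            this.orderStatus = orderStatus;
        }

        public String getOrderId() { return orderId; }
        public String getOrderStatus() { return orderStatus; }
    }

    // The listener may deliver events from more than one thread,
    // so a concurrent map is used here.
    private final Map<String, String> latestStatus = new ConcurrentHashMap<>();

    // Called from onMessage(); ignores incomplete events instead of failing.
    public void useEvent(OrdersEvent event) {
        if (event == null || event.getOrderId() == null || event.getOrderStatus() == null) {
            return;
        }
        latestStatus.put(event.getOrderId(), event.getOrderStatus());
    }

    // Returns the most recently received status for an order, if any.
    public Optional<String> statusOf(String orderId) {
        return Optional.ofNullable(latestStatus.get(orderId));
    }

    public static void main(String[] args) {
        MyServiceSketch service = new MyServiceSketch();
        service.useEvent(new OrdersEvent("A-1", "CREATED"));
        service.useEvent(new OrdersEvent("A-1", "SHIPPED"));
        System.out.println(service.statusOf("A-1").orElse("unknown")); // prints SHIPPED
        System.out.println(service.statusOf("B-2").orElse("unknown")); // prints unknown
    }
}
```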
Well done! Congratulations! You have successfully implemented the code to consume an event from a Kafka topic.