Produce and Consume Events in Java Basic service

🎯 context

To exchange events between services via Kafka, one service publishes an event to a Kafka topic and another service consumes it from that topic.

Description

This How-To shows you how to produce an event to a Kafka topic and how to consume an event from a Kafka topic in a Java Basic service project. The instructions are based on the Java classes generated by IBM DevOps Solution Workbench.

Production of an event

Preconditions

  • You created a Java Basic service project with the Solution Designer.
  • You have already implemented a Data Transfer Object (DTO) that contains the information you want to transfer in a Kafka message.
  • You have a service class that provides the DTO to be sent as an event.

Steps

Convert the DTO to a JsonNode

The generated EventService class expects the payload to be a JsonNode from FasterXML Jackson. To map your Data Transfer Object to a JsonNode, you can use an instance of ObjectMapper, which is also provided by FasterXML Jackson.

  1. In your custom service, import the required classes and inject instances of ObjectMapper and EventService through the constructor:

    MyService.java
    package myproject.service;

    import myproject.dto.OrdersEvent;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.stereotype.Service;

    // EventService is the generated class in the same package, so it needs no import.
    @Service
    public class MyService {

        private final EventService eventService;
        private final ObjectMapper mapper;

        public MyService(EventService eventService, ObjectMapper mapper) {
            this.eventService = eventService;
            this.mapper = mapper; // assign the injected instance, not the class name
        }

        // Class body ...
    }
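  2. Create your event payload:

    MyService.java
    // Requires: import com.fasterxml.jackson.core.JsonProcessingException;
    public JsonNode createPayload() throws JsonProcessingException {

        OrdersEvent dto = new OrdersEvent();
        // Assign attribute values ...

        // Serialize the DTO to a JSON string, then parse it into a JsonNode tree
        String str = mapper.writeValueAsString(dto);
        JsonNode payload = mapper.readTree(str);

        return payload;
    }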

Send the event

Finally, send your event by calling the EventService with your desired topic name and your payload:

MyService.java
public void sendMyEvent() {
    try {
        JsonNode payload = this.createPayload();
        eventService.sendEvent("myEventTopic", payload);
    } catch (Exception e) {
        // Exception handling
    }
}
⚠ warning

The EventService uses a KafkaTopicsHelper internally. This changes the topic name to something like my-deployment-target.project-acronym.my-topic. To change this behaviour, you need to adjust the EventService.

Just comment out the line that defines the String fullyQualifiedTopicName and replace it with your own value:

EventService.java
public JsonNode sendEvent(String topic, JsonNode payload) throws InterruptedException, ExecutionException, TimeoutException {
    log.info("send event to topic {}", topic);

    //String fullyQualifiedTopicName = topicsHelper.getFullyQualifiedTopicName(topic);
    String fullyQualifiedTopicName = topic;
    ListenableFuture<SendResult<String, JsonNode>> future = kafkaTemplate.send(fullyQualifiedTopicName, payload);

    SendResult<String, JsonNode> sendResult = future.get(10, TimeUnit.SECONDS);

    return sendResult.getProducerRecord().value();
}
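
The generated sendEvent() waits up to 10 seconds for the send result and declares three checked exceptions, so the generic catch block in sendMyEvent() can be split by cause. The following is a minimal sketch of that idea, not the generated code: it uses the JDK's CompletableFuture as a stand-in for the future returned by kafkaTemplate.send(), and the class name BoundedWaitSketch and the returned status strings are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the generated project.
final class BoundedWaitSketch {

    private BoundedWaitSketch() {}

    // Waits for the future for at most timeoutMillis and maps each
    // failure mode to a short status string.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "sent: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No result arrived within the time limit
            return "timeout";
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; the cause holds the real error
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still react to it
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

In a real service you would typically log the error or rethrow a domain-specific exception in each branch instead of returning a string.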
💡 tip

If you do not want to hardcode the topic name, you can read it from a custom environment variable.
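
Reading the topic name from the environment could look like the following sketch. The variable name KAFKA_TOPIC_NAME, the class TopicNameResolver, and the fallback behaviour are assumptions for illustration and are not defined by the generated project.

```java
import java.util.Map;

// Hypothetical helper, not part of the generated project.
final class TopicNameResolver {

    // Assumed name of the custom environment variable
    static final String ENV_VAR = "KAFKA_TOPIC_NAME";

    private TopicNameResolver() {}

    // Looks up the topic name in the given environment and falls back
    // to defaultTopic when the variable is unset or blank.
    static String resolve(Map<String, String> env, String defaultTopic) {
        String value = env.get(ENV_VAR);
        return (value == null || value.isBlank()) ? defaultTopic : value.trim();
    }

    public static void main(String[] args) {
        // System.getenv() returns the process environment as a map
        String topic = resolve(System.getenv(), "myEventTopic");
        System.out.println("Using topic: " + topic);
    }
}
```

Passing the environment in as a map keeps the lookup easy to test; the resolved value can then be used wherever the topic name is currently hardcoded.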

โ„น๏ธnote

A topic gets created automatically, when its name is in use for the first time. There is no need to create it manually.

Consumtion of an eventโ€‹

โ„น๏ธnote

Please note that the links to the workbench tools in this tutorial only apply to the IBM Education Environment we provide. If you are using a different environment, e.g. your own installation, you will need to navigate directly to the required tools.

Preconditions

  • You created a Java Basic service project with the Solution Designer.
  • Another service publishes the event that you want to consume.

Steps

Remodel the event payload

Create a simple Data Transfer Object class that represents the attributes of the event's payload. Example:

OrdersEvent.java
public class OrdersEvent {

    private String orderId;
    private String orderStatus;
    // ...

    // By default, Jackson needs a no-argument constructor to instantiate the DTO
    public OrdersEvent() {
    }

    // orderId is a String, matching the field and its setter
    public OrdersEvent(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public String getOrderId() {
        return orderId;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }
    public String getOrderStatus() {
        return orderStatus;
    }
}

Provide the topic name and start the listener

Edit the sampleListener method in the file src/java/myproject/config/KafkaConfiguration.java. Change the string "myEventTopic" to your desired topic name, and start the listener by uncommenting the listener.start() call:

KafkaConfiguration.java
// ...
@Configuration
@EnableKafka
public class KafkaConfiguration {
    // ...
    @Bean
    @Autowired
    public EventListenerImpl sampleListener(KafkaTopicsHelper topicsHelper, ConsumerFactory<String, JsonNode> consumerFactory) {

        // Replace "myEventTopic" with your topic name
        String fullyQualifiedTopicName = topicsHelper.getFullyQualifiedTopicName("myEventTopic");
        //...
        // listener.start(); // uncomment this line to start the listener
        return listener;
    }
}
⚠ warning

The topicsHelper changes the topic name to something like my-deployment-target.project-acronym.my-topic. To change this behaviour, assign the fullyQualifiedTopicName directly, without using the topicsHelper.

💡 tip

If you do not want to hardcode the topic name, you can read it from a custom environment variable here.

ℹ️ note

A topic is created automatically when its name is used for the first time. There is no need to create it manually.

Edit the onMessage() method

  1. Have a look at the EventListenerImpl class nested inside the file src/java/myproject/service/EventService.java. It has an onMessage() method, which is where your Kafka event arrives.

    EventService.java
    // ...
    public class EventService {
        // ...
        public static class EventListenerImpl implements AcknowledgingMessageListener<String, JsonNode>, ErrorHandler, Lifecycle {
            // ...
            @Override
            public void onMessage(ConsumerRecord<String, JsonNode> data, Acknowledgment acknowledgment) {
                log.info("received payload='{}'", data);
                lastEvent = data.value();
                acknowledgment.acknowledge();
            }
            // ...
        }
    }
  2. The property lastEvent contains the attributes for your Data Transfer Object as JsonNode from FasterXML Jackson. This library also contains an ObjectMapper to convert the JsonNode to a native Java object (your DTO). Edit onMessage() to map the data to your DTO accordingly:

    @Override
    public void onMessage(ConsumerRecord<String, JsonNode> data, Acknowledgment acknowledgment) {
        log.info("received payload='{}'", data);
        lastEvent = data.value();
        acknowledgment.acknowledge();

        // Map the JsonNode to the DTO; the target class must match the declared type
        ObjectMapper mapper = new ObjectMapper();
        OrdersEvent eventPayload = mapper.convertValue(lastEvent, OrdersEvent.class);
    }
  3. It is advisable to put the logic that uses the received data into a separate service class, so that onMessage() only needs one additional method call. To do this, first inject an instance of your service into the EventListenerImpl. Make sure that your custom service has the @Service annotation so that Spring Boot can inject it automatically.

    EventService.java
    // ...
    public static class EventListenerImpl {
        // ...
        @Autowired
        private MyService service;
        // ...
        @Override
        public void onMessage(ConsumerRecord<String, JsonNode> data, Acknowledgment acknowledgment) {
            // ...
            OrdersEvent payload = mapper.convertValue(lastEvent, OrdersEvent.class);
            // Hand the mapped DTO over to your own service
            service.useEvent(payload);
        }
        // ...
    }
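
One benefit of keeping onMessage() thin is that the business logic can be exercised without Kafka. The following plain-Java sketch illustrates the delegation pattern only. All class names, the reduced DTO, and the "SHIPPED" status value are assumptions for illustration and do not come from the generated project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for the OrdersEvent DTO, reduced to two fields.
final class OrderEventSketch {
    final String orderId;
    final String orderStatus;

    OrderEventSketch(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

// Hypothetical listener: it only forwards the mapped event to a handler.
final class DelegatingListenerSketch {
    private final Consumer<OrderEventSketch> handler;

    DelegatingListenerSketch(Consumer<OrderEventSketch> handler) {
        this.handler = handler;
    }

    // Plays the role of onMessage() after the payload was mapped to a DTO.
    void onEvent(OrderEventSketch event) {
        handler.accept(event);
    }
}

// Hypothetical service holding the business logic.
final class OrderTrackingServiceSketch {
    private final List<String> shippedOrderIds = new ArrayList<>();

    // Remembers the IDs of all orders whose status is "SHIPPED".
    void useEvent(OrderEventSketch event) {
        if ("SHIPPED".equals(event.orderStatus)) {
            shippedOrderIds.add(event.orderId);
        }
    }

    // Returns an unmodifiable snapshot of the recorded IDs.
    List<String> shippedOrderIds() {
        return List.copyOf(shippedOrderIds);
    }
}
```

Because the listener depends only on a handler function, a test can call onEvent() directly with hand-built events and then inspect the state of the service.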
🌟 result

Well done! Congratulations! You have successfully implemented the code to consume an event from a Kafka topic.