
Use the Half-Sync/Half-Async Pattern in the Workbench

🎯 Context

You have a service that only offers asynchronous communication, but you need to call it synchronously.

Description

This How-To covers how to use Kafka in a request/response scenario, i.e. how to wait for the reply to an issued Kafka event in a Workbench Java Domain Service solution. If a service only offers asynchronous communication but needs to be called synchronously, you can apply the Half-Sync/Half-Async pattern: a request to a 3rd party system is enqueued on a Kafka topic, and the caller waits until the answer arrives on a dedicated response topic or the request times out.

The following picture gives a short overview:

Overview

Implementation

It is recommended to rely on the Spring Boot integrated capabilities of the request-reply template (ReplyingKafkaTemplate) for messaging with Kafka. For a detailed description, see the official documentation.
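To illustrate the basic flow independent of the Workbench specifics, here is a condensed sketch of the standard Spring Kafka request-reply setup; the topic names ("requests", "replies") and the group ID are placeholders, not part of the example project:

@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
    // combines a producer for the request topic with a listener container for the replies
    return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}

@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    ConcurrentMessageListenerContainer<String, String> container =
            containerFactory.createContainer("replies"); // placeholder reply topic
    container.getContainerProperties().setGroupId("replies-group"); // placeholder group ID
    return container;
}

On the caller side, sendAndReceive sends the request and returns a future that completes when the reply arrives or times out:

// block until the reply arrives or the timeout elapses
RequestReplyFuture<String, String, String> future =
        template.sendAndReceive(new ProducerRecord<>("requests", "hello"));
ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);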

When applying this pattern in a Kubernetes deployment with several pods, it is crucial to ensure that every pod receives the reply message. This is done by assigning a unique group ID (consumer ID) to every listener, as shown in the sketch below and in the replyContainer bean of the example.
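A minimal sketch of that idea, assuming the pod name is exposed through the HOSTNAME environment variable; the replyContainer bean in the example below implements the same logic via @Value injection:

// every pod joins its own consumer group, so every pod receives a copy of each reply
String groupId = System.getenv().getOrDefault("HOSTNAME", UUID.randomUUID().toString());
container.getContainerProperties().setGroupId(groupId);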

Example

If you want to have a look at the example in the Solution Designer, please click here.

The example offers one sample API to trigger the pattern. As a reply to this call, the input string is simply reversed; for example, the input "hello" yields the reply "olleh". A boolean parameter indicates whether the request should run into a timeout. The reply functionality is implemented in the same service.
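Assuming the generated REST endpoint exposes input and timeout as query parameters under a project-specific path (the URL below is purely hypothetical), triggering the pattern could look like this:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerExample {
    public static void main(String[] args) throws Exception {
        // hypothetical URL - host, port and path depend on your deployment
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/v1/call?input=hello&timeout=false"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. {"output":"olleh","success":"SUCCESS"}
    }
}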

The example uses the mechanisms available in Workbench Java Domain Service projects, such as topic binding, and also shows how to override certain settings.

Here are the interesting classes:

Configuring the Kafka Template for the requester

SyncAsyncPatternConfiguration.java
@Configuration
public class SyncAsyncPatternConfiguration {

    private static Logger log = LoggerFactory.getLogger(SyncAsyncPatternConfiguration.class);

    @Bean
    public ReplyingKafkaTemplate<String, String, String> template(
            TopicBindingConfiguration topicBindingConfig,
            ConcurrentMessageListenerContainer<String, String> replyContainer) {

        String topicAlias = "request";
        String topicName = topicBindingConfig.getTopicBinding(topicAlias).getTopicName();
        log.info("[topicName={}]", topicName);

        Map<String, Object> config = topicBindingConfig.getProducerConfig(topicAlias, topicBindingConfig);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(config);

        ReplyingKafkaTemplate<String, String, String> template = new ReplyingKafkaTemplate<>(pf, replyContainer);
        template.setMessageConverter(new MessagingMessageConverter());
        template.setDefaultTopic(topicName);
        // important setting to avoid error logs if a reply is received by the wrong pod
        template.setSharedReplyTopic(true);
        return template;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> replyContainer(TopicBindingConfiguration topicBindingConfig,
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory, @Value("${HOSTNAME:}") String hostname) {

        String topicAlias = "reply";

        String topicName = topicBindingConfig.getTopicBinding(topicAlias).getTopicName();

        Properties kafkaConsumerProperties = MapUtils.toProperties(topicBindingConfig.getConsumerConfig(topicAlias, topicBindingConfig));
        kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer(topicName);

        // assign a unique group ID to every listener container - we could also use HOSTNAME, which is the unique pod name
        if (StringUtils.isEmpty(hostname)) {
            container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique id
        } else {
            container.getContainerProperties().setGroupId(hostname); // unique pod name
        }
        log.info("using groupId {} for the listener container on topic-alias {}", container.getContainerProperties().getGroupId(), topicAlias);

        kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
        container.getContainerProperties().setKafkaConsumerProperties(kafkaConsumerProperties);
        return container;
    }
}

Configuration to integrate and re-use the topic-binding mechanism

TopicBindingConfiguration.java
@Configuration
@ConfigurationProperties("k5.sdk.springboot.binding.topic")
public class TopicBindingConfiguration extends KafkaBaseConfig {

    private static Logger log = LoggerFactory.getLogger(TopicBindingConfiguration.class);

    @Value("${k5.sdk.springboot.deployment.identifier}")
    private String deploymentIdentifier;

    @Autowired
    private KubernetesServiceBindingService kubernetesService;

    /**
     * key-value map where key is topic alias and value is topic binding
     * configurations
     */
    private Map<String, TopicBinding> topicBindings;

    /**
     * key-value map where key is Kafka binding secret name and value is Kafka
     * broker configurations
     */
    private Map<String, KafkaBinding> kafkaBindings;

    /**
     * Topic alias names
     */
    private String[] topicAliases = { "request", "reply" };

    /**
     * This method builds the necessary topic binding configuration using k8s secrets
     */
    @PostConstruct
    public void buildTopicBindingsFromSecrets() {

        // If no local configurations
        if (topicBindings == null && kafkaBindings == null) {

            log.info("Building Topic Binding Configurations Using k8s secret(s)");

            topicBindings = new HashMap<String, TopicBinding>();
            kafkaBindings = new HashMap<String, KafkaBinding>();

            for (String topicAlias : topicAliases) {

                // secret name pattern: <deploymentIdentifier>-topic-<topicAlias>
                String topicBindingSecretName = String.format("%s-topic-%s", deploymentIdentifier, topicAlias);

                // Read topic binding secret
                TopicBinding topicBindingConfig = kubernetesService.getBinding(topicBindingSecretName, TopicBinding.class);
                String kafkaBinding = topicBindingConfig.getKafkaBinding();

                if (!kafkaBindings.containsKey(kafkaBinding)) {
                    // Read Kafka broker configurations
                    KafkaBinding kafkaBrokerConfig = kubernetesService.getBinding(kafkaBinding, KafkaBinding.class);
                    kafkaBindings.put(kafkaBinding, kafkaBrokerConfig);
                }

                topicBindings.put(topicAlias, topicBindingConfig);
            }
        } else {
            log.info("Application is using Topic Binding(s) & Kafka Binding(s) from application configuration");
        }
    }

    /**
     * Get a single topic binding configuration.
     *
     * @param topicAlias the topic alias
     * @return the topic binding configuration for the given alias.
     */
    public TopicBinding getTopicBinding(String topicAlias) {
        return topicBindings.get(topicAlias);
    }

    /**
     * Get a single Kafka binding configuration.
     *
     * @param kafkaBinding secret name of the Kafka binding.
     * @return the Kafka broker configuration.
     */
    public KafkaBinding getKafkaBinding(String kafkaBinding) {
        return kafkaBindings.get(kafkaBinding);
    }

    /**
     * Setter method for topic bindings.
     *
     * @param topicBindings
     */
    public void setTopicBindings(Map<String, TopicBinding> topicBindings) {
        this.topicBindings = topicBindings;
    }

    /**
     * Setter method for kafka bindings.
     *
     * @param kafkaBindings
     */
    public void setKafkaBindings(Map<String, KafkaBinding> kafkaBindings) {
        this.kafkaBindings = kafkaBindings;
    }

    /**
     * Getter method for all topic aliases.
     *
     * @return the topic aliases.
     */
    public String[] getTopicAliases() {
        return topicAliases;
    }

    public Map<String, Object> getProducerConfig(String topicAlias, TopicBindingConfiguration topicBindingConfig) {
        TopicBinding topicBinding = topicBindingConfig.getTopicBinding(topicAlias);
        KafkaBinding kafkaBinding = topicBindingConfig.getKafkaBinding(topicBinding.getKafkaBinding());

        return getProducerConfigs(kafkaBinding, deploymentIdentifier);
    }

    public Map<String, Object> getConsumerConfig(String topicAlias, TopicBindingConfiguration topicBindingConfig) {
        TopicBinding topicBinding = topicBindingConfig.getTopicBinding(topicAlias);
        KafkaBinding kafkaBinding = topicBindingConfig.getKafkaBinding(topicBinding.getKafkaBinding());

        return getConsumerConfig(kafkaBinding);
    }
}

Issuing the request and waiting for the response

CallApiV1Provider.java
@Service
@ComponentScan(basePackages = "fsw.patterns.syncasync.sdk.api.v1.api")
public class CallApiV1Provider implements CallApiV1Delegate {

    private static Logger log = LoggerFactory.getLogger(CallApiV1Provider.class);

    private ReplyingKafkaTemplate<String, String, String> template;

    public CallApiV1Provider(ReplyingKafkaTemplate<String, String, String> template) {
        this.template = template;
    }

    @Override
    public ResponseEntity<Response> doSomething(String input, Boolean timeout) {
        Response resp = new Response();

        try {
            boolean timeoutRequested = Optional.ofNullable(timeout).orElse(Boolean.FALSE);

            String payload = timeoutRequested + "-" + input;
            log.info("Payload: {}", payload);
            Message<String> msg = MessageBuilder.withPayload(payload).build();
            log.debug("Message: {}", msg);

            // wait at most 3 seconds for the reply; on a timeout the future completes exceptionally
            RequestReplyTypedMessageFuture<String, String, String> result = template
                    .sendAndReceive(msg, Duration.of(3L, ChronoUnit.SECONDS), new ParameterizedTypeReference<String>() {});

            SendResult<String, String> sr = result.getSendFuture().get(10, TimeUnit.SECONDS);
            log.info("sent with correlation: {}", sr.getProducerRecord().headers().lastHeader(KafkaHeaders.CORRELATION_ID).value());
            String answer = result.get(10, TimeUnit.SECONDS).getPayload();

            log.info("{} responded with {}", input, answer);

            resp.setOutput(answer);
            resp.setSuccess(SuccessEnum.SUCCESS);
        } catch (Exception e) {
            log.error("could not fulfil the request", e);
            resp.setOutput(e.getMessage());
            resp.setSuccess(SuccessEnum.ERROR);
        }
        return ResponseEntity.ok(resp);
    }
}

The server part which replies to the message

ReplyerConfiguration.java
@Configuration
public class ReplyerConfiguration {

    private static Logger log = LoggerFactory.getLogger(ReplyerConfiguration.class);

    public ReplyerConfiguration() {
        log.info("initializing the configuration class {}", ReplyerConfiguration.class.getName());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaReplyerTemplate(TopicBindingConfiguration topicBindingConfig) {
        String topicAlias = "reply";

        Map<String, Object> config = topicBindingConfig.getProducerConfig(topicAlias, topicBindingConfig);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(config);
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
        template.setMessageConverter(new MessagingMessageConverter());

        return template;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Object> replyerContainer(Replyer replyer, TopicBindingConfiguration topicBindingConfig,
            ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory) {

        String topicAlias = "request";

        Properties kafkaConsumerProperties = MapUtils.toProperties(topicBindingConfig.getConsumerConfig(topicAlias, topicBindingConfig));
        kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        log.info("using alias {} with properties {}", topicAlias, kafkaConsumerProperties);
        String topicName = topicBindingConfig.getTopicBinding(topicAlias).getTopicName();
        log.info("[topicName={}]", topicName);

        ConcurrentMessageListenerContainer<String, Object> container = containerFactory.createContainer(topicName);

        String groupId = topicName + "-syncasync-manual";
        log.info("[groupId={}]", groupId);
        container.getContainerProperties().setGroupId(groupId);
        kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old requests
        container.getContainerProperties().setKafkaConsumerProperties(kafkaConsumerProperties);

        container.setConcurrency(3);

        container.setupMessageListener(replyer);

        return container;
    }

    @Bean
    public Replyer replyer(@Qualifier(value = "kafkaReplyerTemplate") KafkaTemplate<String, Object> template) {
        return new Replyer(template);
    }

    public static class Replyer implements MessageListener<String, Object> {

        private static Logger log = LoggerFactory.getLogger(Replyer.class);

        private KafkaTemplate<String, Object> template;

        public Replyer(KafkaTemplate<String, Object> template) {
            this.template = template;
        }

        @Override
        public void onMessage(ConsumerRecord<String, Object> data) {
            log.debug("Server received: {}", data);

            String payload;
            boolean timeout = false;

            // the payload convention is "<timeout>-<input>", e.g. "false-hello"
            String value = (String) data.value();

            if (value.startsWith("true-")) {
                timeout = true;
                payload = value.substring("true-".length());
            } else {
                payload = value.substring("false-".length());
            }

            // the requester's correlation ID and reply topic are propagated via Kafka headers
            org.apache.kafka.common.header.Header corr = data.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
            String topic = new String(data.headers().lastHeader(KafkaHeaders.REPLY_TOPIC).value(), StandardCharsets.UTF_8);

            String answer = StringUtils.reverse(payload);
            log.info("Reply: {}", answer);

            ProducerRecord<String, Object> record = new ProducerRecord<>(topic, answer);
            record.headers().add(corr);
            if (timeout) {
                // busy-wait so the reply arrives only after the requester's timeout has elapsed
                long cnt = 0L;
                log.info("wait for some moments");
                for (int i = 0; i < Integer.MAX_VALUE; i++) {
                    cnt = cnt + i;
                }
            }
            CompletableFuture<SendResult<String, Object>> send = template.send(record);
            try {
                log.info(send.get(5, TimeUnit.SECONDS).getRecordMetadata().toString());
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("reply not possible", e);
            }
        }
    }
}
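As a design note: the example implements the replier as a manual MessageListener because the topic names are only known at runtime via the topic bindings. With statically known topic names, the server part could be reduced to the standard Spring Kafka annotation style, sketched here with a placeholder topic name (and omitting the timeout-prefix convention of the example):

@KafkaListener(topics = "requests") // placeholder; the example resolves the name via topic bindings
@SendTo // replies to the topic from the REPLY_TOPIC header and propagates the correlation ID
public String reply(String value) {
    return new StringBuilder(value).reverse().toString();
}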

Resources

🌟 Result

Congratulations! You have successfully used the Half-Sync/Half-Async pattern in the Workbench.