public class DefaultKafkaMessageConverter extends Object implements KafkaMessageConverter<String,byte[]>
Converts an EventMessage to a Kafka message, and a message read from Kafka back to an EventMessage (if possible). During conversion it passes all meta-data entries to Headers, using the 'axon-metadata-' prefix. Other message-specific attributes are added as metadata. The payload is serialized using the configured Serializer and passed as the message body.
This implementation will suffice in most cases.
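The prefixing of meta-data entries described above can be sketched in plain Java. This is a minimal illustration, not the converter's actual code: the class `MetadataHeaderSketch`, its method names, and the choice of UTF-8 string encoding are all assumptions made for the example. Only the 'axon-metadata-' prefix comes from the description.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not the Axon implementation.
class MetadataHeaderSketch {
    static final String PREFIX = "axon-metadata-";

    // Copy every meta-data entry into a header map under a prefixed key.
    // Values are encoded as UTF-8 strings (an assumption for this sketch).
    static Map<String, byte[]> toHeaders(Map<String, ?> metaData) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        metaData.forEach((key, value) ->
                headers.put(PREFIX + key, String.valueOf(value).getBytes(StandardCharsets.UTF_8)));
        return headers;
    }

    // Recover meta-data by keeping only prefixed headers and stripping the prefix.
    static Map<String, String> toMetaData(Map<String, byte[]> headers) {
        Map<String, String> metaData = new LinkedHashMap<>();
        headers.forEach((key, value) -> {
            if (key.startsWith(PREFIX)) {
                metaData.put(key.substring(PREFIX.length()), new String(value, StandardCharsets.UTF_8));
            }
        });
        return metaData;
    }
}
```

Headers without the prefix are ignored when reading back, which keeps unrelated record headers out of the reconstructed meta-data.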
Constructor and Description |
---|
DefaultKafkaMessageConverter(Serializer serializer) Initializes the KafkaMessageConverter with the given serializer. |
DefaultKafkaMessageConverter(Serializer serializer, SequencingPolicy<? super EventMessage<?>> sequencingPolicy, BiFunction<String,Object,org.apache.kafka.common.header.internals.RecordHeader> headerValueMapper) Initializes the KafkaMessageConverter with the given serializer, sequencingPolicy and headerValueMapper. |
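The third constructor argument is a BiFunction from a header key and a value to a header. A rough sketch of what such a function might look like follows. To keep it compilable without the Kafka client on the classpath, it returns a stand-in type, `SketchHeader`, instead of `RecordHeader`; that type, the class `HeaderValueMapperSketch`, and the encoding rules are assumptions for illustration, not the converter's defaults.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

// Stand-in for org.apache.kafka.common.header.internals.RecordHeader,
// used so the sketch compiles without the Kafka client library.
final class SketchHeader {
    final String key;
    final byte[] value;

    SketchHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

class HeaderValueMapperSketch {
    // Hypothetical mapper; the encoding rules below are assumptions.
    static BiFunction<String, Object, SketchHeader> mapper() {
        return (key, value) -> {
            if (value == null) {
                // Preserve the key, carry no bytes.
                return new SketchHeader(key, null);
            }
            if (value instanceof Long) {
                // Encode longs as 8 big-endian bytes.
                byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();
                return new SketchHeader(key, bytes);
            }
            // Fall back to the UTF-8 bytes of toString().
            return new SketchHeader(key, value.toString().getBytes(StandardCharsets.UTF_8));
        };
    }
}
```

With the Kafka client available, the same lambda could return `new RecordHeader(key, bytes)` and be passed as the headerValueMapper argument.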
Modifier and Type | Method and Description |
---|---|
org.apache.kafka.clients.producer.ProducerRecord<String,byte[]> | createKafkaMessage(EventMessage<?> eventMessage, String topic) Creates a ProducerRecord for a given EventMessage. |
Optional<EventMessage<?>> | readKafkaMessage(org.apache.kafka.clients.consumer.ConsumerRecord<String,byte[]> consumerRecord) Reconstructs an EventMessage from the given ConsumerRecord. |
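Because readKafkaMessage returns an Optional, callers have to handle records that do not represent a valid event. The sketch below illustrates that contract in isolation. `SketchEvent`, `OptionalReadSketch`, and the header name `sketch-message-id` are hypothetical names invented for this example, and the validity rule (an identifier header plus a non-null body) is an assumption, not the converter's actual check.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for an event message; not an Axon type.
final class SketchEvent {
    final String id;
    final String payload;

    SketchEvent(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

class OptionalReadSketch {
    // Header name invented for this example.
    static final String ID_HEADER = "sketch-message-id";

    // Returns an event only when the record carries both an identifier
    // header and a body; otherwise the Optional is empty.
    static Optional<SketchEvent> read(Map<String, String> headers, byte[] body) {
        String id = headers.get(ID_HEADER);
        if (id == null || body == null) {
            return Optional.empty();
        }
        return Optional.of(new SketchEvent(id, new String(body, StandardCharsets.UTF_8)));
    }
}
```

A consumer loop would typically call `ifPresent` on the result and skip records for which the Optional is empty, rather than treating them as errors.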
public DefaultKafkaMessageConverter(Serializer serializer)
Initializes the KafkaMessageConverter with the given serializer.
Parameters:
serializer - The serializer to serialize the Event Message's payload with.
public DefaultKafkaMessageConverter(Serializer serializer, SequencingPolicy<? super EventMessage<?>> sequencingPolicy, BiFunction<String,Object,org.apache.kafka.common.header.internals.RecordHeader> headerValueMapper)
Initializes the KafkaMessageConverter with the given serializer, sequencingPolicy and headerValueMapper.
Parameters:
serializer - The serializer to serialize the Event Message's payload and Meta Data with.
sequencingPolicy - The policy to generate the key of the ProducerRecord.
headerValueMapper - The Function for mapping values to Kafka headers.
public org.apache.kafka.clients.producer.ProducerRecord<String,byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic)
Description copied from interface: KafkaMessageConverter
Creates a ProducerRecord for a given EventMessage.
Specified by:
createKafkaMessage in interface KafkaMessageConverter<String,byte[]>
Parameters:
eventMessage - the event message to send to Kafka.
topic - the Kafka topic.
public Optional<EventMessage<?>> readKafkaMessage(org.apache.kafka.clients.consumer.ConsumerRecord<String,byte[]> consumerRecord)
Description copied from interface: KafkaMessageConverter
Reconstructs an EventMessage from the given ConsumerRecord. The returned optional resolves to a message if the given input parameters represented a correct EventMessage.
Specified by:
readKafkaMessage in interface KafkaMessageConverter<String,byte[]>
Parameters:
consumerRecord - Event message represented inside Kafka.
Copyright © 2010–2018. All rights reserved.