在Spring-Kafka的消费者中,跳过JSON反序列化类型转换错误

可能碰到在spring-kafka中无法继续处理的消息,例如JSON类型转换等情况,希望跳过。以下是可能适用的配置示例。

plugins {
  id 'org.springframework.boot' version '2.4.2'
  id 'io.spring.dependency-management' version '1.0.11.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '15'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  implementation 'com.fasterxml.jackson.core:jackson-databind'
}
test {
  useJUnitPlatform()
}

首先,您需要在属性中将consumer的反序列化器设置为org.springframework.kafka.support.serializer.ErrorHandlingDeserializer,并通过spring.kafka.properties.spring.deserializer.value.delegate.class属性指定其删除的位置。这将指定一个JSON转换类。通过这样做,JSON转换错误将仅简单地被跳过。

spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

以下是一个适用于操作验证的随意消费者。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaSampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaSampleApplication.class, args);
    }
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleData {
    String id;
    int value;
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class JsonConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(SampleData content) {
        System.out.println("content" + content);
    }
}

尽管如此,实际上还有许多其他需求,比如想要记录日志、在发生错误时进行特殊处理等等,所以还需要一些更细致的设置。

bannerAds