首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spring-Kafka JsonDeserializer如何关闭trusted.packages检查?

在Spring Kafka中,JsonDeserializer 默认情况下会检查反序列化的类是否在 trusted.packages 列表中,以防止反序列化攻击。如果你想关闭这个检查,可以将 trusted.packages 设置为 *,这意味着信任所有包。

以下是如何配置 JsonDeserializer 以关闭 trusted.packages 检查的示例:

使用Java配置

代码语言:javascript
复制
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, MyCustomObject> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // 关闭trusted.packages检查

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyCustomObject.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyCustomObject> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyCustomObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

使用application.properties配置

如果你更喜欢使用配置文件来设置这些属性,可以在 application.properties 文件中进行配置:

代码语言:javascript
复制
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroupId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

使用application.yml配置

如果你使用的是 application.yml 文件,可以这样配置:

代码语言:javascript
复制
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: myGroupId
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
          json:
            trusted:
              packages: "*"

通过以上配置,你可以关闭 JsonDeserializertrusted.packages 检查,从而信任所有包。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

  • Apache Kafka-通过concurrency实现并发消费

    ---- 概述 默认情况下, Spring-Kafka @KafkaListener 串行消费的。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。...举个例子 : 如果设置 concurrency=2 时,Spring-Kafka 就会为该 @KafkaListener标注的方法消费的消息 创建 2个线程,进行并发消费。...Spring-Kafka 提供的并发消费,需要创建多个 Kafka Consumer 对象,并且每个 Consumer 都单独分配一个线程,然后 Consumer 拉取完消息之后,在各自的线程中执行消费...org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer...* @since 2.1.2 */ String beanRef() default "__listener"; 分布式下的concurrency 第一个单元测试,不要关闭,我们继续启动单元测试

    6.5K20

    一次机房停电引发的思考

    版本信息 spring-boot:2.0.6.RELEASE spring-kafka:2.1.2.RELEASE kafka-clients:1.0.2 为什么阻塞了 60s?...RecordMetadata> send(ProducerRecord record, Callback callback) {} 根据文档的说明[1]它是一个异步的发送方法,按道理不管如何它都不应该阻塞主线程...关闭自动重试 retries=0 默认就是 0 其他 acks=0,acks 有 4 个选项[all, -1, 0, 1] 。...的损耗比较严重,这个还要等到后续压测之后再更新文章吧 参考文章 站在巨人的肩膀上 Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重[6] HAVENT 原创 Spring Boot + Spring-Kafka...异步发送在某些情况会阻塞主线程,使用时候慎重: https://www.cnblogs.com/felixzh/p/11849296.html [7] HAVENT 原创 Spring Boot + Spring-Kafka

    78430

    实现自定义序列化和反序列化控制的5种方式

    通过实现 JsonSerializer 和 JsonDeserializer 接口,你可以完全控制序列化和反序列化过程中的行为,包括如何读取属性、生成 JSON 或者解析 JSON 等。...以下是一个示例,展示如何使用自定义序列化器和反序列化器来控制日期格式的序列化和反序列化: import com.fasterxml.jackson.core.JsonGenerator; import...以下是一个示例,展示如何使用 Mix-in Annotations 来控制日期格式的序列化和反序列化: import com.fasterxml.jackson.annotation.JsonFormat...下面是一个简单的示例,演示如何使用 BeanSerializerModifier 来实现自定义的序列化控制: import com.fasterxml.jackson.databind.ObjectMapper...在 changeProperties 方法中,我们检查属性名称是否为 “email”,如果是的话,就将其序列化器指定为自定义的 UpperCaseStringSerializer,以将 email 字段的值序列化为大写形式

    1.1K10

    Spring Boot Kafka概览、配置及优雅地实现发布订阅

    这将实际关闭生产者并将其从ThreadLocal中移除。调用reset()或destroy()不会清理这些生产者。...从Spring Kafka2.2.7版开始,你可以将RecordInterceptor添加到侦听器容器中;在调用侦听器以允许检查或修改记录之前,将调用它。如果拦截器返回null,则不调用侦听器。...模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。...你还可以配置Spring Kafka JsonDeserializer,如下所示: spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer...allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic spring.kafka.listener.missing-topics-fatal=true # 非响应消费者的检查间隔时间

    15.4K72

    如何完美解决 org.springframework.http.converter.HttpMessageNotReadableException: JSON parse 错误

    作为一名开发者,掌握如何定位并解决这个错误显得尤为重要。接下来,猫头虎博主将带领大家深入探讨这个问题的成因和解决方法。 正文 1....解决方案 3.1 检查JSON格式 确保客户端发送的JSON格式正确,可以使用在线工具如 JSONLint 验证。...public class CustomDateDeserializer extends JsonDeserializer { private SimpleDateFormat dateFormat...Q2: 如何调试 HttpMessageNotReadableException? 可以通过日志查看详细错误信息,检查客户端发送的JSON数据格式是否正确。...Q3: 如何避免 HttpMessageNotReadableException? 通过严格的数据校验和格式检查,以及使用全局异常处理器来捕获并处理该异常。

    1.3K10

    从构建分布式秒杀系统聊聊WebSocket推送通知

    前言 秒杀架构到后期,我们采用了消息队列的形式实现抢购逻辑,那么之前抛出过这样一个问题:消息队列异步处理完每个用户请求后,如何通知给相应用户秒杀成功? 场景映射 ?...小喇叭叫到自己的排号相当于服务端通知用户秒杀成功,这时候可以进行支付逻辑 那些拿不到票号的同学,相当于队列已满直接返回秒杀失败 解决方案 通过上面的场景,我们很容易能够想到一种方案就是服务端通知,那么如何做到服务端异步通知的呢...subOnlineCount() { WebSocketServer.onlineCount--; } } KafkaConsumer 消费配置,通知用户是否秒杀成功: /** * 消费者 spring-kafka...CONNECTING(0) websocket正尝试与服务器建立连接 OPEN(1) websocket与服务器已经建立连接 CLOSING(2) websocket正在关闭与服务器的连接 CLOSED...思考 最后,思考一个问题:100件商品,假如有一万人进行抢购,该如何设置队列长度?

    1.5K20

    Spring认证中国教育管理中心-Spring Data MongoDB教程五

    MongoDB 模块JsonDeserializer通过GeoJsonConfiguration公开GeoJsonModule. org.springframework.data.mongodb.core.geo.GeoJsonPoint...有关如何创建索引结构的更多详细信息,请参阅文本索引。...以下示例显示了如何设置全文搜索: db.foo.createIndex( { title : "text", content : "text" }, { weights : {...指定是否检查文本是否需要归一化以及是否进行归一化。 排序规则可用于创建集合和索引。如果您创建一个指定排序规则的集合,除非您指定不同的排序规则,否则该排序规则将应用于索引创建和查询。...对象类型属性被检查并表示为嵌套文档。 StringCode由转换器转换为的类型属性。 @Transient 生成模式时省略属性。

    2.6K20
    领券