青岛华金科技

docker搭建zookeeper集群和kafka集群并使用Java测试详解

2026-03-27 07:42:01 浏览次数:0
详细信息

一、Docker搭建Zookeeper集群

1.1 创建docker-compose.yml文件

# docker-compose.yml — three-node ZooKeeper ensemble.
# Every node listens on client port 2181 inside its container; the nodes are
# published on host ports 2181, 2182 and 2183 respectively.
version: '3.8'

services:
  zookeeper1:
    image: zookeeper:3.7
    container_name: zookeeper1
    restart: always
    ports:
      - "2181:2181"
    environment:
      # Unique id of this node; must match its server.N entry in ZOO_SERVERS.
      ZOO_MY_ID: "1"
      # Ensemble members: host:peer-port:election-port;client-port
      ZOO_SERVERS: server.1=zookeeper1:2888:3888;2181 server.2=zookeeper2:2888:3888;2181 server.3=zookeeper3:2888:3888;2181
      # ZooKeeper 3.5+ only answers whitelisted four-letter commands and the
      # image enables just "srvr" by default, so "stat" must be allowed here.
      ZOO_4LW_COMMANDS_WHITELIST: "srvr,stat,ruok"
    volumes:
      - ./zookeeper1/data:/data
      - ./zookeeper1/datalog:/datalog

  zookeeper2:
    image: zookeeper:3.7
    container_name: zookeeper2
    restart: always
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper1:2888:3888;2181 server.2=zookeeper2:2888:3888;2181 server.3=zookeeper3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "srvr,stat,ruok"
    volumes:
      - ./zookeeper2/data:/data
      - ./zookeeper2/datalog:/datalog

  zookeeper3:
    image: zookeeper:3.7
    container_name: zookeeper3
    restart: always
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper1:2888:3888;2181 server.2=zookeeper2:2888:3888;2181 server.3=zookeeper3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "srvr,stat,ruok"
    volumes:
      - ./zookeeper3/data:/data
      - ./zookeeper3/datalog:/datalog

1.2 启动Zookeeper集群

# Create the data and transaction-log directories bind-mounted by each node
mkdir -p {zookeeper1,zookeeper2,zookeeper3}/{data,datalog}

# Start the ensemble in the background
docker-compose up -d

# Show container status
docker-compose ps

# Follow the logs
docker-compose logs -f

# Check every node; a healthy ensemble reports one "Mode: leader" and two
# "Mode: follower". "srvr" is used because it is the only four-letter command
# ZooKeeper 3.5+ enables by default ("stat" is rejected unless it is added to
# 4lw.commands.whitelist).
echo srvr | nc localhost 2181
echo srvr | nc localhost 2182
echo srvr | nc localhost 2183

二、Docker搭建Kafka集群

2.1 创建docker-compose-kafka.yml文件

# docker-compose-kafka.yml — three Kafka brokers plus Kafka UI.
#
# The brokers connect to the ZooKeeper ensemble (zookeeper1-3), which is
# defined in docker-compose.yml, not in this file. No depends_on is declared
# on those services: Compose rejects a depends_on that names a service missing
# from the loaded file(s). Make sure ZooKeeper is up in the same Compose
# project (same directory) before starting the brokers; "restart: always"
# lets a broker retry if ZooKeeper is not reachable yet.
#
# Each broker exposes two listeners:
#   INTERNAL (1909x) — advertised as kafkaN:1909x, used by other containers
#   EXTERNAL (909x)  — advertised as localhost:909x, used by clients on the host
version: '3.8'

services:
  kafka1:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka1
    restart: always
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://localhost:9092
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "3"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "2"
      # 168 hours = 7 days of retention
      KAFKA_LOG_RETENTION_HOURS: "168"
      # 1073741824 bytes = 1 GiB per log segment
      KAFKA_LOG_SEGMENT_BYTES: "1073741824"
      KAFKA_NUM_PARTITIONS: "3"
    volumes:
      - ./kafka1/data:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka2
    restart: always
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://localhost:9093
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19093,EXTERNAL://0.0.0.0:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "3"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "2"
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_SEGMENT_BYTES: "1073741824"
      KAFKA_NUM_PARTITIONS: "3"
    volumes:
      - ./kafka2/data:/var/lib/kafka/data

  kafka3:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka3
    restart: always
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: "3"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://localhost:9094
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19094,EXTERNAL://0.0.0.0:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "3"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "2"
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_SEGMENT_BYTES: "1073741824"
      KAFKA_NUM_PARTITIONS: "3"
    volumes:
      - ./kafka3/data:/var/lib/kafka/data

  kafka-ui:
    # NOTE(review): ":latest" is not reproducible; consider pinning a release tag.
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    restart: always
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      # Kafka UI runs inside the Compose network, so it uses the INTERNAL listeners.
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka1:19092,kafka2:19093,kafka3:19094"
      KAFKA_CLUSTERS_0_ZOOKEEPER: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
    depends_on:
      - kafka1
      - kafka2
      - kafka3

2.2 启动Kafka集群

# Create the broker data directories that the compose file bind-mounts
# (./kafkaN/data -> /var/lib/kafka/data)
mkdir -p {kafka1,kafka2,kafka3}/data

# NOTE(review): the cp-kafka image appears to run as a non-root user (UID 1000).
# On Linux hosts, if a broker exits because /var/lib/kafka/data is not
# writable, hand the directories to that user:
#   sudo chown -R 1000:1000 kafka1 kafka2 kafka3

# Start the Kafka cluster. Both compose files are passed so that the ZooKeeper
# services the brokers connect to belong to the same Compose configuration.
docker-compose -f docker-compose.yml -f docker-compose-kafka.yml up -d

# Show container status
docker-compose -f docker-compose.yml -f docker-compose-kafka.yml ps

# Test Kafka
# Open a shell inside the kafka1 container (the commands below run in it)
docker exec -it kafka1 /bin/bash

# Create a topic
kafka-topics --create --topic test-topic --partitions 3 --replication-factor 3 --bootstrap-server kafka1:19092

# Describe the topic
kafka-topics --describe --topic test-topic --bootstrap-server kafka1:19092

# Produce messages
kafka-console-producer --topic test-topic --bootstrap-server kafka1:19092

# Consume messages (in a new terminal)
kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server kafka1:19092

三、Java测试代码

3.1 Maven依赖

<dependencies>
    <!-- Kafka client library (producer / consumer APIs used by the examples) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>

    <!-- SLF4J simple logging backend -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.36</version>
    </dependency>

    <!-- JSON processing (the examples use Jackson's ObjectMapper) -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
</dependencies>

3.2 生产者示例

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Demonstrates three ways of sending JSON-encoded {@link User} records to
 * Kafka: blocking on the returned future, sending with a lambda callback, and
 * sending with a reusable {@link Callback} implementation.
 */
public class KafkaProducerExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    private static final String TOPIC_NAME = "test-topic";
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Sends messages 1-10 synchronously, 11-20 asynchronously with a lambda
     * callback and 21-30 asynchronously with {@link CustomCallback}, then
     * flushes the producer. Failures are printed; nothing is rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Connection and serialization settings
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Reliability / throughput tuning
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for acknowledgement from all replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // number of retries
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // preserve ordering
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compression codec
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // wait up to 20 ms to fill a batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // batch size in bytes

        // 3. Create the producer (closed automatically by try-with-resources)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // 4. Synchronous sends: block on the future returned by send()
            System.out.println("开始发送同步消息...");
            for (int i = 1; i <= 10; i++) {
                ProducerRecord<String, String> record = buildRecord(i);

                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("同步发送成功 - Topic: %s, Partition: %d, Offset: %d, Key: %s%n",
                    metadata.topic(), metadata.partition(), metadata.offset(), record.key());

                Thread.sleep(100);
            }

            // 5. Asynchronous sends with a lambda callback
            System.out.println("\n开始发送异步消息...");
            for (int i = 11; i <= 20; i++) {
                producer.send(buildRecord(i), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("异步发送成功 - Topic: %s, Partition: %d, Offset: %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        System.err.println("异步发送失败: " + exception.getMessage());
                    }
                });

                Thread.sleep(100);
            }

            // 6. Asynchronous sends with a reusable Callback class
            System.out.println("\n开始发送带回调的消息...");
            for (int i = 21; i <= 30; i++) {
                producer.send(buildRecord(i), new CustomCallback(i));
                Thread.sleep(100);
            }

            // Push out anything still buffered before reporting completion
            producer.flush();
            System.out.println("\n所有消息发送完成!");

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the record for message number {@code i}: a JSON-serialized
     * {@link User} as value and {@code i % 3} as key.
     *
     * @param i message number
     * @return record addressed to {@link #TOPIC_NAME}
     * @throws Exception if the user cannot be serialized to JSON
     */
    private static ProducerRecord<String, String> buildRecord(int i) throws Exception {
        User user = new User("user-" + i, "email-" + i + "@test.com", i * 10);
        String message = objectMapper.writeValueAsString(user);
        return new ProducerRecord<>(TOPIC_NAME, String.valueOf(i % 3), message);
    }

    /** Callback that reports the outcome of one send, tagged with its message number. */
    static class CustomCallback implements Callback {
        private final int messageId;

        public CustomCallback(int messageId) {
            this.messageId = messageId;
        }

        /**
         * Prints partition/offset on success, or the error message on failure.
         *
         * @param metadata  metadata of the acknowledged record (read only when {@code exception} is null)
         * @param exception the send failure, or {@code null} on success
         */
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                System.out.printf("回调消息%d发送成功 - Partition: %d, Offset: %d%n",
                    messageId, metadata.partition(), metadata.offset());
            } else {
                System.err.printf("回调消息%d发送失败: %s%n", messageId, exception.getMessage());
            }
        }
    }

    /** Simple payload bean serialized to JSON through its getters. */
    static class User {
        private String username;
        private String email;
        private int age;

        public User(String username, String email, int age) {
            this.username = username;
            this.email = email;
            this.age = age;
        }

        // getters and setters
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        public String getEmail() { return email; }
        public void setEmail(String email) { this.email = email; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }
}

3.3 消费者示例

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Consumes JSON-encoded {@link User} records from Kafka with manual offset
 * commits and a shutdown hook that stops the poll loop cleanly.
 */
public class KafkaConsumerExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    private static final String TOPIC_NAME = "test-topic";
    private static final String GROUP_ID = "test-consumer-group";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    // Cleared by the shutdown hook to end the poll loop
    private static final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Subscribes to {@link #TOPIC_NAME} and polls until the JVM is asked to
     * shut down; offsets are committed asynchronously after each non-empty
     * batch and synchronously once more before the consumer is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Connection and deserialization settings
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 2. Consumer tuning
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // commit manually
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // max records per poll
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // max interval between polls (ms)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); // session timeout (ms)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // heartbeat interval (ms)

        // 3. Create the consumer (closed automatically by try-with-resources)
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {

            // Shutdown hook: stop the loop, interrupt a blocking poll(), then wait
            // for the main thread so the final commit and close() can finish
            // before the JVM exits.
            final Thread mainThread = Thread.currentThread();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("正在关闭消费者...");
                running.set(false);
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));

            // 4. Subscribe to the topic
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
            System.out.println("消费者已启动,开始消费消息...");

            // 5. Poll loop
            while (running.get()) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                    if (!records.isEmpty()) {
                        System.out.printf("收到 %d 条消息%n", records.count());

                        // Handle the batch partition by partition
                        for (TopicPartition partition : records.partitions()) {
                            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                            System.out.printf("分区 %d 有 %d 条消息%n", 
                                partition.partition(), partitionRecords.size());

                            for (ConsumerRecord<String, String> record : partitionRecords) {
                                processMessage(record);
                            }
                        }

                        // Manual, asynchronous offset commit
                        consumer.commitAsync((offsets, exception) -> {
                            if (exception != null) {
                                System.err.println("提交偏移量失败: " + exception.getMessage());
                            }
                        });
                    }

                } catch (WakeupException e) {
                    // Expected during shutdown: wakeup() was called to break out of poll()
                } catch (Exception e) {
                    System.err.println("消费消息异常: " + e.getMessage());
                }
            }

            // Final synchronous commit before the consumer is closed
            try {
                try {
                    consumer.commitSync();
                } catch (WakeupException e) {
                    // wakeup() arrived while no poll() was in progress, so it fired
                    // here instead; the pending wakeup is consumed now, retry once.
                    consumer.commitSync();
                }
                System.out.println("最终提交偏移量完成");
            } catch (Exception e) {
                System.err.println("最终提交偏移量失败: " + e.getMessage());
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Parses the record value as a JSON {@link User} and prints the record
     * coordinates and user fields; parse failures are reported on stderr and
     * the record is skipped.
     *
     * @param record the consumed record
     */
    private static void processMessage(ConsumerRecord<String, String> record) {
        try {
            User user = objectMapper.readValue(record.value(), User.class);

            System.out.printf("收到消息 - Topic: %s, Partition: %d, Offset: %d, Key: %s%n",
                record.topic(), record.partition(), record.offset(), record.key());
            System.out.printf("消息内容 - User: %s, Email: %s, Age: %d%n%n",
                user.getUsername(), user.getEmail(), user.getAge());

        } catch (Exception e) {
            System.err.println("解析消息失败: " + e.getMessage());
        }
    }

    /**
     * Example rebalance listener that logs revoked and assigned partitions.
     * It is not registered by {@link #main}; pass an instance to
     * {@code consumer.subscribe(topics, listener)} to use it.
     */
    static class CustomRebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("分区被撤销: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("分区被分配: " + partitions);
        }
    }

    /** Payload bean deserialized from the JSON record value. */
    static class User {
        private String username;
        private String email;
        private int age;

        // A no-argument constructor is required for deserialization
        public User() {}

        public User(String username, String email, int age) {
            this.username = username;
            this.email = email;
            this.age = age;
        }

        // getters and setters
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        public String getEmail() { return email; }
        public void setEmail(String email) { this.email = email; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }
}

3.4 高级特性示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;

public class KafkaAdvancedFeatures {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                 "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // 1. 获取主题分区信息
            System.out.println("=== 获取主题分区信息 ===");
            List<PartitionInfo> partitions = producer.partitionsFor("test-topic");
            partitions.forEach(p -> System.out.printf("分区: %d, Leader: %s, Replicas: %s%n",
                p.partition(), p.leader(), p.replicas()));

            // 2. 事务生产者示例
            System.out.println("\n=== 事务生产者示例 ===");
            Properties txProps = new Properties();
            txProps.putAll(props);
            txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
            txProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

            try (KafkaProducer<String, String> txProducer = new KafkaProducer<>(txProps)) {
                // 初始化事务
                txProducer.initTransactions();

                try {
                    // 开始事务
                    txProducer.beginTransaction();

                    // 发送事务消息
                    for (int i = 1; i <= 3; i++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(
                            "test-topic", "key-" + i, "transaction-message-" + i);
                        txProducer.send(record);
                        System.out.println("发送事务消息: " + i);
                    }

                    // 提交事务
                    txProducer.commitTransaction();
                    System.out.println("事务提交成功");

                } catch (Exception e) {
                    // 回滚事务
                    txProducer.abortTransaction();
                    System.err.println("事务回滚: " + e.getMessage());
                    throw e;
                }
            }

            // 3. 自定义分区器示例(如果需要)
            System.out.println("\n=== 自定义路由示例 ===");
            // Kafka默认使用hash(key) % partitions进行分区

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

四、测试脚本

4.1 集群验证脚本

#!/bin/bash
# cluster-test.sh
# Smoke test for the ZooKeeper ensemble and the Kafka cluster.

echo "=== 检查Zookeeper集群 ==="
# Actually query each node. "srvr" is the four-letter command ZooKeeper 3.5+
# enables by default; its output includes the node's Mode (leader/follower).
for port in 2181 2182 2183; do
  echo "--- localhost:${port} ---"
  echo srvr | nc localhost "${port}"
done

# The Kafka CLI tools run inside the kafka1 container, so they must use the
# INTERNAL listener (kafka1:19092). The EXTERNAL listener advertises
# localhost:9092-9094, which from inside the container reaches only kafka1.
echo -e "\n=== 检查Kafka集群 ==="
echo "列出所有主题:"
docker exec kafka1 kafka-topics --list --bootstrap-server kafka1:19092

echo -e "\n=== 查看test-topic详情 ==="
docker exec kafka1 kafka-topics --describe --topic test-topic --bootstrap-server kafka1:19092

echo -e "\n=== 查看消费者组 ==="
docker exec kafka1 kafka-consumer-groups --list --bootstrap-server kafka1:19092

echo -e "\n=== Kafka UI访问地址 ==="
echo "http://localhost:8080"

4.2 性能测试脚本

#!/bin/bash
# kafka-performance-test.sh
# Runs the stock Kafka producer/consumer perf tools from inside the kafka1
# container. All tools use the INTERNAL listener (kafka1:19092): the EXTERNAL
# listener advertises localhost:9092-9094, and from inside the container
# localhost reaches only kafka1, so partitions led by kafka2/kafka3 would be
# unreachable.

echo "=== Kafka性能测试 ==="

echo "1. 创建测试主题"
# --if-not-exists keeps the script re-runnable
docker exec kafka1 kafka-topics --create \
  --if-not-exists \
  --topic performance-test \
  --partitions 3 \
  --replication-factor 3 \
  --bootstrap-server kafka1:19092

echo "2. 生产者性能测试"
# 100000 records of 1024 bytes, throttled to 10000 records/s
docker exec kafka1 kafka-producer-perf-test \
  --topic performance-test \
  --num-records 100000 \
  --record-size 1024 \
  --throughput 10000 \
  --producer-props bootstrap.servers=kafka1:19092

echo "3. 消费者性能测试"
docker exec kafka1 kafka-consumer-perf-test \
  --topic performance-test \
  --messages 100000 \
  --bootstrap-server kafka1:19092

五、常见问题解决

5.1 连接问题

// If you run into connection problems, try the following settings
// (all values are in milliseconds)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000);

// Consumer settings
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000);

5.2 内存配置调整

# Adjust the Kafka JVM memory settings in docker-compose
# (fragment: place these keys under a broker service's "environment")
environment:
  KAFKA_HEAP_OPTS: "-Xmx2G -Xms2G"
  KAFKA_JVM_PERFORMANCE_OPTS: "-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"

六、监控和管理

6.1 使用Kafka UI

访问 http://localhost:8080 可以查看集群中的 Broker、主题(Topic)与分区、消费者组以及消息内容等信息。

6.2 JMX监控

# Add the JMX settings in docker-compose
# (fragment: place these keys under a broker service's "environment").
# Use the image's KAFKA_JMX_PORT / KAFKA_JMX_HOSTNAME variables rather than a
# bare JMX_PORT: a container-wide JMX_PORT is inherited by CLI tools started
# with "docker exec" (kafka-topics, ...), which then try to bind the same port
# and fail with "Port already in use".
# To reach JMX from the host, also publish the port (e.g. "9999:9999").
environment:
  KAFKA_JMX_PORT: "9999"
  KAFKA_JMX_HOSTNAME: localhost
  KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost"

总结

这个完整的Docker集群搭建和Java测试方案提供了:

- 完整的集群配置:3节点Zookeeper + 3节点Kafka
- 高可用性:所有组件都有冗余
- Java客户端示例:包含生产者和消费者的完整实现
- 高级特性:事务、异步回调、自定义分区等
- 监控工具:Kafka UI用于可视化监控
- 故障恢复:配置了持久化存储

你可以根据需要调整配置参数,如分区数、副本因子、内存设置等,以适应不同的业务场景。

相关推荐