// Unpack the Kafka distribution archive into kafkaDir using the system `tar`.
String kafkaDir = "/path/to/kafka";
String kafkaZipFile = "/path/to/kafka/kafka_2.13-2.7.0.tgz";
// The release archive wraps its contents in a top-level kafka_2.13-2.7.0/ directory.
// --strip-components=1 drops that directory so bin/ and config/ end up directly
// under kafkaDir, which is where the following steps look for them.
// Each argument is passed separately so paths containing spaces are not split.
Process process = new ProcessBuilder(
        "tar", "-xzf", kafkaZipFile, "-C", kafkaDir, "--strip-components=1")
        .inheritIO() // forward tar's output to this console instead of an unread pipe
        .start();
int tarExitCode = process.waitFor();
if (tarExitCode != 0) {
    // Fail here rather than later with a confusing "file not found" on the config files.
    throw new IllegalStateException(
            "Extracting " + kafkaZipFile + " failed: tar exited with code " + tarExitCode);
}
修改 Zookeeper 配置文件 zookeeper.properties:
// Overwrite config/zookeeper.properties with a minimal standalone (single-node) setup.
String zookeeperConfigFile = kafkaDir + "/config/zookeeper.properties";
// Client address of this ZooKeeper instance (host:clientPort). It is not written into
// zookeeper.properties; it is the value the broker's zookeeper.connect setting uses.
String zookeeperConnectConfig = "localhost:2181";
// No server.N entry is written: those lines describe quorum members in the form
// host:peerPort:electionPort and are not needed for a standalone node. A client
// address such as localhost:2181 is not a valid server.N value.
Files.write(Paths.get(zookeeperConfigFile), ("dataDir=/tmp/zookeeper\n" +
        "clientPort=2181\n" +
        "maxClientCnxns=0\n").getBytes(java.nio.charset.StandardCharsets.UTF_8));
修改 Kafka 配置文件 server.properties:
// Overwrite config/server.properties with a minimal single-broker setup.
String kafkaConfigFile = kafkaDir + "/config/server.properties";
// zookeeperConnectConfig ("localhost:2181") is declared in the ZooKeeper configuration
// step and reused here; declaring it a second time in the same scope does not compile.
String kafkaBrokerId = "0";
Files.write(Paths.get(kafkaConfigFile), ("broker.id=" + kafkaBrokerId + "\n" +
        "zookeeper.connect=" + zookeeperConnectConfig + "\n" +
        "log.dirs=/tmp/kafka-logs\n" +
        // The broker defaults for these internal topics assume three brokers. With a
        // single broker the offsets and transaction-state topics cannot be created
        // unless the factors are lowered to 1.
        "offsets.topic.replication.factor=1\n" +
        "transaction.state.log.replication.factor=1\n" +
        "transaction.state.log.min.isr=1\n").getBytes(java.nio.charset.StandardCharsets.UTF_8));
// Create the data directories referenced by log.dirs (Kafka) and dataDir (ZooKeeper).
String kafkaLogDir = "/tmp/kafka-logs";
String zookeeperDataDir = "/tmp/zookeeper";
// Files.createDirectories does nothing when the directory already exists and throws an
// IOException when it cannot be created, so a failure is reported instead of ignored.
Files.createDirectories(Paths.get(kafkaLogDir));
Files.createDirectories(Paths.get(zookeeperDataDir));
// Launch ZooKeeper as a child process with the configuration file written earlier.
// The script and its argument are passed separately so a kafkaDir containing spaces
// is not split into several arguments.
Process zookeeperProcess = new ProcessBuilder(
        kafkaDir + "/bin/zookeeper-server-start.sh",
        kafkaDir + "/config/zookeeper.properties")
        // Forward the server's stdout/stderr to this console. Leaving them as unread
        // pipes can block the server once the pipe buffer fills up.
        .inheritIO()
        .start();
// Launch the Kafka broker as a child process with the configuration file written earlier.
// The script and its argument are passed separately so a kafkaDir containing spaces
// is not split into several arguments.
Process kafkaProcess = new ProcessBuilder(
        kafkaDir + "/bin/kafka-server-start.sh",
        kafkaDir + "/config/server.properties")
        // Forward the broker's stdout/stderr to this console. Leaving them as unread
        // pipes can block the broker once the pipe buffer fills up.
        .inheritIO()
        .start();
// NOTE(review): start() only launches the process; it does not confirm that the broker
// has connected to ZooKeeper or is accepting client connections.
现在,Zookeeper 和 Kafka 的启动命令已经执行。确认两个进程都正常运行之后,就可以使用 Java 代码进行 Kafka 相关的操作了。