目录
Apache Spark 和 Apache Kafka 是实时流处理领域两个非常流行的开源框架。 当组合使用时,它们可以实现高效且可扩展的实时数据处理和分析。 以下是在 Ubuntu 上安装和配置 Spark 和 Kafka 以进行实时流处理的简单步骤。
1. 安装 Java 开发工具包 (JDK)
Spark 需要 JDK 才能运行。 您可以使用以下命令安装 OpenJDK 11:
sudo apt update
sudo apt install openjdk-11-jdk
确保JDK安装成功:
java -version
2.下载并安装Apache Kafka
从Kafka官方网站下载最新版本:
wget https:// 下载。 apache.org/kafka/2.8.1/kafka_2.13-2.8.1.tgz
tar xzf kafka_2.13-2.8.1.tgz
cd kafka_2.13-2.8.1
那么,Kafka 启动服务器。
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
创建测试主题:
bin/kafka-topics.sh --create --topic 测试 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
向Kafka发送和消费消息:
bin/kafka-console -Producer .sh --broker -list localhost:9092 --topic 测试
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 测试 --from-beginning
3.下载并安装 Apache Spark。
从 Spark 官方网站下载最新版本的 Spark:
wget https://downloads.apache.org /spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
tar xzf spar-3.2.1-bin-hadoop3.2.tgz
cd smile-3.2.1-bin-hadoop3.2
配置Spark环境变量:
Echo "导出 SPARK_HOME=/path/to/spark- 3.2.1 -bin-hadoop3.2" >> ~/.bashrc
源 ~/.bashrc
echo "导出 PATH=$PATH:$SPARK_HOME /bin" >> ~/.bashrc
源代码 ~/.bashrc
4.使用Spark Streaming读取Kafka数据
创建一个Python脚本,例如spark_kafka_streaming.py
,并从
pyspark.sql 导入 SparkSession
来自 pyspark.streaming 导入 StreamingContext
来自 pyspark.streaming.kafka 导入 KafkaUtils
# 创建 SparkSession 和 StreamingContext
spark = SparkSession.builder \
.appName( "Kafka Spark“流”) \
.getOrCreate()
ssc = StreamingContext(spark.sparkContext, 1)
#从Kafka读取数据
kafkaStream = KafkaUtils.createDirectStream(ssc, ["test" ], {"元数据ta.broker.list": "localhost:9092"})
# 处理中中等 Kafka 数据
def process_message(时间,rdd):
if 不 rdd.isEmpty():
打印("收到消息:%s" % rdd.collect()[0][1])
kafkaStream.foreachRDD( process_message)
# 开始 StreamingContext
ssc.start()
ssc.awaitTermination()
运行 Spark 应用程序:
现在,当您向 Kafka 发送消息时,您的 Spark Streaming 应用程序会实时读取并处理这些消息。
请注意,这只是一个示例,您的应用程序可能需要更复杂的逻辑和配置。 有关更多信息和最佳实践,请参阅 Spark 和 Kafka 官方文档。
评论前必须登录!
注册