要在Ubuntu上实现Spark流处理,需要按照以下步骤操作:
安装Java环境:Spark需要Java环境才能运行。 。 您可以使用 OpenJDK 或 Oracle JDK。 在 Ubuntu 上,您可以使用以下命令安装 OpenJDK:
sudo apt update
sudo apt install openjdk-11-jdk
下载并解压Spark:从Spark官网下载合适版本的Spark,并解压到合适位置。 示例:
wget https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2 。 tgz
tar -xzf Spark-3.2.0-bin-hadoop3.2.tgz
cd Spark-3.2.0-bin-hadoop3.2
配置环境变量:编辑~/.bashrc
文件,添加以下内容:
导出 SPARK_HOME=/path/to/spark-3.2.0-bin-hadoop3 .2
导出 PATH=$PATH:$SPARK_HOME/bin
然后运行 source ~/.bashrc
启用配置。
启动Spark:在Spark下,可以使用以下命令启动Spark:
./bin/spark-shell
这将启动一个交互式 Spark shell,您可以在其中编写和运行 Spark 代码。
编写流处理代码:Spark shell 允许您使用 Spark Streaming API 编写流处理代码。 例如,以下代码从标准输入读取数据,对每个输入记录进行计数,并将结果打印到控制台:
来自 pyspark.streaming 导入 StreamingContext
ssc = StreamingContext(spark.sparkContext, 1) # 创建持续时间为 1 秒的 StreamingContext
input_stream = ssc.socketTextStream("localhost", 9999) # 从本地主机上的端口 9999 读取文本数据
# 统计每条输入记录
def count_words(时间,rdd):
如果 不 rdd.isEmpty():
word_counts = rdd.地图(lambda x: (x, 1)).reduceByKey(lambda a , b : a + b)
word_counts.pprint()
input_stream.foreachRDD(count_words)
ssc.start()
ssc.awaitTermination()
运行流处理代码:在 Spark shell 中l、运行以下命令启动流处理:
input_stream.print ()
然后您可以使用telnet
或其他工具将文本数据发送到本地主机上的端口9999。 每次发送一行数据时,Spark Streaming 都会对数据行进行计数并将结果打印到控制台。
请注意,上述步骤仅适用于独立模式下的 Spark 流处理。 如果您想在分布式环境中运行 Spark 流处理,您需要配置 Spark 集群并使用 spark-submit
命令提交您的应用程序。 此外,Spark Streaming 依赖于 Hadoop,因此您必须安装 Hadoop 依赖项。
评论前必须登录!
注册