关注分享主机优惠活动
国内外VPS云服务器

Ubuntu上集成Spark和RabbitMQ消息队列(ubuntu启动Rabbitmq)

按照以下步骤集成 Spark 和 RabbitMQ 在 Ubuntu 上实现消息队列处理。

安装 RabbitMQ

更新软件包列表:

  sudo apt update

安装 RabbitMQ 服务器:

 sudo apt安装 Rabbitmq-server

启动 RabbitMQ 服务:

sudo systemctl start Rabbitmq-server

配置RabbitMQ开机自动启动:

 sudo systemctl enable Rabbitmq-server

检查状态RabbitMQ 服务:

sudo systemctl status Rabbitmq-server 

安装 Spark

下载 Spark:

wget https://downloads.apache.org /spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2。tgz

解压 Spark:

tar - xzf Spark- 3.2.0-bin-hadoop3.2.tgz

设置Spark环境变量:
~/编辑.bashrc 将以下内容添加到文件中:

导出 SPARK_HOME=/path/to /spark-3.2.0-bin-hadoop3。 2
导出 PATH=$PATH :$SPARK_HOME /bin

保存并运行文件。

来源 ~/.bashrc

验证 Spark 安装:

spark-submit --version

配置 RabbitMQ 和 Spark 集成

安装 RabbitMQ Java 客户端库:

sudo apt install librabbitmq -java

将 RabbitMQ 依赖项添加到您的 Spark 项目中:
将以下依赖项添加到 pom.xml 文件中: 添加:

<依赖项> [小时] <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <版本>5.14.2</版本>
</依赖项>

创建 Spark 应用程序:
创建一个 Java 文件,例如 RabbitMQSparkApp.java 并编写以下代码。

导入 com.rabbitmq.client.*;
导入 org.apache.spark.SparkConf;
导入 org.apache.spark.api.java.JavaPairRDD ;
 导入 scala.Tuple2;

公共  RabbitMQSparkApp {

 公共 静态 无效 主要 ( String[] args) Exception {
 //创建Spark配置
 SparkConf conf =  SparkConf().setAppName("RabbitMQSparkApp").setMaster("local[*]" ) ;

 //创建Spark上下文
 JavaSparkContext sc =  JavaSparkContext(conf);

 // 创建 RabbitMQ 连接
 ConnectionFactory 工厂 = 新建 ConnectionFactory();
 Factory.setHost("localhost");
 连接 连接 = Factory.newConnection();
 通道 通道   = connection.createChannel();

 //声明一个队列
 Channel。    queueDeclare("spark_queue", false、 );

 //读取队列消息
 消费者 消费者 =  DefaultConsumer(频道) {
 @Override
 公共 void handleDeliver(字符串 ConsumerTag,信封 信封,AMQP.BasicProperties 属性,字节[] body) 抛出 IOException {
 字符串 消息 =  字符串(body,  "UTF-8");
 System.out.println("收到消息:" + message);

 //处理消息并将其发送到另一个队列
 String[] Parts = message.split( ", " );
 字符串 processedMessage  =  部分[0] + "_" + Parts[1];
 channel.basicPublish("" "processed_queue",属性,processedMessage.getBytes());
 }
 };
  channel.basicConsume("spark_queue", true, 消费者);
 }
}
 

编译并运行 Spark 应用程序:

mvn clean package
spark-submit --class RabbitMQSparkApp --master 本地[*] target/dependency/spark-examples .jar

启动另一个消费者来处理已处理的消息

ProcessedMessageApp.java等。使用以下内容创建一个新的Java文件以下代码:

导入 com.rabbitmq.client.*;
导入 org.apache.spark.SparkConf;
导入 org.apache.spark.api.java.JavaPairRDD;
导入 scala.Tuple2;

公共 ProcessedMessageApp {

公共 静态 void 主要(String[] args) 抛出 异常 {
// 创建 Spark 配置
SparkConf conf = new SparkConf().setAppName("ProcessedMessageApp").setMaster("local[*]");

// 创建 Spark 上下文
JavaSparkContext sc = JavaSparkContext(conf);

//创建RabbitMQ连接 Span>
ConnectionFactory 工厂 = ConnectionFactory();
Factory.setHost("localhost");
连接 连接 = Factory.newConnection();
通道 频道 = connection.createChannel();

//队列声明
channel.queueDeclare("processed_queue", false, , null);

//读取队列消息
消费者 消费者 = DefaultConsumer(频道){
@Override
公共 void handleDeliver(字符串ConsumerTag,Envelope,AMQP.BasicProperties属性,字节[]正文) span> 抛出 IOException {
字符串消息 = 新消息 字符串(body, "UTF-8");
System.out.println("收到已处理的消息:" + message);
}
};
channel.basicConsume("processed_queue", true, 消费者);
}
}

编译并运行 Spark 应用程序:

mvn clean package
spark-submit --class ProcessedMessageApp --master  local[*] target/dependency/spark-examples.jar

以上按照步骤操作后,乌班图成功集成Spark和RabbitMQ实现消息队列处理。

未经允许不得转载:主机频道 » Ubuntu上集成Spark和RabbitMQ消息队列(ubuntu启动Rabbitmq)

评论 抢沙发

评论前必须登录!