ActiveMQ

休息了大概一个礼拜,到一个新的单位重新开始,大佬列了很多新人技术清单,其中有很多只是听说过但没有学习和接触过,得赶紧学习啦,^-^

另外最近使用的是来自大傻逼推荐的一个md编辑器Typora,非常好看还带大纲,比markdownPad好用很多

Activemq学习来源:https://www.cnblogs.com/cyfonly/p/6380860.html

https://www.cnblogs.com/jaycekon/p/6225058.html

https://blog.csdn.net/Ouyzc/article/details/79643387

首先了解我初步要学到的内容,理解JMS规范-理解点对点,发布订阅模式,理解生产者与消费者

ok,我的环境是win,没有在虚拟机里装,但一般用在Linux的好像比较多,来看看activemq是个什么东西吧:它是一个面向消息中间件(MOM Message-oriented middleware)【MOM 的总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合。】

JMS(Java Message Service)java消息服务

activemq就是JMS的一种体现,再了解消息的传递方式-包括点对点P2P和发布/订阅两种,nice 这似乎是我需要get的第一个知识点。

P2P与Pub/Sub

P2P (点对点)消息域使用 queue 作为 目标,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。

Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。

多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。

Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。
除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。

Demo

  • 获取连接工厂
  • 使用连接工厂创建连接
  • 启动连接
  • 从连接创建会话
  • 获取 Destination
  • 创建 Producer,或
    • 创建 Producer
    • 创建 message
  • 创建 Consumer,或发送或接收message发送或接收 message
    • 创建 Consumer
    • 注册消息监听器(可选)
  • 发送或接收 message
  • 关闭资源(connection, session, producer, consumer 等)
1
2
3
4
5
6
7
8
9
10
11
12
<!--ActiveMQ所需要的jar包 -->  
<!-- 添加ActiveMQ的pool包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>

第一种P2P模式:

①消息生产者给消息中间件(队列)发送消息

②消息消费者接收消息中间件(队列)的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class Producter {
//activamq默认用户名 密码和地址,获取连接的三个必要参数
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BORKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

//Atomic原子类型,保证该int一时间只被单线程操作
AtomicInteger count = new AtomicInteger(0);
//获取连接工厂
ConnectionFactory connectionFactory;
//创建连接对象
Connection connection;
//事务管理
Session session;

ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<MessageProducer>();


public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BORKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true,Session.SESSION_TRANSACTED);
}catch (Exception e){
e.printStackTrace();
}
}

public void sendMessage(String disname){
try {
//获取消息队列
Queue queue = session.createQueue(disname);
MessageProducer messageProducer = null;
if (threadLocal.get() !=null){
//若不为空则从线程中获取,若为空则用session创建
messageProducer = threadLocal.get();
//消息过期设置
//messageProducer.setTimeToLive(1000);
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
//开始创建消息
while (true){
Thread.sleep(1000);
int num = count.getAndIncrement();
TextMessage textMessage = session.createTextMessage(Thread.currentThread().getName()+"我正在生产消息,num"+num);
System.out.println(Thread.currentThread().getName()+"我正在生产消息,num"+num);
//发送消息
messageProducer.send(textMessage);
//提交事务
session.commit();
if (num>5){
break;
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("消息发送结束");
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* @program: activemq
*
* @description: 测试Activemq
*
* @author: WYuyin
*
* @create: 2018-11-01 15:45
**/
public class TestMq {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
TestMq testMq = new TestMq();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}

private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}


public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @program: activemq
* @description: 消费者
* @author: WYuyin
* @create: 2018-11-01 16:29
**/
public class Comsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BORKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
//获取连接工厂
ConnectionFactory connectionFactory;
//创建连接对象
Connection connection;
//事务管理
Session session;

ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>();
AtomicInteger count = new AtomicInteger();

public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BORKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true,Session.SESSION_TRANSACTED);
}catch (Exception e){
e.printStackTrace();
}
}
public void getMessage(String disname){
try{
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){

TextMessage msg = (TextMessage) consumer.receive();
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @program: activemq
* @description: 测试Mq消费者
* @author: WYuyin
* @create: 2018-11-01 16:44
**/
public class TestCustomer {
public static void main(String[] args){
Comsumer comsumer = new Comsumer();
comsumer.init();
TestCustomer testCustomer = new TestCustomer();
new Thread(testCustomer.new ConsumerMq(comsumer)).start();
new Thread(testCustomer.new ConsumerMq(comsumer)).start();
new Thread(testCustomer.new ConsumerMq(comsumer)).start();
new Thread(testCustomer.new ConsumerMq(comsumer)).start();
}

private class ConsumerMq implements Runnable{
Comsumer comsumer;
public ConsumerMq(Comsumer comsumer){
this.comsumer = comsumer;
}

public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

第二种Pub/Sub模式

①消息生产者(发布者)给消息中间件发送话题(topic)

②消息消费者(订阅者)接收消息中间件的发送话题

消息生产者(发布者)代码与第一种P2P模式的消息生产者代码几乎一样,只是在创建生产者时,创建的是话题并不是队列

1
2
3
4
5
6
7
8
// 获取session注意参数值mytopic是一个服务器的topic,须在在ActiveMq的console配置  
topic = session.createTopic("mytopic");
// 得到消息生成者【发送者】
producer = session.createProducer(topic);
//......
TextMessage message = session.createTextMessage("我给你发话题");
System.out.println("Sender发送消息:" + "topic:" + i);
producer.send(message);

订阅者:

1
2
3
4
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);  
// 获取session注意参数值mytopic是一个服务器的topic,须在在ActiveMq的console配置
destination = session.createTopic("mytopic");
consumer = session.createConsumer(destination);

什么情况下使用ActiveMQ?

  1. 多个项目之间集成
    (1) 跨平台
    (2) 多语言
    (3) 多项目
  2. 降低系统间模块的耦合度,解耦
    (1) 软件扩展性
  3. 系统前后端隔离
    (1) 前后端隔离,屏蔽高安全区

SpringBoot集成Activemq

即JMS在Springboot中的应用

[https://blog.csdn.net/Ouyzc/article/details/79756574

下面按照上面这个链接中的代码进行分析:

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 1生产者发送消息(发送信息到队列)P2P
* 根据实例化Destination目的地参数的类型的对象决定是点对点队列的形式,还是广播的形式
*
*/
@Scheduled(fixedRate = 5000)
@Override
public void queueSend() {

//定义一个目的地(队列类型)
Destination queue = new ActiveMQQueue("mytest.queue");
jmsTemplate.convertAndSend(queue, "myname is Oyzc");

}
/**
* 2生产者发送消息(发送信息到话题)Pub
*
*/
@Scheduled(fixedRate = 5000)
@Override
public void topicSend() {

//定义一个目的地(队列类型)
Destination topic = new ActiveMQTopic("mytest.topic");
jmsTemplate.convertAndSend(topic, "myname is Oyzc");
}
/**
* 3生产者发送出去消息之后,可以马上监听指定消费者的反馈信息
* @param text
*/
@Override
@JmsListener(destination="out.queue")
public void consumerMessage(String text) {
System.out.println("从out.queue队列收到的回复报文为:"+text);
}

消费者生产说明:

//P2P创建队列,参数为队列名

Destination queue = new ==ActiveMQQueue==(“mytest.queue”);

//Pub定义topic,参数为topic名称

Destination topic = new ==ActiveMQTopic==(“mytest.topic”);

//jms发送队列/话题信息,第一个参数为队列/topic,第二个参数为发送的内容,可以是字符串或实体或json

==jmsTemplate.convertAndSend(topic, “myname is Oyzc”);==

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 使用JmsListener配置消费者监听的队列,其中text是接收到的消息

/**
* 消费者接收信息1(接收队列为mytest.queue的信息,并马上回复信息到out.queue队列中)
*/
@Override
// 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
@JmsListener(destination = "mytest.queue")
@SendTo("out.queue")
public String receiveQueue1(String text) {

System.out.println("消费者1收到目的地为mytest.queue发来的信息"+text);

return "mytest.queue接收到你的信息了";
}

消费者说明:

//@JmsListener开启监听,接受名为”mytest.queue”的队列/话题信息

@JmsListener(destination = “mytest.queue”)

//返回给生产者的队列

@SendTo(“out.queue”)

在队列和话题同时存在的情况下,Jms默认支持队列,解决方式:

在配置文件中添加 spring.jms.pub-sub-domain=true,使得队列和话题同时发送成功,但消费者只接受topic。

正确解决方式:

① Jms配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
@EnableJms
public class JmsFactoryConfig {

@Value("${activemq.ClientId}")
private String ClientId;

@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//订阅发布
factory.setPubSubDomain(true);
//消息持久化
factory.setSubscriptionDurable(true);
//链接超时
factory.setReceiveTimeout(1000L);
//接收者id
factory.setClientId(ClientId);
factory.setConnectionFactory(connectionFactory);
return factory;
}

@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//队列
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
return factory;
}

}

修改消费者代码:

1
@JmsListener(destination = "${activemq.topic}" , containerFactory = "topicListenerFactory")

//使监听器处理话题/队列信息

containerFactory = “topicListenerFactory/queueListenerFactory”