level 1
luyuanwu003
楼主
我把消息配置为持久化到数据库,并采用 topic 的方式发送消息。
每次发送消息之后,数据库的 ACTIVEMQ_MSGS 表里都会插入 10 条记录(每次发送 10 条消息);但在接收端接收消息之后,ACTIVEMQ_MSGS 表里的数据没有被删掉,求大神帮忙看看。(采用 queue 方式发送消息时,接收端接收消息之前 ACTIVEMQ_MSGS 表里有数据,接收之后 ACTIVEMQ_MSGS 表里的数据就自动被删掉了。)
发送端的代码如下:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publishes ten persistent text messages to the topic {@code myTopic.messages}
 * on the broker listening at {@code tcp://localhost:61616}.
 */
public class TopicPublisher {
    /**
     * Connects to the broker, sends the messages and then releases the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or closing the connection fails
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("myTopic.messages");
            MessageProducer producer = session.createProducer(topic);
            // PERSISTENT asks the broker to store each message before acknowledging the send.
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage();
                message.setText("message_" + System.currentTimeMillis());
                producer.send(message);
                System.out.println("Sent message: " + message.getText());
            }
        } finally {
            // Always release the connection, even when a JMS call above throws.
            // Closing a connection also closes its sessions and producers, so no
            // separate session.close() / connection.stop() is needed.
            connection.close();
        }
    }
}
接收端的代码如下:
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicSubscriber {
/**
* @param args
*/
public static void main(String[] args) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection connection = factory.createConnection();
connection.setClientID("null");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic.messages");
MessageConsumer consumer = session.createDurableSubscriber(topic,"123");
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2014年05月23日 07点05分
1
每次发送消息之后,数据库的 ACTIVEMQ_MSGS 表里都会插入 10 条记录(每次发送 10 条消息);但在接收端接收消息之后,ACTIVEMQ_MSGS 表里的数据没有被删掉,求大神帮忙看看。(采用 queue 方式发送消息时,接收端接收消息之前 ACTIVEMQ_MSGS 表里有数据,接收之后 ACTIVEMQ_MSGS 表里的数据就自动被删掉了。)
发送端的代码如下:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publishes ten persistent text messages to the topic {@code myTopic.messages}
 * on the broker listening at {@code tcp://localhost:61616}.
 */
public class TopicPublisher {
    /**
     * Connects to the broker, sends the messages and then releases the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or closing the connection fails
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("myTopic.messages");
            MessageProducer producer = session.createProducer(topic);
            // PERSISTENT asks the broker to store each message before acknowledging the send.
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage();
                message.setText("message_" + System.currentTimeMillis());
                producer.send(message);
                System.out.println("Sent message: " + message.getText());
            }
        } finally {
            // Always release the connection, even when a JMS call above throws.
            // Closing a connection also closes its sessions and producers, so no
            // separate session.close() / connection.stop() is needed.
            connection.close();
        }
    }
}
接收端的代码如下:
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicSubscriber {
/**
* @param args
*/
public static void main(String[] args) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection connection = factory.createConnection();
connection.setClientID("null");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic.messages");
MessageConsumer consumer = session.createDurableSubscriber(topic,"123");
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}