luyuanwu003 luyuanwu003
关注数: 88 粉丝数: 60 发帖数: 408 关注贴吧数: 5
activemq的topic消息一直都会存在数据库中,为什么不会删除? 我把消息设置为持久化到数据库。采用topic的方式发送消息。 每次发送消息之后,数据库的ACTIVEMQ_MSGS表里都会插入10条记录(每次是发送10条消息),在接收端接收消息之后,ACTIVEMQ_MSGS表里的数据没有被删掉,求大神帮忙给看看。(采用queue方式发送消息,接收端没接收消息之前,ACTIVEMQ_MSGS表里有数据,接收之后ACTIVEMQ_MSGS表里的数据就自动被删掉了) 发送端的代码如下: import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicPublisher { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("myTopic.messages"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(int i =0;i<10;i++){ TextMessage message = session.createTextMessage(); message.setText("message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Sent message: " + message.getText()); // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } session.close(); connection.stop(); connection.close(); } } 接收端的代码如下: import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicSubscriber { /** * @param args */ public static void main(String[] args) { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try { Connection connection = factory.createConnection(); connection.setClientID("null"); connection.start(); Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("myTopic.messages"); MessageConsumer consumer = session.createDurableSubscriber(topic,"123"); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } }
1 下一页