日期:2014-05-16  浏览次数:20409 次

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中.

一、JMS的理解
JMS(Java Message Service)是jcp组织02-03年定义了jsr914规范(http://jcp.org/en/jsr/detail?id=914),它定义了消息的格式和消息传递模式;
消息包括:消息头,消息扩展属性和消息体,其结构看起来与SOAP非常的相似,但一般情况下,SOAP主要关注远程服务调用,而消息则专注于信息的交换;
消息分为:消息生产者,消息服务器和消息消费者。生产者与消费者之间是透明的,生产者在产生消息之后,把消息发送到消息服务器,再由消息服务器发给消费者,因此它们构成了JMS的3点结构;
消息服务器再给消费者时,有2种模式:点到点(ptp: point to point)模式和发布/订阅(publish/subscribe)模式;
ptp:即生产者把消息投递到消息服务器后,这条消息只能由某一个消费者使用;
发布/订阅:顾名思义,就是共享消息了,只要愿意,消费者都可以监听消息;

二、消息服务器(ActiveMQ)
消息服务器在JMS的3点结构中起着重要作用,没有它,生产者的消息不知道如何投递出去,消费者不知道从哪里取得消息,它同样是隔离生产者和消费者的关键部分…………
JMS消息服务器有很多:ActiveMQ、Jboss MQ、Open MQ、RabbitMQ、ZeroMQ等等。
本文介绍的是开源的Java实现的Apache ActiveMQ(http://activemq.apache.org),它的特性在首页就能看到,我就不再介绍了;

1、下载AMQ:http://activemq.apache.org/download.html,最新版本是5.5.0;
2、解压apache-activemq-5.5.0-bin.zip文件到文件系统(比如D:\ActiveMQ-5.5.0);
3、执行bin/activemq.bat脚本即可启动AMQ:

 INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
 ......
 INFO | Listening for connections at: tcp://SHI-AP33382A:61616

?当看到上面的日志输出时,表示AMQ已经启动了;
4、默认情况下,AMQ使用conf/activemq.xml作为配置文件,我们可修改它,然后以 bin/activemq.bat xbean:./conf/my.xml启动AMQ;

三、持久化消息(MySQL)
因为接下来我们修改AMQ的默认配置文件,所以先备份conf/activemq.xml文件;
1、建立MySQL数据库:要使用MySQL存储消息,必须告诉AMQ数据源:

/**
 * 创建数据库
 */
CREATE DATABASE misc DEFAULT CHARSET=UTF8;

/**
 * 创建用户和授权
 */
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'%' IDENTIFIED BY 'misc_root_pwd';
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'localhost' IDENTIFIED BY 'misc_root_pwd';

通过上面的SQL脚本,我们建立了名为misc的数据库,并且把所有权限都赋予了misc_root的用户;
由于AMQ需要在本数据库中建立数据表,因此用户的权限必须具有建表权限;
2、添加MySQL数据源:默认情况下,AMQ使用KahaDB存储(我对KahaDB不了解),注释到KahaDB的配置方式,改为MySQL的:

<!--
<persistenceAdapter>
    <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
?


该配置表示,我们将要使用一个叫做“MySQL-DS”的JDBC数据源;
3、配置MySQL数据源:在</broker>节点后面,增加MySQL数据源配置:

<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8"/>
    <property name="username" value="misc_root"/>
    <property name="password" value="misc_root_pwd"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
?


其实这就是一个Spring的Bean的配置,注意id与上面的保持一致;

整个AMQ的配置文件内容为:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>classpath:/META-INF/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
        <!--
            For better performances use VM cursor and small memory limit. For more information, see: http://activemq.apache.org/message-cursors.html Al