ActiveMQ中添加用户认证

本篇文章中用户认证基于MySQL

1.新建数据库activemq,存储用户信息

-- ----------------------------
-- Table structure for tb_user
-- ----------------------------
DROP TABLE IF EXISTS `tb_user`;
CREATE TABLE `tb_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of tb_user
-- ----------------------------
INSERT INTO `tb_user` VALUES ('1', 'admin', 'a123');

2.pom中引入数据库的包

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>4.3.26.RELEASE</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

并将对应的包拷入lib目录下

3.代码相关

在org.apache.activemq.broker.region目录新建chenk目录
在chenk目录下:

① 添加AuthPlugin,内容为:

package org.apache.activemq.broker.region.chenk;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * @Author chenk
 * @create 2021/1/15 9:42
 */
public class AuthPlugin implements BrokerPlugin {
    public static JdbcTemplate jdbcTemplate;//注入了spring-jdbc
    public AuthPlugin(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate=jdbcTemplate;
    }

    @Override
    public Broker installPlugin(Broker broker) throws Exception {
        return new AuthBroker(broker,jdbcTemplate);
    }
}

② 添加User类,内容如下:

package org.apache.activemq.broker.region.chenk;

/**
 * @Author chenk
 * @create 2021/1/15 10:01
 */
public class User {
    private String id;
    private String username;
    private String password;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

③ 新建AuthBroker,内容为:

package org.apache.activemq.broker.region.chenk;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.security.AbstractAuthenticationBroker;
import org.apache.activemq.security.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;

import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.HashSet;
import java.util.Set;

/**
 * @Author chenk
 * @create 2021/1/15 9:51
 */
public class AuthBroker extends AbstractAuthenticationBroker {

    private static Logger LOG = LoggerFactory.getLogger(AuthBroker.class);

    private JdbcTemplate jdbcTemplate;

    public AuthBroker(Broker next, JdbcTemplate jdbcTemplate) {
        super(next);
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * <p>
     * 创建连接的时候拦截
     * </p>
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        LOG.debug("addConnection");
        SecurityContext securityContext = context.getSecurityContext();
        if (securityContext == null) {
            securityContext = authenticate(info.getUserName(), info.getPassword(), null);
            context.setSecurityContext(securityContext);
            securityContexts.add(securityContext);
        }

        try {
            super.addConnection(context, info);
        } catch (Exception e) {
            securityContexts.remove(securityContext);
            context.setSecurityContext(null);
            throw e;
        }
    }

    /**
     * 得到用户信息
     * <p>Title: getUser</p>
     *
     * @param username
     * @return
     */
    private User getUser(String username) {
        String sql = "select * from tb_user where username=? limit 1";
        User user = jdbcTemplate.queryForObject(sql, new Object[]{username}, new BeanPropertyRowMapper<User>(User.class));
        return user;
    }

    /**
     * 认证
     * <p>Title: authenticate</p>
     */
    @Override
    public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
        SecurityContext securityContext = null;
        User user = getUser(username);
        //验证用户信息
        LOG.info("user : {} , pwd : {}", user.getUsername(), user.getPassword());
        if (user != null && user.getPassword().equals(password)) {
            securityContext = new SecurityContext(username) {
                @Override
                public Set<Principal> getPrincipals() {
                    Set<Principal> groups = new HashSet<Principal>();
                    groups.add(new GroupPrincipal("users"));//默认加入了users的组
                    return groups;
                }
            };
        } else {
            LOG.error("验证失败");
            throw new SecurityException("验证失败");
        }
        return securityContext;
    }

}

4.修改activemq.xml

在broker中添加plugins

<plugins>
    <bean xmlns="http://www.springframework.org/schema/beans" id="testPlugin" class="org.apache.activemq.broker.region.chenk.AuthPlugin">
        <constructor-arg>
            <ref bean="jdbcTemplate"/>
        </constructor-arg>
    </bean>
</plugins>

添加

    <!-- mysql数据库数据源-->
    <bean id="mySqlDataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${jdbc.driverClassName}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" abstract="false"
          lazy-init="false" autowire="default" >
        <property name="dataSource">
            <ref bean="mySqlDataSource" />
        </property>
    </bean>

在db.properties中添加

jdbc.url=jdbc:mysql://127.0.0.1:3306/activemq?autoReconnect=true&useUnicode=true&characterEncoding=utf8

重新启动项目,用户连接时会判断用户名密码

添加新评论