SpringBoot/Spring中创建WebSocket链接(STOMP)

STOMP协议介绍

STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。javascript

它提供了一个可互操做的链接格式,容许STOMP客户端与任意STOMP消息代理(Broker)进行交互,相似于OpenWire(一种二进制协议)。css

因为其设计简单,很容易开发客户端,所以在多种语言和多种平台上获得普遍应用。其中最流行的STOMP消息代理是Apache ActiveMQ。前端

STOMP协议工做于TCP协议之上,使用了下列命令:java

  • SEND 发送
  • SUBSCRIBE 订阅
  • UNSUBSCRIBE 退订
  • BEGIN 开始
  • COMMIT 提交
  • ABORT 取消
  • ACK 确认
  • DISCONNECT 断开

STOMP Over WebSocket:http://jmesnil.net/stomp-websocket/doc/web

1,SpringBoot添加基于STMOP协议的WebSocket支持

1.1,添加pom依赖spring

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

1.2,配置 websocket stomp浏览器

/** * 经过EnableWebSocketMessageBroker 开启使用STOMP协议来传输基于代理(message broker)的消息,此时浏览器支持使用@MessageMapping 就像支持@RequestMapping同样。 * @author zhenghuasheng */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{

    /** * 注册stomp的端点 */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/endpointService").setAllowedOrigins("*").withSockJS();
    }

    /** * 配置消息代理(message broker) * @param registry */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 订阅Broker名称
        registry.enableSimpleBroker("/queue","/topic");
        // 全局使用的消息前缀(客户端订阅路径上会体现出来)
// registry.setApplicationDestinationPrefixes("/app");
        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        // registry.setUserDestinationPrefix("/user/");
    }
}

说明:springboot

  • 用户订阅主题的前缀
registry.enableSimpleBroker("/queue","/topic");

/topic 表明发布广播,即群发
/queue 表明点对点,即发指定用户服务器

  • 全局使用的消息前缀(客户端订阅路径上会体现出来)
registry.setApplicationDestinationPrefixes("/app");

例如客户端发送消息的目的地为/app/send,则对应控制层@MessageMapping(“/send”)
客户端订阅主题的目的地为/app/subscribe,则对应控制层@SubscribeMapping(“/subscribe”)websocket

  • 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
registry.setUserDestinationPrefix("/user/");

1.3,消息实体类

  • 客户端发往服务器消息实体
/** * 客户端发往服务器消息实体 * @author zhenghuasheng */
public class Message {
    private String name;

    public String getName() {
        return name;
    }
}
  • 服务器发往客户端的消息实体
/** * 服务器发往客户端消息实体 * @author zhenghuasheng */
public class Response {
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    private String responseMessage;

    public Response(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    public String getResponseMessage() {
        return responseMessage;
    }
}

1.4,控制类测试

/** * @author zhenghuasheng */
@Controller
public class WebSocketController {
    @Autowired
    SimpMessagingTemplate template;

    @Autowired
    WelcomeJob welcomeTask;

    private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);

    /** * 浏览器发送请求经过@messageMapping 映射/welcome 这个地址。 * 服务器端有消息时,会订阅@SendTo 中的路径的浏览器发送消息。 * @param message * @return * @throws Exception */
    @MessageMapping("/welcome") 
    @SendTo("/topic/getResponse") 
    public Response say(Message message) throws Exception {
        Thread.sleep(1000);
        return new Response("Welcome, " + message.getName() + "!");
    }

    /** * 当有客户端订阅"/topic/getResponse",会收到消息 * @return */
    @SubscribeMapping("/topic/getResponse")
    public Response sub() {
        logger.info("XXX用户订阅了我。。。");
        return new Response("感谢你订阅了我。。。");
    }

    @GetMapping("/test")
    String test() {
        return "test";
    }

    /** * 能够利用普通http request来主动推送广播消息 * @return */
    @RequestMapping("/welcome")
    @ResponseBody
    public ResultVo say02() {
        try {
            template.convertAndSend("/topic/getResponse", new Response("欢迎!" ));
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return ResultVo.ok();
    }
}

使用点对点定向推送消

  • 更改端点注册,新增注册标示,这样服务器才能分辨推送的目的地
/** * 注册stomp的端点 */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/endpointWisely")
         //.addInterceptors(new WebSocketSessionHandshakeInterceptor())
                .setHandshakeHandler(new DefaultHandshakeHandler(){
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        return new LocalPrincipal(ShiroUtils.getUser().getName());
                    }
                })
                .setAllowedOrigins("*").withSockJS();
    }

本例使用了shiro,使用用户名做为标示,,,能够设置为其余的惟一标示   只需修改 return new LocalPrincipal(ShiroUtils.getUser().getName());构造方法中的值。由于带有request和attributes,彻底能够自定义标示数据,注册时若是想扩展参数或者自定义  只需新增握手拦截器HandshakeInterceptor

例如:经过前端传递的userId来标示,将获取的参数放入attributes中,注册时在该对象中获取
/** * Created by zhenghuasheng on 2016/6/20. */
public class WebSocketSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
    Logger logger = LoggerFactory.getLogger(WebSocketSessionHandshakeInterceptor.class);

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (getSession(request) != null) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
            attributes.put("userId", httpServletRequest.getParameter("userId"));
        }
        super.beforeHandshake(request, response, wsHandler, attributes);
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        super.afterHandshake(request, response, wsHandler, ex);
    }
    private HttpSession getSession(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            return serverRequest.getServletRequest().getSession();
        }
        return null;
    }

}




/** * @author zhenghuasheng * @date 2018/3/27.14:31 */
public class LocalPrincipal implements Principal {

    private String username;

    @Override
    public String getName() {
        return username;
    }


    public LocalPrincipal(String username) {
        this.username = username;
    }

    public LocalPrincipal() {
    }
}

1.5,客户端

  • 引入js文件
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
弹框使用 Toastr,本身引入相关js文件
<script src="/js/plugins/toastr/toastr.min.js"></script>
var stompClient = null;
    $(function () {
        connect();
    });

    function sendMessage() {
        stompClient.send("/welcome", {}, JSON.stringify({'name': "123456"}));
    }
   function connect() {
        var socket = new SockJS('/endpointService'); //连接SockJS 的endpoint 名称为"/endpointService",对应config中的站点名称
        stompClient = Stomp.over(socket);//使用stomp子协议的WebSocket 客户端
        stompClient.connect({}, function (frame) {//连接Web Socket的服务端。
            console.log('Connected: ' + frame);
            stompClient.subscribe('/user/queue/msg', function (response) {
                var returnData = JSON.parse(response.body);
                console.log(returnData)
                toastr.options = {
                    "closeButton": true,
                    "debug": false,
                    "progressBar": true,
                    "positionClass": "toast-top-center",
                    "onclick": null,
                    "showDuration": "400",
                    "hideDuration": "1000",
                    "timeOut": "7000",
                    "extendedTimeOut": "1000",
                    "showEasing": "swing",
                    "hideEasing": "linear",
                    "showMethod": "fadeIn",
                    "hideMethod": "fadeOut"
                }
                toastr.info(JSON.parse(response.body).responseMessage);
            });
            stompClient.subscribe('/topic/getResponse', function (response) { //订阅/topic/getResponse 目标发送的消息。这个是在控制器的@SendTo中定义的。
                toastr.options = {
                    "closeButton": true,
                    "debug": false,
                    "progressBar": true,
                    "positionClass": "toast-top-center",
                    "onclick": null,
                    "showDuration": "400",
                    "hideDuration": "1000",
                    "timeOut": "7000",
                    "extendedTimeOut": "1000",
                    "showEasing": "swing",
                    "hideEasing": "linear",
                    "showMethod": "fadeIn",
                    "hideMethod": "fadeOut"
                }
                toastr.info(JSON.parse(response.body).responseMessage);
            });
        });
    }

2,Spring添加基于STMOP协议的WebSocket支持

2.1 新增pom依赖,在原spring基础上新增

<dependency>  
     <groupId>org.springframework</groupId>  
      <artifactId>spring-websocket</artifactId>  
      <version>${spring.version}</version>  
 </dependency>  

 <dependency>  
    <groupId>org.springframework</groupId>  
    <artifactId>spring-messaging</artifactId>  
    <version>${spring.version}</version>  
 </dependency>

2.2 客户端、实体类与配置与SpringBoot的相关配置相同

spring可使用xml形式配置站点服务

<websocket:message-broker application-destination-prefix="app" user-destination-prefix="user" >
    <websocket:stomp-endpoint allowed-origins="*" path="/webSocketServer">
        <websocket:handshake-interceptors>
            <ref bean="webSocketSessionHandshakeInterceptor"></ref>
        </websocket:handshake-interceptors>
        <websocket:sockjs/>
    </websocket:stomp-endpoint>

    <websocket:stomp-broker-relay prefix="/topic,/queue" heartbeat-receive-interval="2000" heartbeat-send-interval="2000"/>
</websocket:message-broker>

3,客户端向服务端传递参数

  • 1,发送示例,经过header传递
function sendMessage() {
        stompClient.send("/welcome", {name:zhs}, JSON.stringify({'name': "123456"}));
    }
  • 1,接受示例
@MessageMapping("/welcome")
    @SendTo("/topic/getResponse")
    public Response say(Message message, @Header("name") String name) throws Exception {
        Thread.sleep(1000);
        return new Response("Welcome, " + message.getName() + "!");
    }

2,发送示例,路径参数传递

function sendMessage() {
        stompClient.send("/welcome/zhs", {}, JSON.stringify({'name': "123456"}));
    }

2,接受示例

@MessageMapping("/welcome/{name}")
    @SendTo("/topic/getResponse")
    public Response say(Message message, @DestinationVariable("name") String name) throws Exception {
        Thread.sleep(1000);
        return new Response("Welcome, " + message.getName() + "!");
    }