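STOMP (Streaming Text Oriented Message Protocol, also expanded as Simple Text Oriented Messaging Protocol) is a simple text-based protocol designed for MOM (Message Oriented Middleware).
It provides an interoperable wire format that lets any STOMP client talk to any STOMP message broker, much as OpenWire (a binary protocol) does.
Because the design is simple and clients are easy to write, STOMP is widely used across many languages and platforms. Among the most popular STOMP message brokers is Apache ActiveMQ.
STOMP runs on top of TCP and uses commands such as CONNECT, SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, COMMIT, ABORT, ACK, and DISCONNECT.
STOMP Over WebSocket: http://jmesnil.net/stomp-websocket/doc/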
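To make the text-based framing concrete, here is a minimal sketch of how a STOMP frame is laid out: a command line, header lines, a blank line, the body, and a terminating NUL character. The class name `StompFrameSketch` is made up for illustration, and header escaping is deliberately ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative helper (not part of any library): renders a STOMP frame as text. */
final class StompFrameSketch {

    /** Builds the command line, the header lines, a blank line, the body, and the NUL terminator. */
    static String build(String command, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        frame.append('\n');   // a blank line separates the headers from the body
        frame.append(body);
        frame.append('\0');   // every frame ends with a NUL character
        return frame.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/topic/getResponse");
        headers.put("content-type", "application/json");
        String frame = build("SEND", headers, "{\"responseMessage\":\"Welcome!\"}");
        // Replace the NUL terminator with a visible marker before printing.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

In practice the client library (stomp.js here) and the server build and parse these frames for you; the sketch only shows what travels over the connection.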
1.1 Add the pom dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1.2 Configure WebSocket STOMP
/**
 * Enables broker-backed messaging over STOMP through the EnableWebSocketMessageBroker annotation.
 * Controllers can then use @MessageMapping in much the same way as @RequestMapping.
 * @author zhenghuasheng
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
/** Register the STOMP endpoint. */
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/endpointService").setAllowedOrigins("*").withSockJS();
}
/**
 * Configure the message broker.
 * @param registry
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
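// Destination prefixes handled by the simple broker
registry.enableSimpleBroker("/queue","/topic");
// Global prefix for messages routed to application handlers (it shows up in the destinations clients use)
// registry.setApplicationDestinationPrefixes("/app");
// Prefix for point-to-point (per-user) destinations (it shows up in the client's subscription path); defaults to /user/ if not set
// registry.setUserDestinationPrefix("/user/");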
}
}
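Notes:
registry.enableSimpleBroker("/queue","/topic");
/topic is used for broadcast, i.e. a message published to every subscriber
/queue is used for point-to-point delivery, i.e. a message sent to a specific user
registry.setApplicationDestinationPrefixes("/app");
For example, if a client sends a message to /app/send, it is handled by the controller method annotated with @MessageMapping("/send")
If a client subscribes to /app/subscribe, it is handled by the controller method annotated with @SubscribeMapping("/subscribe")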
registry.setUserDestinationPrefix("/user/");
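The prefix rules above can be sketched as a small routing function. This is a simplified model for illustration, not Spring's actual implementation; it assumes the `/app` and `/user/` prefixes are enabled (they are commented out in the configuration above), and the class name `DestinationRoutingSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of destination routing; an illustration, not Spring's implementation. */
final class DestinationRoutingSketch {
    static final String APP_PREFIX = "/app";    // assumed application destination prefix
    static final String USER_PREFIX = "/user/"; // assumed user destination prefix
    static final List<String> BROKER_PREFIXES = Arrays.asList("/queue", "/topic");

    /** Returns a short description of where a destination would be handled. */
    static String route(String destination) {
        if (destination.startsWith(APP_PREFIX + "/")) {
            // The prefix is stripped before matching @MessageMapping / @SubscribeMapping paths.
            return "controller:" + destination.substring(APP_PREFIX.length());
        }
        if (destination.startsWith(USER_PREFIX)) {
            // Per-user destinations are resolved to a concrete destination later.
            return "user:" + destination.substring(USER_PREFIX.length() - 1);
        }
        for (String prefix : BROKER_PREFIXES) {
            if (destination.startsWith(prefix)) {
                return "broker:" + destination;
            }
        }
        return "unhandled:" + destination;
    }

    public static void main(String[] args) {
        System.out.println(route("/app/send"));
        System.out.println(route("/topic/getResponse"));
        System.out.println(route("/user/queue/msg"));
    }
}
```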
1.3 Message entity classes
/**
 * Message sent from the client to the server.
 * @author zhenghuasheng
 */
public class Message {
private String name;
public String getName() {
return name;
}
}
/**
 * Message sent from the server to the client.
 * @author zhenghuasheng
 */
public class Response {
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
private String responseMessage;
public Response(String responseMessage) {
this.responseMessage = responseMessage;
}
public String getResponseMessage() {
return responseMessage;
}
}
1.4 Controller
/** * @author zhenghuasheng */
@Controller
public class WebSocketController {
@Autowired
SimpMessagingTemplate template;
@Autowired
WelcomeJob welcomeTask;
private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);
/**
 * A message sent by the browser is mapped to /welcome by @MessageMapping.
 * The return value is sent to every browser subscribed to the path given in @SendTo.
 * @param message
 * @return
 * @throws Exception
 */
@MessageMapping("/welcome")
@SendTo("/topic/getResponse")
public Response say(Message message) throws Exception {
Thread.sleep(1000);
return new Response("Welcome, " + message.getName() + "!");
}
/** A client that subscribes to "/topic/getResponse" receives this message. */
@SubscribeMapping("/topic/getResponse")
public Response sub() {
logger.info("User XXX subscribed...");
return new Response("Thanks for subscribing...");
}
@GetMapping("/test")
String test() {
return "test";
}
/** An ordinary HTTP request can also be used to actively push a broadcast message. */
@RequestMapping("/welcome")
@ResponseBody
public ResultVo say02() {
try {
template.convertAndSend("/topic/getResponse", new Response("Welcome!"));
} catch (Exception e) {
// Log the failure instead of printing the stack trace
logger.error("Failed to broadcast to /topic/getResponse", e);
}
return ResultVo.ok();
}
}
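What the broadcast path does can be illustrated with a tiny in-memory publish/subscribe model: every handler subscribed to a destination receives each payload sent to it. This is only a sketch of the idea; `TopicBrokerSketch` is a hypothetical class and has nothing to do with how Spring's simple broker is implemented internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory broker: a destination maps to the list of its subscribers. */
final class TopicBrokerSketch {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a handler for a destination such as "/topic/getResponse". */
    void subscribe(String destination, Consumer<String> handler) {
        subscribers.computeIfAbsent(destination, key -> new ArrayList<>()).add(handler);
    }

    /** Delivers the payload to every subscriber and returns how many received it. */
    int send(String destination, String payload) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(destination, new ArrayList<Consumer<String>>());
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        TopicBrokerSketch broker = new TopicBrokerSketch();
        broker.subscribe("/topic/getResponse", body -> System.out.println("browser A got " + body));
        broker.subscribe("/topic/getResponse", body -> System.out.println("browser B got " + body));
        broker.send("/topic/getResponse", "{\"responseMessage\":\"Welcome!\"}");
    }
}
```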
Point-to-point: pushing messages to a specific user
/** Register the STOMP endpoint. */
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/endpointWisely")
//.addInterceptors(new WebSocketSessionHandshakeInterceptor())
.setHandshakeHandler(new DefaultHandshakeHandler(){
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
return new LocalPrincipal(ShiroUtils.getUser().getName());
}
})
.setAllowedOrigins("*").withSockJS();
}
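This example uses Shiro and takes the username as the identifier. Any other unique identifier works too: just change the value passed to the constructor in return new LocalPrincipal(ShiroUtils.getUser().getName());. Because determineUser receives both request and attributes, the identifier can be fully customized. To pass extra parameters or customize the data at registration time, add a HandshakeInterceptor.
For example, to identify users by a userId passed from the front end, put the parameter into attributes during the handshake and read it from that map when the user is registered.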
/** * Created by zhenghuasheng on 2016/6/20. */
public class WebSocketSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
Logger logger = LoggerFactory.getLogger(WebSocketSessionHandshakeInterceptor.class);
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (getSession(request) != null) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
attributes.put("userId", httpServletRequest.getParameter("userId"));
}
// Propagate the parent's decision instead of discarding it
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}
private HttpSession getSession(ServerHttpRequest request) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
return serverRequest.getServletRequest().getSession();
}
return null;
}
}
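Once the interceptor has stored userId in the attributes map, the determineUser override shown earlier could build its Principal from that map instead of calling Shiro. The following is a minimal sketch of that lookup only; the helper class `AttributePrincipalSketch` and the decision to return null when the attribute is missing are assumptions for illustration.

```java
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;

/** Illustrative helper: derives a Principal from handshake attributes. */
final class AttributePrincipalSketch {

    /** Returns a Principal named after the "userId" attribute, or null when it is absent or empty. */
    static Principal fromAttributes(Map<String, Object> attributes) {
        Object userId = attributes.get("userId");
        if (userId == null || userId.toString().isEmpty()) {
            return null;
        }
        final String name = userId.toString();
        return new Principal() {
            @Override
            public String getName() {
                return name;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("userId", "42"); // as stored by the handshake interceptor
        System.out.println(fromAttributes(attributes).getName());
    }
}
```

Inside determineUser, the call would look like `return AttributePrincipalSketch.fromAttributes(attributes);`, with the interceptor registered through the commented-out addInterceptors line.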
/** * @author zhenghuasheng * @date 2018/3/27.14:31 */
public class LocalPrincipal implements Principal {
private String username;
@Override
public String getName() {
return username;
}
public LocalPrincipal(String username) {
this.username = username;
}
public LocalPrincipal() {
}
}
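The name returned by the Principal is what ties a user to a destination. On the server side, Spring's `SimpMessagingTemplate.convertAndSendToUser(user, destination, payload)` is the usual call for sending to one user, and a client subscribed to /user/queue/msg receives it. The sketch below models the idea in plain Java: a per-user destination is rewritten into one concrete destination per session. The `-user<sessionId>` suffix approximates what Spring's default resolver produces and should be read as an assumption; `UserDestinationSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified model of per-user destination resolution; not Spring's implementation. */
final class UserDestinationSketch {
    private final Map<String, Set<String>> sessionsByUser = new HashMap<>();

    /** Records that a user (the Principal name) is connected with the given session. */
    void register(String user, String sessionId) {
        sessionsByUser.computeIfAbsent(user, key -> new LinkedHashSet<>()).add(sessionId);
    }

    /** Subscription side: "/user/queue/msg" becomes a destination unique to the session. */
    static String resolveSubscription(String destination, String sessionId) {
        return destination.substring("/user".length()) + "-user" + sessionId;
    }

    /** Sending side: one concrete destination for each session of the target user. */
    List<String> resolveSend(String user, String destination) {
        List<String> resolved = new ArrayList<>();
        for (String sessionId : sessionsByUser.getOrDefault(user, Collections.<String>emptySet())) {
            resolved.add(destination + "-user" + sessionId);
        }
        return resolved;
    }

    public static void main(String[] args) {
        UserDestinationSketch registry = new UserDestinationSketch();
        registry.register("zhs", "abc123");
        System.out.println(resolveSubscription("/user/queue/msg", "abc123"));
        System.out.println(registry.resolveSend("zhs", "/queue/msg"));
    }
}
```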
1.5 Client
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
Pop-up notifications use Toastr; include the relevant JS files yourself
<script src="/js/plugins/toastr/toastr.min.js"></script>
var stompClient = null;
$(function () {
connect();
});
function sendMessage() {
stompClient.send("/welcome", {}, JSON.stringify({'name': "123456"}));
}
function connect() {
var socket = new SockJS('/endpointService'); // Connect to the SockJS endpoint "/endpointService", which matches the endpoint registered in the config
stompClient = Stomp.over(socket); // WebSocket client that speaks the STOMP subprotocol
stompClient.connect({}, function (frame) { // Connect to the WebSocket server
console.log('Connected: ' + frame);
stompClient.subscribe('/user/queue/msg', function (response) {
var returnData = JSON.parse(response.body);
console.log(returnData)
toastr.options = {
"closeButton": true,
"debug": false,
"progressBar": true,
"positionClass": "toast-top-center",
"onclick": null,
"showDuration": "400",
"hideDuration": "1000",
"timeOut": "7000",
"extendedTimeOut": "1000",
"showEasing": "swing",
"hideEasing": "linear",
"showMethod": "fadeIn",
"hideMethod": "fadeOut"
}
toastr.info(JSON.parse(response.body).responseMessage);
});
stompClient.subscribe('/topic/getResponse', function (response) { // Subscribe to messages sent to /topic/getResponse, the destination defined in the controller's @SendTo
toastr.options = {
"closeButton": true,
"debug": false,
"progressBar": true,
"positionClass": "toast-top-center",
"onclick": null,
"showDuration": "400",
"hideDuration": "1000",
"timeOut": "7000",
"extendedTimeOut": "1000",
"showEasing": "swing",
"hideEasing": "linear",
"showMethod": "fadeIn",
"hideMethod": "fadeOut"
}
toastr.info(JSON.parse(response.body).responseMessage);
});
});
}
2.1 Add pom dependencies on top of the existing Spring setup
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring.version}</version>
</dependency>
2.2 The client, entity classes, and configuration are the same as in the Spring Boot setup
Spring can also configure the endpoint service in XML
<websocket:message-broker application-destination-prefix="app" user-destination-prefix="user" >
<websocket:stomp-endpoint allowed-origins="*" path="/webSocketServer">
<websocket:handshake-interceptors>
<ref bean="webSocketSessionHandshakeInterceptor"></ref>
</websocket:handshake-interceptors>
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue" heartbeat-receive-interval="2000" heartbeat-send-interval="2000"/>
</websocket:message-broker>
1. Send example: passing a parameter in a header
function sendMessage() {
stompClient.send("/welcome", {name: 'zhs'}, JSON.stringify({'name': "123456"}));
}
1. Receive example
@MessageMapping("/welcome")
@SendTo("/topic/getResponse")
public Response say(Message message, @Header("name") String name) throws Exception {
Thread.sleep(1000);
return new Response("Welcome, " + message.getName() + "!");
}
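The custom header travels as an ordinary header line in the SEND frame, which is where @Header("name") reads it from. As a rough illustration of that, the sketch below parses the header lines of a raw frame; `StompHeaderSketch` is a hypothetical class, header escaping is ignored, and when a header is repeated the first occurrence is kept.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for the header section of a STOMP frame. */
final class StompHeaderSketch {

    /** Parses the lines between the command line and the first blank line. */
    static Map<String, String> parseHeaders(String frame) {
        Map<String, String> headers = new LinkedHashMap<>();
        String[] lines = frame.split("\n", -1);
        // Line 0 is the command; headers run until the first empty line.
        for (int i = 1; i < lines.length && !lines[i].isEmpty(); i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                // Keep the first occurrence of a repeated header.
                headers.putIfAbsent(lines[i].substring(0, colon), lines[i].substring(colon + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        String frame = "SEND\ndestination:/welcome\nname:zhs\n\n{\"name\":\"123456\"}\0";
        System.out.println(parseHeaders(frame).get("name"));
    }
}
```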
2. Send example: passing a parameter in the path
function sendMessage() {
stompClient.send("/welcome/zhs", {}, JSON.stringify({'name': "123456"}));
}
2. Receive example
@MessageMapping("/welcome/{name}")
@SendTo("/topic/getResponse")
public Response say(Message message, @DestinationVariable("name") String name) throws Exception {
Thread.sleep(1000);
return new Response("Welcome, " + message.getName() + "!");
}
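How a template such as /welcome/{name} yields the value for @DestinationVariable("name") can be sketched with a regular expression. This is a simplified stand-in for illustration only (Spring uses its own path matcher), and `DestinationTemplateSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative matcher for destination templates containing {variable} segments. */
final class DestinationTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{([^/}]+)\\}");

    /** Returns the extracted variables, or null when the destination does not match the template. */
    static Map<String, String> match(String template, String destination) {
        List<String> names = new ArrayList<>();
        StringBuilder regex = new StringBuilder();
        Matcher variable = VARIABLE.matcher(template);
        int last = 0;
        while (variable.find()) {
            // Literal text is quoted; each {variable} becomes a single-segment capture group.
            regex.append(Pattern.quote(template.substring(last, variable.start())));
            regex.append("([^/]+)");
            names.add(variable.group(1));
            last = variable.end();
        }
        regex.append(Pattern.quote(template.substring(last)));

        Matcher matcher = Pattern.compile(regex.toString()).matcher(destination);
        if (!matcher.matches()) {
            return null;
        }
        Map<String, String> variables = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            variables.put(names.get(i), matcher.group(i + 1));
        }
        return variables;
    }

    public static void main(String[] args) {
        System.out.println(match("/welcome/{name}", "/welcome/zhs"));
    }
}
```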