作者 crossoverJie
提交者 GitHub

Merge pull request #5 from crossoverJie/cim

cim
正在显示 70 个修改的文件 包含 3456 行增加324 行删除

要显示太多修改。

为保证性能只显示 70 of 70+ 个文件。

... ... @@ -5,10 +5,10 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.crossoverjie.netty</groupId>
<artifactId>netty-action</artifactId>
<artifactId>cim</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>netty-action-heartbeat-client</artifactId>
<artifactId>cim-client</artifactId>
<packaging>jar</packaging>
<properties>
... ... @@ -30,7 +30,7 @@
<dependency>
<groupId>com.crossoverjie.netty</groupId>
<artifactId>netty-action-common</artifactId>
<artifactId>cim-common</artifactId>
</dependency>
... ...
package com.crossoverjie.netty.action.client;
package com.crossoverjie.cim.client;
import com.crossoverjie.cim.client.scanner.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
... ... @@ -10,18 +11,20 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @author crossoverJie
*/
@SpringBootApplication
public class HeartbeatClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClientApplication.class);
public class CIMClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
public static void main(String[] args) {
SpringApplication.run(HeartbeatClientApplication.class, args);
LOGGER.info("启动 Client 成功");
SpringApplication.run(CIMClientApplication.class, args);
LOGGER.info("启动 Client 服务成功");
}
@Override
public void run(String... args) throws Exception {
Scan scan = new Scan() ;
Thread thread = new Thread(scan);
thread.setName("scan-thread");
thread.start();
}
}
\ No newline at end of file
... ...
package com.crossoverjie.netty.action.client;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.netty.action.client.init.CustomerHandleInitializer;
import com.crossoverjie.netty.action.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.protocol.BaseRequestProto;
package com.crossoverjie.cim.client.client;
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
... ... @@ -16,6 +18,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
... ... @@ -29,55 +32,93 @@ import javax.annotation.PostConstruct;
* @since JDK 1.8
*/
@Component
public class HeartbeatClient {
public class CIMClient {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class);
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${cim.user.id}")
private long userId;
@Value("${netty.server.port}")
private int nettyPort;
@Value("${netty.server.host}")
private String host;
@Value("${cim.user.userName}")
private String userName;
private SocketChannel channel;
@Autowired
private RouteRequest routeRequest;
@PostConstruct
public void start() throws InterruptedException {
public void start() throws Exception {
//登录 + 获取可以使用的服务器 ip+port
CIMServerResVO.ServerInfo cimServer = userLogin();
//启动客户端
startClient(cimServer);
//向服务端注册
loginCIMServer();
}
/**
* 启动客户端
*
* @param cimServer
* @throws InterruptedException
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CustomerHandleInitializer())
.handler(new CIMClientHandleInitializer())
;
ChannelFuture future = bootstrap.connect(host, nettyPort).sync();
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
LOGGER.info("启动 cim client 成功");
}
channel = (SocketChannel) future.channel();
}
/**
* 发送消息
* 登录+路由服务器
*
* @param customProtocol
* @return 路由服务器信息
* @throws Exception
*/
public void sendMsg(CustomProtocol customProtocol) {
ChannelFuture future = channel.writeAndFlush(customProtocol);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客户端手动发消息成功={}", JSON.toJSONString(customProtocol)));
private CIMServerResVO.ServerInfo userLogin() throws Exception {
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
CIMServerResVO.ServerInfo cimServer = routeRequest.getCIMServer(loginReqVO);
LOGGER.info("cimServer=[{}]", cimServer.toString());
return cimServer;
}
/**
* 向服务器注册
*/
private void loginCIMServer() {
CIMRequestProto.CIMReqProtocol login = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(userId)
.setReqMsg(userName)
.setType(Constants.CommandType.LOGIN)
.build();
ChannelFuture future = channel.writeAndFlush(login);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("注册成功={}", login.toString()));
}
/**
* 发送消息字符串
*
* @param msg
*/
public void sendStringMsg(String msg) {
ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
message.writeBytes(msg.getBytes()) ;
ByteBuf message = Unpooled.buffer(msg.getBytes().length);
message.writeBytes(msg.getBytes());
ChannelFuture future = channel.writeAndFlush(message);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客户端手动发消息成功={}", msg));
... ... @@ -91,9 +132,10 @@ public class HeartbeatClient {
*/
public void sendGoogleProtocolMsg(GoogleProtocolVO googleProtocolVO) {
BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
CIMRequestProto.CIMReqProtocol protocol = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(googleProtocolVO.getRequestId())
.setReqMsg(googleProtocolVO.getMsg())
.setType(Constants.CommandType.MSG)
.build();
... ... @@ -102,4 +144,12 @@ public class HeartbeatClient {
LOGGER.info("客户端手动发送 Google Protocol 成功={}", googleProtocolVO.toString()));
}
/**
* 关闭
* @throws InterruptedException
*/
public void close() throws InterruptedException {
channel.close() ;
}
}
... ...
package com.crossoverjie.cim.client.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/24 01:43
* @since JDK 1.8
*/
@Component
public class AppConfiguration {
@Value("${cim.user.id}")
private Long userId;
@Value("${cim.user.userName}")
private String userName;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
... ...
package com.crossoverjie.cim.client.config;
import com.crossoverjie.cim.client.handle.MsgHandleCaller;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.*;
/**
* Function:bean 配置
*
* @author crossoverJie
* Date: 24/05/2018 15:55
* @since JDK 1.8
*/
@Configuration
public class BeanConfig {
private final static Logger LOGGER = LoggerFactory.getLogger(BeanConfig.class);
@Value("${cim.user.id}")
private long userId;
@Value("${cim.callback.thread.queue.size}")
private int queueSize;
@Value("${cim.callback.thread.pool.size}")
private int poolSize;
/**
* 创建心跳单例
* @return
*/
@Bean(value = "heartBeat")
public CIMRequestProto.CIMReqProtocol heartBeat() {
CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(userId)
.setReqMsg("ping")
.setType(Constants.CommandType.PING)
.build();
return heart;
}
/**
* http client
* @return okHttp
*/
@Bean
public OkHttpClient okHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10,TimeUnit.SECONDS)
.retryOnConnectionFailure(true);
return builder.build();
}
/**
* 创建回调线程池
* @return
*/
@Bean("callBackThreadPool")
public ThreadPoolExecutor buildCallerThread(){
BlockingQueue<Runnable> queue = new LinkedBlockingQueue(queueSize);
ThreadFactory product = new ThreadFactoryBuilder()
.setNameFormat("msg-callback-%d")
.setDaemon(true)
.build();
ThreadPoolExecutor productExecutor = new ThreadPoolExecutor(poolSize, poolSize, 1, TimeUnit.MILLISECONDS, queue,product);
return productExecutor ;
}
/**
* 回调 bean
* @return
*/
@Bean
public MsgHandleCaller buildCaller(){
MsgHandleCaller caller = new MsgHandleCaller(msg -> {
//处理业务逻辑,或者自定义实现接口
}) ;
return caller ;
}
}
... ...
package com.crossoverjie.netty.action.client.config;
package com.crossoverjie.cim.client.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
... ...
package com.crossoverjie.netty.action.client.controller;
import com.crossoverjie.netty.action.client.HeartbeatClient;
import com.crossoverjie.netty.action.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.netty.action.client.vo.req.SendMsgReqVO;
import com.crossoverjie.netty.action.client.vo.req.StringReqVO;
import com.crossoverjie.netty.action.client.vo.res.SendMsgResVO;
import com.crossoverjie.netty.action.common.constant.Constants;
import com.crossoverjie.netty.action.common.enums.StatusEnum;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.res.BaseResponse;
import com.crossoverjie.netty.action.common.res.NULLBody;
package com.crossoverjie.cim.client.controller;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.SendMsgReqVO;
import com.crossoverjie.cim.client.vo.req.StringReqVO;
import com.crossoverjie.cim.client.vo.res.SendMsgResVO;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.metrics.CounterService;
... ... @@ -37,30 +38,13 @@ public class IndexController {
private CounterService counterService;
@Autowired
private HeartbeatClient heartbeatClient ;
private CIMClient heartbeatClient ;
/**
* 向服务端发消息
* @param sendMsgReqVO
* @return
*/
@ApiOperation("客户端发送消息")
@RequestMapping(value = "sendMsg",method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
res.setDataBody(sendMsgResVO) ;
return res ;
}
@Autowired
private RouteRequest routeRequest ;
/**
* 向服务端发消息 字符串
... ... @@ -111,4 +95,27 @@ public class IndexController {
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
return res ;
}
/**
* 群发消息
* @param sendMsgReqVO
* @return
*/
@ApiOperation("群发消息")
@RequestMapping(value = "sendGroupMsg",method = RequestMethod.POST)
@ResponseBody
public BaseResponse sendGroupMsg(@RequestBody SendMsgReqVO sendMsgReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
GroupReqVO groupReqVO = new GroupReqVO(sendMsgReqVO.getUserId(),sendMsgReqVO.getMsg()) ;
routeRequest.sendGroupMsg(groupReqVO) ;
counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
return res ;
}
}
... ...
package com.crossoverjie.netty.action.client.handle;
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.netty.action.common.protocol.BaseResponseProto;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Function:
*
... ... @@ -13,27 +20,31 @@ import org.slf4j.LoggerFactory;
* Date: 16/02/2018 18:09
* @since JDK 1.8
*/
public class EchoClientHandle extends SimpleChannelInboundHandler<BaseResponseProto.ResponseProtocol> {
@ChannelHandler.Sharable
public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProto.CIMResProtocol> {
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientHandle.class);
private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);
private MsgHandleCaller caller ;
private ThreadPoolExecutor threadPoolExecutor ;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
/*if (evt instanceof IdleStateEvent){
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
LOGGER.info("已经 10 秒没有发送信息!");
//LOGGER.info("已经 10 秒没有发送信息!");
//向服务端发送消息
CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class);
ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat) ;
}
}*/
}
super.userEventTriggered(ctx, evt);
}
... ... @@ -42,16 +53,32 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<BaseResponsePr
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//客户端和服务端建立连接时调用
LOGGER.info("已经建立了联系。。");
LOGGER.info("cim server connect success!");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, BaseResponseProto.ResponseProtocol responseProtocol) throws Exception {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol responseProtocol) throws Exception {
//从服务端收到消息时被调用
//LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
LOGGER.info("客户端收到消息={}" ,responseProtocol.getResMsg());
//回调消息
callBackMsg(responseProtocol.getResMsg());
LOGGER.info(responseProtocol.getResMsg());
}
/**
* 回调消息
* @param msg
*/
private void callBackMsg(String msg) {
threadPoolExecutor = SpringBeanFactory.getBean("callBackThreadPool",ThreadPoolExecutor.class) ;
threadPoolExecutor.execute(() -> {
caller = SpringBeanFactory.getBean(MsgHandleCaller.class) ;
caller.getMsgHandleListener().handle(msg);
});
}
@Override
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.service.CustomMsgHandleListener;
/**
* Function:消息回调 bean
*
* @author crossoverJie
* Date: 2018/12/26 17:37
* @since JDK 1.8
*/
public class MsgHandleCaller {
/**
* 回调接口
*/
private CustomMsgHandleListener msgHandleListener ;
public MsgHandleCaller(CustomMsgHandleListener msgHandleListener) {
this.msgHandleListener = msgHandleListener;
}
public CustomMsgHandleListener getMsgHandleListener() {
return msgHandleListener;
}
public void setMsgHandleListener(CustomMsgHandleListener msgHandleListener) {
this.msgHandleListener = msgHandleListener;
}
}
... ...
package com.crossoverjie.netty.action.client.init;
package com.crossoverjie.cim.client.init;
import com.crossoverjie.netty.action.client.handle.EchoClientHandle;
import com.crossoverjie.netty.action.common.protocol.BaseResponseProto;
import com.crossoverjie.cim.client.handle.CIMClientHandle;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
... ... @@ -17,12 +17,15 @@ import io.netty.handler.timeout.IdleStateHandler;
* Date: 23/02/2018 22:47
* @since JDK 1.8
*/
public class CustomerHandleInitializer extends ChannelInitializer<Channel> {
public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
private final CIMClientHandle cimClientHandle = new CIMClientHandle();
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 10, 0))
//60 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 60, 0))
//心跳解码
//.addLast(new HeartbeatEncode())
... ... @@ -30,14 +33,12 @@ public class CustomerHandleInitializer extends ChannelInitializer<Channel> {
// google Protobuf 编解码
//拆包解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance()))
.addLast(new ProtobufDecoder(CIMResponseProto.CIMResProtocol.getDefaultInstance()))
//
//拆包编码
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new EchoClientHandle())
.addLast(cimClientHandle)
;
}
}
... ...
package com.crossoverjie.cim.client.scanner;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Scanner;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/21 16:44
* @since JDK 1.8
*/
public class Scan implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(Scan.class);
/**
* 系统参数
*/
private AppConfiguration configuration;
private MsgHandle msgHandle ;
public Scan() {
this.configuration = SpringBeanFactory.getBean(AppConfiguration.class);
this.msgHandle = SpringBeanFactory.getBean(MsgHandle.class) ;
}
@Override
public void run() {
Scanner sc = new Scanner(System.in);
while (true) {
String msg = sc.nextLine();
//检查消息
if (msgHandle.checkMsg(msg)) {
continue;
}
//系统内置命令
if (msgHandle.innerCommand(msg)){
continue;
}
//真正的发送消息
msgHandle.sendMsg(msg) ;
LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
}
}
}
... ...
package com.crossoverjie.cim.client.service;
/**
* Function: 自定义消息回调
*
* @author crossoverJie
* Date: 2018/12/26 17:24
* @since JDK 1.8
*/
public interface CustomMsgHandleListener {
/**
* 消息回调
* @param msg
*/
void handle(String msg);
}
... ...
package com.crossoverjie.cim.client.service;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
/**
* Function:消息处理器
*
* @author crossoverJie
* Date: 2018/12/26 11:11
* @since JDK 1.8
*/
public interface MsgHandle {
/**
* 统一的发送接口,包含了 groupChat p2pChat
* @param msg
*/
void sendMsg(String msg) ;
/**
* 群聊
* @param groupReqVO 群聊消息 其中的 userId 为发送者的 userID
* @throws Exception
*/
void groupChat(GroupReqVO groupReqVO) throws Exception ;
/**
* 私聊
* @param p2PReqVO 私聊请求
* @throws Exception
*/
void p2pChat(P2PReqVO p2PReqVO) throws Exception;
// TODO: 2018/12/26 后续对消息的处理可以优化为责任链模式
/**
* 校验消息
* @param msg
* @return 不能为空,后续可以加上一些敏感词
* @throws Exception
*/
boolean checkMsg(String msg) ;
/**
* 执行内部命令
* @param msg
* @return 是否应当跳过当前消息(包含了":" 就需要跳过)
*/
boolean innerCommand(String msg) ;
}
... ...
package com.crossoverjie.cim.client.service;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import java.util.List;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/22 22:26
* @since JDK 1.8
*/
public interface RouteRequest {
/**
* 群发消息
* @param groupReqVO 消息
* @throws Exception
*/
void sendGroupMsg(GroupReqVO groupReqVO) throws Exception;
/**
* 私聊
* @param p2PReqVO
* @throws Exception
*/
void sendP2PMsg(P2PReqVO p2PReqVO)throws Exception;
/**
* 获取服务器
* @return 服务ip+port
* @param loginReqVO
* @throws Exception
*/
CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception;
/**
*
* @return 获取所有在线用户
*/
List<OnlineUsersResVO.DataBodyBean> onlineUsers()throws Exception ;
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.enums.SystemCommandEnumType;
import com.crossoverjie.cim.common.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/26 11:15
* @since JDK 1.8
*/
@Service
public class MsgHandler implements MsgHandle {
private final static Logger LOGGER = LoggerFactory.getLogger(MsgHandler.class);
@Autowired
private RouteRequest routeRequest ;
@Autowired
private AppConfiguration configuration;
@Autowired
private ThreadPoolExecutor executor ;
@Autowired
private CIMClient cimClient ;
@Override
public void sendMsg(String msg) {
String[] totalMsg = msg.split(";;");
if (totalMsg.length > 1) {
//私聊
P2PReqVO p2PReqVO = new P2PReqVO();
p2PReqVO.setUserId(configuration.getUserId());
p2PReqVO.setReceiveUserId(Long.parseLong(totalMsg[0]));
p2PReqVO.setMsg(totalMsg[1]);
try {
p2pChat(p2PReqVO);
} catch (Exception e) {
LOGGER.error("Exception",e);
}
} else {
//群聊
GroupReqVO groupReqVO = new GroupReqVO(configuration.getUserId(), msg);
try {
groupChat(groupReqVO);
} catch (Exception e) {
LOGGER.error("Exception",e);
}
}
}
@Override
public void groupChat(GroupReqVO groupReqVO) throws Exception {
routeRequest.sendGroupMsg(groupReqVO);
}
@Override
public void p2pChat(P2PReqVO p2PReqVO) throws Exception {
routeRequest.sendP2PMsg(p2PReqVO);
}
@Override
public boolean checkMsg(String msg) {
if (StringUtil.isEmpty(msg)){
LOGGER.warn("不能发送空消息!");
return true;
}
return false;
}
@Override
public boolean innerCommand(String msg) {
if (msg.startsWith(":")){
Map<String, String> allStatusCode = SystemCommandEnumType.getAllStatusCode();
if (SystemCommandEnumType.QUIT.getCommandType().trim().equals(msg)){
//关闭系统
shutdown();
} else if (SystemCommandEnumType.ALL.getCommandType().trim().equals(msg)){
printAllCommand(allStatusCode);
} else if (SystemCommandEnumType.ONLINE_USER.getCommandType().toLowerCase().trim().equals(msg.toLowerCase())){
//打印在线用户
printOnlineUsers();
}else {
printAllCommand(allStatusCode);
}
return true ;
}else {
return false ;
}
}
/**
* 打印在线用户
*/
private void printOnlineUsers() {
try {
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
LOGGER.info("userId={}=====userName={}",onlineUser.getUserId(),onlineUser.getUserName());
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
} catch (Exception e) {
LOGGER.error("Exception" ,e);
}
}
/**
* 关闭系统
*/
private void shutdown() {
LOGGER.info("系统关闭中。。。。");
executor.shutdown();
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程池关闭中。。。。");
}
cimClient.close();
} catch (InterruptedException e) {
LOGGER.error("InterruptedException",e);
}
System.exit(0);
}
private void printAllCommand(Map<String, String> allStatusCode) {
LOGGER.warn("====================================");
for (Map.Entry<String, String> stringStringEntry : allStatusCode.entrySet()) {
String key = stringStringEntry.getKey();
String value = stringStringEntry.getValue();
LOGGER.warn(key + "----->" + value);
}
LOGGER.warn("====================================");
}
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.res.BaseResponse;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/22 22:27
* @since JDK 1.8
*/
@Service
public class RouteRequestImpl implements RouteRequest {
private final static Logger LOGGER = LoggerFactory.getLogger(RouteRequestImpl.class);
@Autowired
private OkHttpClient okHttpClient ;
private MediaType mediaType = MediaType.parse("application/json");
@Value("${cim.group.route.request.url}")
private String groupRouteRequestUrl;
@Value("${cim.p2p.route.request.url}")
private String p2pRouteRequestUrl;
@Value("${cim.server.route.request.url}")
private String serverRouteRequestUrl;
@Value("${cim.server.online.user.url}")
private String onlineUserUrl;
@Autowired
private AppConfiguration appConfiguration ;
@Override
public void sendGroupMsg(GroupReqVO groupReqVO) throws Exception {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg",groupReqVO.getMsg());
jsonObject.put("userId",groupReqVO.getUserId());
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(groupRouteRequestUrl)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute() ;
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
}
@Override
public void sendP2PMsg(P2PReqVO p2PReqVO) throws Exception {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg",p2PReqVO.getMsg());
jsonObject.put("userId",p2PReqVO.getUserId());
jsonObject.put("receiveUserId",p2PReqVO.getReceiveUserId());
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(p2pRouteRequestUrl)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute() ;
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
String json = response.body().string() ;
BaseResponse baseResponse = JSON.parseObject(json, BaseResponse.class);
//选择的账号不存在
if (baseResponse.getCode().equals(StatusEnum.OFF_LINE.getCode())){
LOGGER.error(p2PReqVO.getReceiveUserId() + ":" + StatusEnum.OFF_LINE.getMessage());
}
}
@Override
public CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId",loginReqVO.getUserId());
jsonObject.put("userName",loginReqVO.getUserName());
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(serverRouteRequestUrl)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute() ;
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
String json = response.body().string();
CIMServerResVO cimServerResVO = JSON.parseObject(json, CIMServerResVO.class);
//重复登录
if (cimServerResVO.getCode().equals(StatusEnum.REPEAT_LOGIN.getCode())){
LOGGER.error(appConfiguration.getUserName() + ":" + StatusEnum.REPEAT_LOGIN.getMessage());
System.exit(-1);
}
if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
throw new RuntimeException("route server exception code=" + cimServerResVO.getCode()) ;
}
return cimServerResVO.getDataBody();
}
@Override
public List<OnlineUsersResVO.DataBodyBean> onlineUsers() throws Exception{
JSONObject jsonObject = new JSONObject();
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(onlineUserUrl)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute() ;
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
String json = response.body().string() ;
OnlineUsersResVO onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);
return onlineUsersResVO.getDataBody();
}
}
... ...
package com.crossoverjie.netty.action.client.util;
package com.crossoverjie.cim.client.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
... ...
package com.crossoverjie.netty.action.client.vo.req;
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.netty.action.common.req.BaseRequest;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
... ...
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function: 群发请求
*
* @author crossoverJie
* Date: 2018/05/21 15:56
* @since JDK 1.8
*/
public class GroupReqVO extends BaseRequest {
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "消息发送者的 userId", example = "1545574049323")
private Long userId ;
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public GroupReqVO(Long userId, String msg) {
this.userId = userId;
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "GroupReqVO{" +
"userId=" + userId +
", msg='" + msg + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 22:30
* @since JDK 1.8
*/
public class LoginReqVO extends BaseRequest{
private Long userId ;
private String userName ;
public LoginReqVO(Long userId, String userName) {
this.userId = userId;
this.userName = userName;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "LoginReqVO{" +
"userId=" + userId +
", userName='" + userName + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function: 单聊请求
*
* @author crossoverJie
* Date: 2018/05/21 15:56
* @since JDK 1.8
*/
public class P2PReqVO extends BaseRequest {
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "消息发送者的 userId", example = "1545574049323")
private Long userId ;
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "消息接收者的 userId", example = "1545574049323")
private Long receiveUserId ;
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public P2PReqVO() {
}
public P2PReqVO(Long userId, Long receiveUserId, String msg) {
this.userId = userId;
this.receiveUserId = receiveUserId;
this.msg = msg;
}
public Long getReceiveUserId() {
return receiveUserId;
}
public void setReceiveUserId(Long receiveUserId) {
this.receiveUserId = receiveUserId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "GroupReqVO{" +
"userId=" + userId +
", msg='" + msg + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.netty.action.vo.req;
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.netty.action.common.req.BaseRequest;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
... ... @@ -18,9 +18,9 @@ public class SendMsgReqVO extends BaseRequest {
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
@NotNull(message = "id 不能为空")
@ApiModelProperty(required = true, value = "id", example = "11")
private long id ;
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "userId", example = "11")
private Long userId ;
public String getMsg() {
return msg;
... ... @@ -30,11 +30,11 @@ public class SendMsgReqVO extends BaseRequest {
this.msg = msg;
}
public long getId() {
return id;
public long getUserId() {
return userId;
}
public void setId(long id) {
this.id = id;
public void setUserId(long userId) {
this.userId = userId;
}
}
... ...
package com.crossoverjie.netty.action.client.vo.req;
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.netty.action.common.req.BaseRequest;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
... ...
package com.crossoverjie.cim.client.vo.res;
import java.io.Serializable;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 00:43
* @since JDK 1.8
*/
public class CIMServerResVO implements Serializable {
/**
* code : 9000
* message : 成功
* reqNo : null
* dataBody : {"ip":"127.0.0.1","port":8081}
*/
private String code;
private String message;
private Object reqNo;
private ServerInfo dataBody;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getReqNo() {
return reqNo;
}
public void setReqNo(Object reqNo) {
this.reqNo = reqNo;
}
public ServerInfo getDataBody() {
return dataBody;
}
public void setDataBody(ServerInfo dataBody) {
this.dataBody = dataBody;
}
public static class ServerInfo {
/**
* ip : 127.0.0.1
* port : 8081
*/
private String ip ;
private Integer cimServerPort;
private Integer httpPort;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Integer getCimServerPort() {
return cimServerPort;
}
public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}
public Integer getHttpPort() {
return httpPort;
}
public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
@Override
public String toString() {
return "ServerInfo{" +
"ip='" + ip + '\'' +
", cimServerPort=" + cimServerPort +
", httpPort=" + httpPort +
'}';
}
}
@Override
public String toString() {
return "CIMServerResVO{" +
"code='" + code + '\'' +
", message='" + message + '\'' +
", reqNo=" + reqNo +
", dataBody=" + dataBody +
'}';
}
}
... ...
package com.crossoverjie.cim.client.vo.res;
import java.util.List;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/26 23:17
* @since JDK 1.8
*/
public class OnlineUsersResVO {
/**
* code : 9000
* message : 成功
* reqNo : null
* dataBody : [{"userId":1545574841528,"userName":"zhangsan"},{"userId":1545574871143,"userName":"crossoverJie"}]
*/
private String code;
private String message;
private Object reqNo;
private List<DataBodyBean> dataBody;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getReqNo() {
return reqNo;
}
public void setReqNo(Object reqNo) {
this.reqNo = reqNo;
}
public List<DataBodyBean> getDataBody() {
return dataBody;
}
public void setDataBody(List<DataBodyBean> dataBody) {
this.dataBody = dataBody;
}
public static class DataBodyBean {
/**
* userId : 1545574841528
* userName : zhangsan
*/
private long userId;
private String userName;
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
}
... ...
spring.application.name=netty-heartbeat-client
# web port
server.port=8082
# 是否打开swagger
swagger.enable = true
logging.level.root=info
###=======生产模拟======###
# 群发消息
cim.group.route.request.url=http://45.78.28.220:8083/groupRoute
# 私聊消息
cim.p2p.route.request.url=http://45.78.28.220:8083/p2pRoute
# 登录并获取服务器ip+port
cim.server.route.request.url=http://45.78.28.220:8083/login
# 在线用户
cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
###=======本地模拟======###
## 群发消息
#cim.group.route.request.url=http://localhost:8083/groupRoute
#
## 私聊消息
#cim.p2p.route.request.url=http://localhost:8083/p2pRoute
#
## 登录并获取服务器ip+port
#cim.server.route.request.url=http://localhost:8083/login
#
## 在线用户
#cim.server.online.user=http://localhost:8083/onlineUser
# 客户端唯一ID
cim.user.id=1545574841528
cim.user.userName=zhangsan
# 回调线程队列大小
cim.callback.thread.queue.size = 2
# 回调线程池大小
cim.callback.thread.pool.size = 2
# 关闭健康检查权限
management.security.enabled=false
# SpringAdmin 地址
spring.boot.admin.url=http://127.0.0.1:8888
\ No newline at end of file
... ...
▄████▄ ██▓ ███▄ ▄███▓
▒██▀ ▀█ ▓██▒▓██▒▀█▀ ██▒
▒▓█ ▄ ▒██▒▓██ ▓██░
▒▓▓▄ ▄██▒░██░▒██ ▒██
▒ ▓███▀ ░░██░▒██▒ ░██▒
░ ░▒ ▒ ░░▓ ░ ▒░ ░ ░
░ ▒ ▒ ░░ ░ ░
░ ▒ ░░ ░
░ ░ ░ ░
Power by @crossoverJie
... ...
package com.crossoverjie.cim.server.test;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Function:
*
* @author crossoverJie
* Date: 22/05/2018 18:44
* @since JDK 1.8
*/
public class CommonTest {
private final static Logger LOGGER = LoggerFactory.getLogger(CommonTest.class);
@Test
public void test() {
String json = "{\"code\":\"9000\",\"message\":\"成功\",\"reqNo\":null,\"dataBody\":{\"ip\":\"127.0.0.1\",\"port\":8081}}" ;
CIMServerResVO cimServerResVO = JSON.parseObject(json, CIMServerResVO.class);
System.out.println(cimServerResVO.toString());
String text = "nihaoaaa" ;
String[] split = text.split(" ");
System.out.println(split.length);
}
@Test
public void onlineUser(){
List<OnlineUsersResVO.DataBodyBean> onlineUsers = new ArrayList<>(64) ;
OnlineUsersResVO.DataBodyBean bodyBean = new OnlineUsersResVO.DataBodyBean() ;
bodyBean.setUserId(100L);
bodyBean.setUserName("zhangsan");
onlineUsers.add(bodyBean) ;
bodyBean = new OnlineUsersResVO.DataBodyBean();
bodyBean.setUserId(200L);
bodyBean.setUserName("crossoverJie");
onlineUsers.add(bodyBean) ;
LOGGER.info("list={}",JSON.toJSONString(onlineUsers));
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
LOGGER.info("userId={}=====userName={}",onlineUser.getUserId(),onlineUser.getUserName());
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
}
... ...
package com.crossoverjie.cim.server.test;
import com.crossoverjie.cim.client.CIMClientApplication;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 22:39
* @since JDK 1.8
*/
@SpringBootTest(classes = CIMClientApplication.class)
@RunWith(SpringRunner.class)
public class RouteTest {
private final static Logger LOGGER = LoggerFactory.getLogger(RouteTest.class);
@Value("${cim.user.id}")
private long userId;
@Value("${cim.user.userName}")
private String userName;
@Autowired
private RouteRequest routeRequest ;
@Test
public void test() throws Exception {
LoginReqVO vo = new LoginReqVO(userId,userName) ;
CIMServerResVO.ServerInfo cimServer = routeRequest.getCIMServer(vo);
LOGGER.info("cimServer=[{}]",cimServer.toString());
}
}
... ...
... ... @@ -3,13 +3,14 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>netty-action</artifactId>
<artifactId>cim</artifactId>
<groupId>com.crossoverjie.netty</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>1.0.1-SNAPSHOT</version>
<artifactId>netty-action-common</artifactId>
<artifactId>cim-common</artifactId>
<dependencies>
... ... @@ -19,6 +20,11 @@
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<scope>compile</scope>
... ...
package com.crossoverjie.netty.action.common.constant;
package com.crossoverjie.cim.common.constant;
/**
* Function:常量
... ... @@ -22,4 +22,24 @@ public class Constants {
public static final String COUNTER_CLIENT_PUSH_COUNT = "counter.client.push.count" ;
/**
* 自定义报文类型
*/
public static class CommandType{
/**
* 登录
*/
public static final int LOGIN = 1 ;
/**
* 业务消息
*/
public static final int MSG = 2 ;
/**
* ping
*/
public static final int PING = 3 ;
}
}
... ...
package com.crossoverjie.netty.action.common.enums;
package com.crossoverjie.cim.common.enums;
import java.util.ArrayList;
import java.util.List;
... ... @@ -14,8 +14,11 @@ public enum StatusEnum {
/** 失败 */
FAIL("4000", "失败"),
/** 重复请求 */
REPEAT_REQUEST("5000", "重复请求"),
/** 重复登录 */
REPEAT_LOGIN("5000", "账号重复登录,请退出一个账号!"),
/** 账号不在线 */
OFF_LINE("7000", "你选择的账号不在线,请重新选择!"),
/** 请求限流 */
REQUEST_LIMIT("6000", "请求限流"),
... ...
package com.crossoverjie.cim.common.enums;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/26 18:38
* @since JDK 1.8
*/
public enum SystemCommandEnumType {
ALL(":all ","获取所有命令"),
ONLINE_USER(":olu","获取所有在线用户"),
QUIT(":q ","退出程序")
;
/** 枚举值码 */
private final String commandType;
/** 枚举描述 */
private final String desc;
/**
* 构建一个 。
* @param commandType 枚举值码。
* @param desc 枚举描述。
*/
private SystemCommandEnumType(String commandType, String desc) {
this.commandType = commandType;
this.desc = desc;
}
/**
* 得到枚举值码。
* @return 枚举值码。
*/
public String getCommandType() {
return commandType;
}
/**
* 得到枚举描述。
* @return 枚举描述。
*/
public String getDesc() {
return desc;
}
/**
* 得到枚举值码。
* @return 枚举值码。
*/
public String code() {
return commandType;
}
/**
* 得到枚举描述。
* @return 枚举描述。
*/
public String message() {
return desc;
}
/**
* 获取全部枚举值码。
*
* @return 全部枚举值码。
*/
public static Map<String,String> getAllStatusCode() {
List<String> list = new ArrayList<String>();
Map<String,String> map = new HashMap<String, String>(16) ;
for (SystemCommandEnumType status : values()) {
list.add(status.code());
map.put(status.getCommandType(),status.getDesc()) ;
}
return map;
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.common.exception;
import com.crossoverjie.cim.common.enums.StatusEnum;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/25 15:26
* @since JDK 1.8
*/
public class CIMException extends GenericException {
public CIMException(String errorCode, String errorMessage) {
super(errorMessage);
this.errorCode = errorCode;
this.errorMessage = errorMessage;
}
public CIMException(Exception e, String errorCode, String errorMessage) {
super(e, errorMessage);
this.errorCode = errorCode;
this.errorMessage = errorMessage;
}
public CIMException(String message) {
super(message);
this.errorMessage = message;
}
public CIMException(StatusEnum statusEnum) {
super(statusEnum.getMessage());
this.errorMessage = statusEnum.message();
this.errorCode = statusEnum.getCode();
}
public CIMException(StatusEnum statusEnum, String message) {
super(message);
this.errorMessage = message;
this.errorCode = statusEnum.getCode();
}
public CIMException(Exception oriEx) {
super(oriEx);
}
public CIMException(Throwable oriEx) {
super(oriEx);
}
public CIMException(String message, Exception oriEx) {
super(message, oriEx);
this.errorMessage = message;
}
public CIMException(String message, Throwable oriEx) {
super(message, oriEx);
this.errorMessage = message;
}
public static boolean isResetByPeer(String msg) {
if ("Connection reset by peer".equals(msg)) {
return true;
}
return false;
}
}
... ...
package com.crossoverjie.cim.common.exception;
import java.io.Serializable;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/25 15:27
* @since JDK 1.8
*/
public class GenericException extends RuntimeException implements Serializable {
private static final long serialVersionUID = 1L;
String errorCode;
String errorMessage;
public GenericException() {
}
public GenericException(String message) {
super(message);
}
public GenericException(Exception oriEx) {
super(oriEx);
}
public GenericException(Exception oriEx, String message) {
super(message, oriEx);
}
public GenericException(Throwable oriEx) {
super(oriEx);
}
public GenericException(String message, Exception oriEx) {
super(message, oriEx);
}
public GenericException(String message, Throwable oriEx) {
super(message, oriEx);
}
public String getErrorCode() {
return this.errorCode;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
public String getErrorMessage() {
return this.errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.common.pojo;
/**
* Function: 用户信息
*
* @author crossoverJie
* Date: 2018/12/24 02:33
* @since JDK 1.8
*/
public class CIMUserInfo {
private Long userId ;
private String userName ;
public CIMUserInfo(Long userId, String userName) {
this.userId = userId;
this.userName = userName;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "CIMUserInfo{" +
"userId=" + userId +
", userName='" + userName + '\'' +
'}';
}
}
... ...
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: BaseRequestProto.proto
package com.crossoverjie.netty.action.common.protocol;
package com.crossoverjie.cim.common.protocol;
public final class BaseRequestProto {
private BaseRequestProto() {}
public final class CIMRequestProto {
private CIMRequestProto() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistryLite registry) {
}
... ... @@ -14,18 +14,18 @@ public final class BaseRequestProto {
registerAllExtensions(
(com.google.protobuf.ExtensionRegistryLite) registry);
}
public interface RequestProtocolOrBuilder extends
// @@protoc_insertion_point(interface_extends:protocol.RequestProtocol)
public interface CIMReqProtocolOrBuilder extends
// @@protoc_insertion_point(interface_extends:protocol.CIMReqProtocol)
com.google.protobuf.MessageOrBuilder {
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
boolean hasRequestId();
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
int getRequestId();
long getRequestId();
/**
* <code>required string reqMsg = 1;</code>
... ... @@ -40,22 +40,32 @@ public final class BaseRequestProto {
*/
com.google.protobuf.ByteString
getReqMsgBytes();
/**
* <code>required int32 type = 3;</code>
*/
boolean hasType();
/**
* <code>required int32 type = 3;</code>
*/
int getType();
}
/**
* Protobuf type {@code protocol.RequestProtocol}
* Protobuf type {@code protocol.CIMReqProtocol}
*/
public static final class RequestProtocol extends
public static final class CIMReqProtocol extends
com.google.protobuf.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:protocol.RequestProtocol)
RequestProtocolOrBuilder {
// @@protoc_insertion_point(message_implements:protocol.CIMReqProtocol)
CIMReqProtocolOrBuilder {
private static final long serialVersionUID = 0L;
// Use RequestProtocol.newBuilder() to construct.
private RequestProtocol(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
// Use CIMReqProtocol.newBuilder() to construct.
private CIMReqProtocol(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private RequestProtocol() {
requestId_ = 0;
private CIMReqProtocol() {
requestId_ = 0L;
reqMsg_ = "";
type_ = 0;
}
@Override
... ... @@ -63,7 +73,7 @@ public final class BaseRequestProto {
getUnknownFields() {
return this.unknownFields;
}
private RequestProtocol(
private CIMReqProtocol(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
... ... @@ -97,7 +107,12 @@ public final class BaseRequestProto {
}
case 16: {
bitField0_ |= 0x00000001;
requestId_ = input.readInt32();
requestId_ = input.readInt64();
break;
}
case 24: {
bitField0_ |= 0x00000004;
type_ = input.readInt32();
break;
}
}
... ... @@ -114,29 +129,29 @@ public final class BaseRequestProto {
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return BaseRequestProto.internal_static_protocol_RequestProtocol_descriptor;
return CIMRequestProto.internal_static_protocol_CIMReqProtocol_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return BaseRequestProto.internal_static_protocol_RequestProtocol_fieldAccessorTable
return CIMRequestProto.internal_static_protocol_CIMReqProtocol_fieldAccessorTable
.ensureFieldAccessorsInitialized(
RequestProtocol.class, Builder.class);
CIMReqProtocol.class, Builder.class);
}
private int bitField0_;
public static final int REQUESTID_FIELD_NUMBER = 2;
private int requestId_;
private long requestId_;
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
public boolean hasRequestId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
public int getRequestId() {
public long getRequestId() {
return requestId_;
}
... ... @@ -182,6 +197,21 @@ public final class BaseRequestProto {
}
}
public static final int TYPE_FIELD_NUMBER = 3;
private int type_;
/**
* <code>required int32 type = 3;</code>
*/
public boolean hasType() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>required int32 type = 3;</code>
*/
public int getType() {
return type_;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
... ... @@ -196,6 +226,10 @@ public final class BaseRequestProto {
memoizedIsInitialized = 0;
return false;
}
if (!hasType()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
... ... @@ -206,7 +240,10 @@ public final class BaseRequestProto {
com.google.protobuf.GeneratedMessageV3.writeString(output, 1, reqMsg_);
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt32(2, requestId_);
output.writeInt64(2, requestId_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt32(3, type_);
}
unknownFields.writeTo(output);
}
... ... @@ -221,7 +258,11 @@ public final class BaseRequestProto {
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(2, requestId_);
.computeInt64Size(2, requestId_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(3, type_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
... ... @@ -233,10 +274,10 @@ public final class BaseRequestProto {
if (obj == this) {
return true;
}
if (!(obj instanceof RequestProtocol)) {
if (!(obj instanceof CIMReqProtocol)) {
return super.equals(obj);
}
RequestProtocol other = (RequestProtocol) obj;
CIMReqProtocol other = (CIMReqProtocol) obj;
boolean result = true;
result = result && (hasRequestId() == other.hasRequestId());
... ... @@ -249,6 +290,11 @@ public final class BaseRequestProto {
result = result && getReqMsg()
.equals(other.getReqMsg());
}
result = result && (hasType() == other.hasType());
if (hasType()) {
result = result && (getType()
== other.getType());
}
result = result && unknownFields.equals(other.unknownFields);
return result;
}
... ... @@ -262,80 +308,85 @@ public final class BaseRequestProto {
hash = (19 * hash) + getDescriptor().hashCode();
if (hasRequestId()) {
hash = (37 * hash) + REQUESTID_FIELD_NUMBER;
hash = (53 * hash) + getRequestId();
hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
getRequestId());
}
if (hasReqMsg()) {
hash = (37 * hash) + REQMSG_FIELD_NUMBER;
hash = (53 * hash) + getReqMsg().hashCode();
}
if (hasType()) {
hash = (37 * hash) + TYPE_FIELD_NUMBER;
hash = (53 * hash) + getType();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
java.nio.ByteBuffer data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
java.nio.ByteBuffer data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static RequestProtocol parseFrom(byte[] data)
public static CIMReqProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static RequestProtocol parseFrom(java.io.InputStream input)
public static CIMReqProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public static RequestProtocol parseDelimitedFrom(java.io.InputStream input)
public static CIMReqProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
public static RequestProtocol parseDelimitedFrom(
public static CIMReqProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static RequestProtocol parseFrom(
public static CIMReqProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
... ... @@ -347,7 +398,7 @@ public final class BaseRequestProto {
public static Builder newBuilder() {
return DEFAULT_INSTANCE.toBuilder();
}
public static Builder newBuilder(RequestProtocol prototype) {
public static Builder newBuilder(CIMReqProtocol prototype) {
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
}
public Builder toBuilder() {
... ... @@ -362,25 +413,25 @@ public final class BaseRequestProto {
return builder;
}
/**
* Protobuf type {@code protocol.RequestProtocol}
* Protobuf type {@code protocol.CIMReqProtocol}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:protocol.RequestProtocol)
RequestProtocolOrBuilder {
// @@protoc_insertion_point(builder_implements:protocol.CIMReqProtocol)
CIMReqProtocolOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return BaseRequestProto.internal_static_protocol_RequestProtocol_descriptor;
return CIMRequestProto.internal_static_protocol_CIMReqProtocol_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return BaseRequestProto.internal_static_protocol_RequestProtocol_fieldAccessorTable
return CIMRequestProto.internal_static_protocol_CIMReqProtocol_fieldAccessorTable
.ensureFieldAccessorsInitialized(
RequestProtocol.class, Builder.class);
CIMReqProtocol.class, Builder.class);
}
// Construct using com.crossoverjie.netty.action.protocol.BaseRequestProto.RequestProtocol.newBuilder()
// Construct using com.crossoverjie.cim.common.protocol.CIMRequestProto.CIMReqProtocol.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
... ... @@ -397,32 +448,34 @@ public final class BaseRequestProto {
}
public Builder clear() {
super.clear();
requestId_ = 0;
requestId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
reqMsg_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
type_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return BaseRequestProto.internal_static_protocol_RequestProtocol_descriptor;
return CIMRequestProto.internal_static_protocol_CIMReqProtocol_descriptor;
}
public RequestProtocol getDefaultInstanceForType() {
return RequestProtocol.getDefaultInstance();
public CIMReqProtocol getDefaultInstanceForType() {
return CIMReqProtocol.getDefaultInstance();
}
public RequestProtocol build() {
RequestProtocol result = buildPartial();
public CIMReqProtocol build() {
CIMReqProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public RequestProtocol buildPartial() {
RequestProtocol result = new RequestProtocol(this);
public CIMReqProtocol buildPartial() {
CIMReqProtocol result = new CIMReqProtocol(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
... ... @@ -433,6 +486,10 @@ public final class BaseRequestProto {
to_bitField0_ |= 0x00000002;
}
result.reqMsg_ = reqMsg_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.type_ = type_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
... ... @@ -465,16 +522,16 @@ public final class BaseRequestProto {
return (Builder) super.addRepeatedField(field, value);
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof RequestProtocol) {
return mergeFrom((RequestProtocol)other);
if (other instanceof CIMReqProtocol) {
return mergeFrom((CIMReqProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(RequestProtocol other) {
if (other == RequestProtocol.getDefaultInstance()) return this;
public Builder mergeFrom(CIMReqProtocol other) {
if (other == CIMReqProtocol.getDefaultInstance()) return this;
if (other.hasRequestId()) {
setRequestId(other.getRequestId());
}
... ... @@ -483,6 +540,9 @@ public final class BaseRequestProto {
reqMsg_ = other.reqMsg_;
onChanged();
}
if (other.hasType()) {
setType(other.getType());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
... ... @@ -495,6 +555,9 @@ public final class BaseRequestProto {
if (!hasReqMsg()) {
return false;
}
if (!hasType()) {
return false;
}
return true;
}
... ... @@ -502,11 +565,11 @@ public final class BaseRequestProto {
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
RequestProtocol parsedMessage = null;
CIMReqProtocol parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (RequestProtocol) e.getUnfinishedMessage();
parsedMessage = (CIMReqProtocol) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
... ... @@ -517,34 +580,34 @@ public final class BaseRequestProto {
}
private int bitField0_;
private int requestId_ ;
private long requestId_ ;
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
public boolean hasRequestId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
public int getRequestId() {
public long getRequestId() {
return requestId_;
}
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
public Builder setRequestId(int value) {
public Builder setRequestId(long value) {
bitField0_ |= 0x00000001;
requestId_ = value;
onChanged();
return this;
}
/**
* <code>required int32 requestId = 2;</code>
* <code>required int64 requestId = 2;</code>
*/
public Builder clearRequestId() {
bitField0_ = (bitField0_ & ~0x00000001);
requestId_ = 0;
requestId_ = 0L;
onChanged();
return this;
}
... ... @@ -624,6 +687,38 @@ public final class BaseRequestProto {
onChanged();
return this;
}
private int type_ ;
/**
* <code>required int32 type = 3;</code>
*/
public boolean hasType() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>required int32 type = 3;</code>
*/
public int getType() {
return type_;
}
/**
* <code>required int32 type = 3;</code>
*/
public Builder setType(int value) {
bitField0_ |= 0x00000004;
type_ = value;
onChanged();
return this;
}
/**
* <code>required int32 type = 3;</code>
*/
public Builder clearType() {
bitField0_ = (bitField0_ & ~0x00000004);
type_ = 0;
onChanged();
return this;
}
public final Builder setUnknownFields(
final com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
... ... @@ -635,49 +730,49 @@ public final class BaseRequestProto {
}
// @@protoc_insertion_point(builder_scope:protocol.RequestProtocol)
// @@protoc_insertion_point(builder_scope:protocol.CIMReqProtocol)
}
// @@protoc_insertion_point(class_scope:protocol.RequestProtocol)
private static final RequestProtocol DEFAULT_INSTANCE;
// @@protoc_insertion_point(class_scope:protocol.CIMReqProtocol)
private static final CIMReqProtocol DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new RequestProtocol();
DEFAULT_INSTANCE = new CIMReqProtocol();
}
public static RequestProtocol getDefaultInstance() {
public static CIMReqProtocol getDefaultInstance() {
return DEFAULT_INSTANCE;
}
@Deprecated public static final com.google.protobuf.Parser<RequestProtocol>
PARSER = new com.google.protobuf.AbstractParser<RequestProtocol>() {
public RequestProtocol parsePartialFrom(
@Deprecated public static final com.google.protobuf.Parser<CIMReqProtocol>
PARSER = new com.google.protobuf.AbstractParser<CIMReqProtocol>() {
public CIMReqProtocol parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new RequestProtocol(input, extensionRegistry);
return new CIMReqProtocol(input, extensionRegistry);
}
};
public static com.google.protobuf.Parser<RequestProtocol> parser() {
public static com.google.protobuf.Parser<CIMReqProtocol> parser() {
return PARSER;
}
@Override
public com.google.protobuf.Parser<RequestProtocol> getParserForType() {
public com.google.protobuf.Parser<CIMReqProtocol> getParserForType() {
return PARSER;
}
public RequestProtocol getDefaultInstanceForType() {
public CIMReqProtocol getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
private static final com.google.protobuf.Descriptors.Descriptor
internal_static_protocol_RequestProtocol_descriptor;
internal_static_protocol_CIMReqProtocol_descriptor;
private static final
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
internal_static_protocol_RequestProtocol_fieldAccessorTable;
internal_static_protocol_CIMReqProtocol_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
... ... @@ -687,10 +782,10 @@ public final class BaseRequestProto {
descriptor;
static {
String[] descriptorData = {
"\n\026BaseRequestProto.proto\022\010protocol\"4\n\017Re" +
"questProtocol\022\021\n\trequestId\030\002 \002(\005\022\016\n\006reqM" +
"sg\030\001 \002(\tB:\n&com.crossoverjie.netty.actio" +
"n.protocolB\020BaseRequestProto"
"\n\026BaseRequestProto.proto\022\010protocol\"A\n\016CI" +
"MReqProtocol\022\021\n\trequestId\030\002 \002(\003\022\016\n\006reqMs" +
"g\030\001 \002(\t\022\014\n\004type\030\003 \002(\005B7\n$com.crossoverji" +
"e.cim.common.protocolB\017CIMRequestProto"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
... ... @@ -704,12 +799,12 @@ public final class BaseRequestProto {
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
internal_static_protocol_RequestProtocol_descriptor =
internal_static_protocol_CIMReqProtocol_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_protocol_RequestProtocol_fieldAccessorTable = new
internal_static_protocol_CIMReqProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_protocol_RequestProtocol_descriptor,
new String[] { "RequestId", "ReqMsg", });
internal_static_protocol_CIMReqProtocol_descriptor,
new String[] { "RequestId", "ReqMsg", "Type", });
}
// @@protoc_insertion_point(outer_class_scope)
... ...
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: BaseResponseProto.proto
package com.crossoverjie.netty.action.common.protocol;
package com.crossoverjie.cim.common.protocol;
public final class BaseResponseProto {
private BaseResponseProto() {}
public final class CIMResponseProto {
private CIMResponseProto() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistryLite registry) {
}
... ... @@ -14,18 +14,18 @@ public final class BaseResponseProto {
registerAllExtensions(
(com.google.protobuf.ExtensionRegistryLite) registry);
}
public interface ResponseProtocolOrBuilder extends
// @@protoc_insertion_point(interface_extends:protocol.ResponseProtocol)
public interface CIMResProtocolOrBuilder extends
// @@protoc_insertion_point(interface_extends:protocol.CIMResProtocol)
com.google.protobuf.MessageOrBuilder {
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
boolean hasResponseId();
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
int getResponseId();
long getResponseId();
/**
* <code>required string resMsg = 1;</code>
... ... @@ -42,19 +42,19 @@ public final class BaseResponseProto {
getResMsgBytes();
}
/**
* Protobuf type {@code protocol.ResponseProtocol}
* Protobuf type {@code protocol.CIMResProtocol}
*/
public static final class ResponseProtocol extends
public static final class CIMResProtocol extends
com.google.protobuf.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:protocol.ResponseProtocol)
ResponseProtocolOrBuilder {
// @@protoc_insertion_point(message_implements:protocol.CIMResProtocol)
CIMResProtocolOrBuilder {
private static final long serialVersionUID = 0L;
// Use ResponseProtocol.newBuilder() to construct.
private ResponseProtocol(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
// Use CIMResProtocol.newBuilder() to construct.
private CIMResProtocol(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private ResponseProtocol() {
responseId_ = 0;
private CIMResProtocol() {
responseId_ = 0L;
resMsg_ = "";
}
... ... @@ -63,7 +63,7 @@ public final class BaseResponseProto {
getUnknownFields() {
return this.unknownFields;
}
private ResponseProtocol(
private CIMResProtocol(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
... ... @@ -97,7 +97,7 @@ public final class BaseResponseProto {
}
case 16: {
bitField0_ |= 0x00000001;
responseId_ = input.readInt32();
responseId_ = input.readInt64();
break;
}
}
... ... @@ -114,29 +114,29 @@ public final class BaseResponseProto {
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return BaseResponseProto.internal_static_protocol_ResponseProtocol_descriptor;
return CIMResponseProto.internal_static_protocol_CIMResProtocol_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return BaseResponseProto.internal_static_protocol_ResponseProtocol_fieldAccessorTable
return CIMResponseProto.internal_static_protocol_CIMResProtocol_fieldAccessorTable
.ensureFieldAccessorsInitialized(
ResponseProtocol.class, Builder.class);
CIMResProtocol.class, Builder.class);
}
private int bitField0_;
public static final int RESPONSEID_FIELD_NUMBER = 2;
private int responseId_;
private long responseId_;
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
public boolean hasResponseId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
public int getResponseId() {
public long getResponseId() {
return responseId_;
}
... ... @@ -206,7 +206,7 @@ public final class BaseResponseProto {
com.google.protobuf.GeneratedMessageV3.writeString(output, 1, resMsg_);
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt32(2, responseId_);
output.writeInt64(2, responseId_);
}
unknownFields.writeTo(output);
}
... ... @@ -221,7 +221,7 @@ public final class BaseResponseProto {
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(2, responseId_);
.computeInt64Size(2, responseId_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
... ... @@ -233,10 +233,10 @@ public final class BaseResponseProto {
if (obj == this) {
return true;
}
if (!(obj instanceof ResponseProtocol)) {
if (!(obj instanceof CIMResProtocol)) {
return super.equals(obj);
}
ResponseProtocol other = (ResponseProtocol) obj;
CIMResProtocol other = (CIMResProtocol) obj;
boolean result = true;
result = result && (hasResponseId() == other.hasResponseId());
... ... @@ -262,7 +262,8 @@ public final class BaseResponseProto {
hash = (19 * hash) + getDescriptor().hashCode();
if (hasResponseId()) {
hash = (37 * hash) + RESPONSEID_FIELD_NUMBER;
hash = (53 * hash) + getResponseId();
hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
getResponseId());
}
if (hasResMsg()) {
hash = (37 * hash) + RESMSG_FIELD_NUMBER;
... ... @@ -273,69 +274,69 @@ public final class BaseResponseProto {
return hash;
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
java.nio.ByteBuffer data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
java.nio.ByteBuffer data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static ResponseProtocol parseFrom(byte[] data)
public static CIMResProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static ResponseProtocol parseFrom(java.io.InputStream input)
public static CIMResProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public static ResponseProtocol parseDelimitedFrom(java.io.InputStream input)
public static CIMResProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
public static ResponseProtocol parseDelimitedFrom(
public static CIMResProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static ResponseProtocol parseFrom(
public static CIMResProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
... ... @@ -347,7 +348,7 @@ public final class BaseResponseProto {
public static Builder newBuilder() {
return DEFAULT_INSTANCE.toBuilder();
}
public static Builder newBuilder(ResponseProtocol prototype) {
public static Builder newBuilder(CIMResProtocol prototype) {
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
}
public Builder toBuilder() {
... ... @@ -362,25 +363,25 @@ public final class BaseResponseProto {
return builder;
}
/**
* Protobuf type {@code protocol.ResponseProtocol}
* Protobuf type {@code protocol.CIMResProtocol}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:protocol.ResponseProtocol)
ResponseProtocolOrBuilder {
// @@protoc_insertion_point(builder_implements:protocol.CIMResProtocol)
CIMResProtocolOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return BaseResponseProto.internal_static_protocol_ResponseProtocol_descriptor;
return CIMResponseProto.internal_static_protocol_CIMResProtocol_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return BaseResponseProto.internal_static_protocol_ResponseProtocol_fieldAccessorTable
return CIMResponseProto.internal_static_protocol_CIMResProtocol_fieldAccessorTable
.ensureFieldAccessorsInitialized(
ResponseProtocol.class, Builder.class);
CIMResProtocol.class, Builder.class);
}
// Construct using com.crossoverjie.netty.action.protocol.BaseResponseProto.ResponseProtocol.newBuilder()
// Construct using com.crossoverjie.cim.common.protocol.CIMResponseProto.CIMResProtocol.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
... ... @@ -397,7 +398,7 @@ public final class BaseResponseProto {
}
public Builder clear() {
super.clear();
responseId_ = 0;
responseId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
resMsg_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
... ... @@ -406,23 +407,23 @@ public final class BaseResponseProto {
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return BaseResponseProto.internal_static_protocol_ResponseProtocol_descriptor;
return CIMResponseProto.internal_static_protocol_CIMResProtocol_descriptor;
}
public ResponseProtocol getDefaultInstanceForType() {
return ResponseProtocol.getDefaultInstance();
public CIMResProtocol getDefaultInstanceForType() {
return CIMResProtocol.getDefaultInstance();
}
public ResponseProtocol build() {
ResponseProtocol result = buildPartial();
public CIMResProtocol build() {
CIMResProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public ResponseProtocol buildPartial() {
ResponseProtocol result = new ResponseProtocol(this);
public CIMResProtocol buildPartial() {
CIMResProtocol result = new CIMResProtocol(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
... ... @@ -465,16 +466,16 @@ public final class BaseResponseProto {
return (Builder) super.addRepeatedField(field, value);
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof ResponseProtocol) {
return mergeFrom((ResponseProtocol)other);
if (other instanceof CIMResProtocol) {
return mergeFrom((CIMResProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(ResponseProtocol other) {
if (other == ResponseProtocol.getDefaultInstance()) return this;
public Builder mergeFrom(CIMResProtocol other) {
if (other == CIMResProtocol.getDefaultInstance()) return this;
if (other.hasResponseId()) {
setResponseId(other.getResponseId());
}
... ... @@ -502,11 +503,11 @@ public final class BaseResponseProto {
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
ResponseProtocol parsedMessage = null;
CIMResProtocol parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (ResponseProtocol) e.getUnfinishedMessage();
parsedMessage = (CIMResProtocol) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
... ... @@ -517,34 +518,34 @@ public final class BaseResponseProto {
}
private int bitField0_;
private int responseId_ ;
private long responseId_ ;
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
public boolean hasResponseId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
public int getResponseId() {
public long getResponseId() {
return responseId_;
}
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
public Builder setResponseId(int value) {
public Builder setResponseId(long value) {
bitField0_ |= 0x00000001;
responseId_ = value;
onChanged();
return this;
}
/**
* <code>required int32 responseId = 2;</code>
* <code>required int64 responseId = 2;</code>
*/
public Builder clearResponseId() {
bitField0_ = (bitField0_ & ~0x00000001);
responseId_ = 0;
responseId_ = 0L;
onChanged();
return this;
}
... ... @@ -635,49 +636,49 @@ public final class BaseResponseProto {
}
// @@protoc_insertion_point(builder_scope:protocol.ResponseProtocol)
// @@protoc_insertion_point(builder_scope:protocol.CIMResProtocol)
}
// @@protoc_insertion_point(class_scope:protocol.ResponseProtocol)
private static final ResponseProtocol DEFAULT_INSTANCE;
// @@protoc_insertion_point(class_scope:protocol.CIMResProtocol)
private static final CIMResProtocol DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new ResponseProtocol();
DEFAULT_INSTANCE = new CIMResProtocol();
}
public static ResponseProtocol getDefaultInstance() {
public static CIMResProtocol getDefaultInstance() {
return DEFAULT_INSTANCE;
}
@Deprecated public static final com.google.protobuf.Parser<ResponseProtocol>
PARSER = new com.google.protobuf.AbstractParser<ResponseProtocol>() {
public ResponseProtocol parsePartialFrom(
@Deprecated public static final com.google.protobuf.Parser<CIMResProtocol>
PARSER = new com.google.protobuf.AbstractParser<CIMResProtocol>() {
public CIMResProtocol parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new ResponseProtocol(input, extensionRegistry);
return new CIMResProtocol(input, extensionRegistry);
}
};
public static com.google.protobuf.Parser<ResponseProtocol> parser() {
public static com.google.protobuf.Parser<CIMResProtocol> parser() {
return PARSER;
}
@Override
public com.google.protobuf.Parser<ResponseProtocol> getParserForType() {
public com.google.protobuf.Parser<CIMResProtocol> getParserForType() {
return PARSER;
}
public ResponseProtocol getDefaultInstanceForType() {
public CIMResProtocol getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
private static final com.google.protobuf.Descriptors.Descriptor
internal_static_protocol_ResponseProtocol_descriptor;
internal_static_protocol_CIMResProtocol_descriptor;
private static final
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
internal_static_protocol_ResponseProtocol_fieldAccessorTable;
internal_static_protocol_CIMResProtocol_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
... ... @@ -687,10 +688,10 @@ public final class BaseResponseProto {
descriptor;
static {
String[] descriptorData = {
"\n\027BaseResponseProto.proto\022\010protocol\"6\n\020R" +
"esponseProtocol\022\022\n\nresponseId\030\002 \002(\005\022\016\n\006r" +
"esMsg\030\001 \002(\tB;\n&com.crossoverjie.netty.ac" +
"tion.protocolB\021BaseResponseProto"
"\n\027BaseResponseProto.proto\022\010protocol\"4\n\016C" +
"IMResProtocol\022\022\n\nresponseId\030\002 \002(\003\022\016\n\006res" +
"Msg\030\001 \002(\tB8\n$com.crossoverjie.cim.common" +
".protocolB\020CIMResponseProto"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
... ... @@ -704,11 +705,11 @@ public final class BaseResponseProto {
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
internal_static_protocol_ResponseProtocol_descriptor =
internal_static_protocol_CIMResProtocol_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_protocol_ResponseProtocol_fieldAccessorTable = new
internal_static_protocol_CIMResProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_protocol_ResponseProtocol_descriptor,
internal_static_protocol_CIMResProtocol_descriptor,
new String[] { "ResponseId", "ResMsg", });
}
... ...
package com.crossoverjie.netty.action.common.protocol;
package com.crossoverjie.cim.common.protocol;
import com.google.protobuf.InvalidProtocolBufferException;
... ... @@ -12,14 +12,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class ProtocolUtil {
public static void main(String[] args) throws InvalidProtocolBufferException {
BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
.setRequestId(123)
CIMRequestProto.CIMReqProtocol protocol = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(123L)
.setReqMsg("你好啊")
.build();
byte[] encode = encode(protocol);
BaseRequestProto.RequestProtocol parseFrom = decode(encode);
CIMRequestProto.CIMReqProtocol parseFrom = decode(encode);
System.out.println(protocol.toString());
System.out.println(protocol.toString().equals(parseFrom.toString()));
... ... @@ -30,7 +30,7 @@ public class ProtocolUtil {
* @param protocol
* @return
*/
public static byte[] encode(BaseRequestProto.RequestProtocol protocol){
public static byte[] encode(CIMRequestProto.CIMReqProtocol protocol){
return protocol.toByteArray() ;
}
... ... @@ -40,7 +40,7 @@ public class ProtocolUtil {
* @return
* @throws InvalidProtocolBufferException
*/
public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
return BaseRequestProto.RequestProtocol.parseFrom(bytes);
public static CIMRequestProto.CIMReqProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
return CIMRequestProto.CIMReqProtocol.parseFrom(bytes);
}
}
... ...
package com.crossoverjie.netty.action.common.req;
package com.crossoverjie.cim.common.req;
import io.swagger.annotations.ApiModelProperty;
... ...
package com.crossoverjie.netty.action.common.res;
package com.crossoverjie.cim.common.res;
import com.crossoverjie.netty.action.common.util.StringUtil;
import com.crossoverjie.netty.action.common.enums.StatusEnum;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.util.StringUtil;
import java.io.Serializable;
... ...
package com.crossoverjie.netty.action.common.res;
package com.crossoverjie.cim.common.res;
/**
* Function:空对象,用在泛型中,表示没有额外的请求参数或者返回参数
... ...
package com.crossoverjie.netty.action.common.util;
package com.crossoverjie.cim.common.util;
import java.util.Random;
import java.util.UUID;
... ...
package com.crossoverjie.cim.common.enums;
import org.junit.Test;
import java.util.Map;
public class SystemCommandEnumTypeTest {
@Test
public void getAllStatusCode() throws Exception {
Map<String, String> allStatusCode = SystemCommandEnumType.getAllStatusCode();
for (Map.Entry<String, String> stringStringEntry : allStatusCode.entrySet()) {
String key = stringStringEntry.getKey();
String value = stringStringEntry.getValue();
System.out.println(key + "----->" + value);
}
}
}
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cim</artifactId>
<groupId>com.crossoverjie.netty</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cim-forward-route</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<swagger.version>2.5.0</swagger.version>
<logback.version>1.2.3</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>com.crossoverjie.netty</groupId>
<artifactId>cim-common</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- spring-boot-maven-plugin (提供了直接运行项目的插件:如果是通过parent方式继承spring-boot-starter-parent则不用此插件) -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
... ...
package com.crossoverjie.netty.action;
package com.crossoverjie.cim.route;
import com.crossoverjie.cim.route.kit.ServerListListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
... ... @@ -9,13 +11,21 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @author crossoverJie
*/
@SpringBootApplication
public class HeartbeatServerApplication {
public class RouteApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatServerApplication.class);
private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class);
public static void main(String[] args) {
SpringApplication.run(HeartbeatServerApplication.class, args);
LOGGER.info("启动 Server 成功");
SpringApplication.run(RouteApplication.class, args);
LOGGER.info("启动 route 成功");
}
@Override
public void run(String... args) throws Exception {
//监听服务
Thread thread = new Thread(new ServerListListener());
thread.setName("zk-listener");
thread.start() ;
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.route.cache;
import com.crossoverjie.cim.route.kit.ZKit;
import com.google.common.cache.LoadingCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Function: 服务器节点缓存
*
* @author crossoverJie
* Date: 2018/8/19 01:31
* @since JDK 1.8
*/
@Component
public class ServerCache {
@Autowired
private LoadingCache<String, String> cache;
@Autowired
private ZKit zkUtil;
private AtomicLong index = new AtomicLong();
public void addCache(String key) {
cache.put(key, key);
}
/**
* 更新所有缓存/先删除 再新增
*
* @param currentChilds
*/
public void updateCache(List<String> currentChilds) {
cache.invalidateAll();
for (String currentChild : currentChilds) {
String key = currentChild.split("-")[1];
addCache(key);
}
}
/**
* 获取所有的服务列表
*
* @return
*/
public List<String> getAll() {
List<String> list = new ArrayList<>();
if (cache.size() == 0) {
List<String> allNode = zkUtil.getAllNode();
for (String node : allNode) {
String key = node.split("-")[1];
addCache(key);
}
}
for (Map.Entry<String, String> entry : cache.asMap().entrySet()) {
list.add(entry.getKey());
}
return list;
}
/**
* 选取服务器
*
* @return
*/
public String selectServer() {
List<String> all = getAll();
if (all.size() == 0) {
throw new RuntimeException("CIM 服务器可用服务列表为空");
}
Long position = index.incrementAndGet() % all.size();
if (position < 0) {
position = 0L;
}
return all.get(position.intValue());
}
}
... ...
package com.crossoverjie.cim.route.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/24 01:43
* @since JDK 1.8
*/
@Component
public class AppConfiguration {
@Value("${app.zk.root}")
private String zkRoot;
@Value("${app.zk.addr}")
private String zkAddr;
@Value("${server.port}")
private int port;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getZkRoot() {
return zkRoot;
}
public void setZkRoot(String zkRoot) {
this.zkRoot = zkRoot;
}
public String getZkAddr() {
return zkAddr;
}
public void setZkAddr(String zkAddr) {
this.zkAddr = zkAddr;
}
}
... ...
package com.crossoverjie.cim.route.config;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import okhttp3.OkHttpClient;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 00:25
* @since JDK 1.8
*/
@Configuration
public class BeanConfig {
@Autowired
private AppConfiguration appConfiguration ;
@Bean
public ZkClient buildZKClient(){
return new ZkClient(appConfiguration.getZkAddr(), 5000);
}
@Bean
public LoadingCache<String,String> buildCache(){
return CacheBuilder.newBuilder()
.build(new CacheLoader<String, String>() {
@Override
public String load(String s) throws Exception {
return null;
}
});
}
/**
* Redis bean
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate redisTemplate = new StringRedisTemplate(factory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* http client
* @return okHttp
*/
@Bean
public OkHttpClient okHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10,TimeUnit.SECONDS)
.retryOnConnectionFailure(true);
return builder.build();
}
}
... ...
package com.crossoverjie.cim.route.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
/** 是否打开swagger **/
@ConditionalOnExpression("'${swagger.enable}' == 'true'")
public class SwaggerConfig {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.crossoverjie.cim.route.controller"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("cim-forward-route")
.description("cim-forward-route api")
.termsOfServiceUrl("http://crossoverJie.top")
.contact("crossoverJie")
.version("1.0.0")
.build();
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.route.constant;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/9/10 14:07
* @since JDK 1.8
*/
public final class Constant {
/**
* 账号前缀
*/
public final static String ACCOUNT_PREFIX = "cim-account:";
/**
* 路由信息前缀
*/
public final static String ROUTE_PREFIX = "cim-route:";
/**
* 登录状态前缀
*/
public final static String LOGIN_STATUS_PREFIX = "login-status";
}
... ...
package com.crossoverjie.cim.route.controller;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import com.crossoverjie.cim.route.cache.ServerCache;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.vo.req.RegisterInfoReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.vo.res.RegisterInfoResVO;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Map;
import java.util.Set;
/**
* Function:
*
* @author crossoverJie
* Date: 22/05/2018 14:46
* @since JDK 1.8
*/
@Controller
@RequestMapping("/")
public class RouteController {
private final static Logger LOGGER = LoggerFactory.getLogger(RouteController.class);
@Autowired
private ServerCache serverCache;
@Autowired
private AccountService accountService;
@Autowired
private UserInfoCacheService userInfoCacheService ;
@ApiOperation("群聊 API")
@RequestMapping(value = "groupRoute", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
LOGGER.info("msg=[{}]", groupReqVO.toString());
//获取所有的推送列表
Map<Long, CIMServerResVO> serverResVOMap = accountService.loadRouteRelated();
for (Map.Entry<Long, CIMServerResVO> cimServerResVOEntry : serverResVOMap.entrySet()) {
Long userId = cimServerResVOEntry.getKey();
CIMServerResVO value = cimServerResVOEntry.getValue();
if (userId.equals(groupReqVO.getUserId())){
//过滤掉自己
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
LOGGER.warn("过滤掉了发送者 userId={}",cimUserInfo.toString());
continue;
}
//推送消息
String url = "http://" + value.getIp() + ":" + value.getHttpPort() + "/sendMsg" ;
ChatReqVO chatVO = new ChatReqVO(userId,groupReqVO.getMsg()) ;
accountService.pushMsg(url,groupReqVO.getUserId(),chatVO);
}
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
// TODO: 2018/12/26 这些基于 HTTP 接口的远程通信都可以换为 SpringCloud
/**
* 私聊路由
*
* @param p2pRequest
* @return
*/
@ApiOperation("私聊 API")
@RequestMapping(value = "p2pRoute", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
try {
//获取接收消息用户的路由信息
CIMServerResVO cimServerResVO = accountService.loadRouteRelatedByUserId(p2pRequest.getReceiveUserId());
//推送消息
String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort() + "/sendMsg" ;
//p2pRequest.getReceiveUserId()==>消息接收者的 userID
ChatReqVO chatVO = new ChatReqVO(p2pRequest.getReceiveUserId(),p2pRequest.getMsg()) ;
accountService.pushMsg(url,p2pRequest.getUserId(),chatVO);
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
}catch (CIMException e){
res.setCode(e.getErrorCode());
res.setMessage(e.getErrorMessage());
}
return res;
}
@ApiOperation("客户端下线")
@RequestMapping(value = "offLine", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
LOGGER.info("下线用户[{}]", cimUserInfo.toString());
accountService.offLine(groupReqVO.getUserId());
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
/**
* 获取一台 CIM server
*
* @return
*/
@ApiOperation("登录并获取服务器")
@RequestMapping(value = "login", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<CIMServerResVO> login(@RequestBody LoginReqVO loginReqVO) throws Exception {
BaseResponse<CIMServerResVO> res = new BaseResponse();
//登录校验
boolean login = accountService.login(loginReqVO);
if (login) {
String server = serverCache.selectServer();
String[] serverInfo = server.split(":");
CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
//保存路由信息
accountService.saveRouteInfo(loginReqVO,server);
res.setDataBody(vo);
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
} else {
res.setCode(StatusEnum.REPEAT_LOGIN.getCode());
res.setMessage(StatusEnum.REPEAT_LOGIN.getMessage());
}
return res;
}
/**
* 注册账号
*
* @return
*/
@ApiOperation("注册账号")
@RequestMapping(value = "registerAccount", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<RegisterInfoResVO> registerAccount(@RequestBody RegisterInfoReqVO registerInfoReqVO) throws Exception {
BaseResponse<RegisterInfoResVO> res = new BaseResponse();
long userId = System.currentTimeMillis();
RegisterInfoResVO info = new RegisterInfoResVO(userId, registerInfoReqVO.getUserName());
info = accountService.register(info);
res.setDataBody(info);
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
/**
* 注册账号
*
* @return
*/
@ApiOperation("获取所有在线用户")
@RequestMapping(value = "onlineUser", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception {
BaseResponse<Set<CIMUserInfo>> res = new BaseResponse();
Set<CIMUserInfo> cimUserInfos = userInfoCacheService.onlineUser();
res.setDataBody(cimUserInfos) ;
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
}
... ...
package com.crossoverjie.cim.route.kit;
import com.crossoverjie.cim.route.config.AppConfiguration;
import com.crossoverjie.cim.route.util.SpringBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 00:35
* @since JDK 1.8
*/
public class ServerListListener implements Runnable{
private static Logger logger = LoggerFactory.getLogger(ServerListListener.class);
private ZKit zkUtil;
private AppConfiguration appConfiguration ;
public ServerListListener() {
zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
}
@Override
public void run() {
//注册监听服务
zkUtil.subscribeEvent(appConfiguration.getZkRoot());
}
}
... ...
package com.crossoverjie.cim.route.kit;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.route.cache.ServerCache;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Function: Zookeeper 工具
*
* @author crossoverJie
* Date: 2018/8/19 00:33
* @since JDK 1.8
*/
@Component
public class ZKit {
private static Logger logger = LoggerFactory.getLogger(ZKit.class);
@Autowired
private ZkClient zkClient;
@Autowired
private ServerCache serverCache ;
/**
* 监听事件
*
* @param path
*/
public void subscribeEvent(String path) {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
logger.info("清除/更新本地缓存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());
//更新所有缓存/先删除 再新增
serverCache.updateCache(currentChilds) ;
}
});
}
/**
* 获取所有注册节点
* @return
*/
public List<String> getAllNode(){
List<String> children = zkClient.getChildren("/route");
logger.info("查询所有节点成功=【{}】", JSON.toJSONString(children));
return children;
}
}
... ...
package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.vo.res.RegisterInfoResVO;
import java.util.Map;
/**
* Function: 账户服务
*
* @author crossoverJie
* Date: 2018/12/23 21:57
* @since JDK 1.8
*/
public interface AccountService {
/**
* 注册用户
* @param info 用户信息
* @return
* @throws Exception
*/
RegisterInfoResVO register(RegisterInfoResVO info) throws Exception;
/**
* 登录服务
* @param loginReqVO 登录信息
* @return true 成功 false 失败
* @throws Exception
*/
boolean login(LoginReqVO loginReqVO) throws Exception ;
/**
* 保存路由信息
* @param msg 服务器信息
* @param loginReqVO 用户信息
* @throws Exception
*/
void saveRouteInfo(LoginReqVO loginReqVO ,String msg) throws Exception ;
/**
* 加载所有用户的路有关系
* @return 所有的路由关系
*/
Map<Long,CIMServerResVO> loadRouteRelated() ;
/**
* 获取某个用户的路有关系
* @param userId
* @return 获取某个用户的路有关系
*/
CIMServerResVO loadRouteRelatedByUserId(Long userId) ;
/**
* 推送消息
* @param url url
* @param groupReqVO 消息
* @param sendUserId 发送者的ID
* @throws Exception
*/
void pushMsg(String url,long sendUserId ,ChatReqVO groupReqVO) throws Exception;
/**
* 用户下线
* @param userId 下线用户ID
* @throws Exception
*/
void offLine(Long userId) throws Exception;
}
... ...
package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import java.util.Set;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/24 11:06
* @since JDK 1.8
*/
public interface UserInfoCacheService {
/**
* 通过 userID 获取用户信息
* @param userId 用户唯一 ID
* @return
* @throws Exception
*/
CIMUserInfo loadUserInfoByUserId(Long userId) ;
/**
* 保存和检查用户登录情况
* @param userId userId 用户唯一 ID
* @return true 为可以登录 false 为已经登录
* @throws Exception
*/
boolean saveAndCheckUserLoginStatus(Long userId) throws Exception ;
/**
* 清除用户的登录状态
* @param userId
* @throws Exception
*/
void removeLoginStatus(Long userId) throws Exception ;
/**
*
* @return 获取所有在线用户
*/
Set<CIMUserInfo> onlineUser() ;
}
... ...
package com.crossoverjie.cim.route.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.vo.res.RegisterInfoResVO;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import static com.crossoverjie.cim.common.enums.StatusEnum.OFF_LINE;
import static com.crossoverjie.cim.route.constant.Constant.ACCOUNT_PREFIX;
import static com.crossoverjie.cim.route.constant.Constant.ROUTE_PREFIX;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 21:58
* @since JDK 1.8
*/
@Service
public class AccountServiceRedisImpl implements AccountService {
private final static Logger LOGGER = LoggerFactory.getLogger(AccountServiceRedisImpl.class);
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private UserInfoCacheService userInfoCacheService ;
@Autowired
private OkHttpClient okHttpClient;
private MediaType mediaType = MediaType.parse("application/json");
@Override
public RegisterInfoResVO register(RegisterInfoResVO info) {
String key = ACCOUNT_PREFIX + info.getUserId();
String name = redisTemplate.opsForValue().get(info.getUserName());
if (null == name) {
//为了方便查询,冗余一份
redisTemplate.opsForValue().set(key, info.getUserName());
redisTemplate.opsForValue().set(info.getUserName(), key);
} else {
long userId = Long.parseLong(name.split(":")[1]);
info.setUserId(userId);
info.setUserName(info.getUserName());
}
return info;
}
@Override
public boolean login(LoginReqVO loginReqVO) throws Exception {
//再去Redis里查询
String key = ACCOUNT_PREFIX + loginReqVO.getUserId();
String userName = redisTemplate.opsForValue().get(key);
if (null == userName) {
return false;
}
if (!userName.equals(loginReqVO.getUserName())) {
return false;
}
//登录成功,保存登录状态
boolean status = userInfoCacheService.saveAndCheckUserLoginStatus(loginReqVO.getUserId());
if (status == false){
//重复登录
return false;
}
return true;
}
@Override
public void saveRouteInfo(LoginReqVO loginReqVO, String msg) throws Exception {
String key = ROUTE_PREFIX + loginReqVO.getUserId();
redisTemplate.opsForValue().set(key, msg);
}
@Override
public Map<Long, CIMServerResVO> loadRouteRelated() {
Map<Long, CIMServerResVO> routes = new HashMap<>(64);
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
ScanOptions options = ScanOptions.scanOptions()
.match(ROUTE_PREFIX + "*")
.build();
Cursor<byte[]> scan = connection.scan(options);
while (scan.hasNext()) {
byte[] next = scan.next();
String key = new String(next, StandardCharsets.UTF_8);
LOGGER.info("key={}", key);
parseServerInfo(routes, key);
}
return routes;
}
@Override
public CIMServerResVO loadRouteRelatedByUserId(Long userId) {
String value = redisTemplate.opsForValue().get(ROUTE_PREFIX + userId);
if (value == null){
throw new CIMException(OFF_LINE) ;
}
String[] server = value.split(":");
CIMServerResVO cimServerResVO = new CIMServerResVO(server[0], Integer.parseInt(server[1]), Integer.parseInt(server[2]));
return cimServerResVO;
}
private void parseServerInfo(Map<Long, CIMServerResVO> routes, String key) {
long userId = Long.valueOf(key.split(":")[1]);
String value = redisTemplate.opsForValue().get(key);
String[] server = value.split(":");
CIMServerResVO cimServerResVO = new CIMServerResVO(server[0], Integer.parseInt(server[1]), Integer.parseInt(server[2]));
routes.put(userId, cimServerResVO);
}
@Override
public void pushMsg(String url, long sendUserId, ChatReqVO groupReqVO) throws Exception {
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg", cimUserInfo.getUserName() + ":【" + groupReqVO.getMsg() + "】");
jsonObject.put("userId", groupReqVO.getUserId());
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}
@Override
public void offLine(Long userId) throws Exception {
//删除路由
redisTemplate.delete(ROUTE_PREFIX + userId) ;
//删除登录状态
userInfoCacheService.removeLoginStatus(userId);
}
}
... ...
package com.crossoverjie.cim.route.service.impl;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.crossoverjie.cim.route.constant.Constant.ACCOUNT_PREFIX;
import static com.crossoverjie.cim.route.constant.Constant.LOGIN_STATUS_PREFIX;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/24 11:06
* @since JDK 1.8
*/
@Service
public class UserInfoCacheServiceImpl implements UserInfoCacheService {
/**
* 本地缓存,为了防止内存撑爆,后期可换为 LRU。
*/
private final static Map<Long,CIMUserInfo> USER_INFO_MAP = new ConcurrentHashMap<>(64) ;
@Autowired
private RedisTemplate<String,String> redisTemplate ;
@Override
public CIMUserInfo loadUserInfoByUserId(Long userId) {
//优先从本地缓存获取
CIMUserInfo cimUserInfo = USER_INFO_MAP.get(userId);
if (cimUserInfo != null){
return cimUserInfo ;
}
//load redis
String sendUserName = redisTemplate.opsForValue().get(ACCOUNT_PREFIX + userId);
if (sendUserName != null){
cimUserInfo = new CIMUserInfo(userId,sendUserName) ;
USER_INFO_MAP.put(userId,cimUserInfo) ;
}
return cimUserInfo;
}
@Override
public boolean saveAndCheckUserLoginStatus(Long userId) throws Exception {
Long add = redisTemplate.opsForSet().add(LOGIN_STATUS_PREFIX, userId.toString());
if (add == 0){
return false ;
}else {
return true ;
}
}
@Override
public void removeLoginStatus(Long userId) throws Exception {
redisTemplate.opsForSet().remove(LOGIN_STATUS_PREFIX,userId.toString()) ;
}
@Override
public Set<CIMUserInfo> onlineUser() {
Set<CIMUserInfo> set = null ;
Set<String> members = redisTemplate.opsForSet().members(LOGIN_STATUS_PREFIX);
for (String member : members) {
if (set == null){
set = new HashSet<>(64) ;
}
CIMUserInfo cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;
set.add(cimUserInfo) ;
}
return set;
}
}
... ...
package com.crossoverjie.netty.action.util;
package com.crossoverjie.cim.route.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
... ...
package com.crossoverjie.cim.route.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function: Google Protocol 编解码发送
*
* @author crossoverJie
* Date: 2018/05/21 15:56
* @since JDK 1.8
*/
public class ChatReqVO extends BaseRequest {
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "userId", example = "1545574049323")
private Long userId ;
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public ChatReqVO() {
}
public ChatReqVO(Long userId, String msg) {
this.userId = userId;
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "GroupReqVO{" +
"userId=" + userId +
", msg='" + msg + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.cim.route.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 22:30
* @since JDK 1.8
*/
public class LoginReqVO extends BaseRequest{
private Long userId ;
private String userName ;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "LoginReqVO{" +
"userId=" + userId +
", userName='" + userName + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.cim.route.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function: 单聊请求
*
* @author crossoverJie
* Date: 2018/05/21 15:56
* @since JDK 1.8
*/
public class P2PReqVO extends BaseRequest {
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "消息发送者的 userId", example = "1545574049323")
private Long userId ;
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "消息接收者的 userId", example = "1545574049323")
private Long receiveUserId ;
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public P2PReqVO() {
}
public P2PReqVO(Long userId, Long receiveUserId, String msg) {
this.userId = userId;
this.receiveUserId = receiveUserId;
this.msg = msg;
}
public Long getReceiveUserId() {
return receiveUserId;
}
public void setReceiveUserId(Long receiveUserId) {
this.receiveUserId = receiveUserId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "GroupReqVO{" +
"userId=" + userId +
", msg='" + msg + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.cim.route.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 22:04
* @since JDK 1.8
*/
public class RegisterInfoReqVO extends BaseRequest {
@NotNull(message = "用户名不能为空")
@ApiModelProperty(required = true, value = "userName", example = "zhangsan")
private String userName ;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "RegisterInfoReqVO{" +
"userName='" + userName + '\'' +
"} " + super.toString();
}
}
... ...
package com.crossoverjie.netty.action.client.vo.req;
package com.crossoverjie.cim.route.vo.req;
import com.crossoverjie.netty.action.common.req.BaseRequest;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
... ...
package com.crossoverjie.cim.route.vo.res;
import java.io.Serializable;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 00:43
* @since JDK 1.8
*/
public class CIMServerResVO implements Serializable {
private String ip ;
private Integer cimServerPort;
private Integer httpPort;
public CIMServerResVO(String ip, Integer cimServerPort, Integer httpPort) {
this.ip = ip;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Integer getCimServerPort() {
return cimServerPort;
}
public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}
public Integer getHttpPort() {
return httpPort;
}
public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
}
... ...
package com.crossoverjie.cim.route.vo.res;
import java.io.Serializable;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 21:54
* @since JDK 1.8
*/
public class RegisterInfoResVO implements Serializable{
private Long userId ;
private String userName ;
public RegisterInfoResVO(Long userId, String userName) {
this.userId = userId;
this.userName = userName;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "RegisterInfo{" +
"userId=" + userId +
", userName='" + userName + '\'' +
'}';
}
}
... ...
spring.application.name=cim-forward-route
# web port
server.port=8083
# 是否打开swagger
swagger.enable = true
logging.level.root=info
# 关闭健康检查权限
management.security.enabled=false
# zk 地址
app.zk.addr=47.98.194.60:2182
# zk 注册根节点
app.zk.root=/route
# Redis 配置
spring.redis.host=47.98.194.60
spring.redis.port=6379
spring.redis.pool.max-active=100
spring.redis.pool.max-idle=100
spring.redis.pool.max-wait=1000
spring.redis.pool.min-idle=10
... ...