局域网聊天项目

要设计: 

    ​   本项目主要由服务器端和客户端两大模块组成,每个模块的主要功能有:
服务器端:

    ​    ​利用多线程实现并行,结合半同步半异步的网络模型完成线程之间的任务分工,利用socket_pair完成线程之间的数据交互

    ​    ​1>负责绑定与监听目标端口
    ​    ​2>为每个客户建立连接,并将套接字按照子线程压力分发给子线程监听,子线程利用libevent实现I/O复用监听客户端的套接字,利用MVC模式通过判断客户端的请求类型调用不同的视图处理用户请求。
    ​    ​3>循环接受并处理客户消息,并反馈给客户。
    ​    ​4>利用mysql数据库存储用户相关信息以及离线消息。

    ​    ​5>利用memcached实现数据库和内存之间的高速缓存。
客户端:
      1>请求与服务器端口的连接

       2>利用多线程实现,采取输入命令的方式,让用户选择服务类型,根据不同的服务类型调用不同的处理函数,完成请求

       3>利用json和自定义的上层协议完成和服务器端的数据的交互,

    ​   4>在登录成功后,启动一个线程接收服务器端的消息

详细设计:
服务器端主要由五个类组成:

    ​mysql类::进行数据库的连接与选择。

    ​tcpsever类::创建服务器,创建cpu内核数-1个子线程,创建线程个数个双向管道,接受用户连接,并将客户端套接字通过sock_pair发给当前监听数量最少的子线程(pthread)。

    ​pthread类::启动子线程,接收主线程传过来的客户端套接字,并给主线程回复当前监听的客户端数量,监听客户端套接字,并接受客户端发送的消息发送给控制台(control)。

    ​control类::解析客户端发来的数据,并根据其类型调用相关的视图(view)。

    ​view类::对发来的数据做出相关的处理。

客户端主要有主线程和子线程:

    ​主线程连接服务器,进入死循环,让用户选择服务(注册,登录,退出)

    ​登录之后进入子线程等待接收服务器发送来的消息。让用户选择接下来的服务内容。

    ​

最后进行了服务器的最大负载量的测试:

    ​在默认的打开文件描述符的情况下,最多连接客户端1021个,

    ​将默认个数改调后,发现最多客户端连接是28231个。

main::


int main(int argc,char *argv[])
{
 cout<<"main begin:"<<endl;
 if(argc < 4)
 {
  cout<<"error"<<endl;
  return 0;
 }
   
 //分离参数
    int port = atoi(argv[2]);
    char *ip = argv[1];
 short pth_num = atoi(argv[3]);
 Tcpsever sever(ip,port,pth_num);
 sever.run();
}


tcpsever::


int i = 0;
void listen_cb(int fd,short event,void *arg)
{
 i++;
 Tcpsever *ser = (Tcpsever*)arg;
 //接受用户链接
 struct sockaddr_in cli;
 socklen_t len = sizeof(cli);
 int cli_fd = accept(fd,(struct sockaddr*)&cli,&len);
 assert(cli_fd != -1);
 cout<<"one cli linked!"<<endl;
 cout<<"次数:"<<i<<endl;
 //查找当前监听数量最少的子线程
 map<int,int>::iterator it = ser->_pth_work_num.begin();
 int min_num = it->second; //最小个数
 int min_fd;//监听最少的对应的fd
 for(;it != ser->_pth_work_num.end();++it)
 {
  if(it->second <= min_num)
  {
   min_num = it->second;
   min_fd = it->first;
  }
 }
 //将客户端套接子通过socktpair发给子线程
 char sendbuff[128] = {0};
 sprintf(sendbuff,"%d",cli_fd);
 send(min_fd,sendbuff,strlen(sendbuff),0);
 
}

void sock_pair_cb(int fd,short event,void *arg)
{
 Tcpsever *ser = (Tcpsever*)arg;
 //读取管道内容
 char pairbuff[128] = {0};
 recv(fd,pairbuff,127,0);

 
 //更新到map表_pth_work_num  ----->fd
 map<int,int>::iterator it = ser->_pth_work_num.find(fd);
 it->second = atoi(pairbuff);
}

Tcpsever::Tcpsever(char *ip,short port,int pth_num)
{
 cout<<"tcpsever begin:"<<endl;
 ///创建服务器
   int _listen_fd = socket(AF_INET,SOCK_STREAM,0);
 if(-1 == _listen_fd)
 {
  cerr<<"socket fail;error:"<<errno<<endl;
  return;
 } 
 struct sockaddr_in ser;
 ser.sin_family = AF_INET;
 ser.sin_port = htons(port);
 ser.sin_addr.s_addr = inet_addr(ip);

 _pth_num = pth_num;

 if(-1 == bind(_listen_fd,(struct sockaddr*)&ser,sizeof(ser)))
 {
  cerr<<"bind fail;errno:"<<errno<<endl;
  throw "";
 }

 if(-1 == listen(_listen_fd,20))
 {
  cerr<<"listen fail;errno:"<<errno<<endl;
  throw "";
 }

 //给libevent申请空间
 _base = event_base_new();

 //创建事件,绑定监听套接子的回调函数(listen_cb)
 
 struct event *listen_event = event_new(_base,_listen_fd,EV_READ|EV_PERSIST,listen_cb,this);
 if(NULL == listen_event)
 {
  cerr<<"event new fail;errno:"<<errno<<endl;
  throw "";
 }

 //添加到事件列表
 event_add(listen_event,NULL);
 
 
}

void Tcpsever::run()
{
 //申请socketpair(函数自查)
 get_sock_pair();

 //创建线程//规定  int arr[2]  arr[0]<=>主线程占用   arr[1]<=>子线程占用
 get_pthread();

 //为主线程的socktpair创建事件,绑定回调函数(sock_pair_cb)
 int i;
 for(i=0; i<_pth_num; i++)
 {
  struct event* sock_event = event_new(_base,_socket_pair[i].arr[0],EV_READ|EV_PERSIST,sock_pair_cb,this);
  
  event_add(sock_event,NULL);
 } 

 event_base_dispatch(_base);

}

void Tcpsever::get_sock_pair()
{
 int i = 0;
 for(i = 0;i < _pth_num;i++ )
 {
  //申请双向管道
  int arr[2];
  if(-1 == socketpair(AF_UNIX,SOCK_STREAM,0,arr))
  {
   cerr<<"socketpair fail;errno:"<<errno<<endl;
   return;
  }
  //将双向管道加入到_sock_pair.push_back();
  _socket_pair.push_back(MVEC(arr));

  _pth_work_num.insert(make_pair(arr[0],0));
 }

}

void Tcpsever::get_pthread()
{
 //开辟线程
 for(int i = 0; i< _pth_num; i++)
 {
  _pthread.push_back(new Pthread(_socket_pair[i].arr[1]));
 }
}

Tcpsever::~Tcpsever()
{

}


pthread::


extern Control control_sever;
void *pthread_run(void *arg);
void sock_pair_1_cb(int fd,short event,void *arg);
void client_cb(int fd,short int event,void *arg);

Pthread::Pthread(int sock_fd)
{
 cout<<"Pthread begin:"<<endl;
 _sock_fd = sock_fd;
 _base = event_base_new();
    //启动线程
 pthread_create(&_pthread,NULL,pthread_run,this);
}

void* pthread_run(void *arg)
{
 Pthread *pth = (Pthread *)arg;
 //将sock_pair_1加入到libevent  sock_pair_1_cb()
 struct event* _event = event_new(pth->_base,pth->_sock_fd,EV_READ|EV_PERSIST,sock_pair_1_cb,arg);
 event_add(_event,NULL);
 pth->_event_map.insert(make_pair(pth->_sock_fd,_event));
 event_base_dispatch(pth->_base);
}

void sock_pair_1_cb(int fd,short int event,void *arg)
{
 Pthread *pth = (Pthread *)arg;
 //recv -> clien_fd
 char buff[128] = {0};
 if(0 >recv(fd,buff,127,0))
 {
  cout<<"recv fail;errno:"<<errno<<endl;
  exit(1);

 }
 int cli_fd = atoi(buff);

 //将client_fd加入到libevent     client_cb()
 struct event* cli_event = event_new(pth->_base,cli_fd,EV_READ|EV_PERSIST,client_cb,arg);
 if(NULL == cli_event)
 {
  cerr<<"client create fail;errno:"<<errno<<endl;
  exit(1);
 }

 event_add(cli_event,NULL);
 //给主线程回复当前监听的客户端数量
 //插入到map表中
 pth->_event_map.insert(make_pair(cli_fd,cli_event)); 
 int cli_fd_num = pth->_event_map.size();
 char buf[128] = {0};
 sprintf(buf,"%d",cli_fd_num);
 send(fd,buff,strlen(buff),0);

}

void client_cb(int fd,short event,void *arg)
{
 cout<<"pthread::recv"<<endl;
 Pthread *pth = (Pthread*)arg;
    //recv  ->buff
 char buff[128] = {0};
 if(0 > recv(fd,buff,127,0))
 {
  cout<<"revc buff fail;errno:"<<errno<<endl;
  return;
 }
 cout<<buff<<endl;
 control_sever.process(fd,buff);
 //判断退出并将客户端退出
 Json::Value val;
 Json::Reader read;
 if(-1 == read.parse(buff,val))
 {
  cerr<<"pthread::read fail;errno:"<<errno<<endl;
  return;
 }
 if(val["reason_type"] == REASON_TYPE_EXIT)
 {  
  map<int,struct event*>::iterator it =pth->_event_map.find(fd);
  if(pth->_event_map.end()!=it)
  {
   event_del(it->second);
   close(fd);
   pth->_event_map.erase(it);
  }
 }
}

Pthread::~Pthread()
{
}

control::


Control::Control()
{
 cout<<"control begin:"<<endl;
 _map.insert(make_pair(REASON_TYPE_REGISTER,new Register()));
 _map.insert(make_pair(REASON_TYPE_LOGIN,new Login()));
 _map.insert(make_pair(REASON_TYPE_LIST,new Get_list()));
 _map.insert(make_pair(REASON_TYPE_EXIT,new Exit()));
 _map.insert(make_pair(REASON_TYPE_TALK,new Talk_one()));
 _map.insert(make_pair(REASON_TYPE_GROUP,new Talk_group()));
}

void Control::process(int fd,char *json)
{
 Json::Value val;
 Json::Reader read;
 if(-1 == read.parse(json,val))
 {
  cerr<<"Control process json parse fail;errno:"<<errno<<endl;
  return;
 }

 map<int,View*>::iterator it = _map.find(val["reason_type"].asInt());
 if(it == _map.end())
 {
  cout<<"reason type not find!"<<endl;
 }
 else
 {
  it->second->process(fd,json);
 
  it->second->response();
 
 }

}

Control control_sever;


mysql::

Mysql::Mysql()
{
 _mpcon = mysql_init((MYSQL*)0);
 if(NULL == _mpcon)
 {
  cerr<<"_mpcon NULL;errno:"<<errno<<endl;
  exit(0);

 }

 //链接数据库
 if(!mysql_real_connect(_mpcon,"127.0.0.1","root","123456",NULL,3306,NULL,0))
 {
  cerr<<"mysql connect fail;errno:"<<errno<<endl;
  exit(0);
 }
 
 //选择数据库
 if(mysql_select_db(_mpcon,"chat"))
 {
  cerr<<"database select fail;errno:"<<errno<<endl;
  exit(0);
 }
 
}

Mysql::~Mysql()
{
 if(NULL != _mp_res)
 {
  mysql_free_result(_mp_res);
 }

 mysql_close(_mpcon);
}


Mysql Mysql_sever;


register::


extern Mysql Mysql_sever;

void Register::process(int fd,char* json)
{
 cout<<"register::begin"<<endl;
 _fd = fd;
 //解析json
 Json::Value val;
 Json::Reader read;
 if(-1 == read.parse(json,val))
 {
  cerr<<"read fail;errno:"<<errno<<endl;
  return;
 }
 
 //name pw
 //在数据库中查找name有没有重复
 char cmd[100] = "SELECT *FROM user WHERE name='";
    strcat(cmd,val["name"].asString().c_str());
 strcat(cmd,"';");
    cout<<cmd<<endl;
    if(mysql_real_query(Mysql_sever._mpcon,cmd,strlen(cmd)))
 {
  cerr<<" select fail ;errno:"<<errno<<endl;
  return;
 }
 Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);
 if(!(Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res)))
 {
  //将name pw加入到数据库的user
  char cmd1[100] = "INSERT INTO user VALUE('";
  strcat(cmd1,val["name"].asString().c_str());
  strcat(cmd1,"','");
  strcat(cmd1,val["pw"].asString().c_str());
  strcat(cmd1,"');");
     cout<<cmd1<<endl;
  if(mysql_real_query(Mysql_sever._mpcon,cmd1,strlen(cmd1)))
  {
   cerr<<"insert fail;errno:"<<errno<<endl;
   return;
     }
  _str = "register success";
  
 }
 else
 {
    _str = "register fail";
 }
}
void Register::response()
{
    //用json将消息打包,发送给客户端
 Json::Value root;
 root["reason"] = _str;
    if(-1 == send(_fd,root.toStyledString().c_str(),strlen(root.toStyledString().c_str()),0))
 {
  cerr<<"register send reason fail;errno:"<<errno<<endl;
  return;
 }

}