基于linux下的局域网聊天
本项目是在linux环境下,利用网络编程,多线程等模拟出来的聊天程序,只要有客户端请求连接,就建立线程向服务器发送消息。服务器的监听到客户的消息便给其做出对应处理。
本项目主要由服务器端和客户端组成。各自的功能不同。
服务器端:绑定和监听目的端口;为每个客户建立连接,利用多线程实现并行,对客户的请求作出相应处理;利用socket_pair进行线程间的数据交互。主线程只接受客户连接并将套接字给子线程监听个数最少的,子线程用libevent实现i/o复用监听客户端套接字,利用MVC模式判断客户的消息类型进而作出相应的视图处理。Mysql数据库存储用户信息和离线消息。
客户端:请求与服务器端口的连接;建立多线程,选择服务类型,根据选择的类型调用相应的处理函数,完成请求。用json完成向服务器发送和接受数据。每个客户登陆成功就会启动一个线程接收服务器的消息。
客户端和服务器之间的通信:通过双方约定的消息协议。利用json包进行数据交互。
服务器端:
主线程:构造服务器对象,创建libevent,监听服务器的监听套接字;创建pth_nuum个socker_pair,加入到服务器存储的socker_pair_vector,将socket_pair的0端处理为绑定事件发生的回调函数socketpair_cb(),全部事件加入到主线程libevent,而在socketpair_cb()中只接受子线程发送过来的当前其对应的监听个数,收到后更新map;创建内核-1个子线程,利用socketpair进行主线程和子线程间的通信,所有子线程存储在子线程的vector中,创建子线程对象时在socketpair的1端传给子线程。之后的子线程用1端和主线程通信;socketpair的0端加入到服务器对象map中,map表中的每个键值对value初值为0;主线程的libevent有事件发生时,调用相应的回调函数,如果监听的套接字上有事件发生,说明有客户端和服务器进行连接,主线程调用监听套接字的相应回调函数。在listen_cb()中accept新客户端,产生套接字,随后查看服务器的map表,选择监听最少的子线程,map的0端与子线程进行通信,将新产生的客户端套接字发送给相应子线程(如果sp_fd上事件发生那说明子线程给主线程发送消息,此时主线程调用响应的socketpair_cb());和客户端进行通信,客户端的请求交给子线程处理,主线程只判断客户端的连接。子线程根据客户端的消息类型做相应处理。
Tcpsever.cpp
void listen_cb(int fd,short event,void *arg)
{
Tcpsever *This = (Tcpsever*)arg;
//接受用户链接
struct sockaddr_in caddr;
socklen_t len = sizeof(caddr);
int clifd = accept(This->_listen_fd,(struct sockaddr*)&caddr,&len);
//查找当前监听数量最少的子线程
map<int,int>::iterator it = (This->_pth_work_num).begin();
int min = it->second;
map<int,int>::iterator itm = it;
for(;it != (This->_pth_work_num).end();++it)
{
if(it->second < min)
{
min = it->second;
itm = it;
}
}
int sp_fd = itm->first;
//将客户端套接子通过socktpair发给子线程
char p[10] = {0};
sprintf(p,"%d",clifd);
send(sp_fd,p,strlen(p),0);
}
void sock_pair_1_cb(int fd,short event,void *arg)
{
Tcpsever *This = (Tcpsever *)arg;
//读取管道内容
char buff[256] = {0};
recv(fd,buff,255,0);
int num = atoi(buff);
//更新到map表_pth_work_num ----->fd
This->_pth_work_num[fd] = num;
}
Tcpsever::Tcpsever(char *ip,short port,int pth_num)
{
///创建服务器
_listen_fd = socket(AF_INET,SOCK_STREAM,0);
assert(_listen_fd != -1);
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port);
saddr.sin_addr.s_addr = inet_addr(ip);
if(-1 == bind(_listen_fd,(struct sockaddr*)&saddr,sizeof(saddr)))
{
cerr<<"bind fail."<<endl;
return ;
}
listen(_listen_fd,5);
_pth_num = pth_num;
//给libevent申请空间
_base=event_base_new();
//创建事件,绑定监听套接子的回调函数(listen_cb)
Struct event *listen_event
= event_new(_base,_listen_fd,EV_READ|EV_PERSIST,listen_cb,this);
event_add(listen_event,NULL);
}
void Tcpsever::run()
{
//申请socketpair
this->get_sock_pair();
//创建线程
this->get_pthread();
//规定 int arr[2] arr[0]<=>主线程占用 arr[1]<=>子线程占用
//循环监听
event_base_dispatch(this->_base);
}
void Tcpsever::get_sock_pair()
{
for(int i = 0;i < _pth_num;i++ )
{
//申请双向管道
int arr[2]={0};
if(-1 == socketpair(AF_UNIX,SOCK_STREAM,0,arr))
{
cerr<<"socketpair create fail."<<endl;
return ;
}
//将双向管道加入到_sock_pair.push_back();
_socket_pair.push_back(arr);
struct event *sp_event = event_new(_base,arr[0],EV_READ|EV_PERSIST,listen_cb,this);
//将事件添加到事件列表
event_add(sp_event,NULL);
_pth_work_num.insert(make_pair(arr[0],0));
}
}
void Tcpsever::get_pthread()
{//开辟线程
for(int i = 0; i< _pth_num; i++)
{
_pthread.push_back(new Pthread(_socket_pair[i][1]));
}
}
Pthread.cpp
Pthread::Pthread(int sock_fd)
{
_sock_fd = sock_fd;//socketpair1
if(0 != pthread_create(&_pthread,NULL,pthread_run,this))
{
cerr<<"pthread create fail."<<endl;
return ;
}
}
void *pthread_run(void *arg)
{
Pthread *This = (Pthread *)arg;
//将sock_pair_1加入到libevent sock_pair_1_cb()
This->_base = event_base_new();
struct event*listen_event
= event_new(This->_base,This->_sock_fd,EV_READ|EV_PERSIST,sockpair_cb,This);
assert(NULL!=listen_event);
//将事件添加到事件列表
event_add(listen_event,NULL);
//循环监听
event_base_dispatch(This->_base);
}
void sockpair_cb(int fd,short event,void *arg)
{
Pthread *This = (Pthread *)arg;
//recv -> client_fd
char buff[256] = {0};
if(-1 == recv(This->_sock_fd,buff,255,0))
{
cerr<<"recv fail."<<endl;
return ;
}
int cli_fd = atoi(buff);
//将client_fd加入到libevent client_cb()
struct event* cli_event
= event_new(This->_base,cli_fd,EV_READ|EV_PERSIST,client_cb,This);
assert(NULL != cli_event);
event_add(cli_event,NULL);
///给主线程回复当前监听的客户端数量
//map->first->sort,send()
This->_event_map.insert(make_pair(cli_fd,cli_event));
memset(buff,0,256);
int listen_num = This->_event_map.size();
send(This->_sock_fd,buff,strlen(buff),0);
}
void client_cb(int fd,short event,void *arg)
{
Pthread *This = (Pthread *)arg;
//recv ->buff
char buff[2048]={0};
if(-1 == recv(fd,buff,2047,0))
{
cerr<<"recv fali."<<endl;
return ;
}
Json::Value val;
Json::Reader read;
read.parse(buff,val);
if(REASON_TYPE_EXIT == val["reason_type"].asInt())
{
event_free(This->_event_map[fd]);
This->_event_map.erase(This->_event_map.find(fd));
}
control_sever.process(fd,buff);
}
Control.cpp
Control::Control()
{
_map.insert(make_pair(REASON_TYPE_REGISTER,new Register));
_map.insert(make_pair(REASON_TYPE_LOGIN,new Login));
_map.insert(make_pair(REASON_TYPE_LIST,new List));
_map.insert(make_pair(REASON_TYPE_TALK,new Talk));
_map.insert(make_pair(REASON_TYPE_GROUP,new Group));
_map.insert(make_pair(REASON_TYPE_EXIT,new Exit));
}
//观察者(查找与json信息匹配的视图)
void Control::process(int fd,char* json)
{
//解析json ,获取消息类型
Json::Value val;
Json::Reader read;
if(-1==read.parse(json,val))
{
cerr<<"read fail."<<endl;
return ;
}
MSG_TYPE message = (MSG_TYPE)val["reason_type"].asInt();//获取客户端消息类型
//根据消息类型在map中查找
map<int,View*>::iterator it = _map.find(message);
//判断是否找到
if(it != _map.end())
{
it->second->process(fd,json);
it->second->response();
}
else
{
cout<<"not find message type."<<endl;
}
//it->second->response();
}
子线程:
构造子线程类,产生子线程对象,子线程有自己的libevent和自己的监听客户端套接字个数;子线程在构造生成子线程对象时,传进来socketpair 1端,此时的1端加入到子线程libevent中监听;libevent监听的socketpair有事件发生时,说明服务器主线程要通信,此时调socketpair_cb(),在回调函数中就接收到主线程发送过来的客户端fd,该函数将fd加入到libevent中监听,函数将自己监听的客户端num发送给主线程;子线程监听的客户端套接字,客户端有消息发送过来时,子线程对消息json包进行解析;子线程的libevent监听到客户端套接字发生事件时,调用client_cb(),此时在这个函数中对消息json包进行解析,得到消息类型,子线程根据客户端的消息类型做相应处理。
Register.cpp
void Register::process(int fd,char* json)
{
_fd = fd;
//解析 json
Json::Value val;
Json::Reader read;
if(-1==read.parse(json,val))
{
cerr<<"read fail."<<endl;
return ;
}
string name = val["name"].asString();
string pw = val["pw"].asString();
//在数据库中查找name有没有重复?
char cmd[100] = "SELECT * FROM user WHERE name=';";
strcat(cmd,name.c_str());
strcat(cmd,"';");
if(mysql_real_query(Mysql_sever._mpcon,cmd,strlen(cmd)))
{
cerr<<"select fail."<<endl;
return ;
}
Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);
Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res);
if(Mysql_sever._mp_row == NULL)//用户未注册过
{
//将name pw 加入到数据库的user表
char cmd1[100] = "INSERT INTO user VALUE ('";
strcat(cmd1,name.c_str());
strcat(cmd1,"','");
strcat(cmd1,pw.c_str());
strcat(cmd1,"');");
if(mysql_real_query(Mysql_sever._mpcon,cmd1,strlen(cmd1)))
{
cout<<"insert fail."<<endl;
return ;
}
_str = "register sucess!";
}
else
{//注册过
_str = "register fail";
}
}
void Register::response()
{
//json打包后发给客户端,客户端解析,太麻烦,直接发送内容
send(_fd,_str.c_str(),strlen(_str.c_str()),0);
}
Talk.cpp
void Talk::process(int fd,char* json)
{
_fd = fd;
//解析 json
Json::Value val;
val["realson_type"] = REASON_TYPE_TALK;
Json::Value root;
Json::Reader read;
if(-1==read.parse(json,val))
{
cerr<<"read fail."<<endl;
return ;
}
string sendname = root["sendname"].asString();
val["sendname"] = sendname.c_str();
string recvname = root["name"].asString();
string recvmsg = root["message"].asString();
char cmd[100] = "SELECT * FROM online WHERE name='";
strcat(cmd,recvname.c_str());
strcat(cmd,"';");
if(mysql_real_query(Mysql_sever._mpcon,cmd,strlen(cmd)))
{
cerr<<"user not online."<<endl;
return ;
}
Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);
if(Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res))
{
char buff[2048] = {0};
strcpy(buff,recvname.c_str());
strcat(buff,"speack::");
strcat(buff,recvmsg.c_str());
Json::Value message;
message["reason_type"] = 1024;
message["message"] = buff;
if(-1 == send(atoi(Mysql_sever._mp_row[0]),message.toStyledString().c_str(),strlen(message.toStyledString().c_str()),0))
{
cerr<<"send fail."<<endl;
return ;
}
_str = "person talk success.";
}
else
{
char cmd1[100] = "SELECT * FROM user WHERE name='";
strcat(cmd1,sendname.c_str());
strcat(cmd1,"';");
if(mysql_real_query(Mysql_sever._mpcon,cmd1,strlen(cmd1)))
{
cerr<<"select fail."<<endl;
return ;
}
Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);
if(Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res))
{
char cmd2[2048] = "INSERT INTO offline VALUE('";
strcat(cmd2,sendname.c_str());
strcat(cmd2,"','");
strcat(cmd2,recvname.c_str());
strcat(cmd2,"','");
strcat(cmd2,recvmsg.c_str());
strcat(cmd2,"');");
if(mysql_real_query(Mysql_sever._mpcon,cmd2,strlen(cmd2)))
{
cerr<<"insert fail."<<endl;
return ;
}
_str = "person talk success.";
}
else
{
_str = "person talk fail";
}
}
}
void Talk::response()
{
//json打包后发给客户端,客户端解析,太麻烦,直接发送内容
send(_fd,_str.c_str(),strlen(_str.c_str()),0);
}