基于linux下的局域网聊天



基于linux下的局域网聊天

本项目是在linux环境下,利用网络编程,多线程等模拟出来的聊天程序,只要有客户端请求连接,就建立线程向服务器发送消息。服务器的监听到客户的消息便给其做出对应处理。

本项目主要由服务器端和客户端组成。各自的功能不同。

服务器端:绑定和监听目的端口;为每个客户建立连接,利用多线程实现并行,对客户的请求作出相应处理;利用socket_pair进行线程间的数据交互。主线程只接受客户连接并将套接字给子线程监听个数最少的,子线程用libevent实现i/o复用监听客户端套接字,利用MVC模式判断客户的消息类型进而作出相应的视图处理。Mysql数据库存储用户信息和离线消息。

 

客户端:请求与服务器端口的连接;建立多线程,选择服务类型,根据选择的类型调用相应的处理函数,完成请求。用json完成向服务器发送和接受数据。每个客户登陆成功就会启动一个线程接收服务器的消息。

 

客户端和服务器之间的通信:通过双方约定的消息协议。利用json包进行数据交互。

 

服务器端:

主线程:构造服务器对象,创建libevent,监听服务器的监听套接字;创建pth_nuumsocker_pair,加入到服务器存储的socker_pair_vector,socket_pair0端处理为绑定事件发生的回调函数socketpair_cb(),全部事件加入到主线程libevent,而在socketpair_cb()中只接受子线程发送过来的当前其对应的监听个数,收到后更新map;创建内核-1个子线程,利用socketpair进行主线程和子线程间的通信,所有子线程存储在子线程的vector中,创建子线程对象时在socketpair1端传给子线程。之后的子线程用1端和主线程通信;socketpair0端加入到服务器对象map中,map表中的每个键值对value初值为0;主线程的libevent有事件发生时,调用相应的回调函数,如果监听的套接字上有事件发生,说明有客户端和服务器进行连接,主线程调用监听套接字的相应回调函数。在listen_cb()accept新客户端,产生套接字,随后查看服务器的map表,选择监听最少的子线程,map0端与子线程进行通信,将新产生的客户端套接字发送给相应子线程(如果sp_fd上事件发生那说明子线程给主线程发送消息,此时主线程调用响应的socketpair_cb());和客户端进行通信,客户端的请求交给子线程处理,主线程只判断客户端的连接。子线程根据客户端的消息类型做相应处理。

Tcpsever.cpp

void listen_cb(int fd,short event,void *arg)

{

Tcpsever *This = (Tcpsever*)arg;

 

//接受用户链接

struct sockaddr_in caddr;

    socklen_t len = sizeof(caddr);    

int clifd = accept(This->_listen_fd,(struct sockaddr*)&caddr,&len);

 

//查找当前监听数量最少的子线程

map<int,int>::iterator it = (This->_pth_work_num).begin();

int min = it->second;

map<int,int>::iterator itm = it;

for(;it != (This->_pth_work_num).end();++it)

{

if(it->second < min)

{

min = it->second;

itm = it;

}

}

int sp_fd = itm->first;

 

//将客户端套接子通过socktpair发给子线程

char p[10] = {0};

sprintf(p,"%d",clifd);

send(sp_fd,p,strlen(p),0);

}

 

void sock_pair_1_cb(int fd,short event,void *arg)

{

Tcpsever *This = (Tcpsever *)arg;

//读取管道内容

char buff[256] = {0};

recv(fd,buff,255,0);

 

int num = atoi(buff);

 

 //更新到map_pth_work_num  ----->fd

This->_pth_work_num[fd] = num;

}

 

Tcpsever::Tcpsever(char *ip,short port,int pth_num)

{

///创建服务器

_listen_fd = socket(AF_INET,SOCK_STREAM,0);

assert(_listen_fd != -1);

 

struct sockaddr_in saddr;

saddr.sin_family = AF_INET;

saddr.sin_port = htons(port);

saddr.sin_addr.s_addr = inet_addr(ip);

if(-1 == bind(_listen_fd,(struct sockaddr*)&saddr,sizeof(saddr)))

{

cerr<<"bind fail."<<endl;

return ;

}

 

listen(_listen_fd,5);

 

_pth_num = pth_num;

 

//libevent申请空间

  _base=event_base_new();

 

//创建事件,绑定监听套接子的回调函数(listen_cb)

Struct event *listen_event

= event_new(_base,_listen_fd,EV_READ|EV_PERSIST,listen_cb,this);

 

event_add(listen_event,NULL);

}

 

void Tcpsever::run()

{

//申请socketpair

this->get_sock_pair();

//创建线程

    this->get_pthread();

//规定  int arr[2]  arr[0]<=>主线程占用   arr[1]<=>子线程占用

//循环监听

event_base_dispatch(this->_base);

}

 

void Tcpsever::get_sock_pair()

{

for(int i = 0;i < _pth_num;i++ )

{

//申请双向管道

  int arr[2]={0};

    if(-1 == socketpair(AF_UNIX,SOCK_STREAM,0,arr))

{

     cerr<<"socketpair create fail."<<endl;

return ;

}

//将双向管道加入到_sock_pair.push_back();

_socket_pair.push_back(arr);

struct event *sp_event = event_new(_base,arr[0],EV_READ|EV_PERSIST,listen_cb,this);

//将事件添加到事件列表

event_add(sp_event,NULL);

_pth_work_num.insert(make_pair(arr[0],0));

}

}

 

void Tcpsever::get_pthread()

{//开辟线程

for(int i = 0; i< _pth_num; i++)

{

_pthread.push_back(new Pthread(_socket_pair[i][1]));

}

}

Pthread.cpp

Pthread::Pthread(int sock_fd)

{

_sock_fd = sock_fd;//socketpair1

if(0 != pthread_create(&_pthread,NULL,pthread_run,this))

{

cerr<<"pthread create fail."<<endl;

return ;

}

}

 

void *pthread_run(void *arg)

{

Pthread *This = (Pthread *)arg;

//sock_pair_1加入到libevent sock_pair_1_cb()

This->_base = event_base_new();

 

struct event*listen_event

= event_new(This->_base,This->_sock_fd,EV_READ|EV_PERSIST,sockpair_cb,This);

assert(NULL!=listen_event);

 

    //将事件添加到事件列表

   event_add(listen_event,NULL);

    

    //循环监听

   event_base_dispatch(This->_base);

 

}

 

void sockpair_cb(int fd,short event,void *arg)

{

 Pthread *This = (Pthread *)arg;

 

//recv -> client_fd

char buff[256] = {0};

if(-1 == recv(This->_sock_fd,buff,255,0))

{

cerr<<"recv fail."<<endl;

return ;

 }

 

int cli_fd = atoi(buff);

 //client_fd加入到libevent  client_cb()

    struct event* cli_event

= event_new(This->_base,cli_fd,EV_READ|EV_PERSIST,client_cb,This);

    assert(NULL != cli_event);

    event_add(cli_event,NULL);

///给主线程回复当前监听的客户端数量

//map->first->sort,send()

This->_event_map.insert(make_pair(cli_fd,cli_event));

 

memset(buff,0,256);

int listen_num = This->_event_map.size();

send(This->_sock_fd,buff,strlen(buff),0);

}

 

void client_cb(int fd,short event,void *arg)

{

Pthread *This = (Pthread *)arg;

//recv  ->buff

char buff[2048]={0};

    if(-1 == recv(fd,buff,2047,0))

    {

    cerr<<"recv fali."<<endl;

return ;

}

Json::Value val;

Json::Reader read;

 

read.parse(buff,val);

 

if(REASON_TYPE_EXIT == val["reason_type"].asInt())

{

event_free(This->_event_map[fd]);

This->_event_map.erase(This->_event_map.find(fd));

}

 

control_sever.process(fd,buff);

}

Control.cpp

Control::Control()

{

_map.insert(make_pair(REASON_TYPE_REGISTER,new Register));

_map.insert(make_pair(REASON_TYPE_LOGIN,new Login));

_map.insert(make_pair(REASON_TYPE_LIST,new List));

_map.insert(make_pair(REASON_TYPE_TALK,new Talk));

_map.insert(make_pair(REASON_TYPE_GROUP,new Group));

_map.insert(make_pair(REASON_TYPE_EXIT,new Exit));

}

 

 //观察者(查找与json信息匹配的视图)

void Control::process(int fd,char* json)

{

 //解析json ,获取消息类型

Json::Value val;

Json::Reader read;

if(-1==read.parse(json,val))

{

cerr<<"read fail."<<endl;

return ;

}

 

MSG_TYPE message = (MSG_TYPE)val["reason_type"].asInt();//获取客户端消息类型

 

//根据消息类型在map中查找

map<int,View*>::iterator it = _map.find(message);

//判断是否找到

if(it != _map.end())

{

it->second->process(fd,json);

it->second->response();

}

else

{

cout<<"not find message type."<<endl;

}

//it->second->response();

}

子线程:

构造子线程类,产生子线程对象,子线程有自己的libevent和自己的监听客户端套接字个数;子线程在构造生成子线程对象时,传进来socketpair 1端,此时的1端加入到子线程libevent中监听;libevent监听的socketpair有事件发生时,说明服务器主线程要通信,此时调socketpair_cb(),在回调函数中就接收到主线程发送过来的客户端fd,该函数将fd加入到libevent中监听,函数将自己监听的客户端num发送给主线程;子线程监听的客户端套接字,客户端有消息发送过来时,子线程对消息json包进行解析;子线程的libevent监听到客户端套接字发生事件时,调用client_cb(),此时在这个函数中对消息json包进行解析,得到消息类型,子线程根据客户端的消息类型做相应处理。

Register.cpp

void Register::process(int fd,char* json)

{

_fd = fd;

//解析 json

Json::Value val;

Json::Reader read;

if(-1==read.parse(json,val))

{

cerr<<"read fail."<<endl;

return ;

}

string name = val["name"].asString();

string pw = val["pw"].asString();

//在数据库中查找name有没有重复?

char cmd[100] = "SELECT * FROM user WHERE name=';";

strcat(cmd,name.c_str());

strcat(cmd,"';");

if(mysql_real_query(Mysql_sever._mpcon,cmd,strlen(cmd)))

{

cerr<<"select fail."<<endl;

return ;

}

Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);

Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res);

if(Mysql_sever._mp_row == NULL)//用户未注册过

{

//name pw 加入到数据库的user

char cmd1[100] = "INSERT INTO user VALUE ('";

strcat(cmd1,name.c_str());

strcat(cmd1,"','");

strcat(cmd1,pw.c_str());

strcat(cmd1,"');");

if(mysql_real_query(Mysql_sever._mpcon,cmd1,strlen(cmd1)))

{

 cout<<"insert fail."<<endl;

 return ;

}

 _str = "register sucess!";

}

else

{//注册过

_str = "register fail";

}

}

 

void Register::response()

{

//json打包后发给客户端,客户端解析,太麻烦,直接发送内容

send(_fd,_str.c_str(),strlen(_str.c_str()),0);

}

Talk.cpp

void Talk::process(int fd,char* json)

{

_fd = fd;

//解析 json

Json::Value val;

val["realson_type"] = REASON_TYPE_TALK;

Json::Value root;

Json::Reader read;

if(-1==read.parse(json,val))

{

cerr<<"read fail."<<endl;

return ;

}

 

string sendname = root["sendname"].asString();

val["sendname"] = sendname.c_str();

string recvname = root["name"].asString();

string recvmsg = root["message"].asString();

 

char cmd[100] = "SELECT * FROM online WHERE name='";

strcat(cmd,recvname.c_str());

strcat(cmd,"';");

if(mysql_real_query(Mysql_sever._mpcon,cmd,strlen(cmd)))

{

cerr<<"user not online."<<endl;

return ;

}

Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);

if(Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res))

{

char buff[2048] = {0};

strcpy(buff,recvname.c_str());

strcat(buff,"speack::");

strcat(buff,recvmsg.c_str());

Json::Value message;

message["reason_type"] = 1024;

message["message"] = buff;

 

if(-1 == send(atoi(Mysql_sever._mp_row[0]),message.toStyledString().c_str(),strlen(message.toStyledString().c_str()),0))

{

cerr<<"send fail."<<endl;

return ;

}

_str = "person talk success.";

}

else

{

char cmd1[100] = "SELECT * FROM user WHERE name='";

strcat(cmd1,sendname.c_str());

strcat(cmd1,"';");

if(mysql_real_query(Mysql_sever._mpcon,cmd1,strlen(cmd1)))

{

cerr<<"select fail."<<endl;

return ;

}

Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);

if(Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res))

{

char cmd2[2048] = "INSERT INTO offline VALUE('";

strcat(cmd2,sendname.c_str());

strcat(cmd2,"','");

strcat(cmd2,recvname.c_str());

strcat(cmd2,"','");

strcat(cmd2,recvmsg.c_str());

strcat(cmd2,"');");

if(mysql_real_query(Mysql_sever._mpcon,cmd2,strlen(cmd2)))

{

cerr<<"insert fail."<<endl;

return ;

}

_str = "person talk success.";

}

else

{

_str = "person talk fail";

}

}

}

 

void Talk::response()

{

 //json打包后发给客户端,客户端解析,太麻烦,直接发送内容

send(_fd,_str.c_str(),strlen(_str.c_str()),0);

}