Python多线程实现tcp应答客户端和服务端

背景

近两日一边改毕设论文,一边学习python。从多任务开始,记录学习过程。java

此处实现一个tcp的应答程序,一个读线程一个写线程,python负责服务端,java负责客户端。任一端输入小写over,传输结束(另外一端须要按下回车便可退出)。python

 

服务端

服务端套接字的建立和监听

python服务端套接字的建立和监听与C类似,流程都是建立->绑定-.>监听。具体代码以下安全

tcpServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcpServerSocket.bind(("", 12345))
    tcpServerSocket.listen(128)

socket()方法第二个入参就表示tcpapp

bind()方法传入一个元组,前者是ip,这里为空,就是监听本机;后者是监听的端口号socket

listen()方法的入参表示一次性能够链接多少个客户端tcp

读写线程

读写线程差很少,为了方便控制,打算用一个类继承threading.Thread来实现线程ide

以写线程为例,首先要绑定客户端套接字和读线程学习

def setDestination(self, clientSocket, recvDataThread):
        self.clientSocket = clientSocket
        self.recvDataThread = recvDataThread

而后在run()方法里进行数据发送。为了控制线程的终止和运行,引入running字段进行控制。输入的内容是over时,running为false,同时调用terminate()方法控制读线程的退出测试

def run(self):
        self.running = True
        while (self.running):
            dataToSend = input("发给客户端:")
            if dataToSend != "\n" and dataToSend != "":
                self.clientSocket.send((dataToSend).encode("utf-8"))
            if dataToSend == "over":
                time.sleep(1)
                self.running = False
                self.clientSocket.close()
                self.recvDataThread.terminate()

延时1s是为了让服务端最后一条信息可以发出去,而terminate()方法就是把running标志位置为false,读写线程都有此方法this

def terminate(self):
        self.running = False

整个写线程类的代码以下

class SendDataThread(threading.Thread):
    def setDestination(self, clientSocket, recvDataThread):
        self.clientSocket = clientSocket
        self.recvDataThread = recvDataThread

    def terminate(self):
        self.running = False

    def run(self):
        self.running = True
        while (self.running):
            dataToSend = input("发给客户端:")
            if dataToSend != "\n" and dataToSend != "":
                self.clientSocket.send((dataToSend).encode("utf-8"))
            if dataToSend == "over":
                time.sleep(1)
                self.running = False
                self.clientSocket.close()
                self.recvDataThread.terminate()

读线程和写线程相似,只不过在接收数据时,有可能服务端发送了over后套接字已经关闭,从而recv方法会报出异常,此时只要try-except便可

class RecvDataThread(threading.Thread):
    def setSource(self, clientSocket, sendDataThread):
        self.clientSocket = clientSocket
        self.senDataThread = sendDataThread

    def terminate(self):
        self.running = False

    def run(self):
        self.running = True
        while (self.running):
            try:
                dataReceived = str(self.clientSocket.recv(1024), "utf-8")
                if dataReceived != "":
                    print("\n客户端来信:%s" % str(dataReceived))
                    if dataReceived == "over":
                        self.running = False
                        self.clientSocket.close()
                        self.senDataThread.terminate()
                        print("通讯结束,摁任意键关闭")
            except:
                pass

接收客户端套接字,启动读写线程

这个其实很简单,accept到了客户端信息,就能够启动读写线程了

print("等待客户端链接")
    tcpClientSocket, clientIp = tcpServerSocket.accept()
    print("新的客户端已链接:%s" % str(clientIp))

    sendDataThread = SendDataThread()
    recvDataThread = RecvDataThread()
    sendDataThread.setDestination(tcpClientSocket, recvDataThread)
    recvDataThread.setSource(tcpClientSocket, sendDataThread)

    sendDataThread.start()
    recvDataThread.start()

    sendDataThread.join()
    recvDataThread.join()

两个子线程join是为了避免让主线程提早结束

完整的服务端代码以下

import socket
import threading
import time

class SendDataThread(threading.Thread):
    def setDestination(self, clientSocket, recvDataThread):
        self.clientSocket = clientSocket
        self.recvDataThread = recvDataThread

    def terminate(self):
        self.running = False

    def run(self):
        self.running = True
        while (self.running):
            dataToSend = input("发给客户端:")
            if dataToSend != "\n" and dataToSend != "":
                self.clientSocket.send((dataToSend).encode("utf-8"))
            if dataToSend == "over":
                time.sleep(1)
                self.running = False
                self.clientSocket.close()
                self.recvDataThread.terminate()


class RecvDataThread(threading.Thread):
    def setSource(self, clientSocket, sendDataThread):
        self.clientSocket = clientSocket
        self.senDataThread = sendDataThread

    def terminate(self):
        self.running = False

    def run(self):
        self.running = True
        while (self.running):
            try:
                dataReceived = str(self.clientSocket.recv(1024), "utf-8")
                if dataReceived != "":
                    print("\n客户端来信:%s" % str(dataReceived))
                    if dataReceived == "over":
                        self.running = False
                        self.clientSocket.close()
                        self.senDataThread.terminate()
                        print("通讯结束,摁任意键关闭")
            except:
                pass

def main():
    tcpServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcpServerSocket.bind(("", 12345))
    tcpServerSocket.listen(128)

    tcpClientSocket = None
    serverOnUse = False

    print("等待客户端链接")
    tcpClientSocket, clientIp = tcpServerSocket.accept()
    print("新的客户端已链接:%s" % str(clientIp))

    sendDataThread = SendDataThread()
    recvDataThread = RecvDataThread()
    sendDataThread.setDestination(tcpClientSocket, recvDataThread)
    recvDataThread.setSource(tcpClientSocket, sendDataThread)

    sendDataThread.start()
    recvDataThread.start()

    sendDataThread.join()
    recvDataThread.join()

    tcpServerSocket.close()


main()

客户端

客户端用java实现,除了一开始是直接链接服务端套接字外,其他和服务端相似,主要也是读写线程负责接收和发送数据,而且两个线程能够相互控制对方的结束。所以直接贴代码便可,不作过多解释

package practice;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.*;

public class dm10 {

    public static void main(String[] args) {
		final Scanner input = new Scanner(System.in);
        try {
            Socket server = new Socket("localhost", 12345);
            System.out.println("已链接服务端:" + server.getInetAddress().getHostAddress());

            SendThread sendThread = new SendThread(server);
            RecvThread recvThread = new RecvThread(server);
            sendThread.setRecvThread(recvThread);
            recvThread.setSendThread(sendThread);
            final Thread tSendData = new Thread(sendThread);
            final Thread tRecvData = new Thread(recvThread);
            
            tSendData.start();            
            tRecvData.start();
                        
            tRecvData.join();
            tSendData.join();

            System.out.println("over..");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

class SendThread implements Runnable{
	private final Scanner input = new Scanner(System.in);
	private Socket serverSocket;
	private BufferedOutputStream outputStream;
	private RecvThread recvThread;
	private boolean running = false;
	
	public SendThread(Socket serverSocket) throws IOException {
		this.serverSocket = serverSocket;
		outputStream = new BufferedOutputStream(serverSocket.getOutputStream());
	}
	
	public void setRecvThread(RecvThread recvThread) {
		this.recvThread = recvThread;
	}
	
	public void terminate() {
		running = false;
	}
	
	@Override
	public void run() {
		try {
			running = true;
			while(running) {
				String info = input.nextLine();
				if (running) {
					outputStream.write(info.getBytes("utf-8"));
					outputStream.flush();
				}				
				running = !info.contains("over") && !serverSocket.isClosed();
			}
			recvThread.terminate();
			outputStream.close();
		} catch (IOException e) {
			try {
				running = false;
				recvThread.terminate();
				outputStream.close();
			} catch (IOException e1) {				
				
			}
		}
	}
}

class RecvThread implements Runnable{
	private Socket serverSocket;
	private BufferedInputStream inputStream;
	private SendThread sendThread;
	private boolean running = false;
	
	public RecvThread(Socket serverSocket) throws IOException {
		this.serverSocket = serverSocket;
		inputStream = new BufferedInputStream(serverSocket.getInputStream());
	}
	
	public void setSendThread(SendThread sendThread) {
		this.sendThread = sendThread;
	}
	
	public void terminate() {
		running = false;
	}
	
	@Override
	public void run() {
		try {
			running = true;
			while(running) {
				byte[] bytes = new byte[1024];
				int len;
				StringBuffer stringBuffer = new StringBuffer();
				while (inputStream.available() > 0 && (len = inputStream.read(bytes)) != -1) {
					stringBuffer.append(new String(bytes, 0, len, "utf-8"));
				}
				String fromServer = stringBuffer.toString();
				if (!fromServer.isEmpty()) {
					System.out.println("服务端来信:" + fromServer);
				}
				running = !fromServer.contains("over") && !serverSocket.isClosed();
				if (fromServer.contains("over")) {
					System.out.println("通讯结束,按任意键关闭");
				}
			}
			inputStream.close();
			sendThread.terminate();
		} catch (Exception e) {
			try {
				running = false;
				inputStream.close();
				sendThread.terminate();
			} catch (IOException e1) {				
				e1.printStackTrace();
			}
			e.printStackTrace();
		}
		
	}
}

测试

主要是测试可否安全退出。

先测试服务端通知客户端结束

再测试客户端通知服务端结束

双方都既能够连续接收消息,也能连续发送消息,还能随时关闭,功能实现

 

结语

tcp或udp通讯,实际流程是固定的,所以即使不一样语言语法有差别,依旧能够照猫画虎地写出来。