万物互联之~网络编程深刻篇

 

深刻篇

上节回顾:5种IO模型 | IO多路复用 and 万物互联之~网络编程增强篇php

官方文档:https://docs.python.org/3/library/internet.htmlhtml

总结篇:【总结篇】一文搞定网络编程python

1.概念回顾

1.1.TCP三次握手

画一张图来通俗化讲讲TCP三次握手: 1.通俗化讲git

用代码来讲,大概过程就是: 1.tcp3次握手.pnggithub

1.2.TCP四次挥手

画图通俗讲下TCP四次挥手: 2.tcp4次挥手.pngweb

用代码来讲,大概过程就是: 2.tcp4次挥手2.png数据库

其实这个也很好的解释了以前的端口占用问题,若是是服务端先断开链接,那么服务器就是四次挥手的发送方,最后一次消息是得不到回复的,端口就会保留一段时间(服务端的端口固定)也就会出现端口占用的状况。若是是客户端先断开,那下次链接会自动换个端口,不影响(客户端的端口是随机分配的)编程

PS:以前咱们讲端口就是send一个空消息,不少人不是很清楚,这边简单验证下就懂了: 2.测试.gifjson

1.3.HTTP

以前其实已经写了个简版的Web服务器了,简单回顾下流程:flask

  1. 输入要访问的网址,在回车的那一瞬间浏览器和服务器创建了TCP三次握手
  2. 而后浏览器send一个http的请求报文,服务器接recv以后进行相应的处理并返回对应的页面
  3. 浏览器关闭页面时(client close),进行了TCP四次挥手

而后简单说下HTTP状态码

  1. 20x系列:服务器正常响应
  2. 30x系列:重定向
    • 301:表明永久重定向,浏览器下次访问这个页面就直接去目的url了(不推荐)
    • 302:临时重定向,项目升级以后Url常常变,这个302常常用
      • eg:访问baidu.com =302=> www.baidu.com
    • 304:这个是重定向到本地缓存(以前NodeJS说过就不详细说了)
      • 服务器文件没更新,浏览器就直接访问本地缓存了
  3. 40x系列:通常都是客户端请求有问题
    • eg: 404 not found
  4. 50x系列:通常都是服务端出问题了
    • eg:500 Server Error

2.动态服务器(WSGI

2.1.简化版动态服务器

咱们先本身定义一个动态服务器:

import re
import socket

class HTTPServer(object):
    def __init__(self):
        with socket.socket() as tcp_server:
            tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            tcp_server.bind(('', 8080))
            tcp_server.listen()
            while True:
                self.client_socket, self.client_address = tcp_server.accept()
                self.handle()

    def response(self, status, body=None):
        print(status)
        header = f"HTTP/1.1 {status}\r\n\r\n"
        with self.client_socket:
            self.client_socket.send(header.encode("utf-8"))
            if body:
                self.client_socket.send(body)

    def __static_handler(self, name):
        try:
            with open(f"./www{name}", "rb") as fs:
                return fs.read()
        except Exception as ex:
            print(ex)
            return None

    def __dynamic_handler(self, name):
        try:
            m = __import__(name)
            return m.application().encode("utf-8")
        except Exception as ex:
            print(ex)
            return None

    def handle(self):
        with self.client_socket:
            print(f"[来自{self.client_address}的消息:]\n")
            data = self.client_socket.recv(2048)
            if data:
                header, _ = data.decode("utf-8").split("\r\n", 1)
                print(header)
                # GET /xxx HTTP/1.1
                ret = re.match("^\w+? (/[^ ]*) .+$", header)
                if ret:
                    url = ret.groups(1)[0]
                    # Python三元表达式(以前好像忘说了)
                    url = "/index.html" if url == "/" else url
                    print("请求url:", url)
                    body = str()
                    # 动态页面
                    if ".py" in url:
                        # 提取模块名(把开头的/和.py排除)
                        body = self.__dynamic_handler(url[1:-3])
                    else:  # 静态服务器
                        body = self.__static_handler(url)
                    # 根据返回的body内容,返回对应的响应码
                    if body:
                        self.response("200 ok", body)
                    else:
                        self.response("404 Not Found")
                else:  # 匹配不到url(基本上不会发生,不排除恶意修改)
                    self.response((404, "404 Not Found"))

if __name__ == "__main__":
    import sys
    # 防止 __import__ 导入模块的时候找不到,忘了能够查看:
    # https://www.cnblogs.com/dotnetcrazy/p/9253087.html#5.本身添加模块路径
    sys.path.insert(1, "./www/bin")
    HTTPServer()

效果: 3.动态服务器.gif

代码不难其中有个技术点说下:模块名为字符串怎么导入

# test.py
# 若是模块名是字符串,须要使用__import__
s = "time"
time = __import__(s)

def application():
    return time.ctime() # 返回字符串

if __name__ == "__main__":
    time_str = application()
    print(type(time_str))
    print(time_str)

输出:

<class 'str'>
Thu Dec 20 22:48:07 2018

2.2.路由版动态服务器

和上面基本同样,多了个路由表(self.router_urls)而已

import re
import socket

class HttpServer(object):
    def __init__(self):
        # 路由表
        self.router_urls = {"/test": "/test.py", "/user": "/test2.py"}

    def run(self):
        with socket.socket() as server:
            # 端口复用
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("", 8080))
            server.listen()
            while True:
                self.client_socket, self.client_address = server.accept()
                print(f"[{self.client_address}已上线]")
                self.handler()

    def response(self, status, body=None):
        with self.client_socket as socket:
            header = f"HTTP/1.1 {status}\r\n\r\n"
            socket.send(header.encode("utf-8"))
            if body:
                socket.send(body)

    def __static_handler(self, name):
        try:
            with open(f"./www{name}", "rb") as fs:
                return fs.read()
        except Exception as ex:
            print(ex)
            return None

    def __dynamic_handler(self, name):
        try:
            m = __import__(name)
            return m.application().encode("utf-8")
        except Exception as ex:
            print(ex)
            return None

    def handler(self):
        data = self.client_socket.recv(2048)
        if data:
            header, _ = data.decode("utf-8").split("\r\n", 1)
            # GET /xxx HTTP/1.1
            ret = re.match("^\w+? (/[^ ]*) .+$", header)
            if ret:
                url = ret.group(1)
                print(url)  # print url log
                body = None
                # 路由有记录:动态页面
                if url in self.router_urls.keys():
                    url = self.router_urls[url]
                    # 切片提取模块名
                    body = self.__dynamic_handler(url[1:-3])
                else:  # 静态服务器
                    if url == "/":
                        url = "/index.html"
                    body = self.__static_handler(url)
                # 没有这个页面或者出错
                if body:
                    self.response("200 ok", body)
                else:
                    self.response("404 Not Found")
            else:
                # 404
                self.response("404 Not Found")
        else:
            print(f"{self.client_address}已下线")
            self.client_socket.close()

if __name__ == "__main__":
    import sys
    # 临时添加模块所在路径
    sys.path.insert(1, "./www/bin")
    HttpServer().run()

输出: 4.含路由的动态服务器.png

看一眼test2.py

# test2.py
def application():
    return "My Name Is XiaoMing"

if __name__ == "__main__":
    print(application())

2.3.官方接口版

官方文档:https://docs.python.org/3/library/wsgiref.html

其实Python官方提供了一个WSGI:Web Server Gateway Interface的约定:

它只要求Web开发者实现一个application函数,就能够响应HTTP请求

2.3.1演示

eg:(只要对应的python文件提供了 application(env,start_response) 方法就好了)

# hello.py
# env 是一个字典类型
def application(env, start_response):
    # 设置动态页面的响应头(回头服务器会再加上本身的响应头)
    # 列表里面的 item 是 tuple
    start_response("200 OK", [("Content-Type", "text/html")])
    # 返回一个列表
    return ["<h1>This is Test!</h1>".encode("utf-8")]

先使用官方的简单服务器看看:

from wsgiref.simple_server import make_server
# 导入咱们本身编写的application函数:
from hello import application

# 建立一个服务器,端口是8080,处理函数是application:
httpd = make_server('', 8080, application)
print('Serving HTTP on port 8080...')
# 开始监听HTTP请求:
httpd.serve_forever()

运行后效果:127.0.0.1:8080

This is Test!

若是把hello.py改为下面代码(服务端不变),那么就能够获取一些请求信息了:

def application(env, start_response):
    print(env["PATH_INFO"])
    start_response("200 OK", [("Content-Type", "text/html")])
    return [f'<h1>Hello, {env["PATH_INFO"][1:] or "web"}!</h1>'.encode("utf-8")]

输出: 5.env.png

2.3.2说明

上面的application()函数就是符合WSGI标准的一个HTTP处理函数,它接收两个参数:

  1. environ:一个包含全部HTTP请求信息的dict对象;
  2. start_response:一个发送HTTP响应的函数(调用服务器定义的方法)
    • Header只能发送一次 ==> 只能调用一次start_response()函数

有了WSGI,咱们关心的就是如何从env这个dict对象拿到HTTP请求信息,而后构造HTML,经过start_response()发送Header,最后返回Body内容

Python内置了一个WSGI服务器,这个模块叫wsgiref,它是用纯Python编写的WSGI服务器的参考实现(彻底符合WSGI标准,可是不考虑任何运行效率,仅供开发和测试使用)

PS:这样的好处就是,只要符合WSGI规范的服务器,咱们均可以直接使用了

其实经过源码就能够知道这个WSGIServer究竟是何方神圣了:

class WSGIServer(HTTPServer):
    pass

# HTTPServer其实就是基于TCPServer
class HTTPServer(socketserver.TCPServer):
    pass

# 这个就是咱们开头说的Python封装的简单WebServer了
class TCPServer(BaseServer):
    pass

若是仍是记不得能够回顾下上次说的内容,提示:

__all__ = ["BaseServer", "TCPServer", "UDPServer",
           "ThreadingUDPServer", "ThreadingTCPServer",
           "BaseRequestHandler", "StreamRequestHandler",
           "DatagramRequestHandler", "ThreadingMixIn"]

若是你想要在这个基础上进行处理,能够和上面说的同样,定义一个继承class WSGIRequestHandler(BaseHTTPRequestHandler)的类,而后再处理

2.3.3.自定义

在本小节结束前咱们模仿一下示例,定义一个符合WSGI规范的简单服务器:

import re
import socket
from index import WebFrame

class WSGIServer(object):
    def __init__(self):
        # 请求头
        self.env = dict()
        # 存放处理后的响应头
        self.response_headers = str()

    def run(self):
        with socket.socket() as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("", 8080))
            server.listen()
            while True:
                self.client_socket, self.client_address = server.accept()
                self.handler()

    # 转换浏览器请求头格式
    def request_headers_handler(self, headers):
        # 过滤一下空字符串(不能过滤空列表)
        headers = list(filter(None, headers.split("\r\n")))
        # 提取 Method 和 Url
        ret = re.match("^([\w]+?) (/[^ ]*?) .+$", headers[0])
        if ret:
            self.env["method"] = ret.group(1)
            url = ret.group(2)
            print(url)
            self.env["path"] = "/index.html" if url == "/" else url
        else:
            return None
        # [['Host', ' localhost:8080'], ['Connection', ' keep-alive']...]
        array = map(lambda item: item.split(":", 1), headers[1:])
        for item in array:
            self.env[item[0].lower()] = item[1]
        # print(self.env)
        return "ok"

    # 响应客户端(吐槽一下,request和response的headers为毛格式不同,这设计真不合理!)
    def start_response(self, status, header_list=[]):
        # 响应头
        self.response_headers = f"HTTP/1.1 {status}\r\n"
        for item in header_list:
            self.response_headers += f"{item[0]}:{item[1]}\r\n"
        # print(self.response_headers)

    # 响应浏览器
    def response(self, body):
        with self.client_socket as client:
            # 省略一系列服务器响应的headers
            self.response_headers += "server:WSGIServer\r\n\r\n"
            client.send(self.response_headers.encode("utf-8"))
            if body:
                client.send(body)

    def handler(self):
        with self.client_socket as client:
            data = client.recv(2048)
            if data:
                # 浏览器请求头
                headers = data.decode("utf-8")
                if self.request_headers_handler(headers):
                    # 模仿php全部请求都一个文件处理
                    body = WebFrame().application(self.env,
                                                  self.start_response)
                    # 响应浏览器
                    self.response(body)
                else:
                    self.start_response("404 Not Found")
            else:
                client.close()

if __name__ == "__main__":
    WSGIServer().run()

本身定义的框架:

class WebFrame(object):
    def __init__(self):
        # 路由表
        self.router_urls = {"/time": "get_time", "/user": "get_name"}

    def get_time(self):
        import time
        return time.ctime().encode("utf-8")

    def get_name(self):
        return "<h2>My Name Is XiaoMing</h2>".encode("utf-8")

    def application(self, env, start_response):
        body = b""
        url = env["path"]
        # 请求的页面都映射到路由对应的方法中
        if url in self.router_urls.keys():
            func = self.router_urls[url]
            body = getattr(self, func)()
        else:
            # 不然就请求对应的静态资源
            try:
                with open(f"./www{url}", "rb") as fs:
                    body = fs.read()
            except Exception as ex:
                start_response("404 Not Found")
                print(ex)
                return b"404 Not Found"  # 出错就直接返回了
        # 返回对应的页面响应头
        start_response("200 ok", [("Content-Type", "text/html"),
                                  ("Scripts", "Python")])
        return body

输出: 6.wsgi.png

知识扩展:

从wsgiref模块导入
https://docs.python.org/3/library/wsgiref.html

Python服务器网关接口
https://www.python.org/dev/peps/pep-3333/

Python原始套接字和流量嗅探
https://blog.csdn.net/cj1112/article/details/51303021
https://blog.csdn.net/peng314899581/article/details/78082244

【源码阅读】轻量级Web框架:bottle
https://github.com/bottlepy/bottle
 

3.RPC引入

上篇回顾:万物互联之~深刻篇

其余专栏最新篇:协程增强之~兼容答疑篇 | 聊聊数据库~SQL环境篇

Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/

3.1.概念

RPC(Remote Procedure Call):分布式系统常见的一种通讯方法(远程过程调用),通俗讲:能够一台计算机的程序调用另外一台计算机的子程序(能够把它当作以前咱们说的进程间通讯,只不过这一次的进程不在同一台PC上了)

PS:RPC的设计思想是力图使远程调用中的通信细节对于使用者透明,调用双方无需关心网络通信的具体实现

引用一张网上的图: 1.rpc.png

HTTP有点类似,你能够这样理解:

  1. 老版本的HTTP/1.0是短连接,而RPC是长链接进行通讯
    • HTTP协议(header、body),RPC能够采起HTTP协议,也能够自定义二进制格式
  2. 后来HTTP/1.1支持了长链接(Connection:keep-alive),基本上和RPC差很少了
    • keep-alive通常都限制有最长时间,或者最多处理的请求数,而RPC是基于长链接的,基本上没有这个限制
  3. 后来谷歌直接基于HTTP/2.0创建了gRPC,它们之间的基本上也就差很少了
    • 若是硬是要区分就是:HTTP-普通话RPC-方言的区别了
    • RPC高效而小众,HTTP效率没RPC高,但更通用
  4. PS:RPCHTTP调用不用通过中间件,而是端到端的直接数据交互
    • 网络交互能够理解为基于Socket实现的(RPCHTTP都是Socket的读写操做)

简单归纳一下RPC的优缺点就是:

  1. 优势:
    1. 效率更高(能够自定义二进制格式)
    2. 发起RPC调用的一方,在编写代码时可忽略RPC的具体实现(跟编写本地函数调用通常
  2. 缺点:
    • 通用性不如HTTP(方言普及程度确定不如普通话),若是传输协议不是HTTP协议格式,调用双方就须要专门实现通讯库

PS:HTTP更可能是ClientServer的通信;RPC更可能是内部服务器间的通信

3.2.引入

上面说这么多,可能尚未来个案例实在,咱们看个案例:

本地调用sum()

def sum(a, b):
    """return a+b"""
    return a + b

def main():
    result = sum(1, 2)
    print(f"1+2={result}")

if __name__ == "__main__":
    main()

输出:(这个你们都知道)

1+2=3

1.xmlrpc案例

官方文档:

https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html

都说RPC用起来就像本地调用同样,那么用起来啥样呢?看个案例:

服务端:(CentOS7:192.168.36.123:50051)

from xmlrpc.server import SimpleXMLRPCServer

def sum(a, b):
    """return a+b"""
    return a + b

# PS:50051是gRPC默认端口
server = SimpleXMLRPCServer(('', 50051))
# 把函数注册到RPC服务器中
server.register_function(sum)
print("Server启动ing,Port:50051")
server.serve_forever()

客户端:(Win10:192.168.36.144

from xmlrpc.client import ServerProxy

stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")

输出:(Client用起来是否是和本地差很少?就是经过代理访问了下RPCServer而已)

1+2=3

2.server.png

PS:CentOS服务器不是你绑定个端口就必定能访问的,若是不能记让防火墙开放对应的端口

这个以前在说MariaDB环境的时候有详细说:http://www.noobyard.com/article/p-ywtyxuzh-dw.html

# 添加 --permanent永久生效(没有此参数重启后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent

2.ZeroRPC案例:

zeroRPC用起来和这个差很少,也简单举个例子吧:

把服务的某个方法注册到RPCServer中,供外部服务调用

import zerorpc

class Test(object):
    def say_hi(self, name):
        return f"Hi,My Name is{name}"


# 注册一个Test的实例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()

调用服务端代码

import zerorpc

client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)

3.3.简单版自定义RPC

看了上面的引入案例,是否是感受RPC不过如此?NoNoNo,要是真这么简单也就谈不上RPC架构了,上面两个是最简单的RPC服务了,能够这么说:生产环境基本上用不到,只能当案例练习罢了,对Python来讲,最经常使用的RPC就两个gRPC and Thrift

PS:国产最出名的是Dubbo and Tars,Net最经常使用的是gRPCThriftSurging

1.RPC服务的流程

要本身实现一个RPC Server那么就得了解整个流程了:

  1. Client(调用者)以本地调用的方式发起调用
  2. 经过RPC服务进行远程过程调用(RPC的目标就是要把这些步骤都封装起来,让使用者感受不到这个过程)
    1. 客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议
    2. 客户端的RPC Proxy组件在打包完成后经过网络把数据包发送给RPC Server
    3. 服务端的RPC Proxy组件把经过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数
    4. 服务端的RPC Proxy组件根据方法名和参数进行本地调用
    5. RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
    6. 服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并经过网络发送给客户端的RPC Proxy组件
    7. 客户端的RPC Proxy组件收到数据包后,进行拆包解码,把数据返回给Client
  3. Client(调用者)获得本次RPC调用的返回结果

用一张时序图来描述下整个过程: 4.时序图.png

PS:RPC Proxy有时候也叫Stub(存根):(Client Stub,Server Stub)

为屏蔽客户调用远程主机上的对象,必须提供某种方式来模拟本地对象,这种本地对象称为存根(stub),存根负责接收本地方法调用,并将它们委派给各自的具体实现对象

PRC服务实现的过程当中其实就两核心点:

  1. 消息协议:客户端调用的参数和服务端的返回值这些在网络上传输的数据以何种方式打包编码和拆包解码
    • 经典表明:Protocol Buffers
  2. 传输控制:在网络中数据的收发传输控制具体如何实现(TCP/UDP/HTTP

2.手写RPC

下面咱们就根据上面的流程来手写一个简单的RPC:

1.Client调用:

# client.py
from client_stub import ClientStub

def main():
    stub = ClientStub(("192.168.36.144", 50051))

    result = stub.get("sum", (1, 2))
    print(f"1+2={result}")

    result = stub.get("sum", (1.1, 2))
    print(f"1.1+2={result}")

    time_str = stub.get("get_time")
    print(time_str)

if __name__ == "__main__":
    main()

输出:

1+2=3
1.1+2=3.1
Wed Jan 16 22

2.Client Stub,客户端存根:(主要有打包解包、和RPC服务器通讯的方法)

# client_stub.py
import socket

class ClientStub(object):
    def __init__(self, address):
        """address ==> (ip,port)"""
        self.socket = socket.socket()
        self.socket.connect(address)

    def convert(self, obj):
        """根据类型转换成对应的类型编号"""
        if isinstance(obj, int):
            return 1
        if isinstance(obj, float):
            return 2
        if isinstance(obj, str):
            return 3

    def pack(self, func, args):
        """打包:把方法和参数拼接成自定义的协议
        格式:func:函数名@params:类型-参数,类型2-参数2...
        """
        result = f"func:{func}"
        if args:
            params = ""
            # params:类型-参数,类型2-参数2...
            for item in args:
                params += f"{self.convert(item)}-{item},"
            # 去除最后一个,
            result += f"@params:{params[:-1]}"
        # print(result)  # log 输出
        return result.encode("utf-8")

    def unpack(self, data):
        """解包:获取返回结果"""
        msg = data.decode("utf-8")
        # 格式应该是"data:xxxx"
        params = msg.split(":")
        if len(params) > 1:
            return params[1]
        return None

    def get(self, func, args=None):
        """1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
        data = self.pack(func, args)
        # 2.客户端的RPC Proxy组件在打包完成后经过网络把数据包发送给RPC Server
        self.socket.send(data)
        # 等待服务端返回结果
        data = self.socket.recv(2048)
        if data:
            return self.unpack(data)
        return None

简要说明下:(我根据流程在Code里面标注了,看起来应该很轻松)

以前有说到核心其实就是消息协议and传输控制,我客户端存根的消息协议是自定义的格式(后面会说简化方案):func:函数名@params:类型-参数,类型2-参数2...,传输我是基于TCP进行了简单的封装


3.Server端:(实现很简单)

# server.py
import socket
from server_stub import ServerStub

class RPCServer(object):
    def __init__(self, address, mycode):
        self.mycode = mycode
        # 服务端存根(RPC Proxy)
        self.server_stub = ServerStub(mycode)
        # TCP Socket
        self.socket = socket.socket()
        # 端口复用
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 绑定端口
        self.socket.bind(address)

    def run(self):
        self.socket.listen()
        while True:
            # 等待客户端链接
            client_socket, client_addr = self.socket.accept()
            print(f"来自{client_addr}的请求:\n")
            try:
                # 交给服务端存根(Server Proxy)处理
                self.server_stub.handle(client_socket, client_addr)
            except Exception as ex:
                print(ex)

if __name__ == "__main__":
    from server_code import MyCode
    server = RPCServer(('', 50051), MyCode())
    print("Server启动ing,Port:50051")
    server.run()

为了简洁,服务端代码我单独放在了server_code.py中:

# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
    def sum(self, a, b):
        return a + b

    def get_time(self):
        import time
        return time.ctime()

4.而后再看看重头戏Server Stub:

# server_stub.py
import socket

class ServerStub(object):
    def __init__(self, mycode):
        self.mycode = mycode

    def convert(self, num, obj):
        """根据类型编号转换类型"""
        if num == "1":
            obj = int(obj)
        if num == "2":
            obj = float(obj)
        if num == "3":
            obj = str(obj)
        return obj

    def unpack(self, data):
        """3.服务端的RPC Proxy组件把经过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
        msg = data.decode("utf-8")
        # 格式应该是"格式:func:函数名@params:类型编号-参数,类型编号2-参数2..."
        array = msg.split("@")
        func = array[0].split(":")[1]
        if len(array) > 1:
            args = list()
            for item in array[1].split(":")[1].split(","):
                temps = item.split("-")
                # 类型转换
                args.append(self.convert(temps[0], temps[1]))
            return (func, tuple(args))  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和参数拼接成自定义的协议"""
        # 格式:"data:返回值"
        return f"data:{result}".encode("utf-8")

    def exec(self, func, args=None):
        """4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
        # 若是没有这个方法则返回None
        func = getattr(self.mycode, func, None)
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 无参函数

    def handle(self, client_socket, client_addr):
        while True:
            # 获取客户端发送的数据包
            data = client_socket.recv(2048)
            if data:
                try:
                    data = self.unpack(data)  # 解包
                    if len(data) == 1:
                        data = self.exec(data[0])  # 执行无参函数
                    elif len(data) > 1:
                        data = self.exec(data[0], data[1])  # 执行带参函数
                    else:
                        data = "RPC Server Error Code:500"
                except Exception as ex:
                    data = "RPC Server Function Error"
                    print(ex)
                # 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并经过网络发送给客户端的RPC Proxy组件
                data = self.pack(data)  # 把函数执行结果按指定协议打包
                # 把处理过的数据发送给客户端
                client_socket.send(data)
            else:
                print(f"客户端:{client_addr}已断开\n")
                break

再简要说明一下:里面方法其实主要就是解包执行函数返回值打包

输出图示: 3.div.png

再贴一下上面的时序图: 4.时序图.png

课外拓展:

HTTP1.0、HTTP1.1 和 HTTP2.0 的区别
https://www.cnblogs.com/heluan/p/8620312.html

简述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994

分布式基础—RPC
http://www.dataguru.cn/article-14244-1.html

下节预估:RPC服务进一步简化与演变手写一个简单的REST接口

 

4.RPC简化与提炼

上篇回顾:万物互联之~RPC专栏 http://www.noobyard.com/article/p-mzajpmez-c.html

上节课解答

以前有网友问,不少开源的RPC中都是使用路由表,这个怎么实现?

其实路由表实现起来也简单,代码基本上不变化,就修改一下server_stub.py__init__exe两个方法就能够了:

class ServerStub(object):
    def __init__(self, mycode):
        self.func_dict = dict()
        # 初始化一个方法名和方法的字典({func_name:func})
        for item in mycode.__dir__():
            if not item.startswith("_"):
                self.func_dict[item] = getattr(mycode, item)

    def exec(self, func, args=None):
        """4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
        # 若是没有这个方法则返回None
        # func = getattr(self.mycode, func, None)
        func = self.func_dict[func]
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 无参函数

4.1.Json序列化

Python比较6的同志对上节课的Code确定嗤之以鼻,上次自定义协议是同的通用方法,这节课咱们先来简化下代码:

再贴一下上节课的时序图: 4.时序图.png

1.Json知识点

官方文档:https://docs.python.org/3/library/json.html

# 把字典对象转换为Json字符串
json_str = json.dumps({"func": func, "args": args})

# 把Json字符串从新变成字典对象
data = json.loads(data)
func, args = data["func"], data["args"]

须要注意的就是类型转换了(eg:python tuple ==> json array

Python JSON
dict object
list, tuple array
str string
int, float number
True true
False false
None null

PS:序列化:json.dumps(obj),反序列化:json.loads(json_str)

2.消息协议采用Json格式

在原有基础上只须要修改下Stubpackunpack方法便可

Client_Stub(类型转换都省掉了)

import json
import socket

class ClientStub(object):
    def pack(self, func, args):
        """打包:把方法和参数拼接成自定义的协议
        格式:{"func": "sum", "args": [1, 2]}
        """
        json_str = json.dumps({"func": func, "args": args})
        # print(json_str)  # log 输出
        return json_str.encode("utf-8")

    def unpack(self, data):
        """解包:获取返回结果"""
        data = data.decode("utf-8")
        # 格式应该是"{data:xxxx}"
        data = json.loads(data)
        # 获取不到就返回None
        return data.get("data", None)

    # 其余Code我没有改变

Server Stub()

import json
import socket

class ServerStub(object):
    def unpack(self, data):
        """3.服务端的RPC Proxy组件把经过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
        data = data.decode("utf-8")
        # 格式应该是"格式:{"func": "sum", "args": [1, 2]}"
        data = json.loads(data)
        func, args = data["func"], data["args"]
        if args:
            return (func, tuple(args))  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和参数拼接成自定义的协议"""
        # 格式:"data:返回值"
        json_str = json.dumps({"data": result})
        return json_str.encode("utf-8")

    # 其余Code我没有改变

输出图示: 3.div.png

4.2.Buffer序列化

RPC其实更多的是二进制的序列化方式,这边简单介绍下

1.pickle知识点

官方文档:https://docs.python.org/3/library/pickle.html

用法和Json相似,PS:序列化:pickle.dumps(obj),反序列化:pickle.loads(buffer)

2.简单案例

和Json案例相似,也只是改了packunpack,我这边就贴一下完整代码(防止被吐槽)

1.Client

# 和上一节同样
from client_stub import ClientStub

def main():
    stub = ClientStub(("192.168.36.144", 50051))

    result = stub.get("sum", (1, 2))
    print(f"1+2={result}")

    result = stub.get("sum", (1.1, 2))
    print(f"1.1+2={result}")

    time_str = stub.get("get_time")
    print(time_str)

if __name__ == "__main__":
    main()

2.ClientStub

import socket
import pickle

class ClientStub(object):
    def __init__(self, address):
        """address ==> (ip,port)"""
        self.socket = socket.socket()
        self.socket.connect(address)

    def pack(self, func, args):
        """打包:把方法和参数拼接成自定义的协议"""
        return pickle.dumps((func, args))

    def unpack(self, data):
        """解包:获取返回结果"""
        return pickle.loads(data)

    def get(self, func, args=None):
        """1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
        data = self.pack(func, args)
        # 2.客户端的RPC Proxy组件在打包完成后经过网络把数据包发送给RPC Server
        self.socket.send(data)
        # 等待服务端返回结果
        data = self.socket.recv(2048)
        if data:
            return self.unpack(data)
        return None

3.Server

# 和上一节同样
import socket
from server_stub import ServerStub

class RPCServer(object):
    def __init__(self, address, mycode):
        self.mycode = mycode
        # 服务端存根(RPC Proxy)
        self.server_stub = ServerStub(mycode)
        # TCP Socket
        self.socket = socket.socket()
        # 端口复用
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 绑定端口
        self.socket.bind(address)

    def run(self):
        self.socket.listen()
        while True:
            # 等待客户端链接
            client_socket, client_addr = self.socket.accept()
            print(f"来自{client_addr}的请求:\n")
            try:
                # 交给服务端存根(Server Proxy)处理
                self.server_stub.handle(client_socket, client_addr)
            except Exception as ex:
                print(ex)

if __name__ == "__main__":
    from server_code import MyCode
    server = RPCServer(('', 50051), MyCode())
    print("Server启动ing,Port:50051")
    server.run()

4.ServerCode

# 和上一节同样
# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
    def sum(self, a, b):
        return a + b

    def get_time(self):
        import time
        return time.ctime()

5.ServerStub

import socket
import pickle

class ServerStub(object):
    def __init__(self, mycode):
        self.mycode = mycode

    def unpack(self, data):
        """3.服务端的RPC Proxy组件把经过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
        func, args = pickle.loads(data)
        if args:
            return (func, args)  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和参数拼接成自定义的协议"""
        return pickle.dumps(result)

    def exec(self, func, args=None):
        """4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
        # 若是没有这个方法则返回None
        func = getattr(self.mycode, func)
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 无参函数

    def handle(self, client_socket, client_addr):
        while True:
            # 获取客户端发送的数据包
            data = client_socket.recv(2048)
            if data:
                try:
                    data = self.unpack(data)  # 解包
                    if len(data) == 1:
                        data = self.exec(data[0])  # 执行无参函数
                    elif len(data) > 1:
                        data = self.exec(data[0], data[1])  # 执行带参函数
                    else:
                        data = "RPC Server Error Code:500"
                except Exception as ex:
                    data = "RPC Server Function Error"
                    print(ex)
                # 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并经过网络发送给客户端的RPC Proxy组件
                data = self.pack(data)  # 把函数执行结果按指定协议打包
                # 把处理过的数据发送给客户端
                client_socket.send(data)
            else:
                print(f"客户端:{client_addr}已断开\n")
                break

输出图示: 3.div.png

而后关于RPC高级的内容(会涉及到注册中心),我们后面说架构的时候继续,网络这边就说到这

5.Restful API

RESTful只是接口协议规范,它是创建在http基础上的,咱们在网络增强篇的末尾简单带一下,后面讲爬虫应该会再给你们说的

5.1.实现一个简单的REST接口

在编写REST接口时,通常都是为HTTP服务的。为了实现一个简单的REST接口,你只需让代码知足Python的WSGI标准便可

1.Restful引入

这边我就不本身实现了(上面手写服务器的时候其实已经展现了Restful接口是啥样),用Flask快速过一遍:

看个引入案例:

import flask

app = flask.Flask(__name__)

@app.route("/")
def index():
    return "This is Restful API Test"

if __name__ == "__main__":
    app.run()

图示输出: 5.api.png

Server Log:

* Serving Flask app "1.test" (lazy loading)
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:8080/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Jan/2019 17:24:02] "GET / HTTP/1.1" 200 -

2.简单版RESTful Services

举个查询服务器节点信息的例子:/api/servers/

import flask
from infos import info_list

app = flask.Flask(__name__)

# Json的404自定义处理(不加自定义处理会返回默认404页面)
@app.errorhandler(404)
def not_found(error):
    return flask.make_response(
        flask.jsonify({
            "data": "Not Found",
            "status": 404
        }), 404)

# 运行Get和Post请求
@app.route("/api/v1.0/servers/<name>", methods=["GET", "POST"])
def get_info(name):
    infos = list(filter(lambda item: item["name"] == name, info_list))
    if len(infos) == 0:
        flask.abort(404) # 404
    # 基于json.dumps的封装版
    return flask.jsonify({"infos": infos})  # 返回Json字符串

if __name__ == "__main__":
    app.run(port=8080)

图示输出:(不深刻说,后面爬虫会再提的) 6.test.gif

课后拓展:

RESTful API 设计指南
http://www.ruanyifeng.com/blog/2014/05/restful_api.html

RESTful API 最佳实践
http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html

异步 API 的设计
http://www.ruanyifeng.com/blog/2018/12/async-api-design.html

使用python的Flask实现一个RESTful API服务器端[翻译]
https://www.cnblogs.com/vovlie/p/4178077.html