以简单的echo server举例说明三种不同方式IO的原理和影响,示例代码仅做演示说明之用,有些异常处理和elegant quit都未实现,错误之处请批评指正

几种IO方式

简单的循环等待

#!/usr/bin/env python3
# coding=utf-8

import socket

SOCKET_FAMILY = socket.AF_INET
SOCKET_TYPE = socket.SOCK_STREAM

sockServer = socket.socket(SOCKET_FAMILY, SOCKET_TYPE)
sockServer.bind(('0.0.0.0', 8888))
sockServer.listen()

while True:
    conn, address = sockServer.accept()
    print('connected from', conn.getpeername())
    while True:
        data = conn.recv(1024)
        if data:
            print('data recv:', data.decode())
            conn.sendall("echo: ".encode() + data)
        else:
            conn.close()
            print('disconnected from', conn.getpeername())
            break

整体逻辑在一个死循环中等待客户端链接,accept到连接之后,直接进入数据接收/处理循环处理数据,完成之后关闭连接,进入下一个循环的accept。

这种写法逻辑简明,但是问题是,在进入数据处理循环之后,服务端不再处理新的连接,客户端的新连接被挂起,必须等到前一个数据处理循环出来之后才能继续

select

#!/usr/bin/env python3
# coding=utf-8

import socket
import select
import queue

server = socket.socket()
server.setblocking(False)
server.bind(('0.0.0.0', 8888))
server.listen()

message_queue = {}
rlist = {server}
wlist = set()

while True:
    readable, writable, exceptional = select.select(rlist, wlist, rlist)
    for rsocket in readable:
        if rsocket is server:
            conn, addr = rsocket.accept()
            print('connected from', conn.getpeername())
            rlist.add(conn)
            message_queue[conn] = queue.Queue()
        else:
            data = rsocket.recv(1024)
            if data:
                data_str = data.decode()
                print('data recv from %s: %s' % (rsocket.getpeername(), data_str))
                message_queue[rsocket].put(data_str)
                if rsocket not in wlist:
                    wlist.add(rsocket)
            else:
                print('disconnected from', rsocket.getpeername())
                rlist.remove(rsocket)
                rsocket.close()
                del message_queue[rsocket]

    for wsocket in writable:
        if not message_queue[wsocket].empty():
            data = message_queue[wsocket].get()
            print('send %s to %s' % (data, wsocket.getpeername()))
            wsocket.sendall(('echo: ' + data).encode())
        else:
            print('empty message queue:', wsocket.getpeername())
            wlist.remove(wsocket)

select接受三个文件描述符列表(读+写+异常),select会等待这三个列表中的至少一个描述符就绪(或者等待超时),然后返回三个对应的已就绪描述符列表。底层的c实现只接受描述符,python的wrapper中支持描述符或者实现了返回描述符的fileno()方法的对象。

select解决了面对大量连接时,空耗CPU去轮询连接是否就绪的时间,kernel会告诉你哪些连接已经就绪,可以读写数据了。

select的劣势是,每次调用select等待描述符就绪时,kernel都要遍历传进去的所有描述符以检查他们是否就绪,在描述符数量很多的时候,遍历就会非常耗时。并且描述符列表需要从user space复制到kernel space,返回的时候需要从kernel space再次复制到user space。

跟select类似的还有个poll调用,poll本质上和select一样,只是参数封装形式不同。python的poll wrapper已经封装得跟原始调用不一样了,这里不再举例

epoll

#!/usr/bin/env python3
# coding=utf-8

import socket
import select
import queue

server = socket.socket()
server.setblocking(False)
server.bind(('0.0.0.0', 8888))
server.listen()

epoll = select.epoll()
epoll.register(server.fileno(), select.EPOLLIN)

message_queue = {}
connections = {}
while True:
    events = epoll.poll()
    for fileno, event in events:
        if fileno == server.fileno():
            conn, address = server.accept()
            print('connected from', conn.getpeername())
            conn.setblocking(False)
            epoll.register(conn.fileno(), select.EPOLLIN)
            connections[conn.fileno()] = conn
            message_queue[conn.fileno()] = queue.Queue()
        elif event & select.EPOLLIN:
            conn = connections[fileno]
            data = conn.recv(1024)
            if data:
                data_str = data.decode()
                print('data recv from %s: %s' % (conn.getpeername(), data_str))
                message_queue[fileno].put(data_str)
                epoll.modify(fileno, select.EPOLLIN | select.EPOLLOUT)
            else:
                print('disconnected from', conn.getpeername())
                conn.close()
                epoll.modify(fileno, 0)
                del message_queue[fileno]
                del connections[fileno]
        elif event & select.EPOLLOUT:
            if not message_queue[fileno].empty():
                data = message_queue[fileno].get()
                conn = connections[fileno]
                print('send %s to %s' % (data, conn.getpeername()))
                conn.sendall(data.encode())
            else:
                epoll.modify(fileno, select.EPOLLIN)
        elif event & select.EPOLLHUP:
            conn = connections[fileno]
            print('disconnected from', conn.getpeername())
            epoll.unregister(fileno)
            del message_queue[fileno]
            del connections[fileno]

造成select劣势的原因是

  1. kernel不去记哪些描述符是进程感兴趣的,导致每次select等待时都得把那一坨描述符复制进kernel,返回时再复制出来
  2. kernel不主动通知就绪事件,而是需要select去遍历

epoll就解决了上面那的问题。epoll_create创建的时候会初始化一颗红黑树用于记录描述符以及他们感兴趣的事件,同时提供了epoll_ctl对这颗红黑树的增删改查(即改变描述符,或者他们感兴趣的事件),以及epoll_wait等待事件发生。

epoll_create不仅初始化了红黑树,还初始化了一个就绪列表,并向kernel(网卡驱动?)注册了事件触发callback,在callback中做插入就绪列表的逻辑,这样在epoll_wait返回的时候就不需要遍历整个红黑树,而是去取这个就绪列表就能知道哪些描述符已经就绪,提高了效率