此前开发网络数据发送和接收功能,一直使用C语言和标准的socket系统库代码,或是在高吞吐量的情况下使用Boost的ASIO库,最近需要测试一些简单的功能,就用Python写了一些基本的测试代码,但是在使用socket的过程中遇到了一些奇怪的问题。在解决问题的过程中,发现现在的搜索引擎的结果已经完全垃圾化了,可能前面10页提供的信息都是烂大街的网页(大家都忙着SEO,根本没有人关心质量,相信以后AI也会跟着傻瓜化)。最后还是通过自己写代码测试和翻阅官方文档解决了这个问题,遂决定写一篇博客来分享心得。

先说一下背景和问题:本地有一对进程A和B,通过Unix Domain Socket进行数据通信,A只负责发送信息,B只负责接收。由于不想要维护状态,使用的是UDP的通信方式,在Python代码里面就只需要简简单单写如下代码就可以发送数据了:

import socket

sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
addr = "/root/domainSocketTest"
data = "domain socket test data"
sock.sendto(data, addr) 

接收侧的代码完全照抄官方,不过如果是Python 3.9以上的版本,官方代码用了with来启动socketserver,而下面的用法可以在3.9以下的Python环境运行。另外注意到接收侧作为服务器,要先手动删除一下指定的Unix Domain Socket文件(在这个例子里面是/root/domainSocketTest)。

import socketserver
import os

class SendNoticeHandler(socketserver.BaseRequestHandler):
	def handle(self):
		data = self.request[0].strip()
		data = data.decode("ascii")
		print(data)
			
if __name__ == "__main__":
	sock = socketserver.UnixDatagramServer(addr, SendNoticeHandler)
    addr = "/root/domainSocketTest"
	sock.serve_forever()

看起来一点问题都没有,但是在实际生产环境里面给它点压力,就出现了奇怪的问题——如果服务器端这边在收到数据后进行一些计算量比较大的运算,似乎发送端那边的性能也会受到影响?可是这完全不应该啊,明明用的UDP的sendto发送方式,随便在网上搜索文档,都会告诉你这个函数是非阻塞的,而且用到了Unix Domain Socket,也不存在什么网络延迟,怎么会出现接收端和发送端“联动”的问题?

对于2024年的技术开发人员,遇到疑难杂症问AI和问搜索引擎,可能花(浪)费的时间比自己去想办法调试一下还要多得多。实际上,对于数据发送端,我们注意到两种特殊情况——服务端代码还未启动Unix Domain Socket文件被占用——会导致代码运行出现异常,分别对应FileNotFoundErrorConnectionRefusedError,这个让人丈二和尚摸不着头脑,怎么你一个UDP数据传输还能感知到服务器端的状态了?

为了验证,我们不妨改写一下服务器端的代码,让它在收到数据后就sleep一秒钟

import socketserver
import os

class SendNoticeHandler(socketserver.BaseRequestHandler):
	def handle(self):
		data = self.request[0].strip()
		data = data.decode("ascii")
		print(data)
		time.sleep(1) # take a rest

if __name__ == "__main__":
    addr = "/root/domainSocketTest"
    os.system("rm " + addr)

    sock = socketserver.UnixDatagramServer(addr, SendNoticeHandler)
	sock.serve_forever()

然后在发送端写一个for循环去发送10000条信息看看?果然被阻塞住了!!!这简直是不科学!!!

翻阅官方文档(参见 https://docs.python.org/3/library/socket.html 可选择不同的Python版本,但是大同小异)会发现,在Python中socket是可以设置Non-blocking mode的,但是文档并没有说清楚socket在UDP而不是TCP的情况下会怎么样。因为我们在初始化的时候已经显式指定了SOCK_DGRAM,应该假设它会自动设置Non-blocking mode,难道代码没有这样处理?

为了深入验证,我们把发送端的代码改写一下,用class的形式来包装一下,并且在初始化的时候设置一下非阻塞模式:

class RegSocket:
	def __init__(self, addr):
		self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
		self.sock.setblocking(False) # set as Non-blocking mode
		self.addr = addr

	def msg(self, data):
		data = data.encode("ASCII")
		try:
			self.sock.sendto(data, self.addr) 
		except (FileNotFoundError):
			print("unix domain addr %s does not exist"%self.addr)
		except (ConnectionRefusedError):
			print("cannot connect to unix domain addr %s"%self.addr)

然后写个测试代码

s = RegSocket("/root/domainSocketTest")
for i in range(10000):
    m = "test for %d\n"%i
    print(m)
	s.msg(m)

结果呢?这个循环运行不到10000次就抛出了BlockingIOError异常,果然是阻塞模式的错。这里实际上应该是系统的Unix Domain Socket缓冲区用完了(因为发送速度远远大于接收速度),而本来我们的网络编程知识储备默认的UDP发送方是不关心这个情况的(发送方只管把数据发出去,至于是网络协议栈的缓冲区满了还是服务器端收不到数据,根本不需要管),而在Python网络编程这里风格有点变化,因此需要程序员手工去处理下。如果需要处理,我们可以把上面那个class定义改一下,增加一个BlockingIOError异常处理,如果遇到阻塞就直接不管了(就是丢包嘛,没什么大不了的):

class RegSocket:
	def __init__(self, addr):
		self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
		self.sock.setblocking(False) # set as Non-blocking mode
		self.addr = addr

	def msg(self, data):
		data = data.encode("ASCII")
		try:
			self.sock.sendto(data, self.addr) 
		except (FileNotFoundError):
			print("unix domain addr %s does not exist"%self.addr)
		except (ConnectionRefusedError):
			print("cannot connect to unix domain addr %s"%self.addr)
		except (BlockingIOError):
			print("send buf is full, throw: %s"%data)

当然如果你还是不希望发送方丢包(那就得等待),那就注释掉init函数里面设置阻塞模式的代码就好了。