Python中的网络编程与并发处理
在处理网络高并发时, 提供了多种方法和工具,可以帮助你有效地管理大量并发连接和请求。
相关的示例代码如下:
1,多线程模式
# 多线程示例(ThreadPoolExecutor)
# 从concurrent.futures模块中导入ThreadPoolExecutor类。
# ThreadPoolExecutor是 标准库中用于管理线程池的工具,它可以帮助我们方便地实现多线程并发执行任务。
from concurrent.futures import ThreadPoolExecutor
# 导入requests库,这是一个常用的 HTTP 库,用于发送各种 HTTP 请求,如 GET、POST 等。
import requests
# 定义了一个名为fetch的函数,该函数接受一个 URL 作为参数。
def fetch(url):
# 使用requests库发送一个 HTTP GET 请求到指定的 URL,获取 HTTP 响应的状态码,将状态码作为函数的返回值。
return requests.get(url).status_code
# 使用列表乘法的方式来快速生成多个相同的元素。
urls = ["https://example.com"] * 10
# ThreadPoolExecutor(max_workers=5):创建一个线程池,max_workers=5表示线程池最多同时运行 5 个线程。
# 使用with语句可以确保在线程池使用完毕后自动关闭和清理资源。
with ThreadPoolExecutor(max_workers=5) as executor:
# executor.map(fetch, urls):调用线程池的map方法,
# map方法会将urls列表中的每个 URL 依次传递给fetch函数,并并发地执行这些任务。
# map方法会返回一个迭代器results,迭代器中的元素是每个任务的返回值(即 HTTP 响应的状态码)。
results = executor.map(fetch, urls)
# 将迭代器results转换为列表,这样可以一次性获取所有任务的返回值。
print(list(results))
上面的代码输出10个返回值:[200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
import socket
import threading
def handle_client(client_socket):
try:
while True:
data = client_socket.recv(1024)
if not data:
break
client_socket.sendall(data)
except Exception as e:
print(f"Error handling client: {e}")
# 如果客户端关闭,上面的异常处理输出则有如下输出:
#Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
finally:
client_socket.close()
def main():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 8888))
server_socket.listen(5)
print("Server is listening on port 8888...")
while True:
client_socket, client_address = server_socket.accept()
print(f"Accepted connection from {client_address}")
client_thread = threading.Thread(target=handle_client, args=(client_socket,))
print(f"thread {client_thread} start...")
client_thread.start()
client_socket.send(b"Hello from server!")
if __name__ == "__main__":
main()
上面的代码作为服务端监听连接,下面的代码作为客户端执行连接。
import socket
# 创建客户端套接字
# socket.AF_INET:指定地址族为IPv4(Internet地址)。这是最常用的地址族,用于IPv4网络通信
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接到服务器
# 127.0.0.1:这是一个特殊的IP地址,称为回环地址(loopback address),用于指代本机。
# 当客户端和服务器运行在同一台机器上时,可以使用这个地址进行连接。
# 8888:这是服务器监听的端口号。客户端需要连接到这个端口才能与服务器进行通信。
try:
client_socket.connect(('127.0.0.1', 8888)) # 连接服务器
print("Connected to the server.")
try:
data = client_socket.recv(1024) # 接收数据
if data:
print(f"Server says: {data.decode()}")
else:
print("No data received from the server.")
client_socket.send(b"Hello from client!") # 发送数据
except OSError as e:
print(f"OSError:{e}")
except ConnectionRefusedError as e:
print(f"can not connect:{e}")
finally:
client_socket.close()
上面的代码运行服务端和客户端后会有如下输出:
vbnet
服务端:
Server is listening on port 8888...
Accepted connection from ('127.0.0.1', 57775)
thread <Thread(Thread-1 (handle_client), initial)> start...
Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
Accepted connection from ('127.0.0.1', 57788)
thread <Thread(Thread-2 (handle_client), initial)> start...
Error handling client: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
Accepted connection from ('127.0.0.1', 57791)
thread <Thread(Thread-3 (handle_client), initial)> start...
Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
Accepted connection from ('127.0.0.1', 57799)
thread <Thread(Thread-4 (handle_client), initial)> start...
Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
客户端:
Connected to the server.
Server says: Hello from server!
如果服务端未打开,则执行客户端会有如下输出:
arduino
can not connect:[WinError 10061] 由于目标计算机积极拒绝,无法连接。
2,多进程模式
# 使用 multiprocessing 模块
# multiprocessing 模块允许你创建多个进程来并行处理任务。
# 与线程相比,进程在中通常是独立的内存空间,因此不受GIL(全局解释器锁)的限制,但在进程间通信(IPC)上开销较大。
import socket
# multiprocessing 模块:用于实现多进程编程,允许服务器同时处理多个客户端连接。
import multiprocessing
def handle_client(client_socket):
try:
request = client_socket.recv(1024).decode('utf-8')
print(f"Received: {request}")
response = "Hello from server!"
client_socket.send(response.encode('utf-8'))
except ConnectionResetError as e:
print(f"ConnectionResetError...{e}")
# 如果客户端关闭,则上面的代码输出:ConnectionResetError...[WinError 10054] 远程主机强迫关闭了一个现有的连接。
finally:
client_socket.close()
def start_server(host='127.0.0.1', port=8888):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host, port))
server.listen(5)
print(f"Server started at {host}:{port}")
while True:
# 接受客户端连接,返回一个新的客户端套接字对象和客户端的地址。
client_socket, addr = server.accept()
print(f"Accepted connection from {addr}")
# 创建一个新的进程,将 handle_client 函数作为目标函数,并将客户端套接字作为参数传递给该函数。
p = multiprocessing.Process(target=handle_client, args=(client_socket,))
print(f"process {p} start...")
p.start()
# 确保代码作为脚本直接运行时才会启动服务器,避免在作为模块导入时意外启动服务器。
if __name__ == "__main__":
start_server()
3,异步I/O模式
# 异步 I/O 模式(asyncio)
# 适用场景:超高并发 I/O 任务,单线程事件循环。
# 异步 HTTP 客户端(aiohttp)
# aiohttp:是一个基于 asyncio 的异步 HTTP 客户端 / 服务器库,用于实现异步的 HTTP 请求和响应处理。
import aiohttp
# asyncio:是 标准库中用于编写异步代码的库,提供了事件循环、协程等异步编程的基础工具。
import asyncio
# async def:定义了一个异步函数(协程)fetch1.
# 它接受两个参数:session 是 aiohttp.ClientSession 对象,用于管理 HTTP 会话;url 是要请求的目标 URL。
async def fetch1(session, url):
# 使用 session.get(url) 发起一个异步的 HTTP GET 请求,并使用 async with 语句来管理响应对象的生命周期。
# session.get(url) 会返回一个 aiohttp.ClientResponse 对象,表示 HTTP 响应。
async with session.get(url) as response:
return response.status
# 对上面的fetch1代码进行优化如下:
async def fetch(session, url):
try:
# 设置超时时间为 10 秒
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
# 检查响应状态码,如果不是 200 - 299 范围,抛出异常
response.raise_for_status()
return response.status
except aiohttp.ClientError as e:
# 处理 aiohttp 客户端相关错误,如网络错误、连接失败等
print(f"Client error occurred while fetching {url}: {e}")
return None
except asyncio.TimeoutError:
# 处理请求超时错误
print(f"Request to {url} timed out.")
return None
except Exception as e:
# 处理其他未知异常
print(f"An unexpected error occurred while fetching {url}: {e}")
return None
async def main():
# async with aiohttp.ClientSession() as session:创建一个 aiohttp.ClientSession 对象,用于管理 HTTP 会话。
# 使用 async with 语句可以确保会话在使用完毕后自动关闭。
async with aiohttp.ClientSession() as session:
# 使用列表推导式创建一个包含 10 个 fetch 协程对象的列表 tasks,每个协程都会发起一个对 https://example.com 的请求。
tasks = [fetch(session, "https://example.com") for _ in range(10)]
# 使用 asyncio.gather 函数并发地运行所有的协程,并等待它们全部完成。
# *tasks 是将 tasks 列表解包为多个独立的参数传递给 asyncio.gather 函数。
# asyncio.gather 会返回一个包含所有协程返回值的列表,存储在 results 变量中。
results = await asyncio.gather(*tasks)
print(results)
# 运行主异步函数
# asyncio.run 是 3.7 及以上版本引入的函数,用于运行顶级的异步函数。
# 它会创建一个事件循环,运行 main 协程,并在协程完成后关闭事件循环。
asyncio.run(main())
"""
上面的代码使用 的 aiohttp 库和 asyncio 模块实现了异步的 HTTP 请求。
它会同时发起 10 个对 https://example.com 的 HTTP GET 请求,并获取每个请求的响应状态码,最后将这些状态码打印输出
其优点是:
异步性能:使用异步编程可以在等待 HTTP 请求响应的过程中处理其他任务,避免了线程或进程的创建和切换开销,提高了程序的并发性能和资源利用率。
代码简洁:aiohttp 和 asyncio 提供了简洁的 API,使得异步 HTTP 请求的代码编写变得简单易懂。
"""
上面代码的输出是:[200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
使用 asyncio 模块
# 服务端代码示例
# 使用 asyncio 模块
# asyncio 是 3.4引入的一个用于编写单线程并发代码的库。它使用协程来避免线程切换的开销,并且非常适合I/O密集型任务。
# asyncio 是 标准库中用于编写异步代码的库,提供了事件循环、协程等异步编程的基础工具,用于实现异步的网络通信。
import asyncio
# async def:定义了一个异步函数(协程)handle_client,它接受两个参数:reader 和 writer。
# reader 是 asyncio.StreamReader 对象,用于从客户端读取数据;writer 是 asyncio.StreamWriter 对象,用于向客户端发送数据。
async def handle_client(reader, writer):
print("call handle_client...")
# 使用 reader.read(100) 异步地从客户端读取最多 100 字节的数据。await 关键字用于暂停协程的执行,直到读取操作完成。
data = await reader.read(100)
# 将接收到的字节数据解码为字符串。
message = data.decode()
# 打印接收到的客户端消息。
print(f"Received: {message}")
# 使用 writer.write 方法将响应消息 "Hello from server!" 编码为字节流并发送给客户端。
writer.write(b"Hello from server!")
# await writer.drain():await 关键字用于等待所有数据都被实际发送到客户端。
# writer.drain() 会暂停协程的执行,直到所有数据都被发送出去。
await writer.drain()
# 关闭与客户端的连接
writer.close()
async def start_server():
# 使用 asyncio.start_server 函数创建一个异步 TCP 服务器。handle_client 是处理客户端连接的协程函数.
# await 关键字用于等待服务器启动完成。
server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
print(f"Server started at 127.0.0.1:8888")
# 使用 async with 语句管理服务器对象的生命周期,确保在服务器不再使用时正确关闭。
async with server:
# 调用 server.serve_forever() 方法,使服务器开始永久监听客户端连接。await 关键字用于暂停协程的执行,直到服务器停止。
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(start_server())
上面是服务端代码,下面看客户端代码:
import socket
# 创建客户端套接字
# socket.AF_INET:指定地址族为IPv4(Internet地址)。这是最常用的地址族,用于IPv4网络通信
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接到服务器
# 127.0.0.1:这是一个特殊的IP地址,称为回环地址(loopback address),用于指代本机。
# 当客户端和服务器运行在同一台机器上时,可以使用这个地址进行连接。
# 8888:这是服务器监听的端口号。客户端需要连接到这个端口才能与服务器进行通信。
try:
client_socket.connect(('127.0.0.1', 8888)) # 连接服务器
print("Connected to the server.")
client_socket.send(b"Hello !!!") # 发送数据
try:
data = client_socket.recv(1024) # 接收数据
if data:
print(f"Server says: {data.decode()}")
else:
print("No data received from the server.")
client_socket.send(b"Hello from client!") # 发送数据
except OSError as e:
print(f"OSError:{e}")
except ConnectionRefusedError as e:
print(f"can not connect:{e}")
finally:
client_socket.close()
运行服务端和客户端,会有如下输出:
vbscript
Server started at 127.0.0.1:8888
call handle_client...
Received: Hello !!!
Connected to the server.
Server says: Hello from server!
使用aiohttp库
"""
下面的代码使用 aiohttp 库构建了一个简单的异步 Web 服务器。
当客户端向服务器的根路径(/)发送 HTTP GET 请求时,服务器会返回一个包含 "Hello from server!" 文本的响应。
"""
# 使用 aiohttp 库
# aiohttp 是一个基于 asyncio 的HTTP客户端/服务器库,非常适合处理高并发的HTTP请求。
# aiohttp 是一个基于 asyncio 的异步 HTTP 客户端 / 服务器库,web 模块提供了构建 Web 服务器所需的功能,如处理请求、路由管理等。
from aiohttp import web
# async def:定义了一个异步函数(协程)handle,用于处理客户端的 HTTP 请求。
# 它接受一个 request 参数,该参数是 aiohttp.web.Request 对象,包含了客户端请求的相关信息,如请求方法、请求头、请求参数等。
async def handle(request):
text = "Hello from server!"
# 创建一个 aiohttp.web.Response 对象,并将响应文本作为参数传入,最后返回该响应对象。
return web.Response(text=text)
async def init_app():
# app = web.Application():创建一个 aiohttp.web.Application 对象,
# 它是 aiohttp 中 Web 应用的核心,用于管理路由、中间件等。
app = web.Application()
# 使用 app.router.add_get 方法将 handle 函数注册为根路径(/)的 HTTP GET 请求处理函数。
# 也就是说,当客户端向服务器的根路径发送 GET 请求时,会调用 handle 函数来处理该请求。
app.router.add_get('/', handle)
# 返回初始化好的 web.Application 对象。
return app
# web.run_app():aiohttp 提供的用于启动 Web 服务器的函数,它会自动创建事件循环并运行 Web 应用。
# 将 init_app() 返回的 web.Application 对象作为参数传入,即可启动服务器。
# 使用 web.run_app(init_app()) 启动服务器,没有为 web.run_app 函数指定 host 和 port 参数,
# 所以服务器会使用默认的主机地址 0.0.0.0 和默认端口 8080 进行监听,最终输出 http://0.0.0.0:8080 。
# web.run_app(init_app())
# 指定监听地址和端口
web.run_app(init_app(), host='127.0.0.1', port=8080)
# 运行上面的代码,输出如下内容:
# ======== Running on http://0.0.0.0:8080 ========
"""
在网络编程里,0.0.0.0 是一个特殊的 IP 地址,它代表 “所有可用的网络接口”。
当服务器监听 0.0.0.0 时,意味着它会接受来自服务器所在主机的任何网络接口的连接请求,
无论这些请求是通过本地回环接口(如 127.0.0.1)、局域网 IP 地址,还是公网 IP 地址发起的。
8080 是 aiohttp 服务器默认使用的监听端口。在 web.run_app 函数中,如果没有显式指定端口号,它就会使用 8080 作为默认端口。
web.run_app 函数用于启动 aiohttp 服务器,其定义如下
web.run_app(app, host=None, port=None, **kwargs)
app:表示要运行的 aiohttp.web.Application 实例。
host:指定服务器监听的主机地址,默认值为 None,此时服务器会监听 0.0.0.0。
port:指定服务器监听的端口号,默认值为 None,此时服务器会使用 8080 作为端口。
对于搭建的服务,在服务器上测试上面使用 aiohttp 构建的 Web 应用。
打开浏览器,在本地服务器测试,也可以使用 http://127.0.0.1:8080 或 http://localhost:8080。
查看响应结果:如果一切正常,浏览器会显示服务器返回的响应内容,在你的代码中应该是 "Hello from server!"。
"""
上面的代码是web服务端,下面看web客户端代码示例:
"""
代码使用 aiohttp 库和 asyncio 模块实现了一个异步的 HTTP 请求客户端。
其主要功能是向指定的 URL(http://127.0.0.1:8080/)发送 HTTP GET 请求,并打印出响应的文本内容。
"""
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
url = 'http://127.0.0.1:8080/'
async with aiohttp.ClientSession() as session:
html = await fetch(session, url)
print(html)
# loop = asyncio.get_event_loop():获取当前线程的事件循环对象。事件循环是 asyncio 中用于调度和执行异步任务的核心组件。
# loop = asyncio.get_event_loop()
# # loop.run_until_complete(main()):将 main 协程提交给事件循环,并运行事件循环,直到 main 协程完成。
# loop.run_until_complete(main())
"""
运行上面的代码,出现如下输出:
DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
Hello from server!
警告产生的原因
在 3.10 及之后的版本中,asyncio.get_event_loop() 函数的行为发生了改变。在之前的版本里,如果当前线程没有事件循环,asyncio.get_event_loop() 会创建一个新的事件循环;但从 3.10 开始,该函数不再自动创建事件循环,若当前线程没有事件循环,就会抛出警告。
在你的代码里,使用 asyncio.get_event_loop() 时,由于没有显式创建事件循环,所以触发了这个警告。
解决办法是:使用 asyncio.run()
asyncio.run() 是 3.7 引入的一个高级函数,它会自动创建、管理和关闭事件循环,能避免手动操作事件循环带来的问题。
"""
# 使用 asyncio.run() 来运行异步程序
asyncio.run(main())