技术支持

了解最新技术动态及行业资讯

当前位置:首页>技术支持
全部 135

Python中的网络编程与并发处理

时间:2025-06-03   访问量:1029


Python中的网络编程与并发处理

在处理网络高并发时, 提供了多种方法和工具,可以帮助你有效地管理大量并发连接和请求。

相关的示例代码如下:

1,多线程模式


# 多线程示例(ThreadPoolExecutor)

# 从concurrent.futures模块中导入ThreadPoolExecutor类。

# ThreadPoolExecutor是  标准库中用于管理线程池的工具,它可以帮助我们方便地实现多线程并发执行任务。

from concurrent.futures import ThreadPoolExecutor

# 导入requests库,这是一个常用的  HTTP 库,用于发送各种 HTTP 请求,如 GET、POST 等。

import requests



# 定义了一个名为fetch的函数,该函数接受一个 URL 作为参数。

def fetch(url):

    # 使用requests库发送一个 HTTP GET 请求到指定的 URL,获取 HTTP 响应的状态码,将状态码作为函数的返回值。

    return requests.get(url).status_code



# 使用列表乘法的方式来快速生成多个相同的元素。

urls = ["https://example.com"] * 10


# ThreadPoolExecutor(max_workers=5):创建一个线程池,max_workers=5表示线程池最多同时运行 5 个线程。

# 使用with语句可以确保在线程池使用完毕后自动关闭和清理资源。

with ThreadPoolExecutor(max_workers=5) as executor:

    # executor.map(fetch, urls):调用线程池的map方法,

    # map方法会将urls列表中的每个 URL 依次传递给fetch函数,并并发地执行这些任务。

    # map方法会返回一个迭代器results,迭代器中的元素是每个任务的返回值(即 HTTP 响应的状态码)。

    results = executor.map(fetch, urls)

# 将迭代器results转换为列表,这样可以一次性获取所有任务的返回值。

print(list(results))

上面的代码输出10个返回值:[200, 200, 200, 200, 200, 200, 200, 200, 200, 200]



import socket

import threading



def handle_client(client_socket):

    try:

        while True:

            data = client_socket.recv(1024)

            if not data:

                break

            client_socket.sendall(data)

    except Exception as e:

        print(f"Error handling client: {e}")  

        # 如果客户端关闭,上面的异常处理输出则有如下输出:

        #Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。

    finally:

        client_socket.close()



def main():

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    server_socket.bind(('localhost', 8888))

    server_socket.listen(5)

    print("Server is listening on port 8888...")


    while True:

        client_socket, client_address = server_socket.accept()

        print(f"Accepted connection from {client_address}")

        client_thread = threading.Thread(target=handle_client, args=(client_socket,))

        print(f"thread {client_thread} start...")

        client_thread.start()

        client_socket.send(b"Hello from server!")



if __name__ == "__main__":

    main()

上面的代码作为服务端监听连接,下面的代码作为客户端执行连接。


import socket


# 创建客户端套接字

# socket.AF_INET:指定地址族为IPv4(Internet地址)。这是最常用的地址族,用于IPv4网络通信

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接到服务器

# 127.0.0.1:这是一个特殊的IP地址,称为回环地址(loopback address),用于指代本机。

# 当客户端和服务器运行在同一台机器上时,可以使用这个地址进行连接。

# 8888:这是服务器监听的端口号。客户端需要连接到这个端口才能与服务器进行通信。

try:

    client_socket.connect(('127.0.0.1', 8888))  # 连接服务器

    print("Connected to the server.")


    try:

        data = client_socket.recv(1024)  # 接收数据

        if data:

            print(f"Server says: {data.decode()}")

        else:

            print("No data received from the server.")

        client_socket.send(b"Hello from client!")  # 发送数据

    except OSError as e:

        print(f"OSError:{e}")


except ConnectionRefusedError as e:

    print(f"can not connect:{e}")


finally:

    client_socket.close()

上面的代码运行服务端和客户端后会有如下输出:


vbnet



服务端:

Server is listening on port 8888...

Accepted connection from ('127.0.0.1', 57775)

thread <Thread(Thread-1 (handle_client), initial)> start...

Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。

Accepted connection from ('127.0.0.1', 57788)

thread <Thread(Thread-2 (handle_client), initial)> start...

Error handling client: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。

Accepted connection from ('127.0.0.1', 57791)

thread <Thread(Thread-3 (handle_client), initial)> start...

Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。

Accepted connection from ('127.0.0.1', 57799)

thread <Thread(Thread-4 (handle_client), initial)> start...

Error handling client: [WinError 10054] 远程主机强迫关闭了一个现有的连接。

客户端:

Connected to the server.

Server says: Hello from server!

如果服务端未打开,则执行客户端会有如下输出:


arduino


can not connect:[WinError 10061] 由于目标计算机积极拒绝,无法连接。

2,多进程模式



# 使用 multiprocessing 模块

# multiprocessing 模块允许你创建多个进程来并行处理任务。

# 与线程相比,进程在中通常是独立的内存空间,因此不受GIL(全局解释器锁)的限制,但在进程间通信(IPC)上开销较大。

import socket

# multiprocessing 模块:用于实现多进程编程,允许服务器同时处理多个客户端连接。

import multiprocessing



def handle_client(client_socket):

    try:

        request = client_socket.recv(1024).decode('utf-8')

        print(f"Received: {request}")

        response = "Hello from server!"

        client_socket.send(response.encode('utf-8'))

    except ConnectionResetError as e:

        print(f"ConnectionResetError...{e}")

        # 如果客户端关闭,则上面的代码输出:ConnectionResetError...[WinError 10054] 远程主机强迫关闭了一个现有的连接。

    finally:

        client_socket.close()



def start_server(host='127.0.0.1', port=8888):

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    server.bind((host, port))

    server.listen(5)

    print(f"Server started at {host}:{port}")


    while True:

        # 接受客户端连接,返回一个新的客户端套接字对象和客户端的地址。

        client_socket, addr = server.accept()

        print(f"Accepted connection from {addr}")

        # 创建一个新的进程,将 handle_client 函数作为目标函数,并将客户端套接字作为参数传递给该函数。

        p = multiprocessing.Process(target=handle_client, args=(client_socket,))

        print(f"process {p} start...")

        p.start()



# 确保代码作为脚本直接运行时才会启动服务器,避免在作为模块导入时意外启动服务器。

if __name__ == "__main__":

    start_server()

3,异步I/O模式


# 异步 I/O 模式(asyncio)

# 适用场景:超高并发 I/O 任务,单线程事件循环。

# 异步 HTTP 客户端(aiohttp)

# aiohttp:是一个基于 asyncio 的异步 HTTP 客户端 / 服务器库,用于实现异步的 HTTP 请求和响应处理。

import aiohttp

# asyncio:是  标准库中用于编写异步代码的库,提供了事件循环、协程等异步编程的基础工具。

import asyncio



# async def:定义了一个异步函数(协程)fetch1.

# 它接受两个参数:session 是 aiohttp.ClientSession 对象,用于管理 HTTP 会话;url 是要请求的目标 URL。

async def fetch1(session, url):

    # 使用 session.get(url) 发起一个异步的 HTTP GET 请求,并使用 async with 语句来管理响应对象的生命周期。

    # session.get(url) 会返回一个 aiohttp.ClientResponse 对象,表示 HTTP 响应。

    async with session.get(url) as response:

        return response.status



# 对上面的fetch1代码进行优化如下:

async def fetch(session, url):

    try:

        # 设置超时时间为 10 秒

        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:

            # 检查响应状态码,如果不是 200 - 299 范围,抛出异常

            response.raise_for_status()

            return response.status

    except aiohttp.ClientError as e:

        # 处理 aiohttp 客户端相关错误,如网络错误、连接失败等

        print(f"Client error occurred while fetching {url}: {e}")

        return None

    except asyncio.TimeoutError:

        # 处理请求超时错误

        print(f"Request to {url} timed out.")

        return None

    except Exception as e:

        # 处理其他未知异常

        print(f"An unexpected error occurred while fetching {url}: {e}")

        return None



async def main():

    # async with aiohttp.ClientSession() as session:创建一个 aiohttp.ClientSession 对象,用于管理 HTTP 会话。

    # 使用 async with 语句可以确保会话在使用完毕后自动关闭。

    async with aiohttp.ClientSession() as session:

        # 使用列表推导式创建一个包含 10 个 fetch 协程对象的列表 tasks,每个协程都会发起一个对 https://example.com 的请求。

        tasks = [fetch(session, "https://example.com") for _ in range(10)]

        # 使用 asyncio.gather 函数并发地运行所有的协程,并等待它们全部完成。

        # *tasks 是将 tasks 列表解包为多个独立的参数传递给 asyncio.gather 函数。

        # asyncio.gather 会返回一个包含所有协程返回值的列表,存储在 results 变量中。

        results = await asyncio.gather(*tasks)

        print(results)



# 运行主异步函数

# asyncio.run 是  3.7 及以上版本引入的函数,用于运行顶级的异步函数。

# 它会创建一个事件循环,运行 main 协程,并在协程完成后关闭事件循环。

asyncio.run(main())

"""

上面的代码使用  的 aiohttp 库和 asyncio 模块实现了异步的 HTTP 请求。

它会同时发起 10 个对 https://example.com 的 HTTP GET 请求,并获取每个请求的响应状态码,最后将这些状态码打印输出

其优点是:

异步性能:使用异步编程可以在等待 HTTP 请求响应的过程中处理其他任务,避免了线程或进程的创建和切换开销,提高了程序的并发性能和资源利用率。

代码简洁:aiohttp 和 asyncio 提供了简洁的 API,使得异步 HTTP 请求的代码编写变得简单易懂。

"""

上面代码的输出是:[200, 200, 200, 200, 200, 200, 200, 200, 200, 200]


使用 asyncio 模块


# 服务端代码示例

# 使用 asyncio 模块

# asyncio 是 3.4引入的一个用于编写单线程并发代码的库。它使用协程来避免线程切换的开销,并且非常适合I/O密集型任务。

# asyncio 是  标准库中用于编写异步代码的库,提供了事件循环、协程等异步编程的基础工具,用于实现异步的网络通信。

import asyncio



# async def:定义了一个异步函数(协程)handle_client,它接受两个参数:reader 和 writer。

# reader 是 asyncio.StreamReader 对象,用于从客户端读取数据;writer 是 asyncio.StreamWriter 对象,用于向客户端发送数据。

async def handle_client(reader, writer):

    print("call handle_client...")

    # 使用 reader.read(100) 异步地从客户端读取最多 100 字节的数据。await 关键字用于暂停协程的执行,直到读取操作完成。

    data = await reader.read(100)

    # 将接收到的字节数据解码为字符串。

    message = data.decode()

    # 打印接收到的客户端消息。

    print(f"Received: {message}")

    # 使用 writer.write 方法将响应消息 "Hello from server!" 编码为字节流并发送给客户端。

    writer.write(b"Hello from server!")

    # await writer.drain():await 关键字用于等待所有数据都被实际发送到客户端。

    # writer.drain() 会暂停协程的执行,直到所有数据都被发送出去。

    await writer.drain()

    # 关闭与客户端的连接

    writer.close()



async def start_server():

    # 使用 asyncio.start_server 函数创建一个异步 TCP 服务器。handle_client 是处理客户端连接的协程函数.

    # await 关键字用于等待服务器启动完成。

    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)

    print(f"Server started at 127.0.0.1:8888")

    # 使用 async with 语句管理服务器对象的生命周期,确保在服务器不再使用时正确关闭。

    async with server:

        # 调用 server.serve_forever() 方法,使服务器开始永久监听客户端连接。await 关键字用于暂停协程的执行,直到服务器停止。

        await server.serve_forever()



if __name__ == "__main__":

    asyncio.run(start_server())

上面是服务端代码,下面看客户端代码:


import socket


# 创建客户端套接字

# socket.AF_INET:指定地址族为IPv4(Internet地址)。这是最常用的地址族,用于IPv4网络通信

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接到服务器

# 127.0.0.1:这是一个特殊的IP地址,称为回环地址(loopback address),用于指代本机。

# 当客户端和服务器运行在同一台机器上时,可以使用这个地址进行连接。

# 8888:这是服务器监听的端口号。客户端需要连接到这个端口才能与服务器进行通信。

try:

    client_socket.connect(('127.0.0.1', 8888))  # 连接服务器

    print("Connected to the server.")

    client_socket.send(b"Hello !!!")  # 发送数据


    try:

        data = client_socket.recv(1024)  # 接收数据

        if data:

            print(f"Server says: {data.decode()}")

        else:

            print("No data received from the server.")

        client_socket.send(b"Hello from client!")  # 发送数据

    except OSError as e:

        print(f"OSError:{e}")


except ConnectionRefusedError as e:

    print(f"can not connect:{e}")


finally:

    client_socket.close()


运行服务端和客户端,会有如下输出:


vbscript




Server started at 127.0.0.1:8888

call handle_client...

Received: Hello !!!


Connected to the server.

Server says: Hello from server!

使用aiohttp库


"""

下面的代码使用 aiohttp 库构建了一个简单的异步 Web 服务器。

当客户端向服务器的根路径(/)发送 HTTP GET 请求时,服务器会返回一个包含 "Hello from server!" 文本的响应。

"""

# 使用 aiohttp 库

# aiohttp 是一个基于 asyncio 的HTTP客户端/服务器库,非常适合处理高并发的HTTP请求。

# aiohttp 是一个基于 asyncio 的异步 HTTP 客户端 / 服务器库,web 模块提供了构建 Web 服务器所需的功能,如处理请求、路由管理等。

from aiohttp import web



# async def:定义了一个异步函数(协程)handle,用于处理客户端的 HTTP 请求。

# 它接受一个 request 参数,该参数是 aiohttp.web.Request 对象,包含了客户端请求的相关信息,如请求方法、请求头、请求参数等。

async def handle(request):

    text = "Hello from server!"

    # 创建一个 aiohttp.web.Response 对象,并将响应文本作为参数传入,最后返回该响应对象。

    return web.Response(text=text)



async def init_app():

    # app = web.Application():创建一个 aiohttp.web.Application 对象,

    # 它是 aiohttp 中 Web 应用的核心,用于管理路由、中间件等。

    app = web.Application()

    # 使用 app.router.add_get 方法将 handle 函数注册为根路径(/)的 HTTP GET 请求处理函数。

    # 也就是说,当客户端向服务器的根路径发送 GET 请求时,会调用 handle 函数来处理该请求。

    app.router.add_get('/', handle)

    # 返回初始化好的 web.Application 对象。

    return app


# web.run_app():aiohttp 提供的用于启动 Web 服务器的函数,它会自动创建事件循环并运行 Web 应用。

# 将 init_app() 返回的 web.Application 对象作为参数传入,即可启动服务器。

# 使用 web.run_app(init_app()) 启动服务器,没有为 web.run_app 函数指定 host 和 port 参数,

# 所以服务器会使用默认的主机地址 0.0.0.0 和默认端口 8080 进行监听,最终输出 http://0.0.0.0:8080 。

# web.run_app(init_app())

# 指定监听地址和端口

web.run_app(init_app(), host='127.0.0.1', port=8080)


# 运行上面的代码,输出如下内容:

# ======== Running on http://0.0.0.0:8080 ========

"""

在网络编程里,0.0.0.0 是一个特殊的 IP 地址,它代表 “所有可用的网络接口”。

当服务器监听 0.0.0.0 时,意味着它会接受来自服务器所在主机的任何网络接口的连接请求,

无论这些请求是通过本地回环接口(如 127.0.0.1)、局域网 IP 地址,还是公网 IP 地址发起的。

8080 是 aiohttp 服务器默认使用的监听端口。在 web.run_app 函数中,如果没有显式指定端口号,它就会使用 8080 作为默认端口。


web.run_app 函数用于启动 aiohttp 服务器,其定义如下

web.run_app(app, host=None, port=None, **kwargs)

app:表示要运行的 aiohttp.web.Application 实例。

host:指定服务器监听的主机地址,默认值为 None,此时服务器会监听 0.0.0.0。

port:指定服务器监听的端口号,默认值为 None,此时服务器会使用 8080 作为端口。


对于搭建的服务,在服务器上测试上面使用 aiohttp 构建的 Web 应用。

打开浏览器,在本地服务器测试,也可以使用 http://127.0.0.1:8080 或 http://localhost:8080。

查看响应结果:如果一切正常,浏览器会显示服务器返回的响应内容,在你的代码中应该是 "Hello from server!"。

"""

上面的代码是web服务端,下面看web客户端代码示例:


"""

代码使用 aiohttp 库和 asyncio 模块实现了一个异步的 HTTP 请求客户端。

其主要功能是向指定的 URL(http://127.0.0.1:8080/)发送 HTTP GET 请求,并打印出响应的文本内容。

"""


import aiohttp

import asyncio



async def fetch(session, url):

    async with session.get(url) as response:

        return await response.text()



async def main():

    url = 'http://127.0.0.1:8080/'

    async with aiohttp.ClientSession() as session:

        html = await fetch(session, url)

        print(html)



# loop = asyncio.get_event_loop():获取当前线程的事件循环对象。事件循环是 asyncio 中用于调度和执行异步任务的核心组件。

# loop = asyncio.get_event_loop()

# # loop.run_until_complete(main()):将 main 协程提交给事件循环,并运行事件循环,直到 main 协程完成。

# loop.run_until_complete(main())


"""

运行上面的代码,出现如下输出:

DeprecationWarning: There is no current event loop

  loop = asyncio.get_event_loop()

Hello from server!


警告产生的原因

在  3.10 及之后的版本中,asyncio.get_event_loop() 函数的行为发生了改变。在之前的版本里,如果当前线程没有事件循环,asyncio.get_event_loop() 会创建一个新的事件循环;但从  3.10 开始,该函数不再自动创建事件循环,若当前线程没有事件循环,就会抛出警告。

在你的代码里,使用 asyncio.get_event_loop() 时,由于没有显式创建事件循环,所以触发了这个警告。

解决办法是:使用 asyncio.run()

asyncio.run() 是  3.7 引入的一个高级函数,它会自动创建、管理和关闭事件循环,能避免手动操作事件循环带来的问题。

"""

# 使用 asyncio.run() 来运行异步程序

asyncio.run(main())


上一篇:FreeSwitch服务器配置搭建详解

下一篇:Pi-hole,多平台广告拦截的利器

  • 友情链接:
  • 百度 DeepSeek 百度中英在线翻译 阿里云 腾讯云 西部数码 贝锐花生壳 IP查询 360在线翻译 360搜索 搜狗搜索 宽带测速 MAC厂商查询 mikrotik 容器镜像 Windows/office/2下载 PbootCMS Windows/office/开发工具下载 Windows/office下载 中川网维 bilibili