个人总结
command.py
file.py
time.py
excel.py
with 的原理和用法
subprocess 模块
cline 提示词
PathLike
pathlib
peewee
生成随机ID
FastApi 使用 peewee
http connect
Dataframe Protocol
pyarrow
overload 函数重载
1111
peewee 线程安全
Python 日志模块
rabbitmq 问答
多进程 daemon 参数
记录异常
dict.py
多进程
忽略异常
timer.py
Ctrl+C 退出while
发布pip包
Bolo组件
迭代器与生成器
dataclass
单例模式(二)
单例模式(一)
errors.py
enum.py
每日学习
本文档使用 MrDoc 发布
-
+
首页
rabbitmq 问答
### 1. rabbitmq 的connection和channel 是什么关系? 让我为您解释RabbitMQ中Connection和Channel的关系: - Connection(连接) - 是客户端和RabbitMQ服务器之间的物理TCP连接; - 建立和维护TCP连接需要较大的开销; - 一个应用程序通常只需要一个Connection; - 代表了应用程序与RabbitMQ服务器的网络连接; - Channel(信道) - 是在Connection上创建的虚拟连接; - 一个Connection可以包含多个Channel; - Channel是真正执行消息发送和接收的通道; - 使用Channel的好处 - 避免频繁创建TCP连接的开销; - 实现多线程/进程下的并发操作; - 每个Channel都是独立的,互不影响; - channel的特点: - **[轻量级]:** - Channel是Connection内的虚拟连接 - 创建和销毁的开销很小 - 不会创建新的进程或线程 - **[多路复用]:** - 多个Channel共享同一个TCP连接 - 通过Channel ID来区分不同的通道 - 类似于在一个TCP连接上建立多个逻辑通道 > Channel 不是一个独立的进程,而是在同一个Connection内的逻辑通道(logical channel); > Channel 不是线程安全的,也不是协程安全的,每个线程/协程应该有独立的Channel; 关系如图所示: ``` Application │ │ Connection (TCP连接) │ ├── Channel 1 ├── Channel 2 ├── Channel 3 └── Channel N ``` ### 2. channel 和 queue 是什么关系? 让我解释Queue和Channel的关系: Queue(队列): - Queue是消息的实际存储位置; - 位于RabbitMQ服务器端; - 可以被多个Channel访问; - 相当于消息的"邮箱"; channel 和 queue的关系: - Queue是独立于Channel存在的; - Channel是操作Queue的工具; - 同一个Queue可以被多个Channel访问; 简单来说: - Queue就像是邮箱; - Channel就像是邮递员; - 一个邮递员(Channel)可以投递到多个邮箱(Queue); - 一个邮箱(Queue)也可以被多个邮递员(Channel)访问; ### 3. 从queue读取数据有哪几种方式? 从RabbitMQ的Queue读取数据主要有以下几种方式: 1. Basic Get (Pull模式) ``` async def pull_message(self): """单次拉取消息""" async with self.channel_pool as channel: # no_ack=False 表示需要手动确认 message = await self.queue.get(no_ack=False) if message: async with message.process(): # 自动确认 return message.body.decode() return None ``` 特点: - 主动拉取消息 - 适合低频率消费 - 可能造成空轮询 - 不推荐用于高并发场景 2. Basic Consume (Push模式) ``` async def consume_with_callback(self): """使用回调函数消费消息""" async with self.channel_pool as channel: queue = await channel.declare_queue(self.queue_name) async def process_message(message): async with message.process(): data = message.body.decode() print(f"Received: {data}") # 注册消费者回调 await queue.consume(process_message) # 保持运行 try: await asyncio.Future() # 永久等待 except asyncio.CancelledError: pass ``` 特点: - 服务器主动推送消息 - 高效率,低延迟 - 适合高并发场景 - 自动处理负载均衡 3. Iterator方式 (异步迭代器) ``` async def consume_with_iterator(self): """使用异步迭代器消费消息""" async with self.channel_pool as channel: queue = await channel.declare_queue(self.queue_name) async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): data = message.body.decode() yield data ``` 特点: - 结合了Push模式的效率 - 代码更清晰易读 - 可以方便地进行流程控制 - 适合异步处理场景 async with message.process() 是一个非常重要的上下文管理器,它主要有以下作用: - 自动消息确认(Auto Acknowledgment) ``` # 自动确认示例 async def consume_message(self): async with self.channel_pool as channel: message = await self.queue.get(no_ack=False) # 需要手动确认 if message: async with message.process(): # 代码块正常执行完成后自动确认 return message.body.decode() ``` - 异常处理和自动拒绝(Auto Rejection) ``` async def consume_with_error_handling(self): message = await self.queue.get(no_ack=False) if message: async with message.process(): # 如果这里抛出异常 # message.process() 会自动拒绝这条消息 # 消息会被返回到队列或死信队列 result = process_message(message.body.decode()) ``` ### 总结 代码示例: ```python class ChannelPool: def __init__(self, connection, max_size=10): self.connection: aio_pika.abc.AbstractConnection = connection self.channels = asyncio.Queue(max_size) async def get_channel(self) -> aio_pika.abc.AbstractChannel: try: # 尝试从池中获取channel return await self.channels.get_nowait() except asyncio.QueueEmpty: # 如果池空,创建新的channel return await self.connection.channel() async def release_channel(self, channel: aio_pika.abc.AbstractChannel): # 将用完的channel放回池中 await self.channels.put(channel) async def close(self): for channel in self.channels: await channel.close() await self.connection.close() async def __aenter__(self): return await self.get_channel() async def __aexit__(self, exc_type, exc_value, traceback): await self.release_channel(self.channel) class Consumer(object): def __init__(self, channel_pool: ChannelPool, queue_name: str): self.channel_pool: ChannelPool = channel_pool self.queue_name: str = queue_name async def consume_message(self): # 由于channel不是协程安全的,多个协程不能共享同一个channel # 所以每次消费需要获取独立的channel,并重新声明队列 async with self.channel_pool as channel: queue = await channel.declare_queue(self.queue_name, durable=True) message = await queue.get(no_ack=False) if message: async with message.process(): return message.body.decode() class AsyncRabbitMQClient: def __init__(self, rabbitmq_url, vhost='/'): self.rabbitmq_url: str = rabbitmq_url if not self.rabbitmq_url.endswith("/"): self.rabbitmq_url = f"{self.rabbitmq_url}/" self.queue_name = DATA_FLOW_QUEUE_NAME self.queue_name2 = LLM_PROXY_QUEUE_NAME self.vhost = vhost.replace("/", "%2F") self.connection: aio_pika.abc.AbstractConnection = None self.channel_pool: ChannelPool = None self.queue: aio_pika.abc.AbstractQueue = None self.queue2: aio_pika.abc.AbstractQueue = None async def setup(self): """Initialize the connection, channel, and queue.""" if self.vhost.startswith('/'): self.vhost = self.vhost.replace("/", "%2F") url_with_vhost = f"{self.rabbitmq_url}{self.vhost}" print(url_with_vhost) self.connection = await aio_pika.connect_robust(url_with_vhost) self.channel_pool = ChannelPool(self.connection) # async with self.channel_pool as channel: # self.queue = await channel.declare_queue(self.queue_name, # durable=True) # self.queue2 = await channel.declare_queue(self.queue_name2, # durable=True) async def consume_message(self): """Consume a single message and return its content, or None if the queue is empty.""" # try: # incoming_message = await self.queue.get(no_ack=False) # if incoming_message: # async with incoming_message.process(): # message_content = incoming_message.body.decode() # return message_content # except aio_pika.exceptions.QueueEmpty: # return None consumer = Consumer(self.channel_pool, self.queue_name) return await consumer.consume_message() async def consume_llm_proxy(self): """Consume a single message and return its content, or None if the queue is empty.""" consumer = Consumer(self.channel_pool, self.queue_name2) return await consumer.consume_message() async def send_message(self, message: str): """Send a message to the RabbitMQ queue.""" try: async with self.channel_pool as channel: await channel.default_exchange.publish( aio_pika.Message( body=message.encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key=self.queue.name, ) print(f"Sent message: {message}") except Exception as e: print(f"Failed to send message: {e}") raise async def send_llm_proxy(self, message: str): """Send a message to the RabbitMQ queue.""" try: async with self.channel_pool as channel: await channel.default_exchange.publish( aio_pika.Message( body=message.encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key=self.queue2.name, ) print(f"Sent message: {message}") except Exception as e: print(f"Failed to send message: {e}") raise async def close(self): """Close the connection and channel.""" # if self.channel: # await self.channel.close() # if self.connection: # await self.connection.close() await self.channel_pool.close() ```
gaojian
2024年12月31日 11:17
分享文档
收藏文档
上一篇
下一篇
微信扫一扫
复制链接
手机扫一扫进行分享
复制链接
关于 MrDoc
觅思文档MrDoc
是
州的先生
开发并开源的在线文档系统,其适合作为个人和小型团队的云笔记、文档和知识库管理工具。
如果觅思文档给你或你的团队带来了帮助,欢迎对作者进行一些打赏捐助,这将有力支持作者持续投入精力更新和维护觅思文档,感谢你的捐助!
>>>捐助鸣谢列表
微信
支付宝
QQ
PayPal
Markdown文件
分享
链接
类型
密码
更新密码