AberSheeran
Aber Sheeran

asyncio 与 kafka

起笔自
所属文集: Python-Package
共计 3202 个字符
落笔于

aiokafka

Python 开发里使用的异步 kafka 客户端,大多是 aiokafka,因为它是目前 Python 社区里唯一一个直接支持 asyncio 的 kafka 客户端库。

但这个库有个大问题,也就是 [Producer] Performance drop when 'send' is called from multiple Futures #528 这个 issue 里提到的,当多个协程(一般是 Web 服务的接口里)并发的推送数据时,CPU 占用率会变得异常的高。由于需要快速恢复业务,所以我当时很快写好一段代码去打补丁。在接口里仅调用 push_message,通过一个内存态的队列把消息按照顺序发送出去。

import asyncio

from loguru import logger
from indexpy import HTTPException

from .mq import get_kafka_client

push_queue: asyncio.Queue[
    tuple[asyncio.Event, tuple[str, bytes, bytes]]
] = asyncio.Queue(1000)


async def _real_push():
    while not push_queue.empty():
        event, (topic, value, key) = await push_queue.get()
        try:
            await get_kafka_client().send_and_wait(topic=topic, value=value, key=key)
        except Exception as e:
            logger.error(f"Failed to push message to Kafka: {e}")
        finally:
            event.set()


worker: asyncio.Task | None = None


async def push_message(topic: str, value: bytes, key: bytes) -> None:
    """
    Push a message to a Kafka topic.
    """
    event = asyncio.Event()
    await push_queue.put((event, (topic, value, key)))
    global worker
    if worker is None:
        worker = asyncio.create_task(_real_push())
    if worker.done():
        worker = asyncio.create_task(_real_push())
    await event.wait()

confluent-kafka-python

过了很久之后我终于有时间寻找别的解决方案。confluent-kafka-python 这个 librdkafka 的 Python 包装几乎是唯一选择了。因为看起来还算可靠的 Python kafka 客户端一共就三个,纯 Python 实现的同步客户端 kafka、纯 Python 实现的异步客户端 aiokafka 以及这个库。

由于 librdkafka 是基于后台线程的回调设计,对 Python、Rust 这种拥有无栈协程的语言来说很容易就接入协程代码中。一个拥有线程保活能力的 asyncio 生产者代码如下。由于在这之前我就将性能瓶颈服务使用 Rust 重写,剩下的代码其实对并发要求并不高所以并没有做性能测试,但我所使用的 rdkafka 也是对 librdkafka 的封装,和如下代码的性能差异应当不会太大。

class AIOProducer:
    def __init__(self, bootstrap_servers: str, loop=None):
        self._loop = loop or asyncio.get_running_loop()
        self._producer = confluent_kafka.Producer({
            "bootstrap.servers": bootstrap_servers,
        })
        self._cancelled = False
        self._count = 0
        self._init_poll_thread()

    def _init_poll_thread(self):
        if hasattr(self, "_poll_thread") and self._poll_thread.is_alive():
            return
        self._poll_thread = Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()
        self._producer.flush()

    async def produce(self, topic: str, value: bytes | str, key: bytes | str) -> str:
        result = self._loop.create_future()

        def ack(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, confluent_kafka.KafkaException(err)
                )
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        self._count += 1
        self._producer.produce(topic, value, key, on_delivery=ack)
        if self._count >= 256:
            self._count = 0
            self._init_poll_thread()
        return await result
如果你觉得本文值得,不妨赏杯茶
PDM 使用精要
Python asyncio