万字长文 - Python 日志记录器logging 百科全书 - 高级配置之 日志异步操作

举报
frica01 发表于 2023/12/31 18:53:02 2023/12/31
【摘要】 文章详细介绍了 Python logging 模块中的日志异步操作,包括多线程、多进程和异步IO三种主要方法,并提供对应代码和对应使用场景。

image.png

万字长文 - Python 日志记录器logging 百科全书 - 高级配置之 日志异步操作

前言

Pythonlogging模块中,它不仅提供了基础的日志功能,还拥有一系列高级配置选项来满足复杂应用的日志管理需求。

说到logging 模块的高级配置,必须提及日志分层logging.config配置日志异步操作等关键功能。它们每一项都为开发者提供了强大的调试和监控环境,对于构建可维护和高效的日志系统至关重要。

在接下来的三篇logging高级配置 文章中,我将为读者朋友们介绍 Pythonlogging 模块中的三个高级配置的具体应用:日志分层logging.config 以及 日志异步操作,探讨它们如何优化日志处理流程,并提升应用的整体性能。

本文将聚焦于 logging 模块中的日志异步操作概念,探讨如何通过多线程多进程异步IO 等操作去提高在处理大量或复杂的日志时的日志记录的整体效率。


念念碎🎊🎊

这篇文章反复修改了很久,从15000字修改成现在。
主要难点在于asyncio实现异步日志这一块,同时包含同步操作和异步操作,而异步操作又包含多个事件循环,是有些难搞的!!

这也属于是 ✨✨日志记录百科全书 的最后一篇新知识啦!
后面可能还会更新两篇不是新知识的关于日志记录的文章,它们大概是:

  • 日志记录的基础概念
  • 日志记录常用模板
    完。

知识点📖📖

模块 释义
logging Python 的日志记录工具,标准库
threading 多线程
ThreadPoolExecutor 线程池
ProcessPoolExecutor 进程池
asyncio 异步编程模块
aiohttp 异步网络请求模块
flask 轻便web框架

一些概念:

概念 解释
并行处理(多线程/多进程) 同一时间,多个线程或进程分别执行不同的任务
异步处理(如 asyncio 同一时间,单个线程或进程轮流执行多个任务,根据任务状态交替进行。

用通俗易懂的语言解释就是:

  • 并行(Parallelism):三个人(三个线程或进程)同时吃三个面包(执行三个任务)。每个人都独立地吃自己的面包,互不干扰;
  • 异步(Asynchronous):一个人(单个线程或进程)轮流切换吃三个面包(执行三个任务)。这个人在吃第一个面包时,如果发现面包太硬需要等待软化,他就会开始吃第二个面包,而不是闲着不做事。在等待的时间里,他可以处理其他任务。

日志异步操作

什么是日志异步操作?

异步日志操作指的是在应用程序的主执行流程之外处理日志记录的过程。这通常通过在后台线程或进程中执行日志记录任务来实现,从而避免阻塞主程序的运行。异步日志操作有助于提高应用程序的性能,特别是在高负载环境中(如 Web 服务器、大数据处理系统和高度并发的应用程序),因为它可以减少由于日志记录而导致的主线程阻塞。

Python中实现异步日志

在Python中,有几种方式可以实现异步日志:

  1. 使用线程:最常见的方法是使用线程(通过threading模块)来处理日志任务。
  2. 使用进程:对于更重的日志任务,可以使用进程(通过multiprocessing模块)来避免线程间的竞争条件。
  3. 使用异步IO:对于支持异步IO的应用,可以使用asyncio模块来实现日志记录。

下面我将分别展示使用多线程多进程异步IO 实现异步日志记录的完整代码示例。

⚠️⚠️注意事项

多线程多进程 是并行的,它们并不是严格意义上的异步任务!只有 asyncio 才可以实现真正的异步任务。

多线程和多进程都是非严格意义上的异步处理!!!

其中日志记录任务从主线程 &主进程 被分离出来,由另一个线程 &进程 并行处理,但它的本质还是同步的。

但也正因为它们是并行的,所以在处理复杂或资源密集型的日志记录场景时候,它们也都是非常合适的。


✨✨应用场景

在 Python 中,多线程、多进程和异步IO(基于 asyncio)是实现异步编程(非严格意义)的三种主要方法。它们各自适用于不同的场景,并且有各自的特点和优势。理解这些差异有助于在特定应用中选择最合适的并发策略。

对于大多数情况下的日志记录:

  • 多线程:简单易用,能有效地处理IO等待时间,而且与Python的标准库 logging 模块兼容性良好;
  • 多进程:通常不是日志记录的首选方法,除非日志处理确实涉及到大量的CPU密集型计算;
  • 异步IO:适用于高并发的日志记录,如需要处理大量的网络日志传输,但它需要异步编程模型的支持。

通常,对于日志记录,选择简单、易于维护的并发模式更为重要,因为日志记录本身通常不应该成为应用程序逻辑的主要瓶颈。

简单总结如下表:

方法 实现难度 应用场景
多线程 I/O密集型任务
多进程 ⭐⭐ CPU密集型任务
Asyncio异步 ⭐⭐⭐⭐⭐ I/O密集型任务和异步应用

在选择日志记录方法时,应考虑项目的具体需求、技术栈以及团队的技术能力。每种方法都有其适用的场景,选择合适的可以提高日志处理的效率和性能。


具体实现

队列(Queue)

⚠️⚠️在异步日志的编程中,如果没有显式的对队列(Queue)进行一些操作,那么队列(Queue)在这里,起到的作为几乎为零。

在实现日志异步操作时,无论是使用 多线程多进程 还是异步IO,都可以选择使用 logging + 队列(Queue)来实现。只是它们之间所使用的 队列(Queue) 都是不同类型的,这是因为它们的运行环境和机制不同:

  • 多线程:使用 queue.Queue,Python标准库, 它是线程安全的,适用于在多线程环境中共享数据,确保数据的一致性和安全性;
  • 多进程:使用multiprocessing.Queue,专门为多进程设计,能够在不同进程中安全地传递消息;
  • 异步IO:使用asyncio.Queue,为asyncio异步编程设计,支持在异步函数中使用。

使用队列的好处:

  • 使用队列来处理日志消息,无论是在多线程多进程还是 异步IO 环境下,都能提高日志记录的效率和程序的整体性能。
优点 阐述
减少主线程的阻塞 对于一些耗时的日志操作,如将日志发送到远程服务器,将其放入子线程中运行可以减少对主线程的阻塞,提高应用程序的响应速度和性能。
充分利用资源 在多线程环境中,可以更有效地利用多核处理器的能力。通过在后台线程中处理日志,可以平滑化资源使用,避免主线程在执行耗时操作时的性能瓶颈。
高度定制化的日志处理 异步日志记录可以实现高度定制化的日志处理,如关键词捕获和触发报警功能,有助于提高整体性能。
生产者-消费者解耦 使用队列实现了日志生产者(应用程序的各个部分生成日志)和消费者(负责处理日志的线程或进程)之间的解耦,提高了系统的灵活性和可扩展性。

虽然 logging 模块的标准处理器已经非常强大,但在特定场景下,使用自定义的异步日志处理可以提供更多的灵活性和性能优势。这在处理大量日志、需要高度定制化处理或希望最小化对主应用性能影响的情况下尤其有用。所以这个时候使用上队列(Queue),效果是非常好的。


单例模式(Singleton)

通过实现单例模式,可以确保整个应用中只有一个日志队列和一个后台线程(只有一个异步日志记录实例),在实现异步日志程序时,使用单例模式是非常合适的。

单例模式确保一个类只有一个实例,并提供一个全局访问点。通常我们只需要一个日志系统实例来收集和分发整个应用程序的日志信息。

使用单例模式的优势包括:

  1. 一致性和控制:确保所有的日志都通过同一个日志管理实例进行处理,这有助于维护一致的日志格式和行为;
  2. 资源共享和管理:单例模式允许共享同一个日志队列和处理器,这样可以更有效地管理资源;
  3. 易于配置和维护:只有一个日志实例,因此配置和维护(例如,设置日志级别、格式化器和处理器)变得更加集中和简单;
  4. 避免重复实例化:日志系统可能是资源密集型的,单例模式避免了重复创建和销毁日志系统实例的开销。

代码实现✨✨

下面三份异步日志代码中,都使用了一个自定义日志处理器-->AsyncLogHandler,每一份都是处理100条日志消息,用于测试。

  • 同步执行了日志记录,

  • 异步通过多线程、多进程或asyncio异步将日志信息上传到服务器;

通篇下来,它们实现的功能是一致,但速度上,有所不同,如下表所示:

类型 耗时/s
多线程 2.8
多进程 3.5
asyncio异步 0.71

延伸一下,如果如果日志记录数量剧增,那么:

  • 多线程在这里将会继续表现良好;
  • 多进程的优势得不到任何体现,因为它更加适合 CPU 密集型任务,
  • asyncio异步 的优势则会更加明显,因为异步 I/O 可以在单个线程内处理大量任务,避免了线程切换的开销。

服务端

  • 使用Flask搭一个服务端,用于测试
  • 通过 http://127.0.0.1:5000/submit
# -*- coding: utf-8 -*-

import time

from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/submit", methods=["POST"])
def api_data():
    data = {"status": "success"}
    time.sleep(.5)
    return jsonify(data)


if __name__ == "__main__":
    app.run()


多线程日志记录

通过合理的设计代码,有效地结合了单独的监视线程和线程池的并发处理能力,适合于尤其是涉及耗时操作(如网络请求)的日志系统。这样的设计允许主程序继续运行而不被日志处理操作阻塞,同时确保日志消息能够及时且高效地被处理。

下面二者结合,将 _log_worker 放在单独的线程中运行可以避免阻塞主线程,并允许持续和实时地处理队列中的日志消息。

  • self.thread 负责运行_log_worker方法,

  • executor负责处理耗时任务(upload_log

# -*- coding: utf-8 -*-
# @Author : Frica01
# @Time   : 2023-12-12 20:43
# @Name   : async_logging.py

import logging
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests

log_format = '%(levelname)-7s - %(asctime)s - [%(filename)s:%(funcName)s:%(lineno)d] - %(message)s'


class AsyncLogHandler(logging.Handler):
    """
    异步日志处理器,将日志消息发送到队列中。

    Attributes:
        log_queue (queue.Queue): 存储日志消息的队列。
    """

    def __init__(self, log_queue: queue.Queue):
        super().__init__(level=logging.DEBUG)
        self.log_queue = log_queue

    def emit(self, record):
        """
        将格式化的日志记录放入队列中。

        Args:
            record (logging.LogRecord): 日志记录对象。
        """
        log_message = self.format(record)
        self.log_queue.put(log_message)


class AsyncLogger:
    """
    异步日志记录器,用于处理日志消息的异步记录。
    该类实现了单例模式,确保整个应用中只有一个日志队列和处理线程。

    Attributes:
        log_queue (queue.Queue): 用于存储日志消息的队列。
        thread (threading.Thread): 用于处理队列中日志消息的后台线程。
        executor (ThreadPoolExecutor): 用于处理队列中日志消息的后台线程池。
    """

    _instance = None

    def __new__(cls):
        """创建或获取 AsyncLogger 的单例实例。"""
        if cls._instance is None:
            cls._instance = super(AsyncLogger, cls).__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        """初始化日志队列和启动日志处理线程。"""
        self.log_queue = queue.Queue()
        self.executor = ThreadPoolExecutor(max_workers=10)  # 创建线程池
        self._start_log_worker()

    def _start_log_worker(self):
        """启动用于处理日志消息的后台线程。"""
        self.thread = threading.Thread(target=self._log_worker, daemon=True)
        self.thread.start()

    def _log_worker(self):
        """后台线程的工作函数,从队列中获取并打印日志消息。"""
        while True:
            try:
                log_message = self.log_queue.get_nowait()
                if log_message == "STOP":
                    break
                self.executor.submit(self.upload_log, log_message)
                # res = requests.post('http://127.0.0.1:5000/submit', data={'data': log_message})
                # print(res.json())
            except queue.Empty:
                # 处理队列为空的情况, 选择暂停一段时间
                time.sleep(.1)

    @staticmethod
    def upload_log(log_message):
        # 这里是上传日志的操作
        res = requests.post('http://127.0.0.1:5000/submit', data={'data': log_message})
        print(res.json())

    def setup_logger(self, name=None, level=logging.INFO, filename=None):
        """设置并返回具有指定名称和级别的日志记录器。

        Args:
            name (str): 日志记录器的名称。
            level (int): 日志级别。
            filename (str): 日志文件的名称。

        Returns:
            logging.Logger: 配置后的日志记录器。
        """
        logger = logging.getLogger(name)
        logger.setLevel(level)
        # # 避免日志消息被传播到root logger
        logger.propagate = False

        # 文件处理器
        if filename:
            file_handler = logging.FileHandler(filename)
            file_handler.setFormatter(logging.Formatter(log_format))
            logger.addHandler(file_handler)

        # 控制台处理器
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(stream_handler)

        # 用于处于耗时的处理器
        async_handler = AsyncLogHandler(self.log_queue)
        async_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(async_handler)

        return logger

    @property
    def is_queue_empty(self):
        """
        检查日志队列是否为空。

        Returns:
            bool: 如果队列为空返回 True,否则返回 False。
        """
        return self.log_queue.empty()

    def stop_logging(self):
        """发送停止信号并等待日志处理线程结束。"""
        self.log_queue.put("STOP")
        self.thread.join()
        self.executor.shutdown(wait=True)  # 关闭线程池

调用代码

# -*- coding: utf-8 -*-

import time
from async_logging import AsyncLogger


def main():
    # 创建AsyncLogger实例
    async_logger = AsyncLogger()
    # 创建日志记录器,确保指定的级别允许记录日志消息
    logger = async_logger.setup_logger(
        'main',
        level=10,   # 即 logging.DEBUG
        # filename='app.log'
    )
	
    # 使用日志记录器
    for i in range(100):
        logger.info('This is an info message')

    while not async_logger.is_queue_empty:
        time.sleep(0.1)


if __name__ == '__main__':
    main()


多进程日志记录

值得一提的是,这份代码仅仅是修改上面的 多线程日志记录 的一个方法。

  • ThreadPoolExecutor to ProcessPoolExecutor

但是它们之间的应用场景有所不同,特别是在涉及到CPU密集型的日志处理任务时。

  1. CPU密集型任务:对于CPU密集型的日志处理任务,如需要进行复杂计算或处理的日志消息,使用进程池可以提高性能。

  2. 避免GIL限制:由于GIL的存在,Python的线程并不能真正并行执行计算密集型任务。使用进程池可以绕过GIL,实现真正的并行处理。

from concurrent.futures import ProcessPoolExecutor

# 其他代码保持不变

class AsyncLogger:
    # 省略其他部分

    def _init(self):
        """初始化日志队列和启动日志处理进程池。"""
        self.log_queue = queue.Queue()
        self.executor = ProcessPoolExecutor(max_workers=10)  # 使用进程池

    # 省略其他部分


异步IO日志记录

# -*- coding: utf-8 -*-
# @Author : Frica01
# @Time   : 2023-12-12 20:43
# @Name   : async_logging.py


import asyncio
import logging
import threading

import aiohttp

log_format = '%(levelname)-7s - %(asctime)s - [%(filename)s:%(funcName)s:%(lineno)d] - %(message)s'


class AsyncLogHandler(logging.Handler):
    """
    异步日志处理器,将日志消息发送到队列中。

    Attributes:
        log_queue (asyncio.Queue): 存储日志消息的队列。
    """
    def __init__(self, log_queue: asyncio.Queue):
        super().__init__(level=logging.DEBUG)
        self.log_queue = log_queue

    def emit(self, record):
        """
        将格式化的日志记录放入队列中。

        Args:
            record (logging.LogRecord): 日志记录对象。
        """
        log_message = self.format(record)
        self.log_queue.put_nowait(log_message)


class AsyncLogger:
    """
    异步日志记录器,用于处理日志消息的异步记录。
    该类实现了单例模式,确保整个应用中只有一个日志队列和处理线程。

    Attributes:
        log_queue (queue.Queue): 用于存储日志消息的队列。
        loop (asyncio.new_event_loop): 事件循环
        thread (threading.Thread): 用于处理队列中日志消息的后台线程。
        tasks (list): 队列列表
    """
    _instance = None

    def __new__(cls):
        """创建或获取 AsyncLogger 的单例实例。"""
        if cls._instance is None:
            cls._instance = super(AsyncLogger, cls).__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        """初始化日志队列和启动日志处理线程。"""
        self.log_queue = asyncio.Queue()
        self.loop = asyncio.new_event_loop()  # 创建新的异步事件循环
        self.thread = threading.Thread(target=self._start_log_worker, args=(self.loop,), daemon=True)
        self.thread.start()
        self.tasks = list()

    def _start_log_worker(self, loop):
        """启动用于处理日志消息的后台线程的事件循环"""
        asyncio.set_event_loop(loop)    # 将当前线程的事件循环设置为 loop
        loop.run_until_complete(self._log_worker())  # 启动事件循环,并运行 _log_worker

    async def _log_worker(self):
        """后台线程的工作函数,从队列中获取并打印日志消息。"""
        async with aiohttp.ClientSession() as session:
            while True:
                log_message = await self.log_queue.get()
                if log_message == "STOP":
                    break
                task = asyncio.create_task(self.upload_log(session, log_message))
                self.tasks.append(task)
            await asyncio.gather(*self.tasks)

    @staticmethod
    async def upload_log(session, log_message):
        try:
            async with session.post(
                    'http://127.0.0.1:5000/submit', data={'data': log_message}
            ) as response:
                print(f"Log uploaded: {await response.json()}")
        except Exception as e:
            print(f"Failed to send log: {e}")

    def setup_logger(self, name=None, level=logging.INFO, filename=None):
        """设置并返回具有指定名称和级别的日志记录器。

        Args:
            name (str): 日志记录器的名称。
            level (int): 日志级别。
            filename (str): 日志文件的名称。

        Returns:
            logging.Logger: 配置后的日志记录器。
        """
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.propagate = False

        # 文件处理器
        if filename:
            file_handler = logging.FileHandler(filename)
            file_handler.setFormatter(logging.Formatter(log_format))
            logger.addHandler(file_handler)

        # 控制台处理器
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(stream_handler)

        # 用于处于耗时的处理器
        async_handler = AsyncLogHandler(self.log_queue)
        async_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(async_handler)

        return logger

    @property
    def is_queue_empty(self):
        """
        检查日志队列是否为空。

        Returns:
            bool: 如果队列为空返回 True,否则返回 False。
        """
        return self.log_queue.empty()

    def stop_logging(self):
        """发送停止信号并等待日志处理线程结束和结束事件循环"""
        asyncio.run_coroutine_threadsafe(self.log_queue.put("STOP"), self.loop)
        self.thread.join()
        self.loop.close()

调用代码

# -*- coding: utf-8 -*-

import asyncio

from async_logging import AsyncLogger


async def main():
    async_logger = AsyncLogger()
    logger = async_logger.setup_logger(
        'main',
        level=10,  # 即 logging.DEBUG
        filename='app.log'
    )
    for i in range(100):
        logger.info('This is an info message')

    async_logger.stop_logging()


if __name__ == '__main__':
    asyncio.run(main())

代码对比🎈🎈

上面三份代码分别展示了使用多线程、多进程和异步asyncio进行异步日志处理的不同方法。每种方法都有其独特的应用场景和优势,根据项目的具体需求和环境可以选择合适的一种。以下是对它们的简单梳理:

  1. 多线程日志记录
    • 优势:适合于 I/O 密集型任务,如网络请求。由于 Python 的 GIL,多线程对 CPU 密集型任务的提升有限,但在 I/O 密集型任务中表现良好。
    • 应用场景:适用于不需要大量 CPU 计算且频繁进行 I/O 操作(如文件读写、网络请求)的日志记录。
  2. 多进程日志记录
    • 优势:由于进程之间内存是隔离的,适用于 CPU 密集型任务。可以绕过 GIL 限制,实现真正的并行计算。
    • 应用场景:适用于处理大量计算密集型日志处理任务,如需要进行复杂数据处理的日志分析。(不适合本文的这个场景!!!)
  3. 异步IO日志记录
    • 优势:非常高效,特别适合于高并发情况。异步 I/O 可以在单个线程内处理大量任务,避免了线程切换的开销。
    • 应用场景:适用于高并发的网络应用,如实时数据处理和大量网络日志传输。

总结

本文详尽地介绍了 Python 中 logging 模块的高级配置,特别是关于日志异步操作的实现。

文章深入探讨了多线程、多进程和异步IO(基于 asyncio)在日志记录中的应用,提供了实际代码示例,并清晰地解释了每种方法的应用场景和优势。

每种方法都有其特定的应用场景和优势。选择合适的日志记录方法取决于应用的具体需求,如性能要求、并发级别以及日志任务的性质(CPU 密集型或 I/O 密集型)。

总的来说,这篇文章为开发人员提供了一个全面的指南,以选择和实现最适合应用的异步日志记录方法。

后话

本次分享到此结束,
see you~🚀🚀

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。