Rotating logs into a directory with Python

Published 2024-10-20 09:45:45

I have a file called Poller.log that is appended with log details all the time. I want this log file to be rotated every day and limited to 30 days of backups. So far, the code below works well.

Now I want the rotated logs to end up in a folder (e.g. logs/poller.log.2011-03-04_15-36). Is there any way to direct where the rotated files should be created?

This Python script will be executed by cron.

import logging
import logging.handlers

LOG_FILENAME = '/home/stackoverflow/snmpdata/poller.log'

# Set up a specific logger with our desired output level
poll_logger = logging.getLogger('pollerLog')

# Add the log message handler to the logger
log_rotator = logging.handlers.TimedRotatingFileHandler(LOG_FILENAME, when='d', interval=1, backupCount=30, encoding=None, delay=False, utc=False)
poll_logger.addHandler(log_rotator)

# Roll over on application start
poll_logger.handlers[0].doRollover()

Comments (4)

烧了回忆取暖 2024-10-27 09:45:45

The Python logging handlers don't allow this easily. You have two ways to achieve it:

  1. The simplest would be to set LOG_FILENAME to live in logs/poller.log from the start; if you want to access poller.log anywhere else, use a symlink :)

  2. Create your own handler starting from TimedRotatingFileHandler: copy/paste doRollover() from the TimedRotatingFileHandler class in /usr/lib/python2.X/logging/handlers.py, and change:

dfn = self.baseFilename + "." + time.strftime(self.suffix, timeTuple)

to

dfn = os.path.join('logs', os.path.basename(self.baseFilename)) + "." + time.strftime(self.suffix, timeTuple)
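On Python 3.3+ there is a simpler route than either option: BaseRotatingHandler passes every rotated filename through its namer callable, so redirecting backups is a one-line assignment. A minimal sketch (function name is mine; note one caveat: backupCount pruning still scans the original directory, so redirected backups are not deleted automatically):

```python
import os
import logging.handlers

def rotate_into_logs(default_name):
    # default_name is the name the handler would normally use,
    # e.g. "poller.log.2011-03-04"; keep the suffix but put the
    # file into the logs/ subdirectory instead.
    return os.path.join("logs", os.path.basename(default_name))

handler = logging.handlers.TimedRotatingFileHandler(
    "poller.log", when="d", interval=1, backupCount=30, delay=True)
handler.namer = rotate_into_logs  # consulted on every rollover
```

The logs/ directory must exist before the first rollover, or the rename will fail.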
郁金香雨 2024-10-27 09:45:45

If you don't mind the extra dependency, you could always use the log-rotation support in Twisted. Twisted has a logfile module that allows for daily logs, weekly logs, or even monthly logs, which fits this situation.

俯瞰星空 2024-10-27 09:45:45

I added this bit of code for a separate process to move any log backups to a folder.

import logging
import logging.handlers
import shutil, os, glob
import zipfile
import schedule
import time
import threading

# Loggers used below (assumed to be configured elsewhere in this module)
debug_logger = logging.getLogger("LoggingModule.debug")
error_logger = logging.getLogger("LoggingModule.error")

zip_file_name = "Log.zip"
zip_file_path = "Logs/LogsArchive/Log.zip"

source_directory = "Logs"
archive_directory = "Logs/LogsArchive"


def moveAllFilesinDir(srcDir, dstDir, allLogs=False):
    try:
        # Check that both paths are directories
        if os.path.isdir(srcDir) and os.path.isdir(dstDir):
            # Iterate over the files in the source directory
            if not allLogs:
                # Rotated logs have a three-part name: name.log.date
                for filePath in glob.glob(srcDir + '/*.*.*'):
                    # Move each rotated file to the destination directory
                    shutil.move(filePath, dstDir)
            else:
                for filePath in glob.glob(srcDir + '/*.*'):
                    # Copy (not move) every file, leaving the originals in place
                    shutil.copy(filePath, dstDir)
        else:
            debug_logger.debug("LoggingModule: - moveAllFilesinDir - srcDir & dstDir should be Directories")
    except Exception:
        error_logger.error("Error in LoggingModule - moveAllFilesinDir", exc_info=True)


Only log files with a three-part name ("name.log.date") will be moved over.
I am working on a process to zip the archive folder now.

Update: here's the zip process.

def createZipDir(path):
    # Delete the old zip file if it exists, but leave it alone
    # when there are no other files to archive
    if len(os.listdir(path)) > 1:
        if os.path.isfile(zip_file_path):
            os.remove(zip_file_path)
        zipf = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
        for root, dirs, files in os.walk(path):
            for file in files:
                if file != zip_file_name:
                    zipf.write(os.path.join(root, file))
        zipf.close()
    else:
        debug_logger.debug("LoggingModule: - createZipDir - no files found, zip file left in place.")
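One subtlety in createZipDir() above: zipf.write(os.path.join(root, file)) stores each member under its full relative path (e.g. Logs/LogsArchive/poller.log.2011-03-04) rather than just the filename. If flat member names are preferred, an explicit arcname does it; a small sketch (the function name is mine, not from the answer):

```python
import os
import zipfile

def zip_dir_flat(src_dir, zip_path):
    # Archive every regular file in src_dir under its bare name,
    # skipping the archive itself if it lives in the same directory.
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name in sorted(os.listdir(src_dir)):
            full = os.path.join(src_dir, name)
            if os.path.isfile(full) and os.path.abspath(full) != os.path.abspath(zip_path):
                zf.write(full, arcname=name)
```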

Deleting the old files:

def deleteOldFilesinDir(srcDir):
    try:
        # Check that the path is a directory
        if os.path.isdir(srcDir):
            # Iterate over the files in the source directory
            for filePath in glob.glob(srcDir + '/*.*'):
                if filePath != zip_file_path:
                    os.remove(filePath)
        else:
            print("srcDir should be a directory")
    except Exception:
        error_logger.error("Error in LoggingModule - deleteOldFilesinDir", exc_info=True)

Here's the whole process:

I have the runArchiveProcess set on a schedule to run once a week.


def runArchiveProcess(allFiles=False):
    debug_logger.debug("LoggingModule: Archive process started.")
    moveAllFilesinDir(source_directory, archive_directory, allFiles)
    createZipDir(archive_directory)
    deleteOldFilesinDir(archive_directory)
    debug_logger.debug("LoggingModule Archive process completed.")

And the scheduler bit:

#only kicked off in own thread...
def runScheduler():
    debug_logger.debug("LoggingModule - runScheduler - don't call this function outside of LoggingModule as it runs in own thread.")
    schedule.every().monday.at("00:00:00").do(runArchiveProcess)
    #schedule.every(10).seconds.do(runArchiveProcess)  # for testing

    try:
        while True:
            debug_logger.debug("LoggingModule checking scheduler...")
            #Checks whether a scheduled task is pending to run or not
            schedule.run_pending()
            debug_logger.debug("LoggingModule Scheduler sleeping...")
            time.sleep(60 * 60) # checks every 1 hour
            #time.sleep(10)  # for testing
    except Exception as ex:
        error_logger.error("Error in LoggingModule - runScheduler", exc_info=True)


def runSchedulerThread():
    thread = threading.Thread(target=runScheduler)
    thread.start()

久而酒知 2024-10-27 09:45:45

The BaseRotatingHandler class in the logging module provides an interface for this:

class BaseRotatingHandler:
    def rotation_filename(self, default_name):
        if not callable(self.namer):
            result = default_name
        else:
            result = self.namer(default_name)
        return result

So, you can create your own custom RotatingHandler like this:

import datetime
import re
from logging.handlers import RotatingFileHandler

class DayRotatingHandler(RotatingFileHandler):

    @staticmethod
    def get_previous(name):
        # Rotated names end in a numeric index, e.g. "poller.log.1";
        # map that index to a dated name under the logs/ directory
        basename, log_index = re.match(r"(.*)\.(\d+)$", name).groups()
        date = datetime.datetime.now().date() - datetime.timedelta(days=int(log_index))
        return f"logs/{basename}.{date}"

    namer = get_previous
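To see the namer hook in action without waiting a day, the same redirection can be exercised with the stock size-based RotatingFileHandler and a tiny maxBytes (paths here are throwaway):

```python
import logging
import logging.handlers
import os
import tempfile

tmp = tempfile.mkdtemp()
os.makedirs(os.path.join(tmp, "logs"))

handler = logging.handlers.RotatingFileHandler(
    os.path.join(tmp, "poller.log"), maxBytes=50, backupCount=3)
# Redirect each rotated backup into the logs/ subdirectory.
handler.namer = lambda name: os.path.join(tmp, "logs", os.path.basename(name))

logger = logging.getLogger("namer_demo")
logger.propagate = False
logger.addHandler(handler)
for i in range(20):
    logger.warning("message number %d", i)  # tiny maxBytes forces rollovers

print(sorted(os.listdir(os.path.join(tmp, "logs"))))
```

The backups poller.log.1 through poller.log.3 end up under logs/, while the live poller.log stays in the parent directory.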