Files
fedora-infra_ansible/files/scripts/rotatelogs-cleanup.py
James Antill 26f36e0820 Second part of fixing the logrotate problem on kojipkgs*. #12398
Adds a script which will compress rotatelogs not being used and delete logs
after 8 days (same as logrotate).

Signed-off-by: James Antill <james@and.org>
2025-09-19 10:54:35 -04:00

277 lines
6.8 KiB
Python
Executable File

#! /usr/bin/python3
# When logging to files with Apache/httpd rotatelogs and using the date
# specifiers, nothing will ever clean up the old logfiles.
# This is a simple script which will do that.
# Eg. To keep the last 16 days of the following rotatelogs...
# | rotatelogs 'foo.log-%Y-%m-%d_%H:%M' 86400A
# rotatelogs-cleanup.py keep 16 foo.log-'%Y-%m-%d_%H:%M'
# rotatelogs-cleanup.py compress foo.log-'%Y-%m-%d_%H:%M'
# ...this sorts the files that match the wildcard, and removes the oldest
# until there are 16 left. We don't actually parse the time, but convert
# the format to a glob, so files like these are ignored:
# foo.log-X-blah-test
# foo.log-2004-01-02_12:34.old
#
# We do treat zero length files differently, removing them before older
# files with data (ignoring the latest file, if it's zero length).
#
# Master switch for deletion. For testing, we can turn this off and nothing
# gets deleted (the "rm <file>" lines are still printed).
conf_rm = True
# Do we want to rm zero length files when we are compressing.
conf_rm_zero_len_compress = False
# Command prefix run on each file by the "gzip" subcommand.
conf_compress_cmd_gz = ["gzip", "-9"]
# Command prefix run on each file by the "xz" subcommand.
conf_compress_cmd_xz = ["xz", "-9"]
# Command prefix used by the plain "compress" subcommand.
conf_compress_cmd_def = conf_compress_cmd_xz
import argparse
import glob
import os
import subprocess
import sys
def ftime2glob(fmt):
    """Convert a strftime-style path (as given to rotatelogs) into a glob.

    Each known two-character conversion specifier is replaced by a glob
    fragment: fixed-width numeric fields become a run of ``?`` and
    variable-width fields become ``*``. ``%%`` becomes a literal ``%``.
    Unknown specifiers and all other text are copied through unchanged
    (literal text is not glob-escaped).

    The format is scanned once, left to right, so the ``%`` produced by
    ``%%`` is never re-read as the start of another specifier: ``%%Y`` is
    a literal ``%Y``, not a year.

    Args:
        fmt: Path containing strftime conversion specifiers.

    Returns:
        The glob pattern as a string.
    """
    ftime_rep = {
        "%A": "*",
        "%a": "???",
        "%B": "*",
        "%b": "???",
        "%c": "*",
        "%d": "??",
        "%H": "??",
        "%I": "??",
        "%j": "???",  # Day of the year is three digits: 001-366
        "%M": "??",
        "%m": "??",
        "%p": "*",
        "%S": "??",
        "%U": "??",
        "%W": "??",
        "%w": "?",
        "%X": "*",
        "%x": "*",
        "%Y": "????",
        "%y": "??",
        "%Z": "*",
        "%%": "%",
    }
    pieces = []
    pos = 0
    end = len(fmt)
    while pos < end:
        # Every key starts with "%", so a hit means a full specifier here.
        spec = fmt[pos:pos + 2]
        if spec in ftime_rep:
            pieces.append(ftime_rep[spec])
            pos += 2
        else:
            pieces.append(fmt[pos])
            pos += 1
    return "".join(pieces)
def _unlink(fname):
# Don't blow up everything if we can't rm a file...
try:
os.unlink(fname)
except OSError as e:
print("Error: rm(%s): %s" % (fname, e), file=sys.stderr)
def read_count(fo):
    """Read *fo* to the end and return how much data it produced.

    The data is consumed in fixed-size chunks and discarded; only the sum
    of the chunk lengths is kept.
    """
    chunk_size = 8192
    total = 0
    chunk = fo.read(chunk_size)
    # An empty chunk means we've reached the end of the file.
    while chunk:
        total += len(chunk)
        chunk = fo.read(chunk_size)
    return total
def uncompressed_file_size(fname, stop_any=False):
    """Return the size in bytes of the data in *fname*.

    Files ending in ``.gz`` or ``.xz`` are decompressed through ``zcat`` /
    ``xzcat`` and the output is counted; any other file is measured with
    ``os.path.getsize()``.

    Args:
        fname: Path of the file to measure.
        stop_any: For compressed files, stop after the first byte and
            return 1 if there was any data, else 0. Ignored for plain
            files, whose full size is always returned.

    Returns:
        The byte count as described above.

    Raises:
        OSError: If the file can't be stat()ed or the helper command can't
            be started.
    """
    if fname.endswith(".gz"):
        cat_cmd = ["zcat", fname]
    elif fname.endswith(".xz"):
        cat_cmd = ["xzcat", fname]
    else:
        return os.path.getsize(fname)

    # The context manager closes the pipe and waits for the child on exit,
    # so we don't leave a zombie and an open fd behind for every file.
    with subprocess.Popen(cat_cmd, stdout=subprocess.PIPE) as proc:
        if not stop_any:
            return read_count(proc.stdout)
        # We only want to know if there's any data there.
        first = proc.stdout.read(1)
        # Don't make the child decompress the rest just to throw it away.
        proc.terminate()
        return len(first)
def uncompressed_file_not_empty(fname):
    """Return True if *fname* holds any data once decompressed."""
    nbytes = uncompressed_file_size(fname, stop_any=True)
    return nbytes > 0
def _rm_zero_len(files):
if not files:
return []
nfiles = []
for fname in files[:-1]: # Don't delete the latest, even if empty
try:
sz = uncompressed_file_not_empty(fname)
except:
nfiles.append(fname)
continue
if sz == 0:
_ui_unlink(fname)
continue
nfiles.append(fname)
nfiles.append(files[-1]) # Add latest back
return nfiles
def _ui_unlink(fname):
    """Print the removal of *fname*, then delete it unless conf_rm is off."""
    print("rm", fname)
    if not conf_rm:
        # Testing mode: say what we'd do, but leave the file alone.
        return
    _unlink(fname)
def _ui_int(num):
return "{:_}".format(int(num))
def _glob(ftime_path, rm_zero_len=False):
    """Return the sorted list of files matching the ftime path.

    The path is converted to a glob with ftime2glob(). Unless that glob
    already ends in ``*``, the ``.gz`` and ``.xz`` variants are globbed as
    well so compressed files are included. With *rm_zero_len* set, the
    result is passed through _rm_zero_len() before being returned.
    """
    pattern = ftime2glob(ftime_path)
    matches = glob.glob(pattern)
    if pattern[-1] != "*":  # Make sure we include the compressed files.
        for suffix in (".gz", ".xz"):
            matches.extend(glob.glob(pattern + suffix))
    matches.sort()
    if not rm_zero_len:
        return matches
    return _rm_zero_len(matches)
# --- commands ----
def _cmd_compress(args):
    """Compress every matching file except the latest one.

    The compressor is chosen from the subcommand name as typed: "gzip" and
    "xz" pick their own command, anything else uses the default. Files
    already ending in .gz or .xz are skipped.
    """
    candidates = _glob(args.ftime_path, conf_rm_zero_len_compress)
    by_alias = {
        "gzip": conf_compress_cmd_gz,
        "xz": conf_compress_cmd_xz,
    }
    # compress/etc fall back to the default.
    compressor = by_alias.get(args.cmd, conf_compress_cmd_def)
    for fname in candidates[:-1]:  # Everything but the latest
        if fname.endswith((".gz", ".xz")):
            continue
        print(compressor[0], fname)
        subprocess.call(compressor + [fname])
def _cmd_ls(args):
    """List the matching files, one per line.

    For "ls-f" only the names are printed. Otherwise each name is preceded
    by its uncompressed size, right-aligned to the widest size.
    """
    names_only = args.cmd == "ls-f"
    rows = []
    for fname in _glob(args.ftime_path):
        size = ""
        if not names_only:
            size = _ui_int(uncompressed_file_size(fname))
        rows.append((size, fname))
    width = max((len(size) for size, _ in rows), default=0)
    for size, fname in rows:
        if names_only:
            print(fname)
        else:
            print("%*s %s" % (width, size, fname))
def _cmd_keep(args):
    """Remove the oldest matching files, leaving the newest args.num.

    Zero length files (other than the latest) are removed first and don't
    count towards the number kept.
    """
    matched = _glob(args.ftime_path, True)
    # Sorted oldest first, so everything before the last `num` goes.
    doomed = matched[: -args.num]
    for fname in doomed:
        _ui_unlink(fname)
def _cmd_rm(args):
    """Remove every file matching the ftime path."""
    for fname in _glob(args.ftime_path):
        _ui_unlink(fname)
def _cmd_show(args):
    """Print the glob that the ftime path converts to."""
    pattern = ftime2glob(args.ftime_path)
    print(pattern)
# --- main ----
def _parse_positive_integer(oval):
try:
val = int(oval)
except:
val = -1
if val <= 0:
raise argparse.ArgumentTypeError(f"{oval} is not a positive integer")
return val
def _main():
    """Build the command line parser, parse the arguments and run the command.

    Every subcommand stores its handler as ``func``. The subcommand name is
    stored as ``cmd``, which some handlers read to tell aliases apart.
    """
    parser = argparse.ArgumentParser()
    # parser.add_argument("-v", dest="verbose", action="store_true")
    subparsers = parser.add_subparsers(required=True, dest="cmd")

    hlp_ftime = "ftime expanded path (see rotatelogs)"
    hlp_num = "number of files to keep"

    sub = subparsers.add_parser("help")
    sub.set_defaults(func=lambda _args: parser.print_help())

    sub = subparsers.add_parser(
        "compress", aliases=["gzip", "xz"], help="compress old files"
    )
    sub.add_argument("ftime_path", help=hlp_ftime)
    sub.set_defaults(func=_cmd_compress)

    sub = subparsers.add_parser("keep", help="remove old files")
    sub.add_argument("num", type=_parse_positive_integer, help=hlp_num)
    sub.add_argument("ftime_path", help=hlp_ftime)
    sub.set_defaults(func=_cmd_keep)

    sub = subparsers.add_parser(
        "list", aliases=["ls", "ls-f"], help="list all files"
    )
    sub.add_argument("ftime_path", help=hlp_ftime)
    sub.set_defaults(func=_cmd_ls)

    sub = subparsers.add_parser(
        "remove", aliases=["rm"], help="remove all files"
    )
    sub.add_argument("ftime_path", help=hlp_ftime)
    sub.set_defaults(func=_cmd_rm)

    sub = subparsers.add_parser(
        "show-glob", aliases=["show"], help="show what the glob will look like"
    )
    sub.add_argument("ftime_path", help=hlp_ftime)
    sub.set_defaults(func=_cmd_show)

    args = parser.parse_args()
    args.func(args)
# Only run the command line interface when executed as a script.
if __name__ == "__main__":
    _main()