2025-09-16 16:05:54
md_save
Рекурсивно обходит указанную директорию и её поддиректории, вычисляет MD5-суммы для файлов и сохраняет их в базу данных SQLite (md5.sqlite). Выводит однострочное сообщение с временной меткой, относительным путём файла и информацией: "New file added" (новый файл добавлен) или "File changed" (файл изменён) с указанием старой и новой MD5-суммы, если хэш отличается от сохранённого.
md_check
Читает базу данных md5.sqlite и проверяет наличие файлов и их MD5-суммы. Выводит однострочное сообщение с временной меткой, относительным или абсолютным путём файла и информацией: "File missing" (файл отсутствует) или "File changed" (файл изменён) с указанием сохранённой и текущей MD5-суммы, если они различаются.
==============
$ cat md_save
#!/usr/bin/env python3
import os
import sys
import hashlib
import sqlite3
from pathlib import Path
from datetime import datetime
def calculate_md5(file_path):
"""Calculate MD5 hash of a file."""
md5_hash = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
def init_db(db_path):
"""Initialize SQLite database with md5sums table."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS md5sums (
file_path TEXT PRIMARY KEY,
md5sum TEXT NOT NULL
)
""")
conn.commit()
return conn
def check_file_md5(file_path, conn, base_dir):
"""Check file's MD5 against database and update if needed."""
cursor = conn.cursor()
md5 = calculate_md5(file_path)
relative_path = str(Path(file_path).relative_to(base_dir))
timestamp = datetime.now().strftime("%y-%m-%d %H:%M:%S")
# Query database for existing MD5
cursor.execute("SELECT md5sum FROM md5sums WHERE file_path = ?", (file_path,))
result = cursor.fetchone()
if result is None:
# New file, insert into database
cursor.execute("INSERT INTO md5sums (file_path, md5sum) VALUES (?, ?)", (file_path, md5))
print(f"{timestamp} 0 user@hm:{base_dir} $ ./md5_checker {file_path} New file added with MD5: {md5}")
elif result[0] != md5:
# MD5 mismatch, update database and report
cursor.execute("UPDATE md5sums SET md5sum = ? WHERE file_path = ?", (md5, file_path))
print(f"{timestamp} 0 user@hm:{base_dir} $ ./md5_checker {file_path} File changed: Old MD5: {result[0]} New MD5: {md5}")
conn.commit()
def process_directory(dir_path, conn):
"""Recursively process all files in directory and subdirectories."""
base_dir = Path(dir_path).resolve()
try:
for entry in os.scandir(dir_path):
if entry.is_file():
check_file_md5(entry.path, conn, base_dir)
elif entry.is_dir():
process_directory(entry.path, conn)
except PermissionError:
print(f"Permission denied: {dir_path}")
except Exception as e:
print(f"Error processing {dir_path}: {e}")
def main():
if len(sys.argv) != 2:
print("Usage: ./md5_checker.py <directory_path>")
sys.exit(1)
dir_path = Path(sys.argv[1]).resolve()
if not dir_path.is_dir():
print(f"Error: {dir_path} is not a directory")
sys.exit(1)
db_path = "./md5.sqlite"
conn = init_db(db_path)
try:
process_directory(dir_path, conn)
finally:
conn.close()
if __name__ == "__main__":
main()
===========
$ cat md_check
#!/usr/bin/env python3
import os
import sys
import hashlib
import sqlite3
from pathlib import Path
from datetime import datetime
def calculate_md5(file_path):
"""Calculate MD5 hash of a file."""
md5_hash = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
def main():
db_path = "./md5.sqlite"
if not os.path.exists(db_path):
print("Error: Database file './md5.sqlite' does not exist.")
sys.exit(1)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT file_path, md5sum FROM md5sums")
rows = cursor.fetchall()
cwd = Path(os.getcwd()).resolve()
for row in rows:
file_path = row[0]
stored_md5 = row[1]
timestamp = datetime.now().strftime("%y-%m-%d %H:%M:%S")
try:
relative_path = str(Path(file_path).relative_to(cwd))
except ValueError:
relative_path = file_path # Use absolute path if not relative to cwd
if not os.path.exists(file_path):
print(f"{relative_path} File missing")
else:
try:
current_md5 = calculate_md5(file_path)
if current_md5 != stored_md5:
print(f"{relative_path} File changed: Stored MD5: {stored_md5} Current MD5: {current_md5}")
except Exception as e:
print(f"{relative_path} Error accessing file: {e}")
conn.close()
if __name__ == "__main__":
main()
Back to list