# Forked from LengKundee/NUNA (Python, 214 lines, 7.8 KiB)
import copy
import os
import shutil
import tempfile
import time
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

import trading_data_manager as tdm
|
class TestTradingDataManager(unittest.TestCase):
    """Unit tests for the trading_data_manager planning and conversion API."""

    def setUp(self):
        """Build an isolated trading_data tree and a private config copy."""
        self.test_dir = tempfile.mkdtemp()
        self.root = Path(self.test_dir) / "trading_data"
        self.logs_dir = self.root / "logs"
        self.raw_csv_dir = self.root / "raw_csv"
        self.reports_dir = self.root / "reports"
        self.archive_dir = self.root / "archive"
        self.trash_dir = self.root / "trash"

        # Deep-copy the default config so tests never pollute global state.
        self.cfg = copy.deepcopy(tdm.DEFAULT_CONFIG)
        self.cfg["paths"] = {
            "logs_dir": self.logs_dir.relative_to(self.root),
            "raw_csv_dir": self.raw_csv_dir.relative_to(self.root),
            "reports_dir": self.reports_dir.relative_to(self.root),
            "archive_dir": self.archive_dir.relative_to(self.root),
            "trash_dir": self.trash_dir.relative_to(self.root),
            "run_logs_dir": "automation_logs",
        }

        # Create every directory the planner expects to find.
        for directory in (
            self.logs_dir,
            self.raw_csv_dir,
            self.reports_dir,
            self.archive_dir,
            self.trash_dir,
        ):
            tdm.mkdirp(directory)
|
    def tearDown(self):
        """Remove the temporary directory tree created by setUp."""
        shutil.rmtree(self.test_dir)
|
def test_iter_files(self):
|
|
# Create some files and directories
|
|
(self.logs_dir / "log1.txt").touch()
|
|
(self.logs_dir / "log2.txt").touch()
|
|
(self.logs_dir / "subdir").mkdir()
|
|
(self.logs_dir / "subdir" / "log3.txt").touch()
|
|
|
|
files = list(tdm.iter_files(self.logs_dir))
|
|
self.assertEqual(len(files), 2)
|
|
self.assertEqual({p.name for p, s in files}, {"log1.txt", "log2.txt"})
|
|
|
|
def test_riter_files(self):
|
|
# Create some files and directories
|
|
(self.logs_dir / "log1.txt").touch()
|
|
(self.logs_dir / "subdir").mkdir()
|
|
(self.logs_dir / "subdir" / "log2.txt").touch()
|
|
|
|
files = list(tdm.riter_files(self.logs_dir))
|
|
self.assertEqual(len(files), 2)
|
|
self.assertEqual({p.name for p, s in files}, {"log1.txt", "log2.txt"})
|
|
|
|
def test_plan_actions_txt_retention(self):
|
|
# Create an old log file
|
|
old_log = self.logs_dir / "old.txt"
|
|
old_log.touch()
|
|
os.utime(old_log, (time.time() - 15 * 86400, time.time() - 15 * 86400))
|
|
|
|
# Create a new log file
|
|
(self.logs_dir / "new.txt").touch()
|
|
|
|
self.cfg["retention_days"]["txt_to_trash"] = 14
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
|
|
self.assertEqual(len(actions), 1)
|
|
self.assertEqual(actions[0].kind, "move")
|
|
self.assertEqual(actions[0].src, old_log)
|
|
|
|
def test_plan_purge_actions(self):
|
|
# Create an old file in the trash
|
|
old_file = self.trash_dir / "old_file.txt"
|
|
old_file.touch()
|
|
os.utime(old_file, (time.time() - 31 * 86400, time.time() - 31 * 86400))
|
|
|
|
# Create a new file in the trash
|
|
(self.trash_dir / "new_file.txt").touch()
|
|
|
|
self.cfg["retention_days"]["trash_purge"] = 30
|
|
actions = tdm.plan_purge_actions(self.root, self.cfg)
|
|
|
|
self.assertEqual(len(actions), 1)
|
|
self.assertEqual(actions[0].kind, "purge")
|
|
self.assertEqual(actions[0].src, old_file)
|
|
|
|
def test_csv_conversion_flag(self):
|
|
# Create a CSV file
|
|
csv_file = self.raw_csv_dir / "data.csv"
|
|
csv_file.touch()
|
|
|
|
# Test with conversion enabled (default)
|
|
self.cfg["conversion"]["csv_to_xlsx"] = True
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
# Should have 2 actions: convert and move to trash
|
|
self.assertEqual(len([a for a in actions if a.kind == "convert"]), 1)
|
|
self.assertEqual(len([a for a in actions if a.kind == "move" and a.src == csv_file]), 1)
|
|
|
|
# Test with conversion disabled
|
|
self.cfg["conversion"]["csv_to_xlsx"] = False
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
self.assertEqual(len(actions), 0)
|
|
|
|
def test_csv_conversion_skip_if_newer(self):
|
|
# Create a CSV file
|
|
csv_file = self.raw_csv_dir / "data.csv"
|
|
csv_file.touch()
|
|
|
|
# XLSX is at T=1000
|
|
xlsx_file = self.reports_dir / "data.xlsx"
|
|
xlsx_file.touch()
|
|
t_base = time.time()
|
|
os.utime(xlsx_file, (t_base, t_base))
|
|
|
|
# Case 1: CSV is older (T=500)
|
|
os.utime(csv_file, (t_base - 500, t_base - 500))
|
|
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
# Should NOT have any convert actions
|
|
self.assertEqual(len([a for a in actions if a.kind == "convert"]), 0)
|
|
|
|
# Case 2: CSV is newer (T=1500)
|
|
os.utime(csv_file, (t_base + 500, t_base + 500))
|
|
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
# Should have convert action
|
|
self.assertEqual(len([a for a in actions if a.kind == "convert"]), 1)
|
|
|
|
def test_keep_latest_per_day_flag(self):
|
|
# Create two reports for the same day
|
|
report1 = self.reports_dir / "report1.xlsx"
|
|
report1.touch()
|
|
# Ensure report1 is older
|
|
os.utime(report1, (time.time() - 100, time.time() - 100))
|
|
|
|
report2 = self.reports_dir / "report2.xlsx"
|
|
report2.touch()
|
|
|
|
# Test with keep_latest_per_day enabled (default)
|
|
self.cfg["reports"]["keep_latest_per_day"] = True
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
# Should move the older report to archive
|
|
self.assertEqual(len(actions), 1)
|
|
self.assertEqual(actions[0].kind, "move")
|
|
self.assertEqual(actions[0].src, report1)
|
|
|
|
# Test with keep_latest_per_day disabled
|
|
self.cfg["reports"]["keep_latest_per_day"] = False
|
|
actions = tdm.plan_actions(self.root, self.cfg)
|
|
self.assertEqual(len(actions), 0)
|
|
|
|
def test_csv_to_xlsx_sanitization(self):
|
|
# Create a CSV with malicious content
|
|
csv_file = self.raw_csv_dir / "malicious.csv"
|
|
xlsx_file = self.reports_dir / "malicious.xlsx"
|
|
|
|
with csv_file.open("w", newline="", encoding="utf-8") as f:
|
|
import csv
|
|
writer = csv.writer(f)
|
|
writer.writerow(["Header1", "Header2"])
|
|
writer.writerow(["=cmd|' /C calc'!A0", "123"])
|
|
writer.writerow(["+SUM(1,1)", "@SUM(1,1)"])
|
|
writer.writerow(["-SUM(1,1)", "Normal"])
|
|
|
|
# Mock openpyxl to check calls without needing the library installed or writing files
|
|
with patch.dict("sys.modules", {"openpyxl": unittest.mock.MagicMock()}):
|
|
import openpyxl
|
|
mock_wb = openpyxl.Workbook.return_value
|
|
mock_ws = mock_wb.create_sheet.return_value
|
|
|
|
# Side effect to create the file so replace() doesn't fail
|
|
def side_effect_save(path):
|
|
with open(path, "w") as f:
|
|
f.write("dummy")
|
|
mock_wb.save.side_effect = side_effect_save
|
|
|
|
# Since openpyxl is imported inside the function, we need to ensure
|
|
# it uses our mock. However, if openpyxl is already loaded in sys.modules,
|
|
# we need to be careful. The patch.dict above handles it.
|
|
|
|
tdm.csv_to_xlsx(
|
|
csv_file,
|
|
xlsx_file,
|
|
sheet_name="data",
|
|
delimiter=",",
|
|
encoding="utf-8"
|
|
)
|
|
|
|
# Verify calls to ws.append
|
|
# Expect 4 calls (header + 3 rows)
|
|
self.assertEqual(mock_ws.append.call_count, 4)
|
|
|
|
# Check header (not sanitized)
|
|
mock_ws.append.assert_any_call(["Header1", "Header2"])
|
|
|
|
# Check malicious rows (sanitized)
|
|
mock_ws.append.assert_any_call(["'=cmd|' /C calc'!A0", "123"])
|
|
mock_ws.append.assert_any_call(["'+SUM(1,1)", "'@SUM(1,1)"])
|
|
mock_ws.append.assert_any_call(["'-SUM(1,1)", "Normal"])
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this test module directly with `python <file>.py`.
    unittest.main()