# Source code for qufe.filehandler

import os
import re
from datetime import datetime
import pickle
from pathlib import Path
from typing import Iterable, Callable
import unicodedata

from . import base as qb
from . import texthandler as qth

# Module-level timestamp helper built from the local `base` module.
# FileHandler.get_latest_file calls its timestamp_to_datetime() and
# get_ts_formatted() methods.
qb_ts = qb.TS()


class FileHandler:
    """
    A comprehensive file handling utility class providing various file operations,
    directory management, and data persistence functionality.
    """

    def __init__(self):
        # Fallback directory used by make_path() when no usable path is supplied.
        self.path_temp_data = './temp/data'

    @staticmethod
    def get_file_name(folder_path):
        """
        Get all file names from a directory recursively.

        Args:
            folder_path (str): Path to the directory to search

        Returns:
            list: List of file names (without directory part) found under the directory
        """
        return [name for _, _, names in os.walk(folder_path) for name in names]

    @staticmethod
    def get_tree(folder_path: str, normalize: bool = False):
        """
        Get all file paths from a directory recursively.

        Backslashes in the resulting paths are replaced with forward slashes.

        Args:
            folder_path (str): Path to the directory to search
            normalize (bool): Whether to apply Unicode NFC normalization to each path

        Returns:
            list: List of full file paths found in the directory
        """
        paths = [
            os.path.join(root, name).replace('\\', '/')
            for root, _, names in os.walk(folder_path)
            for name in names
        ]
        if normalize:
            return [unicodedata.normalize('NFC', path) for path in paths]
        return paths

    @staticmethod
    def get_latest_by_pattern(directory, pattern):
        """Deprecated method. Use get_latest_file instead."""
        print('Method name changed to get_latest_file. (Parameter changed too.)')
        raise NotImplementedError

    @staticmethod
    def get_datetime_from_date_pattern(pattern: str, filename: str) -> datetime:
        """
        Extract datetime from filename using a regex pattern.

        The pattern is applied with re.match (anchored at the start of the
        filename) and every capture group is converted with int().

        Args:
            pattern (str): Regex pattern whose groups capture the datetime parts
                (3 groups: Y/M/D, 5 groups: Y/M/D/h/m, 6 groups: Y/M/D/h/m/s)
            filename (str): Filename to extract datetime from

        Returns:
            datetime: Extracted datetime object or None if no match

        Raises:
            ValueError: If the pattern captures a number of groups other than 3, 5 or 6
        """
        match = re.match(pattern, filename)
        if not match:
            return None

        parts = [int(group) for group in match.groups()]
        # Only these group counts map unambiguously onto datetime's positional arguments.
        if len(parts) not in (3, 5, 6):
            raise ValueError(f"Unsupported number of datetime parts: {len(parts)}")
        return datetime(*parts)

    @staticmethod
    def get_int_from_timestamp_pattern(pattern: str, filename: str) -> int:
        """
        Extract integer timestamp from filename using a regex pattern.

        Args:
            pattern (str): Regex pattern whose first group captures the timestamp
            filename (str): Filename to extract timestamp from

        Returns:
            int: Extracted timestamp or None if no match
        """
        match = re.match(pattern, filename)
        if not match:
            return None
        return int(match.group(1))

    @staticmethod
    def get_latest_file(directory, extract_fn, pattern, analysis: bool = False):
        r"""
        Find the latest file in a directory based on a datetime/timestamp pattern.

        Each file name in ``directory`` (non-recursive, sorted by name) is passed to
        ``extract_fn(pattern, filename)``. Non-None results are converted with the
        module-level ``qb_ts.timestamp_to_datetime`` helper and compared to find
        the maximum.

        Args:
            directory (str): Directory path to search
            extract_fn (Callable): Function to extract datetime/timestamp from filename
            pattern (str): Regex pattern for filename matching
            analysis (bool): Whether to print each matching file with the difference
                to the previous matching file

        Returns:
            tuple: (latest_file_path, timestamp_latest, files)
                files is the list of matching file names (names only, no directory).

        Raises:
            FileNotFoundError: If no file name in the directory matches the pattern

        Example 1.:
            from qufe import filehandler as qfh

            f_path = './temp/data/'
            pattern = r'page_data_(\d{10}).pickle'
            extract_fn = qfh.FileHandler.get_int_from_timestamp_pattern
            (latest_file, timestamp_latest, files) = qfh.FileHandler.get_latest_file(
                f_path, extract_fn, pattern)
            print(latest_file)

        Example 2.:
            if,
                pattern = r"Receipt_(\d{4})_(\d{2})_(\d{2})\.pickle"
            then,
                Receipt_2024_10_15.pickle
                Receipt_2025_01_20.pickle
                Receipt_2025_03_25.pickle
            2025_03_25 is the latest.
        """
        latest_file = None
        timestamp_latest = None
        prev_ts = None
        ts_diff = ''
        files = list()

        for filename in sorted(os.listdir(directory)):
            timestamp = extract_fn(pattern, filename)
            if timestamp is None:
                # File name does not match the pattern.
                continue

            timestamp = qb_ts.timestamp_to_datetime(timestamp)

            if analysis:
                # The first matching file has no predecessor, so ts_diff stays ''.
                if prev_ts is not None:
                    ts_diff = timestamp - prev_ts
                prev_ts = timestamp
                print(f'{filename} - {qb_ts.get_ts_formatted(timestamp)} (Diff.: {ts_diff})')

            files.append(filename)

            if (timestamp_latest is None) or (timestamp > timestamp_latest):
                timestamp_latest = timestamp
                latest_file = filename

        if not latest_file:
            raise FileNotFoundError('No matching files found.')

        latest_file_path = os.path.join(directory, latest_file)
        print(f'Latest File Name: {latest_file}')
        return (latest_file_path, timestamp_latest, files)

    @staticmethod
    def load_pickle(pkl, rb: bool = True):
        """
        Load data from a pickle file.

        NOTE: pickle can execute arbitrary code while loading; only use this on
        files from a trusted source.

        Args:
            pkl (str): Path to pickle file
            rb (bool): Whether to open in binary mode. pickle.load needs a binary
                file, so rb=False is kept only for interface compatibility and is
                expected to fail inside pickle.load.

        Returns:
            object: Loaded data from pickle file
        """
        mode = 'rb' if rb else 'r'
        with open(pkl, mode) as f_:
            return pickle.load(f_)

    @staticmethod
    def pickle_to_txt(input_pickle_name: str, output_txt_name: str):
        """Deprecated method. Use iterable_to_txt_file instead."""
        print('Method name changed to "iterable_to_txt_file()"')
        raise NotImplementedError

    def extract_iterable(self, itrb: Iterable, depth=0) -> list:
        """
        Flatten nested dictionaries or iterables with proper indentation.

        Dict keys are emitted at the current depth and their values one level
        deeper; elements of lists/tuples/sets are emitted one level deeper.
        Anything else (including strings) is treated as a scalar.

        Args:
            itrb (Iterable): The iterable to flatten
            depth (int): Current indentation depth

        Returns:
            list: Flattened representation with indentation (one string per entry)
        """
        extracted = list()
        indent = " " * depth

        if isinstance(itrb, dict):
            for (k, v) in itrb.items():
                extracted.append(f'{indent}{k}')
                extracted.extend(self.extract_iterable(v, depth + 1))
        elif isinstance(itrb, (list, tuple, set)):
            for v in itrb:
                extracted.extend(self.extract_iterable(v, depth + 1))
                # Separate the entries of a top-level sequence with a newline marker.
                if depth < 1:
                    extracted.append('\n')
        else:
            extracted.append(f'{indent}{itrb}')

        return extracted

    @staticmethod
    def list_to_txt_file(lines: list, file_name: str) -> None:
        """Deprecated method. Use iterable_to_txt_file instead."""
        print('Method name changed to "iterable_to_txt_file()"')
        raise NotImplementedError

    def make_path(self, path: str) -> str:
        """
        Create directory if it doesn't exist.

        Args:
            path (str): Path to create. An empty value or any non-str value
                (including pathlib.Path) falls back to self.path_temp_data.

        Returns:
            str: Created path
        """
        if (not path) or (not isinstance(path, str)):
            path = self.path_temp_data
        os.makedirs(path, exist_ok=True)
        return path

    def make_file_path(self, path: str, file_name: str) -> str:
        """
        Create full file path by joining directory and filename.

        The directory is created first via make_path().

        Args:
            path (str): Directory path
            file_name (str): File name

        Returns:
            str: Full file path
        """
        return os.path.join(self.make_path(path), file_name)

    def _save_file(self, path: str, file_name: str, save_func: Callable[[str], None]) -> None:
        """
        Helper function to save files with error handling.

        Saving is best-effort: any exception raised while building the path or
        running save_func is reported with print() and not re-raised.

        Args:
            path (str): Directory path
            file_name (str): File name
            save_func (Callable): Function to perform the actual save operation;
                it receives the full file path
        """
        try:
            file_path = self.make_file_path(path, file_name)
            save_func(file_path)
            print('Save to: ', file_path)
        except Exception as e:
            print(f'Error occurred while creating file: {e}')

    def iterable_to_txt_file(self, itrb: Iterable, file_name: str, path: str = '') -> None:
        """
        Save iterable data to a text file, one item per line (UTF-8).

        Args:
            itrb (Iterable): Data to save
            file_name (str): Output file name
            path (str): Output directory path
        """
        def save_func(file_path: str) -> None:
            with open(file_path, 'w', encoding='utf-8') as f_:
                f_.writelines(f'{itr}\n' for itr in itrb)

        self._save_file(path, file_name, save_func)

    def pickle_temp_data(self, data, file_name: str, path: str = '') -> None:
        """
        Save data to a pickle file.

        Args:
            data: Data to save
            file_name (str): Output file name
            path (str): Output directory path
        """
        def save_func(file_path: str) -> None:
            with open(file_path, 'wb') as f_:
                pickle.dump(data, f_)

        self._save_file(path, file_name, save_func)

    def build_tree(self, path):
        """
        Build a nested representation of directory structure.

        Files are represented by their name (str); sub-directories by a
        single-key dict ``{name: <list for that directory>}``.

        Args:
            path (str): Directory path to build tree from

        Returns:
            list: Nested structure representation, sorted case-insensitively by name
        """
        def sort_key(item):
            if isinstance(item, str):
                return item.lower()
            if isinstance(item, dict):
                # Directory entries are single-key dicts; sort by that key.
                return next(iter(item)).lower()
            return ''

        items = []
        for name in os.listdir(path):
            full_path = os.path.join(path, name)
            if os.path.isdir(full_path):
                items.append({name: self.build_tree(full_path)})
            else:
                items.append(name)

        return sorted(items, key=sort_key)

    def tree_to_dict(self, start_path):
        """
        Convert directory tree to dictionary format.

        Args:
            start_path (str): Starting directory path

        Returns:
            dict: Single-key dict mapping the directory's base name to build_tree() output
        """
        root_name = os.path.basename(os.path.normpath(start_path))
        return {root_name: self.build_tree(start_path)}

    def get_contents(self, base_path: str, print_tree: bool = False) -> dict:
        """
        Extract text file contents from directory structure.

        Args:
            base_path (str): Base directory path
            print_tree (bool): Whether to print the directory tree

        Returns:
            dict: Nested dictionary mirroring the directory tree, with each
                '.txt' file name mapped to its list of lines
        """
        ttd = self.tree_to_dict(base_path)
        if print_tree:
            qth.print_dict(ttd)

        # _get_contents re-appends the root folder name, so hand it the parent
        # directory. normpath drops a trailing separator; for a single-component
        # relative path the parent is '' which os.path.join treats as "current
        # directory" rather than the filesystem root.
        parent_path = os.path.dirname(os.path.normpath(base_path))
        return self._get_contents(ttd, parent_path)

    def _get_contents(self, d_: dict, path_: str) -> dict:
        """
        Recursively extract text file contents from dictionary structure.

        Args:
            d_ (dict): Directory structure dictionary (as produced by tree_to_dict)
            path_ (str): Path of the directory that contains the keys of d_

        Returns:
            dict: Dictionary containing file contents

        Raises:
            NotImplementedError: If d_ is not a dict, or a value in it is
                neither a list nor a dict
        """
        if not isinstance(d_, dict):
            raise NotImplementedError("Input must be a dictionary")

        txt_container = dict()
        for (k0, v0) in d_.items():
            current_dir = os.path.join(path_, k0)
            entry = txt_container.setdefault(k0, dict())

            if isinstance(v0, list):
                for v1 in v0:
                    if isinstance(v1, str):
                        # Only '.txt' files are read; other files are skipped.
                        if v1.endswith('.txt'):
                            with open(os.path.join(current_dir, v1), 'r') as f:
                                entry[v1] = [line.rstrip().replace('\t', ' ') for line in f]
                    elif isinstance(v1, dict):
                        entry.update(self._get_contents(v1, current_dir))
            elif isinstance(v0, dict):
                entry.update(self._get_contents(v0, current_dir))
            else:
                raise NotImplementedError("Unsupported file type")

        return txt_container

    @staticmethod
    def sanitize_filename(name: str, replacement: str = "_") -> str:
        """
        Sanitize filename by replacing invalid characters.

        Args:
            name (str): Original filename
            replacement (str): Character to replace invalid characters with

        Returns:
            str: Sanitized filename, or "untitled" if nothing is left after stripping
        """
        # Characters not supported by the Windows file system.
        invalid_chars = r'[\\/*?:"<>|]'
        sanitized = re.sub(invalid_chars, replacement, name).strip()
        return sanitized if sanitized else "untitled"

    @staticmethod
    def get_unique_filename(base_dir: Path, base_name: str, extension: str = ".csv") -> Path:
        """
        Generate unique filename in given directory to avoid conflicts.

        If ``base_name + extension`` already exists, ``_1``, ``_2``, ... is
        appended to the base name until an unused path is found.

        Args:
            base_dir (Path): Base directory path
            base_name (str): Base filename without extension
            extension (str): File extension

        Returns:
            Path: Unique file path

        Example:
            output_dir = Path("output")
            output_dir.mkdir(exist_ok=True)

            for (key, df) in container.items():
                base_name = FileHandler.sanitize_filename(key)
                file_path = FileHandler.get_unique_filename(output_dir, base_name)
                df.to_csv(file_path, index=False, encoding='utf-8-sig')
        """
        counter = 0
        candidate = base_dir / f"{base_name}{extension}"
        while candidate.exists():
            counter += 1
            candidate = base_dir / f"{base_name}_{counter}{extension}"
        return candidate

    @staticmethod
    def copy_files_by_extension(
            source_dir: str,
            dest_dir: str,
            extension: str,
            flatten: bool = True,
            preserve_structure: bool = False,
            verbose: bool = True) -> tuple:
        """
        Copy all files with specific extension from source directory to destination.

        Args:
            source_dir (str): Source directory path to search files
            dest_dir (str): Destination directory path to copy files
            extension (str): File extension to search (e.g., '.db', 'db', '*.db')
            flatten (bool): If True, copy all files to dest_dir root without subdirectories.
                Name clashes are resolved by appending _1, _2, ...
            preserve_structure (bool): If True, preserve source directory structure in
                destination. Only takes effect when flatten is False; flatten wins
                when both are True.
            verbose (bool): If True, print copy progress

        Returns:
            tuple: (copied_count, failed_files, copied_files)
                - copied_count (int): Number of successfully copied files
                - failed_files (list): List of tuples (source_path, error_message) for failed copies
                - copied_files (list): List of tuples (source_path, dest_path) for successful copies

        Raises:
            FileNotFoundError: If source_dir does not exist

        Example:
            from qufe import filehandler as qfh

            fh = qfh.FileHandler()

            # Copy all files with the specified extension from the source folder
            # to the destination folder
            source = '/source_folder'
            dest = '/dest_folder/data'

            copied, failed, files = fh.copy_files_by_extension(
                source_dir=source,
                dest_dir=dest,
                extension='.db',
                flatten=True
            )

            print(f"Successfully copied: {copied} files")
            if failed:
                print(f"Failed to copy: {len(failed)} files")
        """
        import shutil

        # Normalize extension to the '.ext' form. The glob prefix must be
        # stripped first, otherwise '*.db' would be turned into '.*.db'.
        if extension.startswith('*.'):
            extension = extension[1:]
        if not extension.startswith('.'):
            extension = f'.{extension}'

        source_path = Path(source_dir)
        dest_path = Path(dest_dir)

        if not source_path.exists():
            raise FileNotFoundError(f"Source directory does not exist: {source_dir}")

        dest_path.mkdir(parents=True, exist_ok=True)

        copied_count = 0
        failed_files = []
        copied_files = []

        pattern = f'*{extension}'

        # Snapshot the matches before copying so files written into a destination
        # that lives inside the source tree are not picked up again, and skip
        # directories whose name happens to match the pattern.
        source_files = [p for p in source_path.rglob(pattern) if p.is_file()]

        for source_file in source_files:
            try:
                if preserve_structure and not flatten:
                    # Recreate the file's relative directory under the destination.
                    relative_path = source_file.relative_to(source_path)
                    dest_file_path = dest_path / relative_path
                    dest_file_path.parent.mkdir(parents=True, exist_ok=True)
                else:
                    # Flatten (also the default when neither flag is set):
                    # copy into the destination root under a unique name.
                    dest_file_path = FileHandler._get_unique_path(dest_path / source_file.name)

                shutil.copy2(source_file, dest_file_path)
                copied_count += 1
                copied_files.append((str(source_file), str(dest_file_path)))

                if verbose:
                    print(f"복사 완료: {source_file.name} -> {dest_file_path}")

            except Exception as e:
                # Best-effort: record the failure and continue with the next file.
                failed_files.append((str(source_file), str(e)))
                if verbose:
                    print(f"복사 실패: {source_file.name} - 오류: {e}")

        if verbose:
            print(f"\n{copied_count}개의 {extension} 파일이 복사되었습니다.")
            if failed_files:
                print(f"{len(failed_files)}개의 파일 복사에 실패했습니다.")

        return copied_count, failed_files, copied_files

    @staticmethod
    def _get_unique_path(file_path: Path) -> Path:
        """
        Generate unique file path by adding suffix if file already exists.

        Args:
            file_path (Path): Original file path

        Returns:
            Path: file_path itself if it does not exist, otherwise the first
                non-existing ``<stem>_<n><suffix>`` sibling (n = 1, 2, ...)
        """
        if not file_path.exists():
            return file_path

        stem = file_path.stem
        suffix = file_path.suffix
        parent = file_path.parent

        counter = 1
        while True:
            new_path = parent / f"{stem}_{counter}{suffix}"
            if not new_path.exists():
                return new_path
            counter += 1

    @staticmethod
    def batch_copy_files(copy_tasks: list, verbose: bool = True) -> dict:
        """
        Execute multiple file copy tasks with different extensions or directories.

        Args:
            copy_tasks (list): List of dictionaries with copy task parameters
                Each dict should have: source_dir, dest_dir, extension,
                and optional flatten/preserve_structure
            verbose (bool): If True, print progress for each task

        Returns:
            dict: Results keyed by task index; each value holds 'task',
                'copied_count', 'failed_files', 'copied_files' and
                'success_rate' (percentage, 0 when nothing was attempted)

        Raises:
            KeyError: If a task is missing source_dir, dest_dir or extension

        Example:
            tasks = [
                {
                    'source_dir': '/source_folder_a',
                    'dest_dir': '/dest_folder/data_a',
                    'extension': '.db',
                    'flatten': True
                },
                {
                    'source_dir': '/source_folder_b',
                    'dest_dir': '/dest_folder/data_b',
                    'extension': '.csv',
                    'flatten': True
                }
            ]

            results = FileHandler.batch_copy_files(tasks)

            for i, task in enumerate(tasks):
                print(f"Task {i+1}: Copied {results[i]['copied_count']} {task['extension']} files")
        """
        results = {}

        for i, task in enumerate(copy_tasks):
            if verbose:
                print(f"\n작업 {i+1}/{len(copy_tasks)} 시작:")
                print(f" 소스: {task['source_dir']}")
                print(f" 대상: {task['dest_dir']}")
                print(f" 확장자: {task['extension']}")
                print("-" * 50)

            copied, failed, files = FileHandler.copy_files_by_extension(
                source_dir=task['source_dir'],
                dest_dir=task['dest_dir'],
                extension=task['extension'],
                flatten=task.get('flatten', True),
                preserve_structure=task.get('preserve_structure', False),
                verbose=verbose
            )

            attempted = copied + len(failed)
            results[i] = {
                'task': task,
                'copied_count': copied,
                'failed_files': failed,
                'copied_files': files,
                'success_rate': copied / attempted * 100 if attempted > 0 else 0
            }

        if verbose:
            print("\n" + "=" * 50)
            print("전체 작업 요약:")
            total_copied = sum(r['copied_count'] for r in results.values())
            total_failed = sum(len(r['failed_files']) for r in results.values())
            print(f" 총 복사된 파일: {total_copied}개")
            print(f" 총 실패한 파일: {total_failed}개")
            if (total_copied + total_failed) > 0:
                print(f" 전체 성공률: {total_copied / (total_copied + total_failed) * 100:.1f}%")

        return results
class PathFinder:
    """
    Interactive directory exploration utility for step-by-step folder traversal.

    Useful when you don't know the folder structure and want to explore
    gradually without overwhelming output from os.walk.
    """

    def __init__(self, start_path='.'):
        # Always stored as an absolute path.
        self.current_path = os.path.abspath(start_path)

    def go_up_n_level(self, n_level: int = 1, set_current: bool = True):
        """
        Navigate up directory levels.

        Args:
            n_level (int): Number of levels to go up
            set_current (bool): Whether to update current_path or just return new path

        Returns:
            str: New path if set_current is False; None when current_path was updated
        """
        new_path = self.current_path
        for _ in range(n_level):
            new_path = os.path.abspath(os.path.join(new_path, os.pardir))

        if set_current:
            self.current_path = new_path
            return None
        return new_path

    def get_one_depth(self, input_path: str = '') -> tuple:
        """
        Get directories and files at one depth level using os.scandir.

        Args:
            input_path (str): Path to scan (uses current_path if empty)

        Returns:
            tuple: (path, directories, files); (None, [], []) if the path does not exist
        """
        if not input_path:
            input_path = self.current_path

        dirs = list()
        files = list()
        try:
            with os.scandir(input_path) as entries:
                for entry in entries:
                    if entry.is_dir():
                        dirs.append(entry.name)
                    elif entry.is_file():
                        files.append(entry.name)
        except FileNotFoundError:
            return None, [], []
        return input_path, dirs, files

    @staticmethod
    def print_each(label: str, items: list) -> None:
        """
        Print list items with numbering and formatting.

        A list is printed sorted, one numbered line per item; any other
        non-empty value is printed on a single line. Empty values and None
        print nothing.

        Args:
            label (str): Label for the items
            items (list): Items to print
        """
        # Truthiness instead of len(): get_one_depth() yields None as the root
        # for a missing path, and len(None) would raise TypeError.
        if not items:
            return

        if isinstance(items, list):
            lgh = len(items)
            width = len(str(lgh))
            for k, v in enumerate(sorted(items)):
                print(f'{label} ({k + 1:0{width}}/{lgh}): {v}')
        else:
            print(f'{label}: {items}')
        print('')

    def print_result(self, result: tuple) -> None:
        """
        Print formatted result from get_one_depth.

        Args:
            result (tuple): Result tuple (root, dirs, files) from get_one_depth
        """
        (root, dirs, files) = result
        self.print_each("Root:", root)
        self.print_each("Sub directories:", dirs)
        self.print_each("Files:", files)