"""
Database handlers for PostgreSQL and SQLite with unified interface.
This module provides handlers for both PostgreSQL and SQLite databases
with a common base class for consistent usage patterns.
Required dependencies:
- For PostgreSQL: pip install qufe[database]
- For SQLite: No additional dependencies (uses standard library)
"""
import os
import sqlite3
from abc import ABC, abstractmethod
from pathlib import Path
from urllib.parse import quote_plus
from typing import List, Dict, Optional, Any, Union
from contextlib import contextmanager
import time
[docs]
def help():
"""
Display help information for database handler functionality.
Shows usage examples and available handlers.
"""
print("qufe.dbhandler - Database Handlers")
print("=" * 40)
print()
print("AVAILABLE HANDLERS:")
print(" • PostgreSQLHandler - For PostgreSQL databases")
print(" • SQLiteHandler - For SQLite3 databases")
print()
# Check PostgreSQL dependencies
try:
_import_sqlalchemy()
print("PostgreSQL: ✓ Dependencies installed")
except ImportError:
print("PostgreSQL: ✗ Missing dependencies")
print(" Install with: pip install qufe[database]")
print("SQLite3: ✓ Ready (standard library)")
print()
print("QUICK EXAMPLES:")
print()
print("# SQLite - Quick data peek")
print("from qufe.dbhandler import SQLiteHandler")
print("SQLiteHandler.quick_peek('data.db')")
print("df = SQLiteHandler.to_dataframe('data.db', 'table_name')")
print()
print("# SQLite - Scan multiple databases")
print("results = SQLiteHandler.quick_scan('/data/folder')")
print("SQLiteHandler.analyze_scan_results(results)")
print()
print("# PostgreSQL - Environment-based connection")
print("from qufe.dbhandler import PostgreSQLHandler")
print("db = PostgreSQLHandler() # Uses .env or environment vars")
print("results = db.execute_query('SELECT * FROM users LIMIT 5')")
# =====================================================================
# Lazy imports for optional dependencies
# =====================================================================
def _import_sqlalchemy():
"""Lazy import SQLAlchemy with helpful error message."""
try:
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
return create_engine, text, Engine
except ImportError as e:
raise ImportError(
"PostgreSQL functionality requires SQLAlchemy. "
"Install with: pip install qufe[database]"
) from e
def _import_dotenv():
"""Lazy import python-dotenv with graceful fallback."""
try:
from dotenv import load_dotenv
return load_dotenv
except ImportError:
return None
def _import_pandas():
"""Lazy import pandas with graceful fallback."""
try:
import pandas as pd
return pd
except ImportError:
return None
# =====================================================================
# Base Handler Class
# =====================================================================
[docs]
class BaseDBHandler(ABC):
"""
Abstract base class for database handlers.
Provides common interface and shared functionality for different
database implementations.
"""
[docs]
def __init__(self):
"""Initialize base handler."""
self.connection = None
[docs]
@abstractmethod
def connect(self) -> None:
"""Establish database connection."""
pass
[docs]
@abstractmethod
def disconnect(self) -> None:
"""Close database connection."""
pass
[docs]
@abstractmethod
def execute_query(self, query: str, params: Optional[tuple] = None) -> List:
"""
Execute a query and return results.
Args:
query: SQL query string
params: Query parameters for safe execution
Returns:
List of query results
"""
pass
[docs]
def execute_many(self, query: str, params_list: List[tuple]) -> None:
"""
Execute same query multiple times with different parameters.
Args:
query: SQL query string with placeholders
params_list: List of parameter tuples
"""
# Default implementation - subclasses can override for optimization
for params in params_list:
self.execute_query(query, params)
[docs]
def table_exists(self, table_name: str) -> bool:
"""
Check if a table exists in the database.
Args:
table_name: Name of the table to check
Returns:
True if table exists, False otherwise
"""
# Default implementation using ANSI SQL
query = """
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_name = ? \
"""
result = self.execute_query(query, (table_name,))
return bool(result and result[0][0] > 0)
[docs]
@contextmanager
def transaction(self):
"""
Context manager for database transactions.
Automatically commits on success, rolls back on error.
"""
try:
yield self
if hasattr(self.connection, 'commit'):
self.connection.commit()
except Exception as e:
if hasattr(self.connection, 'rollback'):
self.connection.rollback()
raise e
[docs]
def __enter__(self):
"""Context manager entry."""
self.connect()
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.disconnect()
# =====================================================================
# PostgreSQL Handler
# =====================================================================
[docs]
class PostgreSQLHandler(BaseDBHandler):
"""
PostgreSQL connection handler with automatic environment variable support.
Extends BaseDBHandler for PostgreSQL-specific functionality.
"""
[docs]
def __init__(
self,
db_name: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
port: Optional[int] = None):
"""
Initialize PostgreSQL connection handler.
Args:
db_name: Database name (defaults to POSTGRES_DB env var)
user: Username (defaults to POSTGRES_USER env var)
password: Password (defaults to POSTGRES_PASSWORD env var)
host: Host address (defaults to POSTGRES_HOST or 'localhost')
port: Port number (defaults to POSTGRES_PORT or 5432)
"""
super().__init__()
# Import required dependencies
self._create_engine, self._text, self._Engine = _import_sqlalchemy()
# Try to load .env file if available
load_dotenv = _import_dotenv()
if load_dotenv:
load_dotenv()
# Set connection parameters
self.user = user or os.getenv('POSTGRES_USER')
self.password = password or os.getenv('POSTGRES_PASSWORD')
self.host = host or os.getenv('POSTGRES_HOST', 'localhost')
self.port = port or int(os.getenv('POSTGRES_PORT', '5432'))
self.database = db_name or os.getenv('POSTGRES_DB', 'postgres')
if not self.user or not self.password:
raise ValueError(
"Database credentials required. Set POSTGRES_USER and "
"POSTGRES_PASSWORD in .env file or as parameters."
)
self.engine = None
self.connect()
[docs]
def connect(self) -> None:
"""Establish PostgreSQL connection."""
password_encoded = quote_plus(self.password)
url = f"postgresql+psycopg2://{self.user}:{password_encoded}@{self.host}:{self.port}/{self.database}"
self.engine = self._create_engine(url, echo=False, future=True)
self.connection = self.engine
[docs]
def disconnect(self) -> None:
"""Close PostgreSQL connection."""
if self.engine:
self.engine.dispose()
self.engine = None
self.connection = None
[docs]
def execute_query(self, query: str, params: Optional[tuple] = None) -> List:
"""
Execute a SQL query and return results.
Args:
query: SQL query string
params: Query parameters (not used in current implementation)
Returns:
List of query results for SELECT queries, empty list for DDL/DML
"""
with self.engine.connect() as conn:
result = conn.execute(self._text(query))
# Check if the query returns rows (SELECT queries)
if result.returns_rows:
return result.fetchall()
else:
# For DDL (CREATE, ALTER, DROP) and DML (INSERT, UPDATE, DELETE)
# Commit the transaction and return empty list
conn.commit()
return []
[docs]
def get_database_list(self, print_result: bool = False) -> List[str]:
"""
Get list of all databases in the PostgreSQL server.
Args:
print_result: Whether to print the database list
Returns:
List of database names
"""
query = """
SELECT datname
FROM pg_database
WHERE datistemplate = false; \
"""
result = self.execute_query(query)
database_names = [row.datname for row in result]
if print_result:
print("Available databases:")
for db_name in database_names:
print(f" - {db_name}")
return database_names
[docs]
def get_table_list(self, print_result: bool = True) -> List[Dict[str, str]]:
"""
Get list of all tables and views in the current database.
Args:
print_result: Whether to print the table list
Returns:
List of dictionaries containing table information
"""
query = """
SELECT table_catalog, table_schema, table_name, table_type
FROM information_schema.tables
ORDER BY table_schema, table_name; \
"""
result = self.execute_query(query)
tables = [
{
'catalog': row.table_catalog,
'schema': row.table_schema,
'name': row.table_name,
'type': row.table_type
} for row in result
]
if print_result:
public_tables = [
t['name'] for t in tables
if t.get('schema') == 'public'
]
if public_tables:
print(f"\nDatabase: {self.database}")
print(f"Public tables: {public_tables}")
return tables
# =====================================================================
# SQLite Handler
# =====================================================================
[docs]
class SQLiteHandler(BaseDBHandler):
"""
SQLite database handler for local file-based databases.
Provides convenient methods for common SQLite operations in
Jupyter notebook environments.
"""
[docs]
def __init__(self, db_path: Union[str, Path], create_dir: bool = False):
"""
Initialize SQLite handler.
Args:
db_path: Path to SQLite database file
create_dir: Whether to create parent directory if missing
"""
super().__init__()
self.db_path = Path(db_path)
if create_dir and not self.db_path.parent.exists():
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.connect()
[docs]
def connect(self) -> None:
"""Establish SQLite connection."""
self.connection = sqlite3.connect(str(self.db_path))
self.connection.row_factory = sqlite3.Row # Enable column access by name
[docs]
def disconnect(self) -> None:
"""Close SQLite connection."""
if self.connection:
self.connection.close()
self.connection = None
[docs]
def execute_query(self, query: str, params: Optional[tuple] = None) -> List:
"""
Execute a SQL query and return results.
Args:
query: SQL query string
params: Query parameters for safe execution
Returns:
List of query results for SELECT queries, empty list for DDL/DML
"""
cursor = self.connection.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
# Query type detection
query_upper = query.strip().upper()
# Check if query returns data (SELECT, WITH, PRAGMA table_info, etc.)
if (query_upper.startswith(('SELECT', 'WITH', 'PRAGMA')) and
'PRAGMA' in query_upper and 'table_info' in query.lower()):
# PRAGMA table_info returns results
return cursor.fetchall()
elif query_upper.startswith(('SELECT', 'WITH', 'VALUES', 'RETURNING')):
# Standard SELECT queries and CTEs
return cursor.fetchall()
else:
# DDL (CREATE, ALTER, DROP) and DML (INSERT, UPDATE, DELETE)
self.connection.commit()
# Return rowcount for DML operations, empty list for DDL
if query_upper.startswith(('INSERT', 'UPDATE', 'DELETE')):
return [] # Could return [cursor.rowcount] if row count is needed
return []
[docs]
def get_tables(self) -> List[str]:
"""
Get list of all tables in the database.
Returns:
List of table names
"""
query = "SELECT name FROM sqlite_master WHERE type='table'"
results = self.execute_query(query)
return [row['name'] for row in results]
[docs]
def get_table_info(self, table_name: str) -> List[Dict[str, Any]]:
"""
Get column information for a table with multiple fallback methods.
Tries multiple approaches in order:
1. PRAGMA table_info (standard SQLite metadata)
2. Parse CREATE TABLE statement from sqlite_master
3. Infer from sample data using cursor.description
Args:
table_name: Name of the table
Returns:
List of column information dictionaries with keys:
- cid: Column ID (position)
- name: Column name
- type: Data type (may be 'UNKNOWN' if inferred)
- notnull: Whether NOT NULL constraint exists (0 or 1)
- dflt_value: Default value
- pk: Whether primary key (0 or 1)
"""
# Method 1: Try PRAGMA table_info (standard approach)
try:
query = f"PRAGMA table_info({table_name})"
results = self.execute_query(query)
if results:
return [dict(row) for row in results]
except Exception:
# PRAGMA might not be available in some environments
pass
# Method 2: Parse CREATE TABLE statement
try:
schema_info = self._get_schema_from_master(table_name)
if schema_info:
return schema_info
except Exception:
pass
# Method 3: Infer from sample data
try:
sample_info = self._infer_from_sample(table_name)
if sample_info:
return sample_info
except Exception:
pass
# If all methods fail, return minimal info if table exists
if self.table_exists(table_name):
return [{
'cid': 0,
'name': 'unknown',
'type': 'UNKNOWN',
'notnull': 0,
'dflt_value': None,
'pk': 0
}]
return []
def _get_schema_from_master(self, table_name: str) -> List[Dict[str, Any]]:
"""
Extract column information from sqlite_master CREATE TABLE statement.
Args:
table_name: Name of the table
Returns:
List of column information or empty list
"""
query = """
SELECT sql \
FROM sqlite_master
WHERE type ='table' AND name =? \
"""
results = self.execute_query(query, (table_name,))
if not results or not results[0]['sql']:
return []
create_sql = results[0]['sql']
# Basic parsing of CREATE TABLE statement
# This is a simplified parser - production code might need more robust parsing
columns = []
# Extract content between parentheses
import re
match = re.search(r'\((.*)\)', create_sql, re.DOTALL)
if not match:
return []
column_defs = match.group(1)
# Split by comma but not within parentheses (for complex constraints)
parts = re.split(r',(?![^()]*\))', column_defs)
for idx, part in enumerate(parts):
part = part.strip()
if not part:
continue
# Skip constraint definitions (PRIMARY KEY, FOREIGN KEY, etc.)
if part.upper().startswith(('PRIMARY KEY', 'FOREIGN KEY', 'UNIQUE', 'CHECK', 'CONSTRAINT')):
continue
# Extract column name and type
tokens = part.split()
if len(tokens) >= 2:
col_name = tokens[0].strip('`"[]')
col_type = tokens[1].upper() if len(tokens) > 1 else 'TEXT'
# Check for constraints
part_upper = part.upper()
is_pk = 'PRIMARY KEY' in part_upper
is_notnull = 'NOT NULL' in part_upper
# Extract default value if present
default_match = re.search(r'DEFAULT\s+([^\s,]+)', part, re.IGNORECASE)
default_val = default_match.group(1) if default_match else None
columns.append({
'cid': idx,
'name': col_name,
'type': col_type,
'notnull': 1 if is_notnull else 0,
'dflt_value': default_val,
'pk': 1 if is_pk else 0
})
return columns
def _infer_from_sample(self, table_name: str) -> List[Dict[str, Any]]:
"""
Infer column information from sample data.
Uses cursor.description after executing a SELECT query.
Note: Data types and constraints cannot be accurately determined this way.
Args:
table_name: Name of the table
Returns:
List of column information with limited accuracy
"""
# Use LIMIT 0 to get column info without fetching data
query = f"SELECT * FROM {table_name} LIMIT 0"
cursor = self.connection.cursor()
cursor.execute(query)
if not cursor.description:
return []
columns = []
for idx, col_desc in enumerate(cursor.description):
# cursor.description is a tuple: (name, type_code, display_size, internal_size, precision, scale, null_ok)
# For SQLite, most of these are None except name
columns.append({
'cid': idx,
'name': col_desc[0],
'type': 'UNKNOWN', # Cannot reliably determine from cursor
'notnull': 0, # Cannot determine from cursor
'dflt_value': None, # Cannot determine from cursor
'pk': 0 # Cannot determine from cursor
})
# Try to enhance with actual data type inference if table has data
try:
sample_query = f"SELECT * FROM {table_name} LIMIT 1"
sample_results = self.execute_query(sample_query)
if sample_results and len(sample_results) > 0:
sample_row = dict(sample_results[0])
for col in columns:
col_name = col['name']
if col_name in sample_row:
value = sample_row[col_name]
# Basic type inference from sample value
if value is None:
pass # Keep UNKNOWN
elif isinstance(value, int):
col['type'] = 'INTEGER'
elif isinstance(value, float):
col['type'] = 'REAL'
elif isinstance(value, bytes):
col['type'] = 'BLOB'
elif isinstance(value, str):
col['type'] = 'TEXT'
except Exception:
# If sampling fails, return basic column info
pass
return columns
[docs]
def get_table_schema(self, table_name: str, verbose: bool = False) -> Dict[str, Any]:
"""
Get comprehensive table schema information.
Provides detailed information about table structure including
the method used to obtain the information.
Args:
table_name: Name of the table
verbose: Whether to include additional diagnostic info
Returns:
Dictionary containing:
- columns: List of column information
- method: Method used to obtain info ('pragma', 'schema', 'inferred')
- accuracy: Confidence level ('high', 'medium', 'low')
- warnings: List of any warnings
"""
result = {
'columns': [],
'method': None,
'accuracy': None,
'warnings': []
}
# Try PRAGMA first (most accurate)
try:
query = f"PRAGMA table_info({table_name})"
pragma_results = self.execute_query(query)
if pragma_results:
result['columns'] = [dict(row) for row in pragma_results]
result['method'] = 'pragma'
result['accuracy'] = 'high'
if verbose:
print(f"✓ Retrieved schema via PRAGMA for table '{table_name}'")
return result
except Exception as e:
if verbose:
print(f"✗ PRAGMA failed: {e}")
result['warnings'].append("PRAGMA table_info not available")
# Try parsing CREATE TABLE
try:
schema_info = self._get_schema_from_master(table_name)
if schema_info:
result['columns'] = schema_info
result['method'] = 'schema'
result['accuracy'] = 'medium'
result['warnings'].append("Schema parsed from CREATE TABLE statement")
if verbose:
print(f"✓ Retrieved schema via sqlite_master for table '{table_name}'")
return result
except Exception as e:
if verbose:
print(f"✗ Schema parsing failed: {e}")
# Fall back to inference
try:
sample_info = self._infer_from_sample(table_name)
if sample_info:
result['columns'] = sample_info
result['method'] = 'inferred'
result['accuracy'] = 'low'
result['warnings'].append("Column types inferred from data - constraints unknown")
if verbose:
print(f"⚠ Schema inferred from sample data for table '{table_name}'")
return result
except Exception as e:
if verbose:
print(f"✗ Sample inference failed: {e}")
# Table might not exist or is inaccessible
if not self.table_exists(table_name):
result['warnings'].append(f"Table '{table_name}' does not exist")
else:
result['warnings'].append("Unable to retrieve schema information")
return result
[docs]
def table_exists(self, table_name: str) -> bool:
"""
Check if a table exists.
Args:
table_name: Name of the table
Returns:
True if table exists
"""
query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
result = self.execute_query(query, (table_name,))
return len(result) > 0
[docs]
def create_table(self, table_name: str, columns: Dict[str, str]) -> None:
"""
Create a table with specified columns.
Args:
table_name: Name of the table to create
columns: Dictionary of column_name: data_type
"""
column_defs = ', '.join(f"{name} {dtype}" for name, dtype in columns.items())
query = f"CREATE TABLE IF NOT EXISTS {table_name} ({column_defs})"
self.execute_query(query)
[docs]
def insert_data(self, table_name: str, data: Dict[str, Any]) -> None:
"""
Insert a single row of data into a table.
Args:
table_name: Name of the table
data: Dictionary of column_name: value
"""
columns = ', '.join(data.keys())
placeholders = ', '.join(['?' for _ in data])
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
self.execute_query(query, tuple(data.values()))
[docs]
def read_table(self, table_name: str, columns: str = '*',
where: Optional[str] = None, limit: Optional[int] = None) -> List:
"""
Read data from a table with optional filtering.
Args:
table_name: Name of the table
columns: Columns to select (default: '*')
where: WHERE clause (without 'WHERE' keyword)
limit: Maximum number of rows to return
Returns:
List of query results
"""
query = f"SELECT {columns} FROM {table_name}"
if where:
query += f" WHERE {where}"
if limit:
query += f" LIMIT {limit}"
return self.execute_query(query)
# =========================================================
# Class methods for quick operations without instance
# =========================================================
[docs]
@classmethod
def quick_peek(cls, db_path: str, max_rows: int = 5) -> None:
"""
Quick preview of database contents without creating instance.
Args:
db_path: Path to SQLite database
max_rows: Maximum rows to show per table
"""
with cls(db_path) as db:
tables = db.get_tables()
print(f"Database: {db_path}")
print(f"Tables: {tables}\n")
for table in tables:
print(f"Table: {table}")
print("-" * 40)
# Get column info
columns = db.get_table_info(table)
col_names = [col['name'] for col in columns]
print(f"Columns: {col_names}")
# Show sample data
rows = db.read_table(table, limit=max_rows)
if rows:
print(f"Sample data ({len(rows)} rows):")
for row in rows:
print(f" {dict(row)}")
else:
print(" (empty table)")
print()
[docs]
@classmethod
def to_dataframe(cls, db_path: str, table_name: str,
where: Optional[str] = None) -> Optional[Any]:
"""
Read table directly into pandas DataFrame.
Args:
db_path: Path to SQLite database
table_name: Name of the table
where: Optional WHERE clause
Returns:
pandas DataFrame or None if pandas not available
"""
pd = _import_pandas()
if not pd:
print("pandas not installed. Install with: pip install pandas")
return None
with cls(db_path) as db:
query = f"SELECT * FROM {table_name}"
if where:
query += f" WHERE {where}"
# Use pandas read_sql for better performance
return pd.read_sql(query, db.connection)
[docs]
@classmethod
def read_all_dbs(cls, folder_path: str, table_name: str,
pattern: str = "*.db") -> Dict[str, List]:
"""
Read same table from multiple database files in a folder.
Args:
folder_path: Path to folder containing .db files
table_name: Name of table to read from each database
pattern: File pattern to match (default: "*.db")
Returns:
Dictionary mapping database filename to data
"""
folder = Path(folder_path)
results = {}
for db_file in folder.glob(pattern):
try:
with cls(db_file) as db:
if db.table_exists(table_name):
data = db.read_table(table_name)
results[db_file.name] = data
else:
print(f"Table '{table_name}' not found in {db_file.name}")
except Exception as e:
print(f"Error reading {db_file.name}: {e}")
return results
[docs]
@classmethod
def describe(cls, db_path: str) -> None:
"""
Show complete database structure.
Args:
db_path: Path to SQLite database
"""
with cls(db_path) as db:
print(f"Database: {db_path}")
print(f"Size: {os.path.getsize(db_path):,} bytes")
print("=" * 50)
tables = db.get_tables()
print(f"Tables ({len(tables)}):")
for table in tables:
# Get row count
count_result = db.execute_query(f"SELECT COUNT(*) as cnt FROM {table}")
row_count = count_result[0]['cnt'] if count_result else 0
print(f"\n {table} ({row_count} rows)")
print(" " + "-" * 40)
# Show column details
columns = db.get_table_info(table)
for col in columns:
pk_marker = " [PK]" if col['pk'] else ""
null_marker = "" if col['notnull'] else " (nullable)"
default_val = f" DEFAULT {col['dflt_value']}" if col['dflt_value'] else ""
print(f" {col['name']}: {col['type']}{pk_marker}{null_marker}{default_val}")
[docs]
@classmethod
def read_by_date(cls, db_path: str, table_name: str,
date_column: str = 'timestamp',
start: Optional[str] = None,
end: Optional[str] = None) -> List:
"""
Read data filtered by date range.
Args:
db_path: Path to SQLite database
table_name: Name of the table
date_column: Name of the date column
start: Start date (inclusive)
end: End date (inclusive)
Returns:
List of query results
"""
where_clauses = []
if start:
where_clauses.append(f"{date_column} >= '{start}'")
if end:
where_clauses.append(f"{date_column} <= '{end}'")
where = " AND ".join(where_clauses) if where_clauses else None
with cls(db_path) as db:
return db.read_table(table_name, where=where)
# =========================================================
# Batch scanning methods for multiple databases
# =========================================================
[docs]
@classmethod
def scan_databases(cls, folder_path: str,
required_columns: Optional[List[str]] = None,
pattern: str = '**/*.db',
verbose: bool = True,
jupyter_mode: bool = False) -> Dict[str, Any]:
"""
Scan all SQLite databases in a folder for tables with specific columns.
Efficiently scans multiple database files to find tables containing
specified columns and counts their rows. Useful for data exploration
and migration planning.
Args:
folder_path: Directory path to scan for database files
required_columns: List of column names that tables must have
(None means count all tables)
pattern: Glob pattern for finding database files (default: '**/*.db')
verbose: Display progress and statistics (default: True)
jupyter_mode: Enable Jupyter notebook display mode (default: False)
Returns:
Dictionary containing:
- total_rows: Total number of rows across all matching tables
- matching_tables: Count of tables with required columns
- total_tables_scanned: Total number of tables examined
- processed_files: Number of successfully processed files
- error_files: Number of files with errors
- details: List of detailed information per file
- errors: List of error messages
- summary: Processing summary statistics
Example:
>>> # Basic usage - find tables with specific columns
>>> results = SQLiteHandler.scan_databases(
... folder_path='/data/databases',
... required_columns=['Column_A', 'Column_B'],
... verbose=True
... )
>>> # Scan all tables in all databases
>>> results = SQLiteHandler.scan_databases(
... folder_path='/data',
... pattern='**/*.sqlite',
... required_columns=None
... )
>>> # Jupyter notebook with progress tracking
>>> results = SQLiteHandler.scan_databases(
... folder_path='/data',
... required_columns=['user_id', 'created_at'],
... jupyter_mode=True
... )
"""
folder = Path(folder_path)
# Initialize results dictionary with additional fields
results = {
'total_rows': 0, # Rows in matching tables only
'matching_tables': 0, # Tables with required columns
'total_tables_scanned': 0, # All tables examined
'processed_files': 0,
'error_files': 0,
'details': [],
'errors': [],
'summary': {}
}
start_time = time.time()
# Find all database files
if verbose:
print("🔍 Scanning for database files...")
db_files = list(folder.glob(pattern))
total_files = len(db_files)
if total_files == 0:
if verbose:
print(f"❌ No files matching pattern '{pattern}' found in {folder_path}")
results['summary'] = {
'elapsed_time': time.time() - start_time,
'files_found': 0
}
return results
if verbose:
print(f"📁 Found {total_files} database file(s)")
if required_columns:
print(f"📋 Looking for tables with columns: {required_columns}")
else:
print("📋 Counting all tables")
print("-" * 60)
# Try to use progress tracker if available
tracker = None
try:
from . import base as qb
if verbose:
tracker = qb.ProgressTracker(
total=total_files,
prefix="Scanning databases",
jupyter_mode=jupyter_mode
)
except ImportError:
pass
# Process each database file
for file_idx, db_file in enumerate(db_files, 1):
file_info = {
'file': str(db_file),
'file_name': db_file.name,
'tables': [],
'file_total_rows': 0, # Rows in matching tables
'file_total_rows_all': 0, # Rows in all tables
'file_matching_tables': 0, # Number of matching tables
'file_total_tables': 0, # Total tables in file
'file_size': db_file.stat().st_size
}
# Manual progress display if tracker not available
if verbose and not tracker and not jupyter_mode:
print(f"[{file_idx}/{total_files}] Processing: {db_file.name}")
elif verbose and jupyter_mode and not tracker:
# Simple jupyter progress without tracker
try:
from IPython.display import clear_output
clear_output(wait=True)
print(f"Processing: {file_idx}/{total_files} - {db_file.name}")
print(f"Progress: {(file_idx / total_files) * 100:.1f}%")
except ImportError:
print(f"[{file_idx}/{total_files}] Processing: {db_file.name}")
try:
with cls(db_file) as db:
tables = db.get_tables()
file_info['file_total_tables'] = len(tables)
for table in tables:
try:
# Get column information
schema_info = db.get_table_schema(table, verbose=False)
columns = schema_info['columns']
column_names = [col['name'] for col in columns]
# Check if required columns exist (or count all if None)
if required_columns is None:
has_required = True
else:
has_required = all(
col in column_names for col in required_columns
)
# Count rows for all tables
count_query = f"SELECT COUNT(*) as cnt FROM {table}"
count_result = db.execute_query(count_query)
row_count = count_result[0]['cnt'] if count_result else 0
# Create table info with column_matched field
table_info = {
'table': table,
'rows': row_count,
'columns': column_names,
'column_matched': has_required, # New field indicating match status
'schema_method': schema_info['method'],
'schema_accuracy': schema_info['accuracy']
}
# Always add table info
file_info['tables'].append(table_info)
file_info['file_total_rows_all'] += row_count
results['total_tables_scanned'] += 1
# Update matching statistics only if columns match
if has_required:
file_info['file_total_rows'] += row_count
file_info['file_matching_tables'] += 1
results['total_rows'] += row_count
results['matching_tables'] += 1
except Exception as e:
error_msg = f"Table '{table}' in {db_file.name}: {str(e)}"
results['errors'].append(error_msg)
results['processed_files'] += 1
# Always store file info, even if no matching tables
results['details'].append(file_info)
except Exception as e:
error_msg = f"File '{db_file.name}': {str(e)}"
results['errors'].append(error_msg)
results['error_files'] += 1
# Update progress with more accurate feedback
if tracker:
# Determine status message
if file_info['file_total_tables'] == 0:
status_msg = "No tables found"
elif required_columns:
if file_info['file_matching_tables'] == 0:
status_msg = f"No matches (scanned {file_info['file_total_tables']} tables)"
else:
status_msg = f"Found {file_info['file_matching_tables']}/{file_info['file_total_tables']} matching"
else:
status_msg = f"Found {file_info['file_total_tables']} tables"
tracker.update(
increment=1,
item=db_file.name,
status=status_msg # Pass status message to tracker
)
# Calculate summary statistics
elapsed_time = time.time() - start_time
# Count files with/without matching tables
files_with_matches = sum(1 for d in results['details']
if d.get('file_matching_tables', 0) > 0)
# Calculate total rows across all tables (not just matching)
total_rows_all = sum(d.get('file_total_rows_all', 0)
for d in results['details'])
results['summary'] = {
'elapsed_time': elapsed_time,
'elapsed_formatted': cls._format_time(elapsed_time),
'files_found': total_files,
'files_processed': results['processed_files'],
'files_with_matches': files_with_matches,
'files_without_matches': results['processed_files'] - files_with_matches,
'average_time_per_file': elapsed_time / total_files if total_files > 0 else 0,
'total_size_bytes': sum(d['file_size'] for d in results['details']),
'rows_per_second': results['total_rows'] / elapsed_time if elapsed_time > 0 else 0,
'total_rows_all_tables': total_rows_all,
'match_rate': (results['matching_tables'] / results['total_tables_scanned'] * 100)
if results['total_tables_scanned'] > 0 else 0
}
# Finish progress tracking
if tracker:
tracker.finish(show_summary=False)
# Display final summary
if verbose:
cls._display_scan_summary(results, required_columns)
return results
@classmethod
def _format_time(cls, seconds: float) -> str:
"""Format seconds into human-readable time string."""
if seconds < 60:
return f"{seconds:.1f}s"
elif seconds < 3600:
minutes = int(seconds // 60)
secs = int(seconds % 60)
return f"{minutes}m {secs}s"
else:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours}h {minutes}m {secs}s"
@classmethod
def _display_scan_summary(cls, results: Dict[str, Any],
required_columns: Optional[List[str]] = None) -> None:
"""
Display formatted summary of scan results.
Args:
results: Scan results dictionary
required_columns: List of required columns (for display)
"""
print("\n" + "=" * 60)
print("📊 Scan Complete - Summary")
print("=" * 60)
summary = results['summary']
print(f"⏱️ Time elapsed: {summary.get('elapsed_formatted', summary['elapsed_time'])}")
print(f"📁 Files scanned: {results['processed_files']}/{summary['files_found']}")
# Display match statistics
if required_columns:
print(f"✅ Files with matching tables: {summary['files_with_matches']}")
print(f"➖ Files without matching tables: {summary['files_without_matches']}")
else:
print(f"✅ Files with tables: {summary['files_with_matches']}")
if results['error_files'] > 0:
print(f"❌ Files with errors: {results['error_files']}")
print(f"\n📈 Data Statistics:")
print(f" Tables scanned: {results['total_tables_scanned']}")
if required_columns:
print(f" Matching tables: {results['matching_tables']} ({summary['match_rate']:.1f}%)")
print(f" Rows in matching tables: {results['total_rows']:,}")
print(f" Rows in all tables: {summary['total_rows_all_tables']:,}")
else:
print(f" Total tables: {results['matching_tables']}")
print(f" Total rows: {results['total_rows']:,}")
total_size_mb = summary['total_size_bytes'] / (1024 * 1024)
print(f" Total database size: {total_size_mb:.2f} MB")
if summary['elapsed_time'] > 0:
print(f" Processing speed: {summary['rows_per_second']:.0f} rows/sec")
# Suggest processing method based on data size
print("\n💡 Processing Recommendation:")
cls._suggest_processing_method(results['total_rows'])
# Show errors if any
if results['errors'] and len(results['errors']) <= 5:
print("\n⚠️ Errors encountered:")
for error in results['errors']:
print(f" • {error}")
elif results['errors']:
print(f"\n⚠️ {len(results['errors'])} errors encountered (showing first 5):")
for error in results['errors'][:5]:
print(f" • {error}")
@classmethod
def _suggest_processing_method(cls, total_rows: int) -> None:
"""
Suggest data processing method based on row count.
Args:
total_rows: Total number of rows to process
"""
if total_rows == 0:
print(" ⚠️ No data found")
elif total_rows < 100_000:
print(f" 🟢 In-memory processing recommended ({total_rows:,} rows)")
print(" → Load all data into memory for processing")
elif total_rows < 1_000_000:
print(f" 🟡 Batch processing recommended ({total_rows:,} rows)")
print(" → Process in chunks or use intermediate storage")
else:
print(f" 🔴 Streaming processing recommended ({total_rows:,} rows)")
print(" → Use generators or database-level operations")
# Estimate memory usage
estimated_mb = (total_rows * 100) / (1024 * 1024) # Assume 100 bytes per row
print(f" 📏 Estimated memory usage: ~{estimated_mb:.1f} MB")
[docs]
@classmethod
def analyze_scan_results(cls, results: Dict[str, Any],
top_n: int = 10,
sort_by: str = 'rows',
show_unmatched: bool = False) -> None:
"""
Analyze and display detailed scan results.
Args:
results: Results from scan_databases
top_n: Number of top files to show
sort_by: Sort criterion ('rows', 'tables', 'size', 'match_rate')
show_unmatched: Whether to show unmatched tables (default: False)
"""
if not results['details']:
print("No data found to analyze.")
return
print(f"\n📋 Top {top_n} Files by {sort_by}")
print("-" * 80)
# Sort files based on criterion
if sort_by == 'rows':
sorted_files = sorted(
results['details'],
key=lambda x: x['file_total_rows'],
reverse=True
)
elif sort_by == 'tables':
sorted_files = sorted(
results['details'],
key=lambda x: len(x['tables']),
reverse=True
)
elif sort_by == 'size':
sorted_files = sorted(
results['details'],
key=lambda x: x['file_size'],
reverse=True
)
elif sort_by == 'match_rate':
sorted_files = sorted(
results['details'],
key=lambda x: (x['file_matching_tables'] / x['file_total_tables'] * 100)
if x['file_total_tables'] > 0 else 0,
reverse=True
)
else:
sorted_files = results['details']
# Display top files
for i, file_info in enumerate(sorted_files[:top_n], 1):
print(f"\n{i}. {file_info['file_name']}")
print(f" 📂 Path: {file_info['file']}")
print(f" 💾 Size: {file_info['file_size'] / 1024:.1f} KB")
print(f" 📊 Rows (matching/all): {file_info['file_total_rows']:,} / {file_info['file_total_rows_all']:,}")
print(f" 📋 Tables (matching/all): {file_info['file_matching_tables']} / {file_info['file_total_tables']}")
if file_info['file_total_tables'] > 0:
match_rate = (file_info['file_matching_tables'] / file_info['file_total_tables']) * 100
print(f" 📈 Match rate: {match_rate:.1f}%")
# Show table details
for table_info in file_info['tables']:
# Skip unmatched tables unless requested
if not show_unmatched and not table_info['column_matched']:
continue
accuracy_marker = ""
if table_info['schema_accuracy'] == 'low':
accuracy_marker = " ⚠️"
elif table_info['schema_accuracy'] == 'medium':
accuracy_marker = " ⚡"
match_marker = "✓" if table_info['column_matched'] else "✗"
print(f" └─ [{match_marker}] {table_info['table']}: {table_info['rows']:,} rows{accuracy_marker}")
# Show count of unmatched tables if not displaying them
if not show_unmatched:
unmatched_count = sum(1 for t in file_info['tables'] if not t['column_matched'])
if unmatched_count > 0:
print(f" └─ ... and {unmatched_count} unmatched table(s)")
[docs]
@classmethod
def export_scan_results(cls, results: Dict[str, Any],
output_path: str,
format: str = 'csv') -> None:
"""
Export scan results to file.
Args:
results: Results from scan_databases
output_path: Output file path
format: Export format ('csv', 'json', 'pickle')
"""
import json
import csv
import pickle
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
if format == 'json':
with open(output_file, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"Results exported to {output_file} (JSON)")
elif format == 'csv':
# Flatten data for CSV export
rows = []
for file_info in results['details']:
for table_info in file_info['tables']:
rows.append({
'file': file_info['file_name'],
'file_path': file_info['file'],
'file_size_kb': file_info['file_size'] / 1024,
'table': table_info['table'],
'rows': table_info['rows'],
'columns': ', '.join(table_info['columns'][:10]) # First 10 columns
})
if rows:
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
print(f"Results exported to {output_file} (CSV)")
else:
print("No data to export")
elif format == 'pickle':
with open(output_file, 'wb') as f:
pickle.dump(results, f)
print(f"Results exported to {output_file} (Pickle)")
else:
raise ValueError(f"Unsupported format: {format}")
[docs]
@classmethod
def quick_scan(cls, folder_path: str, **kwargs) -> Dict[str, Any]:
"""
Convenience method for quick database scanning.
Automatically detects Jupyter environment and enables appropriate
display mode for the best user experience.
Args:
folder_path: Directory path to scan
**kwargs: Additional arguments for scan_databases
Returns:
Scan results dictionary
Example:
>>> # Quick scan with automatic environment detection
>>> results = SQLiteHandler.quick_scan('/data/databases')
>>> # Quick scan with specific columns
>>> results = SQLiteHandler.quick_scan(
... '/data',
... required_columns=['column_a', 'column_b']
... )
"""
# Auto-detect Jupyter environment
try:
get_ipython() # This is defined in IPython/Jupyter
jupyter_mode = True
except NameError:
jupyter_mode = False
# Set defaults for quick scanning
kwargs.setdefault('verbose', True)
kwargs.setdefault('jupyter_mode', jupyter_mode)
return cls.scan_databases(folder_path, **kwargs)