#!/usr/bin/env python3 """ Database utility module for accessing and querying the SQLite database. Provides convenient functions for data retrieval, analysis, and database operations. """ import sqlite3 import pandas as pd from typing import List, Dict, Any, Optional, Tuple import logging from pathlib import Path # Setup logging logger = logging.getLogger(__name__) class DatabaseUtils: """Utility class for database operations.""" def __init__(self, db_path: str = "data.sqlite"): """ Initialize database utilities. Args: db_path (str): Path to the SQLite database file """ self.db_path = db_path self._validate_database() def _validate_database(self): """Validate that the database exists and is accessible.""" if not Path(self.db_path).exists(): raise FileNotFoundError(f"Database '{self.db_path}' does not exist") def get_connection(self) -> sqlite3.Connection: """Get a database connection.""" return sqlite3.connect(self.db_path) def get_tables(self) -> List[str]: """ Get list of all tables in the database. Returns: List[str]: List of table names """ with self.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") return [row[0] for row in cursor.fetchall()] def get_table_info(self, table_name: str) -> Dict[str, Any]: """ Get detailed information about a table. Args: table_name (str): Name of the table Returns: Dict[str, Any]: Table information including columns, row count, etc. """ with self.get_connection() as conn: cursor = conn.cursor() # Get column info cursor.execute(f"PRAGMA table_info({table_name})") columns = cursor.fetchall() # Get row count cursor.execute(f"SELECT COUNT(*) FROM {table_name}") row_count = cursor.fetchone()[0] return { 'table_name': table_name, 'row_count': row_count, 'column_count': len(columns), 'columns': [{'name': col[1], 'type': col[2], 'not_null': bool(col[3]), 'default_value': col[4], 'primary_key': bool(col[5])} for col in columns], 'column_names': [col[1] for col in columns] } def query(self, sql: str, params: Optional[Tuple] = None) -> pd.DataFrame: """ Execute a SQL query and return results as a DataFrame. Args: sql (str): SQL query string params (Optional[Tuple]): Parameters for the query Returns: pd.DataFrame: Query results """ with self.get_connection() as conn: return pd.read_sql_query(sql, conn, params=params) def execute(self, sql: str, params: Optional[Tuple] = None) -> int: """ Execute a SQL statement (INSERT, UPDATE, DELETE). Args: sql (str): SQL statement params (Optional[Tuple]): Parameters for the statement Returns: int: Number of affected rows """ with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(sql, params or ()) conn.commit() return cursor.rowcount def get_table_data(self, table_name: str, limit: Optional[int] = None, columns: Optional[List[str]] = None) -> pd.DataFrame: """ Get data from a specific table. Args: table_name (str): Name of the table limit (Optional[int]): Limit number of rows returned columns (Optional[List[str]]): Specific columns to select Returns: pd.DataFrame: Table data """ col_str = ', '.join(columns) if columns else '*' sql = f"SELECT {col_str} FROM {table_name}" if limit: sql += f" LIMIT {limit}" return self.query(sql) def search_table(self, table_name: str, column: str, value: Any, comparison: str = '=') -> pd.DataFrame: """ Search for records in a table based on column value. Args: table_name (str): Name of the table column (str): Column to search in value (Any): Value to search for comparison (str): Comparison operator (=, >, <, LIKE, etc.) Returns: pd.DataFrame: Matching records """ sql = f"SELECT * FROM {table_name} WHERE {column} {comparison} ?" return self.query(sql, (value,)) def get_database_summary(self) -> Dict[str, Any]: """ Get a comprehensive summary of the database. Returns: Dict[str, Any]: Database summary """ tables = self.get_tables() summary = { 'database_path': self.db_path, 'total_tables': len(tables), 'tables': {} } total_rows = 0 for table in tables: info = self.get_table_info(table) summary['tables'][table] = info total_rows += info['row_count'] summary['total_rows'] = total_rows return summary def print_database_summary(self): """Print a formatted database summary.""" summary = self.get_database_summary() print("="*60) print("DATABASE SUMMARY") print("="*60) print(f"Database: {summary['database_path']}") print(f"Total Tables: {summary['total_tables']}") print(f"Total Rows: {summary['total_rows']:,}") print() for table_name, info in summary['tables'].items(): print(f"Table: {table_name}") print(f" Rows: {info['row_count']:,}") print(f" Columns: {info['column_count']}") print(f" Column Names: {', '.join(info['column_names'])}") print() # Convenience functions for specific business logic class BusinessAnalytics: """Business-specific analytics functions.""" def __init__(self, db_utils: DatabaseUtils): self.db = db_utils def get_customer_profile(self, customer_id: int) -> Dict[str, Any]: """Get complete customer profile including purchases and calls.""" # Get customer basic info customer = self.db.query( "SELECT * FROM customer_profile_dataset WHERE customer_id = ?", (customer_id,) ) if customer.empty: return {"error": f"Customer {customer_id} not found"} # Get purchase history purchases = self.db.query( """SELECT p.*, pr.product_name, pr.category, pr.brand FROM purchase_history_dataset_with_status p LEFT JOIN products_dataset_with_descriptions pr ON p.product_id = pr.product_id WHERE p.customer_id = ? ORDER BY p.purchase_date DESC""", (customer_id,) ) # Get call summaries calls = self.db.query( "SELECT * FROM customer_call_summaries WHERE customer_id = ?", (customer_id,) ) return { "customer_info": customer.to_dict('records')[0], "purchases": purchases.to_dict('records'), "calls": calls.to_dict('records'), "total_purchases": len(purchases), "total_spent": purchases['total_amount'].sum() if not purchases.empty else 0, "total_calls": len(calls) } def get_product_analytics(self, product_id: Optional[int] = None) -> pd.DataFrame: """Get product sales analytics.""" sql = """ SELECT pr.product_id, pr.product_name, pr.category, pr.brand, pr.price_per_unit, COUNT(p.purchase_id) as total_purchases, SUM(p.quantity) as total_quantity_sold, SUM(p.total_amount) as total_revenue, AVG(p.total_amount) as avg_order_value, COUNT(DISTINCT p.customer_id) as unique_customers FROM products_dataset_with_descriptions pr LEFT JOIN purchase_history_dataset_with_status p ON pr.product_id = p.product_id """ if product_id: sql += " WHERE pr.product_id = ?" return self.db.query(sql + " GROUP BY pr.product_id", (product_id,)) else: return self.db.query(sql + " GROUP BY pr.product_id ORDER BY total_revenue DESC") def get_customer_analytics(self) -> pd.DataFrame: """Get customer analytics summary.""" return self.db.query(""" SELECT c.customer_id, c.first_name, c.last_name, c.city, c.state, COUNT(p.purchase_id) as total_purchases, SUM(p.total_amount) as total_spent, AVG(p.total_amount) as avg_order_value, MIN(p.purchase_date) as first_purchase, MAX(p.purchase_date) as last_purchase, COUNT(DISTINCT p.product_id) as unique_products_bought, COUNT(cs.customer_id) as total_calls FROM customer_profile_dataset c LEFT JOIN purchase_history_dataset_with_status p ON c.customer_id = p.customer_id LEFT JOIN customer_call_summaries cs ON c.customer_id = cs.customer_id GROUP BY c.customer_id ORDER BY total_spent DESC """) def get_sales_by_category(self) -> pd.DataFrame: """Get sales summary by product category.""" return self.db.query(""" SELECT pr.category, COUNT(p.purchase_id) as total_orders, SUM(p.quantity) as total_quantity, SUM(p.total_amount) as total_revenue, AVG(p.total_amount) as avg_order_value, COUNT(DISTINCT p.customer_id) as unique_customers, COUNT(DISTINCT pr.product_id) as unique_products FROM products_dataset_with_descriptions pr LEFT JOIN purchase_history_dataset_with_status p ON pr.product_id = p.product_id GROUP BY pr.category ORDER BY total_revenue DESC """) def get_top_customers(self, limit: int = 10) -> pd.DataFrame: """Get top customers by total spent.""" return self.db.query(f""" SELECT c.customer_id, c.first_name || ' ' || c.last_name as customer_name, c.email, c.city, c.state, SUM(p.total_amount) as total_spent, COUNT(p.purchase_id) as total_orders FROM customer_profile_dataset c JOIN purchase_history_dataset_with_status p ON c.customer_id = p.customer_id GROUP BY c.customer_id ORDER BY total_spent DESC LIMIT {limit} """) # Factory function to create database utilities def create_db_utils(db_path: str = "data.sqlite") -> Tuple[DatabaseUtils, BusinessAnalytics]: """ Create database utilities and business analytics instances. Args: db_path (str): Path to the database Returns: Tuple[DatabaseUtils, BusinessAnalytics]: Database utilities and analytics instances """ db_utils = DatabaseUtils(db_path) analytics = BusinessAnalytics(db_utils) return db_utils, analytics if __name__ == "__main__": # Example usage try: db, analytics = create_db_utils() # Print database summary db.print_database_summary() # Example queries print("\n" + "="*60) print("SAMPLE ANALYTICS") print("="*60) # Top 5 customers print("\nTop 5 Customers by Total Spent:") top_customers = analytics.get_top_customers(5) print(top_customers.to_string(index=False)) # Sales by category print("\nSales by Category:") category_sales = analytics.get_sales_by_category() print(category_sales.to_string(index=False)) # Customer profile example (if customer 1 exists) print("\nSample Customer Profile (Customer ID 1):") profile = analytics.get_customer_profile(1) if "error" not in profile: print(f"Customer: {profile['customer_info']['first_name']} {profile['customer_info']['last_name']}") print(f"Total Purchases: {profile['total_purchases']}") print(f"Total Spent: ${profile['total_spent']:.2f}") print(f"Total Calls: {profile['total_calls']}") else: print(profile["error"]) except Exception as e: print(f"Error: {e}")