366 lines
13 KiB
Python
366 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Database utility module for accessing and querying the SQLite database.
|
|
Provides convenient functions for data retrieval, analysis, and database operations.
|
|
"""
|
|
|
|
import sqlite3
from contextlib import closing
import pandas as pd
from typing import List, Dict, Any, Optional, Tuple
import logging
from pathlib import Path
|
|
|
|
# Setup logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DatabaseUtils:
    """Utility class for database operations.

    Each method opens its own short-lived connection to ``db_path`` and
    closes it before returning, so no connection handle outlives a call.
    """

    def __init__(self, db_path: str = "data.sqlite"):
        """
        Initialize database utilities.

        Args:
            db_path (str): Path to the SQLite database file

        Raises:
            FileNotFoundError: If ``db_path`` does not exist.
        """
        self.db_path = db_path
        self._validate_database()

    def _validate_database(self):
        """Validate that the database exists and is accessible."""
        if not Path(self.db_path).exists():
            raise FileNotFoundError(f"Database '{self.db_path}' does not exist")

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQLite identifier.

        Names that already arrive delimited (``"x"``, ``[x]`` or ```x```) are
        returned unchanged so callers that pre-quoted keep working.
        """
        already_delimited = len(name) >= 2 and (name[0], name[-1]) in (
            ('"', '"'),
            ("[", "]"),
            ("`", "`"),
        )
        if already_delimited:
            return name
        # Embedded double quotes are escaped by doubling them.
        return '"' + name.replace('"', '""') + '"'

    def get_connection(self) -> sqlite3.Connection:
        """Get a database connection. The caller is responsible for closing it."""
        return sqlite3.connect(self.db_path)

    def get_tables(self) -> List[str]:
        """
        Get list of all tables in the database.

        Returns:
            List[str]: List of table names
        """
        # sqlite3's own context manager only commits/rolls back; closing()
        # is what actually releases the connection.
        with closing(self.get_connection()) as conn:
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table';"
            ).fetchall()
        return [row[0] for row in rows]

    def get_table_info(self, table_name: str) -> Dict[str, Any]:
        """
        Get detailed information about a table.

        Args:
            table_name (str): Name of the table

        Returns:
            Dict[str, Any]: Table information with the keys ``table_name``,
            ``row_count``, ``column_count``, ``columns`` (one dict per column)
            and ``column_names``.
        """
        # Quote the name so tables with spaces or keyword names (as returned
        # by get_tables) do not break the statements below.
        quoted = self._quote_identifier(table_name)

        with closing(self.get_connection()) as conn:
            # Get column info
            columns = conn.execute(f"PRAGMA table_info({quoted})").fetchall()

            # Get row count
            row_count = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]

        # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
        return {
            'table_name': table_name,
            'row_count': row_count,
            'column_count': len(columns),
            'columns': [
                {
                    'name': col[1],
                    'type': col[2],
                    'not_null': bool(col[3]),
                    'default_value': col[4],
                    'primary_key': bool(col[5]),
                }
                for col in columns
            ],
            'column_names': [col[1] for col in columns],
        }

    def query(self, sql: str, params: Optional[Tuple] = None) -> pd.DataFrame:
        """
        Execute a SQL query and return results as a DataFrame.

        Args:
            sql (str): SQL query string
            params (Optional[Tuple]): Parameters for the query

        Returns:
            pd.DataFrame: Query results
        """
        # The inner `conn` context keeps the commit-on-success behaviour;
        # closing() then releases the connection.
        with closing(self.get_connection()) as conn, conn:
            return pd.read_sql_query(sql, conn, params=params)

    def execute(self, sql: str, params: Optional[Tuple] = None) -> int:
        """
        Execute a SQL statement (INSERT, UPDATE, DELETE).

        The statement is committed on success and rolled back if it raises.

        Args:
            sql (str): SQL statement
            params (Optional[Tuple]): Parameters for the statement

        Returns:
            int: Number of affected rows
        """
        with closing(self.get_connection()) as conn, conn:
            cursor = conn.execute(sql, params or ())
            return cursor.rowcount

    def get_table_data(self, table_name: str, limit: Optional[int] = None,
                       columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Get data from a specific table.

        NOTE: ``table_name`` and ``columns`` are interpolated into the SQL
        text verbatim; pass only trusted identifiers.

        Args:
            table_name (str): Name of the table
            limit (Optional[int]): Limit number of rows returned; ``None``
                means no limit, ``0`` returns no rows
            columns (Optional[List[str]]): Specific columns to select

        Returns:
            pd.DataFrame: Table data
        """
        col_str = ', '.join(columns) if columns else '*'
        sql = f"SELECT {col_str} FROM {table_name}"

        # `is None` rather than truthiness, so limit=0 is honoured.
        if limit is None:
            return self.query(sql)

        # Bind the limit instead of formatting it into the statement.
        return self.query(sql + " LIMIT ?", (int(limit),))

    def search_table(self, table_name: str, column: str, value: Any,
                     comparison: str = '=') -> pd.DataFrame:
        """
        Search for records in a table based on column value.

        NOTE: ``table_name``, ``column`` and ``comparison`` are interpolated
        into the SQL text verbatim; only ``value`` is bound as a parameter.
        Pass only trusted text for the first three.

        Args:
            table_name (str): Name of the table
            column (str): Column to search in
            value (Any): Value to search for
            comparison (str): Comparison operator (=, >, <, LIKE, etc.)

        Returns:
            pd.DataFrame: Matching records
        """
        sql = f"SELECT * FROM {table_name} WHERE {column} {comparison} ?"
        return self.query(sql, (value,))

    def get_database_summary(self) -> Dict[str, Any]:
        """
        Get a comprehensive summary of the database.

        Returns:
            Dict[str, Any]: Summary with ``database_path``, ``total_tables``,
            ``tables`` (table name -> ``get_table_info`` result) and
            ``total_rows``.
        """
        tables = self.get_tables()
        table_infos = {table: self.get_table_info(table) for table in tables}

        return {
            'database_path': self.db_path,
            'total_tables': len(tables),
            'tables': table_infos,
            'total_rows': sum(info['row_count'] for info in table_infos.values()),
        }

    def print_database_summary(self):
        """Print a formatted database summary."""
        summary = self.get_database_summary()

        print("="*60)
        print("DATABASE SUMMARY")
        print("="*60)
        print(f"Database: {summary['database_path']}")
        print(f"Total Tables: {summary['total_tables']}")
        print(f"Total Rows: {summary['total_rows']:,}")
        print()

        for table_name, info in summary['tables'].items():
            print(f"Table: {table_name}")
            print(f"  Rows: {info['row_count']:,}")
            print(f"  Columns: {info['column_count']}")
            print(f"  Column Names: {', '.join(info['column_names'])}")
            print()
|
|
|
|
# Convenience functions for specific business logic
class BusinessAnalytics:
    """Business-specific analytics functions.

    All queries run through the ``query(sql, params)`` method of the wrapped
    database helper and read the tables ``customer_profile_dataset``,
    ``purchase_history_dataset_with_status``,
    ``products_dataset_with_descriptions`` and ``customer_call_summaries``.
    """

    def __init__(self, db_utils: "DatabaseUtils"):
        """Store the database helper used to run every query."""
        self.db = db_utils

    def get_customer_profile(self, customer_id: int) -> Dict[str, Any]:
        """Get complete customer profile including purchases and calls.

        Args:
            customer_id (int): Customer to look up

        Returns:
            Dict[str, Any]: ``{"error": ...}`` when the customer does not
            exist; otherwise ``customer_info``, ``purchases``, ``calls``,
            ``total_purchases``, ``total_spent`` and ``total_calls``.
        """
        # Get customer basic info
        customer = self.db.query(
            "SELECT * FROM customer_profile_dataset WHERE customer_id = ?",
            (customer_id,)
        )

        if customer.empty:
            return {"error": f"Customer {customer_id} not found"}

        # Get purchase history
        purchases = self.db.query(
            """SELECT p.*, pr.product_name, pr.category, pr.brand
               FROM purchase_history_dataset_with_status p
               LEFT JOIN products_dataset_with_descriptions pr ON p.product_id = pr.product_id
               WHERE p.customer_id = ?
               ORDER BY p.purchase_date DESC""",
            (customer_id,)
        )

        # Get call summaries
        calls = self.db.query(
            "SELECT * FROM customer_call_summaries WHERE customer_id = ?",
            (customer_id,)
        )

        return {
            "customer_info": customer.to_dict('records')[0],
            "purchases": purchases.to_dict('records'),
            "calls": calls.to_dict('records'),
            "total_purchases": len(purchases),
            "total_spent": purchases['total_amount'].sum() if not purchases.empty else 0,
            "total_calls": len(calls)
        }

    def get_product_analytics(self, product_id: Optional[int] = None) -> pd.DataFrame:
        """Get product sales analytics.

        Args:
            product_id (Optional[int]): Restrict the result to one product;
                ``None`` returns every product ordered by revenue.

        Returns:
            pd.DataFrame: One row per product with purchase, quantity,
            revenue and customer aggregates.
        """
        sql = """
            SELECT
                pr.product_id,
                pr.product_name,
                pr.category,
                pr.brand,
                pr.price_per_unit,
                COUNT(p.purchase_id) as total_purchases,
                SUM(p.quantity) as total_quantity_sold,
                SUM(p.total_amount) as total_revenue,
                AVG(p.total_amount) as avg_order_value,
                COUNT(DISTINCT p.customer_id) as unique_customers
            FROM products_dataset_with_descriptions pr
            LEFT JOIN purchase_history_dataset_with_status p ON pr.product_id = p.product_id
        """

        # `is not None` rather than truthiness, so product_id=0 still filters.
        if product_id is not None:
            filtered = sql + " WHERE pr.product_id = ?" + " GROUP BY pr.product_id"
            return self.db.query(filtered, (product_id,))

        return self.db.query(sql + " GROUP BY pr.product_id ORDER BY total_revenue DESC")

    def get_customer_analytics(self) -> pd.DataFrame:
        """Get customer analytics summary.

        Returns:
            pd.DataFrame: One row per customer with purchase aggregates and
            the number of call summaries, ordered by total spent.
        """
        # Calls are counted in a correlated subquery. Joining the calls table
        # next to the purchases table would repeat each purchase once per
        # call and inflate total_purchases, total_spent and total_calls.
        return self.db.query("""
            SELECT
                c.customer_id,
                c.first_name,
                c.last_name,
                c.city,
                c.state,
                COUNT(p.purchase_id) as total_purchases,
                SUM(p.total_amount) as total_spent,
                AVG(p.total_amount) as avg_order_value,
                MIN(p.purchase_date) as first_purchase,
                MAX(p.purchase_date) as last_purchase,
                COUNT(DISTINCT p.product_id) as unique_products_bought,
                (
                    SELECT COUNT(*)
                    FROM customer_call_summaries cs
                    WHERE cs.customer_id = c.customer_id
                ) as total_calls
            FROM customer_profile_dataset c
            LEFT JOIN purchase_history_dataset_with_status p ON c.customer_id = p.customer_id
            GROUP BY c.customer_id
            ORDER BY total_spent DESC
        """)

    def get_sales_by_category(self) -> pd.DataFrame:
        """Get sales summary by product category.

        Returns:
            pd.DataFrame: One row per category with order, quantity, revenue,
            customer and product aggregates, ordered by revenue.
        """
        return self.db.query("""
            SELECT
                pr.category,
                COUNT(p.purchase_id) as total_orders,
                SUM(p.quantity) as total_quantity,
                SUM(p.total_amount) as total_revenue,
                AVG(p.total_amount) as avg_order_value,
                COUNT(DISTINCT p.customer_id) as unique_customers,
                COUNT(DISTINCT pr.product_id) as unique_products
            FROM products_dataset_with_descriptions pr
            LEFT JOIN purchase_history_dataset_with_status p ON pr.product_id = p.product_id
            GROUP BY pr.category
            ORDER BY total_revenue DESC
        """)

    def get_top_customers(self, limit: int = 10) -> pd.DataFrame:
        """Get top customers by total spent.

        Args:
            limit (int): Maximum number of customers to return

        Returns:
            pd.DataFrame: Customers with at least one purchase, ordered by
            total spent.
        """
        # The limit is bound as a parameter instead of being formatted into
        # the SQL text.
        return self.db.query(
            """
            SELECT
                c.customer_id,
                c.first_name || ' ' || c.last_name as customer_name,
                c.email,
                c.city,
                c.state,
                SUM(p.total_amount) as total_spent,
                COUNT(p.purchase_id) as total_orders
            FROM customer_profile_dataset c
            JOIN purchase_history_dataset_with_status p ON c.customer_id = p.customer_id
            GROUP BY c.customer_id
            ORDER BY total_spent DESC
            LIMIT ?
            """,
            (int(limit),),
        )
|
|
|
|
# Factory function to create database utilities
def create_db_utils(db_path: str = "data.sqlite") -> Tuple[DatabaseUtils, BusinessAnalytics]:
    """
    Build a database helper together with an analytics object bound to it.

    Args:
        db_path (str): Path to the database

    Returns:
        Tuple[DatabaseUtils, BusinessAnalytics]: The database helper and the
        analytics instance that wraps that same helper
    """
    utils = DatabaseUtils(db_path)
    return utils, BusinessAnalytics(utils)
|
|
|
|
def main() -> int:
    """Run the example usage and return a process exit status.

    Returns:
        int: ``0`` when the demo completes, ``1`` when any step raises.
    """
    # Local import: only the script entry point needs sys.
    import sys

    # Example usage
    try:
        db, analytics = create_db_utils()

        # Print database summary
        db.print_database_summary()

        # Example queries
        print("\n" + "="*60)
        print("SAMPLE ANALYTICS")
        print("="*60)

        # Top 5 customers
        print("\nTop 5 Customers by Total Spent:")
        top_customers = analytics.get_top_customers(5)
        print(top_customers.to_string(index=False))

        # Sales by category
        print("\nSales by Category:")
        category_sales = analytics.get_sales_by_category()
        print(category_sales.to_string(index=False))

        # Customer profile example (if customer 1 exists)
        print("\nSample Customer Profile (Customer ID 1):")
        profile = analytics.get_customer_profile(1)
        if "error" not in profile:
            print(f"Customer: {profile['customer_info']['first_name']} {profile['customer_info']['last_name']}")
            print(f"Total Purchases: {profile['total_purchases']}")
            print(f"Total Spent: ${profile['total_spent']:.2f}")
            print(f"Total Calls: {profile['total_calls']}")
        else:
            print(profile["error"])

    except Exception as e:
        # Top-level boundary: report on stderr and signal failure through the
        # exit status instead of printing to stdout and exiting 0.
        print(f"Error: {e}", file=sys.stderr)
        return 1

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|