mcp-server-demo/db_utils.py
2025-07-04 14:35:25 +02:00

366 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Database utility module for accessing and querying the SQLite database.
Provides convenient functions for data retrieval, analysis, and database operations.
"""
import logging
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import pandas as pd
# Setup logging
logger = logging.getLogger(__name__)
class DatabaseUtils:
    """Utility class for database operations.

    Each helper opens its own short-lived connection to ``db_path`` and closes
    it before returning. Table and column names passed to the helpers are
    treated as single identifiers and quoted before being placed in SQL text.
    """

    # Operators accepted by search_table(). The operator is spliced into the
    # SQL text (it cannot be bound as a parameter), so it is restricted to a
    # fixed set instead of being trusted verbatim.
    _COMPARISON_OPERATORS = frozenset({
        "=", "==", "!=", "<>", "<", "<=", ">", ">=",
        "LIKE", "NOT LIKE", "GLOB", "NOT GLOB",
        "IS", "IS NOT", "REGEXP", "MATCH",
    })

    def __init__(self, db_path: str = "data.sqlite"):
        """
        Initialize database utilities.

        Args:
            db_path (str): Path to the SQLite database file

        Raises:
            FileNotFoundError: If ``db_path`` does not exist
        """
        self.db_path = db_path
        self._validate_database()

    def _validate_database(self):
        """Validate that the database exists and is accessible."""
        if not Path(self.db_path).exists():
            raise FileNotFoundError(f"Database '{self.db_path}' does not exist")

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """
        Quote a table or column name so it can be embedded in SQL text.

        Backticks are used rather than double quotes: SQLite may (depending on
        build options) reinterpret an unresolved double-quoted name as a string
        literal, which would silently hide a misspelled column. A backtick
        inside the name is escaped by doubling it.

        Args:
            name (str): Raw identifier

        Returns:
            str: Quoted identifier
        """
        return "`" + str(name).replace("`", "``") + "`"

    def get_connection(self) -> sqlite3.Connection:
        """
        Get a database connection.

        The caller owns the returned connection and is responsible for
        closing it.
        """
        return sqlite3.connect(self.db_path)

    def get_tables(self) -> List[str]:
        """
        Get list of all tables in the database.

        Returns:
            List[str]: List of table names
        """
        # `closing` releases the handle; the inner `conn` context only manages
        # the transaction (commit/rollback) and never closes the connection.
        with closing(self.get_connection()) as conn, conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            return [row[0] for row in cursor.fetchall()]

    def get_table_info(self, table_name: str) -> Dict[str, Any]:
        """
        Get detailed information about a table.

        Args:
            table_name (str): Name of the table

        Returns:
            Dict[str, Any]: Table information including columns, row count, etc.
        """
        quoted_table = self._quote_identifier(table_name)
        with closing(self.get_connection()) as conn, conn:
            cursor = conn.cursor()
            # Get column info
            cursor.execute(f"PRAGMA table_info({quoted_table})")
            columns = cursor.fetchall()
            # Get row count
            cursor.execute(f"SELECT COUNT(*) FROM {quoted_table}")
            row_count = cursor.fetchone()[0]

        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        return {
            'table_name': table_name,
            'row_count': row_count,
            'column_count': len(columns),
            'columns': [
                {
                    'name': col[1],
                    'type': col[2],
                    'not_null': bool(col[3]),
                    'default_value': col[4],
                    'primary_key': bool(col[5]),
                }
                for col in columns
            ],
            'column_names': [col[1] for col in columns],
        }

    def query(self, sql: str, params: Optional[Tuple] = None) -> pd.DataFrame:
        """
        Execute a SQL query and return results as a DataFrame.

        Args:
            sql (str): SQL query string
            params (Optional[Tuple]): Parameters for the query

        Returns:
            pd.DataFrame: Query results
        """
        with closing(self.get_connection()) as conn, conn:
            return pd.read_sql_query(sql, conn, params=params)

    def execute(self, sql: str, params: Optional[Tuple] = None) -> int:
        """
        Execute a SQL statement (INSERT, UPDATE, DELETE).

        The statement is committed on success and rolled back if it raises.

        Args:
            sql (str): SQL statement
            params (Optional[Tuple]): Parameters for the statement

        Returns:
            int: Number of affected rows
        """
        with closing(self.get_connection()) as conn:
            with conn:
                cursor = conn.cursor()
                cursor.execute(sql, params or ())
            return cursor.rowcount

    def get_table_data(self, table_name: str, limit: Optional[int] = None,
                       columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Get data from a specific table.

        Args:
            table_name (str): Name of the table
            limit (Optional[int]): Limit number of rows returned; ``None``
                means no limit, ``0`` returns no rows
            columns (Optional[List[str]]): Specific columns to select; each
                entry is treated as a plain column name, not an expression

        Returns:
            pd.DataFrame: Table data
        """
        if columns:
            col_str = ', '.join(self._quote_identifier(col) for col in columns)
        else:
            col_str = '*'
        sql = f"SELECT {col_str} FROM {self._quote_identifier(table_name)}"

        params = None
        # `is not None` so that limit=0 is honoured instead of being ignored.
        if limit is not None:
            sql += " LIMIT ?"
            params = (int(limit),)
        return self.query(sql, params)

    def search_table(self, table_name: str, column: str, value: Any,
                     comparison: str = '=') -> pd.DataFrame:
        """
        Search for records in a table based on column value.

        Args:
            table_name (str): Name of the table
            column (str): Column to search in
            value (Any): Value to search for
            comparison (str): Comparison operator (=, >, <, LIKE, etc.);
                matched case-insensitively against a fixed set of operators

        Returns:
            pd.DataFrame: Matching records

        Raises:
            ValueError: If ``comparison`` is not a supported operator
        """
        # Collapse whitespace and case so "not  like" matches "NOT LIKE".
        operator = ' '.join(str(comparison).split()).upper()
        if operator not in self._COMPARISON_OPERATORS:
            raise ValueError(f"Unsupported comparison operator: {comparison!r}")

        sql = (
            f"SELECT * FROM {self._quote_identifier(table_name)} "
            f"WHERE {self._quote_identifier(column)} {operator} ?"
        )
        return self.query(sql, (value,))

    def get_database_summary(self) -> Dict[str, Any]:
        """
        Get a comprehensive summary of the database.

        Returns:
            Dict[str, Any]: Database summary with the database path, table
            count, per-table info and the total row count across all tables
        """
        tables = self.get_tables()
        summary = {
            'database_path': self.db_path,
            'total_tables': len(tables),
            'tables': {table: self.get_table_info(table) for table in tables},
        }
        summary['total_rows'] = sum(
            info['row_count'] for info in summary['tables'].values()
        )
        return summary

    def print_database_summary(self):
        """Print a formatted database summary."""
        summary = self.get_database_summary()
        print("="*60)
        print("DATABASE SUMMARY")
        print("="*60)
        print(f"Database: {summary['database_path']}")
        print(f"Total Tables: {summary['total_tables']}")
        print(f"Total Rows: {summary['total_rows']:,}")
        print()
        for table_name, info in summary['tables'].items():
            print(f"Table: {table_name}")
            print(f" Rows: {info['row_count']:,}")
            print(f" Columns: {info['column_count']}")
            print(f" Column Names: {', '.join(info['column_names'])}")
            print()
# Business-specific analytics built on top of DatabaseUtils
class BusinessAnalytics:
    """Business-specific analytics functions.

    All queries are run through the ``query(sql, params)`` method of the
    wrapped database helper and returned as pandas DataFrames or plain dicts.
    """

    def __init__(self, db_utils: "DatabaseUtils"):
        """
        Args:
            db_utils (DatabaseUtils): Database helper used to run every query
        """
        self.db = db_utils

    def get_customer_profile(self, customer_id: int) -> Dict[str, Any]:
        """
        Get complete customer profile including purchases and calls.

        Args:
            customer_id (int): Customer to look up

        Returns:
            Dict[str, Any]: ``{"error": ...}`` when the customer is not found;
            otherwise the customer row, purchase and call records, and the
            purchase/call counts plus total amount spent
        """
        # Get customer basic info
        customer = self.db.query(
            "SELECT * FROM customer_profile_dataset WHERE customer_id = ?",
            (customer_id,)
        )
        if customer.empty:
            return {"error": f"Customer {customer_id} not found"}

        # Get purchase history (newest first), enriched with product details
        purchases = self.db.query(
            """SELECT p.*, pr.product_name, pr.category, pr.brand
            FROM purchase_history_dataset_with_status p
            LEFT JOIN products_dataset_with_descriptions pr ON p.product_id = pr.product_id
            WHERE p.customer_id = ?
            ORDER BY p.purchase_date DESC""",
            (customer_id,)
        )

        # Get call summaries
        calls = self.db.query(
            "SELECT * FROM customer_call_summaries WHERE customer_id = ?",
            (customer_id,)
        )

        return {
            "customer_info": customer.to_dict('records')[0],
            "purchases": purchases.to_dict('records'),
            "calls": calls.to_dict('records'),
            "total_purchases": len(purchases),
            "total_spent": purchases['total_amount'].sum() if not purchases.empty else 0,
            "total_calls": len(calls)
        }

    def get_product_analytics(self, product_id: Optional[int] = None) -> pd.DataFrame:
        """
        Get product sales analytics.

        Args:
            product_id (Optional[int]): Restrict the result to one product;
                ``None`` returns every product ordered by revenue

        Returns:
            pd.DataFrame: One row per product with purchase count, quantity,
            revenue, average order value and distinct customer count
        """
        sql = """
        SELECT
        pr.product_id,
        pr.product_name,
        pr.category,
        pr.brand,
        pr.price_per_unit,
        COUNT(p.purchase_id) as total_purchases,
        SUM(p.quantity) as total_quantity_sold,
        SUM(p.total_amount) as total_revenue,
        AVG(p.total_amount) as avg_order_value,
        COUNT(DISTINCT p.customer_id) as unique_customers
        FROM products_dataset_with_descriptions pr
        LEFT JOIN purchase_history_dataset_with_status p ON pr.product_id = p.product_id
        """
        # `is not None` so that a product whose id is 0 can still be selected.
        if product_id is not None:
            return self.db.query(
                sql + " WHERE pr.product_id = ? GROUP BY pr.product_id",
                (product_id,)
            )
        return self.db.query(sql + " GROUP BY pr.product_id ORDER BY total_revenue DESC")

    def get_customer_analytics(self) -> pd.DataFrame:
        """
        Get customer analytics summary.

        Purchases and calls are aggregated per customer in separate
        subqueries before being joined. Joining both raw tables in a single
        query would pair every purchase row with every call row and inflate
        the counts and sums.

        NOTE(review): assumes customer_id is unique in
        customer_profile_dataset (one output row per profile row) - confirm.

        Returns:
            pd.DataFrame: One row per customer, ordered by total spent
        """
        return self.db.query("""
        SELECT
        c.customer_id,
        c.first_name,
        c.last_name,
        c.city,
        c.state,
        COALESCE(p.total_purchases, 0) as total_purchases,
        p.total_spent as total_spent,
        p.avg_order_value as avg_order_value,
        p.first_purchase as first_purchase,
        p.last_purchase as last_purchase,
        COALESCE(p.unique_products_bought, 0) as unique_products_bought,
        COALESCE(cs.total_calls, 0) as total_calls
        FROM customer_profile_dataset c
        LEFT JOIN (
            SELECT
            customer_id,
            COUNT(purchase_id) as total_purchases,
            SUM(total_amount) as total_spent,
            AVG(total_amount) as avg_order_value,
            MIN(purchase_date) as first_purchase,
            MAX(purchase_date) as last_purchase,
            COUNT(DISTINCT product_id) as unique_products_bought
            FROM purchase_history_dataset_with_status
            GROUP BY customer_id
        ) p ON c.customer_id = p.customer_id
        LEFT JOIN (
            SELECT
            customer_id,
            COUNT(*) as total_calls
            FROM customer_call_summaries
            GROUP BY customer_id
        ) cs ON c.customer_id = cs.customer_id
        ORDER BY total_spent DESC
        """)

    def get_sales_by_category(self) -> pd.DataFrame:
        """
        Get sales summary by product category.

        Returns:
            pd.DataFrame: One row per category, ordered by total revenue
        """
        return self.db.query("""
        SELECT
        pr.category,
        COUNT(p.purchase_id) as total_orders,
        SUM(p.quantity) as total_quantity,
        SUM(p.total_amount) as total_revenue,
        AVG(p.total_amount) as avg_order_value,
        COUNT(DISTINCT p.customer_id) as unique_customers,
        COUNT(DISTINCT pr.product_id) as unique_products
        FROM products_dataset_with_descriptions pr
        LEFT JOIN purchase_history_dataset_with_status p ON pr.product_id = p.product_id
        GROUP BY pr.category
        ORDER BY total_revenue DESC
        """)

    def get_top_customers(self, limit: int = 10) -> pd.DataFrame:
        """
        Get top customers by total spent.

        Args:
            limit (int): Maximum number of customers to return

        Returns:
            pd.DataFrame: Customers with at least one purchase, highest
            spender first
        """
        # The limit is bound as a parameter rather than formatted into the SQL.
        return self.db.query("""
        SELECT
        c.customer_id,
        c.first_name || ' ' || c.last_name as customer_name,
        c.email,
        c.city,
        c.state,
        SUM(p.total_amount) as total_spent,
        COUNT(p.purchase_id) as total_orders
        FROM customer_profile_dataset c
        JOIN purchase_history_dataset_with_status p ON c.customer_id = p.customer_id
        GROUP BY c.customer_id
        ORDER BY total_spent DESC
        LIMIT ?
        """, (int(limit),))
# Factory function to create database utilities
def create_db_utils(db_path: str = "data.sqlite") -> Tuple[DatabaseUtils, BusinessAnalytics]:
    """
    Build the database helper together with its analytics wrapper.

    Args:
        db_path (str): Path to the database

    Returns:
        Tuple[DatabaseUtils, BusinessAnalytics]: The helper bound to
        ``db_path`` and an analytics object that queries through it
    """
    database = DatabaseUtils(db_path)
    return database, BusinessAnalytics(database)
if __name__ == "__main__":
    # Demo run: print the database summary followed by a few sample reports.
    rule = "=" * 60
    try:
        database, reports = create_db_utils()

        # Overall database summary
        database.print_database_summary()

        # Section banner for the sample reports
        print("\n" + rule)
        print("SAMPLE ANALYTICS")
        print(rule)

        # Five highest-spending customers
        print("\nTop 5 Customers by Total Spent:")
        print(reports.get_top_customers(5).to_string(index=False))

        # Revenue grouped by product category
        print("\nSales by Category:")
        print(reports.get_sales_by_category().to_string(index=False))

        # Profile for customer 1, if that customer exists
        print("\nSample Customer Profile (Customer ID 1):")
        sample = reports.get_customer_profile(1)
        if "error" in sample:
            print(sample["error"])
        else:
            person = sample['customer_info']
            print(f"Customer: {person['first_name']} {person['last_name']}")
            print(f"Total Purchases: {sample['total_purchases']}")
            print(f"Total Spent: ${sample['total_spent']:.2f}")
            print(f"Total Calls: {sample['total_calls']}")
    except Exception as exc:
        # Any failure in the demo is reported as a single line.
        print(f"Error: {exc}")