import os
import json
import re
import sys

import mysql.connector
import numpy as np
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Print environment variables for debugging
print(f"MySQL Host: {os.getenv('MYSQL_HOST')}")
print(f"MySQL User: {os.getenv('MYSQL_USER')}")
print(f"MySQL Database: {os.getenv('MYSQL_DATABASE')}")
print(f"MySQL Password: {'[SET]' if os.getenv('MYSQL_PASSWORD') else '[NOT SET]'}")

# Database configuration
DB_CONFIG = {
    'host': os.getenv('MYSQL_HOST', 'localhost'),
    'user': os.getenv('MYSQL_USER', 'root'),
    'password': os.getenv('MYSQL_PASSWORD', ''),
    'database': os.getenv('MYSQL_DATABASE', 'agc')
}


def get_db_connection():
    """Create a connection to the MySQL database."""
    return mysql.connector.connect(**DB_CONFIG)


def truncate_all_tables():
    """Truncate all tables to remove existing data before importing new data."""
    conn = get_db_connection()
    cursor = conn.cursor()

    print("Truncating all tables to ensure a clean import...")

    # Disable foreign key checks temporarily to allow truncating tables with foreign keys
    cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")

    try:
        # Get all tables in the database
        cursor.execute("SHOW TABLES;")
        tables = cursor.fetchall()

        for table in tables:
            table_name = table[0]
            print(f"Truncating table: {table_name}")
            cursor.execute(f"TRUNCATE TABLE {table_name};")

        print(f"Successfully truncated {len(tables)} tables")
    except mysql.connector.Error as err:
        print(f"Error truncating tables: {err}")
    finally:
        # Re-enable foreign key checks
        cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
        conn.commit()
        cursor.close()
        conn.close()


def setup_tables():
    """Create the tables for document search if they don't exist."""
    conn = get_db_connection()
    cursor = conn.cursor()

    # Create document_search tables
    schema_sql = """
    -- Documents table
    CREATE TABLE IF NOT EXISTS documents (
        id INT AUTO_INCREMENT PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        content TEXT NOT NULL,
        source VARCHAR(255),
        doc_type VARCHAR(50),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    -- Embeddings table - store as JSON since MySQL doesn't have a vector type
    CREATE TABLE IF NOT EXISTS embeddings (
        id INT AUTO_INCREMENT PRIMARY KEY,
        document_id INT NOT NULL,
        embedding JSON NOT NULL,
        FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE
    );

    -- Search history
    CREATE TABLE IF NOT EXISTS search_logs (
        id INT AUTO_INCREMENT PRIMARY KEY,
        query TEXT NOT NULL,
        results JSON,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """

    # Execute schema SQL statements
    for statement in schema_sql.split(';'):
        if statement.strip():
            try:
                cursor.execute(statement + ';')
                print(f"Executed: {statement[:50]}...")
            except mysql.connector.Error as err:
                print(f"Error executing statement: {err}")
                print(f"Statement: {statement}")

    conn.commit()
    cursor.close()
    conn.close()
    print("Document search tables created successfully")


def extract_sql_inserts(file_path, table_name):
    """Extract SQL INSERT statements from a file and return them as a list of SQL statements."""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            content = file.read()

        # Extract all INSERT statements for the specified table
        pattern = rf"INSERT INTO {table_name}[^;]*;"
        inserts = re.findall(pattern, content, re.DOTALL)

        # Clean up the inserts to make them more compatible
        cleaned_inserts = []
        for insert in inserts:
            # Replace TEXT fields that might have problematic characters.
            # This is a simplistic approach - for a production system, you'd
            # need more robust parsing.
            if 'TEXT' in insert or 'text' in insert or 'Text' in insert:
                # Try to handle the quotes better
                insert = re.sub(r"'([^']*?)<([^>]*?)>'", r"'\1<\2>'", insert)
            cleaned_inserts.append(insert)

        return cleaned_inserts
    except Exception as e:
        print(f"Error reading SQL file {file_path}: {e}")
        return []


def execute_sql_statements(statements):
    """Execute a list of SQL statements."""
    if not statements:
        return

    conn = get_db_connection()
    cursor = conn.cursor()

    successful = 0
    failed = 0

    for statement in statements:
        try:
            cursor.execute(statement)
            successful += 1
        except mysql.connector.Error as err:
            # Skip duplicate key errors
            if err.errno == 1062:  # Duplicate entry error
                print(f"Skipping duplicate entry: {err}")
            else:
                print(f"Error executing SQL: {err}")
                print(f"Statement: {statement[:100]}...")  # Print first 100 chars of the statement
                failed += 1

    conn.commit()
    cursor.close()
    conn.close()
    print(f"Executed {successful} statements successfully, {failed} statements failed")


def scan_directory_for_sql_files(directory):
    """Scan a directory and its subdirectories for SQL files."""
    sql_files = {
        'info': [],
        'allegation': [],
        'person': []
    }

    # Walk through the directory
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.sql'):
                full_path = os.path.join(root, file)

                # Categorize files
                if 'LT_LKK_INFO' in file:
                    sql_files['info'].append(full_path)
                elif 'LT_LKK_ALLEGATION' in file:
                    sql_files['allegation'].append(full_path)
                elif 'LT_LKK_PERSON' in file or 'PERSON_RESPONSIBLE' in file:
                    sql_files['person'].append(full_path)

    return sql_files


def extract_case_data_from_directory(directory_path):
    """Extract legal case data directly from a directory."""
    # Get the directory name (category)
    category = os.path.basename(directory_path)

    # List to store case data
    cases = []

    # Look for PDF files first
    pdf_files = []
    for file in os.listdir(directory_path):
        if file.lower().endswith('.pdf'):
            pdf_files.append(os.path.join(directory_path, file))

    # Look for SQL files to extract data
    info_files = []
    allegation_files = []
    person_files = []
    for file in os.listdir(directory_path):
        if file.endswith('.sql'):
            full_path = os.path.join(directory_path, file)
            if 'LT_LKK_INFO' in file:
                info_files.append(full_path)
            elif 'LT_LKK_ALLEGATION' in file:
                allegation_files.append(full_path)
            elif 'LT_LKK_PERSON' in file or 'PERSON_RESPONSIBLE' in file:
                person_files.append(full_path)

    # Create a basic case from the directory
    case = {
        'title': f"Legal Case - {category}",
        'content': f"Legal category: {category}\n",
        'source': directory_path,
        'doc_type': "Legal Case Category",
        'pdf_files': pdf_files
    }
    cases.append(case)

    return cases


def import_pdf_files(directory):
    """Import PDF files as documents."""
    # Import required modules
    sys.path.append('.')  # Add current directory to path
    from db.db_utils import add_document, store_embedding
    try:
        from embedding.embedding_service import generate_embedding
    except Exception as e:
        print(f"Error importing regular embedding service: {e}")
        print("Falling back to HTTP-based embedding service...")
        from embedding.embedding_service_http import generate_embedding

    imported_count = 0

    # Walk through the directory
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.lower().endswith('.pdf'):
                try:
                    full_path = os.path.join(root, file)

                    # Get the category from directory name
                    category = os.path.basename(root)

                    # Try to extract text from PDF (would need a PDF library)
                    # For now, we'll just use the filename as content
                    print(f"Found PDF: {full_path}")

                    # For a real implementation, you would use PyPDF2 or a similar library.
                    # This is a placeholder for actual PDF text extraction.
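                    # A minimal sketch of what that extraction might look like, assuming
                    # the third-party pypdf package were added as a dependency (it is not
                    # part of this script, so the sketch stays commented out):
                    #
                    #   from pypdf import PdfReader
                    #   reader = PdfReader(full_path)
                    #   content = "\n".join(page.extract_text() or "" for page in reader.pages)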
                    title = file.replace('.pdf', '')
                    content = f"PDF Document: {file} from category {category}\n\nSource: {full_path}"

                    # Add document to the database
                    doc_id = add_document(
                        title=title,
                        content=content,
                        source=full_path,
                        doc_type="Legal PDF Document"
                    )

                    # Generate and store embedding
                    embedding = generate_embedding(f"{title} {content}")
                    store_embedding(doc_id, embedding)

                    print(f"Added PDF document: {title}")
                    imported_count += 1
                except Exception as e:
                    print(f"Error processing PDF {file}: {e}")

    return imported_count


def import_data_from_directory(directory):
    """Import data directly from each category directory."""
    # Import required modules
    sys.path.append('.')  # Add current directory to path
    from db.db_utils import add_document, store_embedding
    try:
        from embedding.embedding_service import generate_embedding
    except Exception as e:
        print(f"Error importing regular embedding service: {e}")
        print("Falling back to HTTP-based embedding service...")
        from embedding.embedding_service_http import generate_embedding

    # Count for imported documents
    imported_count = 0

    # Get all subdirectories
    try:
        subdirs = [os.path.join(directory, d) for d in os.listdir(directory)
                   if os.path.isdir(os.path.join(directory, d))]
        print(f"Found {len(subdirs)} category directories")

        # Process each directory
        for subdir in subdirs:
            category = os.path.basename(subdir)
            print(f"Processing directory: {category}")

            # Extract cases from the directory
            cases = extract_case_data_from_directory(subdir)

            # Add each case as a document
            for case in cases:
                doc_id = add_document(
                    title=case['title'],
                    content=case['content'],
                    source=case['source'],
                    doc_type=case['doc_type']
                )

                # Generate and store embedding
                embedding = generate_embedding(f"{case['title']} {case['content']}")
                store_embedding(doc_id, embedding)

                print(f"Added document: {case['title']}")
                imported_count += 1
    except Exception as e:
        print(f"Error scanning directories: {e}")

    return imported_count


def import_sql_data_to_db():
    """Import SQL files from the Data directory into the database."""
    data_dir = os.path.join(os.getcwd(), 'Data')
    print(f"Scanning directory: {data_dir}")

    sql_files = scan_directory_for_sql_files(data_dir)
    print(f"Found {len(sql_files['info'])} info files, "
          f"{len(sql_files['allegation'])} allegation files, "
          f"{len(sql_files['person'])} person files")

    # Import info files
    for file_path in sql_files['info']:
        print(f"Importing info from {file_path}")
        statements = extract_sql_inserts(file_path, 'LT_LKK_INFO')
        execute_sql_statements(statements)

    # Import allegation files
    for file_path in sql_files['allegation']:
        print(f"Importing allegations from {file_path}")
        statements = extract_sql_inserts(file_path, 'LT_LKK_ALLEGATION')
        execute_sql_statements(statements)

    # Import person files
    for file_path in sql_files['person']:
        print(f"Importing persons from {file_path}")
        statements = extract_sql_inserts(file_path, 'LT_LKK_PERSON_INVOLVE')
        execute_sql_statements(statements)

    print("SQL data import complete!")

    # Import directly from directories
    print("Importing data directly from directories...")
    dir_docs = import_data_from_directory(data_dir)
    print(f"Imported {dir_docs} documents from directories")

    # Also import PDF files
    print("Looking for PDF files...")
    pdf_docs = import_pdf_files(data_dir)
    print(f"Imported {pdf_docs} PDF documents")


def check_tables(db_config, label):
    """Check tables in the specified database."""
    conn = mysql.connector.connect(**db_config)
    cursor = conn.cursor()

    # Get all tables
    cursor.execute("SHOW TABLES;")
    tables = cursor.fetchall()

    print(f"\n{label} tables in database {db_config['database']}:")
    for table in tables:
        table_name = table[0]

        # Count records
        cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
        count = cursor.fetchone()[0]
        print(f" {table_name}: {count} records")

    cursor.close()
    conn.close()


def convert_user_data_to_documents():
    """Convert user data to documents in our application."""
    # Import required modules
    sys.path.append('.')  # Add current directory to path
    try:
        from db.db_utils import add_document, store_embedding
        from embedding.embedding_service_http import generate_embedding
    except Exception as e:
        print(f"Error importing required modules: {e}")
        return

    # Connect to the source database
    source_conn = get_db_connection()
    source_cursor = source_conn.cursor(dictionary=True)

    # Get all users with their details and preferences - adjusted for actual schema
    try:
        query = """
            SELECT u.user_id, u.username, u.email, u.created_at,
                   ud.first_name, ud.last_name, ud.date_of_birth, ud.phone_number,
                   up.language, up.theme, up.notifications_enabled
            FROM users u
            LEFT JOIN user_details ud ON u.user_id = ud.user_id
            LEFT JOIN user_preferences up ON u.user_id = up.user_id
        """
        source_cursor.execute(query)
        user_records = source_cursor.fetchall()

        if not user_records:
            print("No user records found in source database.")
            return

        print(f"Found {len(user_records)} user records to convert")

        # Process each user as a document
        user_count = 0
        for user in user_records:
            user_id = user.get('user_id')

            # Create title with user information
            username = user.get('username', f"User {user_id}")
            title = f"User Profile: {username}"

            # Compile content from specific fields based on the schema
            content_parts = [
                f"Username: {user.get('username', 'N/A')}",
                f"Email: {user.get('email', 'N/A')}",
                f"Created At: {user.get('created_at', 'N/A')}"
            ]

            # Add user details if available
            if user.get('first_name') or user.get('last_name'):
                name = f"{user.get('first_name', '')} {user.get('last_name', '')}".strip()
                content_parts.append(f"Name: {name}")

            if user.get('date_of_birth'):
                content_parts.append(f"Date of Birth: {user.get('date_of_birth')}")

            if user.get('phone_number'):
                content_parts.append(f"Phone Number: {user.get('phone_number')}")

            # Add user preferences if available
            if user.get('language'):
                content_parts.append(f"Language: {user.get('language')}")

            if user.get('theme'):
                content_parts.append(f"Theme: {user.get('theme')}")

            if user.get('notifications_enabled') is not None:
                notifications = "Enabled" if user.get('notifications_enabled') else "Disabled"
                content_parts.append(f"Notifications: {notifications}")

            # Combine all content
            content = "\n".join(content_parts)

            # Add document to the database
            print(f"Adding document for user {username}")
            doc_id = add_document(
                title=title,
                content=content,
                source=f"User ID: {user_id} from docdoc database",
                doc_type="User Profile"
            )

            # Generate and store embedding
            print(f"Generating embedding for document: {doc_id}")
            embedding = generate_embedding(f"{title} {content}")
            store_embedding(doc_id, embedding)

            user_count += 1

        print(f"Conversion complete! Created {user_count} user profile documents.")
    except mysql.connector.Error as err:
        print(f"Error accessing user data: {err}")
    finally:
        source_cursor.close()
        source_conn.close()


def setup_document_tables():
    """Create the document-related tables if they don't exist."""
    conn = get_db_connection()
    cursor = conn.cursor()

    # Create document search tables
    schema_sql = """
    -- Documents table for storing processed LKK data
    CREATE TABLE IF NOT EXISTS documents (
        id INT AUTO_INCREMENT PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        content TEXT NOT NULL,
        source VARCHAR(255),
        doc_type VARCHAR(50),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        LKK_INFOID INT,
        FOREIGN KEY (LKK_INFOID) REFERENCES lt_lkk_info(LKK_INFOID)
    );

    -- Embeddings table for semantic search
    CREATE TABLE IF NOT EXISTS embeddings (
        id INT AUTO_INCREMENT PRIMARY KEY,
        document_id INT NOT NULL,
        embedding JSON NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE
    );

    -- Search history
    CREATE TABLE IF NOT EXISTS search_logs (
        id INT AUTO_INCREMENT PRIMARY KEY,
        query TEXT NOT NULL,
        results JSON,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """

    # Drop existing tables if they exist (embeddings first because of the foreign key)
    cursor.execute("DROP TABLE IF EXISTS embeddings;")
    cursor.execute("DROP TABLE IF EXISTS documents;")
    cursor.execute("DROP TABLE IF EXISTS search_logs;")

    # Execute schema SQL statements
    for statement in schema_sql.split(';'):
        if statement.strip():
            try:
                cursor.execute(statement + ';')
                print(f"Executed: {statement[:50]}...")
            except mysql.connector.Error as err:
                print(f"Error executing statement: {err}")

    conn.commit()
    cursor.close()
    conn.close()


def convert_lkk_to_documents():
    """Convert LKK data into searchable documents based on LLA_CASE_NO."""
    # Import required modules for embeddings
    sys.path.append('.')
    try:
        from embedding.embedding_service import generate_embedding
    except Exception as e:
        print(f"Error importing embedding service: {e}")
        print("Falling back to HTTP-based embedding service...")
        from embedding.embedding_service_http import generate_embedding

    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)

    try:
        # Step 1: Get all unique case numbers from allegations
        cursor.execute("""
            SELECT DISTINCT LLA_CASE_NO
            FROM lt_lkk_allegation
            WHERE LLA_CASE_NO IS NOT NULL AND LLA_CASE_NO != ''
        """)
        case_numbers = cursor.fetchall()
        print(f"Found {len(case_numbers)} unique case numbers to process")

        for case_number_row in case_numbers:
            case_number = case_number_row['LLA_CASE_NO']

            try:
                # Step 2: For each case number, get all relevant data from the
                # allegations and info tables
                cursor.execute("""
                    SELECT i.*, a.*
                    FROM lt_lkk_allegation a
                    JOIN lt_lkk_info i ON a.LKK_INFOID = i.LKK_INFOID
                    WHERE a.LLA_CASE_NO = %s
                    GROUP BY a.LLA_ALLEGATION_ID
                """, (case_number,))
                allegation_records = cursor.fetchall()

                if not allegation_records:
                    print(f"No records found for case number {case_number}")
                    continue

                # Get the first record to extract basic info
                first_record = allegation_records[0]
                lkk_infoid = first_record['LKK_INFOID']

                # Step 3: Get person data separately
                cursor.execute("""
                    SELECT LTL_PERSON_ID, LTL_DATA
                    FROM lt_lkk_person_involve
                    WHERE LKK_INFOID = %s
                """, (lkk_infoid,))
                person_records = cursor.fetchall()

                involved_persons = []
                for person in person_records:
                    person_id = person['LTL_PERSON_ID']
                    try:
                        # Parse person data from JSON (validation only; the raw
                        # LTL_DATA string is what goes into the document below)
                        person_data = {}
                        if person['LTL_DATA']:
                            try:
                                person_data = json.loads(person['LTL_DATA'])
                            except json.JSONDecodeError:
                                print(f"Error parsing LTL_DATA JSON for person ID {person_id}")

                        # Instead of just showing the ID, include the full LTL_DATA content
                        person_info = f"Person ID: {person_id}"

                        # Add the full JSON data
                        if person['LTL_DATA']:
                            person_info = f"Person ID: {person_id}\nPerson Data: {person['LTL_DATA']}"

                        involved_persons.append(person_info)
                    except Exception as e:
                        print(f"Error processing person data for ID {person_id}: {e}")

                # Start with basic case info
                base_info = {
                    'case_number': case_number,
                    'file_number': first_record['LKK_FILE_NO'],
                    'status': first_record['LKK_STATUS'],
                    'dpp_suggestion': first_record['LKK_DPP_ANT_SUGGESTION'],
                    'hod_decision': first_record['LKK_HOD_DECISION'],
                    'lkk_infoid': lkk_infoid,
                    'created_date': first_record['LKK_CREATEDDATE']
                }

                # Title using case number
                title = f"Case Number: {case_number}"

                # Compile content
                content_parts = [
                    f"File Number: {base_info['file_number']}",
                    f"Status: {base_info['status']}",
                    f"Case Number: {case_number}",
                    f"DPP Suggestion: {base_info['dpp_suggestion'] or 'None'}",
                    f"HOD Decision: {base_info['hod_decision'] or 'None'}",
                    "\n--- ALLEGATIONS ---"
                ]

                # Add all allegations
                for idx, record in enumerate(allegation_records, 1):
                    allegation_parts = [
                        f"\nALLEGATION #{idx}:",
                        f"Allegation ID: {record['LLA_ALLEGATION_ID']}",
                        f"Case Number: {record['LLA_CASE_NO']}",
                        f"Accused Name: {record['LLA_OKT_NAME'] or 'N/A'}",
                        f"Type: {record['LLA_TYPE'] or 'N/A'}",
                        f"Act ID: {record['LLA_ACT_ID'] or 'N/A'}",
                        f"Act Description: {record['LLA_ACT_DESC'] or 'N/A'}",
                        f"Section: {record['LLA_SECTION'] or 'N/A'}",
                        f"Date: {record['LLA_DATE']}",
                        f"Charge Notes: {record['LLA_CHARGE_NOTES'] or 'N/A'}",
                        f"Charge Type: {record['LKK_CHARGE_TYPE'] or 'N/A'}",
                        f"Charge Reason: {record['LLA_CHARGE_REASON'] or 'N/A'}",
                        f"Charged By: {record['LLA_CHARGE_BY'] or 'N/A'}"
                    ]
                    content_parts.extend(allegation_parts)

                # Add involved persons section
                if involved_persons:
                    content_parts.append("\n--- INVOLVED PERSONS ---")
                    for person in involved_persons:
                        content_parts.append(person)

                # Join all content
                content = "\n".join(filter(None, content_parts))

                # Insert into documents table
                cursor.execute("""
                    INSERT INTO documents (title, content, source, doc_type, LKK_INFOID, created_at)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (
                    title,
                    content,
                    f"Case Number: {case_number}",
                    "Legal Case",
                    base_info['lkk_infoid'],
                    base_info['created_date']
                ))
                doc_id = cursor.lastrowid

                # Generate and store embedding
                embedding = generate_embedding(f"{title} {content}")

                # Convert numpy array to list if necessary
                if isinstance(embedding, np.ndarray):
                    embedding = embedding.tolist()

                cursor.execute("""
                    INSERT INTO embeddings (document_id, embedding)
                    VALUES (%s, %s)
                """, (doc_id, json.dumps(embedding)))

                conn.commit()
                print(f"Processed case number {case_number} into document {doc_id}")
            except Exception as e:
                print(f"Error processing case number {case_number}: {e}")
                conn.rollback()
    except mysql.connector.Error as err:
        print(f"Database error: {err}")
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    # Ask for confirmation before proceeding
    print("This script will convert LKK data into searchable documents.")
    confirm = input("Do you want to proceed? (y/n): ")

    if confirm.lower() != 'y':
        print("Import cancelled.")
        sys.exit(0)

    # Set up document tables
    setup_document_tables()

    # Convert LKK data to documents
    convert_lkk_to_documents()

    print("\nData conversion completed successfully!")
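
# Note on the embedding storage above: vectors are kept as JSON because this
# schema has no native vector column, so similarity has to be computed in the
# application. A minimal sketch of how a search could work against this schema
# (a hypothetical helper, not called anywhere in this script; assumes
# generate_embedding is imported the same way as in the functions above):
#
#   def search_documents(query, top_k=5):
#       query_vec = np.array(generate_embedding(query), dtype=float)
#       conn = get_db_connection()
#       cursor = conn.cursor(dictionary=True)
#       cursor.execute("SELECT document_id, embedding FROM embeddings;")
#       scored = []
#       for row in cursor.fetchall():
#           vec = np.array(json.loads(row['embedding']), dtype=float)
#           score = float(np.dot(query_vec, vec)
#                         / (np.linalg.norm(query_vec) * np.linalg.norm(vec)))
#           scored.append((row['document_id'], score))
#       cursor.close()
#       conn.close()
#       return sorted(scored, key=lambda item: item[1], reverse=True)[:top_k]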