"""Streamlit app for browsing, searching and chatting about AGC documents."""

import html
import json
import re

import streamlit as st

# get_search_history is not referenced in this file; the import is kept as-is.
from db.db_utils import get_all_documents, get_document, get_search_history
from embedding.enhanced_rag_service import enhanced_rag_search

# NOTE(review): several markup strings below consist only of newlines or plain
# text although they are rendered with unsafe_allow_html=True. It looks like
# the HTML/CSS tags were stripped before this file reached review -- confirm
# against the original and restore the tags if so.

# Session-state value of ``page`` while a single document is being shown.
DETAIL_PAGE = "document_detail"

# Sidebar pages: session-state value -> label shown in the radio widget.
PAGE_LABELS = {
    "browse": "Browse Documents",
    "search": "Search Documents",
    "chat": "Chat Interface",
}

# Set page configuration
st.set_page_config(
    page_title="AGC Document Search",
    page_icon="📚",
    layout="wide",
)

# Initialize session state for page navigation
if "page" not in st.session_state:
    st.session_state.page = "browse"
if "selected_document_id" not in st.session_state:
    st.session_state.selected_document_id = None
if "current_query" not in st.session_state:
    st.session_state.current_query = ""
if "messages" not in st.session_state:
    st.session_state.messages = []
if "previous_page" not in st.session_state:
    st.session_state.previous_page = "browse"


def _esc(value: object) -> str:
    """Return *value* as text with HTML special characters escaped.

    Used for every document-derived value that is interpolated into markdown
    rendered with ``unsafe_allow_html=True``.
    """
    return html.escape(str(value))


def _html(markup: str) -> None:
    """Render *markup* with ``st.markdown`` and raw HTML enabled."""
    st.markdown(markup, unsafe_allow_html=True)


def _find_field(label: str, content: str):
    """Return the stripped text following ``"<label>:"`` in *content*.

    Returns ``None`` when the label is absent. The result may be an empty
    string when only whitespace follows the label.
    """
    match = re.search(rf"{re.escape(label)}:\s*([^\n]+)", content)
    return match.group(1).strip() if match else None


def navigate_to_page(page, document_id=None, query=None):
    """Switch the app to *page* and rerun the script.

    Args:
        page: Value to store in ``st.session_state.page``.
        document_id: When not ``None``, stored as ``selected_document_id``.
        query: When not ``None``, stored as ``current_query``.

    The page being left is remembered in ``st.session_state.previous_page``
    so the document detail page can return to it. The detail page itself is
    never remembered; otherwise its "back" button would lead back to itself.
    The function ends by calling ``st.rerun()``.
    """
    current = st.session_state.page
    if page != current and current != DETAIL_PAGE:
        st.session_state.previous_page = current

    st.session_state.page = page
    if document_id is not None:
        st.session_state.selected_document_id = document_id
    if query is not None:
        st.session_state.current_query = query

    # Force a rerun to update the UI immediately
    st.rerun()


def _go_back() -> None:
    """Navigate to the remembered previous page, falling back to browse."""
    target = st.session_state.get("previous_page")
    if not target or target == DETAIL_PAGE:
        target = "browse"
    navigate_to_page(target)


def browse_documents_page() -> None:
    """Render the document browser: filters, result count and one card per document."""
    st.title("📚 AGC Document Browser")

    # Add some CSS styling
    _html(" ")

    documents = get_all_documents()

    # Sorted so the option order does not depend on set iteration order.
    distinct_types = {doc.get("doc_type", "Unknown") for doc in documents}
    doc_types = ["All Types", *sorted(distinct_types, key=str)]

    type_col, text_col = st.columns(2)
    with type_col:
        selected_type = st.selectbox("Filter by document type:", doc_types)
    with text_col:
        title_filter = st.text_input(
            "Filter by title or case number:", placeholder="Enter keywords"
        )

    filtered_docs = documents
    if selected_type and selected_type != "All Types":
        filtered_docs = [
            doc for doc in filtered_docs if doc.get("doc_type") == selected_type
        ]
    if title_filter:
        needle = title_filter.lower()
        # The text filter is matched against both the title and the content.
        filtered_docs = [
            doc
            for doc in filtered_docs
            if needle in (doc.get("title") or "").lower()
            or needle in (doc.get("content") or "").lower()
        ]

    st.write(f"Showing {len(filtered_docs)} document(s)")

    # Order documents by created_at date, newest first
    filtered_docs = sorted(
        filtered_docs, key=lambda d: d.get("created_at", ""), reverse=True
    )

    for doc in filtered_docs:
        title = _esc(doc.get("title", "Untitled Document"))
        doc_type = _esc(doc.get("doc_type", "Unknown"))
        source = _esc(doc.get("source", "Unknown Source"))
        created = _esc(doc.get("created_at", "Unknown Date"))
        _html(
            f"""
{title}
📄 {doc_type} 📂 {source} ⏰ {created}
"""
        )

        content = doc.get("content") or ""

        # Prefer the key fields for the preview; fall back to a content snippet.
        preview_parts = []
        file_number = _find_field("File Number", content)
        if file_number is not None:
            preview_parts.append(f"File Number: {file_number}")
        status = _find_field("Status", content)
        if status is not None:
            preview_parts.append(f"Status: {status}")
        if not preview_parts:
            if len(content) > 200:
                snippet = content[:200].replace("\n", " ").strip() + "..."
            else:
                snippet = content
            preview_parts = [snippet]

        preview = "\n".join(_esc(part) for part in preview_parts)
        _html(f"\n{preview}\n")

        # Button to view full document
        _, action_col = st.columns([4, 1])
        with action_col:
            if st.button("View Details", key=f"doc_btn_{doc.get('id', 0)}"):
                navigate_to_page(DETAIL_PAGE, doc.get("id"))

        _html("\n")


def search_documents_page() -> None:
    """Render the search box and, once a query is entered, its results."""
    st.title("Search AGC Documents")

    search_query = st.text_input(
        "Search documents:", placeholder="Enter search terms"
    )
    if not search_query:
        return

    with st.spinner("Searching documents with OpenAI-enhanced query..."):
        results = enhanced_rag_search(search_query, profile_search=False)

    # Display the enhanced query that was used
    if "enhanced_query" in results and results["enhanced_query"] != search_query:
        st.info(f"Search query was enhanced to: '{results['enhanced_query']}'")

    st.subheader("Search Results")

    if not results["documents"]:
        st.info("No matching documents found.")
        return

    for idx, doc in enumerate(results["documents"]):
        st.markdown(
            f"**{doc.get('title', 'Untitled')}** "
            f"(Relevance: {doc.get('similarity', 0.0):.2f})"
        )
        st.markdown(f"**Type:** {doc.get('doc_type', 'Unknown')}")
        st.markdown(f"{doc.get('content_preview', '')}")

        # The index keeps widget keys unique even if a document id repeats.
        if st.button("View Details", key=f"result_btn_{idx}_{doc.get('id', 0)}"):
            navigate_to_page(DETAIL_PAGE, doc.get("id"))

        st.markdown("---")


def _render_assistant_extras(message: dict, msg_idx: int) -> None:
    """Render query-enhancement info and document links for one assistant message."""
    if "enhanced_query" in message:
        with st.expander("See how your query was enhanced"):
            st.markdown(f"**Original query:** {message.get('original_query', '')}")
            st.markdown(f"**Enhanced query:** {message['enhanced_query']}")

    if "document_results" not in message:
        return

    documents = message["document_results"]
    if not documents:
        st.info("No highly relevant documents found.")
        return

    st.markdown("**Relevant Documents:**")
    for doc_idx, doc in enumerate(documents):
        st.markdown(
            f"- **{doc.get('title', 'Untitled')}** "
            f"(Relevance: {doc.get('similarity', 0.0):.2f})"
        )
        st.markdown(f" {doc.get('content_preview', '')}")

        # Message and document indices keep the widget keys unique.
        button_key = f"history_doc_btn_{msg_idx}_{doc_idx}_{doc.get('id', 0)}"
        if st.button("View Full Document", key=button_key):
            navigate_to_page(DETAIL_PAGE, doc.get("id"))


def chat_interface_page() -> None:
    """Render the chat history and answer a newly submitted question."""
    st.title("Chat with AGC Documents")
    st.write(
        "Ask questions about AGC documents in the database! \n"
        "OpenAI will enhance your query and provide better answers."
    )

    # Display chat history
    for msg_idx, message in enumerate(st.session_state.messages):
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
            if message["role"] == "assistant":
                _render_assistant_extras(message, msg_idx)

    query = st.chat_input("What would you like to know about AGC documents?")
    if not query:
        return

    st.session_state.messages.append({"role": "user", "content": query})
    with st.chat_message("user"):
        st.markdown(query)

    with st.chat_message("assistant"):
        with st.spinner("Searching documents with OpenAI-enhanced query..."):
            results = enhanced_rag_search(query, profile_search=False)

    # Keep only highly relevant documents for display.
    relevant_docs = [
        doc
        for doc in (results["documents"] or [])
        if doc.get("similarity", 0.0) > 0.7
    ]

    st.session_state.messages.append(
        {
            "role": "assistant",
            "content": results["answer"],
            "original_query": query,
            "enhanced_query": results.get("enhanced_query", query),
            "document_results": relevant_docs,
        }
    )

    # Rerun so the answer is drawn by the history loop above. Buttons created
    # in this branch would vanish on the rerun a click triggers (the chat
    # input is empty by then), so their clicks would never be handled.
    st.rerun()


def _render_allegations(content: str) -> None:
    """Render the case header lines and each ``ALLEGATION #n:`` section of *content*."""
    sections = re.split(r"ALLEGATION #\d+:", content)
    if len(sections) <= 1:
        return

    _html("\n\nAllegations\n\n")

    # The text before the first allegation holds the basic case information.
    header_lines = sections[0].split("--- ALLEGATIONS ---")[0].strip().split("\n")
    for line in header_lines:
        if ":" in line:
            label, _, value = line.partition(":")
            _html(f"\n{_esc(label)}: {_esc(value.strip())}\n")

    for number, allegation in enumerate(sections[1:], 1):
        _html("\n")
        _html(f"\n\nAllegation #{number}\n\n")

        # Clean up HTML tags
        allegation = re.sub(r"<[^>]+>", "", allegation)

        for detail in allegation.strip().split("\n"):
            if ":" in detail:
                label, _, value = detail.partition(":")
                _html(f"\n{_esc(label)}: {_esc(value.strip())}\n")

        _html("\n")


def _render_person(person_entry: str) -> None:
    """Render one ``Person ID: ...`` entry, pretty-printing its JSON data if present."""
    _html("\n")

    id_match = re.search(r"Person ID:\s*(\d+)", person_entry)
    if id_match:
        _html(f"\nPerson ID: {id_match.group(1)}\n")

    data_match = re.search(r"Person Data:\s*({.*})", person_entry, re.DOTALL)
    if data_match:
        json_str = data_match.group(1)
        try:
            person_data = json.loads(json_str)
        except json.JSONDecodeError:
            # If JSON parsing fails, show the raw text
            st.text(json_str)
        else:
            if isinstance(person_data, dict):
                for key, value in person_data.items():
                    if value:  # Only show non-empty values
                        formatted_key = key.replace("_", " ").title()
                        _html(f"\n{_esc(formatted_key)}: {_esc(value)}\n")
            else:
                st.text(json_str)
    else:
        # Display the raw person entry if no JSON data found
        _html(_esc(person_entry))

    _html("\n")


def _render_involved_persons(content: str) -> None:
    """Render the ``--- INVOLVED PERSONS ---`` section of *content*, if any."""
    section = re.search(
        r"--- INVOLVED PERSONS ---\s*([\s\S]*?)(?:$|---)", content
    )
    if not section:
        return

    _html("\n\nInvolved Persons\n\n")

    entries = section.group(1).strip().split("\nPerson ID:")
    for position, entry in enumerate(entries):
        if position == 0:
            # Text before the first "Person ID:" is not a person entry.
            if not entry.startswith("Person ID:"):
                continue
            person_entry = entry.strip()
        else:
            # The split removed the prefix from every later entry.
            person_entry = f"Person ID:{entry.strip()}"

        if person_entry.strip():
            _render_person(person_entry)


def document_detail_page() -> None:
    """Render the selected document with back buttons above and below it.

    The document is looked up with ``get_document`` using
    ``st.session_state.selected_document_id``; an error is shown when the
    lookup returns nothing.
    """
    # Back button at the top
    if st.button("← Back to Previous Page"):
        _go_back()

    document_id = st.session_state.selected_document_id
    doc = get_document(document_id)
    if not doc:
        st.error(f"Document not found with ID: {document_id}")
        return

    with st.container():
        # Add styling with CSS
        _html(" ")

        content = doc.get("content") or ""

        # Document header with title
        title = _esc(doc.get("title", "Untitled Document"))
        _html(f"\n\n{title}\n\n")

        # Document content
        _html("\n")

        info_col, extra_col = st.columns(2)

        with info_col:
            _html("\n\nDocument Information\n\n")
            info_rows = (
                ("📄 Type", doc.get("doc_type", "Unknown")),
                ("📁 Source", doc.get("source", "Unknown")),
                ("⏰ Added", doc.get("created_at", "Unknown")),
            )
            for caption, value in info_rows:
                _html(f"\n{caption}: {_esc(value)}\n")

        with extra_col:
            _html("\n\nAdditional Information\n\n")
            # Key fields parsed from the document text; absent ones are skipped.
            parsed_fields = (
                ("📃", "File Number"),
                ("🔄", "Status"),
                ("📝", "DPP Suggestion"),
                ("⚖️", "HOD Decision"),
            )
            for icon, label in parsed_fields:
                value = _find_field(label, content)
                if value is not None:
                    _html(f"\n{icon} {label}: {_esc(value)}\n")

        _html("\n\nCase Details\n\n")

        _render_allegations(content)
        _render_involved_persons(content)

        _html("\n")

    # Back button at the bottom
    if st.button("← Back to Previous Page", key="back_bottom"):
        _go_back()


def main_page() -> None:
    """Render the detail page, or the sidebar navigation plus the chosen page."""
    if st.session_state.page == DETAIL_PAGE:
        # The document detail page is shown full page, without the sidebar.
        document_detail_page()
        return

    page_ids = list(PAGE_LABELS)
    current = st.session_state.page
    default_index = page_ids.index(current) if current in page_ids else 0

    st.sidebar.title("Navigation")
    selected_label = st.sidebar.radio(
        "Select a page:",
        list(PAGE_LABELS.values()),
        index=default_index,
    )

    # Translate the chosen label back into its session-state page id.
    for page_id, label in PAGE_LABELS.items():
        if label == selected_label:
            st.session_state.page = page_id
            break

    renderers = {
        "browse": browse_documents_page,
        "search": search_documents_page,
        "chat": chat_interface_page,
    }
    render = renderers.get(st.session_state.page)
    if render is not None:
        render()


# Run the app
if __name__ == "__main__":
    main_page()