agc-chatbot/app.py

537 lines
23 KiB
Python

import html
import json
import re

import streamlit as st

from db.db_utils import get_all_documents, get_document
from db.db_utils import get_search_history
from embedding.enhanced_rag_service import enhanced_rag_search
# Page-level settings: browser tab title, tab icon, and the wide layout.
_PAGE_CONFIG = {
    "page_title": "AGC Document Search",
    "page_icon": "📚",
    "layout": "wide",
}
st.set_page_config(**_PAGE_CONFIG)
# Seed the navigation/chat entries of st.session_state. Only keys that are
# missing are written, so values already stored survive script reruns.
for _state_key, _state_default in (
    ("page", "browse"),
    ("selected_document_id", None),
    ("current_query", ""),
    ("messages", []),
    ("previous_page", "browse"),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
def navigate_to_page(page, document_id=None, query=None):
    """Switch the app to ``page`` and rerun the script.

    Args:
        page: Page key written to ``st.session_state.page``.
        document_id: When not ``None``, stored as ``selected_document_id``.
        query: When not ``None``, stored as ``current_query``.
    """
    state = st.session_state
    page_being_left = state.page

    # Remember where we came from, unless the target is the current page or
    # the document detail page.
    is_page_change = page != page_being_left
    if is_page_change and page != "document_detail":
        state.previous_page = page_being_left
    state.page = page

    optional_updates = (
        ("selected_document_id", document_id),
        ("current_query", query),
    )
    for state_key, new_value in optional_updates:
        if new_value is not None:
            state[state_key] = new_value

    # Rerun right away so the UI reflects the new page.
    st.rerun()
# Browse Documents Page
_BROWSE_PAGE_CSS = """
<style>
.document-card {
background-color: #121212;
border-radius: 10px;
padding: 1rem;
margin-bottom: 1rem;
border-left: 4px solid #4CAF50;
}
.document-title {
font-size: 1.2rem;
margin-bottom: 0.5rem;
color: white;
font-weight: bold;
}
.document-info {
display: flex;
flex-wrap: wrap;
margin-bottom: 0.5rem;
color: #bbb;
font-size: 0.9rem;
}
.document-preview {
margin-top: 0.5rem;
padding: 0.5rem;
background-color: #1e1e1e;
border-radius: 5px;
color: #eee;
font-size: 0.9rem;
max-height: 150px;
overflow-y: auto;
}
.info-badge {
background-color: #333;
padding: 3px 8px;
border-radius: 10px;
margin-right: 10px;
white-space: nowrap;
}
</style>
"""


def browse_documents_page():
    """Render the document browser.

    Shows a document-type filter and a keyword filter (matched against title
    and content), the number of matching documents, and one card per document
    ordered by ``created_at`` (newest first). Each card has a "View Details"
    button that opens the document detail page.

    Document fields are HTML-escaped before being placed into the card markup,
    and missing or ``None`` fields fall back to placeholder text instead of
    raising.
    """
    st.title("📚 AGC Document Browser")
    st.markdown(_BROWSE_PAGE_CSS, unsafe_allow_html=True)

    documents = get_all_documents() or []

    def type_of(doc):
        # Missing and None types are both presented (and filtered) as "Unknown".
        return str(doc.get('doc_type') or 'Unknown')

    def cell(value):
        # Escape for HTML and keep the value on one line so the card markup
        # stays a single uninterrupted HTML block.
        return html.escape(str(value)).replace("\n", " ")

    # Sorted so the option order is stable between reruns.
    doc_types = ["All Types"] + sorted({type_of(doc) for doc in documents})

    type_col, keyword_col = st.columns(2)
    with type_col:
        selected_type = st.selectbox("Filter by document type:", doc_types)
    with keyword_col:
        title_filter = st.text_input("Filter by title or case number:", placeholder="Enter keywords")

    filtered_docs = documents
    if selected_type and selected_type != "All Types":
        filtered_docs = [doc for doc in filtered_docs if type_of(doc) == selected_type]
    if title_filter:
        needle = title_filter.lower()
        filtered_docs = [
            doc for doc in filtered_docs
            if needle in str(doc.get('title') or '').lower()
            or needle in str(doc.get('content') or '').lower()
        ]

    st.write(f"Showing {len(filtered_docs)} document(s)")

    # Newest first; str() keeps the sort key comparable when created_at is
    # missing or None for some documents.
    filtered_docs = sorted(
        filtered_docs,
        key=lambda d: str(d.get('created_at') or ''),
        reverse=True,
    )

    for position, doc in enumerate(filtered_docs):
        content = str(doc.get('content') or '')

        # Prefer key facts for the preview; fall back to a content snippet.
        file_match = re.search(r"File Number:\s*([^\n]+)", content)
        status_match = re.search(r"Status:\s*([^\n]+)", content)
        preview_parts = []
        if file_match:
            preview_parts.append(f"<b>File Number:</b> {cell(file_match.group(1).strip())}")
        if status_match:
            preview_parts.append(f"<b>Status:</b> {cell(status_match.group(1).strip())}")
        if not preview_parts:
            snippet = content[:200].replace("\n", " ").strip()
            if len(content) > 200:
                snippet += "..."
            preview_parts.append(html.escape(snippet))
        preview = "<br>".join(preview_parts)

        # The whole card is emitted in ONE markdown call so the opening and
        # closing <div> tags belong to the same element.
        title = cell(doc.get('title') or 'Untitled Document')
        source = cell(doc.get('source') or 'Unknown Source')
        created_at = cell(doc.get('created_at') or 'Unknown Date')
        card_markup = (
            '<div class="document-card">'
            f'<div class="document-title">{title}</div>'
            '<div class="document-info">'
            f'<span class="info-badge">📄 {cell(type_of(doc))}</span>'
            f'<span class="info-badge">📂 {source}</span>'
            f'<span class="info-badge">⏰ {created_at}</span>'
            '</div>'
            f'<div class="document-preview">{preview}</div>'
            '</div>'
        )
        st.markdown(card_markup, unsafe_allow_html=True)

        # The list position is part of the key so documents without an id do
        # not produce duplicate widget keys.
        _, button_col = st.columns([4, 1])
        with button_col:
            if st.button("View Details", key=f"doc_btn_{position}_{doc.get('id', 0)}"):
                # Store current page for return navigation
                st.session_state.previous_page = "browse"
                navigate_to_page("document_detail", doc.get('id'))
# Search Documents Page
def search_documents_page():
    """Render the search page.

    Reads a query from a text input, runs ``enhanced_rag_search`` on it, shows
    the enhanced query when it differs from the input, and lists each returned
    document with a "View Details" button that opens the document detail page.

    Missing ``documents``/``enhanced_query``/``similarity`` entries in the
    search result are tolerated instead of raising ``KeyError``.
    """
    st.title("Search AGC Documents")
    search_query = st.text_input("Search documents:", placeholder="Enter search terms")
    if not search_query:
        return

    with st.spinner("Searching documents with OpenAI-enhanced query..."):
        results = enhanced_rag_search(search_query, profile_search=False)

    enhanced_query = results.get("enhanced_query")
    if enhanced_query and enhanced_query != search_query:
        st.info(f"Search query was enhanced to: '{enhanced_query}'")

    st.subheader("Search Results")
    documents = results.get("documents") or []
    if not documents:
        st.info("No matching documents found.")
        return

    for position, doc in enumerate(documents):
        title = doc.get('title', 'Untitled')
        similarity = doc.get('similarity')
        if similarity is None:
            st.markdown(f"**{title}**")
        else:
            st.markdown(f"**{title}** (Relevance: {similarity:.2f})")
        st.markdown(f"**Type:** {doc.get('doc_type', 'Unknown')}")
        st.markdown(f"{doc.get('content_preview', '')}")

        # The result position is part of the key: the same document id can
        # appear more than once in the results, and widget keys must be unique.
        if st.button("View Details", key=f"result_btn_{position}_{doc.get('id', 0)}"):
            # Store current page for return navigation
            st.session_state.previous_page = "search"
            navigate_to_page("document_detail", doc.get('id'))
        st.markdown("---")
# Chat Interface Page
def _render_assistant_extras(message, msg_idx):
    """Render the enhancement expander and document list of one stored assistant message."""
    if "enhanced_query" in message:
        with st.expander("See how your query was enhanced"):
            st.markdown(f"**Original query:** {message['original_query']}")
            st.markdown(f"**Enhanced query:** {message['enhanced_query']}")

    if "document_results" not in message:
        return
    documents = message["document_results"]
    if not documents:
        st.info("No highly relevant documents found.")
        return

    st.markdown("**Relevant Documents:**")
    for doc_idx, doc in enumerate(documents):
        st.markdown(f"- **{doc.get('title', 'Untitled')}** (Relevance: {doc['similarity']:.2f})")
        st.markdown(f" {doc.get('content_preview', '')}")
        # Message and document indexes keep the widget keys unique.
        if st.button("View Full Document", key=f"history_doc_btn_{msg_idx}_{doc_idx}_{doc.get('id', 0)}"):
            # Store current page for return navigation
            st.session_state.previous_page = "chat"
            navigate_to_page("document_detail", doc.get('id'))


def chat_interface_page():
    """Render the chat page.

    Replays ``st.session_state.messages`` (assistant messages also show the
    query enhancement and their relevant documents), then reads a new question
    from the chat input, answers it with ``enhanced_rag_search`` and appends
    both the question and the answer to the history.

    After a new answer is stored the script is rerun so the answer is drawn by
    the history loop. Buttons created only inside the ``if query`` branch would
    never fire: the rerun triggered by a click has no pending query, so that
    branch (and the button) would not be executed again.
    """
    st.title("Chat with AGC Documents")
    st.write("Ask questions about AGC documents in the database! OpenAI will enhance your query and provide better answers.")

    # Display chat history
    for msg_idx, message in enumerate(st.session_state.messages):
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
            if message["role"] == "assistant":
                _render_assistant_extras(message, msg_idx)

    query = st.chat_input("What would you like to know about AGC documents?")
    if not query:
        return

    st.session_state.messages.append({"role": "user", "content": query})
    with st.chat_message("user"):
        st.markdown(query)

    with st.chat_message("assistant"):
        with st.spinner("Searching documents with OpenAI-enhanced query..."):
            results = enhanced_rag_search(query, profile_search=False)

    # Keep only high-relevance documents; a missing similarity counts as 0.
    relevant_docs = [
        doc for doc in (results.get("documents") or [])
        if (doc.get('similarity') or 0) > 0.7
    ]

    st.session_state.messages.append({
        "role": "assistant",
        "content": results["answer"],
        "original_query": query,
        "enhanced_query": results.get("enhanced_query", query),
        "document_results": relevant_docs,
    })
    # Redraw from history so the document buttons of the new answer work.
    st.rerun()
# Document detail page
_DOCUMENT_DETAIL_CSS = """
<style>
.doc-header {
padding: 1rem;
background-color: #1e1e1e;
color: white;
border-radius: 10px 10px 0 0;
margin-bottom: 0;
}
.doc-content {
padding: 1.5rem;
background-color: #121212;
color: white;
border: 1px solid #333;
border-radius: 0 0 10px 10px;
margin-top: 0;
}
.section-header {
background-color: #333;
padding: 0.5rem;
border-radius: 5px;
margin-top: 1rem;
margin-bottom: 0.5rem;
}
.data-row {
display: flex;
margin-bottom: 0.5rem;
}
.data-label {
font-weight: bold;
min-width: 150px;
}
.allegation {
background-color: #1e1e1e;
border-left: 4px solid #4CAF50;
padding: 1rem;
margin-bottom: 1rem;
border-radius: 0 5px 5px 0;
}
.person {
background-color: #1e1e1e;
border-left: 4px solid #2196F3;
padding: 0.7rem;
margin-bottom: 0.5rem;
border-radius: 0 5px 5px 0;
}
</style>
"""


def _detail_row_html(label, value):
    """Return one ``data-row`` div with ``label`` and ``value`` HTML-escaped."""
    safe_label = html.escape(str(label))
    # Line breaks become <br> so a multi-line value stays inside the row.
    safe_value = html.escape(str(value)).replace("\n", "<br>")
    return f'<div class="data-row"><span class="data-label">{safe_label}:</span> {safe_value}</div>'


def _leave_document_detail():
    """Navigate back to the page that opened the detail view (``browse`` by default)."""
    target = st.session_state.get("previous_page") or "browse"
    # Never "return" to the detail page itself.
    if target == "document_detail":
        target = "browse"
    navigate_to_page(target)


def _render_allegations(content):
    """Render the case header lines and one card per ``ALLEGATION #n:`` section of ``content``."""
    sections = re.split(r"ALLEGATION #\d+:", content)
    if len(sections) <= 1:
        return
    st.markdown('<h3>Allegations</h3>', unsafe_allow_html=True)

    # Text before the first allegation holds the basic "Label: value" case lines.
    header_lines = sections[0].split("--- ALLEGATIONS ---")[0].strip().split("\n")
    for line in header_lines:
        if line and ":" in line:
            label, value = line.split(":", 1)
            st.markdown(_detail_row_html(label.strip(), value.strip()), unsafe_allow_html=True)

    for number, allegation in enumerate(sections[1:], 1):
        # The involved-persons section is rendered separately; keep it out of
        # the last allegation when it follows the allegations.
        allegation = allegation.split("--- INVOLVED PERSONS ---")[0]
        # Drop HTML tags embedded in the stored text.
        allegation = re.sub(r'<[^>]+>', '', allegation)

        rows = []
        for detail in allegation.strip().split("\n"):
            if detail.strip() and ":" in detail:
                label, value = detail.split(":", 1)
                rows.append(_detail_row_html(label.strip(), value.strip()))

        # One markdown call per card so the wrapping <div> is opened and closed
        # within the same element.
        st.markdown(
            f'<div class="allegation"><h4>Allegation #{number}</h4>{"".join(rows)}</div>',
            unsafe_allow_html=True,
        )


def _render_involved_persons(content):
    """Render one card per ``Person ID:`` entry of the involved-persons section of ``content``."""
    persons_section = re.search(r"--- INVOLVED PERSONS ---\s*([\s\S]*?)(?:$|---)", content)
    if not persons_section:
        return
    st.markdown('<h3>Involved Persons</h3>', unsafe_allow_html=True)

    chunks = persons_section.group(1).strip().split("\nPerson ID:")
    for index, chunk in enumerate(chunks):
        if index == 0:
            # The first chunk is a person only if it carries the prefix itself.
            if not chunk.startswith("Person ID:"):
                continue
            person_entry = chunk.strip()
        else:
            # split() removed the prefix from every later chunk; add it back.
            person_entry = f"Person ID:{chunk.strip()}"

        rows = []
        fallback_text = None

        id_match = re.search(r"Person ID:\s*(\d+)", person_entry)
        if id_match:
            rows.append(_detail_row_html("Person ID", id_match.group(1)))

        data_match = re.search(r"Person Data:\s*({.*})", person_entry, re.DOTALL)
        if data_match:
            json_str = data_match.group(1)
            try:
                person_data = json.loads(json_str)
            except json.JSONDecodeError:
                person_data = None
            if isinstance(person_data, dict):
                for key, value in person_data.items():
                    if value:  # Only show non-empty values
                        formatted_key = key.replace("_", " ").title()
                        rows.append(_detail_row_html(formatted_key, value))
            else:
                # Unparseable or non-object JSON: show the raw text.
                fallback_text = json_str
        else:
            # No JSON payload: show the raw entry as plain text, not as HTML.
            fallback_text = person_entry

        if rows:
            st.markdown(f'<div class="person">{"".join(rows)}</div>', unsafe_allow_html=True)
        if fallback_text is not None:
            st.text(fallback_text)


def document_detail_page():
    """Render the full-page view of the document selected in session state.

    Loads the document with ``get_document(st.session_state.selected_document_id)``
    and shows an error when it is not found. Otherwise renders the title, the
    document metadata, key facts parsed from the content (file number, status,
    DPP suggestion, HOD decision), the allegations and the involved persons.
    A back button at the top and bottom returns to the previous page.

    All document-derived text is HTML-escaped before it is placed into markup
    rendered with ``unsafe_allow_html=True``.
    """
    if st.button("← Back to Previous Page"):
        _leave_document_detail()

    doc = get_document(st.session_state.selected_document_id)
    if not doc:
        st.error(f"Document not found with ID: {st.session_state.selected_document_id}")
        return

    with st.container():
        st.markdown(_DOCUMENT_DETAIL_CSS, unsafe_allow_html=True)

        content = str(doc.get('content') or '')
        doc_type = doc.get('doc_type') or 'Unknown'

        title = html.escape(str(doc.get("title") or "Untitled Document"))
        st.markdown(f'<div class="doc-header"><h1>{title}</h1></div>', unsafe_allow_html=True)

        # NOTE(review): each st.markdown call is rendered as its own element, so
        # this opening tag probably does not wrap the elements below - confirm.
        st.markdown('<div class="doc-content">', unsafe_allow_html=True)

        info_col, extra_col = st.columns(2)
        with info_col:
            st.markdown('<h2>Document Information</h2>', unsafe_allow_html=True)
            info_rows = (
                ("📄 Type", doc_type),
                ("📁 Source", doc.get("source") or "Unknown"),
                ("⏰ Added", doc.get("created_at") or "Unknown"),
            )
            for label, value in info_rows:
                st.markdown(_detail_row_html(label, value), unsafe_allow_html=True)

        with extra_col:
            st.markdown('<h2>Additional Information</h2>', unsafe_allow_html=True)
            # Key facts are parsed out of the free-text content; absent ones are skipped.
            key_facts = (
                ("📃 File Number", r"File Number:\s*([^\n]+)"),
                ("🔄 Status", r"Status:\s*([^\n]+)"),
                ("📝 DPP Suggestion", r"DPP Suggestion:\s*([^\n]+)"),
                ("⚖️ HOD Decision", r"HOD Decision:\s*([^\n]+)"),
            )
            for label, pattern in key_facts:
                fact = re.search(pattern, content)
                if fact:
                    st.markdown(_detail_row_html(label, fact.group(1).strip()), unsafe_allow_html=True)

        st.markdown('<h2 class="section-header">Case Details</h2>', unsafe_allow_html=True)
        _render_allegations(content)
        _render_involved_persons(content)

        st.markdown('</div>', unsafe_allow_html=True)

    if st.button("← Back to Previous Page", key="back_bottom"):
        _leave_document_detail()
def main_page():
    """Route to the page named by ``st.session_state.page``.

    The document detail page is drawn on its own, without the sidebar. For
    every other page a sidebar radio is shown (preselected from the current
    page), the selection is written back to the session state, and the
    matching page function is called.
    """
    state = st.session_state

    # Detail view: full page, no sidebar navigation.
    if state.page == "document_detail":
        document_detail_page()
        return

    label_to_page = {
        "Browse Documents": "browse",
        "Search Documents": "search",
        "Chat Interface": "chat",
    }
    page_keys = list(label_to_page.values())
    # Unknown page keys fall back to the first option.
    preselected = page_keys.index(state.page) if state.page in page_keys else 0

    st.sidebar.title("Navigation")
    choice = st.sidebar.radio(
        "Select a page:",
        list(label_to_page),
        format_func=lambda x: x,
        index=preselected,
    )
    if choice in label_to_page:
        state.page = label_to_page[choice]

    # Draw whichever page the session state now points at.
    page_renderers = {
        "browse": browse_documents_page,
        "search": search_documents_page,
        "chat": chat_interface_page,
    }
    render_page = page_renderers.get(state.page)
    if render_page is not None:
        render_page()
# Run the app
# Entry point: draw the current page when this file is executed as a script.
if __name__ == "__main__":
    main_page()