from decimal import Decimal
from typing import Dict, List, Optional, Union, Tuple
from promapp.models import *
import logging
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, HoverTool, Span, BoxAnnotation, FactorRange
from bokeh.embed import components
from bokeh.palettes import Category10
from datetime import datetime
from bokeh.models.formatters import DatetimeTickFormatter
import math
from django.utils import timezone
from django.conf import settings
from datetime import timedelta
from dateutil.relativedelta import relativedelta
import os
import pandas as pd
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.shortcuts import get_object_or_404
from patientapp.models import Patient, Institution, Diagnosis, Treatment, TreatmentType, TreatmentIntentChoices
# Dedicated logger ('plotting_data') that records the data fed into each plot.
plotting_logger = logging.getLogger('plotting_data')
plotting_logger.setLevel(logging.INFO)
# Attach a file handler only when the logger has none yet, so the same
# handler is not added a second time.
if not plotting_logger.handlers:
    # Log files live in <BASE_DIR>/logs; the directory is created on demand.
    log_dir = os.path.join(settings.BASE_DIR, 'logs')
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, 'plotting_data.log')
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.INFO)
    # Record layout: timestamp - level - message
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    plotting_logger.addHandler(file_handler)
# General-purpose logger for this module.
logger = logging.getLogger(__name__)
[docs]
def get_patient_available_start_dates(patient):
    """Collect every date that can act as a start reference for a patient.

    The registration date, each diagnosis date and each treatment start/end
    date are included whenever they are set. If anything raises while the
    dates are being collected, the error is logged and whatever was gathered
    up to that point is returned (possibly unsorted).

    Args:
        patient: Patient instance

    Returns:
        list: Tuples of (reference_key, display_name, date_value), ordered by
        date_value when collection finishes without error.
    """
    available_dates = []
    add_date = available_dates.append
    try:
        registered_on = patient.date_of_registration
        if registered_on:
            add_date(('date_of_registration', 'Date of Registration', registered_on))

        # select_related / prefetch_related pull the related diagnosis and
        # treatment rows up front instead of once per loop iteration.
        diagnoses = (
            patient.diagnosis_set
            .select_related('diagnosis')
            .prefetch_related('treatment_set__treatment_type')
            .all()
        )

        for i, diagnosis in enumerate(diagnoses):
            # Fall back to a positional name when the diagnosis has no label.
            if diagnosis.diagnosis:
                diagnosis_name = diagnosis.diagnosis.diagnosis
            else:
                diagnosis_name = f"Diagnosis {i+1}"

            if diagnosis.date_of_diagnosis:
                add_date((
                    f'date_of_diagnosis_{diagnosis.id}',
                    f'Date of Diagnosis: {diagnosis_name}',
                    diagnosis.date_of_diagnosis,
                ))

            for j, treatment in enumerate(diagnosis.treatment_set.all()):
                # Comma-separated treatment type names, or a positional name
                # when the treatment has no types attached.
                if treatment.treatment_type.exists():
                    treatment_types = ", ".join(
                        [tt.treatment_type for tt in treatment.treatment_type.all()]
                    )
                else:
                    treatment_types = f"Treatment {j+1}"

                if treatment.date_of_start_of_treatment:
                    add_date((
                        f'date_of_start_of_treatment_{treatment.id}',
                        f'Start of Treatment: {treatment_types} ({diagnosis_name})',
                        treatment.date_of_start_of_treatment,
                    ))

                if treatment.date_of_end_of_treatment:
                    add_date((
                        f'date_of_end_of_treatment_{treatment.id}',
                        f'End of Treatment: {treatment_types} ({diagnosis_name})',
                        treatment.date_of_end_of_treatment,
                    ))

        # Chronological order, keyed on the date element of each tuple.
        available_dates.sort(key=lambda entry: entry[2])
    except Exception as e:
        logger.error(f"Error getting available start dates for patient {patient.id}: {e}")
    return available_dates
[docs]
def get_patient_start_date(patient, start_date_reference='date_of_registration'):
    """Resolve a start-date reference key to the matching date for a patient.

    Recognised keys are ``'date_of_registration'``,
    ``'date_of_diagnosis_<id>'``, ``'date_of_start_of_treatment_<id>'`` and
    ``'date_of_end_of_treatment_<id>'``. Any other key, including ``None`` or
    an empty string, falls back to the registration date.

    Args:
        patient: Patient instance
        start_date_reference: Reference key identifying which date to use

    Returns:
        datetime.date or None: The resolved date. None when the referenced
        diagnosis/treatment does not exist for this patient, has no such date,
        or the lookup raised (the error is logged).
    """
    diagnosis_prefix = 'date_of_diagnosis_'
    # (key prefix, Treatment date field) for the treatment-based references.
    treatment_date_fields = (
        ('date_of_start_of_treatment_', 'date_of_start_of_treatment'),
        ('date_of_end_of_treatment_', 'date_of_end_of_treatment'),
    )
    try:
        # A missing reference is treated like an unrecognised one rather than
        # failing on ``None.startswith``.
        if not start_date_reference or start_date_reference == 'date_of_registration':
            return patient.date_of_registration

        if start_date_reference.startswith(diagnosis_prefix):
            # Strip only the leading prefix; the rest is the diagnosis ID.
            diagnosis_id = start_date_reference[len(diagnosis_prefix):]
            diagnosis = patient.diagnosis_set.filter(
                id=diagnosis_id,
                date_of_diagnosis__isnull=False,
            ).first()
            return diagnosis.date_of_diagnosis if diagnosis else None

        for prefix, date_field in treatment_date_fields:
            if start_date_reference.startswith(prefix):
                treatment_id = start_date_reference[len(prefix):]
                # Filter through the diagnosis relation so only treatments
                # belonging to this patient can match. ``Treatment`` is the
                # model already imported at module level.
                treatment = Treatment.objects.filter(
                    id=treatment_id,
                    diagnosis__patient=patient,
                    **{f'{date_field}__isnull': False},
                ).first()
                return getattr(treatment, date_field) if treatment else None

        # Unrecognised key: fall back to the registration date.
        return patient.date_of_registration
    except Exception as e:
        logger.error(f"Error getting start date for patient {patient.id}: {e}")
        return None
[docs]
def calculate_time_interval_value(submission_date, start_date, interval_type='weeks'):
    """Calculate the time elapsed from start date to submission date.

    Both arguments are reduced to calendar dates first, so time-of-day is
    ignored even for the 'seconds', 'minutes' and 'hours' units.

    Day-based units ('seconds' through 'weeks') use the exact number of
    calendar days between the two dates. 'months' and 'years' use calendar
    arithmetic (relativedelta), with leftover days converted to a fraction
    using average lengths (30.44 days per month, 365.25 days per year).

    Args:
        submission_date: datetime (or date) of the submission
        start_date: date (or datetime) of the start reference
        interval_type: Type of interval ('seconds', 'minutes', 'hours',
            'days', 'weeks', 'months', 'years')

    Returns:
        float: The interval value; negative when the submission precedes the
        start date. 0 when either date is missing or the interval type is
        not recognised.
    """
    if not start_date or not submission_date:
        return 0

    # datetime objects expose .date(); plain date objects do not.
    submission_day = submission_date.date() if hasattr(submission_date, 'date') else submission_date
    start_day = start_date.date() if hasattr(start_date, 'date') else start_date

    units_per_day = {
        'seconds': 24 * 60 * 60,
        'minutes': 24 * 60,
        'hours': 24,
        'days': 1,
    }
    if interval_type in units_per_day or interval_type == 'weeks':
        # Exact day count; rebuilding days from months/years with average
        # lengths would drift by up to a couple of days.
        total_days = float((submission_day - start_day).days)
        if interval_type == 'weeks':
            return total_days / 7
        return total_days * units_per_day[interval_type]

    if interval_type == 'months':
        delta = relativedelta(submission_day, start_day)
        # Whole calendar months plus the remaining days as a month fraction.
        return delta.years * 12 + delta.months + delta.days / 30.44

    if interval_type == 'years':
        delta = relativedelta(submission_day, start_day)
        # Whole years plus fractional parts for remaining months and days.
        return delta.years + delta.months / 12.0 + delta.days / 365.25

    return 0
[docs]
def get_interval_label(interval_type):
    """Return the display label for an interval type.

    Args:
        interval_type: Interval key such as 'days' or 'months'

    Returns:
        str: The capitalised label for a known key, or 'Weeks' for any key
        that is not recognised.
    """
    labels = dict(
        seconds='Seconds',
        minutes='Minutes',
        hours='Hours',
        days='Days',
        weeks='Weeks',
        months='Months',
        years='Years',
    )
    try:
        return labels[interval_type]
    except KeyError:
        # Unknown keys share the default unit's label.
        return 'Weeks'
[docs]
def filter_positive_intervals(historical_responses, start_date, time_interval='weeks'):
    """Keep only the responses submitted on or after the start date.

    Args:
        historical_responses: Response objects exposing
            ``questionnaire_submission.submission_date``
        start_date: The reference start date
        time_interval: Interval type used to compute the elapsed time

    Returns:
        List: The responses whose interval from ``start_date`` is >= 0. When
        ``start_date`` is falsy the input is returned unchanged (same object).
    """
    if not start_date:
        return historical_responses

    return [
        response
        for response in historical_responses
        if calculate_time_interval_value(
            response.questionnaire_submission.submission_date,
            start_date,
            time_interval,
        ) >= 0
    ]
[docs]
def filter_positive_intervals_construct(historical_scores, start_date, time_interval='weeks'):
    """Keep only the construct scores submitted on or after the start date.

    Args:
        historical_scores: Construct score objects exposing
            ``questionnaire_submission.submission_date``
        start_date: The reference start date
        time_interval: Interval type used to compute the elapsed time

    Returns:
        List: The scores whose interval from ``start_date`` is >= 0. When
        ``start_date`` is falsy the input is returned unchanged (same object).
    """
    if not start_date:
        return historical_scores

    def on_or_after_start(score):
        # True when the submission does not precede the reference date.
        elapsed = calculate_time_interval_value(
            score.questionnaire_submission.submission_date,
            start_date,
            time_interval,
        )
        return elapsed >= 0

    return list(filter(on_or_after_start, historical_scores))
[docs]
def add_clinical_indicators_to_plot(p, selected_indicators, start_date, time_interval, x_min, x_max, y_max=None):
    """Add diagnosis and treatment indicators to a Bokeh plot.

    Each indicator that falls inside ``[x_min, x_max]`` is drawn as a dashed
    vertical line plus a marker with its own hover tooltip. A failure on one
    indicator is logged and does not stop the remaining ones.

    Args:
        p: Bokeh plot figure
        selected_indicators: List of indicator dictionaries with 'type',
            'date' ('YYYY-MM-DD' string) and 'label' keys
        start_date: Reference start date for time calculations
        time_interval: Time interval type for calculations
        x_min, x_max: X-axis range limits (inclusive)
        y_max: Maximum Y value for positioning markers (optional). When not
            given, the plot's y-range end is used if it is a finite number,
            otherwise 0.95.

    Returns:
        None. The plot is modified in place.
    """
    if not selected_indicators or not start_date:
        return

    # (line colour, line alpha, marker) per indicator type.
    indicator_styles = {
        'diagnosis': ('#3b82f6', 0.8, 'triangle'),       # Blue
        'treatment_start': ('#10b981', 0.8, 'circle'),   # Green
        'treatment_end': ('#ef4444', 0.8, 'circle'),     # Red
    }
    fallback_style = ('#6b7280', 0.6, 'circle')          # Gray fallback

    plotting_logger.info(f"Adding {len(selected_indicators)} clinical indicators to plot")
    for indicator in selected_indicators:
        try:
            indicator_date = datetime.strptime(indicator['date'], '%Y-%m-%d').date()
            indicator_time = calculate_time_interval_value(indicator_date, start_date, time_interval)

            # Only add indicators that fall within the plot range
            if not (x_min <= indicator_time <= x_max):
                continue

            line_color, line_alpha, marker_type = indicator_styles.get(
                indicator['type'], fallback_style
            )

            # Add vertical line indicator
            indicator_line = Span(
                location=indicator_time,
                dimension='height',
                line_color=line_color,
                line_dash='dashed',
                line_width=2,
                line_alpha=line_alpha
            )
            p.add_layout(indicator_line)

            # Determine y position for marker
            if y_max is not None:
                y_position = y_max * 0.95
            else:
                # An auto-computed range can expose ``end`` as None/NaN until
                # it is resolved, so only trust a finite number.
                range_end = getattr(p.y_range, 'end', None)
                if isinstance(range_end, (int, float)) and math.isfinite(range_end):
                    y_position = range_end * 0.95
                else:
                    y_position = 0.95  # Fallback when no usable range end exists

            # Create a data source for the indicator marker
            indicator_source = ColumnDataSource(data=dict(
                x=[indicator_time],
                y=[y_position],
                label=[indicator['label']],
                type=[indicator['type']],
                date=[indicator['date']]
            ))

            # Add marker
            indicator_marker = p.scatter(
                x='x',
                y='y',
                source=indicator_source,
                size=8,
                fill_color=line_color,
                line_color=line_color,
                marker=marker_type,
                alpha=0.9
            )

            # Add hover tool bound to this marker only
            indicator_hover = HoverTool(
                tooltips=[
                    ('Event', '@label'),
                    ('Type', '@type'),
                    ('Date', '@date'),
                    ('Time Interval', f'@x{{0.1}} {get_interval_label(time_interval).lower()}')
                ],
                mode='mouse',
                point_policy='follow_mouse',
                renderers=[indicator_marker]
            )
            p.add_tools(indicator_hover)
            plotting_logger.info(f"Added {indicator['type']} indicator at time {indicator_time:.1f}: {indicator['label']}")
        except Exception as e:
            plotting_logger.error(f"Error adding indicator {indicator}: {e}")
[docs]
def calculate_percentage(value: Optional[Decimal], max_value: Optional[Decimal]) -> float:
    """Express ``value`` as a percentage of ``max_value``.

    Args:
        value (Optional[Decimal]): The current value
        max_value (Optional[Decimal]): The maximum possible value

    Returns:
        float: ``value / max_value * 100``. 0 when either argument is None,
        when ``max_value`` equals 0, or when the conversion/division raises
        ValueError, TypeError or ZeroDivisionError. The result is not clamped.
    """
    try:
        unusable = value is None or max_value is None or max_value == 0
        percentage = 0 if unusable else (float(value) / float(max_value)) * 100
    except (ValueError, TypeError, ZeroDivisionError):
        percentage = 0
    return percentage
[docs]
class ConstructScoreData:
[docs]
def __init__(self, construct: ConstructScale, current_score: Optional[Decimal],
             previous_score: Optional[Decimal], historical_scores: List[QuestionnaireConstructScore],
             patient=None, start_date_reference='date_of_registration', time_interval='weeks',
             aggregated_statistics=None, aggregation_metadata=None, aggregation_type='median_iqr',
             selected_indicators=None, generate_plot=True):
    """Bundle a construct's scores with derived change, plot and explanations.

    Args:
        construct: The construct scale the scores belong to
        current_score: Most recent score, or None
        previous_score: Score before the current one, or None
        historical_scores: Scores handed to the plot builder
        patient: Patient used to resolve the plot's start date (optional)
        start_date_reference: Key selecting which patient date is the start
        time_interval: Interval unit for the plot's time axis
        aggregated_statistics: Optional aggregated data; stored as {} when falsy
        aggregation_metadata: Optional metadata; stored as {} when falsy
        aggregation_type: Key naming the aggregation style
        selected_indicators: Optional clinical indicators; stored as [] when falsy
        generate_plot: When False the plot is not built and ``bokeh_plot`` is None
    """
    # Plain inputs first; everything derived below reads from these.
    self.construct = construct
    self.score = current_score
    self.previous_score = previous_score
    self.patient = patient
    self.start_date_reference = start_date_reference
    self.time_interval = time_interval
    self.aggregation_type = aggregation_type
    self.aggregated_statistics = aggregated_statistics if aggregated_statistics else {}
    self.aggregation_metadata = aggregation_metadata if aggregation_metadata else {}
    self.selected_indicators = selected_indicators if selected_indicators else []

    self.score_change = self._calculate_score_change()

    # LAZY LOADING: the plot is only built when explicitly requested.
    if generate_plot:
        self.bokeh_plot = self._create_bokeh_plot(historical_scores)
    else:
        self.bokeh_plot = None

    # Clinical significance explanations
    self.current_score_explanation = self._generate_current_score_explanation()
    self.score_change_explanation = self._generate_score_change_explanation()
    self.clinical_significance_summary = self._generate_clinical_significance_summary()

    logger.info(f"Created ConstructScoreData for {construct.name}: score={current_score}, previous={previous_score}, aggregated_intervals={len(self.aggregated_statistics)}, has_metadata={bool(self.aggregation_metadata)}")
def _calculate_score_change(self) -> Optional[float]:
    """Return current minus previous score, or None when either is missing."""
    if self.score is None or self.previous_score is None:
        logger.debug(f"No score change calculated for {self.construct.name} - missing current or previous score")
        return None

    change = float(self.score) - float(self.previous_score)
    logger.debug(f"Calculated score change for {self.construct.name}: {change}")
    return change
def _is_current_score_clinically_significant(self) -> Tuple[bool, str]:
"""
Determine if the current score is clinically significant based on the rules.
Returns (is_significant, explanation)
"""
if not self.score:
return False, ""
score = float(self.score)
direction = self.construct.scale_better_score_direction or 'Higher is Better'
# Get available parameters
threshold = self.construct.scale_threshold_score
mid = self.construct.scale_minimum_clinical_important_difference
normative = self.construct.scale_normative_score_mean
normative_sd = self.construct.scale_normative_score_standard_deviation
logger.debug(f"Checking current score significance for {self.construct.name}: score={score}, direction={direction}, threshold={threshold}, mid={mid}, normative={normative}, sd={normative_sd}")
# Apply rules based on direction
if direction == 'Higher is Better':
return self._check_higher_is_better_current(score, threshold, mid, normative, normative_sd)
elif direction == 'Lower is Better':
return self._check_lower_is_better_current(score, threshold, mid, normative, normative_sd)
elif direction == 'Middle is Better':
return self._check_middle_is_better_current(score, threshold, mid, normative, normative_sd)
return False, ""
def _check_higher_is_better_current(self, score, threshold, mid, normative, normative_sd):
"""Check current score significance for 'Higher is Better' direction"""
# Rule 1: Threshold + MID + Normative + SD available
if threshold and mid and normative and normative_sd:
threshold_val = float(threshold)
mid_val = float(mid)
threshold_with_mid = threshold_val - mid_val
if score <= threshold_with_mid:
difference = threshold_val - score
times_mid = difference / mid_val
return True, f"Current score ({score:.1f}) is below threshold ({threshold_val:.1f}) by {difference:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Rule 2: Threshold + MID available (Normative NA)
elif threshold and mid:
threshold_val = float(threshold)
mid_val = float(mid)
threshold_with_mid = threshold_val - mid_val
if score <= threshold_with_mid:
difference = threshold_val - score
times_mid = difference / mid_val
return True, f"Current score ({score:.1f}) is below threshold ({threshold_val:.1f}) by {difference:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Rule 3: Threshold + Normative + SD available (MID NA)
elif threshold and normative and normative_sd:
normative_val = float(normative)
sd_val = float(normative_sd)
threshold_with_sd = normative_val - 0.5 * sd_val
if score <= threshold_with_sd:
difference = normative_val - score
times_sd = difference / sd_val
return True, f"Current score ({score:.1f}) is below normative mean ({normative_val:.1f}) by {difference:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Rule 4: Normative + SD available (Threshold + MID NA)
elif normative and normative_sd:
normative_val = float(normative)
sd_val = float(normative_sd)
threshold_with_sd = normative_val - 0.5 * sd_val
if score <= threshold_with_sd:
difference = normative_val - score
times_sd = difference / sd_val
return True, f"Current score ({score:.1f}) is below normative mean ({normative_val:.1f}) by {difference:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Rule 5: Threshold + Normative available (MID + SD NA)
elif threshold and normative:
threshold_val = float(threshold)
if score < threshold_val:
difference = threshold_val - score
return True, f"Current score ({score:.1f}) is below threshold ({threshold_val:.1f}) by {difference:.1f}"
# Rule 6: Normative available (Threshold + MID + SD NA)
elif normative:
normative_val = float(normative)
if score < normative_val:
difference = normative_val - score
return True, f"Current score ({score:.1f}) is below normative mean ({normative_val:.1f}) by {difference:.1f}"
return False, ""
def _check_lower_is_better_current(self, score, threshold, mid, normative, normative_sd):
"""Check current score significance for 'Lower is Better' direction"""
# Rule 1: Threshold + MID + Normative + SD available
if threshold and mid and normative and normative_sd:
threshold_val = float(threshold)
mid_val = float(mid)
threshold_with_mid = threshold_val + mid_val
if score >= threshold_with_mid:
difference = score - threshold_val
times_mid = difference / mid_val
return True, f"Current score ({score:.1f}) is above threshold ({threshold_val:.1f}) by {difference:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Rule 2: Threshold + MID available (Normative NA)
elif threshold and mid:
threshold_val = float(threshold)
mid_val = float(mid)
threshold_with_mid = threshold_val + mid_val
if score >= threshold_with_mid:
difference = score - threshold_val
times_mid = difference / mid_val
return True, f"Current score ({score:.1f}) is above threshold ({threshold_val:.1f}) by {difference:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Rule 3: Threshold + Normative + SD available (MID NA)
elif threshold and normative and normative_sd:
normative_val = float(normative)
sd_val = float(normative_sd)
threshold_with_sd = normative_val + 0.5 * sd_val
if score >= threshold_with_sd:
difference = score - normative_val
times_sd = difference / sd_val
return True, f"Current score ({score:.1f}) is above normative mean ({normative_val:.1f}) by {difference:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Rule 4: Normative + SD available (Threshold + MID NA)
elif normative and normative_sd:
normative_val = float(normative)
sd_val = float(normative_sd)
threshold_with_sd = normative_val + 0.5 * sd_val
if score >= threshold_with_sd:
difference = score - normative_val
times_sd = difference / sd_val
return True, f"Current score ({score:.1f}) is above normative mean ({normative_val:.1f}) by {difference:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Rule 5: Threshold + Normative available (MID + SD NA)
elif threshold and normative:
threshold_val = float(threshold)
if score > threshold_val:
difference = score - threshold_val
return True, f"Current score ({score:.1f}) is above threshold ({threshold_val:.1f}) by {difference:.1f}"
# Rule 6: Normative available (Threshold + MID + SD NA)
elif normative:
normative_val = float(normative)
if score > normative_val:
difference = score - normative_val
return True, f"Current score ({score:.1f}) is above normative mean ({normative_val:.1f}) by {difference:.1f}"
return False, ""
def _check_middle_is_better_current(self, score, threshold, mid, normative, normative_sd):
"""Check current score significance for 'Middle is Better' direction"""
# Rule 1: Threshold + MID + Normative + SD available
if threshold and mid and normative and normative_sd:
threshold_val = float(threshold)
mid_val = float(mid)
difference = abs(score - threshold_val)
if difference >= mid_val:
times_mid = difference / mid_val
direction = "above" if score > threshold_val else "below"
return True, f"Current score ({score:.1f}) is {direction} threshold ({threshold_val:.1f}) by {difference:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Rule 2: Threshold + MID available (Normative NA)
elif threshold and mid:
threshold_val = float(threshold)
mid_val = float(mid)
difference = abs(score - threshold_val)
if difference >= mid_val:
times_mid = difference / mid_val
direction = "above" if score > threshold_val else "below"
return True, f"Current score ({score:.1f}) is {direction} threshold ({threshold_val:.1f}) by {difference:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Rule 3: Threshold + Normative + SD available (MID NA)
elif threshold and normative and normative_sd:
normative_val = float(normative)
sd_val = float(normative_sd)
difference = abs(score - normative_val)
if difference >= (0.5 * sd_val):
times_sd = difference / sd_val
direction = "above" if score > normative_val else "below"
return True, f"Current score ({score:.1f}) is {direction} normative mean ({normative_val:.1f}) by {difference:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Rule 4: Normative + SD available (Threshold + MID NA)
elif normative and normative_sd:
normative_val = float(normative)
sd_val = float(normative_sd)
difference = abs(score - normative_val)
if difference >= (0.5 * sd_val):
times_sd = difference / sd_val
direction = "above" if score > normative_val else "below"
return True, f"Current score ({score:.1f}) is {direction} normative mean ({normative_val:.1f}) by {difference:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Rule 5: Threshold + Normative available (MID + SD NA)
elif threshold and normative:
threshold_val = float(threshold)
if score != threshold_val: # Any difference
difference = abs(score - threshold_val)
direction = "above" if score > threshold_val else "below"
return True, f"Current score ({score:.1f}) is {direction} threshold ({threshold_val:.1f}) by {difference:.1f}"
# Rule 6: Normative available (Threshold + MID + SD NA)
elif normative:
normative_val = float(normative)
if score != normative_val: # Any difference
difference = abs(score - normative_val)
direction = "above" if score > normative_val else "below"
return True, f"Current score ({score:.1f}) is {direction} normative mean ({normative_val:.1f}) by {difference:.1f}"
return False, ""
def _is_score_change_clinically_significant(self) -> Tuple[bool, str]:
"""
Determine if the score change is clinically significant based on the rules.
Returns (is_significant, explanation)
"""
if not self.score_change or not self.previous_score:
return False, ""
direction = self.construct.scale_better_score_direction or 'Higher is Better'
mid = self.construct.scale_minimum_clinical_important_difference
normative_sd = self.construct.scale_normative_score_standard_deviation
change = abs(self.score_change)
prev_score = float(self.previous_score)
logger.debug(f"Checking score change significance for {self.construct.name}: change={self.score_change}, direction={direction}, mid={mid}, sd={normative_sd}")
# Apply rules based on direction
if direction == 'Higher is Better':
return self._check_higher_is_better_change(prev_score, mid, normative_sd)
elif direction == 'Lower is Better':
return self._check_lower_is_better_change(prev_score, mid, normative_sd)
elif direction == 'Middle is Better':
return self._check_middle_is_better_change(prev_score, mid, normative_sd)
return False, ""
def _check_higher_is_better_change(self, prev_score, mid, normative_sd):
"""Check score change significance for 'Higher is Better' direction"""
change = self.score_change
change_magnitude = abs(change)
# MID takes precedence if available
if mid:
mid_val = float(mid)
if change < -mid_val: # Current score is lower than previous by MID or more
times_mid = change_magnitude / mid_val
return True, f"Score decreased by {change_magnitude:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Use 1 SD if MID not available but SD is available
elif normative_sd:
sd_val = float(normative_sd)
if change < -sd_val: # Current score is lower than previous by 1 SD or more
times_sd = change_magnitude / sd_val
return True, f"Score decreased by {change_magnitude:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Use 10% change if neither MID nor SD available
else:
threshold_change = abs(prev_score * 0.1)
if change < -threshold_change: # Current score is lower by 10% or more
percent_change = abs(change/prev_score*100)
return True, f"Score decreased by {change_magnitude:.1f} ({percent_change:.1f}%), exceeding 10% threshold ({threshold_change:.1f})"
return False, ""
def _check_lower_is_better_change(self, prev_score, mid, normative_sd):
"""Check score change significance for 'Lower is Better' direction"""
change = self.score_change
change_magnitude = abs(change)
# MID takes precedence if available
if mid:
mid_val = float(mid)
if change > mid_val: # Current score is higher than previous by MID or more
times_mid = change / mid_val
return True, f"Score increased by {change:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Use 1 SD if MID not available but SD is available
elif normative_sd:
sd_val = float(normative_sd)
if change > sd_val: # Current score is higher than previous by 1 SD or more
times_sd = change / sd_val
return True, f"Score increased by {change:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Use 10% change if neither MID nor SD available
else:
threshold_change = abs(prev_score * 0.1)
if change > threshold_change: # Current score is higher by 10% or more
percent_change = change/prev_score*100
return True, f"Score increased by {change:.1f} ({percent_change:.1f}%), exceeding 10% threshold ({threshold_change:.1f})"
return False, ""
def _check_middle_is_better_change(self, prev_score, mid, normative_sd):
"""Check score change significance for 'Middle is Better' direction"""
change = self.score_change
change_magnitude = abs(change)
# MID takes precedence if available
if mid:
mid_val = float(mid)
if change_magnitude >= mid_val: # Change in either direction by MID or more
times_mid = change_magnitude / mid_val
direction = "increased" if change > 0 else "decreased"
return True, f"Score {direction} by {change_magnitude:.1f}, which is {times_mid:.2f} times the MID ({mid_val:.1f})"
# Use 1 SD if MID not available but SD is available
elif normative_sd:
sd_val = float(normative_sd)
if change_magnitude >= sd_val: # Change in either direction by 1 SD or more
times_sd = change_magnitude / sd_val
direction = "increased" if change > 0 else "decreased"
return True, f"Score {direction} by {change_magnitude:.1f}, which is {times_sd:.2f} times the SD ({sd_val:.1f})"
# Use 10% change if neither MID nor SD available
else:
threshold_change = abs(prev_score * 0.1)
if change_magnitude >= threshold_change: # Change in either direction by 10% or more
percent_change = abs(change/prev_score*100)
direction = "increased" if change > 0 else "decreased"
return True, f"Score {direction} by {change_magnitude:.1f} ({percent_change:.1f}%), exceeding 10% threshold ({threshold_change:.1f})"
return False, ""
def _generate_current_score_explanation(self) -> str:
"""Generate explanation for why the current score is clinically significant"""
is_significant, explanation = self._is_current_score_clinically_significant()
return explanation if is_significant else ""
def _generate_score_change_explanation(self) -> str:
"""Generate explanation for why the score change is clinically significant"""
is_significant, explanation = self._is_score_change_clinically_significant()
return explanation if is_significant else ""
def _generate_clinical_significance_summary(self) -> str:
"""Generate a comprehensive clinical significance summary"""
current_significant, current_explanation = self._is_current_score_clinically_significant()
change_significant, change_explanation = self._is_score_change_clinically_significant()
explanations = []
if current_significant:
explanations.append(current_explanation)
if change_significant:
explanations.append(change_explanation)
if explanations:
return ". ".join(explanations) + "."
return ""
[docs]
def is_clinically_significant(self) -> bool:
"""Check if this construct score is clinically significant for any reason"""
current_significant, _ = self._is_current_score_clinically_significant()
change_significant, _ = self._is_score_change_clinically_significant()
return current_significant or change_significant
def _get_aggregation_display_name(self) -> str:
"""Get a user-friendly display name for the aggregation type."""
aggregation_names = {
'median_iqr': 'Median with IQR',
'mean_95ci': 'Mean with 95% CI',
'mean_0.5sd': 'Mean ± 0.5 SD',
'mean_1sd': 'Mean ± 1 SD',
'mean_2sd': 'Mean ± 2 SD',
'mean_2.5sd': 'Mean ± 2.5 SD'
}
return aggregation_names.get(self.aggregation_type, 'Population Data')
def _create_bokeh_plot(self, historical_scores: List[QuestionnaireConstructScore]) -> str:
    """Build the Bokeh time-series plot for this construct and return it as HTML.

    The patient's scores are drawn as a black line with markers against the
    time elapsed since the start date resolved by ``get_patient_start_date``.
    When available, the construct's threshold, normative mean, the band of
    one standard deviation around that mean, the aggregated population
    statistics held in ``self.aggregated_statistics`` and the selected
    clinical indicators are overlaid. Everything that is plotted is also
    written to the ``plotting_data`` log.

    Args:
        historical_scores: Score records for the construct. They are iterated
            in reverse order; presumably they arrive newest first -- confirm
            against the caller.

    Returns:
        str: The Bokeh script markup followed by the div markup.
    """
    # Single source of truth for the figure size so the log cannot drift from the plot.
    plot_width, plot_height = 400, 180
    banner = "=" * 80

    # Get start date for the patient
    start_date = None
    if self.patient:
        start_date = get_patient_start_date(self.patient, self.start_date_reference)

    plotting_logger.info(banner)
    plotting_logger.info(f"PLOTTING DATA for {self.construct.name}")
    plotting_logger.info(banner)
    plotting_logger.info(f"Patient: {self.patient.name if self.patient else 'Unknown'}")
    plotting_logger.info(f"Start Date: {start_date}")
    plotting_logger.info(f"Time Interval Type: {self.time_interval}")
    plotting_logger.info(f"Number of Historical Scores: {len(historical_scores)}")

    # Filter out scores with negative intervals (submitted before the start date)
    if start_date:
        filtered_scores = filter_positive_intervals_construct(historical_scores, start_date, self.time_interval)
    else:
        filtered_scores = historical_scores
    plotting_logger.info(f"Filtered Scores (non-negative intervals): {len(filtered_scores)}")

    # Prepare x positions, y values and tooltip dates in a single pass
    time_intervals = []
    submission_dates = []
    scores = []
    for record in reversed(filtered_scores):
        submitted_at = record.questionnaire_submission.submission_date
        # Convert UTC time to local timezone for the tooltip
        submission_dates.append(timezone.localtime(submitted_at).strftime('%d/%m/%y'))
        if start_date:
            time_intervals.append(
                calculate_time_interval_value(submitted_at, start_date, self.time_interval)
            )
        else:
            # Without a start date every point is placed at x = 0
            time_intervals.append(0)
        scores.append(float(record.score) if record.score is not None else None)

    # Log individual patient data in tabular format
    plotting_logger.info("\nINDIVIDUAL PATIENT DATA:")
    if time_intervals:
        df_individual = pd.DataFrame({
            'Time_Interval': [f"{x:.2f}" for x in time_intervals],
            'Score': [f"{x:.1f}" if x is not None else "N/A" for x in scores],
            'Submission_Date': submission_dates
        })
        plotting_logger.info(f"\n{df_individual.to_string(index=False)}")
    else:
        plotting_logger.info("No individual data available")

    # Calculate x-axis range: pad by 10% of the span and never start below 0
    if time_intervals:
        first_interval, last_interval = min(time_intervals), max(time_intervals)
        if last_interval > first_interval:
            padding = 0.1 * (last_interval - first_interval)
            x_min = max(0, first_interval - padding)
            x_max = last_interval + padding
        else:
            # All points share one x value: show a one-unit window starting at 0
            x_min = 0
            x_max = last_interval + 1
    else:
        x_min, x_max = 0, 1
    plotting_logger.info(f"\nX-axis range: {x_min:.2f} to {x_max:.2f}")

    # If we have aggregated data, extend the range to include all aggregated intervals
    if self.aggregated_statistics:
        agg_intervals = list(self.aggregated_statistics.keys())
        if agg_intervals:
            agg_first, agg_last = min(agg_intervals), max(agg_intervals)
            if agg_last > agg_first:
                agg_padding = 0.1 * (agg_last - agg_first)
                x_min = max(0, min(x_min, agg_first - agg_padding))
                x_max = max(x_max, agg_last + agg_padding)
            else:
                x_min = max(0, min(x_min, 0))
                x_max = max(x_max, agg_last + 1)

        # Log aggregated data in tabular format
        plotting_logger.info("\nAGGREGATED POPULATION DATA:")
        plotting_logger.info(f"Number of time intervals with aggregated data: {len(self.aggregated_statistics)}")
        agg_data = []
        for interval, stats in sorted(self.aggregated_statistics.items()):
            agg_data.append({
                'Time_Interval': f"{interval:.1f}",
                'Central': f"{stats['central']:.2f}",
                'Lower_Bound': f"{stats['lower']:.2f}",
                'Upper_Bound': f"{stats['upper']:.2f}",
                'Sample_Size': stats['n']
            })
        if agg_data:
            df_agg = pd.DataFrame(agg_data)
            plotting_logger.info(f"\n{df_agg.to_string(index=False)}")
        plotting_logger.info(f"Extended X-axis range: {x_min:.2f} to {x_max:.2f}")
    else:
        plotting_logger.info("\nNO AGGREGATED DATA AVAILABLE")

    # Create figure with linear x-axis for time intervals
    interval_label = get_interval_label(self.time_interval)
    interval_unit = interval_label.lower()
    p = figure(
        width=plot_width,
        height=plot_height,
        tools="pan,box_zoom,reset",
        toolbar_location=None,
        sizing_mode="scale_width",
        x_axis_label=f"Time ({interval_label})",
        x_range=(x_min, x_max)
    )
    plotting_logger.info(f"\nPLOT CONFIGURATION:")
    plotting_logger.info(f"X-axis label: Time ({interval_label})")
    plotting_logger.info(f"Plot size: {plot_width}x{plot_height}")
    plotting_logger.info(f"Tools: pan, box_zoom, reset")

    # Style the plot
    p.background_fill_color = "#ffffff"
    p.border_fill_color = "#ffffff"
    p.grid.grid_line_color = "#e5e7eb"
    p.grid.grid_line_width = 1
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.minor_tick_line_color = None

    # Data source for the individual patient
    source = ColumnDataSource(data=dict(
        time_intervals=time_intervals,
        scores=scores,
        submission_dates=submission_dates
    ))

    # Add threshold line if available
    if self.construct.scale_threshold_score:
        threshold = Span(
            location=float(self.construct.scale_threshold_score),
            dimension='width',
            line_color='#f97316',
            line_dash='solid',
            line_width=1
        )
        p.add_layout(threshold)
        plotting_logger.info(f"Added threshold line at: {self.construct.scale_threshold_score}")

    # Add normative line and band if available
    if self.construct.scale_normative_score_mean:
        mean = float(self.construct.scale_normative_score_mean)
        normative = Span(
            location=mean,
            dimension='width',
            line_color='#1e3a8a',
            line_dash='solid',
            line_width=1
        )
        p.add_layout(normative)
        plotting_logger.info(f"Added normative line at: {self.construct.scale_normative_score_mean}")

        # Add standard deviation band if available
        if self.construct.scale_normative_score_standard_deviation:
            sd = float(self.construct.scale_normative_score_standard_deviation)
            band = BoxAnnotation(
                bottom=mean - sd,
                top=mean + sd,
                fill_color='#1e3a8a',
                fill_alpha=0.1,
                line_width=0
            )
            p.add_layout(band)
            plotting_logger.info(f"Added normative band: {mean - sd:.2f} to {mean + sd:.2f}")

    # Add aggregated data if available
    if self.aggregated_statistics:
        # Whisker is not part of the module-level bokeh imports
        from bokeh.models import Whisker

        agg_intervals = sorted(self.aggregated_statistics.keys())
        agg_central = [self.aggregated_statistics[interval]['central'] for interval in agg_intervals]
        agg_lower = [self.aggregated_statistics[interval]['lower'] for interval in agg_intervals]
        agg_upper = [self.aggregated_statistics[interval]['upper'] for interval in agg_intervals]
        agg_n = [self.aggregated_statistics[interval]['n'] for interval in agg_intervals]

        # Log aggregated plot arrays in tabular format
        plotting_logger.info("\nAGGREGATED PLOT DATA ARRAYS:")
        plot_data = []
        for i, interval in enumerate(agg_intervals):
            plot_data.append({
                'Array_Index': i,
                'Time_Interval': f"{interval:.1f}",
                'Central_Value': f"{agg_central[i]:.2f}",
                'Lower_Bound': f"{agg_lower[i]:.2f}",
                'Upper_Bound': f"{agg_upper[i]:.2f}",
                'Sample_Size': agg_n[i]
            })
        df_plot = pd.DataFrame(plot_data)
        plotting_logger.info(f"\n{df_plot.to_string(index=False)}")

        # Create data source for aggregated data
        agg_source = ColumnDataSource(data=dict(
            time_intervals=agg_intervals,
            central=agg_central,
            lower=agg_lower,
            upper=agg_upper,
            n=agg_n
        ))

        # Add aggregated central line (dotted gray)
        p.line(
            x='time_intervals',
            y='central',
            source=agg_source,
            line_width=2,
            line_color='#6b7280',
            line_dash='dotted',
            alpha=0.8
        )

        # Add aggregated points
        agg_scatter = p.scatter(
            x='time_intervals',
            y='central',
            source=agg_source,
            size=4,
            fill_color='#6b7280',
            line_color='#6b7280',
            alpha=0.8
        )

        # Add error bars for dispersion
        whisker = Whisker(
            source=agg_source,
            base='time_intervals',
            upper='upper',
            lower='lower',
            line_color='#6b7280',
            line_alpha=0.6,
            line_width=1
        )
        p.add_layout(whisker)
        plotting_logger.info("Added population line, points, and error bars")

        # Determine aggregation display name
        aggregation_display_name = self._get_aggregation_display_name()

        # Add hover tool for aggregated data (bound to the aggregated markers only)
        agg_hover = HoverTool(
            tooltips=[
                ('Time Interval', f'@time_intervals{{0.1}} {interval_unit}'),
                ('Aggregation Type', aggregation_display_name),
                ('Central Value', '@central{0.1}'),
                ('Lower Bound', '@lower{0.1}'),
                ('Upper Bound', '@upper{0.1}'),
                ('Sample Size', '@n patients')
            ],
            mode='mouse',
            point_policy='follow_mouse',
            renderers=[agg_scatter]
        )
        p.add_tools(agg_hover)

    # Add individual patient line and points (on top of aggregated data)
    p.line(
        x='time_intervals',
        y='scores',
        source=source,
        line_width=2,
        line_color='#000000'
    )
    individual_scatter = p.scatter(
        x='time_intervals',
        y='scores',
        source=source,
        size=6,
        fill_color='#000000',
        line_color='#000000'
    )
    plotting_logger.info("Added individual patient line and points (black)")

    # Configure hover tool for individual data (bound to the patient markers only)
    individual_hover = HoverTool(
        tooltips=[
            ('Submission Date', '@submission_dates'),
            ('Time Interval', f'@time_intervals{{0.1}} {interval_unit}'),
            ('Score', '@scores{0.1}')
        ],
        mode='mouse',
        point_policy='follow_mouse',
        renderers=[individual_scatter]
    )
    p.add_tools(individual_hover)
    plotting_logger.info("Added hover tooltips for individual data")

    # Add diagnosis and treatment indicators if selected.
    # Missing scores (None) are ignored when deriving the height passed to the indicators.
    valid_scores = [s for s in scores if s is not None]
    y_max_for_indicators = max(valid_scores) if valid_scores else 100
    add_clinical_indicators_to_plot(p, self.selected_indicators, start_date, self.time_interval, x_min, x_max, y_max_for_indicators)

    plotting_logger.info(banner)
    plotting_logger.info(f"END PLOTTING DATA for {self.construct.name}")
    plotting_logger.info(banner)

    # Get the plot components
    script, div = components(p)
    return script + div
[docs]
@staticmethod
def is_important_construct(construct: ConstructScale, current_score: Optional[Decimal]) -> bool:
    """Decide whether a construct is important given only its current score.

    A bare ``ConstructScoreData`` instance is created without running
    ``__init__`` (so no plot is generated) and populated with just the
    attributes set below; the decision is then delegated to
    ``_is_current_score_clinically_significant``. No previous score is
    available in this context, so change-based criteria are not considered.

    Args:
        construct: The construct scale the score belongs to.
        current_score: The latest score, or None when there is none.

    Returns:
        bool: True when the current score is clinically significant. A
        missing score (None) is never important.
    """
    logger.info(f"Checking if construct {construct.name} is important (score={current_score})")

    # Only a missing score short-circuits. A score of exactly 0 is a real
    # measurement and must still be evaluated against the significance rules.
    if current_score is None:
        logger.info(f"Construct {construct.name} not important - no current score")
        return False

    # Create a temporary ConstructScoreData instance to use the comprehensive significance logic
    temp_score_data = ConstructScoreData.__new__(ConstructScoreData)
    temp_score_data.construct = construct
    temp_score_data.score = current_score
    temp_score_data.previous_score = None  # We don't have previous score context here
    temp_score_data.score_change = None

    # Check if the current score is clinically significant
    is_significant, explanation = temp_score_data._is_current_score_clinically_significant()
    logger.info(f"Construct {construct.name} {'is' if is_significant else 'is not'} important - {explanation if explanation else 'no applicable criteria met'}")
    return is_significant
[docs]
class CompositeConstructScoreData:
    """Data class for composite construct scores with plotting capabilities."""

    def __init__(self, composite_construct_scale, current_score: Optional[Decimal],
                 previous_score: Optional[Decimal], historical_scores: List,
                 patient=None, start_date_reference='date_of_registration', time_interval='weeks',
                 selected_indicators=None, generate_plot=True):
        """Store the scores and plotting context and optionally render the plot.

        Args:
            composite_construct_scale: The composite scale the scores belong to.
            current_score: Latest score, or None.
            previous_score: Score preceding the latest one, or None.
            historical_scores: Score records used for the plot.
            patient: Patient used to resolve the plot's start date.
            start_date_reference: Key selecting which patient date is time zero.
            time_interval: Unit of the x-axis.
            selected_indicators: Stored as given (an empty list when falsy).
            generate_plot: When False the plot is skipped and ``bokeh_plot`` is None.
        """
        self.composite_construct_scale = composite_construct_scale
        self.score = current_score
        self.previous_score = previous_score
        self.score_change = self._calculate_score_change()
        self.patient = patient
        self.start_date_reference = start_date_reference
        self.time_interval = time_interval
        # NOTE(review): selected_indicators is stored but not drawn by
        # _create_bokeh_plot below -- confirm whether that is intentional.
        self.selected_indicators = selected_indicators or []
        # LAZY LOADING: Only generate plot if explicitly requested
        self.bokeh_plot = self._create_bokeh_plot(historical_scores) if generate_plot else None
        logger.info(f"Created CompositeConstructScoreData for {composite_construct_scale.composite_construct_scale_name}: score={current_score}, previous={previous_score}")

    def _calculate_score_change(self) -> Optional[float]:
        """Return current minus previous score as a float, or None if either is missing."""
        if self.score is not None and self.previous_score is not None:
            change = float(self.score) - float(self.previous_score)
            logger.debug(f"Calculated score change for {self.composite_construct_scale.composite_construct_scale_name}: {change}")
            return change
        logger.debug(f"No score change calculated for {self.composite_construct_scale.composite_construct_scale_name} - missing current or previous score")
        return None

    def _coerce_reference_value(self, raw_value, label: str) -> Optional[float]:
        """Convert a scale reference value to float, logging instead of hiding failures."""
        try:
            return float(raw_value)
        except (ValueError, TypeError):
            logger.warning(
                "Skipping %s for composite scale %s: %r is not numeric",
                label,
                self.composite_construct_scale.composite_construct_scale_name,
                raw_value,
            )
            return None

    def _create_bokeh_plot(self, historical_scores: List) -> str:
        """Create a Bokeh plot for composite construct scores over time.

        Scores are plotted against the time elapsed since the patient's start
        date; the scale's threshold, normative mean and the band of one
        standard deviation around that mean are overlaid when they are set
        and numeric.

        Args:
            historical_scores: Score records; iterated in reverse order.

        Returns:
            str: The Bokeh script markup followed by the div markup.
        """
        scale = self.composite_construct_scale
        scale_name = scale.composite_construct_scale_name
        banner = "=" * 80

        # Get start date for the patient
        start_date = None
        if self.patient:
            start_date = get_patient_start_date(self.patient, self.start_date_reference)

        plotting_logger.info(banner)
        plotting_logger.info(f"PLOTTING DATA for Composite: {scale_name}")
        plotting_logger.info(banner)
        plotting_logger.info(f"Patient: {self.patient.name if self.patient else 'Unknown'}")
        plotting_logger.info(f"Start Date: {start_date}")
        plotting_logger.info(f"Time Interval Type: {self.time_interval}")
        plotting_logger.info(f"Number of Historical Scores: {len(historical_scores)}")

        # Filter out scores with negative intervals
        if start_date:
            filtered_scores = filter_positive_intervals_composite(historical_scores, start_date, self.time_interval)
        else:
            filtered_scores = historical_scores
        plotting_logger.info(f"Filtered Scores (non-negative intervals): {len(filtered_scores)}")

        # Prepare x positions, y values and tooltip dates in a single pass
        time_intervals = []
        submission_dates = []
        scores = []
        for record in reversed(filtered_scores):
            submitted_at = record.questionnaire_submission.submission_date
            # Convert UTC time to local timezone for the tooltip
            submission_dates.append(timezone.localtime(submitted_at).strftime('%d/%m/%y'))
            if start_date:
                time_intervals.append(
                    calculate_time_interval_value(submitted_at, start_date, self.time_interval)
                )
            else:
                # Without a start date every point is placed at x = 0
                time_intervals.append(0)
            scores.append(float(record.score) if record.score is not None else None)

        # Calculate x-axis range: pad by 10% of the span and never start below 0
        if time_intervals:
            first_interval, last_interval = min(time_intervals), max(time_intervals)
            if last_interval > first_interval:
                padding = 0.1 * (last_interval - first_interval)
                x_min = max(0, first_interval - padding)
                x_max = last_interval + padding
            else:
                x_min = 0
                x_max = last_interval + 1
        else:
            x_min, x_max = 0, 1

        # Create figure
        interval_label = get_interval_label(self.time_interval)
        p = figure(
            width=400,
            height=180,
            tools="pan,box_zoom,reset",
            toolbar_location=None,
            sizing_mode="scale_width",
            x_axis_label=f"Time ({interval_label})",
            x_range=(x_min, x_max)
        )

        # Style the plot
        p.background_fill_color = "#ffffff"
        p.border_fill_color = "#ffffff"
        p.grid.grid_line_color = "#e5e7eb"
        p.grid.grid_line_width = 1
        p.axis.axis_line_color = None
        p.axis.major_tick_line_color = None
        p.axis.minor_tick_line_color = None

        # Data source for the line and points
        source = ColumnDataSource(data=dict(
            time_intervals=time_intervals,
            scores=scores,
            submission_dates=submission_dates
        ))

        # Add threshold line if available
        if scale.composite_construct_scale_threshold_score:
            threshold_val = self._coerce_reference_value(
                scale.composite_construct_scale_threshold_score, 'threshold score'
            )
            if threshold_val is not None:
                p.add_layout(Span(
                    location=threshold_val,
                    dimension='width',
                    line_color='#f97316',
                    line_dash='solid',
                    line_width=1
                ))
                plotting_logger.info(f"Added threshold line at: {threshold_val}")

        # Add normative line and band if available
        if scale.composite_construct_scale_normative_score_mean:
            normative_val = self._coerce_reference_value(
                scale.composite_construct_scale_normative_score_mean, 'normative mean'
            )
            if normative_val is not None:
                p.add_layout(Span(
                    location=normative_val,
                    dimension='width',
                    line_color='#1e3a8a',
                    line_dash='solid',
                    line_width=1
                ))
                plotting_logger.info(f"Added normative line at: {normative_val}")

                # Add standard deviation band if available (needs a valid mean)
                if scale.composite_construct_scale_normative_score_standard_deviation:
                    sd = self._coerce_reference_value(
                        scale.composite_construct_scale_normative_score_standard_deviation,
                        'normative standard deviation'
                    )
                    if sd is not None:
                        p.add_layout(BoxAnnotation(
                            bottom=normative_val - sd,
                            top=normative_val + sd,
                            fill_color='#1e3a8a',
                            fill_alpha=0.1,
                            line_width=0
                        ))
                        plotting_logger.info(f"Added normative band: {normative_val - sd:.2f} to {normative_val + sd:.2f}")

        # Add line and scatter plot
        p.line(
            x='time_intervals',
            y='scores',
            source=source,
            line_width=2,
            line_color='#3b82f6',
            alpha=0.8
        )
        p.scatter(
            x='time_intervals',
            y='scores',
            source=source,
            size=8,
            fill_color='#3b82f6',
            line_color='#1e40af',
            alpha=0.9
        )

        # Hover tool is not restricted to specific renderers
        hover = HoverTool(
            tooltips=[
                ('Time Interval', f'@time_intervals{{0.1}} {interval_label.lower()}'),
                ('Score', '@scores{0.1}'),
                ('Date', '@submission_dates')
            ]
        )
        p.add_tools(hover)

        plotting_logger.info(banner)
        plotting_logger.info(f"END PLOTTING DATA for Composite: {scale_name}")
        plotting_logger.info(banner)

        # Get the plot components
        script, div = components(p)
        return script + div
[docs]
def filter_positive_intervals_composite(scores, start_date, time_interval):
    """Keep only the composite scores submitted on or after the start date.

    A score is kept when ``calculate_time_interval_value`` for its submission
    date, relative to ``start_date`` in ``time_interval`` units, is not
    negative. The input order is preserved.
    """
    return [
        entry
        for entry in scores
        if calculate_time_interval_value(
            entry.questionnaire_submission.submission_date,
            start_date,
            time_interval,
        ) >= 0
    ]
[docs]
def create_item_response_plot(historical_responses: List['QuestionnaireItemResponse'], item: 'Item',
                              patient=None, start_date_reference='date_of_registration', time_interval='weeks',
                              selected_indicators=None) -> str:
    """Create a Bokeh plot for an item's responses over time.

    Items whose response type is 'Likert' and that have a Likert response
    scale attached are rendered by ``create_likert_response_plot``; every
    other item is rendered by ``create_numeric_response_plot``.

    Args:
        historical_responses (List[QuestionnaireItemResponse]): List of historical responses
        item (Item): The item being plotted
        patient: Patient instance for start date calculation
        start_date_reference: Reference date type for time calculation
        time_interval: Time interval type for x-axis
        selected_indicators: Passed through unchanged to the chosen plot function

    Returns:
        str: HTML string containing the Bokeh plot components
    """
    logger.debug(f"create_item_response_plot called for item {item.id}, type: {item.response_type}, has likert_response: {bool(item.likert_response)}, has range_response: {bool(item.range_response)}")

    use_likert = item.response_type == 'Likert' and item.likert_response
    build_plot = create_likert_response_plot if use_likert else create_numeric_response_plot
    return build_plot(historical_responses, item, patient, start_date_reference, time_interval, selected_indicators)
[docs]
def create_likert_response_plot(historical_responses: List['QuestionnaireItemResponse'], item: 'Item',
                                patient=None, start_date_reference='date_of_registration', time_interval='weeks',
                                selected_indicators=None) -> str:
    """Create a Bokeh plot specifically for Likert responses.

    The y-axis is categorical, with one factor per Likert option ordered by
    option value. Each option gets a tinted background strip and the
    patient's responses are drawn on top as a black line with markers.

    Args:
        historical_responses (List[QuestionnaireItemResponse]): List of historical responses
        item (Item): The item being plotted
        patient: Patient instance for start date calculation
        start_date_reference: Reference date type for time calculation
        time_interval: Time interval type for x-axis
        selected_indicators: Passed to ``add_clinical_indicators_to_plot``

    Returns:
        str: HTML string containing the Bokeh plot components
    """
    # Get all options ordered by their value
    options = list(item.likert_response.likertscaleresponseoption_set.all().order_by('option_value'))
    option_map = {str(opt.option_value): opt.option_text for opt in options}
    y_range = [opt.option_text for opt in options]

    # Colors are derived here from the palette rather than through get_option_colors
    # (the original author's note says this avoids an additional database query).
    better_direction = item.item_better_score_direction or 'Higher is Better'
    color_map = {}
    if options:
        colors = item.likert_response.get_viridis_colors(len(options))
        higher_is_better = better_direction == 'Higher is Better'
        for i, option in enumerate(options):
            # Palette is walked forwards when higher is better, backwards otherwise
            color_map[str(option.option_value)] = colors[i] if higher_is_better else colors[-(i + 1)]

    # Get start date for the patient
    start_date = None
    if patient:
        start_date = get_patient_start_date(patient, start_date_reference)

    # Filter out responses with negative intervals
    if start_date:
        filtered_responses = filter_positive_intervals(historical_responses, start_date, time_interval)
    else:
        filtered_responses = historical_responses

    # Prepare data
    time_intervals = []
    submission_dates = []
    option_texts = []
    for response in reversed(filtered_responses):
        submitted_at = response.questionnaire_submission.submission_date
        submission_dates.append(timezone.localtime(submitted_at).strftime('%d/%m/%y'))
        if start_date:
            time_intervals.append(calculate_time_interval_value(submitted_at, start_date, time_interval))
        else:
            # Without a start date every point is placed at x = 0
            time_intervals.append(0)
        # Values with no matching option map to '' (which is not one of the y factors)
        option_texts.append(option_map.get(str(response.response_value), ''))

    # Calculate x-axis range: pad by 10% of the span and never start below 0
    if time_intervals:
        first_interval, last_interval = min(time_intervals), max(time_intervals)
        if last_interval > first_interval:
            padding = 0.1 * (last_interval - first_interval)
            x_min = max(0, first_interval - padding)
            x_max = last_interval + padding
        else:
            x_min = 0
            x_max = last_interval + 1
    else:
        x_min, x_max = 0, 1

    # Create figure with linear x-axis for time intervals
    interval_label = get_interval_label(time_interval)
    p = figure(
        width=400,
        height=220,
        tools="pan,box_zoom,reset",
        toolbar_location=None,
        sizing_mode="scale_width",
        x_axis_label=f"Time ({interval_label})",
        y_range=FactorRange(factors=y_range),
        x_range=(x_min, x_max)
    )

    # Style the plot
    p.background_fill_color = "#ffffff"
    p.border_fill_color = "#ffffff"
    p.grid.grid_line_color = "#e5e7eb"
    p.grid.grid_line_width = 1
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.minor_tick_line_color = None

    # Format axes
    p.yaxis.major_label_orientation = math.pi/4

    # Add colored strips for each option.
    # Bokeh places the i-th factor of a FactorRange at synthetic coordinate
    # i + 0.5 (each category is one unit wide, starting at 0), so the strip
    # behind option i has to span [i, i + 1] to sit behind its own label.
    for i, option in enumerate(options):
        p.add_layout(BoxAnnotation(
            bottom=i,
            top=i + 1,
            fill_color=color_map.get(str(option.option_value), '#ffffff'),
            fill_alpha=0.2,
            line_width=0
        ))

    # Add data for individual patient
    source = ColumnDataSource(data=dict(
        time_intervals=time_intervals,
        responses=option_texts,
        submission_dates=submission_dates
    ))

    # Add individual patient line and points (on top of the strips)
    p.line(
        x='time_intervals',
        y='responses',
        source=source,
        line_width=2,
        line_color='#000000'
    )
    individual_scatter = p.scatter(
        x='time_intervals',
        y='responses',
        source=source,
        size=6,
        fill_color='#000000',
        line_color='#000000'
    )

    # Configure hover tool for individual data (bound to the markers only)
    hover = HoverTool(
        tooltips=[
            ('Submission Date', '@submission_dates'),
            ('Time Interval', '@time_intervals{0.0}'),
            ('Response', '@responses')
        ],
        mode='mouse',
        point_policy='follow_mouse',
        renderers=[individual_scatter]
    )
    p.add_tools(hover)

    # Add diagnosis and treatment indicators if selected
    add_clinical_indicators_to_plot(p, selected_indicators, start_date, time_interval, x_min, x_max)

    # Get the plot components
    script, div = components(p)
    return script + div
[docs]
def create_numeric_response_plot(historical_responses: List['QuestionnaireItemResponse'], item: 'Item',
                                 patient=None, start_date_reference='date_of_registration', time_interval='weeks',
                                 selected_indicators=None) -> str:
    """Create a Bokeh plot for numeric responses.

    Responses are plotted as a black line with markers against the time
    elapsed since the patient's start date. The item's threshold, normative
    mean and the band of one standard deviation around that mean are
    overlaid when they are set.

    Args:
        historical_responses (List[QuestionnaireItemResponse]): List of historical responses
        item (Item): The item being plotted
        patient: Patient instance for start date calculation
        start_date_reference: Reference date type for time calculation
        time_interval: Time interval type for x-axis
        selected_indicators: Passed to ``add_clinical_indicators_to_plot``

    Returns:
        str: HTML string containing the Bokeh plot components
    """
    # Get start date for the patient
    start_date = None
    if patient:
        start_date = get_patient_start_date(patient, start_date_reference)

    # Filter out responses with negative intervals
    if start_date:
        filtered_responses = filter_positive_intervals(historical_responses, start_date, time_interval)
    else:
        filtered_responses = historical_responses

    # Prepare data
    time_intervals = []
    submission_dates = []
    values = []
    for response in reversed(filtered_responses):
        submitted_at = response.questionnaire_submission.submission_date
        submission_dates.append(timezone.localtime(submitted_at).strftime('%d/%m/%y'))
        if start_date:
            time_intervals.append(calculate_time_interval_value(submitted_at, start_date, time_interval))
        else:
            # Without a start date every point is placed at x = 0
            time_intervals.append(0)
        # Let the conversion decide what counts as missing: None and
        # non-numeric text (including '') become None, while a genuine
        # numeric 0 is kept as a value instead of being dropped as falsy.
        try:
            values.append(float(response.response_value))
        except (ValueError, TypeError):
            values.append(None)

    logger.debug(f"Numeric plot for item {item.id}: Time intervals: {time_intervals}")
    logger.debug(f"Numeric plot for item {item.id}: Values: {values}")
    logger.debug(f"Numeric plot for item {item.id}: Threshold: {item.item_threshold_score}, Normative Mean: {item.item_normative_score_mean}, SD: {item.item_normative_score_standard_deviation}")

    # Calculate x-axis range: pad by 10% of the span and never start below 0
    if time_intervals:
        first_interval, last_interval = min(time_intervals), max(time_intervals)
        if last_interval > first_interval:
            padding = 0.1 * (last_interval - first_interval)
            x_min = max(0, first_interval - padding)
            x_max = last_interval + padding
        else:
            x_min = 0
            x_max = last_interval + 1
    else:
        x_min, x_max = 0, 1

    # Create figure with linear x-axis for time intervals
    interval_label = get_interval_label(time_interval)
    p = figure(
        width=400,
        height=220,
        tools="pan,box_zoom,reset",
        toolbar_location=None,
        sizing_mode="scale_width",
        x_axis_label=f"Time ({interval_label})",
        x_range=(x_min, x_max)
    )

    # Style the plot
    p.background_fill_color = "#ffffff"
    p.border_fill_color = "#ffffff"
    p.grid.grid_line_color = "#e5e7eb"
    p.grid.grid_line_width = 1
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.minor_tick_line_color = None

    # Add data for individual patient
    source = ColumnDataSource(data=dict(
        time_intervals=time_intervals,
        values=values,
        submission_dates=submission_dates
    ))

    # Add threshold line if available
    if item.item_threshold_score:
        threshold = Span(
            location=float(item.item_threshold_score),
            dimension='width',
            line_color='#f97316',
            line_dash='solid',
            line_width=1
        )
        p.add_layout(threshold)

    # Add normative line and band if available
    if item.item_normative_score_mean:
        mean = float(item.item_normative_score_mean)
        normative = Span(
            location=mean,
            dimension='width',
            line_color='#1e3a8a',
            line_dash='solid',
            line_width=1
        )
        p.add_layout(normative)
        if item.item_normative_score_standard_deviation:
            sd = float(item.item_normative_score_standard_deviation)
            band = BoxAnnotation(
                bottom=mean - sd,
                top=mean + sd,
                fill_color='#1e3a8a',
                fill_alpha=0.1,
                line_width=0
            )
            p.add_layout(band)

    # Add individual patient line and points (on top of the reference lines)
    p.line(
        x='time_intervals',
        y='values',
        source=source,
        line_width=2,
        line_color='#000000'
    )
    individual_scatter = p.scatter(
        x='time_intervals',
        y='values',
        source=source,
        size=6,
        fill_color='#000000',
        line_color='#000000'
    )

    # Configure hover tool for individual data (bound to the markers only)
    hover = HoverTool(
        tooltips=[
            ('Submission Date', '@submission_dates'),
            ('Time Interval', '@time_intervals{0.0}'),
            ('Value', '@values{0.0}')
        ],
        mode='mouse',
        point_policy='follow_mouse',
        renderers=[individual_scatter]
    )
    p.add_tools(hover)

    # Add diagnosis and treatment indicators if selected.
    # Missing values (None) are ignored when deriving the height passed to the indicators.
    valid_values = [v for v in values if v is not None]
    y_max_for_indicators = max(valid_values) if valid_values else 100
    add_clinical_indicators_to_plot(p, selected_indicators, start_date, time_interval, x_min, x_max, y_max_for_indicators)

    # Get the plot components
    script, div = components(p)
    return script + div
def _get_patient_start_date_bulk(patient, start_date_reference, reference_objects_cache):
"""Optimized version of get_patient_start_date_for_aggregation that uses prefetched data.
This function is designed for BULK OPERATIONS where multiple patients are processed
in a loop. It eliminates the N+1 query problem by using prefetched data and cached
reference objects instead of making individual database queries for each patient.
KEY DIFFERENCES vs get_patient_start_date():
==========================================
DATABASE QUERIES:
- get_patient_start_date(): 1-3 queries PER patient (N+1 problem)
- _get_patient_start_date_bulk(): 0 queries per patient (uses prefetched data)
PERFORMANCE:
- get_patient_start_date(): Fine for single patient operations
- _get_patient_start_date_bulk(): Optimized for bulk processing (100x faster for large datasets)
SETUP REQUIREMENTS:
- get_patient_start_date(): No setup required
- _get_patient_start_date_bulk(): Requires prefetched patient data and reference_objects_cache
USAGE:
- get_patient_start_date(): General-purpose, single patient operations
- _get_patient_start_date_bulk(): Specialized for bulk aggregation functions only
EXAMPLE BULK SETUP REQUIRED:
```python
# 1. Prefetch patient data
patients = patients_queryset.prefetch_related(
'diagnosis_set__diagnosis',
'diagnosis_set__treatment_set__treatment_type'
)
# 2. Cache reference objects
reference_objects_cache = {}
if start_date_reference.startswith('date_of_diagnosis_'):
reference_objects_cache['diagnosis'] = Diagnosis.objects.get(id=diagnosis_id)
# 3. Then use this optimized function in a loop
for patient in patients:
start_date = _get_patient_start_date_bulk(patient, start_date_reference, reference_objects_cache)
```
Args:
patient: Patient instance with prefetched diagnosis_set and treatment_set data
start_date_reference: Type of start date reference key
reference_objects_cache: Dict containing cached reference diagnosis/treatment objects.
Should contain 'diagnosis' key for diagnosis-based references,
or 'treatment' key for treatment-based references.
Returns:
datetime.date or None: The start date or None if not available
Note:
This is a private function (underscore prefix) intended only for use within
bulk optimization scenarios. For single patient operations, use get_patient_start_date().
"""
try:
if start_date_reference == 'date_of_registration':
return patient.date_of_registration
elif start_date_reference.startswith('date_of_diagnosis_'):
# Use cached reference diagnosis to find matching diagnosis type
cached_diagnosis = reference_objects_cache.get('diagnosis')
if not cached_diagnosis:
return None
diagnosis_list_id = cached_diagnosis.diagnosis_id
# Use prefetched data to find matching diagnosis
for diagnosis in patient.diagnosis_set.all():
if (diagnosis.diagnosis_id == diagnosis_list_id and
diagnosis.date_of_diagnosis):
return diagnosis.date_of_diagnosis
return None
elif start_date_reference.startswith('date_of_start_of_treatment_'):
# Use cached reference treatment to find matching treatment type
cached_treatment = reference_objects_cache.get('treatment')
if not cached_treatment:
return None
treatment_type_ids = cached_treatment['type_ids']
# Use prefetched data to find matching treatment
for diagnosis in patient.diagnosis_set.all():
for treatment in diagnosis.treatment_set.all():
# Check if any treatment type matches
treatment_types = treatment.treatment_type.all()
for tt in treatment_types:
if tt.id in treatment_type_ids and treatment.date_of_start_of_treatment:
return treatment.date_of_start_of_treatment
return None
elif start_date_reference.startswith('date_of_end_of_treatment_'):
# Use cached reference treatment to find matching treatment type
cached_treatment = reference_objects_cache.get('treatment')
if not cached_treatment:
return None
treatment_type_ids = cached_treatment['type_ids']
# Use prefetched data to find matching treatment
for diagnosis in patient.diagnosis_set.all():
for treatment in diagnosis.treatment_set.all():
# Check if any treatment type matches
treatment_types = treatment.treatment_type.all()
for tt in treatment_types:
if tt.id in treatment_type_ids and treatment.date_of_end_of_treatment:
return treatment.date_of_end_of_treatment
return None
else:
# Fallback to registration date
return patient.date_of_registration
except Exception as e:
logger.error(f"Error getting bulk start date for patient {patient.id}: {e}")
return None
[docs]
def _earliest_matching_treatment_date(patient, reference_treatment_id, date_field):
    """Return the patient's earliest ``date_field`` among treatments sharing a type with the reference.

    Args:
        patient: Patient instance whose treatments are searched.
        reference_treatment_id: Primary key (as found in the reference key) of
            the Treatment whose treatment types define what counts as a match.
        date_field: Name of the Treatment date field to read, either
            'date_of_start_of_treatment' or 'date_of_end_of_treatment'.

    Returns:
        The value of ``date_field`` on the earliest matching treatment, or
        None when the patient has no matching treatment with that date set.

    Raises:
        Treatment.DoesNotExist: If the reference treatment does not exist.
    """
    reference_treatment = Treatment.objects.get(id=reference_treatment_id)
    treatment_type_ids = list(
        reference_treatment.treatment_type.values_list('id', flat=True)
    )
    patient_treatment = Treatment.objects.filter(
        diagnosis__patient=patient,
        treatment_type__id__in=treatment_type_ids,
        **{f'{date_field}__isnull': False}
    ).order_by(date_field).first()
    return getattr(patient_treatment, date_field) if patient_treatment else None


def get_patient_start_date_for_aggregation(patient, start_date_reference='date_of_registration'):
    """Get the start date for a patient for aggregation purposes.

    For aggregation, the same diagnosis/treatment *type* as the reference is
    used, but the date is taken from this patient's own record:

    - 'date_of_registration': the patient's registration date.
    - 'date_of_diagnosis_<id>': earliest diagnosis date of this patient whose
      diagnosis type equals that of the referenced Diagnosis.
    - 'date_of_start_of_treatment_<id>' / 'date_of_end_of_treatment_<id>':
      earliest start/end date among this patient's treatments that share at
      least one treatment type with the referenced Treatment.
    - anything else: delegated to ``get_patient_start_date``.

    Args:
        patient: Patient instance
        start_date_reference: Type of start date reference key

    Returns:
        datetime.date or None: The start date or None if not available
    """
    diagnosis_prefix = 'date_of_diagnosis_'
    # Maps each treatment reference prefix to the Treatment field it selects.
    treatment_date_fields = {
        'date_of_start_of_treatment_': 'date_of_start_of_treatment',
        'date_of_end_of_treatment_': 'date_of_end_of_treatment',
    }
    try:
        if start_date_reference == 'date_of_registration':
            return patient.date_of_registration

        if start_date_reference.startswith(diagnosis_prefix):
            diagnosis_id = start_date_reference[len(diagnosis_prefix):]
            try:
                reference_diagnosis = Diagnosis.objects.get(id=diagnosis_id)
            except (Diagnosis.DoesNotExist, ValueError, TypeError):
                # Unknown or malformed reference id: no start date available.
                # Any other failure falls through to the outer handler so it
                # is logged instead of being silently swallowed.
                return None
            patient_diagnosis = patient.diagnosis_set.filter(
                diagnosis_id=reference_diagnosis.diagnosis_id,
                date_of_diagnosis__isnull=False
            ).order_by('date_of_diagnosis').first()
            return patient_diagnosis.date_of_diagnosis if patient_diagnosis else None

        for prefix, date_field in treatment_date_fields.items():
            if start_date_reference.startswith(prefix):
                try:
                    return _earliest_matching_treatment_date(
                        patient, start_date_reference[len(prefix):], date_field
                    )
                except (Treatment.DoesNotExist, ValueError, TypeError):
                    # Unknown or malformed reference id: no start date available.
                    return None

        # Fallback to exact reference for other types
        return get_patient_start_date(patient, start_date_reference)
    except Exception as e:
        logger.error(f"Error getting aggregation start date for patient {patient.id}: {e}")
        return None
[docs]
def get_filtered_patients_for_aggregation(exclude_patient, patient_filter_gender=None,
                                          patient_filter_diagnosis=None, patient_filter_treatment=None,
                                          patient_filter_min_age=None, patient_filter_max_age=None):
    """Get patients for aggregation based on filtering criteria, excluding the current patient.

    Args:
        exclude_patient: Patient instance to exclude from aggregation
        patient_filter_gender: Gender filter ('match', specific gender, or None for all)
        patient_filter_diagnosis: Diagnosis filter ('match', specific diagnosis ID, or None for all)
        patient_filter_treatment: Treatment filter ('match', specific treatment type ID, or None for all)
        patient_filter_min_age: Minimum age filter (integer or None)
        patient_filter_max_age: Maximum age filter (integer or None)

    Returns:
        QuerySet: Filtered patients excluding the current patient

    Note:
        Adding New Filter Fields - To add a new filtering criterion, follow this pattern:
        1. Add Parameter: Add the new filter parameter to the function signature
        2. Add Documentation: Update this docstring with the new parameter description
        3. Implement Filter Logic: Add the filtering logic following existing patterns
        4. Update Callers: Update all places that call this function
        5. Update UI: Add the new filter to the frontend

        Filter Types Supported:
        - 'match': Match the exclude_patient's value for this field
        - Specific value: Filter to patients with this exact value
        - None/empty: No filtering applied for this field
        - Range values: For numeric fields (min/max parameters)

        Important Notes:
        - All filters use AND logic (cumulative narrowing)
        - Use .distinct() for relationship-based filters to avoid duplicates
        - Optimize relationship queries using values_list() instead of loops
        - Consider database indexes for new filterable fields
        - With 'match', a reference patient that has no diagnoses (or no
          treatment types) leaves that filter unapplied
    """
    # Start with all patients except the current one
    patients = Patient.objects.exclude(id=exclude_patient.id)

    # Apply gender filter
    if patient_filter_gender:
        if patient_filter_gender == 'match':
            gender = exclude_patient.gender
        else:
            gender = patient_filter_gender
        patients = patients.filter(gender=gender)

    # Apply diagnosis filter
    if patient_filter_diagnosis:
        if patient_filter_diagnosis == 'match':
            # Materialise once so the emptiness check and the IN clause share
            # a single query.
            patient_diagnosis_ids = list(
                exclude_patient.diagnosis_set.values_list('diagnosis_id', flat=True)
            )
            if patient_diagnosis_ids:
                patients = patients.filter(
                    diagnosis__diagnosis_id__in=patient_diagnosis_ids
                ).distinct()
        else:
            patients = patients.filter(
                diagnosis__diagnosis_id=patient_filter_diagnosis
            ).distinct()

    # Apply treatment filter
    if patient_filter_treatment:
        if patient_filter_treatment == 'match':
            # Traversing diagnosis -> treatment -> treatment_type yields None
            # for diagnoses without treatments (or treatments without types).
            # Those must be dropped: filtering on `...__id=None` is an IS NULL
            # test and would wrongly narrow the cohort.
            type_id_rows = exclude_patient.diagnosis_set.values_list(
                'treatment__treatment_type__id',
                flat=True
            ).distinct()
            patient_treatment_type_ids = [
                type_id for type_id in type_id_rows if type_id is not None
            ]
            if patient_treatment_type_ids:
                # AND filter: patients must have ALL treatment types the current
                # patient has. Each chained .filter() adds its own join, so the
                # conditions may be satisfied by different treatments.
                for treatment_type_id in patient_treatment_type_ids:
                    patients = patients.filter(
                        diagnosis__treatment__treatment_type__id=treatment_type_id
                    )
                patients = patients.distinct()
        else:
            patients = patients.filter(
                diagnosis__treatment__treatment_type__id=patient_filter_treatment
            ).distinct()

    # Apply age filters if specified. Age is obtained per patient through
    # calculate_patient_age, so the check runs in Python and the queryset is
    # then restricted to the matching ids.
    if patient_filter_min_age is not None or patient_filter_max_age is not None:
        matching_patient_ids = []
        for patient in patients:
            age = calculate_patient_age(patient)
            if age is None:
                # Patients without a usable age never pass an age filter.
                continue
            if patient_filter_min_age is not None and age < patient_filter_min_age:
                continue
            if patient_filter_max_age is not None and age > patient_filter_max_age:
                continue
            matching_patient_ids.append(patient.id)
        patients = patients.filter(id__in=matching_patient_ids)

    return patients
[docs]
def aggregate_construct_scores_by_time_interval(construct, patients_queryset, start_date_reference,
                                                time_interval, max_time_interval_filter=None,
                                                reference_time_intervals=None):
    """Aggregate construct scores from multiple patients by time intervals.

    For each reference time interval from the index patient, find the most recent
    observation from other patients that is at or before each reference time point.

    Args:
        construct: ConstructScale instance
        patients_queryset: QuerySet of patients to include in aggregation
        start_date_reference: Reference date type for time calculation
        time_interval: Time interval type for grouping
        max_time_interval_filter: Optional maximum time interval (relative to start date) for filtering submissions
        reference_time_intervals: List of reference time intervals from index patient

    Returns:
        tuple: (aggregated_data dict, metadata dict). ``aggregated_data`` maps
        each reference interval that received data to the list of contributed
        scores (floats). ``metadata`` holds summary counts, the covered time
        range and, when reference intervals were supplied, per-patient details.
    """
    # Count once and reuse for logging and metadata instead of issuing a
    # COUNT query each time the number is needed.
    total_eligible_patients = patients_queryset.count()

    plotting_logger.info("="*80)
    plotting_logger.info(f"AGGREGATION DATA for {construct.name}")
    plotting_logger.info("="*80)
    plotting_logger.info(f"Patients in aggregation: {total_eligible_patients}")
    plotting_logger.info(f"Start date reference: {start_date_reference}")
    plotting_logger.info(f"Time interval: {time_interval}")
    plotting_logger.info(f"Max time interval filter: {max_time_interval_filter}")
    plotting_logger.info(f"Reference time intervals from index patient: {reference_time_intervals}")

    if not reference_time_intervals:
        plotting_logger.info("No reference time intervals provided - returning empty aggregation")
        return {}, {
            'total_eligible_patients': total_eligible_patients,
            'contributing_patients': 0,
            'total_responses': 0,
            'time_intervals_count': 0,
            'time_range': 'N/A'
        }

    # === OPTIMIZATION: Bulk fetch all required data upfront ===
    plotting_logger.info("Starting bulk data fetch optimization...")

    # 1. Prefetch patients with all related data needed for start date calculations
    patients_optimized = patients_queryset.prefetch_related(
        'diagnosis_set__diagnosis',  # For diagnosis-based start dates
        'diagnosis_set__treatment_set__treatment_type'  # For treatment-based start dates
    ).select_related()
    # Convert to list to avoid re-executing the query
    patients_list = list(patients_optimized)
    patient_ids = [p.id for p in patients_list]
    plotting_logger.info(f"Fetched {len(patients_list)} patients with prefetched data")

    # 2. Bulk fetch all construct scores for all patients at once
    all_scores = QuestionnaireConstructScore.objects.filter(
        questionnaire_submission__patient_id__in=patient_ids,
        construct=construct
    ).select_related(
        'questionnaire_submission'
    ).order_by('questionnaire_submission__patient_id', 'questionnaire_submission__submission_date')

    # Group scores by patient ID for fast lookup
    scores_by_patient = {}
    for score in all_scores:
        scores_by_patient.setdefault(
            score.questionnaire_submission.patient_id, []
        ).append(score)
    plotting_logger.info(f"Bulk fetched {all_scores.count()} construct scores across all patients")

    # 3. If using diagnosis/treatment-based start dates, bulk fetch reference objects
    reference_objects_cache = {}
    if start_date_reference.startswith('date_of_diagnosis_'):
        diagnosis_id = start_date_reference.replace('date_of_diagnosis_', '')
        try:
            reference_diagnosis = Diagnosis.objects.select_related('diagnosis').get(id=diagnosis_id)
        except (Diagnosis.DoesNotExist, ValueError):
            # ValueError: the id part of the reference key is malformed;
            # handled the same way as a missing row.
            plotting_logger.warning(f"Reference diagnosis {diagnosis_id} not found")
        else:
            reference_objects_cache['diagnosis'] = reference_diagnosis
            plotting_logger.info(f"Cached reference diagnosis: {reference_diagnosis.diagnosis.diagnosis}")
    elif start_date_reference.startswith('date_of_start_of_treatment_') or start_date_reference.startswith('date_of_end_of_treatment_'):
        treatment_id_key = 'date_of_start_of_treatment_' if start_date_reference.startswith('date_of_start_of_treatment_') else 'date_of_end_of_treatment_'
        treatment_id = start_date_reference.replace(treatment_id_key, '')
        try:
            reference_treatment = Treatment.objects.prefetch_related('treatment_type').get(id=treatment_id)
        except (Treatment.DoesNotExist, ValueError):
            plotting_logger.warning(f"Reference treatment {treatment_id} not found")
        else:
            reference_treatment_type_ids = list(reference_treatment.treatment_type.values_list('id', flat=True))
            reference_objects_cache['treatment'] = {
                'object': reference_treatment,
                'type_ids': reference_treatment_type_ids
            }
            plotting_logger.info(f"Cached reference treatment with {len(reference_treatment_type_ids)} treatment types")

    # === OPTIMIZATION: Process all patients using bulk-fetched data ===
    aggregated_data = {}
    patients_with_data = 0
    total_scores_processed = 0
    patient_data_list = []
    contributing_patients = set()

    for patient in patients_list:
        # Get start date for this patient using prefetched data
        start_date = _get_patient_start_date_bulk(patient, start_date_reference, reference_objects_cache)
        if not start_date:
            continue

        # Get construct scores for this patient from bulk-fetched data
        patient_scores_list = scores_by_patient.get(patient.id, [])

        # Apply max time interval filter if specified (in memory)
        if max_time_interval_filter is not None:
            patient_scores_list = [
                score for score in patient_scores_list
                if calculate_time_interval_value(
                    score.questionnaire_submission.submission_date,
                    start_date,
                    time_interval
                ) <= max_time_interval_filter
            ]

        # Filter out scores with negative time intervals (in memory)
        filtered_scores = filter_positive_intervals_construct(patient_scores_list, start_date, time_interval)
        if not filtered_scores:
            continue

        patients_with_data += 1
        patient_scores = []
        patient_contributed = False

        # Calculate time intervals for all scores from this patient
        patient_time_data = []
        for score in filtered_scores:
            if score.score is None:
                continue
            interval_value = calculate_time_interval_value(
                score.questionnaire_submission.submission_date,
                start_date,
                time_interval
            )
            patient_time_data.append({
                'interval': interval_value,
                'score': float(score.score),
                'submission_date': score.questionnaire_submission.submission_date
            })

        # Sort by time interval (stable, so equal intervals keep fetch order)
        patient_time_data.sort(key=lambda x: x['interval'])

        # For each reference time interval, find the most recent observation at or before that time
        for ref_interval in reference_time_intervals:
            # Find all observations at or before this reference time
            eligible_observations = [obs for obs in patient_time_data if obs['interval'] <= ref_interval]
            if eligible_observations:
                # Get the most recent observation (highest interval ≤ ref_interval)
                most_recent = max(eligible_observations, key=lambda x: x['interval'])
                aggregated_data.setdefault(ref_interval, []).append(most_recent['score'])
                total_scores_processed += 1
                patient_contributed = True
                # Store for patient-level logging
                patient_scores.append({
                    'Reference_Interval': f"{ref_interval:.2f}",
                    'Patient_Interval': f"{most_recent['interval']:.2f}",
                    'Score': f"{most_recent['score']:.1f}",
                    'Submission_Date': most_recent['submission_date'].strftime('%Y-%m-%d %H:%M')
                })
            else:
                # No observations at or before this reference time
                patient_scores.append({
                    'Reference_Interval': f"{ref_interval:.2f}",
                    'Patient_Interval': 'No data ≤ ref time',
                    'Score': 'N/A',
                    'Submission_Date': 'N/A'
                })

        # Track patients that actually contributed data
        if patient_contributed:
            contributing_patients.add(patient.id)

        # Add patient data to list for tabular logging
        if patient_scores:
            patient_data_list.append({
                'patient': patient,
                'start_date': start_date,
                'scores': patient_scores,
                'score_count': sum(1 for s in patient_scores if s['Score'] != 'N/A')
            })

    # Log patient-level data in tables
    plotting_logger.info("\nPATIENT-LEVEL AGGREGATION DATA:")
    plotting_logger.info(f"Patients with data: {patients_with_data}")
    plotting_logger.info(f"Total scores processed: {total_scores_processed}")
    for patient_data in patient_data_list[:5]:  # Show first 5 patients as example
        plotting_logger.info(f"\nPatient: {patient_data['patient'].name} (Start: {patient_data['start_date']})")
        plotting_logger.info(f"Scores count: {patient_data['score_count']}")
        if patient_data['scores']:
            df_patient = pd.DataFrame(patient_data['scores'])
            plotting_logger.info(f"\n{df_patient.to_string(index=False)}")
    if len(patient_data_list) > 5:
        plotting_logger.info(f"\n... and {len(patient_data_list) - 5} more patients with data")

    # Log aggregated data summary in tabular format
    plotting_logger.info("\nAGGREGATED DATA SUMMARY:")
    plotting_logger.info(f"Reference intervals with data: {len(aggregated_data)}")
    if aggregated_data:
        summary_data = []
        for interval, values in sorted(aggregated_data.items()):
            summary_data.append({
                'Reference_Interval': f"{interval:.2f}",
                'Score_Count': len(values),
                'Min_Score': f"{min(values):.1f}",
                'Max_Score': f"{max(values):.1f}",
                'Mean_Score': f"{sum(values)/len(values):.2f}",
                'Scores': f"[{', '.join([f'{v:.1f}' for v in sorted(values)])}]"
            })
        df_summary = pd.DataFrame(summary_data)
        plotting_logger.info(f"\n{df_summary.to_string(index=False)}")
    else:
        plotting_logger.info("No aggregated data available")

    # Calculate time interval range
    time_intervals = sorted(aggregated_data.keys()) if aggregated_data else []
    time_range = None
    if time_intervals:
        min_interval = time_intervals[0]
        max_interval = time_intervals[-1]
        if min_interval == max_interval:
            time_range = f"{min_interval:.1f}"
        else:
            time_range = f"{min_interval:.1f} - {max_interval:.1f}"

    # Create metadata with detailed patient information
    patient_details = {
        'contributing': [],
        'non_contributing': []
    }

    # Add patient details to metadata
    for patient_data in patient_data_list:
        patient_info = {
            'id': patient_data['patient'].id,
            'name': patient_data['patient'].name,
            'start_date': patient_data['start_date'].strftime('%Y-%m-%d'),
            'score_count': patient_data['score_count']
        }
        if patient_data['patient'].id in contributing_patients:
            patient_details['contributing'].append(patient_info)
        else:
            patient_details['non_contributing'].append(patient_info)

    # Add non-contributing patients (those without any scores in the dataset).
    # Iterate the already-fetched list and use a set for membership so this
    # step neither re-runs the patient query nor rebuilds an id list per patient.
    listed_patient_ids = {entry['patient'].id for entry in patient_data_list}
    for patient in patients_list:
        if patient.id in listed_patient_ids:
            continue
        start_date = get_patient_start_date_for_aggregation(patient, start_date_reference)
        patient_details['non_contributing'].append({
            'id': patient.id,
            'name': patient.name,
            'start_date': start_date.strftime('%Y-%m-%d') if start_date else 'N/A',
            'score_count': 0
        })

    metadata = {
        'total_eligible_patients': total_eligible_patients,
        'contributing_patients': len(contributing_patients),
        'total_responses': total_scores_processed,
        'time_intervals_count': len(time_intervals),
        'time_range': time_range or 'N/A',
        'time_interval_unit': get_interval_label(time_interval).lower(),
        'patient_details': patient_details
    }

    plotting_logger.info("="*80)
    plotting_logger.info(f"OPTIMIZATION RESULTS: Used bulk fetching instead of {len(patients_list)} individual patient queries")
    plotting_logger.info(f"AGGREGATION METADATA: {metadata['contributing_patients']}/{metadata['total_eligible_patients']} patients contributed {metadata['total_responses']} scores across {metadata['time_intervals_count']} intervals")
    plotting_logger.info("="*80)

    return aggregated_data, metadata
[docs]
def calculate_aggregation_statistics(aggregated_data, aggregation_type='median_iqr'):
    """Calculate aggregation statistics for each time interval.

    Supported aggregation types:
        - 'median_iqr': median with 25th/75th percentiles as bounds
        - 'mean_95ci': mean with a 95% t-distribution confidence interval
        - 'mean_0.5sd', 'mean_1sd', 'mean_2sd', 'mean_2.5sd': mean +/- that
          many sample standard deviations; any other 'mean_*' value falls
          back to 1 SD

    Intervals with fewer than two values are skipped. An aggregation type
    matching none of the above yields no statistics.

    Args:
        aggregated_data: Dict mapping time intervals to lists of values
        aggregation_type: Type of aggregation to perform

    Returns:
        dict: Maps each computed interval to a dict with keys 'central',
        'lower', 'upper' and 'n'
    """
    # Imported lazily so the numeric stack is only loaded when statistics
    # are actually requested.
    import numpy as np
    from scipy import stats

    # Multipliers for the 'mean_<k>sd' aggregation types.
    sd_multipliers = {
        'mean_0.5sd': 0.5,
        'mean_1sd': 1.0,
        'mean_2sd': 2.0,
        'mean_2.5sd': 2.5,
    }

    plotting_logger.info("="*80)
    plotting_logger.info("STATISTICS CALCULATION")
    plotting_logger.info("="*80)
    plotting_logger.info(f"Aggregation type: {aggregation_type}")
    plotting_logger.info(f"Input data intervals: {len(aggregated_data)}")

    statistics = {}
    calculation_data = []

    for interval, values in aggregated_data.items():
        if len(values) < 2:  # Need at least 2 values for meaningful statistics
            calculation_data.append({
                'Time_Interval': f"{interval:.1f}",
                'Value_Count': len(values),
                'Status': 'Skipped (need ≥2 values)',
                'Central': 'N/A',
                'Lower': 'N/A',
                'Upper': 'N/A',
                'Values': f"[{', '.join([f'{v:.1f}' for v in values]) if values else 'None'}]"
            })
            continue

        values_array = np.array(values)
        n = len(values)
        sorted_values_text = f"[{', '.join([f'{v:.1f}' for v in sorted(values)])}]"

        if aggregation_type == 'median_iqr':
            central = np.median(values_array)
            lower = np.percentile(values_array, 25)
            upper = np.percentile(values_array, 75)
            central_label, lower_label, upper_label = 'median', 'Q25', 'Q75'
        elif aggregation_type == 'mean_95ci':
            central = np.mean(values_array)
            sem = stats.sem(values_array)  # Standard error of mean
            if sem > 0:
                lower, upper = stats.t.interval(0.95, n - 1, loc=central, scale=sem)
            else:
                # All values identical: scipy rejects scale=0 and would return
                # (nan, nan); the interval degenerates to the mean itself.
                lower = upper = central
            central_label, lower_label, upper_label = 'mean', '95% CI lower', '95% CI upper'
        elif aggregation_type.startswith('mean_'):
            central = np.mean(values_array)
            std = np.std(values_array, ddof=1)  # Sample standard deviation
            multiplier = sd_multipliers.get(aggregation_type, 1.0)
            lower = central - (multiplier * std)
            upper = central + (multiplier * std)
            central_label = 'mean'
            lower_label = f"-{multiplier}SD"
            upper_label = f"+{multiplier}SD"
        else:
            # Unknown aggregation type: nothing is computed for this interval.
            continue

        statistics[interval] = {
            'central': central,
            'lower': lower,
            'upper': upper,
            'n': n
        }
        calculation_data.append({
            'Time_Interval': f"{interval:.1f}",
            'Value_Count': n,
            'Status': 'Calculated',
            'Central': f"{central:.2f} ({central_label})",
            'Lower': f"{lower:.2f} ({lower_label})",
            'Upper': f"{upper:.2f} ({upper_label})",
            'Values': sorted_values_text
        })

    # Log calculation results in tabular format
    plotting_logger.info("\nSTATISTICS CALCULATION RESULTS:")
    if calculation_data:
        df_calc = pd.DataFrame(calculation_data)
        plotting_logger.info(f"\n{df_calc.to_string(index=False)}")
    else:
        plotting_logger.info("No calculation data available")

    # Log final statistics summary
    plotting_logger.info(f"\nFINAL STATISTICS SUMMARY:")
    plotting_logger.info(f"Valid intervals with statistics: {len(statistics)}")
    if statistics:
        final_stats = []
        for interval, stats_dict in sorted(statistics.items()):
            final_stats.append({
                'Time_Interval': f"{interval:.1f}",
                'Central': f"{stats_dict['central']:.2f}",
                'Lower_Bound': f"{stats_dict['lower']:.2f}",
                'Upper_Bound': f"{stats_dict['upper']:.2f}",
                'Sample_Size': stats_dict['n'],
                'Range_Width': f"{stats_dict['upper'] - stats_dict['lower']:.2f}"
            })
        df_final = pd.DataFrame(final_stats)
        plotting_logger.info(f"\n{df_final.to_string(index=False)}")
    else:
        plotting_logger.info("No valid statistics calculated")

    plotting_logger.info("="*80)
    plotting_logger.info("END STATISTICS CALCULATION")
    plotting_logger.info("="*80)

    return statistics
[docs]
def get_plotting_log_file_path():
    """Return the location of 'plotting_data.log' inside the project's 'logs' directory.

    The path is built from ``settings.BASE_DIR``; nothing is created or
    checked on disk.
    """
    return os.path.join(settings.BASE_DIR, 'logs', 'plotting_data.log')
[docs]
def clear_plotting_log():
    """Empty the plotting data log file.

    Returns:
        bool: True when the file was truncated, False when any error occurred
        (the error is reported through the module logger).
    """
    target_path = get_plotting_log_file_path()
    try:
        # Opening in write mode truncates the file; nothing needs to be written.
        with open(target_path, 'w'):
            pass
        plotting_logger.info("Plotting data log file cleared")
    except Exception as e:
        logger.error(f"Error clearing plotting log: {e}")
        return False
    return True
[docs]
def log_plotting_session_start(patient_name, constructs_count):
    """Write a banner to the plotting log marking the start of a plotting session.

    Args:
        patient_name: Name shown on the "Patient:" line.
        constructs_count: Number of constructs that will be plotted.
    """
    banner = "=" * 100
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    session_lines = (
        banner,
        f"NEW PLOTTING SESSION STARTED",
        f"Patient: {patient_name}",
        f"Number of constructs to plot: {constructs_count}",
        f"Session started at: {started_at}",
        banner,
    )
    for line in session_lines:
        plotting_logger.info(line)
[docs]
def calculate_patient_age(patient, reference_date=None):
    """Return the patient's age as an integer number of years.

    The value is read from the patient's ``age`` attribute and converted with
    ``int``.

    Args:
        patient: Patient instance
        reference_date: Ignored; accepted only for compatibility with callers.

    Returns:
        int or None: The age, or None when the attribute is missing, is None,
        or cannot be converted to an integer.
    """
    raw_age = getattr(patient, 'age', None)
    if raw_age is None:
        return None
    try:
        age_in_years = int(raw_age)
    except (ValueError, TypeError):
        logger.error(f"Error getting age for patient {patient.id}: invalid age value {patient.age}")
        return None
    return age_in_years
# Institution-based Access Control Utilities
[docs]
def get_user_institution(user):
    """
    Look up the institution attached to the user's provider profile.

    Returns ``user.provider.institution``, or None when either attribute is
    missing along the way (i.e. the user is not a provider).
    """
    provider_profile = getattr(user, 'provider', None)
    return getattr(provider_profile, 'institution', None)
[docs]
def is_provider_user(user):
    """Return True when the user has a non-None ``provider`` attribute, else False."""
    return getattr(user, 'provider', None) is not None
[docs]
def filter_patients_by_institution(queryset, user):
    """
    Restrict a Patient queryset to the user's institution.

    When the user resolves to an institution (a provider), only patients of
    that institution are returned. Otherwise the queryset is returned
    unchanged, on the assumption that such users hold appropriate permissions.
    """
    institution = get_user_institution(user)
    if not institution:
        return queryset
    return queryset.filter(institution=institution)
[docs]
def check_patient_access(user, patient):
    """
    Decide whether a user may access a specific patient.

    Returns True only when the user has an institution and it equals the
    patient's institution. Users without an institution (non-providers) are
    always refused, since only providers should be submitting questionnaires.
    """
    institution = get_user_institution(user)
    if not institution:
        # No institution means the user is not a provider: deny access.
        return False
    return patient.institution == institution
[docs]
def get_accessible_patient_or_404(user, pk):
    """
    Fetch the patient with the given pk and verify the user may access it.

    A missing patient results in a 404 (via ``get_object_or_404``); an
    existing patient the user may not access raises PermissionDenied.
    """
    from django.core.exceptions import PermissionDenied

    patient = get_object_or_404(Patient, pk=pk)
    if check_patient_access(user, patient):
        return patient
    raise PermissionDenied(
        "You do not have permission to access patients from other institutions."
    )
# Institution Filtering Mixin for Class-Based Views
[docs]
class InstitutionFilterMixin:
    """
    Mixin for class-based views that automatically filters Patient querysets
    based on the user's institution.

    It relies on ``self.request`` and on the ``get_queryset`` / ``get_object``
    implementations of the view classes that follow it in the MRO.
    """

    def get_user_institution(self):
        """Get the institution for the current user."""
        return get_user_institution(self.request.user)

    def get_queryset(self):
        """Filter the queryset based on user's institution."""
        qs = super().get_queryset()
        # Institution filtering is applied to any model exposing an
        # 'institution' attribute (Patient being the intended case).
        if hasattr(qs.model, 'institution'):
            qs = filter_patients_by_institution(qs, self.request.user)
        return qs

    def get_object(self, queryset=None):
        """
        Get the object, ensuring the user has access to it.
        This method is called by DetailView, UpdateView, DeleteView, etc.

        Raises:
            PermissionDenied: If the object is a Patient the current user is
                not allowed to access.
        """
        from django.core.exceptions import PermissionDenied

        obj = super().get_object(queryset)
        # check_patient_access only reports a boolean; it never raises, so the
        # result has to be enforced here. Only Patient objects are checked.
        if isinstance(obj, Patient) and not check_patient_access(self.request.user, obj):
            raise PermissionDenied(
                "You do not have permission to access patients from other institutions."
            )
        return obj