Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/airflow/model/common/gantt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl GanttData {
}

/// Recalculate `window_start` and `window_end` from all tries.
fn recompute_window(&mut self) {
pub fn recompute_window(&mut self) {
let mut min_start: Option<OffsetDateTime> = None;
let mut max_end: Option<OffsetDateTime> = None;
let mut any_running = false;
Expand Down
47 changes: 43 additions & 4 deletions src/app/model/taskinstances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use ratatui::widgets::{Block, BorderType, Borders, Row, StatefulWidget, Table, W

use crate::airflow::graph::{sort_task_instances, TaskGraph};
use crate::airflow::model::common::{
calculate_duration, format_duration, GanttData, TaskId, TaskInstance, TaskInstanceState,
calculate_duration, format_duration, DagId, DagRunId, GanttData, TaskId, TaskInstance,
TaskInstanceState,
};
use crate::app::events::custom::FlowrsEvent;
use crate::ui::common::{create_headers, state_to_colored_square};
Expand All @@ -32,6 +33,9 @@ pub struct TaskInstanceModel {
pub popup: Popup<TaskInstancePopUp>,
/// Gantt chart data computed from task instances and their tries
pub gantt_data: GanttData,
/// Tracks which DAG + run the cached `gantt_data` belongs to, so we can
/// invalidate it when the user navigates to a different DAG or run.
current_gantt_key: Option<(DagId, DagRunId)>,
ticks: u32,
event_buffer: Vec<KeyCode>,
pub task_graph: Option<TaskGraph>,
Expand All @@ -43,6 +47,7 @@ impl Default for TaskInstanceModel {
table: FilterableTable::new(),
popup: Popup::None,
gantt_data: GanttData::default(),
current_gantt_key: None,
ticks: 0,
event_buffer: Vec::new(),
task_graph: None,
Expand All @@ -55,6 +60,17 @@ impl TaskInstanceModel {
Self::default()
}

/// Notify the model which DAG + run is now active.
/// Resets `gantt_data` when either the DAG or the run changes so cached
/// retry history cannot leak into a different context.
pub fn set_gantt_context(&mut self, dag_id: &DagId, dag_run_id: &DagRunId) {
let key = (dag_id.clone(), dag_run_id.clone());
if self.current_gantt_key.as_ref() != Some(&key) {
self.gantt_data = GanttData::default();
self.current_gantt_key = Some(key);
}
}

/// Sort task instances by topological order (or timestamp fallback)
pub fn sort_task_instances(&mut self) {
if let Some(graph) = &self.task_graph {
Expand All @@ -64,15 +80,38 @@ impl TaskInstanceModel {

/// Rebuild Gantt data from the current task instance list.
/// Returns task IDs that have retries (`try_number` > 1) for fetching detailed tries.
///
/// For tasks with retries, previously cached try history is preserved so the
/// Gantt chart does not flicker while fresh retry data is being fetched.
pub fn rebuild_gantt(&mut self) -> Vec<TaskId> {
self.gantt_data = GanttData::from_task_instances(&self.table.all);
let mut new_gantt = GanttData::from_task_instances(&self.table.all);

// Collect retried task IDs and carry over cached retry data
let mut seen = HashSet::new();
self.table
let retried: Vec<TaskId> = self
.table
.all
.iter()
.filter(|ti| ti.try_number > 1 && seen.insert(ti.task_id.clone()))
.map(|ti| ti.task_id.clone())
.collect()
.collect();

for task_id in &retried {
if let Some(cached_tries) = self.gantt_data.task_tries.get(task_id) {
// Only carry over if the cache has more tries than the fresh data
// (i.e. it already includes the detailed retry history)
let new_tries = new_gantt.task_tries.get(task_id);
if cached_tries.len() > new_tries.map_or(0, Vec::len) {
new_gantt
.task_tries
.insert(task_id.clone(), cached_tries.clone());
}
}
}

new_gantt.recompute_window();
self.gantt_data = new_gantt;
retried
}

/// Mark a task instance with a new status (optimistic update)
Expand Down
1 change: 1 addition & 0 deletions src/app/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ impl App {
if let (Some(dag_id), Some(dag_run_id)) =
(self.nav_context.dag_id(), self.nav_context.dag_run_id())
{
self.task_instances.set_gantt_context(dag_id, dag_run_id);
self.task_instances.table.all = self
.environment_state
.get_active_task_instances(dag_id, dag_run_id);
Expand Down