Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/data_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def export_data(self, df: pd.DataFrame, report_date: datetime.date, overtime_rep
color = workbook.add_format({"bg_color": "#00CC00", "align": "center"})
worksheet = workbook.add_worksheet()
cell_width = 20
worksheet.set_column("A:B", cell_width)
worksheet.set_column("A:F", cell_width) # Extended to column F for new columns
self._write_information(worksheet, df, bold, color, overtime_report)
self._write_times(worksheet, df, color, overtime_report)
workbook.close()
Expand Down Expand Up @@ -64,6 +64,10 @@ def _write_information(
worksheet.write("B6", "Over 8 h")
else:
worksheet.write("B6", "Work Time")
worksheet.write("C6", "Start Time")
worksheet.write("D6", "End Time")
worksheet.write("E6", "Break Time")
worksheet.write("F6", "Total Time")

def _write_times(
self,
Expand All @@ -80,5 +84,33 @@ def _write_times(
_time = self._round_quarterly(max(row["work"] - time_to_subtract, 0))
worksheet.write(f"B{7 + i}", _time, color)

# Add start time
start_time = row.get("start_time")
if start_time and not pd.isna(start_time):
worksheet.write(f"C{7 + i}", start_time.strftime("%H:%M"), color)
else:
worksheet.write(f"C{7 + i}", "-", color)

# Add end time
end_time = row.get("end_time")
if end_time and not pd.isna(end_time):
worksheet.write(f"D{7 + i}", end_time.strftime("%H:%M"), color)
else:
worksheet.write(f"D{7 + i}", "-", color)

# Add break time
break_time = row.get("break_time", 0)
if break_time and not pd.isna(break_time):
worksheet.write(f"E{7 + i}", self._round_quarterly(break_time), color)
else:
worksheet.write(f"E{7 + i}", 0, color)

# Add total time (work + break)
total_time = row.get("work_time", 0)
if total_time and not pd.isna(total_time):
worksheet.write(f"F{7 + i}", self._round_quarterly(total_time), color)
else:
worksheet.write(f"F{7 + i}", 0, color)


EXPORTER = DataExporter()
167 changes: 117 additions & 50 deletions src/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def update_data(self, selected_date: datetime.date | None) -> None:
self.generate_daily_data(selected_date)
_, df = self.generate_month_data(selected_date)
self.df = df
print(df)

def _get_holidays(self, year: int) -> list[datetime.date]:
available_holidays = holidays.CountryHoliday(
Expand Down Expand Up @@ -56,7 +57,12 @@ def get_year_data(self, year: int) -> pd.DataFrame:
# in case there is no data for the year, return empty df
if year_data_df.empty:
return year_data_df
year_data_df = year_data_df.resample("ME").sum()

# Only sum numeric columns, exclude time-based columns
numeric_columns = ["work_time", "pause", "work", "break_time"]
columns_to_sum = [col for col in numeric_columns if col in year_data_df.columns]

year_data_df = year_data_df[columns_to_sum].resample("ME").sum()
year_data_df.index = year_data_df.index.to_period("M") # type: ignore
return year_data_df

Expand All @@ -80,83 +86,144 @@ def generate_month_data(self, selected_date: datetime.date) -> tuple[int, pd.Dat
return (data_hash, df)
if not work_data:
return (data_hash, pd.DataFrame([]))
work_df = self._create_work_df(work_data)
day_list = self._get_days_of_month(selected_date)
daily_time_list = self._generate_monthly_time(work_df, day_list, free_days)
return (data_hash, self._generate_report_df(day_list, daily_time_list, pause_data))

def _create_work_df(self, data: list[tuple[str, str]]) -> pd.DataFrame:
df_data = pd.DataFrame(data, columns=["datetime", "event"])
df_data["datetime"] = df_data["datetime"].apply(pd.to_datetime)
df_data["time"] = df_data["datetime"].dt.time
df_data["date"] = df_data["datetime"].dt.date
return df_data

def _get_days_of_month(self, selected_date: datetime.date) -> pd.DatetimeIndex:
return (data_hash, self._generate_month_report(work_data, selected_date, free_days, pause_data))

def _generate_month_report(
self,
work_data: list[tuple[str, str]],
selected_date: datetime.date,
free_days: list[datetime.date],
pause_data: list[tuple[str, int]],
) -> pd.DataFrame:
"""Generate the complete monthly report DataFrame with all columns."""
daily_minutes = CONFIG_HANDLER.config.daily_hours * 60

work_df = pd.DataFrame(work_data, columns=["datetime", "event"])
work_df["datetime"] = work_df["datetime"].apply(pd.to_datetime)
work_df["time"] = work_df["datetime"].dt.time
work_df["date"] = work_df["datetime"].dt.date

start = datetime.date(selected_date.year, selected_date.month, 1)
end = start + relativedelta(months=+1)
return pd.date_range(start, end - datetime.timedelta(days=1), freq="d")

def _generate_monthly_time(
self, df: pd.DataFrame, full_month: pd.DatetimeIndex, free_days: list[datetime.date]
) -> list[float]:
time_list: list[float] = []
daily_minutes = CONFIG_HANDLER.config.daily_hours * 60
for _day in full_month:
days_data = df[df["date"] == _day.date()]
report_data = []

for day in pd.date_range(start, end - datetime.timedelta(days=1), freq="d"):
days_data = work_df[work_df["date"] == day.date()]
calculated_time = 0.0
# when we got a free day, we get the working time for this day
if _day.date() in free_days:
if day.date() in free_days:
calculated_time += daily_minutes
# also adds working time (in case of work in free day we get overtime)
calculated_time += self._calculate_day_time(days_data)
time_list.append(calculated_time)
return time_list

def _calculate_day_time(self, df: pd.DataFrame) -> float:
day_work_time, start_time, end_time = self._calculate_day_time_with_times(days_data)
calculated_time += day_work_time
report_data.append(
{"day": day, "work_time": calculated_time, "start_time": start_time, "end_time": end_time}
)

combined_df = pd.DataFrame(report_data)
combined_df.set_index("day", inplace=True)

if pause_data:
pause_df = pd.DataFrame(pause_data, columns=["day", "pause"])
pause_df["day"] = pause_df["day"].apply(pd.to_datetime)
pause_df.set_index("day", inplace=True)
combined_df = pd.concat([combined_df, pause_df], axis=1, sort=False)
else:
combined_df["pause"] = 0.0

# Only fill pause column with 0, leave start_time and end_time as None when no data
combined_df["pause"] = combined_df["pause"].fillna(0).astype(float)
combined_df["work_time"] = combined_df["work_time"].apply(lambda x: round(x / 60, 2))
combined_df["pause"] = combined_df["pause"].apply(lambda x: round(x / 60, 2))
combined_df["work"] = combined_df["work_time"] - combined_df["pause"]
combined_df["work"] = combined_df["work"].apply(lambda x: max(x, 0)).apply(lambda x: round(x, 2))

def calculate_break_time(row: pd.Series) -> float:
# Check if we have valid start and end times
start_time = row["start_time"]
end_time = row["end_time"]

# Return 0 if either time is missing or None
if pd.isna(start_time) or pd.isna(end_time) or start_time is None or end_time is None:
return 0.0

# Ensure we have datetime.time objects, not numbers or other types
if not isinstance(start_time, datetime.time) or not isinstance(end_time, datetime.time):
return 0.0

try:
start_dt = datetime.datetime.combine(datetime.date.today(), start_time)
end_dt = datetime.datetime.combine(datetime.date.today(), end_time)

# Handle case where end time is on the next day (overnight work)
if end_dt < start_dt:
end_dt += datetime.timedelta(days=1)

total_minutes = (end_dt - start_dt).total_seconds() / 60
work_time_minutes = row["work_time"] * 60 # work_time is already in hours
break_minutes = total_minutes - work_time_minutes
return round(max(break_minutes / 60, 0), 2) # Convert back to hours, ensure non-negative

except (TypeError, ValueError, AttributeError):
# Handle any unexpected type or value errors gracefully
return 0.0

combined_df["break_time"] = combined_df.apply(calculate_break_time, axis=1)
return combined_df

def _calculate_day_time_with_times(
self, df: pd.DataFrame
) -> tuple[float, datetime.time | None, datetime.time | None]:
"""Calculate the total work time for a day, along with the start and end times.

Args:
df (pd.DataFrame): DataFrame containing work log entries for a specific day.

Returns:
tuple[float, datetime.time | None, datetime.time | None]: A tuple containing the total work time in minutes,
the earliest start time, and the latest end time.

"""
if df.empty:
return 0.0, None, None

total_time = datetime.timedelta()
start_found = False
earliest_start = None
latest_end = None

for _, row in df.iterrows():
if not start_found and row["event"] == "start":
start_found = True
start_time: datetime.datetime = row["datetime"]
if earliest_start is None:
earliest_start = start_time.time()
elif start_found and row["event"] == "stop":
start_found = False
end_time: datetime.datetime = row["datetime"]
latest_end = end_time.time()
total_time += row["datetime"] - start_time

# check if a start was found, but no stop, in this case, either the user forgot to stop the clock or the
# day is currently ongoing (date = today)
if not start_found:
return round(total_time.seconds / 60, 2)
return round(total_time.seconds / 60, 2), earliest_start, latest_end

# check for today.
today = datetime.date.today()
# check if start clock is the same day as today
if df.iloc[0]["date"] == today:
total_time += datetime.datetime.now() - start_time
current_time = datetime.datetime.now()
total_time += current_time - start_time
latest_end = current_time.time()
# else, use the midnight of this day as end
else:
next_day = start_time + datetime.timedelta(days=1)
end_of_day = datetime.datetime.combine(next_day, datetime.time.min) # type: ignore
total_time += end_of_day - start_time
return round(total_time.seconds / 60, 2)

def _generate_report_df(
self, month_list: pd.DatetimeIndex, monthly_time: list[float], pause_time: list[tuple[str, int]]
) -> pd.DataFrame:
work_df = pd.DataFrame({"day": month_list, "work_time": monthly_time})
work_df.set_index("day", inplace=True)

pause_df = pd.DataFrame(pause_time, columns=["day", "pause"])
pause_df["day"] = pause_df["day"].apply(pd.to_datetime)
pause_df.set_index("day", inplace=True)
latest_end = end_of_day.time()

combined_df = pd.concat([work_df, pause_df], axis=1, sort=False)
combined_df["pause"] = combined_df["pause"].astype(float)
combined_df.fillna(0, inplace=True)
combined_df["work_time"] = combined_df["work_time"].apply(lambda x: round(x / 60, 2))
combined_df["pause"] = combined_df["pause"].apply(lambda x: round(x / 60, 2))
combined_df["work"] = combined_df["work_time"] - combined_df["pause"]
combined_df["work"] = combined_df["work"].apply(lambda x: max(x, 0)).apply(lambda x: round(x, 2))
return combined_df
return round(total_time.seconds / 60, 2), earliest_start, latest_end

def is_current_month(self, date: datetime.date) -> bool:
now = datetime.date.today()
Expand Down