diff --git a/account_statement_import_sheet_file_bg/models/account_statement_import.py b/account_statement_import_sheet_file_bg/models/account_statement_import.py index 0f192a1a..e2b004bb 100644 --- a/account_statement_import_sheet_file_bg/models/account_statement_import.py +++ b/account_statement_import_sheet_file_bg/models/account_statement_import.py @@ -124,7 +124,8 @@ def import_file_button(self, wizard_data=None): def split_base64_excel(self, header_rows_count, rows_per_file_limit): """Split Excel file into multiple parts to avoid overloading the system. - Returns empty list if file is not a valid Excel or if split is not needed.""" + Returns empty list if file is not a valid Excel or if split is not needed. + Only processes rows where the date column is not empty.""" if not self.statement_file: return [] @@ -143,6 +144,16 @@ def split_base64_excel(self, header_rows_count, rows_per_file_limit): header_rows = all_rows[:header_rows_count] data_rows = all_rows[header_rows_count:] + + # Get the date column index from the sheet mapping using the parser's method + parser = self.env["account.statement.import.sheet.parser"] + header = parser.parse_header((input_workbook, input_worksheet), self.sheet_mapping_id) + date_column_indexes = parser._get_column_indexes(header, "timestamp_column", self.sheet_mapping_id) + date_column_index = date_column_indexes[0] if date_column_indexes else None + + # Filter out rows where the date column is empty + data_rows = self._filter_rows_with_date(data_rows, date_column_index) + start_row_index = 0 total_data_rows = len(data_rows) @@ -169,3 +180,20 @@ def split_base64_excel(self, header_rows_count, rows_per_file_limit): start_row_index = end_row_index return output_base64_list + + def _filter_rows_with_date(self, data_rows, date_column_index): + """Filter data rows to only include rows where the date column is not empty. + If date_column_index is None, return all rows.""" + if date_column_index is None: + return data_rows + + filtered_rows = [] + for row in data_rows: + # Check if the row has enough columns and the date column is not empty + if len(row) > date_column_index and row[date_column_index].value: + filtered_rows.append(row) + elif len(row) > date_column_index and not row[date_column_index].value: + # Stop processing when we find the first empty date + break + + return filtered_rows diff --git a/base_bg/__manifest__.py b/base_bg/__manifest__.py index 2ee6b6f3..4334f1a5 100644 --- a/base_bg/__manifest__.py +++ b/base_bg/__manifest__.py @@ -19,7 +19,7 @@ ############################################################################## { "name": "Base Background Jobs", - "version": "18.0.1.0.1", + "version": "18.0.1.0.2", "category": "Technical", "author": "ADHOC SA", "website": "https://www.adhoc.com.ar", diff --git a/base_bg/demo/bg_job_demo.xml b/base_bg/demo/bg_job_demo.xml index 1cae93dd..09891ab9 100644 --- a/base_bg/demo/bg_job_demo.xml +++ b/base_bg/demo/bg_job_demo.xml @@ -1,5 +1,6 @@ + BG Demo Customer bg.demo.customer@example.com @@ -10,26 +11,109 @@ bg.demo.vendor@example.com - - Demo Data Cleanup + + + Single Job - Data Cleanup res.partner exists 5 done + single-batch-001 2024-01-01 08:00:00 2024-01-01 08:05:00 + + + + + + + Export Batch - Part 3/3 + res.partner + exists + 10 + done + batch-completed-001 + 2024-01-02 09:20:00 + 2024-01-02 09:30:00 + + + 2024-01-02 08:00:00 + + + + Export Batch - Part 2/3 + res.partner + exists + 10 + done + batch-completed-001 + 2024-01-02 09:10:00 + 2024-01-02 09:20:00 + + + + 2024-01-02 07:00:00 + + + + Export Batch - Part 1/3 + res.partner + exists + 10 + done + batch-completed-001 + 2024-01-02 09:00:00 + 2024-01-02 09:10:00 + + + + 2024-01-02 06:00:00 + + + + + Validation Batch - Part 3/3 + res.partner + exists + 15 + canceled + batch-failed-002 + 2024-01-03 14:10:00 + Canceled due to previous job failure in batch + 2024-01-03 14:00:00 - - Demo Follow Up + + Validation Batch - Part 2/3 res.partner exists 15 failed - 2024-01-02 10:00:00 - 2024-01-02 12:00:00 - Timeout while sending follow up + batch-failed-002 + 2024-01-03 14:05:00 + 2024-01-03 14:10:00 + Validation error: Invalid data format + + + + 2024-01-03 13:00:00 + + + Validation Batch - Part 1/3 + res.partner + exists + 15 + done + batch-failed-002 + 2024-01-03 14:00:00 + 2024-01-03 14:05:00 + + + + 2024-01-03 12:00:00 + + diff --git a/base_bg/migrations/18.0.1.0.2/post-migration.py b/base_bg/migrations/18.0.1.0.2/post-migration.py new file mode 100644 index 00000000..d4b0d726 --- /dev/null +++ b/base_bg/migrations/18.0.1.0.2/post-migration.py @@ -0,0 +1,31 @@ +############################################################################## +# For copyright and license notices, see __manifest__.py file in module root +# directory +############################################################################## +import logging +import uuid + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + """ + Migration script to set batch_key for existing jobs. + + Each existing job will be treated as a single-job batch: + - batch_key: unique UUID for each job + """ + cr.execute(""" + SELECT id FROM bg_job + WHERE batch_key IS NULL + """) + job_ids = [row[0] for row in cr.fetchall()] + updates = [(str(uuid.uuid4()), job_id) for job_id in job_ids] + cr.executemany( + """ + UPDATE bg_job + SET batch_key = %s + WHERE id = %s + """, + updates, + ) diff --git a/base_bg/models/base_bg.py b/base_bg/models/base_bg.py index 52634247..2631dafe 100644 --- a/base_bg/models/base_bg.py +++ b/base_bg/models/base_bg.py @@ -3,56 +3,111 @@ # directory ############################################################################## +import json +import uuid +from typing import TYPE_CHECKING, Any + from odoo import _, api, models +if TYPE_CHECKING: + from base_bg.models.bg_job import BgJob + class BaseBg(models.AbstractModel): _name = "base.bg" _description = "Background Job Mixin" @api.model - def bg_enqueue(self, method: str, *args, **kwargs): + def bg_enqueue_records( + self, records: models.BaseModel, method: str, threshold: int | None = None, *args, **kwargs + ) -> tuple[dict, "BgJob"]: """ - Enqueue a background job for execution. - - :param method: The method name to execute + Enqueue background jobs in batches based on record threshold. - Special kwargs: - :param max_retries: Maximum retry attempts (default: 3) + This is a model/API method and must be called on the model, passing + the target records as the first argument. Example: + self.env['base.bg'].bg_enqueue_records(records, 'method_name', threshold=..., ...) - :return: A display notification + :param records: recordset to process; can be empty for no specific targets + :param method: The method name to execute on each batch + :param threshold: Maximum number of records per job + :param args: Positional arguments for the method + :param kwargs: Keyword arguments for the method + Special kwargs: + :param priority: Job priority (default: 10) + :param max_retries: Maximum retries for the job (default: 3) + :param name: Base name for the job(s) (default: model.method-uuid) + :return: A display notification and the created jobs """ + # Normalize records into ids; allow None/empty to mean no targets + jobs = self.env["bg.job"] + model = records._name + record_ids = records.ids if records else [] + context = {k: v for k, v in self.env.context.items() if self.is_serializable(v)} + priority = max(kwargs.pop("priority", 10), 0) max_retries = kwargs.pop("max_retries", 3) - name = kwargs.pop("name", f"{self._name}.{method}") - job_vals = { - "name": name, - "model": self._name, - "method": method, - "max_retries": max_retries, - "context_json": dict(self.env.context), - } - - # Handle recordset: store IDs for later reconstruction - if self: - kwargs["_record_ids"] = self.ids - - # Serialize arguments - job_vals["args_json"] = list(args) if args else [] - job_vals["kwargs_json"] = kwargs - self.env["bg.job"].create(job_vals) + name = kwargs.pop("name", "") + + def _get_name(batch_key: str, queue_order: int) -> str: + return name or "%s.%s-%s-%s" % (model, method, batch_key[0:8], queue_order) + + batch_key = str(uuid.uuid4()) + total = len(record_ids) or 1 # Ensure at least one job if no records + threshold = max(1, threshold or total) + prev_job = None + for i in range(0, total, threshold): + chunk_ids = record_ids[i : i + threshold] + queue_order = i // threshold + job_vals = { + "name": _get_name(batch_key, queue_order), + "model": model, + "method": method, + "priority": priority, + "max_retries": max_retries, + "context_json": context, + "batch_key": batch_key, + "state": "enqueued" if queue_order == 0 else "waiting", + } + job_kwargs = kwargs.copy() + job_kwargs["_record_ids"] = list(chunk_ids) if chunk_ids else [] + job_vals["args_json"] = self.check_serializable(list(args)) if args else [] + job_vals["kwargs_json"] = self.check_serializable(job_kwargs) + job = self.env["bg.job"].create(job_vals) + jobs |= job + # Link previous job to current so sequence is established in one pass + if prev_job: + prev_job.next_job_id = job.id + prev_job = job + self.sudo()._trigger_crons() - title = _("Process sent to background successfully") - message = _("You will be notified when it is done.") - return { - "type": "ir.actions.client", - "tag": "display_notification", - "params": { - "title": title, - "type": "success", - "message": message, - "next": {"type": "ir.actions.act_window_close"}, + title = _("Processes sent to background successfully") + message = _("You will be notified when they are done.") + return ( + { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": title, + "type": "success", + "message": message, + "next": {"type": "ir.actions.act_window_close"}, + }, }, - } + jobs, + ) + + def bg_enqueue(self, method: str, threshold: int | None = None, *args, **kwargs) -> tuple[dict, "BgJob"]: + """ + Instance-style enqueuing helper. + + Usage: + _inherit = ['base.bg', ...] + ... + records.bg_enqueue('method_name', threshold=..., ...) + + Delegates to the model API `bg_enqueue_records` using the calling recordset as the `records` parameter. + """ + return self.bg_enqueue_records(self, method, threshold, *args, **kwargs) def _trigger_crons(self): """ @@ -62,3 +117,33 @@ def _trigger_crons(self): crons = self.env["ir.cron"].search([("code", "ilike", code)]) for cron in crons: cron._trigger() + + @api.model + def is_serializable(self, value: Any) -> bool: + """ + Checks if a value is JSON serializable. + + :param value: The value to check + :return: True if serializable, False otherwise + """ + if isinstance(value, dict): + return all(self.is_serializable(k) and self.is_serializable(v) for k, v in value.items()) + if isinstance(value, (list, tuple)): + return all(self.is_serializable(item) for item in value) + try: + json.dumps(value) + return True + except Exception: + return False + + @api.model + def check_serializable(self, value: Any) -> Any: + """ + Ensures a value is JSON serializable. + + :param value: The value to check + :raises ValueError: If the value is not serializable + """ + if not self.is_serializable(value): + raise ValueError(_("Value %s is not JSON serializable") % repr(value)) + return value diff --git a/base_bg/models/bg_job.py b/base_bg/models/bg_job.py index 2d4f5e2b..2b0aed87 100644 --- a/base_bg/models/bg_job.py +++ b/base_bg/models/bg_job.py @@ -20,11 +20,12 @@ class BgJob(models.Model): name = fields.Char( string="Job Name", required=True, - help="Human readable job name", + readonly=True, ) state = fields.Selection( [ ("enqueued", "Enqueued"), + ("waiting", "Waiting For Previous Job"), ("running", "Running"), ("done", "Done"), ("failed", "Failed"), @@ -32,34 +33,43 @@ class BgJob(models.Model): ], default="enqueued", required=True, + help="Current state of the job", ) model = fields.Char( required=True, + readonly=True, help="The model name on which the job method will be executed", ) method = fields.Char( required=True, + readonly=True, help="The method name to be executed", ) args_json = fields.Json( + readonly=True, help="Positional arguments for the method call, serialized as JSON", ) kwargs_json = fields.Json( + readonly=True, help="Keyword arguments for the method call, serialized as JSON", ) context_json = fields.Json( + readonly=True, help="Context to be used when executing the job, serialized as JSON", ) priority = fields.Integer( default=10, + readonly=True, help="Job priority (lower number means higher priority)", ) max_retries = fields.Integer( default=3, + readonly=True, help="Maximum number of retry attempts", ) retry_count = fields.Integer( default=0, + readonly=True, help="Current number of retry attempts", ) start_time = fields.Datetime( @@ -80,8 +90,20 @@ class BgJob(models.Model): help="Job execution duration in seconds", ) error_message = fields.Text( + readonly=True, help="Error message from the last failed execution", ) + batch_key = fields.Char( + required=True, + readonly=True, + index=True, + help="Identifier for related jobs in a batch", + ) + next_job_id = fields.Many2one( + "bg.job", + readonly=True, + help="Next job in the batch sequence", + ) @api.depends("start_time", "end_time") def _compute_duration(self): @@ -99,12 +121,8 @@ def action_cancel(self): self.ensure_one() if self.state != "enqueued": raise UserError(_("Only enqueued jobs can be canceled")) - self.write( - { - "state": "canceled", - "cancel_time": fields.Datetime.now(), - } - ) + + (self | self._get_next_jobs()).cancel() def action_retry(self): """ @@ -113,13 +131,9 @@ def action_retry(self): self.ensure_one() if self.state != "failed": raise UserError(_("Only failed jobs can be retried")) - self.write( - { - "state": "enqueued", - "retry_count": 0, - "error_message": False, - } - ) + + self.enqueue(retry=True) + self._get_next_jobs().wait() def action_open_records(self) -> dict: """ @@ -138,6 +152,19 @@ def action_open_records(self) -> dict: "domain": [("id", "in", records.ids)], } + def action_open_batch_jobs(self) -> dict: + """ + Action to open all jobs in the same batch + """ + self.ensure_one() + return { + "name": _("Batch Jobs: %s", self.batch_key[:8]), + "type": "ir.actions.act_window", + "res_model": "bg.job", + "view_mode": "list,form", + "domain": [("batch_key", "=", self.batch_key)], + } + def run(self): """ Executes the job @@ -146,32 +173,23 @@ def run(self): if self.state != "enqueued": raise UserError(_("Only enqueued jobs can be executed")) - self.write( - { - "state": "running", - "start_time": fields.Datetime.now(), - } - ) + self.start() self.env.cr.commit() # pylint: disable=invalid-commit try: context = self.context_json or {} - context.update({"bg_job": True}) + context.update({"bg_job": True, "bg_job_id": self.id}) # Extract record IDs if present in kwargs or args model = self.env[self.model] args = self.args_json or [] kwargs = self.kwargs_json or {} - record_ids = kwargs.pop("_record_ids", None) + record_ids = kwargs.pop("_record_ids", []) records = model.browse(record_ids).with_context(**context).with_user(self.create_uid) - result = getattr(records, self.method)(*args, **kwargs) - self.write( - { - "state": "done", - "end_time": fields.Datetime.now(), - } - ) + # Execute the method and capture the result + result = getattr(records, self.method)(*args, **kwargs) + self.finish() if result: self._notify_user(result) self.env.cr.commit() # pylint: disable=invalid-commit @@ -180,7 +198,72 @@ def run(self): self._handle_job_error(e) raise - def _handle_job_error(self, error: Exception): + def enqueue(self, retry: bool = False): + """Mark the job as enqueued.""" + data = { + "state": "enqueued", + } + if retry: + data.update( + { + "retry_count": 0, + "error_message": False, + } + ) + self.write(data) + + def start(self): + """Mark the job as running and set the start time.""" + self.write( + { + "state": "running", + "start_time": fields.Datetime.now(), + } + ) + + def finish(self): + """ + Mark the job as done and set the end time. + Also enqueue the next job in the batch if it exists. + """ + self.write( + { + "state": "done", + "end_time": fields.Datetime.now(), + } + ) + self.filtered("next_job_id").mapped("next_job_id").enqueue() + self.env["base.bg"].sudo()._trigger_crons() + + def wait(self): + """Mark the job as waiting for the previous job to complete.""" + self.write( + { + "state": "waiting", + } + ) + + def fail(self, error_message: str): + """Mark the job as failed with an error message.""" + self.write( + { + "state": "failed", + "end_time": fields.Datetime.now(), + "error_message": error_message, + } + ) + + def cancel(self, message: str | None = None): + """Cancel the jobs received.""" + self.write( + { + "state": "canceled", + "cancel_time": fields.Datetime.now(), + "error_message": message, + } + ) + + def _handle_job_error(self, error: Exception | str): """ Handle job execution error @@ -189,21 +272,12 @@ def _handle_job_error(self, error: Exception): error_msg = str(error) self.retry_count += 1 if self.retry_count < self.max_retries: - self.write( - { - "state": "enqueued", - } - ) + self.enqueue() _logger.warning("Job %s failed, scheduling retry #%d: %s", self.name, self.retry_count, error_msg) else: # Max retries reached, mark as failed - self.write( - { - "state": "failed", - "end_time": fields.Datetime.now(), - "error_message": error_msg, - } - ) + self.fail(error_msg) + self._get_next_jobs().cancel(message=_("Previous job in batch failed")) _logger.error("Job %s failed permanently: %s", self.name, error_msg) def _notify_user(self, result: str): @@ -221,6 +295,20 @@ def _notify_user(self, result: str): subtype_xmlid="mail.mt_comment", ) + def _get_next_jobs(self) -> "BgJob": + """ + Get the next jobs in the same batch. + + :return: Recordset of next jobs in the batch + """ + self.ensure_one() + current_job = self + jobs = self.env["bg.job"] + while current_job.next_job_id: + jobs |= current_job.next_job_id + current_job = current_job.next_job_id + return jobs + @api.model def _cron_run_enqueued_jobs(self, limit: int = 5): """ @@ -261,6 +349,7 @@ def _cron_check_running_jobs(self): ] ) for job in jobs: - job.write({"state": "failed", "error_message": _("Job timed out")}) - message = _("Job %s timed out") % job.name - job._notify_user(message) + job._handle_job_error(_("Job timed out")) + if job.state == "failed": + message = _("Job %s timed out") % job._get_html_link(title=job.name) + job._notify_user(message) diff --git a/base_bg/tests/test_bg_job.py b/base_bg/tests/test_bg_job.py index 65dc88a0..873b40d9 100644 --- a/base_bg/tests/test_bg_job.py +++ b/base_bg/tests/test_bg_job.py @@ -4,8 +4,10 @@ ############################################################################## from datetime import timedelta from unittest.mock import patch +from uuid import uuid4 from odoo import fields, tools +from odoo.addons.base_bg.models.base_bg import BaseBg from odoo.exceptions import UserError from odoo.tests.common import TransactionCase @@ -15,6 +17,7 @@ def setUp(self): """Prepare environment references and keep cron timeout to restore later.""" super(TestBgJob, self).setUp() self.BgJob = self.env["bg.job"] + self.base_bg_model = self.env["base.bg"] self._limit_time_real_cron = tools.config.get("limit_time_real_cron", 120) def tearDown(self): @@ -32,10 +35,15 @@ def _create_job(self, **vals): "name": "Test Job", "model": "res.partner", "method": "exists", + "batch_key": str(uuid4()), } defaults.update(vals) return self.BgJob.create(defaults) + def _job_by_name(self, name): + """Locate a bg.job record by its name.""" + return self.BgJob.search([("name", "=", name)], limit=1) + def test_create_bg_job(self): """Basic test for job creation.""" job = self._create_job() @@ -47,10 +55,29 @@ def test_create_bg_job(self): def test_job_cancel(self): """Basic test for job cancellation.""" - job = self._create_job(name="Cancel Test Job") - - job.action_cancel() - self.assertEqual(job.state, "canceled") + # Create a chain of jobs + job1 = self._create_job(name="Cancel Test Job 1") + job2 = self._create_job(name="Cancel Test Job 2", state="waiting") + job3 = self._create_job(name="Cancel Test Job 3", state="waiting") + job1.next_job_id = job2 + job2.next_job_id = job3 + + # Cancel the first job + job1.action_cancel() + + # Refresh from DB + job1 = self.BgJob.browse(job1.id) + job2 = self.BgJob.browse(job2.id) + job3 = self.BgJob.browse(job3.id) + + # All jobs in the chain should be canceled + self.assertEqual(job1.state, "canceled") + self.assertEqual(job2.state, "canceled") + self.assertEqual(job3.state, "canceled") + # Canceled jobs should have cancel_time set + self.assertIsNotNone(job1.cancel_time) + self.assertIsNotNone(job2.cancel_time) + self.assertIsNotNone(job3.cancel_time) def test_job_retry(self): """Basic test for job retry.""" @@ -70,11 +97,12 @@ def test_cron_check_running_jobs(self): """Test cron method for checking timed out running jobs.""" # Create a job that appears to be running for too long old_time = fields.Datetime.now() - timedelta(hours=6) - job = self._create_job(name="Timed Out Job", state="running", start_time=old_time) + job = self._create_job(name="Timed Out Job", state="running", start_time=old_time, max_retries=1) # Run the cron method self._set_cron_timeout(300) - self.BgJob._cron_check_running_jobs() + with patch("odoo.addons.base_bg.models.bg_job._logger.error"): + self.BgJob._cron_check_running_jobs() # Refresh the job from database job = self.BgJob.browse(job.id) @@ -94,6 +122,14 @@ def test_cron_check_running_jobs_recent(self): job = self.BgJob.browse(job.id) self.assertEqual(job.state, "running") # Should still be running + def test_jobs_are_sorted_by_priority(self): + """Jobs with lower priority value should be returned first.""" + low_priority = self._create_job(name="Low Priority", priority=20) + high_priority = self._create_job(name="High Priority", priority=0) + + jobs = self.BgJob.search([("id", "in", [low_priority.id, high_priority.id])]) + self.assertEqual(jobs[0], high_priority) + def test_job_duration_computation(self): """Test that job duration is computed correctly.""" start_time = fields.Datetime.now() @@ -106,17 +142,6 @@ def test_job_duration_computation(self): ) self.assertEqual(job.duration, 30.0) - def test_jobs_are_sorted_by_priority(self): - """Jobs with lower priority value should be returned first.""" - low_priority = self._create_job(name="Low Priority", priority=20) - high_priority = self._create_job(name="High Priority", priority=0) - - jobs = self.BgJob.search( - [("id", "in", [low_priority.id, high_priority.id])], - order="priority, create_date desc", - ) - self.assertEqual(jobs[0], high_priority) - def test_action_open_records_returns_expected_domain(self): """The helper action must target the provided record IDs.""" partner_1 = self.env["res.partner"].create({"name": "Partner 1"}) @@ -190,3 +215,188 @@ def test_run_skip_notification_for_falsy_results(self): mock_method.assert_called_once() mock_notify.assert_not_called() + + def test_bg_enqueue_applies_custom_priority(self): + """bg_enqueue must propagate the provided priority into bg.job.""" + job_name = f"Priority Test Job {uuid4().hex}" + partners = self.env["res.partner"].create([{"name": "Test Partner 1"}]) + with patch.object(BaseBg, "_trigger_crons"): + self.env["base.bg"].bg_enqueue_records( + partners, + "dummy_priority_method", + name=job_name, + priority=3, + ) + + job = self._job_by_name(job_name) + self.assertTrue(job, "The priority test job should exist") + self.assertEqual(job.priority, 3) + + def test_bg_enqueue_filters_unserializable_context_entries(self): + """Only JSON-safe context keys should be stored in bg.job.""" + job_name = f"Context Test Job {uuid4().hex}" + partners = self.env["res.partner"].create([{"name": "Test Partner"}]) + with patch.object(BaseBg, "_trigger_crons"): + self.env["base.bg"].with_context( + serializable_flag="ok", + unserializable_env=self.env, + ).bg_enqueue_records(partners, "dummy_context_method", name=job_name) + + job = self._job_by_name(job_name) + self.assertTrue(job, "The context test job should exist") + self.assertEqual(job.context_json, {"serializable_flag": "ok"}) + + def test_jobs_linking_and_states_after_enqueue(self): + """Ensure bg_enqueue links jobs via next_job_id and sets states correctly.""" + partners = self.env["res.partner"].create([{"name": f"Partner {i}"} for i in range(3)]) + job_name = f"Linked Batch Job {uuid4().hex}" + with patch.object(BaseBg, "_trigger_crons"): + _, jobs = self.env["base.bg"].bg_enqueue_records(partners, "dummy_batch_method", threshold=1, name=job_name) + + self.assertEqual(len(jobs), 3) + # first must be enqueued and point to the next; others waiting + self.assertEqual(jobs[0].state, "enqueued") + self.assertEqual(jobs[0].next_job_id, jobs[1]) + self.assertEqual(jobs[1].state, "waiting") + self.assertEqual(jobs[1].next_job_id, jobs[2]) + self.assertEqual(jobs[2].state, "waiting") + + def test_bg_enqueue_records_creates_job_when_no_records(self): + """Calling bg_enqueue_records with no records must still create a job.""" + job_name = f"No Records Job {uuid4().hex}" + with patch.object(BaseBg, "_trigger_crons"): + _, job = self.env["base.bg"].bg_enqueue_records(self.env["res.partner"], "dummy_method", name=job_name) + + self.assertTrue(job, "A job should have been created even with no records") + self.assertEqual(job.state, "enqueued") + self.assertEqual(job.kwargs_json.get("_record_ids"), []) + + def test_bg_enqueue_records_splits_by_threshold(self): + """bg_enqueue_records must split partner records into multiple jobs by threshold.""" + partners = self.env["res.partner"].create([{"name": f"Partner {i}"} for i in range(5)]) + threshold = 2 + job_name = f"Batch Partners {uuid4().hex}" + + with patch.object(BaseBg, "_trigger_crons"): + _, jobs = self.env["base.bg"].bg_enqueue_records( + partners, "dummy_batch_method", threshold=threshold, name=job_name + ) + + # Expect 3 jobs: 2,2,1 + self.assertEqual(len(jobs), 3) + batch_key = jobs[0].batch_key + self.assertTrue(batch_key) + + sizes = [len(j.kwargs_json.get("_record_ids", [])) for j in jobs] + self.assertEqual(sizes, [2, 2, 1]) + + for i, job in enumerate(jobs): + self.assertEqual(job.batch_key, batch_key) + self.assertEqual(job.model, "res.partner") + self.assertEqual(job.method, "dummy_batch_method") + if i == 0: + self.assertEqual(job.state, "enqueued") + else: + self.assertEqual(job.state, "waiting") + if i < len(jobs) - 1: + self.assertEqual(job.next_job_id, jobs[i + 1]) + + def test_fail_first_job_cancels_following_batch_jobs(self): + """When a job fails permanently, all next jobs in the same batch are canceled.""" + batch_key = str(uuid4()) + # Create three linked jobs in the same batch + job1 = self._create_job(name="Failing Job", batch_key=batch_key, state="enqueued", max_retries=1) + job2 = self._create_job(name="Next Job 1", batch_key=batch_key, state="waiting") + job3 = self._create_job(name="Next Job 2", batch_key=batch_key, state="waiting") + job1.next_job_id = job2.id + job2.next_job_id = job3.id + + # Force the job to be considered at its final retry and trigger error handling + job1.write({"retry_count": job1.max_retries - 1}) + + # Call the handler to simulate a permanent failure + with patch("odoo.addons.base_bg.models.bg_job._logger.error"): + job1._handle_job_error("Permanent failure") + + # Refresh records from DB + job1 = self.BgJob.browse(job1.id) + job2 = self.BgJob.browse(job2.id) + job3 = self.BgJob.browse(job3.id) + + self.assertEqual(job1.state, "failed") + self.assertEqual(job2.state, "canceled") + self.assertEqual(job3.state, "canceled") + # Canceled jobs must have a cancel_time and an explanatory error_message + self.assertIsNotNone(job2.cancel_time) + + def test_bg_enqueue_helper_delegates_to_bg_enqueue_records(self): + """bg_enqueue helper must delegate to bg_enqueue_records with self as records.""" + job_name = f"Helper Test Job {uuid4().hex}" + with patch.object(type(self.base_bg_model), "_trigger_crons"), patch.object( + type(self.base_bg_model), "bg_enqueue_records" + ) as mock_enqueue_records: + self.env["base.bg"].bg_enqueue("dummy_method", threshold=5, name=job_name, priority=2) + + mock_enqueue_records.assert_called_once_with(self.env["base.bg"], "dummy_method", 5, name=job_name, priority=2) + + def test_is_serializable_filters_json_safe_values(self): + """is_serializable must return True for JSON serializable values and False otherwise.""" + base_bg = self.env["base.bg"] + self.assertTrue(base_bg.is_serializable("string")) + self.assertTrue(base_bg.is_serializable(123)) + self.assertTrue(base_bg.is_serializable([1, 2, 3])) + self.assertTrue(base_bg.is_serializable({"key": "value"})) + self.assertFalse(base_bg.is_serializable(self.env)) # Environment object + self.assertFalse(base_bg.is_serializable(lambda x: x)) # Function + + def test_get_next_jobs_returns_chained_jobs(self): + """_get_next_jobs must return all subsequent jobs in the batch chain.""" + job1 = self._create_job(name="Job 1", batch_key="test-batch") + job2 = self._create_job(name="Job 2", batch_key="test-batch") + job3 = self._create_job(name="Job 3", batch_key="test-batch") + job1.next_job_id = job2 + job2.next_job_id = job3 + + next_jobs = job1._get_next_jobs() + self.assertEqual(next_jobs, job2 | job3) + + def test_job_completion_enqueues_next_job(self): + """When a job completes successfully, the next job in batch must be enqueued.""" + batch_key = str(uuid4()) + partner = self.env["res.partner"].create({"name": "Test Partner"}) + job1 = self._create_job( + name="First Job", + batch_key=batch_key, + state="enqueued", + kwargs_json={"_record_ids": [partner.id]}, + ) + job2 = self._create_job( + name="Next Job", + batch_key=batch_key, + state="waiting", + ) + job1.next_job_id = job2 + + # Simulate job run completion + with patch.object(self.env.cr, "commit"): + job1.run() + + job2.invalidate_recordset() + self.assertEqual(job2.state, "enqueued") + + def test_check_serializable(self): + """check_serializable must raise ValueError for unserializable objects.""" + base_bg = self.env["base.bg"] + # env is not serializable + dict_data = { + "serializable": "ok", + "unserializable": self.env, + } + list_data = ["ok", self.env, 123] + function_data = lambda x: x + with self.assertRaises(ValueError): + base_bg.check_serializable(dict_data) + with self.assertRaises(ValueError): + base_bg.check_serializable(list_data) + with self.assertRaises(ValueError): + base_bg.check_serializable(function_data) diff --git a/base_bg/views/bg_job_views.xml b/base_bg/views/bg_job_views.xml index deb2c2f2..2ef3bd06 100644 --- a/base_bg/views/bg_job_views.xml +++ b/base_bg/views/bg_job_views.xml @@ -16,10 +16,11 @@ + - +