Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions base_import_async/models/base_import_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ def do(self, fields, options, dryrun=False):
(translated_model_name, self.file_name)
attachment = self._create_csv_attachment(
import_fields, data, options, self.file_name)
delayed_job = self.with_delay(description=description)._split_file(
delayed_job = self.with_delay(
description=description,
keep_context=True
)._split_file(
model_name=self.res_model,
translated_model_name=translated_model_name,
attachment=attachment,
Expand Down Expand Up @@ -166,8 +169,11 @@ def _split_file(self, model_name, translated_model_name,
fields, data[row_from:row_to + 1], options,
file_name=root + '-' + chunk + ext)
delayed_job = self.with_context(
job_batch=batch).with_delay(
description=description, priority=priority)._import_one_chunk(
job_batch=batch.id).with_delay(
description=description,
priority=priority,
keep_context=True
)._import_one_chunk(
model_name=model_name,
attachment=attachment,
options=options)
Expand Down
8 changes: 8 additions & 0 deletions queue_job/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ def convert_to_cache(self, value, record, validate=True):
class JobEncoder(json.JSONEncoder):
"""Encode Odoo recordsets so that we can later recompose them"""

def _get_record_context(self, obj):
context = obj.env.context.copy()
return context

def default(self, obj):
if isinstance(obj, models.BaseModel):
context = self._get_record_context(obj)
return {'_type': 'odoo_recordset',
'model': obj._name,
'ids': obj.ids,
'uid': obj.env.uid,
'context': context,
}
elif isinstance(obj, datetime):
return {'_type': 'datetime_isoformat',
Expand Down Expand Up @@ -61,6 +67,8 @@ def object_hook(self, obj):
type_ = obj['_type']
if type_ == 'odoo_recordset':
model = self.env[obj['model']]
if obj.get("context"):
model = model.with_context(**obj.get("context"))
if obj.get('uid'):
model = model.sudo(obj['uid'])
return model.browse(obj['ids'])
Expand Down
58 changes: 52 additions & 6 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import functools
import hashlib
import logging
import json
import yaml
import uuid
import sys
from datetime import datetime, timedelta

import odoo
from odoo.addons.queue_job.fields import JobEncoder, JobDecoder

from .exception import (NoSuchJobError,
FailedJobError,
Expand Down Expand Up @@ -55,14 +58,15 @@ class DelayableRecordset(object):

def __init__(self, recordset, priority=None, eta=None,
max_retries=None, description=None, channel=None,
identity_key=None):
identity_key=None, keep_context=False):
self.recordset = recordset
self.priority = priority
self.eta = eta
self.max_retries = max_retries
self.description = description
self.channel = channel
self.identity_key = identity_key
self.keep_context = keep_context

def __getattr__(self, name):
if name in self.recordset:
Expand All @@ -87,7 +91,9 @@ def delay(*args, **kwargs):
eta=self.eta,
description=self.description,
channel=self.channel,
identity_key=self.identity_key)
identity_key=self.identity_key,
keep_context=self.keep_context
)
return delay

def __str__(self):
Expand Down Expand Up @@ -297,6 +303,7 @@ def _load_from_db_record(cls, job_db_record):
if stored.company_id:
job_.company_id = stored.company_id.id
job_.identity_key = stored.identity_key
job_.keep_context = stored.context or {}
return job_

def job_record_with_same_identity_key(self):
Expand All @@ -311,7 +318,7 @@ def job_record_with_same_identity_key(self):
@classmethod
def enqueue(cls, func, args=None, kwargs=None,
priority=None, eta=None, max_retries=None, description=None,
channel=None, identity_key=None):
channel=None, identity_key=None, keep_context=None):
"""Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted
Expand All @@ -324,7 +331,8 @@ def enqueue(cls, func, args=None, kwargs=None,
new_job = cls(func=func, args=args,
kwargs=kwargs, priority=priority, eta=eta,
max_retries=max_retries, description=description,
channel=channel, identity_key=identity_key)
channel=channel, identity_key=identity_key,
keep_context=keep_context)
if new_job.identity_key:
existing = new_job.job_record_with_same_identity_key()
if existing:
Expand Down Expand Up @@ -355,7 +363,8 @@ def db_record_from_uuid(env, job_uuid):
def __init__(self, func,
args=None, kwargs=None, priority=None,
eta=None, job_uuid=None, max_retries=None,
description=None, channel=None, identity_key=None):
description=None, channel=None,
identity_key=None, keep_context=False):
""" Create a Job

:param func: function to execute
Expand All @@ -381,6 +390,8 @@ def __init__(self, func,
as argument)
:param env: Odoo Environment
:type env: :class:`odoo.api.Environment`
:param keep_context: Determine if the current context should be restored
:type keep_context: :bool or list
"""
if args is None:
args = ()
Expand All @@ -397,6 +408,7 @@ def __init__(self, func,

recordset = func.__self__
env = recordset.env
self.keep_context = keep_context
self.model_name = recordset._name
self.method_name = func.__name__
self.recordset = recordset
Expand Down Expand Up @@ -500,6 +512,10 @@ def store(self):
}

dt_to_string = odoo.fields.Datetime.to_string
context = {}
if self.keep_context:
context = self.env.context.copy()
vals.update({"context": json.dumps(context, cls=JobEncoder, skipkeys=True)})
if self.date_enqueued:
vals['date_enqueued'] = dt_to_string(self.date_enqueued)
if self.date_started:
Expand All @@ -516,6 +532,10 @@ def store(self):
db_record.write(vals)
else:
date_created = dt_to_string(self.date_created)
# We store the original context used at import on create
ctx = self.env.context.copy() or '{}'
vals.update({'original_context': json.dumps(
ctx, cls=JobEncoder, skipkeys=True) or ''})
# The following values must never be modified after the
# creation of the job
vals.update({'uuid': self.uuid,
Expand All @@ -537,9 +557,35 @@ def store(self):
def db_record(self):
return self.db_record_from_uuid(self.env, self.uuid)

def _get_abs_context(self, original_ctx, ctx):
try:
import_ctx = json.loads(original_ctx, cls=JobDecoder, env=self.env)
current_ctx = json.loads(ctx, cls=JobDecoder, env=self.env)
except Exception as e:
_logger.error("\n\nERROR CONTEXT JSON CONVERSION: %s\n\n" % e)
return self.env.context.copy()
else:
if isinstance(import_ctx, dict) and isinstance(current_ctx, dict):
import_ctx.update(current_ctx)
return import_ctx
return self.env.context.copy()

def _get_record_context(self):
"""
Get the context to execute the job
"""
ctx = self._get_abs_context(self.db_record().original_context,
self.db_record().context)
if self.company_id:
ctx.update({'allowed_company_ids': [self.company_id]})
if self.uuid:
ctx.update({"job_uuid": self.uuid})
return ctx

@property
def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
context = self._get_record_context()
recordset = self.recordset.with_context(**context)
recordset = recordset.sudo(self.user_id)
return getattr(recordset, self.method_name)

Expand Down
9 changes: 7 additions & 2 deletions queue_job/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _register_hook(self):
@api.multi
def with_delay(self, priority=None, eta=None,
max_retries=None, description=None,
channel=None, identity_key=None):
channel=None, identity_key=None, keep_context=False):
""" Return a ``DelayableRecordset``

The returned instance allow to enqueue any method of the recordset's
Expand Down Expand Up @@ -62,6 +62,9 @@ def with_delay(self, priority=None, eta=None,
:param identity_key: key uniquely identifying the job, if specified
and a job with the same key has not yet been run,
the new job will not be added.
:param keep_context: boolean to set if the current context
should be restored on the recordset
(default: False).
:return: instance of a DelayableRecordset
:rtype: :class:`odoo.addons.queue_job.job.DelayableRecordset`

Expand Down Expand Up @@ -90,4 +93,6 @@ def with_delay(self, priority=None, eta=None,
max_retries=max_retries,
description=description,
channel=channel,
identity_key=identity_key)
identity_key=identity_key,
keep_context=keep_context
)
12 changes: 12 additions & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ class QueueJob(models.Model):
company_id = fields.Many2one(comodel_name='res.company',
string='Company', index=True)
name = fields.Char(string='Description', readonly=True)
context = fields.Char(
string="Context Value",
default="{}",
help="Context dictionary as Python expression, empty by default "
"(Default: {})",
readonly=True,
)
original_context = fields.Char(
string="Original Context Value",
default="{}",
help="This is the context dictionary that was used on import"
)

model_name = fields.Char(string='Model', readonly=True)
method_name = fields.Char(readonly=True)
Expand Down
1 change: 1 addition & 0 deletions queue_job/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from . import test_runner_runner
from . import test_json_field
from . import test_model_job_channel

1 change: 1 addition & 0 deletions queue_job/tests/test_json_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_encoder_recordset(self):
"_type": "odoo_recordset",
"model": "res.partner",
"ids": [partner.id],
"context": {},
}]
self.assertEqual(json.loads(value_json), expected)

Expand Down
4 changes: 4 additions & 0 deletions queue_job/views/queue_job_views.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
<field name="date_done"/>
</group>
</group>
<group name="context_grp">
<field name="context"/>
<field name="original_context"/>
</group>
<group colspan="4">
<div>
<label for="retry" string="Current try / max. retries" />
Expand Down
2 changes: 1 addition & 1 deletion test_queue_job_batch/tests/test_queue_job_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def test_batch(self):
batch = self.env['queue.job.batch'].get_new_batch('TEST')
self.assertFalse(batch.job_ids)
model = self.env['test.queue.job'].with_context(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thomaspaulb the culprit was such line which made the conversion have issues when we assign batch record instead of the batch.id

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I think we were going about this totally the wrong way, because we did not understand the original PR.

Instead of using a text or JSON field, they're actually (JSON-)encoding the original context in a variable keep_context using the JobEncoder, which also encodes other details about the job.

Then instead of using self.keep_context as a boolean flag in the way we're doing now, they're using:

               if isinstance(self.keep_context, list):
                    context = {k: context.get(k) for k in self.keep_context}

So this then reinstates the context keys that were saved in that variable, one by one.

I guess if that works, then it avoids 1) Deviating from the other PR and 2) spending time getting our own JSON conversion right

The only comment on the current original PR was that by default it does not keep context, and most reviewers want it to keep context by default, and I agree with that too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already takes care of properly encoding and decoding Odoo recordsets:

https://github.com/OCA/queue/blob/11.0/queue_job/fields.py#L31

job_batch=batch
job_batch=batch.id
)
job_1 = model.with_delay().testing_method()
self.assertEqual(job_1.db_record().state, 'pending')
Expand Down