Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scale/job/messages/completed_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def process_completed_jobs_with_output(job_ids, when):
completed_job_ids_with_output = Job.objects.process_job_output(job_ids, when)
if completed_job_ids_with_output:
logger.info('Found %d COMPLETED job(s) with output', len(completed_job_ids_with_output))
logger.debug('COMPLETED job(s) with output: {}'.format(completed_job_ids_with_output))

# Create messages to update recipes
from recipe.messages.update_recipe import create_update_recipe_messages_from_node
Expand All @@ -84,6 +85,7 @@ def process_completed_jobs_with_output(job_ids, when):
for job_id in completed_job_ids_with_output:
messages.append(create_publish_job_message(job_id))

logger.debug('Created messages for completed jobs: {}'.format(messages))
return messages


Expand Down
2 changes: 2 additions & 0 deletions scale/job/messages/publish_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ def execute(self):
# Retrieve job and job_exe models
job_exe = JobExecution.objects.get_latest_execution(self.job_id)
job = job_exe.job
logger.debug('Fetched job execution ({}) and job ({})'.format(job_exe, job))

# Publish this job's products
from product.models import ProductFile
logger.debug('Calling publish_products: ({}, {}, {})'.format(job_exe.id, job, when_published))
ProductFile.objects.publish_products(job_exe.id, job, when_published)

return True
12 changes: 8 additions & 4 deletions scale/product/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class ProductFileManager(models.GeoManager):
"""

def filter_products(self, started=None, ended=None, time_field=None, job_type_ids=None, job_type_names=None,
job_type_categories=None, job_ids=None, is_operational=None, is_published=None,
is_superseded=None, file_name=None, job_output=None, recipe_ids=None, recipe_type_ids=None,
job_type_categories=None, job_ids=None, is_operational=None, is_published=None,
is_superseded=None, file_name=None, job_output=None, recipe_ids=None, recipe_type_ids=None,
recipe_job=None, batch_ids=None, order=None):
"""Returns a query for product models that filters on the given fields. The returned query includes the related
workspace, job_type, and job fields, except for the workspace.json_config field. The related countries are set
Expand Down Expand Up @@ -338,7 +338,7 @@ def get_products(self, started=None, ended=None, time_field=None, job_type_ids=N
recipe_ids=recipe_ids, recipe_type_ids=recipe_type_ids, recipe_job=recipe_job,
batch_ids=batch_ids, order=order)

def get_product_sources(self, product_file_id, started=None, ended=None, time_field=None, is_parsed=None,
def get_product_sources(self, product_file_id, started=None, ended=None, time_field=None, is_parsed=None,
file_name=None, order=None):
"""Returns a query for the list of sources that produced the given product file ID.

Expand Down Expand Up @@ -456,6 +456,7 @@ def publish_products(self, job_exe_id, job, when):
:param when: When the products were published
:type when: :class:`datetime.datetime`
"""
logger.debug('Publishing products for job: {}'.format(job.__dict__))

# Don't publish products if the job is already superseded
if job.is_superseded:
Expand All @@ -469,13 +470,16 @@ def publish_products(self, job_exe_id, job, when):
uuids = []
for product_file in self.filter(job_exe_id=job_exe_id):
uuids.append(product_file.uuid)
logger.debug('UUIDs of new products to publish: {}'.format(uuids))

# Supersede products with the same UUIDs (a given UUID should only appear once in the product API calls)
if uuids:
query = self.filter(uuid__in=uuids, has_been_published=True)
logger.debug('Supersede products: {}'.format(query.values()))
query.update(is_published=False, is_superseded=True, superseded=when, last_modified=timezone.now())

# Publish this job execution's products
logger.debug('Publishing job exec products: {}'.format(job_exe_id))
self.filter(job_exe_id=job_exe_id).update(has_been_published=True, is_published=True, published=when,
last_modified=timezone.now())

Expand Down Expand Up @@ -594,7 +598,7 @@ def upload_files(self, file_entries, input_file_ids, job_exe, workspace):
product.recipe_node = job_recipe.node_name

# Add batch info to product if available.

if job_exe.batch:
product.batch_id = job_exe.batch.id
elif job_exe.job.batch:
Expand Down