Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions galicaster/classui/managerui.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,11 @@ def ingest_question(self,package):

elif 0 < operations_dialog.response <= len(response_list):
chosen_job = response_list[operations_dialog.response-1].lower().replace (" ", "")
params = package.getProperty('enqueue_params')
if chosen_job.count('nightly'):
context.get_worker().do_job_nightly(chosen_job.replace("_",""), package)
context.get_worker().do_job_nightly(chosen_job.replace("_",""), package, params)
else:
context.get_worker().do_job(chosen_job, package)
context.get_worker().do_job(chosen_job, package, params)
return True

else:
Expand Down
21 changes: 11 additions & 10 deletions galicaster/core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import os
import tempfile
import Queue
import json

from datetime import datetime

Expand Down Expand Up @@ -236,7 +235,7 @@ def enqueue_nightly_job_by_name(self, operation, mp, params={}):
"""
self.logger.info("Set nightly operation {} for MP {}".format(operation, mp.getIdentifier()))
mp.setOpStatus(operation,mediapackage.OP_NIGHTLY)
mp.setProperty("enqueue_params", json.dumps(params))
mp.setProperty("enqueue_params", params)
self.repo.update(mp)
self.dispatcher.emit('action-mm-refresh-row', mp.identifier)

Expand All @@ -258,7 +257,7 @@ def do_job(self, name, mp, params={}):


def do_job_nightly(self, name, mp, params={}):
"""Calls cancel_nightly or operation_nithly depending on the argument's value.
"""Calls cancel_nightly or operation_nightly depending on the argument's value.
Args:
name (str): the name of a nightly operation. It must contain the word "cancel" in order to cancel the operation.
mp (Mediapackage): the mediapackage.
Expand Down Expand Up @@ -297,17 +296,21 @@ def gen_location(self, extension):
return os.path.join(self.export_path, name + '.' + extension)


def _ingest(self, mp, params={}):
def _ingest(self, mp, params=None):
"""Tries to immediately ingest the mediapackage into opencast.
If the ingest cannot be done, logger prints it properly.
Args:
mp(Mediapackage): the mediapackage to be immediately ingested.
mp (Mediapackage): the mediapackage to be immediately ingested.
params (dict): workflow and workflow_parameters for ingesting.
"""
if not self.oc_client:
raise Exception('Opencast client is not enabled')

workflow = None if not "workflow" in params else params["workflow"]
workflow_parameters = None if not "workflow_parameters" in params else params["workflow_parameters"]
workflow = None
workflow_parameters = None
if isinstance(params, dict):
workflow = params.get('workflow')
workflow_parameters = params.get('workflow_parameters')

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is a good way to simplify the conditionals here as you can't have workflow without workflow_parameters i believe

self.dispatcher.emit('action-mm-refresh-row', mp.identifier)

Expand Down Expand Up @@ -501,7 +504,5 @@ def exec_nightly(self, sender=None):
for mp in self.repo.values():
for (op_name, op_status) in mp.operation.iteritems():
if op_status == mediapackage.OP_NIGHTLY:
params = {}
if mp.getProperty("enqueue_params"):
params = json.loads(mp.getProperty("enqueue_params"))
params = mp.getProperty("enqueue_params")
self.enqueue_job_by_name(op_name, mp, params)
38 changes: 28 additions & 10 deletions galicaster/mediapackage/mediapackage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@
the Element superclass.
"""

import json
import uuid
import time
import os
from os import path
from datetime import datetime
from xml.dom import minidom

from galicaster.core import context
from galicaster.mediapackage.utils import _checknget, read_ini

from galicaster.utils.mediainfo import get_duration
from galicaster.utils.i18n import _

logger = context.get_logger()

# Mediapackage Status
NEW = 0
UNSCHEDULED = 1
Expand Down Expand Up @@ -1520,26 +1525,39 @@ def getOCCaptureAgentProperties(self):

def setProperty(self, prop=None, value=None):
"""Sets the value of a property in the mediapackage.
The enqueue_params property will be JSON serialized.
Args:
prop (str): the name of the property.
value (str): the new value of the property name.
value (str): the new value of the property.
"""
if not prop or value is None:
return None
else:
self.properties[prop] = value
return True
property_value = value
if prop == "enqueue_params":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To allow for this kind of API:
media_package.setProperty('enqueue_params', {'workflow': 'test'})
What we need to do is to support serializing/deserializing objects on the serializer/deserializer, not here.

Also, this support should be added generically, not just for an "enqueue_params" property.

Essentially, to add this functionality what we should do is to support dictionary properties, at least. And serialize/deserialize them on the corresponding classes. The setProperty function should not be the one carrying this logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Alfro, I agree but went with the minimal changes to provide this functionality for enqueue_params as I wasn't sure how much else would be impacted by a bigger change to serialize/deserialize other properties as well.
The file where this data is stored has changed in master from xml to json (not part of a built release yet as far as I'm aware) so that should make things easier.
Due to some changes for us I'm not sure if/when I'll have time to work on this next and I'm unlikely to be able to devote much time to development so someone else may want to continue with this.

try:
property_value = json.dumps(value)
except Exception:
logger.warn('Unable to serialize property {} of media package {}'.format(prop, self.getIdentifier()))
return False
self.properties[prop] = property_value
return True


def getProperty(self, prop=None):
"""Gets the property specified of the mediapackage.
When specifying the enqueue_params property the object returned will be
the result of JSON deserialization.
See https://docs.python.org/2/library/json.html#encoders-and-decoders
Returns:
Obj: the object with the value of the property specified
"""
if not prop:
return None
else:
if prop in self.properties:
return self.properties[prop]
else:
if prop and prop in self.properties:
property_string = self.properties[prop]
if prop == "enqueue_params":
try:
return json.loads(property_string)
except Exception:
logger.warn('Unable to deserialize property {} of media package {}'.format(prop, self.getIdentifier()))
return None
return property_string
return None
5 changes: 3 additions & 2 deletions galicaster/recorder/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,11 @@ def __close_mp(self):
def enqueue_ingest(self, mp):
if self.conf.get_boolean("ingest", "active"):
code = 'manual' if mp.manual else 'scheduled'
params = mp.getProperty("enqueue_params")
if self.conf.get_lower('ingest', code) == 'immediately':
self.worker.enqueue_job_by_name('ingest', mp)
self.worker.enqueue_job_by_name('ingest', mp, params)
elif self.conf.get_lower('ingest', code) == 'nightly':
self.worker.enqueue_nightly_job_by_name('ingest', mp)
self.worker.enqueue_nightly_job_by_name('ingest', mp, params)

def pause(self):
self.logger.info("Pausing recorder")
Expand Down
4 changes: 2 additions & 2 deletions tests/recorder/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
class TestFunctions(TestCase):

class WorkerMock(object):
def enqueue_job_by_name(self, operation, mp):
def enqueue_job_by_name(self, operation, mp, params):
pass
def enqueue_nightly_job_by_name(self, operation, mp):
def enqueue_nightly_job_by_name(self, operation, mp, params):
pass


Expand Down