From 581baf49010e0a1faca3c81b5f78457b8e14ebaf Mon Sep 17 00:00:00 2001
From: Paul Gration
Date: Wed, 31 Jan 2018 15:35:32 +0000
Subject: [PATCH 1/5] Move serializing/deserializing of enqueue_params property
to media package
---
galicaster/core/worker.py | 9 ++----
galicaster/mediapackage/mediapackage.py | 38 ++++++++++++++++++-------
2 files changed, 31 insertions(+), 16 deletions(-)
diff --git a/galicaster/core/worker.py b/galicaster/core/worker.py
index f763c1ae..bcbe3ed6 100644
--- a/galicaster/core/worker.py
+++ b/galicaster/core/worker.py
@@ -14,7 +14,6 @@
import os
import tempfile
import Queue
-import json
from datetime import datetime
@@ -236,7 +235,7 @@ def enqueue_nightly_job_by_name(self, operation, mp, params={}):
"""
self.logger.info("Set nightly operation {} for MP {}".format(operation, mp.getIdentifier()))
mp.setOpStatus(operation,mediapackage.OP_NIGHTLY)
- mp.setProperty("enqueue_params", json.dumps(params))
+ mp.setProperty("enqueue_params", params)
self.repo.update(mp)
self.dispatcher.emit('action-mm-refresh-row', mp.identifier)
@@ -258,7 +257,7 @@ def do_job(self, name, mp, params={}):
def do_job_nightly(self, name, mp, params={}):
- """Calls cancel_nightly or operation_nithly depending on the argument's value.
+ """Calls cancel_nightly or operation_nightly depending on the argument's value.
Args:
name (str): the name of a nightly operation. It must contain the word "cancel" in order to cancel the operation.
mp (Mediapackage): the mediapackage.
@@ -501,7 +500,5 @@ def exec_nightly(self, sender=None):
for mp in self.repo.values():
for (op_name, op_status) in mp.operation.iteritems():
if op_status == mediapackage.OP_NIGHTLY:
- params = {}
- if mp.getProperty("enqueue_params"):
- params = json.loads(mp.getProperty("enqueue_params"))
+ params = mp.getProperty("enqueue_params")
self.enqueue_job_by_name(op_name, mp, params)
diff --git a/galicaster/mediapackage/mediapackage.py b/galicaster/mediapackage/mediapackage.py
index 15f50ad0..fd22d36a 100644
--- a/galicaster/mediapackage/mediapackage.py
+++ b/galicaster/mediapackage/mediapackage.py
@@ -19,17 +19,22 @@
the Element superclass.
"""
+import json
import uuid
import time
import os
from os import path
from datetime import datetime
from xml.dom import minidom
+
+from galicaster.core import context
from galicaster.mediapackage.utils import _checknget, read_ini
from galicaster.utils.mediainfo import get_duration
from galicaster.utils.i18n import _
+logger = context.get_logger()
+
# Mediapackage Status
NEW = 0
UNSCHEDULED = 1
@@ -1520,26 +1525,39 @@ def getOCCaptureAgentProperties(self):
def setProperty(self, prop=None, value=None):
"""Sets the value of a property in the mediapackage.
+ The enqueue_params property will be JSON serialized.
Args:
prop (str): the name of the property.
- value (str): the new value of the property name.
+ value (str): the new value of the property.
"""
if not prop or value is None:
return None
- else:
- self.properties[prop] = value
- return True
+ property_value = value
+ if prop == "enqueue_params":
+ try:
+ property_value = json.dumps(value)
+ except Exception:
+ logger.warn('Unable to serialize property {} of media package {}'.format(prop, self.getIdentifier()))
+ return False
+ self.properties[prop] = property_value
+ return True
def getProperty(self, prop=None):
"""Gets the property specified of the mediapackage.
+ When specifying the enqueue_params property the object returned will be
+ the result of JSON deserialization.
+ See https://docs.python.org/2/library/json.html#encoders-and-decoders
Returns:
Obj: the object with the value of the property specified
"""
- if not prop:
- return None
- else:
- if prop in self.properties:
- return self.properties[prop]
- else:
+ if prop and prop in self.properties:
+ property_string = self.properties[prop]
+ if prop == "enqueue_params":
+ try:
+ return json.loads(property_string)
+ except Exception:
+ logger.warn('Unable to deserialize property {} of media package {}'.format(prop, self.getIdentifier()))
return None
+ return property_string
+ return None
From b98983a5c34e3de7569960c2314c0a5fb141c9fa Mon Sep 17 00:00:00 2001
From: Paul Gration
Date: Wed, 31 Jan 2018 15:43:15 +0000
Subject: [PATCH 2/5] Ensure enqueue_params is used when ingesting from
repository via managerui
---
galicaster/classui/managerui.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/galicaster/classui/managerui.py b/galicaster/classui/managerui.py
index 7c181125..97458c54 100644
--- a/galicaster/classui/managerui.py
+++ b/galicaster/classui/managerui.py
@@ -224,10 +224,11 @@ def ingest_question(self,package):
elif 0 < operations_dialog.response <= len(response_list):
chosen_job = response_list[operations_dialog.response-1].lower().replace (" ", "")
+ params = package.getProperty('enqueue_params')
if chosen_job.count('nightly'):
- context.get_worker().do_job_nightly(chosen_job.replace("_",""), package)
+ context.get_worker().do_job_nightly(chosen_job.replace("_",""), package, params)
else:
- context.get_worker().do_job(chosen_job, package)
+ context.get_worker().do_job(chosen_job, package, params)
return True
else:
From d45666ab75f0166a7dc4417074b6e2ef2e39c3cf Mon Sep 17 00:00:00 2001
From: Paul Gration
Date: Wed, 31 Jan 2018 15:44:37 +0000
Subject: [PATCH 3/5] Ensure enqueue_params are used when ingesting via
recorder service
---
galicaster/recorder/service.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/galicaster/recorder/service.py b/galicaster/recorder/service.py
index 41e56dac..8eb4a04d 100644
--- a/galicaster/recorder/service.py
+++ b/galicaster/recorder/service.py
@@ -219,10 +219,11 @@ def __close_mp(self):
def enqueue_ingest(self, mp):
if self.conf.get_boolean("ingest", "active"):
code = 'manual' if mp.manual else 'scheduled'
+ params = mp.getProperty("enqueue_params")
if self.conf.get_lower('ingest', code) == 'immediately':
- self.worker.enqueue_job_by_name('ingest', mp)
+ self.worker.enqueue_job_by_name('ingest', mp, params)
elif self.conf.get_lower('ingest', code) == 'nightly':
- self.worker.enqueue_nightly_job_by_name('ingest', mp)
+ self.worker.enqueue_nightly_job_by_name('ingest', mp, params)
def pause(self):
self.logger.info("Pausing recorder")
From 5128c687692e5b554c6f58c1def607a3b07b8540 Mon Sep 17 00:00:00 2001
From: Paul Gration
Date: Wed, 31 Jan 2018 15:46:09 +0000
Subject: [PATCH 4/5] Improve getting of workflow parameters when ingesting
---
galicaster/core/worker.py | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/galicaster/core/worker.py b/galicaster/core/worker.py
index bcbe3ed6..c7449435 100644
--- a/galicaster/core/worker.py
+++ b/galicaster/core/worker.py
@@ -296,17 +296,21 @@ def gen_location(self, extension):
return os.path.join(self.export_path, name + '.' + extension)
- def _ingest(self, mp, params={}):
+ def _ingest(self, mp, params=None):
"""Tries to immediately ingest the mediapackage into opencast.
If the ingest cannot be done, logger prints it properly.
Args:
- mp(Mediapackage): the mediapackage to be immediately ingested.
+ mp (Mediapackage): the mediapackage to be immediately ingested.
+ params (dict): workflow and workflow_parameters for ingesting.
"""
if not self.oc_client:
raise Exception('Opencast client is not enabled')
- workflow = None if not "workflow" in params else params["workflow"]
- workflow_parameters = None if not "workflow_parameters" in params else params["workflow_parameters"]
+ workflow = None
+ workflow_parameters = None
+ if isinstance(params, dict):
+ workflow = params.get('workflow')
+ workflow_parameters = params.get('workflow_parameters')
self.dispatcher.emit('action-mm-refresh-row', mp.identifier)
From 39697ee6fb0cda7b5f6ae83d72b21e640aab9d92 Mon Sep 17 00:00:00 2001
From: Paul Gration
Date: Mon, 5 Feb 2018 12:31:36 +0000
Subject: [PATCH 5/5] Update worker tests to accept params
---
tests/recorder/service.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/tests/recorder/service.py b/tests/recorder/service.py
index cc79858e..04aef7e3 100644
--- a/tests/recorder/service.py
+++ b/tests/recorder/service.py
@@ -46,9 +46,9 @@
class TestFunctions(TestCase):
class WorkerMock(object):
- def enqueue_job_by_name(self, operation, mp):
+ def enqueue_job_by_name(self, operation, mp, params):
pass
- def enqueue_nightly_job_by_name(self, operation, mp):
+ def enqueue_nightly_job_by_name(self, operation, mp, params):
pass