-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmetadata_filter.py
More file actions
147 lines (133 loc) · 6.64 KB
/
metadata_filter.py
File metadata and controls
147 lines (133 loc) · 6.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
import re
from .metadata_collector import FFprobeMetadataCollector, FFprobeMetadataResult
from .exceptions import UnknownFilterSelector, UnknownMetadataParameter, WrongConditionType, UnknownOperator,\
ConditionPairProcessingException, UnknownStreamType, StreamIndexOutOfRange, MetadataCollectionException
from .factory import ffprobe_factory
class FFprobeMetadataFilter:
    """Decide whether a media input satisfies a set of metadata conditions.

    Metadata is obtained from the collector returned by ``ffprobe_factory``.
    Filter parameters are a mapping of *selector* -> *selector data*, where
    a selector is one of (matched case-insensitively):

    * ``format`` - selector data maps format parameter names to conditions;
    * ``stream:<v|a>:<index>`` - selector data maps parameter names of the
      given video/audio stream to conditions;
    * ``count:<v|a>`` - selector data is a single condition applied to the
      number of video/audio streams.

    A *condition* is one of:

    * a plain ``int``/``float``/``str`` - shorthand for ``['eq', value]``;
    * an ``[operator, value]`` list;
    * a list of ``[operator, value]`` lists, all of which must hold.

    Supported operators: ``eq``, ``neq``, ``gt``, ``gte``, ``lt``, ``lte``.
    """

    STREAM_SELECTOR_RE = r'^stream:(v|a):(\d+)$'
    COUNT_SELECTOR_RE = r'^count:(v|a)$'

    # Comparison functions keyed by lower-case operator name.
    _COMPARATORS = {
        'eq': lambda left, right: left == right,
        'neq': lambda left, right: left != right,
        'gt': lambda left, right: left > right,
        'gte': lambda left, right: left >= right,
        'lt': lambda left, right: left < right,
        'lte': lambda left, right: left <= right,
    }

    def __init__(self):
        """Fetch the metadata collector and pre-compile the selector patterns."""
        logging.debug('Fetching FFprobeMetadataCollector object...')
        self._ff_metadata_collector = ffprobe_factory.get_ffprobe_metadata_collector(FFprobeMetadataCollector)
        self._stream_selector_re = re.compile(self.STREAM_SELECTOR_RE, re.IGNORECASE)
        self._count_selector_re = re.compile(self.COUNT_SELECTOR_RE, re.IGNORECASE)

    def filter(self, input_url: str, filter_params: dict) -> bool:
        """Return True when ``input_url`` passes every selector in ``filter_params``.

        :param input_url: location handed to the metadata collector.
        :param filter_params: mapping of selector -> selector data.
        :return: False as soon as one selector fails, or when metadata
            collection itself fails; True otherwise (including when
            ``filter_params`` is empty).
        :raises UnknownFilterSelector: a selector matches none of the
            supported forms.
        """
        logging.debug('Filtering started with parameters: {}'.format(filter_params))
        try:
            input_meta = self._ff_metadata_collector.get_metadata(input_url)
        except MetadataCollectionException:
            # An input that cannot be probed can never satisfy the filter.
            logging.warning('Metadata collection error - filter failed')
            return False

        for selector, selector_data in filter_params.items():
            logging.debug('Processing selector "{}"...'.format(selector))
            if not self._apply_selector(selector, selector_data, input_meta):
                logging.debug('Failed')
                return False
            logging.debug('Passed')

        logging.debug('Filter passed')
        return True

    def _apply_selector(self, selector: str, selector_data, input_meta: 'FFprobeMetadataResult') -> bool:
        """Dispatch one selector to the matching ``_filter_*`` method."""
        if selector.lower() == 'format':
            logging.debug('This is a format selector')
            return self._filter_format(input_meta, selector_data)

        stream_match = self._stream_selector_re.match(selector)
        if stream_match:
            logging.debug('This is a stream selector')
            return self._filter_stream(*stream_match.groups(), input_meta, selector_data)

        count_match = self._count_selector_re.match(selector)
        if count_match:
            logging.debug('This is a count selector')
            return self._filter_count(*count_match.groups(), input_meta, selector_data)

        raise UnknownFilterSelector(selector)

    def _filter_format(self, input_meta: 'FFprobeMetadataResult', selector_data: dict) -> bool:
        """Check every condition in ``selector_data`` against ``input_meta.format``.

        :raises UnknownMetadataParameter: a parameter name is not present in
            the format data.
        """
        format_data = input_meta.format
        for param, condition in selector_data.items():
            try:
                param_value = format_data[param]
            except KeyError as e:
                raise UnknownMetadataParameter(param) from e
            logging.debug('Format parameter name: {}, value: {}, condition: {}'.format(
                param, param_value, condition))
            if not self._process_condition(param_value, condition):
                return False
        return True

    def _filter_stream(self, s_type: str, s_index: str, input_meta: 'FFprobeMetadataResult', selector_data) -> bool:
        """Check every condition in ``selector_data`` against a single stream.

        :param s_type: stream type letter as captured from the selector
            (``v`` or ``a``, any case).
        :param s_index: zero-based index of the stream within its type.
        :raises UnknownStreamType: ``input_meta`` has no ``<s_type>_streams``
            attribute.
        :raises StreamIndexOutOfRange: there is no stream at ``s_index``.
        :raises UnknownMetadataParameter: a parameter name is not present in
            the stream data (``field_mode`` is resolved separately).
        """
        s_index = int(s_index)
        try:
            # The selector pattern is case-insensitive, the attribute is not.
            all_streams_data = getattr(input_meta, '{}_streams'.format(s_type.lower()))
        except AttributeError as e:
            raise UnknownStreamType(s_type) from e

        # Valid indexes are 0 .. len-1; an index equal to the length is
        # already out of range.
        if not 0 <= s_index < len(all_streams_data):
            raise StreamIndexOutOfRange(s_index)
        stream_data = all_streams_data[s_index]

        for param, condition in selector_data.items():
            try:
                param_value = stream_data[param]
            except KeyError as e:
                if param != 'field_mode':
                    raise UnknownMetadataParameter(param) from e
                # 'field_mode' is not stored in the stream data; it is
                # obtained from the metadata result on demand.
                param_value = input_meta.get_field_mode(s_index)
                logging.debug('Stream field mode is {}, condition: {}'.format(param_value, condition))
            else:
                logging.debug('Stream parameter name: {}, value: {}, condition: {}'.format(
                    param, param_value, condition))
            if not self._process_condition(param_value, condition):
                return False
        return True

    def _filter_count(self, s_type: str, input_meta: 'FFprobeMetadataResult', selector_data) -> bool:
        """Apply the condition ``selector_data`` to the number of streams of ``s_type``.

        :param s_type: stream type letter (``v`` or ``a``, any case).
        :raises UnknownStreamType: ``s_type`` is neither ``v`` nor ``a``.
        """
        # The selector pattern is case-insensitive, so normalise before comparing.
        stream_kind = s_type.lower()
        if stream_kind == 'v':
            vs_count = len(input_meta.v_streams)
            logging.debug('Video streams count: {}, condition: {}'.format(vs_count, selector_data))
            return self._process_condition(vs_count, selector_data)
        if stream_kind == 'a':
            as_count = len(input_meta.a_streams)
            logging.debug('Audio streams count: {}, condition: {}'.format(as_count, selector_data))
            return self._process_condition(as_count, selector_data)
        raise UnknownStreamType(s_type)

    def _process_condition(self, value, condition) -> bool:
        """Evaluate ``condition`` (in any of its supported shapes) against ``value``.

        :raises WrongConditionType: ``condition`` is not a plain
            int/float/str, an ``[operator, value]`` list, or a non-empty list
            of such lists.
        """
        # bool is a subclass of int but was never an accepted condition type.
        if isinstance(condition, (int, float, str)) and not isinstance(condition, bool):
            logging.debug('Condition is a simple value')
            return self._process_cond_pair(value, 'eq', condition)

        # An empty list has no first element to inspect and is rejected below.
        if isinstance(condition, list) and condition:
            head = condition[0]
            if isinstance(head, str):
                logging.debug('Condition is a [operator, value] list')
                return self._process_cond_pair(value, *condition)
            if isinstance(head, list):
                logging.debug('Condition is a list of [operator, value] lists')
                return all(self._process_cond_pair(value, *pair) for pair in condition)

        raise WrongConditionType(type(condition))

    def _process_cond_pair(self, value_left, operator: str, value_right) -> bool:
        """Compare ``value_left`` with ``value_right`` using ``operator``.

        ``value_left`` is first converted to the type of ``value_right`` so
        that, for example, a metadata string ``'1920'`` can be compared with
        the integer ``1280``.

        :param operator: one of ``eq``, ``neq``, ``gt``, ``gte``, ``lt``,
            ``lte`` (case-insensitive).
        :raises ConditionPairProcessingException: the conversion or the
            comparison itself failed.
        :raises UnknownOperator: ``operator`` is not supported.
        """
        operator = operator.lower()
        try:
            value_left_typed = type(value_right)(value_left)
        except (TypeError, ValueError) as e:
            # TypeError covers inputs such as int(None), ValueError int('abc').
            raise ConditionPairProcessingException(value_left, operator, value_right) from e

        try:
            compare = self._COMPARATORS[operator]
        except KeyError:
            raise UnknownOperator(operator) from None

        try:
            return compare(value_left_typed, value_right)
        except Exception as e:
            raise ConditionPairProcessingException(value_left_typed, operator, value_right) from e