# mosromgr: Python library for managing MOS running orders
# Copyright 2021 BBC
# SPDX-License-Identifier: Apache-2.0
import logging
import warnings
from .mostypes import MosFile, RunningOrder, RunningOrderEnd
from .utils import s3
from .exc import MosMergeError, InvalidMosCollection, MosMergeNonStrictWarning
logger = logging.getLogger('mosromgr.moscollection')
logging.basicConfig(level=logging.INFO)
[docs]class MosReader:
"""
Internal construct for opening and inspecting a MOS file for the purposes of
classifying, sorting and validating a :class:`MosCollection`. Provides the
means to reconstruct the :class:`~mosromgr.mostypes.MosFile` instance when
needed in order to preserve memory usage.
"""
def __init__(self, mo, *, restore_fn, restore_args):
self._message_id = mo.message_id
self._ro_id = mo.ro_id
self._mos_type = mo.__class__
self._restore_fn = restore_fn
self._restore_args = restore_args
@classmethod
def from_file(cls, mos_file_path):
mo = MosFile.from_file(mos_file_path)
# store a method of restoring the mos object from the determined class
return cls(mo,
restore_fn=mo.__class__.from_file,
restore_args=(mos_file_path, ))
@classmethod
def from_string(cls, mos_file_contents):
mo = MosFile.from_string(mos_file_contents)
# store a method of restoring the mos object from the determined class
return cls(mo,
restore_fn=mo.__class__.from_string,
restore_args=(mos_file_contents, ))
@classmethod
def from_s3(cls, bucket_name, mos_file_key):
mo = MosFile.from_s3(bucket_name=bucket_name, mos_file_key=mos_file_key)
# store a method of restoring the mos object from the determined class
return cls(mo,
restore_fn=mo.__class__.from_s3,
restore_args=(bucket_name, mos_file_key))
def __repr__(self):
return f'<{self.__class__.__name__} type {self.mos_type.__name__}>'
def __lt__(self, other):
return self.message_id < other.message_id
def __gt__(self, other):
return self.message_id > other.message_id
@property
def message_id(self):
"The message ID of the MOS file (:class:`str`)"
return self._message_id
@property
def ro_id(self):
"The MOS file's running order ID (:class:`str`)"
return self._ro_id
@property
def mos_type(self):
"""
The :class:`~mosromgr.mostypes.MosFile` subclass this object was
classified as (returns the class object, not an instance or a string)
"""
return self._mos_type
@property
def mos_object(self):
"""
Restore the MOS object and return it
(:class:`~mosromgr.mostypes.MosFile`)
"""
return self._restore_fn(*self._restore_args)
[docs]class MosCollection:
"""
Wrapper for a collection of MOS files representing a partial or complete
programme
"""
def __init__(self, mos_readers, *, allow_incomplete=False):
logger.info("Making MosCollection from %s MosReaders", len(mos_readers))
self._mos_readers = mos_readers
self._ro = None
try:
self._validate(allow_incomplete=allow_incomplete)
except AssertionError as e:
raise InvalidMosCollection(f"Failed to validate MosCollection: {e}") from e
[docs] @classmethod
def from_files(cls, mos_file_paths, *, allow_incomplete=False):
"""
Construct from a list of MOS file paths
:type mos_file_paths:
list
:param mos_file_paths:
A list of paths to MOS files
:type allow_incomplete:
bool
:param allow_incomplete:
If ``False`` (the default), the collection is permitted to be
constructed without a ``roDelete``. If ``True``, a
:class:`~mosromgr.exc.InvalidMosCollection` will be raised if one is
not present. (keyword-only argument)
"""
logger.info("Making MosCollection from %s files", len(mos_file_paths))
mos_readers = sorted([
mr
for mr in [MosReader.from_file(mfp) for mfp in mos_file_paths]
if mr is not None
])
return cls(mos_readers, allow_incomplete=allow_incomplete)
[docs] @classmethod
def from_strings(cls, mos_file_strings, *, allow_incomplete=False):
"""
Construct from a list of MOS document XML strings
:type mos_file_paths:
list
:param mos_file_paths:
A list of paths to MOS files
:type allow_incomplete:
bool
:param allow_incomplete:
If ``False`` (the default), the collection is permitted to be
constructed without a ``roDelete``. If ``True``, a
:class:`~mosromgr.exc.InvalidMosCollection` will be raised if one is
not present. (keyword-only argument)
"""
logger.info("Making MosCollection from %s strings", len(mos_file_strings))
mos_readers = sorted([
mr
for mr in [MosReader.from_string(mfs) for mfs in mos_file_strings]
if mr is not None
])
return cls(mos_readers, allow_incomplete=allow_incomplete)
[docs] @classmethod
def from_s3(cls, *, bucket_name, prefix, suffix='.mos.xml', allow_incomplete=False):
"""
Construct from a list of MOS files in an S3 bucket
:type bucket_name:
str
:param bucket_name:
The name of the S3 bucket (keyword-only argument)
:type prefix:
str
:param prefix:
The prefix of the file keys in the S3 bucket (keyword-only argument)
:type suffix:
str
:param suffix:
The suffix of the file keys in the S3 bucket (keyword-only argument).
Defaults to '.mos.xml'.
:type allow_incomplete:
bool
:param allow_incomplete:
If ``True``, the collection is permitted to be constructed without a
``roDelete``. If ``False`` (the default), a
:class:`~mosromgr.exc.InvalidMosCollection` will be raised if one is
not present. (keyword-only argument)
"""
mos_file_keys = s3.get_mos_files(
bucket_name=bucket_name,
prefix=prefix,
suffix=suffix,
)
logger.info("Making MosCollection from %s S3 files", len(mos_file_keys))
mos_readers = sorted([
mr
for mr in [MosReader.from_s3(bucket_name, key) for key in mos_file_keys]
if mr is not None
])
return cls(mos_readers, allow_incomplete=allow_incomplete)
def __repr__(self):
if self.ro is not None:
return f'<{self.__class__.__name__} {self.ro_slug}>'
return f'<{self.__class__.__name__}>'
[docs] def __str__(self):
"The XML string of the collection's running order"
return str(self.ro)
@property
def mos_readers(self):
"""
A list of :class:`MosReader` objects representing all MOS files in the
collection, except the :class:`~mosromgr.mostypes.RunningOrder`
(``roCreate``) which is held in :attr:`ro`
"""
return self._mos_readers
@property
def ro(self):
"The collection's :class:`~mosromgr.mostypes.RunningOrder` object"
return self._ro
@property
def ro_id(self):
"The running order ID"
return self.ro.ro_id
@property
def ro_slug(self):
"The running order slug"
return self.ro.ro_slug
@property
def completed(self):
"""
Whether or not the running order has had a
:class:`~mosromgr.mostypes.RunningOrderEnd` merged (:class:`bool`)
"""
return self.ro.completed
def _validate(self, allow_incomplete=False):
"""
Check a single roCreate is present, and if *allow_incomplete* is True,
also check a single roDelete is present.
"""
ro_id = self.mos_readers[0].ro_id
assert all(mr.ro_id == ro_id for mr in self.mos_readers), "Mixed RO IDs found"
ro_creates = [
mr for mr in self.mos_readers if mr.mos_type == RunningOrder
]
assert len(ro_creates) == 1, f"{len(ro_creates)} roCreates found"
self._ro = ro_creates[0].mos_object
ro_deletes = [
mr for mr in self.mos_readers if mr.mos_type == RunningOrderEnd
]
assert len(ro_deletes) < 2, f"{len(ro_deletes)} roDeletes found"
if not allow_incomplete:
assert len(ro_deletes) == 1, f"{len(ro_deletes)} roDeletes found"
self._mos_readers = [
mr for mr in self.mos_readers if mr.mos_type != RunningOrder
]
[docs] def merge(self, *, strict=True):
"""
Merge all MOS files into the collection's running order (:attr:`ro`). If
*strict* is ``True`` (the default), then merge errors will be fatal. If
``False``, then merge errors will be downgraded to warnings.
"""
logger.info("Merging %s MosReaders into RunningOrder", len(self.mos_readers))
for mr in self.mos_readers:
mo = mr.mos_object
logger.info("Merging %s %s", mo.__class__.__name__, mr.message_id)
try:
self._ro += mo
except MosMergeError as e:
if strict:
raise
logger.error(str(e))
warnings.warn(str(e), MosMergeNonStrictWarning)
logger.info("Completed merging %s mos files", len(self.mos_readers))