# mosromgr: Python library for managing MOS running orders
# Copyright 2021 BBC
# SPDX-License-Identifier: Apache-2.0
from functools import total_ordering
import logging
import warnings
from collections.abc import Callable
from typing import Union, Tuple, List
from pathlib import Path
from .mostypes import MosFile, RunningOrder, RunningOrderEnd
from .utils import s3
from .exc import MosMergeError, InvalidMosCollection, MosMergeNonStrictWarning
logger = logging.getLogger('mosromgr.moscollection')
logging.basicConfig(level=logging.INFO)
[docs]@total_ordering
class MosReader:
"""
Internal construct for opening and inspecting a MOS file for the purposes of
classifying, sorting and validating a :class:`MosCollection`. Provides the
means to reconstruct the :class:`~mosromgr.mostypes.MosFile` instance when
needed in order to preserve memory usage.
"""
def __init__(self, mo: MosFile, *, restore_fn: Callable, restore_args: Tuple):
self._message_id = mo.message_id
self._ro_id = mo.ro_id
self._mos_type = mo.__class__
self._restore_fn = restore_fn
self._restore_args = restore_args
@classmethod
def from_file(cls, mos_file_path: Union[Path, str]):
mo = MosFile.from_file(mos_file_path)
# store a method of restoring the mos object from the determined class
return cls(mo,
restore_fn=mo.__class__.from_file,
restore_args=(mos_file_path, ))
@classmethod
def from_string(cls, mos_file_contents: str):
mo = MosFile.from_string(mos_file_contents)
# store a method of restoring the mos object from the determined class
return cls(mo,
restore_fn=mo.__class__.from_string,
restore_args=(mos_file_contents, ))
@classmethod
def from_s3(cls, bucket_name: str, mos_file_key: str):
mo = MosFile.from_s3(bucket_name=bucket_name, mos_file_key=mos_file_key)
# store a method of restoring the mos object from the determined class
return cls(mo,
restore_fn=mo.__class__.from_s3,
restore_args=(bucket_name, mos_file_key))
def __repr__(self):
return f'<{self.__class__.__name__} type {self.mos_type.__name__}>'
def __lt__(self, other):
return self.message_id < other.message_id
@property
def message_id(self) -> str:
"""
The message ID of the MOS file
"""
return self._message_id
@property
def ro_id(self) -> str:
"""
The MOS file's running order ID
"""
return self._ro_id
@property
def mos_type(self) -> MosFile:
"""
The :class:`~mosromgr.mostypes.MosFile` subclass this object was
classified as (returns the class object, not an instance or a string)
"""
return self._mos_type
@property
def mos_object(self) -> MosFile:
"""
Restore the MOS object and return it
"""
return self._restore_fn(*self._restore_args)
[docs]class MosCollection:
"""
Wrapper for a collection of MOS files representing a partial or complete
programme
"""
def __init__(self, mos_readers: List[MosReader], *, allow_incomplete: bool = False):
logger.info("Making MosCollection from %s MosReaders", len(mos_readers))
self._mos_readers = mos_readers
self._ro = None
try:
self._validate(allow_incomplete=allow_incomplete)
except AssertionError as e:
raise InvalidMosCollection(f"Failed to validate MosCollection: {e}") from e
[docs] @classmethod
def from_files(cls, mos_file_paths: List[Union[Path, str]], *, allow_incomplete: bool = False):
"""
Construct from a list of MOS file paths
:type mos_file_paths:
list
:param mos_file_paths:
A list of paths to MOS files
:type allow_incomplete:
bool
:param allow_incomplete:
If ``False`` (the default), the collection is permitted to be
constructed without a ``roDelete``. If ``True``, a
:class:`~mosromgr.exc.InvalidMosCollection` will be raised if one is
not present. (keyword-only argument)
"""
logger.info("Making MosCollection from %s files", len(mos_file_paths))
mos_readers = sorted([
mr
for mr in [MosReader.from_file(mfp) for mfp in mos_file_paths]
if mr is not None
])
return cls(mos_readers, allow_incomplete=allow_incomplete)
[docs] @classmethod
def from_strings(cls, mos_file_strings: List[str], *, allow_incomplete: bool = False):
"""
Construct from a list of MOS document XML strings
:type mos_file_strings:
list
:param mos_file_strings:
A list of strings containing MOS file contents
:type allow_incomplete:
bool
:param allow_incomplete:
If ``False`` (the default), the collection is permitted to be
constructed without a ``roDelete``. If ``True``, a
:class:`~mosromgr.exc.InvalidMosCollection` will be raised if one is
not present. (keyword-only argument)
"""
logger.info("Making MosCollection from %s strings", len(mos_file_strings))
mos_readers = sorted([
mr
for mr in [MosReader.from_string(mfs) for mfs in mos_file_strings]
if mr is not None
])
return cls(mos_readers, allow_incomplete=allow_incomplete)
[docs] @classmethod
def from_s3(
cls,
*,
bucket_name: str,
prefix: str,
suffix: str = '.mos.xml',
allow_incomplete: bool = False
):
"""
Construct from a list of MOS files in an S3 bucket
:type bucket_name:
str
:param bucket_name:
The name of the S3 bucket (keyword-only argument)
:type prefix:
str
:param prefix:
The prefix of the file keys in the S3 bucket (keyword-only argument)
:type suffix:
str
:param suffix:
The suffix of the file keys in the S3 bucket (keyword-only argument).
Defaults to '.mos.xml'.
:type allow_incomplete:
bool
:param allow_incomplete:
If ``True``, the collection is permitted to be constructed without a
``roDelete``. If ``False`` (the default), a
:class:`~mosromgr.exc.InvalidMosCollection` will be raised if one is
not present. (keyword-only argument)
"""
mos_file_keys = s3.get_mos_files(
bucket_name=bucket_name,
prefix=prefix,
suffix=suffix,
)
logger.info("Making MosCollection from %s S3 files", len(mos_file_keys))
mos_readers = sorted([
mr
for mr in [MosReader.from_s3(bucket_name, key) for key in mos_file_keys]
if mr is not None
])
return cls(mos_readers, allow_incomplete=allow_incomplete)
def __repr__(self):
if self.ro is not None:
return f'<{self.__class__.__name__} {self.ro_slug}>'
return f'<{self.__class__.__name__}>'
[docs] def __str__(self):
"The XML string of the collection's running order"
return str(self.ro)
@property
def mos_readers(self) -> List[MosReader]:
"""
A list of :class:`MosReader` objects representing all MOS files in the
collection, except the :class:`~mosromgr.mostypes.RunningOrder`
(``roCreate``) which is held in :attr:`ro`
"""
return self._mos_readers
@property
def ro(self) -> RunningOrder:
"""
The collection's :class:`~mosromgr.mostypes.RunningOrder` object
"""
return self._ro
@property
def ro_id(self) -> str:
"""
The running order ID
"""
return self.ro.ro_id
@property
def ro_slug(self) -> str:
"""
The running order slug
"""
return self.ro.ro_slug
@property
def completed(self) -> bool:
"""
Whether or not the running order has had a
:class:`~mosromgr.mostypes.RunningOrderEnd` merged (:class:`bool`)
"""
return self.ro.completed
def _validate(self, allow_incomplete: bool = False):
"""
Check a single roCreate is present, and if *allow_incomplete* is True,
also check a single roDelete is present.
"""
ro_id = self.mos_readers[0].ro_id
assert all(mr.ro_id == ro_id for mr in self.mos_readers), "Mixed RO IDs found"
ro_creates = [
mr for mr in self.mos_readers if mr.mos_type == RunningOrder
]
assert len(ro_creates) == 1, f"{len(ro_creates)} roCreates found"
self._ro = ro_creates[0].mos_object
ro_deletes = [
mr for mr in self.mos_readers if mr.mos_type == RunningOrderEnd
]
assert len(ro_deletes) < 2, f"{len(ro_deletes)} roDeletes found"
if not allow_incomplete:
assert len(ro_deletes) == 1, f"{len(ro_deletes)} roDeletes found"
self._mos_readers = [
mr for mr in self.mos_readers if mr.mos_type != RunningOrder
]
[docs] def merge(self, *, strict: bool = True):
"""
Merge all MOS files into the collection's running order (:attr:`ro`). If
*strict* is ``True`` (the default), then merge errors will be fatal. If
``False``, then merge errors will be downgraded to warnings.
"""
logger.info("Merging %s MosReaders into RunningOrder", len(self.mos_readers))
for mr in self.mos_readers:
mo = mr.mos_object
logger.info("Merging %s %s", mo.__class__.__name__, mr.message_id)
try:
self._ro += mo
except MosMergeError as e:
if strict:
raise
logger.error(str(e))
warnings.warn(str(e), MosMergeNonStrictWarning)
logger.info("Completed merging %s mos files", len(self.mos_readers))