Source code for mosromgr.utils.s3

# mosromgr: Python library for managing MOS running orders
# Copyright 2021 BBC
# SPDX-License-Identifier: Apache-2.0

from typing import Optional, List

import boto3

class S3:
    Class to defer initialising S3 resources until needed
    def __init__(self):
        self._resource = None
        self._client = None

    def resource(self):
        if self._resource is None:
            self._resource = boto3.resource('s3')
        return self._resource

    def client(self):
        if self._client is None:
            self._client = boto3.client('s3')
        return self._client

[docs]def get_mos_files( bucket_name: str, prefix: Optional[str] = None, *, suffix: str = '.mos.xml' ) -> List[str]: """ Retrieve MOS files from given S3 bucket in location defined by *prefix*. Returns a list of file keys. """ if prefix is None: prefix = '' paginator = s3.client.get_paginator('list_objects') files = [] for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix): try: contents = page['Contents'] except KeyError: break for file in contents: key = file['Key'] if key.endswith(suffix): files.append(key) return files
[docs]def get_file_contents(bucket_name, file_key): """ Open the S3 file and return its contents as a string """ o = s3.resource.Object(bucket_name, file_key).get() b = o['Body'] return
s3 = S3()