Source code for minus80.CloudData

from .Tools import get_files
from .Config import cf

import os
import lzma
import sys
import threading
import tarfile
import requests
from collections import defaultdict


__all__ = ['CloudData']


def CloudData(engine='s3'):
    if engine == 's3':
        return S3CloudData()
    else:
        raise ValueError(f'Cannot use {engine} as a cloud engine.')
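
# A minimal usage sketch (hedged: 'Cohort' and 'experiment1' are illustrative
# assumptions, and valid S3 credentials must already be configured in
# ~/.minus80.conf or via the CLOUD_* environment variables handled below):
#
#   >>> cloud = CloudData(engine='s3')
#   >>> cloud.push('Cohort', 'experiment1')
#   >>> cloud.list()
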
class BaseCloudData(object):  # pragma: no cover
    def __init__(self):
        pass

    def push(self, name, dtype, raw=False, compress=False):
        pass

    def pull(self, name, dtype, raw=False):
        pass

    def list(self, name=None, dtype=None, raw=None):
        pass


class publics3CloudData(BaseCloudData):
    def __init__(self):
        import xml.etree.ElementTree as ET
        # Fetch creds from config file
        aws_endpoint = cf.cloud.endpoint
        aws_bucket = cf.cloud.bucket
        # Override config with ENV variables
        if 'CLOUD_ENDPOINT' in os.environ:
            aws_endpoint = os.environ['CLOUD_ENDPOINT']
        if 'CLOUD_BUCKET' in os.environ:
            aws_bucket = os.environ['CLOUD_BUCKET']
        # Get the URL for the endpoint
        r = requests.get(f'{aws_endpoint}/{aws_bucket}')
        # Extract the endpoint from the returned XML
        x = ET.fromstring(r.content.decode('utf-8'))
        self.endpoint = x.find('Endpoint').text

    def push(self, dtype, name, raw=False):
        raise NotImplementedError('Cannot push to public repos')

    def pull(self, dtype, name, raw=False):
        # e.g. https://s3.amazonaws.com/minus80/databases/Cohort.RNACohort
        if raw:
            dir_prefix = 'Raw'
            name = os.path.basename(name)
        else:
            dir_prefix = 'databases'
        output = os.path.join(cf.options.basedir, 'tmp', f'{dtype}.{name}.tar')
        r = requests.get(
            f'https://{self.endpoint}/{dir_prefix}/{dtype}.{name}',
            stream=True
        )
        if r.status_code != 200:
            raise ValueError(f'An error occurred, HTTP code: {r.status_code}')
        num_bytes = int(r.headers.get('content-length', 0))
        progress = ProgressDownloadPercentage(output, num_bytes)
        with open(output, 'wb') as OUT:
            for chunk in r.iter_content(1024):
                OUT.write(chunk)
                # Report the actual chunk size (the final chunk may be short)
                progress(len(chunk))
        if output.endswith('.tar'):
            tar = tarfile.open(output, 'r')
            tar.extractall(path=os.path.join(cf.options.basedir, 'databases'))
            tar.close()

    def list(self, dtype=None, name=None, raw=None):
        pass


class ProgressPercentage(object):
    '''
    Borrowed from:
    https://boto3.readthedocs.io/en/latest/_modules/boto3/s3/transfer.html
    '''
    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify, we'll assume this is hooked up to a single filename.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far,
                    self._size, percentage
                )
            )
            sys.stdout.flush()


class ProgressDownloadPercentage(object):
    def __init__(self, filename, total_bytes):
        self._filename = filename
        self._total_bytes = total_bytes
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._total_bytes) * 100
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far,
                    self._total_bytes, percentage
                )
            )
            sys.stdout.flush()
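
# A hedged illustration of how these callbacks behave (the 'example.bin'
# filename is hypothetical): boto3's S3Transfer invokes the callback with the
# number of bytes moved since the last call, so the callback accumulates a
# running total and redraws a single progress line in place via '\r'.
#
#   >>> progress = ProgressDownloadPercentage('example.bin', 2048)
#   >>> progress(1024)   # prints: example.bin 1024 / 2048 (50.00%)
#   >>> progress(1024)   # prints: example.bin 2048 / 2048 (100.00%)
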
class S3CloudData(BaseCloudData):
    '''
    CloudData objects allow minus80 to interact with the cloud
    to store both prepared (frozen) and raw datasets.
    '''

    def __init__(self):
        '''
        Create a CloudData object. Once proper S3 credentials are stored
        in the config file (~/.minus80.conf), initialization takes no
        arguments.
        '''
        import boto3
        from botocore.client import Config
        # Fetch creds from config file
        aws_endpoint = cf.cloud.endpoint
        aws_bucket = cf.cloud.bucket
        aws_access_key = cf.cloud.access_key
        aws_secret_key = cf.cloud.secret_key
        # Override config with ENV variables
        if 'CLOUD_ENDPOINT' in os.environ:
            aws_endpoint = os.environ['CLOUD_ENDPOINT']
        if 'CLOUD_BUCKET' in os.environ:
            aws_bucket = os.environ['CLOUD_BUCKET']
        if 'CLOUD_ACCESS_KEY' in os.environ:
            aws_access_key = os.environ['CLOUD_ACCESS_KEY']
        if 'CLOUD_SECRET_KEY' in os.environ:
            aws_secret_key = os.environ['CLOUD_SECRET_KEY']
        try:
            self.s3 = boto3.client(
                service_name='s3',
                endpoint_url=aws_endpoint,
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
                config=Config(s3={'addressing_style': 'path'})
            )
            self.bucket = aws_bucket
            # Make sure the minus80 bucket exists
            try:
                self.s3.create_bucket(Bucket=self.bucket)
            except self.s3.exceptions.BucketAlreadyOwnedByYou:
                pass
        except Exception as e:
            raise ValueError(
                'Accessing the cloud requires setting up AWS credentials in '
                '~/.minus80.conf (or the CLOUD_* environment variables); '
                'contact help@linkage.io for assistance', e
            )

    def push(self, dtype, name, raw=False):
        '''
        Store a minus80 dataset in the cloud using its name and dtype
        (e.g. Cohort). The dtype is the name of the Freezable class or
        object. See :ref:`freezable`. Assume we are storing
        ``x = Cohort('experiment1')``.

        Parameters
        ----------
        dtype : str
            The type of freezable object (i.e. 'Cohort')
        name : str
            The name of the dataset (i.e. 'experiment1')
        raw : bool, default=False
            If True, raw files can be stored in the cloud. In this case,
            name changes to the file name and dtype changes to a string
            representing the future dtype, or anything that describes the
            type of data being stored.
        '''
        from boto3.s3.transfer import S3Transfer
        transfer = S3Transfer(self.s3)
        if raw:
            # The name is a FILENAME
            filename = name
            key = os.path.basename(filename)
            transfer.upload_file(
                filename, self.bucket, f'Raw/{dtype}.{key}',
                callback=ProgressPercentage(filename)
            )
        else:
            key = f'{dtype}.{name}'
            data_path = os.path.join(cf.options.basedir, 'databases', key)
            if not os.path.exists(data_path):
                raise ValueError('There were no datasets with that name')
            # Tar it up
            tarpath = os.path.join(cf.options.basedir, 'tmp', key + '.tar')
            tar = tarfile.open(tarpath, 'w', dereference=True)
            tar.add(data_path, recursive=True, arcname=key)
            tar.close()
            transfer.upload_file(
                tarpath, self.bucket, f'databases/{key}',
                callback=ProgressPercentage(tarpath)
            )
            os.unlink(tarpath)

    def pull(self, dtype, name, raw=False, output=None):
        '''
        Retrieve a minus80 dataset from the cloud using its name and dtype
        (e.g. Cohort). The dtype is the name of the Freezable class or
        object. See :ref:`freezable`. Assume we are retrieving
        ``x = Cohort('experiment1')``.

        Parameters
        ----------
        dtype : str
            The type of freezable object (i.e. 'Cohort')
        name : str
            The name of the dataset (i.e. 'experiment1')
        raw : bool, default=False
            If True, raw files can be retrieved from the cloud. In this
            case, name changes to the file name and dtype changes to a
            string representing the future dtype, or anything that
            describes the type of data being stored.
        output : str, default=None
            For raw pulls, the filename to save the file to; defaults to
            the basename of ``name``.
        '''
        from boto3.s3.transfer import S3Transfer
        transfer = S3Transfer(self.s3)
        if raw:
            prefix_dir = 'Raw'
            name = os.path.basename(name)
            if output is None:
                output = name
        else:
            prefix_dir = 'databases'
            output = os.path.join(
                cf.options.basedir, 'tmp', f'{dtype}.{name}.tar'
            )
        # Get the number of bytes in the object
        num_bytes = self.s3.list_objects(
            Bucket=self.bucket,
            Prefix=f'{prefix_dir}/{dtype}.{name}'
        )['Contents'][0]['Size']
        # Download the object
        transfer.download_file(
            self.bucket, f'{prefix_dir}/{dtype}.{name}', output,
            callback=ProgressDownloadPercentage(output, num_bytes)
        )
        # Extract if it's a tar file
        if output.endswith('.tar'):
            tar = tarfile.open(output, 'r')
            tar.extractall(path=os.path.join(cf.options.basedir, 'databases'))
            tar.close()

    def list(self, name=None, dtype=None, raw=False):
        '''
        List datasets that are in the cloud.

        Parameters
        ----------
        dtype : str
            The type of freezable object (i.e. 'Cohort')
        name : str
            The name of the dataset (i.e. 'experiment1')
        raw : bool, default=False
            If True, list raw datasets instead of frozen ones.
        '''
        items = defaultdict(set)
        try:
            for item in self.s3.list_objects(Bucket=self.bucket)['Contents']:
                key = item['Key']
                # Keep only keys matching the requested raw/frozen mode
                if key.startswith('Raw') != bool(raw):
                    continue
                prefix_dir, key = key.split('/')
                key_dtype, key_name = key.split('.', maxsplit=1)
                if dtype is not None and key_dtype != dtype:
                    continue
                if name is not None and not key_name.startswith(name):
                    continue
                items[key_dtype].add(key_name)
            if len(items) == 0:
                print('Nothing here yet!')
            else:
                if raw:
                    print('###### Raw Data: ######')
                for key, vals in items.items():
                    print(f'-----{key}------')
                    for i, name in enumerate(vals, 1):
                        print(f'{i}. {name}')
        except KeyError:
            # An empty bucket has no 'Contents' key
            print('Nothing here yet!')

    def remove(self, dtype, name, raw=False):
        '''
        Remove a dataset from the cloud.
        '''
        if raw:
            key = f'Raw/{dtype}.{name}'
        else:
            key = f'databases/{dtype}.{name}'
        self.s3.delete_object(Bucket=self.bucket, Key=key)

    def nuke(self):
        '''
        Nuke all the datasets in the cloud.
        '''
        try:
            for obj in self.s3.list_objects(Bucket=self.bucket)['Contents']:
                self.s3.delete_object(Bucket=self.bucket, Key=obj['Key'])
        except KeyError:
            # An empty bucket has no 'Contents' key
            return
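
# A round-trip usage sketch (hedged: 'Cohort' and 'experiment1' are
# illustrative assumptions; push/pull operate on whatever frozen datasets
# live under cf.options.basedir/databases):
#
#   >>> cloud = CloudData()                    # defaults to the S3 engine
#   >>> cloud.push('Cohort', 'experiment1')    # tar + upload databases/Cohort.experiment1
#   >>> cloud.list(dtype='Cohort')             # show frozen Cohort datasets
#   >>> cloud.pull('Cohort', 'experiment1')    # download + extract the tarball
#   >>> cloud.remove('Cohort', 'experiment1')  # delete the cloud copy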