Source code for minus80.Accession

import asyncio
import getpass
import os
import shlex
import socket
import urllib
import urllib.parse
from contextlib import contextmanager

import asyncssh

from .Config import cf

class Accession(object):
    '''
    An item that exists in an experimental collection.

    From google: Definition (noun): a new item added to an existing
    collection of books, paintings, or artifacts.

    Most of the time an accession is interchangeable with a *sample*.
    However, the term sample can become confusing when an experiment has
    multiple samplings from the same sample, e.g. timecourse or different
    tissues.

    Attributes
    ----------
    name : str
        The name of the accession
    files : set of str
        Files (stored as URL strings) associated with the accession
    metadata : dict
        Arbitrary key/value metadata supplied at construction time or
        through item assignment
    '''

    def __init__(self, name, files=None, **kwargs):
        '''
        Create a new accession.

        Parameters
        ----------
        name : str
            The name of the accession
        files : iterable of str
            Files associated with the accession. They are stored as given
            (no URL normalisation is applied here).
        **kwargs : keyword arguments
            Any number of key=value arguments that contain metadata.

        Returns
        -------
        An accession object
        '''
        self.name = name
        # Always own a fresh set so callers' iterables are never aliased
        self.files = set(files) if files is not None else set()
        self.metadata = kwargs

    def __getitem__(self, key):
        '''
        Retrieve metadata about an accession.

        Parameters
        ----------
        key : str

        Returns
        -------
        Value from the accession metadata corresponding to the key.

        Raises
        ------
        KeyError
            If the key is not present in the metadata.
        '''
        return self.metadata[key]

    def __setitem__(self, key, val):
        '''
        Set metadata about an accession

        Parameters
        ----------
        key : str
            The metadata name
        val : str
            The value of the metadata
        '''
        self.metadata[key] = val

    def add_file(self, path, scheme='ssh', username=None, hostname=None):
        '''
        Add a file that is associated with the accession.

        The path is parsed as a URL, missing pieces are filled in, and the
        resulting URL string is added to `self.files`. A local file looks
        something like: `/tmp/file1.fastq`.

        Parameters
        ----------
        path : string
            The path/URL to the file.
        scheme : string (default: ssh)
            Specifies the scheme/protocol for accessing the file. Any value
            other than None replaces the scheme parsed out of `path`;
            because the default is 'ssh', pass `scheme=None` to keep a
            scheme embedded in `path` (e.g. `s3://...`).
        username : string (default: None)
            Username used when `path` carries no network location.
            Defaults to None, in which case `getpass.getuser()` is used.
        hostname : string (default: None)
            Hostname used when `path` carries no network location.
            Defaults to None, in which case `socket.gethostname()` is used.

        NOTE: `username` and `hostname` are only consulted when `path`
        does not already contain a network location (`user@host`).

        Returns
        -------
        None
        '''
        url = urllib.parse.urlparse(path)
        # Override the parsed scheme with the keyword
        if scheme is not None:
            url = url._replace(scheme=scheme)
        # No netloc in the path: build one from the keywords / this machine
        if url.netloc == '':
            if username is None:
                username = getpass.getuser()
            if hostname is None:
                hostname = socket.gethostname()
            url = url._replace(netloc=f'{username}@{hostname}')
        # Convert explicitly relative paths to absolute ones. The result has
        # to be written back into the URL, otherwise the stored URL would
        # still contain the './' or '../' prefix.
        if url.path.startswith(('./', '../')):
            url = url._replace(path=os.path.abspath(url.path))
        self.files.add(urllib.parse.urlunparse(url))

    def add_files(self, paths, skip_test=False):
        '''
        Add multiple paths that are associated with an accession

        Each path is passed to `add_file` with its default keyword values.

        Parameters
        ----------
        paths : iterable of strings
            The paths to the files
        skip_test : bool
            Currently unused; accepted for backward compatibility. No
            existence test is performed by this method.

        Returns
        -------
        None
        '''
        for path in paths:
            self.add_file(path)

    def __str__(self):
        ''' The repr of the accession with each comma replaced by a newline '''
        return '\n'.join(repr(self).split(','))

    def __repr__(self):  # pragma: no cover
        ''' String representation of Accession '''
        return f'Accession({self.name}, files={self.files}, {self.metadata})'

    @staticmethod
    async def _check_file(url):  # pragma: no cover
        '''
        Asynchronously check a single URL over ssh.

        Parameters
        ----------
        url : str
            URL whose hostname, username and path are used for the check

        Returns
        -------
        The result of running the remote test command; the command echoes
        "Y" when the path is a regular file and "N" otherwise.
        '''
        # Parse the URL and connect
        url = urllib.parse.urlparse(url)
        # Quote the path so whitespace or shell metacharacters in a file
        # name cannot change the command that runs on the remote host
        remote_path = shlex.quote(url.path)
        async with asyncssh.connect(
            url.hostname, username=url.username
        ) as conn:
            return await conn.run(
                f'[[ -f {remote_path} ]] && echo -n "Y" || echo -n "N"'
            )

    def _check_files(self):  # pragma: no cover
        '''
        Check to see if files attached to an accession are accessible
        through ssh

        Parameters
        ----------
        None

        Returns
        -------
        Returns True if all files are accessible, otherwise returns a list
        of files that were unreachable.
        '''
        # Snapshot the set so results can be paired with their URLs
        files = list(self.files)

        async def _run_checks():
            # gather() must be called while a loop is running
            return await asyncio.gather(
                *(self._check_file(url) for url in files)
            )

        # Use a private loop rather than asyncio.get_event_loop(), which is
        # deprecated when no loop is running; always close it afterwards.
        loop = asyncio.new_event_loop()
        try:
            results = loop.run_until_complete(_run_checks())
        finally:
            loop.close()
        unreachable = [
            url for url, result in zip(files, results)
            if result.stdout != 'Y'
        ]
        if not unreachable:
            return True
        return unreachable