Source code for minus80.Cohort

from functools import lru_cache
from collections import Counter,defaultdict

from minus80 import Accession, Freezable
from difflib import SequenceMatcher

import numbers
import math
import warnings
import logging
import asyncssh
import urllib
import asyncio
import os


# Public API of this module: only the Cohort class is exported by
# `from ... import *`.
__all__ = ['Cohort']

def invalidates_AID_cache(fn):
    '''
        Decorate a method so the `_get_AID` cache of its instance is
        cleared after the method runs.

        Parameters
        ----------
        fn : callable
            A method whose first argument is the instance (`self`).
            The instance must expose `self._get_AID.cache_clear()`.

        Returns
        -------
        callable
            A wrapper that forwards all arguments to `fn` and returns
            whatever `fn` returns.
    '''
    from functools import wraps
    @wraps(fn)
    def wrapped(self,*args,**kwargs):
        try:
            # Propagate the wrapped method's result to the caller
            return fn(self,*args,**kwargs)
        finally:
            # Clear even when fn raises: it may have modified the
            # database before failing, leaving cached AIDs stale.
            self._get_AID.cache_clear()
    return wrapped


class Cohort(Freezable):
    '''
        A Cohort is a named set of accessions. Once cohorts are
        created, they are persistent as they are stored on disk
        by minus80.
    '''

    def __init__(self, name, parent=None):
        '''
            Create (or re-open) the Cohort called `name`.

            Parameters
            ----------
            name : str
                The name of the Cohort
            parent : object, optional
                Passed through unchanged to the Freezable constructor
        '''
        super().__init__(name,parent=parent)
        self.name = name
        self._initialize_tables()
        self.log = logging.getLogger(f'minus80.Cohort.{name}')

    #------------------------------------------------------#
    #               Properties                             #
    #------------------------------------------------------#

    @property
    def columns(self):
        '''
            Return a list of all the available metadata keys
            stored for available Accessions
        '''
        return [
            x[0] for x in self._db.cursor().execute('''
                SELECT DISTINCT(key) FROM metadata;
            ''').fetchall()
        ]

    @property
    def names(self):
        '''
            Return a list of all available names and aliases
        '''
        # Not memoized: a cached list goes stale as soon as accessions
        # or aliases are added or dropped. recurse=False because the
        # fuzzy fallback of search_accessions reads this property, which
        # would otherwise recurse forever on an empty Cohort.
        return self.search_accessions('%',recurse=False)

    @property
    def files(self):
        '''
            Return the paths of all raw files that are not flagged
            as ignored
        '''
        return [
            x[0] for x in self._db.cursor().execute('''
                SELECT path FROM raw_files WHERE ignore != 1
            ''').fetchall()
        ]

    @property
    def unassigned_files(self):
        '''
            Return the non-ignored raw files that are not linked
            to any Accession
        '''
        assigned = {
            x[0] for x in self._db.cursor().execute(
                "SELECT DISTINCT(path) FROM files"
            ).fetchall()
        }
        return [x for x in self.files if x not in assigned]

    @property
    def _AID_mapping(self):
        '''
            Return a dict mapping each accession name to its AID
        '''
        # One query instead of materializing an Accession (with its
        # metadata and files) per row just to read the AID back.
        return {
            name: AID for name,AID in self._db.cursor().execute(
                'SELECT name, AID FROM accessions'
            ).fetchall()
        }

    @property
    def num_files(self):
        '''
            Return the number of non-ignored raw files
        '''
        return len(self.files)

    def as_DataFrame(self):
        '''
            Return the metadata as a pandas DataFrame with one row per
            accession name and one column per metadata key.

            Raises
            ------
            ImportError
                If pandas is not installed
        '''
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError('Pandas must be installed to use this feature') from e
        long_form = pd.DataFrame(
            self._db.cursor().execute('''
                SELECT name,key,val FROM accessions acc
                JOIN metadata met on acc.AID = met.AID;
            ''').fetchall(),
            columns=['name','key','val']
        )
        return long_form.pivot(index='name',columns='key',values='val')

    #------------------------------------------------------#
    #               Methods                                #
    #------------------------------------------------------#

    def random_accession(self):
        '''
            Returns a random accession from the Cohort

            Parameters
            ----------
            None

            Returns
            -------
            Accession
                An Accession object

            Notes
            -----
            On an empty Cohort the query yields no row and indexing
            the missing row raises a TypeError.
        '''
        name = self._db.cursor().execute('''
            SELECT name from accessions ORDER BY RANDOM() LIMIT 1;
        ''').fetchone()[0]
        return self[name]

    def random_accessions(self, n=1, replace=False):
        '''
            Returns random accessions from the Cohort, either
            with or without replacement.

            Parameters
            ----------
            n : int
                The number of random accessions to retrieve
            replace: bool
                If False, randomization does not include replacement

            Returns
            -------
            A generator yielding Accession objects

            Raises
            ------
            ValueError
                If `replace` is False and `n` is larger than the
                number of accessions in the Cohort
        '''
        if replace is False:
            if n > len(self):
                # Both halves must be f-strings so {n} is interpolated
                raise ValueError(
                    f'Only {len(self)} accessions in cohort. Cannot'
                    f' get {n} samples. See replace parameter in help.'
                )
            return (
                self[name] for (name, ) in self._db.cursor().execute('''
                    SELECT name from accessions ORDER BY RANDOM() LIMIT ?;
                ''', (n, ))
            )
        else:
            return (self.random_accession() for _ in range(n))

    def add_accessions(self, accessions):
        '''
            Add multiple Accessions at once

            Parameters
            ----------
            accessions : iterable of Accessions
                Each must provide `name`, `metadata` and `files`

            Returns
            -------
            list of Accession
                The accessions as they are stored in the Cohort
        '''
        # The input is traversed several times; materialize it so a
        # generator is not exhausted after the first pass.
        accessions = list(accessions)
        with self._bulk_transaction() as cur:
            # When a name is added, it is automatically assigned an ID
            cur.executemany('''
                INSERT OR IGNORE INTO accessions (name) VALUES (?)
            ''', [(x.name, ) for x in accessions])
            # Fetch that ID
            AID_map = self._AID_mapping
            # Populate the metadata and files tables
            cur.executemany('''
                INSERT OR REPLACE INTO metadata (AID, key, val) VALUES (?, ?, ?)
            ''', (
                    (AID_map[accession.name], k, v)
                    for accession in accessions
                    for k, v in accession.metadata.items()
                )
            )
            # `files` is a view backed by an INSTEAD OF trigger. SQLite
            # applies the conflict clause of the triggering statement to
            # the statements inside the trigger, so OR REPLACE here would
            # delete and re-insert an existing raw_files row (possibly
            # under a new FID). OR IGNORE keeps existing rows and links.
            cur.executemany('''
                INSERT OR IGNORE INTO files (AID, path) VALUES (?, ?)
            ''', (
                    (AID_map[accession.name], file)
                    for accession in accessions
                    for file in accession.files
                )
            )
        return [self[x] for x in accessions]

    def add_accession(self, accession):
        '''
            Add a sample to the Database

            Parameters
            ----------
            accession : Accession
                Must provide `name`, `metadata` and `files`

            Returns
            -------
            Accession
                The accession as it is stored in the Cohort
        '''
        with self._bulk_transaction() as cur:
            # When a name is added, it is automatically assigned an ID
            cur.execute('''
                INSERT OR IGNORE INTO accessions (name) VALUES (?)
            ''', (accession.name, ))
            # Fetch that ID
            AID = self._get_AID(accession)
            # Populate the metadata and files tables
            cur.executemany('''
                INSERT OR REPLACE INTO metadata (AID, key, val) VALUES (?, ?, ?)
            ''', ((AID, k, v) for k, v in accession.metadata.items())
            )
            # Insert through the `files` view (as add_accessions does)
            # so each path is registered in raw_files AND linked to this
            # accession; inserting into raw_files alone left the
            # accession without any files.
            cur.executemany('''
                INSERT OR IGNORE INTO files (AID, path) VALUES (?, ?)
            ''', ((AID, file) for file in accession.files)
            )
        return self[accession]

    def add_accessions_from_data_frame(self,df,name_col):
        '''
            Add accessions from data frame. This assumes
            each row is an Accession and that the properties
            of the accession are stored in the columns.

            Parameters
            ----------
            df : pandas.DataFrame
                The pandas data frame containing one accession
                per row
            name_col : string
                The column containing the accession names

            Returns
            -------
            list of Accession
                The accessions as they are stored in the Cohort

            Raises
            ------
            ValueError
                If `name_col` is not a column of `df`

            Example
            -------
            >>> df = pd.DataFrame(
                [['S1', 23, 'O'],
                 ['S2', 30, 'O+']],
                columns = ['Name','Age','Type']
            )
            >>> x = m80.add_accessions_from_data_frame(df,'Name')

            Would yield two Accessions: S1 and S2 with Age and Type
            properties.
        '''
        if name_col not in df.columns:
            raise ValueError(f'{name_col} is not a valid column name')
        # filter out rows with NaN name_col values
        # The tilde operator is a boolean inversion
        df = df.loc[~df[name_col].isnull(),:]
        accessions = []
        # Iterate over the rows and create an accession from each one
        for _,row in df.iterrows():
            d = dict(row)
            name = d.pop(name_col)
            # Get rid of missing data, everything else is stored as text
            metadata = {
                k: str(v) for k,v in d.items()
                if not (isinstance(v,numbers.Number) and math.isnan(v))
            }
            accessions.append(Accession(name, files=None, **metadata))
        return self.add_accessions(accessions)

    def alias_column(self, colname,min_alias_length=3):
        '''
            Assign an accession column as aliases

            Parameters
            ----------
            colname : str
                The metadata key whose values become aliases
            min_alias_length : int
                Values shorter than this are skipped

            Values that occur more than once for `colname`, or that
            already exist as a name or alias, are skipped with a warning.
        '''
        cur_names = set(self.names)
        with self._bulk_transaction() as cur:
            rows = cur.execute('''
                SELECT val,AID FROM metadata
                WHERE key = ?
            ''',(colname,)).fetchall()
            # Count on the raw rows: counting the keys of a dict built
            # from them always yields 1 and hides duplicated values.
            alias_counts = Counter(val for val,_ in rows)
            alias_dict = {val:aid for val,aid in rows}
            # We only want unique aliases
            unique_aliases = []
            for alias,count in alias_counts.items():
                if count > 1 or alias in cur_names:
                    self.log.warning(f"Cannot use {alias} as it is not unique")
                elif len(alias) < min_alias_length:
                    self.log.warning(f"Skipping {alias} as it is too short (<{min_alias_length})")
                else:
                    unique_aliases.append((alias,alias_dict[alias]))
            cur.executemany('''
                INSERT INTO aliases (alias,AID) VALUES (?,?)
            ''',unique_aliases)

    @invalidates_AID_cache
    def drop_aliases(self):
        '''
            Clear the aliases from the database
        '''
        self._db.cursor().execute('DELETE FROM aliases')

    @invalidates_AID_cache
    def drop_accessions(self):
        '''
            Remove every accession together with its aliases, metadata
            and file links. Rows in raw_files are kept.
        '''
        # Cached AIDs would otherwise keep resolving to deleted rows,
        # hence the cache invalidation. Referencing rows are deleted
        # before the accessions they point to.
        with self._bulk_transaction() as cur:
            cur.execute('''
                DELETE FROM aliases;
                DELETE FROM metadata;
                DELETE FROM aid_files;
                DELETE FROM accessions;
            ''')

    def assimilate_files(self,files,best_only=True):
        '''
            Take a list of files and assign them to Accessions

            Parameters
            ----------
            files : iterable of str
                File paths; only the basename is used for matching
            best_only : bool
                If True only the first search hit receives the file,
                otherwise every hit does

            Returns
            -------
            defaultdict(set)
                Maps the matched name to the set of files; files
                without any match are collected under 'unmatched'
        '''
        results = defaultdict(set)
        for f in files:
            matches = self.search_accessions(os.path.basename(f))
            if not matches:
                results['unmatched'].add(f)
            elif best_only:
                results[matches[0]].add(f)
            else:
                for m in matches:
                    results[m].add(f)
        return results

    def interactive_ignore_pattern(self,pattern,n=20):
        '''
            Start an interactive prompt to ignore patterns in
            file names (e.g. "test")

            Parameters
            ----------
            pattern : str
                Substring searched for in the raw file paths
            n : int
                Number of files shown per prompt
        '''
        from pprint import pprint
        import click
        matched_files = self.search_files(pattern)
        for i in range(0,len(matched_files),n):
            subset = matched_files[i:i+n]
            print('Ignore the following?')
            pprint(subset)
            if input("[Y/n]:").upper() == 'Y':
                self.ignore_files(subset)
            click.clear()

    def ignore_files(self,files):
        '''
            Flag the given raw file paths as ignored
        '''
        with self._bulk_transaction() as cur:
            cur.executemany('''
                UPDATE raw_files SET ignore = 1
                WHERE path = ?
            ''',[(x,) for x in files])

    def search_files(self,path):
        '''
            Perform a search of file names (path)

            Returns
            -------
            list of str
                Non-ignored raw file paths containing `path`
        '''
        cur = self._db.cursor()
        name = f'%{path}%'
        names = cur.execute(
            'SELECT path FROM raw_files WHERE path LIKE ? and ignore != 1',(name,)
        ).fetchall()
        return [x[0] for x in names]

    def search_accessions(self,name,include_scores=False,recurse=True):
        '''
            Performs a search of accession names

            Parameters
            ----------
            name : str
                The text to look for in names and aliases
            include_scores : bool
                If True return (name, score) tuples instead of names
            recurse : bool
                If True and nothing contains `name`, fall back to the
                name or alias sharing the longest substring with it

            Returns
            -------
            list
                Matching names and aliases (an empty list if nothing
                matches)
        '''
        cur = self._db.cursor()
        query = str(name)
        pattern = f'%{query}%'
        names = cur.execute(
            'SELECT name FROM accessions WHERE name LIKE ?',(pattern,)
        ).fetchall()
        aliases = cur.execute(
            'SELECT alias FROM aliases WHERE alias LIKE ?',(pattern,)
        ).fetchall()
        results = [(x[0],100) for x in names]
        results.extend((x[0],100) for x in aliases)
        # Find and Subset matches. e.g. Fat_shoulder_1 would
        # match 'M7956_Fat_shoulder_1'
        if not results and recurse:
            # Matching uses the raw query, not the LIKE pattern, so the
            # '%' wildcards never take part in the comparison.
            best_size, best_fragment = 0, ''
            for candidate in self.names:
                match = SequenceMatcher(None,query,candidate).find_longest_match(
                    0,len(query),0,len(candidate)
                )
                if match.size > best_size:
                    best_size = match.size
                    best_fragment = query[match.a:match.a+match.size]
            # An empty fragment (no candidates, or nothing in common)
            # would match every row; report no result instead.
            if best_fragment:
                hits = self.search_accessions(
                    best_fragment,include_scores=True,recurse=False
                )
                if hits:
                    results.append(hits[0])
        results = sorted(results,key=lambda x:x[1],reverse=True)
        if not include_scores:
            results = [x[0] for x in results]
        return results

    def crawl_host(self,hostname='localhost',path='/',
            username=None,glob='*.fastq'):
        '''
            Use SSH to crawl a host looking for raw files

            Parameters
            ----------
            hostname : str
                The host to connect to
            path : str
                The directory handed to `find` on the host
            username : str, optional
                Defaults to the current local user
            glob : str
                The `find -name` pattern of the files to register
        '''
        # Imported here because the module does not import it
        import getpass
        if username is None:
            username = getpass.getuser()
        async def crawl():
            find_command = f'find -L {path} -name "{glob}"'
            async with asyncssh.connect(hostname,username=username) as conn:
                result = await conn.run(find_command,check=False)
            return result.stdout.split("\n")
        loop = asyncio.get_event_loop()
        # Drop empty lines: the output ends with a newline, which would
        # otherwise register `path` itself as a raw file.
        files = [f for f in loop.run_until_complete(crawl()) if f]
        for f in files:
            if not f.startswith('/'):
                f = path + f
            self.add_raw_file(f,scheme='ssh',username=username,
                hostname=hostname)
        self.log.warning(f'Found {len(files)} raw files')

    def add_raw_file(self,path,scheme='ssh',
            username=None,hostname=None):
        '''
            Add a raw file to the Cohort

            Parameters
            ----------
            path : str
                A file path or URL
            scheme : str, optional
                Replaces the scheme parsed from `path` unless None
            username : str, optional
                Used when `path` carries no host part; defaults to
                the current local user
            hostname : str, optional
                Used when `path` carries no host part; defaults to
                the local host name
        '''
        # Imported here because the module does not import them (and
        # `import urllib` alone does not guarantee `urllib.parse`)
        import getpass
        import socket
        import urllib.parse
        url = urllib.parse.urlparse(path)
        # Override parsed url values with keywords
        if scheme is not None:
            url = url._replace(scheme=scheme)
        # check if URL parameters were provided via path
        if url.netloc == '':
            if username is None:
                username = getpass.getuser()
            if hostname is None:
                hostname = socket.gethostname()
            netloc = f'{username}@{hostname}'
            url = url._replace(netloc=netloc)
            # Convert to absolute path. The result has to be written
            # back into the URL, otherwise it is lost when the URL is
            # reassembled below. A relative path can only be resolved
            # against the local working directory, so this is limited
            # to files on this machine.
            if (url.path.startswith(('./','../'))
                    and hostname in ('localhost',socket.gethostname())):
                url = url._replace(path=os.path.abspath(url.path))
        path = urllib.parse.urlunparse(url)
        self._db.cursor().execute('''
            INSERT OR IGNORE INTO raw_files (path) VALUES (?)
        ''',(path,))

    #------------------------------------------------------#
    #               Magic Methods                          #
    #------------------------------------------------------#

    def __repr__(self):
        return (f'Cohort("{self.name}") -- \n'
                f'\tcontains {len(self)} Accessions\n'
                f'\t{len(self.files)} files ({len(self.unassigned_files)} unassigned)')

    @invalidates_AID_cache
    def __delitem__(self, name):
        '''
            Remove a sample by name (or by composition)

            Raises
            ------
            NameError
                If `name` is not in the Cohort
        '''
        AID = self._get_AID(name)
        # Aliases are removed as well so none is left pointing at a
        # deleted accession; referencing rows go before the accession.
        self._db.cursor().execute('''
            DELETE FROM aliases WHERE AID = ?;
            DELETE FROM metadata WHERE AID = ?;
            DELETE FROM aid_files WHERE AID = ?;
            DELETE FROM accessions WHERE AID = ?;
        ''', (AID, AID, AID, AID))

    def __getitem__(self, name):
        '''
            Get an accession from the database the pythonic way.

            Parameters
            ----------
            name : object
                Can be a string, i.e. the name or alias of an Accession,
                it can be an actual Accession OR the AID which
                is an internal ID for accession

            Returns
            -------
            Accession
                Built from the stored name, metadata and files; the
                AID is included in the metadata under 'AID'

            Raises
            ------
            NameError
                If `name` is not in the Cohort
        '''
        AID = self._get_AID(name)
        cur = self._db.cursor()
        # Get the name based on AID
        name, = cur.execute(
            'SELECT name FROM accessions WHERE AID = ?',(AID,)
        ).fetchone()
        metadata = {
            k: v for k, v in cur.execute('''
                SELECT key, val FROM metadata WHERE AID = ?;
                ''', (AID, )
            ).fetchall()
        }
        metadata['AID'] = AID
        files = [
            x[0] for x in cur.execute('''
                SELECT path FROM files WHERE AID = ?;
                ''', (AID, )
            ).fetchall()
        ]
        return Accession(name, files=files, **metadata)

    def __len__(self):
        ''' Return the number of accessions in the Cohort '''
        return self._db.cursor().execute('''
            SELECT COUNT(*) FROM accessions;
        ''').fetchone()[0]

    def __iter__(self):
        ''' Yield every accession in the Cohort as an Accession '''
        for name in (x[0] for x in self._db.cursor().execute('''
                SELECT name FROM accessions
            ''').fetchall()):
            yield self[name]

    def __contains__(self, item):
        '''
            Return True if `item` (an Accession, a name, an alias
            or an AID) resolves to an accession in the Cohort
        '''
        if isinstance(item, Accession):
            name = item.name
        else:
            name = item
        try:
            self._get_AID(name)
        except NameError:
            return False
        else:
            return True

    #------------------------------------------------------#
    #               Internal Methods                       #
    #------------------------------------------------------#

    def _initialize_tables(self):
        '''
            Create the tables, the `files` view and its insert
            trigger if they do not exist yet
        '''
        cur = self._db.cursor()
        cur.execute('''
            CREATE TABLE IF NOT EXISTS accessions (
                AID INTEGER PRIMARY KEY AUTOINCREMENT,
                name NOT NULL UNIQUE
            );
        ''')
        cur.execute('''
            CREATE TABLE IF NOT EXISTS aliases (
                alias TEXT UNIQUE,
                AID INTEGER,
                FOREIGN KEY(AID) REFERENCES accessions(AID)
            );
        ''')
        # NOTE(review): uniqueness covers (AID, key, val), so storing a
        # new value for an existing key adds a second row instead of
        # replacing the first. Confirm whether that is intended.
        cur.execute('''
            CREATE TABLE IF NOT EXISTS metadata (
                AID NOT NULL,
                key TEXT NOT NULL,
                val TEXT NOT NULL,
                FOREIGN KEY(AID) REFERENCES accessions(AID),
                UNIQUE(AID, key, val)
            );
        ''')
        cur.execute('''
            CREATE TABLE IF NOT EXISTS raw_files (
                FID INTEGER PRIMARY KEY,
                path TEXT NOT NULL UNIQUE,
                ignore INT DEFAULT 0,
                md5 TEXT DEFAULT NULL,
                added DATE DEFAULT (datetime('now','localtime')),
                is_symlink INT DEFAULT 0
            );
        ''')
        cur.execute('''
            CREATE TABLE IF NOT EXISTS aid_files (
                AID INTEGER,
                FID INTEGER,
                PRIMARY KEY(AID,FID),
                FOREIGN KEY(AID) REFERENCES accessions(AID),
                FOREIGN KEY(FID) REFERENCES raw_files(FID)
            );
        ''')
        cur.execute('''
            CREATE VIEW IF NOT EXISTS files AS
            SELECT AID,path
            FROM aid_files
            JOIN raw_files
                ON aid_files.FID = raw_files.FID;
        ''')
        # Inserting (AID, path) into the view registers the path in
        # raw_files if needed and links it to the accession.
        cur.execute('''
            CREATE TRIGGER IF NOT EXISTS assign_FID INSTEAD OF INSERT ON files
            FOR EACH ROW
            BEGIN
                INSERT OR IGNORE INTO raw_files (path) VALUES (NEW.path);
                INSERT INTO aid_files (AID,FID)
                    SELECT NEW.AID, FID
                    FROM raw_files WHERE path=NEW.path;
            END;
        ''')

    @lru_cache(maxsize=32768)
    def _get_AID(self, name):
        '''
            Return a Sample ID (AID)

            Parameters
            ----------
            name : object
                An Accession, an accession name, an alias or an AID

            Raises
            ------
            NameError
                If `name` does not resolve to an accession
        '''
        if isinstance(name, Accession):
            name = name.name
        cur = self._db.cursor()
        # Tried in order: accession name, alias, then the AID itself
        lookups = (
            'SELECT AID FROM accessions WHERE name = ?',
            'SELECT AID FROM aliases WHERE alias = ?',
            'SELECT AID FROM accessions WHERE AID = ?',
        )
        for query in lookups:
            try:
                return cur.execute(query, (name, )).fetchone()[0]
            except TypeError:
                # No row: fetchone() gave None, which is not indexable
                continue
        raise NameError(f'{name} not in Cohort')

    #------------------------------------------------------#
    #               Class Methods                          #
    #------------------------------------------------------#

    @classmethod
    def from_yaml(cls, name, yaml_file): #pragma: no cover
        '''
            Create a Cohort from a YAML file.

            NOTE(review): the original note on how the YAML file must
            be produced was truncated; presumably it has to contain
            serialized Accession objects. Confirm.

            Parameters
            ----------
            name : str
                The name of the Cohort
            yaml_file : pathlike
                The path to the YAML file that contains the
                Accessions

            Returns
            -------
            A Cohort object
        '''
        import yaml
        self = cls(name)
        # SECURITY: yaml.Loader can construct arbitrary Python objects.
        # Only load files from a trusted source. The loader is named
        # explicitly because recent PyYAML releases require it.
        with open(yaml_file, 'r') as handle:
            accessions = yaml.load(handle, Loader=yaml.Loader)
        self.add_accessions(accessions)
        return self

    @classmethod
    def from_accessions(cls, name, accessions):
        '''
            Create a Cohort from an iterable of Accessions.

            Parameters
            ----------
            name : str
                The name of the Cohort
            accessions : iterable of Accessions
                The accessions that will be frozen in the cohort
                under the given name

            Returns
            -------
            A Cohort object
        '''
        self = cls(name)
        self.add_accessions(accessions)
        return self