Source code for minus80.Freezable

#!/usr/bin/env python3
import tempfile
import re
# Suppress the warning until the next wersion
import bcolz as bcz

import apsw as lite
import os as os
import numpy as np
import pandas as pd

from .Config import cf
from contextlib import contextmanager

__all__ = ['Freezable']

class sqlite_dict(object):
    def __init__(self,con):
        self._con = con
        con.cursor().execute('''
            CREATE TABLE IF NOT EXISTS globals (
                key TEXT,
                val TEXT,
                type TEXT
            );
            CREATE UNIQUE INDEX IF NOT EXISTS uniqkey ON globals(key)
        ''')


    def __call__(self,key,val=None):
        try:
            if val is not None:
                val_type = guess_type(val)
                if val_type not in ('int', 'float', 'str'):
                    raise TypeError(
                        f'val must be in [int, float, str], not {val_type}'
                    )
                self._con.cursor().execute(
                    '''
                    INSERT OR REPLACE INTO globals
                    (key, val, type)VALUES (?, ?, ?)''', (key, val, val_type)
                )
            else:
                (valtype, value) = self._con.cursor().execute(
                    '''SELECT type, val FROM globals WHERE key = ?''', (key, )
                ).fetchone()
                if valtype == 'int':
                    return int(value)
                elif valtype == 'float':
                    return float(value)
                elif valtype == 'str':
                    return str(value)
        except TypeError:
            raise ValueError('{} not in database'.format(key))

    def __contains__(self,key):
        (num,) = self._con.cursor().execute(
            'SELECT COUNT(key) FROM globals WHERE key = ?', (key,)
        ).fetchone()
        if num == 0:
            return False
        elif num == 1:
            return True

    def keys(self):
        all_keys = self._con.cursor().execute('SELECT key from globals')
        return [x for x, in all_keys ]

    def __getitem__(self,key):
        return self(key)

    def __setitem__(self,key,val):
        self(key,val=val)

    def __delitem__(self,key):
        self._con.cursor().execute(
            'DELETE FROM globals WHERE key = ?',(key,)
        )


[docs]class Freezable(object): ''' Freezable is an abstract class. Things that inherit from Freezable can be loaded and unloaded from the Minus80. A freezable object is a persistant object that lives in a known directory aimed to make expensive to build objects and databases loadable from new runtimes. The three main things that a Freezable object supplies are: * access to a sqlite database (relational records) * access to a bcolz databsase (columnar/table data) * access to a persistant key/val store * access to named temp files '''
[docs] def __init__(self, name, parent=None, basedir=None): ''' Initialize the Freezable Object. Parameters ---------- name : str The name of the frozen object. parent: Freezable object or None The parent object ''' # Set the m80 name self._m80_name = name # Set the m80 dtype self._m80_dtype = guess_type(self) # Keep track of children self._children = [] if basedir is None: basedir = cf.options.basedir # Set up our base directory if parent is None: # set as the top level basedir as specified in the config file self._basedir = os.path.join( basedir, 'databases', f'{self._m80_dtype}.{self._m80_name}' ) self._parent = None else: self._basedir = os.path.join( parent._basedir, f'{self._m80_dtype}.{self._m80_name}' ) self._parent = parent parent._add_child(self) os.makedirs(self._basedir,exist_ok=True) # Get a handle to the sql database self._db = self._sqlite() # Set up a table self._dict = sqlite_dict(self._db)
[docs] def _add_child(self,child): ''' Register a child dataset ''' self._children.append(child)
[docs] @contextmanager def _bulk_transaction(self): ''' This is a context manager that handles bulk transaction. i.e. this context will handle the BEGIN, END and appropriate ROLLBACKS. Usage: >>> with x.bulk_transaction() as cur: cur.execute('INSERT INTO table XXX VALUES YYY') ''' cur = self._db.cursor() cur.execute('PRAGMA synchronous = off') cur.execute('PRAGMA journal_mode = memory') cur.execute('SAVEPOINT bulk_transaction') try: yield cur except Exception as e: cur.execute('ROLLBACK TO SAVEPOINT bulk_transaction') raise e finally: cur.execute('RELEASE SAVEPOINT bulk_transaction')
def _query(self,q): cur = self._db.cursor().execute(q) names = [x[0] for x in cur.description] rows = cur.fetchall() result = pd.DataFrame(rows,columns=names) return result
[docs] def _get_dbpath(self, extension, create=False): ''' Get the path to database files Parameters ---------- ''' path = os.path.expanduser( os.path.join( self._basedir, f'{extension}' ) ) if create: os.makedirs(path,exist_ok=True) return path
[docs] def _sqlite(self): ''' This is the access point to the sqlite database ''' # return a connection if exists filename = os.path.join(self._get_dbpath('db.sqlite')) return lite.Connection(filename)
[docs] def _bcolz_array(self, name, array=None, m80name=None, m80type=None): ''' Routines to set/get arrays from the bcolz store ''' # Fill in the defaults if they were not provided if m80type is None: m80type = self._m80_dtype if m80name is None: m80name = self._m80_name # function is a getter if df is provided path = self._get_dbpath('bcz', create=True) if array is None: # GETTER arr = bcz.open(os.path.join(path, name)) return arr else: # SETTER bcz.carray(array, mode='w', rootdir=os.path.join(path, name))
[docs] def _bcolz(self, tblname, df=None, m80name=None, m80type=None, blaze=False): ''' This is the access point to the bcolz database ''' try: import blaze as blz except FutureWarning: # pragma: no cover pass import warnings # from flask.exthook import ExtDeprecationWarning # warnings.simplefilter('ignore', ExtDeprecationWarning) warnings.simplefilter('ignore', FutureWarning) # Fill in the defaults if they were not provided if m80type is None: m80type = self._m80_dtype if m80name is None: m80name = self._m80_name path = self._get_dbpath('bcz',create=True) # function is a getter if df is provided if df is None: # return the dataframe if it exists try: df = bcz.open(os.path.join(path, tblname)) except IOError: raise IOError( f'could not open database for {m80type}:{m80name} ' ) else: if len(df) == 0: df = pd.DataFrame() if blaze: df = blz.data(df) else: if blaze: df = blz.data(df) else: df = df.todataframe() if not blaze and f'{tblname}_index' in self._dict: df.set_index( self._dict[f'{tblname}_index'], inplace=True) return df # If df is set, then store the table else: df = df.copy() if df.index.name is not None: # We need to remember to index self._dict[tblname+'_index'] = df.index.name df.reset_index(inplace=True) path = os.path.join(path, tblname) if df.empty: bcz.fromiter( (), dtype=np.int32, mode='w', count=0, rootdir=path ) else: bcz.ctable.fromdataframe(df, mode='w', rootdir=path) return
@staticmethod def _tmpfile(*args, **kwargs): # returns a handle to a tmp file return tempfile.NamedTemporaryFile( 'w', dir=os.path.expanduser( os.path.join( # use the top level basedir cf.options.basedir, "tmp" ) ), **kwargs )
def guess_type(object): ''' Guess the type of object from the class attribute ''' # retrieve a list of classes classes = re.match( "<class '(.+)'>", str(object.__class__) ).groups()[0].split('.') # Return the most specific one return classes[-1]