Top

data_sources.csv_data module

Module that reads a CSV file from disk and forms the result into a data table based on a column mapping.

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that reads a csv file from disk forms the result into a data table based on a column mapping """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import keyring
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized,from_csv


class CSVDataTable( DataTable ):
    """ class that collects data from a CSV file on disk and extracts columns based on a column map of the form [[CSV_column_name, DataTable_column_name,DataTable_type (one of _string,_int,_float,_date )],...] """
    def __init__(self, refresh_minutes=1, csv_spec = None, csv_map= None, csv_name= None ):
        """ Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV

        refresh_minutes -- passed through to DataTable.__init__
        csv_spec        -- path of the CSV file that refresh() opens
        csv_map         -- column map handed to from_csv()
        csv_name        -- initial table name; a falsy value is passed on as None
        """
        self.csv_spec = csv_spec
        self.csv_map = csv_map
        self.csv_name = csv_name
        DataTable.__init__(self,None,(csv_name if csv_name else None),refresh_minutes)
        # load the data immediately so the table is populated on construction
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table by opening the csv file and loading it into a table

        The file is opened in a with-block so the handle is closed as soon as
        from_csv() returns (or raises), rather than being leaked on every
        refresh and left for the garbage collector.
        """
        with open(self.csv_spec,"r") as csv_file:
            dt = from_csv(csv_file,self.name,self.csv_map)
        if dt:
            _, cols = dt.get_bounds()
            for idx in range(cols):
                self.replace_column(idx,dt.get_column(idx))

            # adopt the name reported by the parsed table, if it has one
            if dt.get_name():
                self.name = dt.get_name()

            self.changed()
            DataTable.refresh(self)

Module variables

var blank_type

var date_type

var float_type

var int_type

var string_type

Classes

class CSVDataTable

Class that collects data from a CSV file on disk and extracts columns based on a column map of the form [[CSV_column_name, DataTable_column_name, DataTable_type (one of _string, _int, _float, _date)], ...].

class CSVDataTable( DataTable ):
    """ class that collects data from a CSV file on disk and extracts columns based on a column map of the form [[CSV_column_name, DataTable_column_name,DataTable_type (one of _string,_int,_float,_date )],...] """
    def __init__(self, refresh_minutes=1, csv_spec = None, csv_map= None, csv_name= None ):
        """ Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV

        refresh_minutes -- passed through to DataTable.__init__
        csv_spec        -- path of the CSV file that refresh() opens
        csv_map         -- column map handed to from_csv()
        csv_name        -- initial table name; a falsy value is passed on as None
        """
        self.csv_spec = csv_spec
        self.csv_map = csv_map
        self.csv_name = csv_name
        DataTable.__init__(self,None,(csv_name if csv_name else None),refresh_minutes)
        # load the data immediately so the table is populated on construction
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table by opening the csv file and loading it into a table

        The file is opened in a with-block so the handle is closed as soon as
        from_csv() returns (or raises), rather than being leaked on every
        refresh and left for the garbage collector.
        """
        with open(self.csv_spec,"r") as csv_file:
            dt = from_csv(csv_file,self.name,self.csv_map)
        if dt:
            _, cols = dt.get_bounds()
            for idx in range(cols):
                self.replace_column(idx,dt.get_column(idx))

            # adopt the name reported by the parsed table, if it has one
            if dt.get_name():
                self.name = dt.get_name()

            self.changed()
            DataTable.refresh(self)

Ancestors (in MRO)

  • CSVDataTable
  • data_sources.data_table.DataTable
  • builtins.object

Static methods

def __init__(self, refresh_minutes=1, csv_spec=None, csv_map=None, csv_name=None)

Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV

def __init__(self, refresh_minutes=1, csv_spec = None, csv_map= None, csv_name= None ):
    """ Build a CSVDataTable backed by the CSV file at csv_spec.

    The three csv_* arguments are stored on the instance, the DataTable base
    is initialised (a falsy csv_name is normalised to None), and the table is
    loaded once straight away via refresh().
    """
    self.csv_spec, self.csv_map, self.csv_name = csv_spec, csv_map, csv_name
    table_name = csv_name or None
    DataTable.__init__(self, None, table_name, refresh_minutes)
    self.refresh()

def acquire_refresh_lock(self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ Take this table's refresh lock before touching table state.

    Pair every call with release_refresh_lock(). """
    lock = self.refresh_lock
    lock.acquire()

def add_column(self, *args, **kwargs)

@synchronized
def add_column(self,column):
    """ Append column to the end of the table.

    The column's index is set to its new position. If it has no name, it is
    given the default "<table name>_<position>". It is registered in cnames
    under its name and bound to this table.
    """
    position = len(self.columns)
    column.set_idx(position)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,position))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def changed(self)

notify listeners that this table has been changed

def changed(self):
    """ Signal a change by invoking each registered listener with this table. """
    subscribers = self.listeners
    for callback in subscribers:
        callback(self)

def get(self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    """ Return the value at row of the column named/indexed by reference (resolved via map_column). """
    col_idx = self.map_column(reference)
    target = self.columns[col_idx]
    return target.get(row)

def get_bounds(self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ Return (rows, cols) for the table.

    cols is the number of columns. rows is the largest column size, or -1
    when the table has no columns.
    """
    widest = -1
    for size in (col.size() for col in self.columns):
        widest = size if (widest < 0 or size > widest) else widest
    return (widest, len(self.columns))

def get_column(self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    """ Return the column object identified by reference (a name or integer index, see map_column). """
    position = self.map_column(reference)
    return self.columns[position]

def get_columns(self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ Return the table's own column list (not a copy, so mutating it mutates the table). """
    columns = self.columns
    return columns

def get_name(self)

return the name of the table

def get_name(self):
    """ Return the table's name attribute. """
    table_name = self.name
    return table_name

def get_names(self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ Return a new list holding each column's name, in column order. """
    names = []
    for col in self.columns:
        names.append(col.get_name())
    return names

def get_refresh_timestamp(self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ Return the stored refresh_timestamp, i.e. when the table was last refreshed. """
    stamp = self.refresh_timestamp
    return stamp

def has_column(self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    """ Return True if reference identifies a column of this table.

    reference -- a column name (exactly str) or a column index (exactly int);
                 any other type returns False

    The int branch previously referenced an undefined name (idx) and raised
    NameError. An int is now accepted exactly when get_column() would succeed
    for it: map_column() passes ints straight through to list indexing, which
    also allows Python-style negative positions.
    """
    # exact type checks (not isinstance) mirror map_column, so has_column never
    # accepts a reference that map_column would reject with TypeError
    if type(reference) is str:
        return reference in self.cnames
    if type(reference) is int:
        count = len(self.columns)
        return -count <= reference < count
    return False

def insert_column(self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    """ Insert column at position idx, shifting later columns one place right.

    If idx is past the end, the table is first padded with blank_column.
    An unnamed inserted column gets the default "<table name>_<idx>".

    Each column that moves from position p-1 to p and still carries the
    default name "<table name>_<p-1>" is renamed to "<table name>_<p>", and
    cnames is updated accordingly. The original loop compared the inserted
    column's name instead of the shifted column's, so shifted columns kept
    stale names, and old keys were never removed from cnames.
    """
    while idx > len(self.columns):
        # NOTE(review): blank_column is not defined in this view and the same object
        # is appended for every padding slot; confirm that sharing is intended
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
        return
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.insert(idx,column)
    self.cnames[column.get_name()] = column
    column.set_table(self)
    column.set_idx(idx)
    for pos in range(idx + 1, len(self.columns)):
        shifted = self.columns[pos]
        old_name = "%s_%d"%(self.name,pos-1)
        if shifted.get_name() == old_name:
            new_name = "%s_%d"%(self.name,pos)
            # only drop the old key if it still points at this column; it may already
            # have been claimed by the column now occupying position pos-1
            if self.cnames.get(old_name) is shifted:
                del self.cnames[old_name]
            shifted.set_name(new_name)
            self.cnames[new_name] = shifted
        shifted.set_idx(pos)

def listen(self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ Subscribe listen_func to change events; changed() will call it with this table. """
    subscribers = self.listeners
    subscribers.append(listen_func)

def map_column(self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    """ Translate a column reference into a position in self.columns.

    An int is returned unchanged. A str is looked up in cnames and that
    column's get_idx() is returned (KeyError if unknown). Any other type,
    including str/int subclasses such as bool, raises TypeError.
    """
    kind = type(reference)
    if kind is int:
        return reference
    if kind is str:
        return self.cnames[reference].get_idx()
    raise TypeError("wrong type in mapping")

def perform_refresh(self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Background worker loop: call self.refresh() every refresh_minutes.

    The loop wakes once per second and exits once refresh_thread_stop is set.
    refresh_minutes is re-read on every pass, so changes take effect without
    a restart.
    """
    last_refresh = time.time()
    while not self.refresh_thread_stop:
        elapsed = time.time() - last_refresh
        if elapsed >= self.refresh_minutes*60.0:
            self.refresh()
            last_refresh = time.time()
        time.sleep(1)

def put(self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    self.columns[self.map_column(reference)].put(row,value)

def refresh(self, *args, **kwargs)

refresh the table by opening the csv file and loading it into a table

@synchronized
def refresh( self ):
    """ refresh the table by opening the csv file and loading it into a table

    from_csv() parses csv_spec using csv_map. Each parsed column replaces the
    column at the same position, the table adopts the parsed name if one was
    found, listeners are notified, and DataTable.refresh() is invoked.

    The file is opened in a with-block so the handle is closed as soon as
    from_csv() returns (or raises) instead of leaking on every refresh.
    """
    with open(self.csv_spec,"r") as csv_file:
        dt = from_csv(csv_file,self.name,self.csv_map)
    if dt:
        _, cols = dt.get_bounds()
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))
        if dt.get_name():
            self.name = dt.get_name()
        self.changed()
        DataTable.refresh(self)

def release_refresh_lock(self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ Give back the refresh lock previously taken with acquire_refresh_lock(). """
    lock = self.refresh_lock
    lock.release()

def replace_column(self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    """ Put column at position idx, replacing the existing column there.

    When idx equals the current column count the column is appended instead.
    An unnamed column gets the default "<table name>_<idx>". The replaced
    column's name is removed from cnames, the new column is registered under
    its own name, and it is bound to this table.
    """
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx != len(self.columns):
        previous = self.columns[idx]
        del self.cnames[previous.get_name()]
        self.columns[idx] = column
    else:
        self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def start_refresh(self)

Start the background refresh thread

def start_refresh( self ):
    """ (Re)start the background refresh worker.

    Any running worker is stopped first via stop_refresh(). A new
    threading.Thread targeting perform_refresh() is then stored in
    refresh_thread and started.
    """
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    self.refresh_thread = worker
    worker.start()

def stop_refresh(self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Stop the background refresh worker, if any, and reset the stop flag.

    refresh_thread_stop is raised so perform_refresh() leaves its loop. A live
    worker is joined, then refresh_thread is cleared and the flag lowered so a
    later start_refresh() can run again.
    """
    self.refresh_thread_stop = True
    worker = self.refresh_thread
    if worker and worker.is_alive():
        worker.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False

def unlisten(self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ Unsubscribe listen_func from change events.

    Uses remove() on self.listeners; for a list this drops the first matching
    entry and raises ValueError if listen_func was never registered.
    """
    subscribers = self.listeners
    subscribers.remove(listen_func)

Instance variables

var csv_map

var csv_name

var csv_spec