Top

data_sources.json_data module

module that reads a JSON file from disk forms the result into a data table

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that reads a JSON file from disk forms the result into a data table """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import keyring
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized,from_json

class JSONDataTable( DataTable ):
    """ class that collects data from a JSON file on disk of the form written by data_sources.data_table.to_json() and updates this table with it """
    def __init__(self, json_spec = None ):
        """ Initialize the JSONDataTable object from the file named in json_spec, refresh minutes will come from the loaded json """
        self.json_spec = json_spec
        DataTable.__init__(self,None,"JSONDataTable",120)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table by opening the JSON file and loading it into a table """
        dt = from_json(open(self.json_spec,"r"))
        if dt:
            rows,cols = dt.get_bounds()
            for idx in range(cols):
                self.replace_column(idx,dt.get_column(idx))

            if dt.get_name():
                self.name = dt.get_name()

            self.refresh_minutes = dt.refresh_minutes

            self.changed()
            DataTable.refresh(self)

Module variables

var blank_type

var date_type

var float_type

var int_type

var string_type

Classes

class JSONDataTable

class that collects data from a JSON file on disk of the form written by data_sources.data_table.to_json() and updates this table with it

class JSONDataTable( DataTable ):
    """ class that collects data from a JSON file on disk of the form written by data_sources.data_table.to_json() and updates this table with it """
    def __init__(self, json_spec = None ):
        """ Initialize the JSONDataTable object from the file named in json_spec, refresh minutes will come from the loaded json """
        self.json_spec = json_spec
        DataTable.__init__(self,None,"JSONDataTable",120)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table by opening the JSON file and loading it into a table """
        dt = from_json(open(self.json_spec,"r"))
        if dt:
            rows,cols = dt.get_bounds()
            for idx in range(cols):
                self.replace_column(idx,dt.get_column(idx))

            if dt.get_name():
                self.name = dt.get_name()

            self.refresh_minutes = dt.refresh_minutes

            self.changed()
            DataTable.refresh(self)

Ancestors (in MRO)

  • JSONDataTable
  • data_sources.data_table.DataTable
  • builtins.object

Static methods

def __init__(

self, json_spec=None)

Initialize the JSONDataTable object from the file named in json_spec, refresh minutes will come from the loaded json

def __init__(self, json_spec = None ):
    """ Initialize the JSONDataTable object from the file named in json_spec, refresh minutes will come from the loaded json """
    self.json_spec = json_spec
    DataTable.__init__(self,None,"JSONDataTable",120)
    self.refresh()

def acquire_refresh_lock(

self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ acquire the refresh lock before reading/writing the table state """
    self.refresh_lock.acquire()

def add_column(

self, *args, **kwargs)

@synchronized
def add_column(self,column):
    idx = len(self.columns)
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def changed(

self)

notify listeners that this table has been changed

def changed(self):
    """ notify listeners that this table has been changed """
    for f in self.listeners:
        f(self)

def get(

self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    return self.columns[self.map_column(reference)].get(row)

def get_bounds(

self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """
    cols = len(self.columns)
    rows = -1
    for c in self.columns:
        size = c.size()
        if rows < 0 or size > rows:
            rows = size
    return (rows,cols)

def get_column(

self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    return self.columns[self.map_column(reference)]

def get_columns(

self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ return the list of columns """
    return self.columns

def get_name(

self)

return the name of the table

def get_name(self):
    """ return the name of the table """
    return self.name

def get_names(

self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ return a list of the names of the columns in order"""
    return [c.get_name() for c in self.columns]

def get_refresh_timestamp(

self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ get the time that the table was last refreshed """
    return self.refresh_timestamp

def has_column(

self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    if type(reference) == str or type(reference) == str:
        return reference in self.cnames
    elif type(reference) == int:
        return idx < len(self.columns)
    else:
        return False

def insert_column(

self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        while idx < len(self.columns):
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1

def listen(

self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ register for notifications when a change event is raised on this table """
    self.listeners.append(listen_func)

def map_column(

self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    if type(reference) == str or type(reference) == str:
        return self.cnames[reference].get_idx()
    elif type(reference) == int:
        return reference
    else:
        raise TypeError("wrong type in mapping")

def perform_refresh(

self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Thread worker that sleeps and refreshes the data on a schedule """
    start_time = time.time()
    while not self.refresh_thread_stop:
        if time.time() - start_time >= self.refresh_minutes*60.0:
            self.refresh()
            start_time = time.time()
        time.sleep(1)

def put(

self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    self.columns[self.map_column(reference)].put(row,value)

def refresh(

self, *args, **kwargs)

refresh the table by opening the JSON file and loading it into a table

@synchronized
def refresh( self ):
    """ refresh the table by opening the JSON file and loading it into a table """
    dt = from_json(open(self.json_spec,"r"))
    if dt:
        rows,cols = dt.get_bounds()
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))
        if dt.get_name():
            self.name = dt.get_name()
        self.refresh_minutes = dt.refresh_minutes
        self.changed()
        DataTable.refresh(self)

def release_refresh_lock(

self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ release the refresh lock after reading/writing the table state """
    self.refresh_lock.release()

def replace_column(

self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        del self.cnames[self.columns[idx].get_name()]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)

def start_refresh(

self)

Start the background refresh thread

def start_refresh( self ):
    """ Start the background refresh thread """
    self.stop_refresh()
    self.refresh_thread = threading.Thread(target=self.perform_refresh)
    self.refresh_thread.start()

def stop_refresh(

self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Stop the background refresh thread """
    self.refresh_thread_stop = True
    if self.refresh_thread and self.refresh_thread.is_alive():
        self.refresh_thread.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False

def unlisten(

self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ unregister for notifications when a change event is raised on this table """
    self.listeners.remove(listen_func)

Instance variables

var json_spec