data_sources.json_data module
Module that reads a JSON file from disk and forms the result into a data table.
# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that reads a JSON file from disk and forms the result into a data table """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import keyring
from datetime import datetime,timedelta
from data_sources.data_table import (
    DataTable,
    Column,
    Cell,
    blank_type,
    string_type,
    float_type,
    int_type,
    date_type,
    format_string,
    format_float,
    format_date,
    format_int,
    synchronized,
    from_json,
)


class JSONDataTable( DataTable ):
    """ class that collects data from a JSON file on disk of the form written by data_sources.data_table.to_json() and updates this table with it """

    def __init__(self, json_spec = None ):
        """ Initialize the JSONDataTable object from the file named in json_spec, refresh minutes will come from the loaded json """
        self.json_spec = json_spec
        # 120 is handed to the base class as its fourth argument; presumably the
        # initial refresh interval in minutes, replaced by refresh() below - TODO confirm
        DataTable.__init__(self,None,"JSONDataTable",120)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table by opening the JSON file named by self.json_spec and loading it into this table """
        # the context manager closes the file even when from_json raises
        with open(self.json_spec,"r") as json_file:
            dt = from_json(json_file)

        # NOTE(review): the published listing was collapsed onto one line, so the
        # nesting below is reconstructed; confirm that changed() and
        # DataTable.refresh() belong under the "if dt" guard
        if dt:
            _,cols = dt.get_bounds()
            # copy every loaded column over the column at the same index
            for idx in range(cols):
                self.replace_column(idx,dt.get_column(idx))

            # only take the loaded name when it is non-empty
            if dt.get_name():
                self.name = dt.get_name()

            self.refresh_minutes = dt.refresh_minutes

            self.changed()
            DataTable.refresh(self)
Module variables
var blank_type
var date_type
var float_type
var int_type
var string_type
Classes
class JSONDataTable
class that collects data from a JSON file on disk of the form written by data_sources.data_table.to_json() and updates this table with it
class JSONDataTable( DataTable ):
    """ class that collects data from a JSON file on disk of the form written by data_sources.data_table.to_json() and updates this table with it """

    def __init__(self, json_spec = None ):
        """ Initialize the JSONDataTable object from the file named in json_spec, refresh minutes will come from the loaded json """
        self.json_spec = json_spec
        # 120 is handed to the base class as its fourth argument; presumably the
        # initial refresh interval in minutes, replaced by refresh() below - TODO confirm
        DataTable.__init__(self,None,"JSONDataTable",120)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table by opening the JSON file named by self.json_spec and loading it into this table """
        # the context manager closes the file even when from_json raises
        with open(self.json_spec,"r") as json_file:
            dt = from_json(json_file)

        # NOTE(review): the published listing was collapsed onto one line, so the
        # nesting below is reconstructed; confirm that changed() and
        # DataTable.refresh() belong under the "if dt" guard
        if dt:
            _,cols = dt.get_bounds()
            # copy every loaded column over the column at the same index
            for idx in range(cols):
                self.replace_column(idx,dt.get_column(idx))

            # only take the loaded name when it is non-empty
            if dt.get_name():
                self.name = dt.get_name()

            self.refresh_minutes = dt.refresh_minutes

            self.changed()
            DataTable.refresh(self)
Ancestors (in MRO)
- JSONDataTable
- data_sources.data_table.DataTable
- builtins.object
Methods
def __init__(
self, json_spec=None)
Initialize the JSONDataTable object from the file named in json_spec; the refresh minutes will come from the loaded JSON.
def __init__(self, json_spec = None ):
    """ Build the table from the JSON file named by json_spec; the refresh interval is taken from the loaded json """
    # remember where the data lives before the first load
    self.json_spec = json_spec
    table_name = "JSONDataTable"
    # presumably the initial refresh interval in minutes - TODO confirm against DataTable.__init__
    initial_refresh = 120
    DataTable.__init__(self, None, table_name, initial_refresh)
    # populate the table right away
    self.refresh()
def acquire_refresh_lock(
self)
acquire the refresh lock before reading/writing the table state
def acquire_refresh_lock(self):
    """ take the refresh lock; call this before reading or writing the table state """
    lock = self.refresh_lock
    lock.acquire()
def add_column(
self, *args, **kwargs)
@synchronized
def add_column(self,column):
    """ append column to the end of the table, naming it <table name>_<index> if it has no name """
    position = len(self.columns)
    column.set_idx(position)
    # unnamed columns get a name derived from the table name and their position
    if not column.get_name():
        default_name = "%s_%d"%(self.name,position)
        column.set_name(default_name)
    self.columns.append(column)
    # register the column by name and point it back at this table
    self.cnames[column.get_name()] = column
    column.set_table(self)
def changed(
self)
notify listeners that this table has been changed
def changed(self):
    """ call every registered listener, passing this table, to signal that it has changed """
    for notify in self.listeners:
        notify(self)
def get(
self, *args, **kwargs)
@synchronized
def get(self, row, reference ):
    """ return what the column identified by reference (name or index) holds at row """
    column_index = self.map_column(reference)
    target = self.columns[column_index]
    return target.get(row)
def get_bounds(
self, *args, **kwargs)
return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols
@synchronized
def get_bounds(self):
    """ return (rows,cols): rows is the size of the largest column (-1 when there are no columns) and cols is the number of columns """
    col_count = len(self.columns)
    tallest = -1
    for col in self.columns:
        height = col.size()
        # the first column always seeds the maximum; later ones only raise it
        if tallest < 0 or tallest < height:
            tallest = height
    return (tallest,col_count)
def get_column(
self, *args, **kwargs)
@synchronized
def get_column(self, reference):
    """ return the column object identified by reference (name or index) """
    column_index = self.map_column(reference)
    return self.columns[column_index]
def get_columns(
self, *args, **kwargs)
return the list of columns
@synchronized
def get_columns(self):
    """ return the table's own column list (not a copy) """
    all_columns = self.columns
    return all_columns
def get_name(
self)
return the name of the table
def get_name(self):
    """ return this table's name """
    table_name = self.name
    return table_name
def get_names(
self, *args, **kwargs)
return a list of the names of the columns in order
@synchronized
def get_names(self):
    """ return the column names as a list, in column order """
    names = []
    for col in self.columns:
        names.append(col.get_name())
    return names
def get_refresh_timestamp(
self)
get the time that the table was last refreshed
def get_refresh_timestamp( self ):
    """ return the stored timestamp of the table's last refresh """
    last_refresh = self.refresh_timestamp
    return last_refresh
def has_column(
self, *args, **kwargs)
@synchronized
def has_column(self, reference ):
    """ return True if reference identifies an existing column

    reference may be a column name (str), looked up in self.cnames, or an
    integer index, accepted when it is a valid index into self.columns
    (negative indexes count from the end, as with list indexing).
    Any other type of reference returns False.
    """
    # exact type comparison, matching what map_column accepts
    kind = type(reference)
    if kind == str:
        return reference in self.cnames
    if kind == int:
        # the original compared an undefined name (idx) here, raising NameError
        count = len(self.columns)
        return -count <= reference < count
    return False
def insert_column(
self, *args, **kwargs)
@synchronized
def insert_column(self,idx,column):
    """ insert column at position idx, shifting later columns to the right

    If idx is beyond the end of the table the gap is padded first. An unnamed
    column is named <table name>_<idx>. Columns shifted by the insert get their
    index updated, and those still carrying the default name for their old
    position are renamed to the default name for their new position.
    """
    # NOTE(review): blank_column is not defined in this block; presumably a
    # module-level placeholder column - confirm it exists and that adding the
    # same object more than once is intended
    while idx > len(self.columns):
        self.add_column(blank_column)

    if idx == len(self.columns):
        # inserting at the end is a plain append
        self.add_column(column)
        return

    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.insert(idx,column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

    # Renumber the inserted column and everything after it. The original loop
    # tested and renamed the inserted column on every pass instead of the
    # column at the current position, so the new column was renamed repeatedly
    # and overwrote the name registrations of the shifted columns.
    for pos in range(idx,len(self.columns)):
        shifted = self.columns[pos]
        old_name = "%s_%d"%(self.name,pos-1)
        if pos > idx and shifted.get_name() == old_name:
            # drop the old registration only if it still refers to this column;
            # it may already belong to the column that now sits at pos-1
            if old_name in self.cnames and self.cnames[old_name] is shifted:
                del self.cnames[old_name]
            shifted.set_name("%s_%d"%(self.name,pos))
            self.cnames[shifted.get_name()] = shifted
        shifted.set_idx(pos)
def listen(
self, listen_func)
register for notifications when a change event is raised on this table
def listen(self,listen_func):
    """ add listen_func to the callbacks invoked when a change event is raised on this table """
    registered = self.listeners
    registered.append(listen_func)
def map_column(
self, *args, **kwargs)
@synchronized
def map_column(self, reference ):
    """ convert a column reference to a column index: an int is returned unchanged, a str is looked up by name; any other type raises TypeError """
    kind = type(reference)
    if kind == int:
        return reference
    if kind == str:
        named = self.cnames[reference]
        return named.get_idx()
    raise TypeError("wrong type in mapping")
def perform_refresh(
self)
Thread worker that sleeps and refreshes the data on a schedule
def perform_refresh( self ):
    """ Thread worker: polls once a second and calls refresh() whenever refresh_minutes have elapsed, until refresh_thread_stop is set """
    last_refresh = time.time()
    while True:
        if self.refresh_thread_stop:
            break
        elapsed = time.time() - last_refresh
        # the interval is re-read on every pass, so a changed refresh_minutes takes effect
        if elapsed >= self.refresh_minutes*60.0:
            self.refresh()
            last_refresh = time.time()
        time.sleep(1)
def put(
self, *args, **kwargs)
@synchronized
def put(self, row, reference, value):
    """ store value at row in the column identified by reference (name or index) """
    column_index = self.map_column(reference)
    target = self.columns[column_index]
    target.put(row,value)
def refresh(
self, *args, **kwargs)
refresh the table by opening the JSON file and loading it into a table
@synchronized
def refresh( self ):
    """ refresh the table by opening the JSON file named by self.json_spec and loading it into this table """
    # the context manager closes the file even when from_json raises
    with open(self.json_spec,"r") as json_file:
        dt = from_json(json_file)

    # NOTE(review): the published listing was collapsed onto one line, so the
    # nesting below is reconstructed; confirm that changed() and
    # DataTable.refresh() belong under the "if dt" guard
    if dt:
        _,cols = dt.get_bounds()
        # copy every loaded column over the column at the same index
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))

        # only take the loaded name when it is non-empty
        if dt.get_name():
            self.name = dt.get_name()

        self.refresh_minutes = dt.refresh_minutes

        self.changed()
        DataTable.refresh(self)
def release_refresh_lock(
self)
release the refresh lock after reading/writing the table state
def release_refresh_lock(self):
    """ give the refresh lock back; call this after reading or writing the table state """
    lock = self.refresh_lock
    lock.release()
def replace_column(
self, *args, **kwargs)
@synchronized
def replace_column(self,idx,column):
    """ put column at position idx, replacing the column already there, or appending when idx is one past the last column """
    column.set_idx(idx)
    # unnamed columns get a name derived from the table name and their position
    if not column.get_name():
        default_name = "%s_%d"%(self.name,idx)
        column.set_name(default_name)

    if idx != len(self.columns):
        # forget the name of the column being replaced before overwriting it
        outgoing_name = self.columns[idx].get_name()
        del self.cnames[outgoing_name]
        self.columns[idx] = column
    else:
        self.columns.append(column)

    # register the new column by name and point it back at this table
    self.cnames[column.get_name()] = column
    column.set_table(self)
def start_refresh(
self)
Start the background refresh thread
def start_refresh( self ):
    """ Stop any running refresh thread, then launch a new one running perform_refresh """
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    # store the thread before starting it so stop_refresh can find it
    self.refresh_thread = worker
    worker.start()
def stop_refresh(
self)
Stop the background refresh thread
def stop_refresh( self ):
    """ Signal the background refresh thread to stop, wait for it to finish, then clear the thread and the stop flag """
    # raise the flag that perform_refresh polls
    self.refresh_thread_stop = True
    worker = self.refresh_thread
    if worker and worker.is_alive():
        worker.join()
    # reset state so a later start_refresh begins cleanly
    self.refresh_thread = None
    self.refresh_thread_stop = False
def unlisten(
self, listen_func)
unregister for notifications when a change event is raised on this table
def unlisten(self,listen_func):
    """ remove listen_func from the callbacks invoked when a change event is raised on this table """
    registered = self.listeners
    registered.remove(listen_func)
Instance variables
var json_spec