data_sources.odbc_data module
module that performs a sql query on an odbc database and forms the result into a data table
# Copyright 2020 James P Goodwin data table package to manage sparse columnar data """ module that performs a sql query on an odbc database and forms the result into a data table """ import locale locale.setlocale(locale.LC_ALL,'') import sys import os import glob import gzip import re import pyodbc import keyring from datetime import datetime,timedelta from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized class ODBCDataTable( DataTable ): """ class that collects data from the response to a specific sql query on an odbc connected database and populates tables based on a field map """ def __init__(self,refresh_minutes=1,sql_spec=None,sql_query=None,sql_map=None): """ Initalize the ODBCDataTable object pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result """ self.sql_spec = sql_spec self.sql_query = sql_query self.sql_map = sql_map DataTable.__init__(self,None, "ODBCDataTable query:%s,database:%s,fieldmap:%s,refreshed every %d minutes"%( sql_query,sql_spec,sql_map,refresh_minutes), refresh_minutes) self.refresh() @synchronized def refresh( self ): """ refresh the table from the query """ username,server,driver,database,port = re.match(r"odbc://([a-z_][a-z0-9_-]*\${0,1})@([^/]*)/([^/]*)/([^:]*):{0,1}(\d*){0,1}",self.sql_spec).groups() password = keyring.get_password(self.sql_spec, username) if not password: return conn = pyodbc.connect("DRIVER={%s};DATABASE=%s;UID=%s;PWD=%s;SERVER=%s;PORT=%s;"%(driver,database,username,password,server,port)) if not conn: return result = conn.execute(self.sql_query) for row in result: for sql_column,data_column in self.sql_map: value = getattr(row,sql_column) if not self.has_column(data_column): self.add_column(Column(name=data_column)) c = self.get_column(data_column) if isinstance(value,datetime): cc = Cell(date_type,value,format_date) elif isinstance(value,int): cc = Cell(int_type,value,format_int) elif isinstance(value,float): cc = Cell(float_type,value,format_float) elif isinstance(value,str): cc = Cell(string_type,value,format_string) else: cc = Cell(string_type,str(value),format_string) c.put(c.size(),cc) self.changed() DataTable.refresh(self)
Module variables
var blank_type
var date_type
var float_type
var int_type
var string_type
Classes
class ODBCDataTable
class that collects data from the response to a specific sql query on an odbc connected database and populates tables based on a field map
class ODBCDataTable( DataTable ): """ class that collects data from the response to a specific sql query on an odbc connected database and populates tables based on a field map """ def __init__(self,refresh_minutes=1,sql_spec=None,sql_query=None,sql_map=None): """ Initalize the ODBCDataTable object pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result """ self.sql_spec = sql_spec self.sql_query = sql_query self.sql_map = sql_map DataTable.__init__(self,None, "ODBCDataTable query:%s,database:%s,fieldmap:%s,refreshed every %d minutes"%( sql_query,sql_spec,sql_map,refresh_minutes), refresh_minutes) self.refresh() @synchronized def refresh( self ): """ refresh the table from the query """ username,server,driver,database,port = re.match(r"odbc://([a-z_][a-z0-9_-]*\${0,1})@([^/]*)/([^/]*)/([^:]*):{0,1}(\d*){0,1}",self.sql_spec).groups() password = keyring.get_password(self.sql_spec, username) if not password: return conn = pyodbc.connect("DRIVER={%s};DATABASE=%s;UID=%s;PWD=%s;SERVER=%s;PORT=%s;"%(driver,database,username,password,server,port)) if not conn: return result = conn.execute(self.sql_query) for row in result: for sql_column,data_column in self.sql_map: value = getattr(row,sql_column) if not self.has_column(data_column): self.add_column(Column(name=data_column)) c = self.get_column(data_column) if isinstance(value,datetime): cc = Cell(date_type,value,format_date) elif isinstance(value,int): cc = Cell(int_type,value,format_int) elif isinstance(value,float): cc = Cell(float_type,value,format_float) elif isinstance(value,str): cc = Cell(string_type,value,format_string) else: cc = Cell(string_type,str(value),format_string) c.put(c.size(),cc) self.changed() DataTable.refresh(self)
Ancestors (in MRO)
- ODBCDataTable
- data_sources.data_table.DataTable
- builtins.object
Static methods
def __init__(
self, refresh_minutes=1, sql_spec=None, sql_query=None, sql_map=None)
Initalize the ODBCDataTable object pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result
def __init__(self,refresh_minutes=1,sql_spec=None,sql_query=None,sql_map=None): """ Initalize the ODBCDataTable object pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result """ self.sql_spec = sql_spec self.sql_query = sql_query self.sql_map = sql_map DataTable.__init__(self,None, "ODBCDataTable query:%s,database:%s,fieldmap:%s,refreshed every %d minutes"%( sql_query,sql_spec,sql_map,refresh_minutes), refresh_minutes) self.refresh()
def acquire_refresh_lock(
self)
acquire the refresh lock before reading/writing the table state
def acquire_refresh_lock(self): """ acquire the refresh lock before reading/writing the table state """ self.refresh_lock.acquire()
def add_column(
self, *args, **kwargs)
@synchronized def add_column(self,column): idx = len(self.columns) column.set_idx(idx) if not column.get_name(): column.set_name("%s_%d"%(self.name,idx)) self.columns.append(column) self.cnames[column.get_name()] = column column.set_table(self)
def changed(
self)
notify listeners that this table has been changed
def changed(self): """ notify listeners that this table has been changed """ for f in self.listeners: f(self)
def get(
self, *args, **kwargs)
@synchronized def get(self, row, reference ): return self.columns[self.map_column(reference)].get(row)
def get_bounds(
self, *args, **kwargs)
return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols
@synchronized def get_bounds(self): """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """ cols = len(self.columns) rows = -1 for c in self.columns: size = c.size() if rows < 0 or size > rows: rows = size return (rows,cols)
def get_column(
self, *args, **kwargs)
@synchronized def get_column(self, reference): return self.columns[self.map_column(reference)]
def get_columns(
self, *args, **kwargs)
return the list of columns
@synchronized def get_columns(self): """ return the list of columns """ return self.columns
def get_name(
self)
return the name of the table
def get_name(self): """ return the name of the table """ return self.name
def get_names(
self, *args, **kwargs)
return a list of the names of the columns in order
@synchronized def get_names(self): """ return a list of the names of the columns in order""" return [c.get_name() for c in self.columns]
def get_refresh_timestamp(
self)
get the time that the table was last refreshed
def get_refresh_timestamp( self ): """ get the time that the table was last refreshed """ return self.refresh_timestamp
def has_column(
self, *args, **kwargs)
@synchronized def has_column(self, reference ): if type(reference) == str or type(reference) == str: return reference in self.cnames elif type(reference) == int: return idx < len(self.columns) else: return False
def insert_column(
self, *args, **kwargs)
@synchronized def insert_column(self,idx,column): while idx > len(self.columns): self.add_column(blank_column) if idx == len(self.columns): self.add_column(column) else: if not column.get_name(): column.set_name("%s_%d"%(self.name,idx)) self.columns.insert(idx,column) self.cnames[column.get_name()] = column column.set_table(self) while idx < len(self.columns): if column.get_name() == "%s_%d"%(self.name,idx-1): column.set_name("%s_%d"%(self.name,idx)) self.cnames[column.get_name()] = column self.columns[idx].set_idx(idx) idx += 1
def listen(
self, listen_func)
register for notifications when a change event is raised on this table
def listen(self,listen_func): """ register for notifications when a change event is raised on this table """ self.listeners.append(listen_func)
def map_column(
self, *args, **kwargs)
@synchronized def map_column(self, reference ): if type(reference) == str or type(reference) == str: return self.cnames[reference].get_idx() elif type(reference) == int: return reference else: raise TypeError("wrong type in mapping")
def perform_refresh(
self)
Thread worker that sleeps and refreshes the data on a schedule
def perform_refresh( self ): """ Thread worker that sleeps and refreshes the data on a schedule """ start_time = time.time() while not self.refresh_thread_stop: if time.time() - start_time >= self.refresh_minutes*60.0: self.refresh() start_time = time.time() time.sleep(1)
def put(
self, *args, **kwargs)
@synchronized def put(self, row, reference, value): self.columns[self.map_column(reference)].put(row,value)
def refresh(
self, *args, **kwargs)
refresh the table from the query
@synchronized def refresh( self ): """ refresh the table from the query """ username,server,driver,database,port = re.match(r"odbc://([a-z_][a-z0-9_-]*\${0,1})@([^/]*)/([^/]*)/([^:]*):{0,1}(\d*){0,1}",self.sql_spec).groups() password = keyring.get_password(self.sql_spec, username) if not password: return conn = pyodbc.connect("DRIVER={%s};DATABASE=%s;UID=%s;PWD=%s;SERVER=%s;PORT=%s;"%(driver,database,username,password,server,port)) if not conn: return result = conn.execute(self.sql_query) for row in result: for sql_column,data_column in self.sql_map: value = getattr(row,sql_column) if not self.has_column(data_column): self.add_column(Column(name=data_column)) c = self.get_column(data_column) if isinstance(value,datetime): cc = Cell(date_type,value,format_date) elif isinstance(value,int): cc = Cell(int_type,value,format_int) elif isinstance(value,float): cc = Cell(float_type,value,format_float) elif isinstance(value,str): cc = Cell(string_type,value,format_string) else: cc = Cell(string_type,str(value),format_string) c.put(c.size(),cc) self.changed() DataTable.refresh(self)
def release_refresh_lock(
self)
release the refresh lock after reading/writing the table state
def release_refresh_lock(self): """ release the refresh lock after reading/writing the table state """ self.refresh_lock.release()
def replace_column(
self, *args, **kwargs)
@synchronized def replace_column(self,idx,column): column.set_idx(idx) if not column.get_name(): column.set_name("%s_%d"%(self.name,idx)) if idx == len(self.columns): self.columns.append(column) else: del self.cnames[self.columns[idx].get_name()] self.columns[idx] = column self.cnames[column.get_name()] = column column.set_table(self)
def start_refresh(
self)
Start the background refresh thread
def start_refresh( self ): """ Start the background refresh thread """ self.stop_refresh() self.refresh_thread = threading.Thread(target=self.perform_refresh) self.refresh_thread.start()
def stop_refresh(
self)
Stop the background refresh thread
def stop_refresh( self ): """ Stop the background refresh thread """ self.refresh_thread_stop = True if self.refresh_thread and self.refresh_thread.is_alive(): self.refresh_thread.join() self.refresh_thread = None self.refresh_thread_stop = False
def unlisten(
self, listen_func)
unregister for notifications when a change event is raised on this table
def unlisten(self,listen_func): """ unregister for notifications when a change event is raised on this table """ self.listeners.remove(listen_func)
Instance variables
var sql_map
var sql_query
var sql_spec