Top

data_sources.odbc_data module

module that performs a sql query on an odbc database and forms the result into a data table

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that performs a sql query on an odbc database and forms the result into a data table """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import pyodbc
import keyring
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized


class ODBCDataTable( DataTable ):
    """ class that collects data from the response to a specific sql query on an odbc connected database and populates tables based on a field map """
    def __init__(self,refresh_minutes=1,sql_spec=None,sql_query=None,sql_map=None):
        """ Initialize the ODBCDataTable object. Pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result. The constructor performs one refresh immediately, so it raises ValueError if sql_spec is missing or malformed. """
        self.sql_spec = sql_spec
        self.sql_query = sql_query
        self.sql_map = sql_map
        DataTable.__init__(self,None,
            "ODBCDataTable query:%s,database:%s,fieldmap:%s,refreshed every %d minutes"%(
            sql_query,sql_spec,sql_map,refresh_minutes),
            refresh_minutes)

        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table from the query: parse sql_spec, look the password up in the keyring (keyed by the spec and username), run sql_query and store every mapped column of every row as a typed Cell. Returns without touching the table when no password is found. Raises ValueError when sql_spec does not match odbc://user@server/driver/database:port """
        # validate the spec up front; the previous code called .groups() on a possibly-None match
        spec_match = re.match(r"odbc://([a-z_][a-z0-9_-]*\${0,1})@([^/]*)/([^/]*)/([^:]*):{0,1}(\d*){0,1}",self.sql_spec or "")
        if not spec_match:
            raise ValueError("sql_spec must be of the form odbc://user@server/driver/database:port, got %r"%(self.sql_spec,))
        username,server,driver,database,port = spec_match.groups()

        password = keyring.get_password(self.sql_spec, username)
        if not password:
            return

        conn = pyodbc.connect("DRIVER={%s};DATABASE=%s;UID=%s;PWD=%s;SERVER=%s;PORT=%s;"%(driver,database,username,password,server,port))
        if not conn:
            return

        # always release the connection, even if the query or the row processing raises;
        # refresh runs repeatedly, so a leaked connection per cycle would accumulate
        try:
            result = conn.execute(self.sql_query)

            for row in result:
                for sql_column,data_column in self.sql_map:
                    value = getattr(row,sql_column)
                    # columns are created lazily the first time a mapped name is seen
                    if not self.has_column(data_column):
                        self.add_column(Column(name=data_column))
                    c = self.get_column(data_column)
                    # pick the cell type from the python type of the value; anything unrecognized is stored as its str()
                    if isinstance(value,datetime):
                        cc = Cell(date_type,value,format_date)
                    elif isinstance(value,int):
                        cc = Cell(int_type,value,format_int)
                    elif isinstance(value,float):
                        cc = Cell(float_type,value,format_float)
                    elif isinstance(value,str):
                        cc = Cell(string_type,value,format_string)
                    else:
                        cc = Cell(string_type,str(value),format_string)
                    # NOTE(review): the cell is written at index c.size(), which presumably appends after
                    # rows from earlier refreshes - confirm whether the table should be replaced instead
                    c.put(c.size(),cc)
        finally:
            conn.close()

        self.changed()
        DataTable.refresh(self)

Module variables

var blank_type

var date_type

var float_type

var int_type

var string_type

Classes

class ODBCDataTable

class that collects data from the response to a specific sql query on an odbc connected database and populates tables based on a field map

class ODBCDataTable( DataTable ):
    """ class that collects data from the response to a specific sql query on an odbc connected database and populates tables based on a field map """
    def __init__(self,refresh_minutes=1,sql_spec=None,sql_query=None,sql_map=None):
        """ Initialize the ODBCDataTable object. Pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result. The constructor performs one refresh immediately, so it raises ValueError if sql_spec is missing or malformed. """
        self.sql_spec = sql_spec
        self.sql_query = sql_query
        self.sql_map = sql_map
        DataTable.__init__(self,None,
            "ODBCDataTable query:%s,database:%s,fieldmap:%s,refreshed every %d minutes"%(
            sql_query,sql_spec,sql_map,refresh_minutes),
            refresh_minutes)

        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh the table from the query: parse sql_spec, look the password up in the keyring (keyed by the spec and username), run sql_query and store every mapped column of every row as a typed Cell. Returns without touching the table when no password is found. Raises ValueError when sql_spec does not match odbc://user@server/driver/database:port """
        # validate the spec up front; the previous code called .groups() on a possibly-None match
        spec_match = re.match(r"odbc://([a-z_][a-z0-9_-]*\${0,1})@([^/]*)/([^/]*)/([^:]*):{0,1}(\d*){0,1}",self.sql_spec or "")
        if not spec_match:
            raise ValueError("sql_spec must be of the form odbc://user@server/driver/database:port, got %r"%(self.sql_spec,))
        username,server,driver,database,port = spec_match.groups()

        password = keyring.get_password(self.sql_spec, username)
        if not password:
            return

        conn = pyodbc.connect("DRIVER={%s};DATABASE=%s;UID=%s;PWD=%s;SERVER=%s;PORT=%s;"%(driver,database,username,password,server,port))
        if not conn:
            return

        # always release the connection, even if the query or the row processing raises;
        # refresh runs repeatedly, so a leaked connection per cycle would accumulate
        try:
            result = conn.execute(self.sql_query)

            for row in result:
                for sql_column,data_column in self.sql_map:
                    value = getattr(row,sql_column)
                    # columns are created lazily the first time a mapped name is seen
                    if not self.has_column(data_column):
                        self.add_column(Column(name=data_column))
                    c = self.get_column(data_column)
                    # pick the cell type from the python type of the value; anything unrecognized is stored as its str()
                    if isinstance(value,datetime):
                        cc = Cell(date_type,value,format_date)
                    elif isinstance(value,int):
                        cc = Cell(int_type,value,format_int)
                    elif isinstance(value,float):
                        cc = Cell(float_type,value,format_float)
                    elif isinstance(value,str):
                        cc = Cell(string_type,value,format_string)
                    else:
                        cc = Cell(string_type,str(value),format_string)
                    # NOTE(review): the cell is written at index c.size(), which presumably appends after
                    # rows from earlier refreshes - confirm whether the table should be replaced instead
                    c.put(c.size(),cc)
        finally:
            conn.close()

        self.changed()
        DataTable.refresh(self)

Ancestors (in MRO)

  • ODBCDataTable
  • data_sources.data_table.DataTable
  • builtins.object

Static methods

def __init__(

self, refresh_minutes=1, sql_spec=None, sql_query=None, sql_map=None)

Initialize the ODBCDataTable object. Pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result.

def __init__(self,refresh_minutes=1,sql_spec=None,sql_query=None,sql_map=None):
    """ Initialize the ODBCDataTable object. Pass in a sql_spec to connect to the database of the form odbc://user@server/driver/database:port, a sql_query to be executed, and a field map of the form [[sql_column_name, data_table_column_name],..] indicating the columns to collect from the result. An initial refresh is performed before the constructor returns. """
    # remember the connection spec, the query and the field map for later refreshes
    self.sql_spec, self.sql_query, self.sql_map = sql_spec, sql_query, sql_map
    # human readable description handed to the DataTable base class
    description = "ODBCDataTable query:%s,database:%s,fieldmap:%s,refreshed every %d minutes"%(
        sql_query,sql_spec,sql_map,refresh_minutes)
    DataTable.__init__(self,None,description,refresh_minutes)
    # populate the table right away
    self.refresh()

def acquire_refresh_lock(

self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ take the table's refresh lock; call this before reading/writing the table state """
    lock = self.refresh_lock
    lock.acquire()

def add_column(

self, *args, **kwargs)

@synchronized
def add_column(self,column):
    """ append column as the last column of the table; a column without a name is named <table name>_<index>, and the column is registered in the name lookup and attached to this table """
    position = len(self.columns)
    column.set_idx(position)
    # unnamed columns get a generated name based on the table name and position
    if not column.get_name():
        generated = "%s_%d"%(self.name,position)
        column.set_name(generated)
    self.columns.append(column)
    key = column.get_name()
    self.cnames[key] = column
    column.set_table(self)

def changed(

self)

notify listeners that this table has been changed

def changed(self):
    """ call every registered listener with this table to signal that it has been changed """
    for callback in self.listeners:
        callback(self)

def get(

self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    """ return the result of get(row) on the column identified by reference, where reference is resolved to a column index by map_column """
    column_index = self.map_column(reference)
    target = self.columns[column_index]
    return target.get(row)

def get_bounds(

self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the largest size() reported by any column (-1 when there are no columns) and cols is the number of columns """
    col_count = len(self.columns)
    row_count = -1
    for column in self.columns:
        height = column.size()
        # keep the current maximum unless nothing has been recorded yet or this column is taller
        if row_count >= 0 and height <= row_count:
            continue
        row_count = height
    return (row_count,col_count)

def get_column(

self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    """ return the column object identified by reference, where reference is resolved to a column index by map_column """
    column_index = self.map_column(reference)
    return self.columns[column_index]

def get_columns(

self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ return the table's own list of columns (the list object itself, not a copy) """
    all_columns = self.columns
    return all_columns

def get_name(

self)

return the name of the table

def get_name(self):
    """ return the name this table was given """
    table_name = self.name
    return table_name

def get_names(

self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ return a new list holding the name of every column, in column order """
    names = []
    for column in self.columns:
        names.append(column.get_name())
    return names

def get_refresh_timestamp(

self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ return the stored timestamp of the table's last refresh """
    stamp = self.refresh_timestamp
    return stamp

def has_column(

self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    """ return True if reference identifies an existing column: a str is looked up among the column names, an int must be a valid index into the column list, and any other type yields False """
    kind = type(reference)
    if kind == str:
        return reference in self.cnames
    elif kind == int:
        # the previous code compared an undefined name `idx` here, so every int reference raised NameError;
        # the range mirrors list indexing so that True means self.columns[reference] will succeed
        return -len(self.columns) <= reference < len(self.columns)
    else:
        return False

def insert_column(

self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    """ insert column at position idx; the table is first padded with blank columns up to idx when idx is beyond the end, columns at and after idx are shifted right and re-indexed, and shifted columns that still carry their generated <table name>_<index> name are renamed to match their new position """
    # NOTE(review): blank_column is not defined in this block and the same object is added on every
    # pass of this loop - confirm it exists at module level and that sharing one instance is intended
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        column.set_idx(idx)
        column.set_table(self)
        # Re-index the shifted columns from the last one backwards so that renaming a generated
        # name never collides with the not-yet-renamed column that follows it. The previous loop
        # tested and renamed the inserted column itself instead of the column at each position.
        for pos in range(len(self.columns)-1,idx,-1):
            shifted = self.columns[pos]
            old_auto_name = "%s_%d"%(self.name,pos-1)
            if shifted.get_name() == old_auto_name:
                # drop the stale lookup entry only if it still points at this column
                if self.cnames.get(old_auto_name) is shifted:
                    del self.cnames[old_auto_name]
                shifted.set_name("%s_%d"%(self.name,pos))
                self.cnames[shifted.get_name()] = shifted
            shifted.set_idx(pos)
        # register the inserted column last so its name wins over any entry released above
        self.cnames[column.get_name()] = column

def listen(

self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ add listen_func to the listeners that are called when a change event is raised on this table """
    registered = self.listeners
    registered.append(listen_func)

def map_column(

self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    """ translate a column reference into a column index: a str is looked up by column name and that column's index is returned, an int is returned unchanged, and any other type raises TypeError """
    kind = type(reference)
    if kind == str:
        named = self.cnames[reference]
        return named.get_idx()
    if kind == int:
        return reference
    raise TypeError("wrong type in mapping")

def perform_refresh(

self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Thread worker: polls once a second until refresh_thread_stop is set, and calls refresh() whenever at least refresh_minutes minutes have passed since the previous refresh (or since the worker started) """
    last_run = time.time()
    while True:
        if self.refresh_thread_stop:
            break
        elapsed = time.time() - last_run
        if elapsed >= self.refresh_minutes*60.0:
            self.refresh()
            # restart the interval only after the refresh has finished
            last_run = time.time()
        time.sleep(1)

def put(

self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    """ call put(row,value) on the column identified by reference, where reference is resolved to a column index by map_column """
    column_index = self.map_column(reference)
    target = self.columns[column_index]
    target.put(row,value)

def refresh(

self, *args, **kwargs)

refresh the table from the query

@synchronized
def refresh( self ):
    """ refresh the table from the query: parse sql_spec, look the password up in the keyring (keyed by the spec and username), run sql_query and store every mapped column of every row as a typed Cell. Returns without touching the table when no password is found. Raises ValueError when sql_spec does not match odbc://user@server/driver/database:port """
    # validate the spec up front; the previous code called .groups() on a possibly-None match
    spec_match = re.match(r"odbc://([a-z_][a-z0-9_-]*\${0,1})@([^/]*)/([^/]*)/([^:]*):{0,1}(\d*){0,1}",self.sql_spec or "")
    if not spec_match:
        raise ValueError("sql_spec must be of the form odbc://user@server/driver/database:port, got %r"%(self.sql_spec,))
    username,server,driver,database,port = spec_match.groups()
    password = keyring.get_password(self.sql_spec, username)
    if not password:
        return
    conn = pyodbc.connect("DRIVER={%s};DATABASE=%s;UID=%s;PWD=%s;SERVER=%s;PORT=%s;"%(driver,database,username,password,server,port))
    if not conn:
        return
    # always release the connection, even if the query or the row processing raises;
    # refresh runs repeatedly, so a leaked connection per cycle would accumulate
    try:
        result = conn.execute(self.sql_query)
        for row in result:
            for sql_column,data_column in self.sql_map:
                value = getattr(row,sql_column)
                # columns are created lazily the first time a mapped name is seen
                if not self.has_column(data_column):
                    self.add_column(Column(name=data_column))
                c = self.get_column(data_column)
                # pick the cell type from the python type of the value; anything unrecognized is stored as its str()
                if isinstance(value,datetime):
                    cc = Cell(date_type,value,format_date)
                elif isinstance(value,int):
                    cc = Cell(int_type,value,format_int)
                elif isinstance(value,float):
                    cc = Cell(float_type,value,format_float)
                elif isinstance(value,str):
                    cc = Cell(string_type,value,format_string)
                else:
                    cc = Cell(string_type,str(value),format_string)
                # NOTE(review): the cell is written at index c.size(), which presumably appends after
                # rows from earlier refreshes - confirm whether the table should be replaced instead
                c.put(c.size(),cc)
    finally:
        conn.close()
    self.changed()
    DataTable.refresh(self)

def release_refresh_lock(

self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ give the table's refresh lock back; call this after reading/writing the table state """
    lock = self.refresh_lock
    lock.release()

def replace_column(

self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    """ put column at position idx: when idx equals the current number of columns the column is appended, otherwise the column already at idx is removed from the name lookup and overwritten; a column without a name is named <table name>_<idx> """
    column.set_idx(idx)
    if not column.get_name():
        generated = "%s_%d"%(self.name,idx)
        column.set_name(generated)
    if idx != len(self.columns):
        # forget the name of the column being displaced before overwriting it
        displaced = self.columns[idx]
        del self.cnames[displaced.get_name()]
        self.columns[idx] = column
    else:
        self.columns.append(column)
    key = column.get_name()
    self.cnames[key] = column
    column.set_table(self)

def start_refresh(

self)

Start the background refresh thread

def start_refresh( self ):
    """ Start the background refresh thread; stop_refresh() is called first, then a new thread running perform_refresh is created and started """
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    self.refresh_thread = worker
    worker.start()

def stop_refresh(

self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Stop the background refresh thread: raise the stop flag, join the thread if there is one and it is alive, then clear both the thread reference and the stop flag """
    self.refresh_thread_stop = True
    worker = self.refresh_thread
    if worker and worker.is_alive():
        worker.join()
    self.refresh_thread = None
    # reset the flag so a later start_refresh() gets a worker that keeps running
    self.refresh_thread_stop = False

def unlisten(

self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ remove listen_func from the listeners that are called when a change event is raised on this table """
    registered = self.listeners
    registered.remove(listen_func)

Instance variables

var sql_map

var sql_query

var sql_spec