Top

data_sources.remote_data module

module that implements a remote data table proxy over ssh connection to the dashboard running as a table server

# Copyright 2017 James P Goodwin data table package to manage sparse columnar data
""" module that implements a remote data table proxy over ssh connection to the dashboard running as a table server  """
import sys
import os
import re
from datetime import datetime
import threading
import time
import csv
import json
from io import StringIO
from paramiko.client import SSHClient
import keyring
from functools import wraps
from data_sources.data_table import DataTable,Cell,Column,from_json,to_json,synchronized
from dashboard.version import __version__

def sync_connection(method):
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.connection_lock:
            return method(self, *args, **kwargs)
    return wrapper

def sync_manager(method):
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.manager_lock:
            return method(self, *args, **kwargs)
    return wrapper

class Connection():
    def __init__(self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None ):
        """ provides protocol to the table server """
        self.ssh_client = ssh_client
        self.session = session
        self.clients = []
        self.stdout_lines = []
        self.stderr_lines = []
        self.owner = owner
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.connection_lock = threading.RLock()
        self.reader_lock = threading.RLock()
        self.stdout_reader_thread = threading.Thread(target=self.reader,args=(self.stdout,self.stdout_lines))
        self.stdout_reader_thread.start()
        self.stderr_reader_thread = threading.Thread(target=self.reader,args=(self.stderr,self.stderr_lines))
        self.stderr_reader_thread.start()

    def reader( self, stream, lines ):
        """ worker thread that reads from stdout and pushes data onto stdout_lines and stderr_lines """
        while not self.session.exit_status_ready():
            line = stream.readline()
            with self.reader_lock:
                lines.append(line)

        with self.reader_lock:
            lines += stream.readlines()

    @sync_connection
    def get_stdout_line( self ):
        """ fetch a line from the queue of stdout_lines """
        while True:
            with self.reader_lock:
                if len(self.stdout_lines):
                    return self.stdout_lines.pop(0)
            time.sleep(1)

    @sync_connection
    def get_stderr_line( self ):
        """ fetch a line from the queue of stderr_lines """
        while True:
            with self.reader_lock:
                if len(self.stderr_lines):
                    return self.stderr_lines.pop(0)
            time.sleep(1)

    @sync_connection
    def open( self, client ):
        """ register this client as a user of this connection """
        if client not in self.clients:
            self.clients.append(client)

    @sync_connection
    def table(self, table_def):
        """ send request to create a new remote table, returns loaded response """
        print("table:%s"%table_def,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def refresh(self, table_name):
        """ send request to refresh a remote table named table_name and return response """
        print("refresh:%s"%table_name,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def get(self, table_name):
        """ send request to fetch table_name and return response """
        print("get:%s"%table_name,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def exit(self):
        """ terminate the server and clean up this connection """
        print("exit",file=self.stdin,flush=True)
        return ""

    @sync_connection
    def close(self,client):
        """ close this client's use of this connection """
        if client in self.clients:
            self.clients.remove(client)

class ConnectionManager():
    def __init__(self):
        """ manages connections to servers and their initial setup """
        self.connections = {}
        self.manager_lock = threading.RLock()

    def __del__(self):
        """ just in case clean up any connections """
        self.shutdown()

    @sync_manager
    def shutdown( self ):
        """ shut down the connection manager and close all the pooled connections """
        for cn in self.connections:
            self.connections[cn].exit()
            self.connections[cn].ssh_client.close()
        self.connections = {}

    @sync_manager
    def connect(self,ssh_spec,client):
        """ create a connection to a server """
        if ssh_spec in self.connections:
            connection = self.connections[ssh_spec]
            connection.open(client)
            return connection

        username,server,port = re.match(r"ssh://([a-z_][a-z0-9_-]*\${0,1})@([^:]*):{0,1}(\d*){0,1}",ssh_spec).groups()

        password = keyring.get_password(server,username)
        if not password:
            return None

        ssh_client = SSHClient()
        ssh_client.load_system_host_keys()
        local_keys = os.path.expanduser('~/.ssh/known_hosts')
        if os.path.exists(local_keys):
            try:
                ssh_client.load_host_keys(local_keys)
            except:
                pass
        ssh_client.connect( hostname=server, port=int(port if port else 22), username=username, password=password)

        if self.setup( ssh_client ):
            session,stdin,stdout,stderror = self.start_command(ssh_client,"~/.local/bin/dashboard --server")
            connection = Connection(self,ssh_client,session,stdin,stdout,stderror)
            connection.open(client)
            self.connections[ssh_spec] = connection
            return connection

        raise Exception("Setup of remote dashboard failed")

    @sync_manager
    def start_command(self, ssh_client, command ):
        """ start a command and return a tuple with (channel,stdin,stdout,stderr) for running process """
        transport = ssh_client.get_transport()
        session = transport.open_session()
        session.exec_command(command)
        stdout = session.makefile("r",1)
        stderr = session.makefile_stderr("r",1)
        stdin = session.makefile_stdin("w",1)
        return (session,stdin,stdout,stderr)

    @sync_manager
    def run_command(self, ssh_client, command ):
        """ run a command wait for it to exit and return the output (retcode,stdout_str,stderr_str) """
        session,stdin,stdout,stderr = self.start_command(ssh_client,command)

        stderr_output = StringIO()
        stdout_output = StringIO()
        while not session.exit_status_ready():
            stdout_output.write(stdout.readline())
            stderr_output.write(stderr.readline())
        stdout_output.write("".join(stdout.readlines()))
        stderr_output.write("".join(stderr.readlines()))
        exit_status = session.recv_exit_status()
        return (exit_status,stdout_output.getvalue(),stderr_output.getvalue())

    @sync_manager
    def setup(self, ssh_client ):
        """ check to see that dashboard is installed and install it if needed """
        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,"~/.local/bin/dashboard --version")
        stdout_str = stdout_str.strip()
        if stdout_str.startswith("dashboard version"):
            if stdout_str.endswith(__version__):
                return True

        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,'python3 -m pip install --upgrade "terminal-dashboard==%s"'%(__version__))
        if exit_status:
            raise Exception(exit_status,stdout_str,stderr_str)

        return True


_connection_manager = None
def get_connection_manager():
    """ return the connection manager create one if it doesn't exit """
    global _connection_manager
    if not _connection_manager:
        _connection_manager = ConnectionManager()
    return _connection_manager

def shutdown_connection_manager():
    """ shut down the connection manager if it was ever started """
    global _connection_manager
    if _connection_manager:
        _connection_manager.shutdown()
        _connection_manager = None

class RemoteDataTable( DataTable ):
    def __init__(self,ssh_spec=None,table_def=None,name=None,refresh_minutes=1):
        """ accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a json string with the definition for the remote table, the local name for this table, and the number of minutes for refresh """
        DataTable.__init__(self,None,name,refresh_minutes)
        self.ssh_spec = ssh_spec
        self.table_def = table_def
        self.connection = None
        self.refresh()

    @synchronized
    def refresh(self):
        """ create a connection to the remote dashboard table server and refresh our internal state """

        if not self.connection:
            cm = get_connection_manager()
            connection = cm.connect(self.ssh_spec,self)
            if not connection:
                return
            self.connection = connection
            response = self.connection.table(json.dumps(self.table_def))
            if not response.startswith("loaded:%s"%self.table_def["name"]):
                return

        table_data = self.connection.get(self.table_def["name"])
        name,json_blob = table_data.split(":",1)
        dt = from_json(StringIO(json_blob))

        rows,cols = dt.get_bounds()
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))

        self.changed()
        DataTable.refresh(self)

Functions

def get_connection_manager(

)

return the connection manager create one if it doesn't exit

def get_connection_manager():
    """ return the connection manager create one if it doesn't exit """
    global _connection_manager
    if not _connection_manager:
        _connection_manager = ConnectionManager()
    return _connection_manager

def shutdown_connection_manager(

)

shut down the connection manager if it was ever started

def shutdown_connection_manager():
    """ shut down the connection manager if it was ever started """
    global _connection_manager
    if _connection_manager:
        _connection_manager.shutdown()
        _connection_manager = None

def sync_connection(

method)

def sync_connection(method):
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.connection_lock:
            return method(self, *args, **kwargs)
    return wrapper

def sync_manager(

method)

def sync_manager(method):
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.manager_lock:
            return method(self, *args, **kwargs)
    return wrapper

Classes

class Connection

class Connection():
    def __init__(self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None ):
        """ provides protocol to the table server """
        self.ssh_client = ssh_client
        self.session = session
        self.clients = []
        self.stdout_lines = []
        self.stderr_lines = []
        self.owner = owner
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.connection_lock = threading.RLock()
        self.reader_lock = threading.RLock()
        self.stdout_reader_thread = threading.Thread(target=self.reader,args=(self.stdout,self.stdout_lines))
        self.stdout_reader_thread.start()
        self.stderr_reader_thread = threading.Thread(target=self.reader,args=(self.stderr,self.stderr_lines))
        self.stderr_reader_thread.start()

    def reader( self, stream, lines ):
        """ worker thread that reads from stdout and pushes data onto stdout_lines and stderr_lines """
        while not self.session.exit_status_ready():
            line = stream.readline()
            with self.reader_lock:
                lines.append(line)

        with self.reader_lock:
            lines += stream.readlines()

    @sync_connection
    def get_stdout_line( self ):
        """ fetch a line from the queue of stdout_lines """
        while True:
            with self.reader_lock:
                if len(self.stdout_lines):
                    return self.stdout_lines.pop(0)
            time.sleep(1)

    @sync_connection
    def get_stderr_line( self ):
        """ fetch a line from the queue of stderr_lines """
        while True:
            with self.reader_lock:
                if len(self.stderr_lines):
                    return self.stderr_lines.pop(0)
            time.sleep(1)

    @sync_connection
    def open( self, client ):
        """ register this client as a user of this connection """
        if client not in self.clients:
            self.clients.append(client)

    @sync_connection
    def table(self, table_def):
        """ send request to create a new remote table, returns loaded response """
        print("table:%s"%table_def,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def refresh(self, table_name):
        """ send request to refresh a remote table named table_name and return response """
        print("refresh:%s"%table_name,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def get(self, table_name):
        """ send request to fetch table_name and return response """
        print("get:%s"%table_name,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def exit(self):
        """ terminate the server and clean up this connection """
        print("exit",file=self.stdin,flush=True)
        return ""

    @sync_connection
    def close(self,client):
        """ close this client's use of this connection """
        if client in self.clients:
            self.clients.remove(client)

Ancestors (in MRO)

Static methods

def __init__(

self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None)

provides protocol to the table server

def __init__(self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None ):
    """ provides protocol to the table server """
    self.ssh_client = ssh_client
    self.session = session
    self.clients = []
    self.stdout_lines = []
    self.stderr_lines = []
    self.owner = owner
    self.stdin = stdin
    self.stdout = stdout
    self.stderr = stderr
    self.connection_lock = threading.RLock()
    self.reader_lock = threading.RLock()
    self.stdout_reader_thread = threading.Thread(target=self.reader,args=(self.stdout,self.stdout_lines))
    self.stdout_reader_thread.start()
    self.stderr_reader_thread = threading.Thread(target=self.reader,args=(self.stderr,self.stderr_lines))
    self.stderr_reader_thread.start()

def close(

self, *args, **kwargs)

close this client's use of this connection

@sync_connection
def close(self,client):
    """ close this client's use of this connection """
    if client in self.clients:
        self.clients.remove(client)

def exit(

self, *args, **kwargs)

terminate the server and clean up this connection

@sync_connection
def exit(self):
    """ terminate the server and clean up this connection """
    print("exit",file=self.stdin,flush=True)
    return ""

def get(

self, *args, **kwargs)

send request to fetch table_name and return response

@sync_connection
def get(self, table_name):
    """ send request to fetch table_name and return response """
    print("get:%s"%table_name,file=self.stdin,flush=True)
    return self.get_stdout_line()

def get_stderr_line(

self, *args, **kwargs)

fetch a line from the queue of stderr_lines

@sync_connection
def get_stderr_line( self ):
    """ fetch a line from the queue of stderr_lines """
    while True:
        with self.reader_lock:
            if len(self.stderr_lines):
                return self.stderr_lines.pop(0)
        time.sleep(1)

def get_stdout_line(

self, *args, **kwargs)

fetch a line from the queue of stdout_lines

@sync_connection
def get_stdout_line( self ):
    """ fetch a line from the queue of stdout_lines """
    while True:
        with self.reader_lock:
            if len(self.stdout_lines):
                return self.stdout_lines.pop(0)
        time.sleep(1)

def open(

self, *args, **kwargs)

register this client as a user of this connection

@sync_connection
def open( self, client ):
    """ register this client as a user of this connection """
    if client not in self.clients:
        self.clients.append(client)

def reader(

self, stream, lines)

worker thread that reads from stdout and pushes data onto stdout_lines and stderr_lines

def reader( self, stream, lines ):
    """ worker thread that reads from stdout and pushes data onto stdout_lines and stderr_lines """
    while not self.session.exit_status_ready():
        line = stream.readline()
        with self.reader_lock:
            lines.append(line)
    with self.reader_lock:
        lines += stream.readlines()

def refresh(

self, *args, **kwargs)

send request to refresh a remote table named table_name and return response

@sync_connection
def refresh(self, table_name):
    """ send request to refresh a remote table named table_name and return response """
    print("refresh:%s"%table_name,file=self.stdin,flush=True)
    return self.get_stdout_line()

def table(

self, *args, **kwargs)

send request to create a new remote table, returns loaded response

@sync_connection
def table(self, table_def):
    """ send request to create a new remote table, returns loaded response """
    print("table:%s"%table_def,file=self.stdin,flush=True)
    return self.get_stdout_line()

Instance variables

var clients

var connection_lock

var owner

var reader_lock

var session

var ssh_client

var stderr

var stderr_lines

var stderr_reader_thread

var stdin

var stdout

var stdout_lines

var stdout_reader_thread

class ConnectionManager

class ConnectionManager():
    def __init__(self):
        """ manages connections to servers and their initial setup """
        self.connections = {}
        self.manager_lock = threading.RLock()

    def __del__(self):
        """ just in case clean up any connections """
        self.shutdown()

    @sync_manager
    def shutdown( self ):
        """ shut down the connection manager and close all the pooled connections """
        for cn in self.connections:
            self.connections[cn].exit()
            self.connections[cn].ssh_client.close()
        self.connections = {}

    @sync_manager
    def connect(self,ssh_spec,client):
        """ create a connection to a server """
        if ssh_spec in self.connections:
            connection = self.connections[ssh_spec]
            connection.open(client)
            return connection

        username,server,port = re.match(r"ssh://([a-z_][a-z0-9_-]*\${0,1})@([^:]*):{0,1}(\d*){0,1}",ssh_spec).groups()

        password = keyring.get_password(server,username)
        if not password:
            return None

        ssh_client = SSHClient()
        ssh_client.load_system_host_keys()
        local_keys = os.path.expanduser('~/.ssh/known_hosts')
        if os.path.exists(local_keys):
            try:
                ssh_client.load_host_keys(local_keys)
            except:
                pass
        ssh_client.connect( hostname=server, port=int(port if port else 22), username=username, password=password)

        if self.setup( ssh_client ):
            session,stdin,stdout,stderror = self.start_command(ssh_client,"~/.local/bin/dashboard --server")
            connection = Connection(self,ssh_client,session,stdin,stdout,stderror)
            connection.open(client)
            self.connections[ssh_spec] = connection
            return connection

        raise Exception("Setup of remote dashboard failed")

    @sync_manager
    def start_command(self, ssh_client, command ):
        """ start a command and return a tuple with (channel,stdin,stdout,stderr) for running process """
        transport = ssh_client.get_transport()
        session = transport.open_session()
        session.exec_command(command)
        stdout = session.makefile("r",1)
        stderr = session.makefile_stderr("r",1)
        stdin = session.makefile_stdin("w",1)
        return (session,stdin,stdout,stderr)

    @sync_manager
    def run_command(self, ssh_client, command ):
        """ run a command wait for it to exit and return the output (retcode,stdout_str,stderr_str) """
        session,stdin,stdout,stderr = self.start_command(ssh_client,command)

        stderr_output = StringIO()
        stdout_output = StringIO()
        while not session.exit_status_ready():
            stdout_output.write(stdout.readline())
            stderr_output.write(stderr.readline())
        stdout_output.write("".join(stdout.readlines()))
        stderr_output.write("".join(stderr.readlines()))
        exit_status = session.recv_exit_status()
        return (exit_status,stdout_output.getvalue(),stderr_output.getvalue())

    @sync_manager
    def setup(self, ssh_client ):
        """ check to see that dashboard is installed and install it if needed """
        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,"~/.local/bin/dashboard --version")
        stdout_str = stdout_str.strip()
        if stdout_str.startswith("dashboard version"):
            if stdout_str.endswith(__version__):
                return True

        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,'python3 -m pip install --upgrade "terminal-dashboard==%s"'%(__version__))
        if exit_status:
            raise Exception(exit_status,stdout_str,stderr_str)

        return True

Ancestors (in MRO)

Static methods

def __init__(

self)

manages connections to servers and their initial setup

def __init__(self):
    """ manages connections to servers and their initial setup """
    self.connections = {}
    self.manager_lock = threading.RLock()

def connect(

self, *args, **kwargs)

create a connection to a server

@sync_manager
def connect(self,ssh_spec,client):
    """ create a connection to a server """
    if ssh_spec in self.connections:
        connection = self.connections[ssh_spec]
        connection.open(client)
        return connection
    username,server,port = re.match(r"ssh://([a-z_][a-z0-9_-]*\${0,1})@([^:]*):{0,1}(\d*){0,1}",ssh_spec).groups()
    password = keyring.get_password(server,username)
    if not password:
        return None
    ssh_client = SSHClient()
    ssh_client.load_system_host_keys()
    local_keys = os.path.expanduser('~/.ssh/known_hosts')
    if os.path.exists(local_keys):
        try:
            ssh_client.load_host_keys(local_keys)
        except:
            pass
    ssh_client.connect( hostname=server, port=int(port if port else 22), username=username, password=password)
    if self.setup( ssh_client ):
        session,stdin,stdout,stderror = self.start_command(ssh_client,"~/.local/bin/dashboard --server")
        connection = Connection(self,ssh_client,session,stdin,stdout,stderror)
        connection.open(client)
        self.connections[ssh_spec] = connection
        return connection
    raise Exception("Setup of remote dashboard failed")

def run_command(

self, *args, **kwargs)

run a command wait for it to exit and return the output (retcode,stdout_str,stderr_str)

@sync_manager
def run_command(self, ssh_client, command ):
    """ run a command wait for it to exit and return the output (retcode,stdout_str,stderr_str) """
    session,stdin,stdout,stderr = self.start_command(ssh_client,command)
    stderr_output = StringIO()
    stdout_output = StringIO()
    while not session.exit_status_ready():
        stdout_output.write(stdout.readline())
        stderr_output.write(stderr.readline())
    stdout_output.write("".join(stdout.readlines()))
    stderr_output.write("".join(stderr.readlines()))
    exit_status = session.recv_exit_status()
    return (exit_status,stdout_output.getvalue(),stderr_output.getvalue())

def setup(

self, *args, **kwargs)

check to see that dashboard is installed and install it if needed

@sync_manager
def setup(self, ssh_client ):
    """ check to see that dashboard is installed and install it if needed """
    exit_status,stdout_str,stderr_str = self.run_command(ssh_client,"~/.local/bin/dashboard --version")
    stdout_str = stdout_str.strip()
    if stdout_str.startswith("dashboard version"):
        if stdout_str.endswith(__version__):
            return True
    exit_status,stdout_str,stderr_str = self.run_command(ssh_client,'python3 -m pip install --upgrade "terminal-dashboard==%s"'%(__version__))
    if exit_status:
        raise Exception(exit_status,stdout_str,stderr_str)
    return True

def shutdown(

self, *args, **kwargs)

shut down the connection manager and close all the pooled connections

@sync_manager
def shutdown( self ):
    """ shut down the connection manager and close all the pooled connections """
    for cn in self.connections:
        self.connections[cn].exit()
        self.connections[cn].ssh_client.close()
    self.connections = {}

def start_command(

self, *args, **kwargs)

start a command and return a tuple with (channel,stdin,stdout,stderr) for running process

@sync_manager
def start_command(self, ssh_client, command ):
    """ start a command and return a tuple with (channel,stdin,stdout,stderr) for running process """
    transport = ssh_client.get_transport()
    session = transport.open_session()
    session.exec_command(command)
    stdout = session.makefile("r",1)
    stderr = session.makefile_stderr("r",1)
    stdin = session.makefile_stdin("w",1)
    return (session,stdin,stdout,stderr)

Instance variables

var connections

var manager_lock

class RemoteDataTable

class RemoteDataTable( DataTable ):
    def __init__(self,ssh_spec=None,table_def=None,name=None,refresh_minutes=1):
        """ accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a json string with the definition for the remote table, the local name for this table, and the number of minutes for refresh """
        DataTable.__init__(self,None,name,refresh_minutes)
        self.ssh_spec = ssh_spec
        self.table_def = table_def
        self.connection = None
        self.refresh()

    @synchronized
    def refresh(self):
        """ create a connection to the remote dashboard table server and refresh our internal state """

        if not self.connection:
            cm = get_connection_manager()
            connection = cm.connect(self.ssh_spec,self)
            if not connection:
                return
            self.connection = connection
            response = self.connection.table(json.dumps(self.table_def))
            if not response.startswith("loaded:%s"%self.table_def["name"]):
                return

        table_data = self.connection.get(self.table_def["name"])
        name,json_blob = table_data.split(":",1)
        dt = from_json(StringIO(json_blob))

        rows,cols = dt.get_bounds()
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))

        self.changed()
        DataTable.refresh(self)

Ancestors (in MRO)

Static methods

def __init__(

self, ssh_spec=None, table_def=None, name=None, refresh_minutes=1)

accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a json string with the definition for the remote table, the local name for this table, and the number of minutes for refresh

def __init__(self,ssh_spec=None,table_def=None,name=None,refresh_minutes=1):
    """ accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a json string with the definition for the remote table, the local name for this table, and the number of minutes for refresh """
    DataTable.__init__(self,None,name,refresh_minutes)
    self.ssh_spec = ssh_spec
    self.table_def = table_def
    self.connection = None
    self.refresh()

def acquire_refresh_lock(

self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ acquire the refresh lock before reading/writing the table state """
    self.refresh_lock.acquire()

def add_column(

self, *args, **kwargs)

@synchronized
def add_column(self,column):
    idx = len(self.columns)
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def changed(

self)

notify listeners that this table has been changed

def changed(self):
    """ notify listeners that this table has been changed """
    for f in self.listeners:
        f(self)

def get(

self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    return self.columns[self.map_column(reference)].get(row)

def get_bounds(

self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """
    cols = len(self.columns)
    rows = -1
    for c in self.columns:
        size = c.size()
        if rows < 0 or size > rows:
            rows = size
    return (rows,cols)

def get_column(

self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    return self.columns[self.map_column(reference)]

def get_columns(

self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ return the list of columns """
    return self.columns

def get_name(

self)

return the name of the table

def get_name(self):
    """ return the name of the table """
    return self.name

def get_names(

self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ return a list of the names of the columns in order"""
    return [c.get_name() for c in self.columns]

def get_refresh_timestamp(

self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ get the time that the table was last refreshed """
    return self.refresh_timestamp

def has_column(

self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    if type(reference) == str or type(reference) == str:
        return reference in self.cnames
    elif type(reference) == int:
        return idx < len(self.columns)
    else:
        return False

def insert_column(

self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        while idx < len(self.columns):
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1

def listen(

self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ register for notifications when a change event is raised on this table """
    self.listeners.append(listen_func)

def map_column(

self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    if type(reference) == str or type(reference) == str:
        return self.cnames[reference].get_idx()
    elif type(reference) == int:
        return reference
    else:
        raise TypeError("wrong type in mapping")

def perform_refresh(

self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Thread worker that sleeps and refreshes the data on a schedule """
    start_time = time.time()
    while not self.refresh_thread_stop:
        if time.time() - start_time >= self.refresh_minutes*60.0:
            self.refresh()
            start_time = time.time()
        time.sleep(1)

def put(

self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    self.columns[self.map_column(reference)].put(row,value)

def refresh(

self, *args, **kwargs)

create a connection to the remote dashboard table server and refresh our internal state

@synchronized
def refresh(self):
    """ create a connection to the remote dashboard table server and refresh our internal state """
    if not self.connection:
        cm = get_connection_manager()
        connection = cm.connect(self.ssh_spec,self)
        if not connection:
            return
        self.connection = connection
        response = self.connection.table(json.dumps(self.table_def))
        if not response.startswith("loaded:%s"%self.table_def["name"]):
            return
    table_data = self.connection.get(self.table_def["name"])
    name,json_blob = table_data.split(":",1)
    dt = from_json(StringIO(json_blob))
    rows,cols = dt.get_bounds()
    for idx in range(cols):
        self.replace_column(idx,dt.get_column(idx))
    self.changed()
    DataTable.refresh(self)

def release_refresh_lock(

self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ release the refresh lock after reading/writing the table state """
    self.refresh_lock.release()

def replace_column(

self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        del self.cnames[self.columns[idx].get_name()]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)

def start_refresh(

self)

Start the background refresh thread

def start_refresh( self ):
    """ Start the background refresh thread """
    self.stop_refresh()
    self.refresh_thread = threading.Thread(target=self.perform_refresh)
    self.refresh_thread.start()

def stop_refresh(

self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Stop the background refresh thread """
    self.refresh_thread_stop = True
    if self.refresh_thread and self.refresh_thread.is_alive():
        self.refresh_thread.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False

def unlisten(

self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ unregister for notifications when a change event is raised on this table """
    self.listeners.remove(listen_func)

Instance variables

var connection

var ssh_spec

var table_def