data_sources.remote_data module
module that implements a remote data table proxy over ssh connection to the dashboard running as a table server
# Copyright 2017 James P Goodwin data table package to manage sparse columnar data
""" module that implements a remote data table proxy over ssh connection to the dashboard running as a table server """
import sys
import os
import re
from datetime import datetime
import threading
import time
import csv
import json
from io import StringIO
from paramiko.client import SSHClient
import keyring
from functools import wraps
from data_sources.data_table import DataTable,Cell,Column,from_json,to_json,synchronized
from dashboard.version import __version__


def sync_connection(method):
    """ decorator that runs method while holding the instance's connection_lock """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.connection_lock:
            return method(self, *args, **kwargs)
    return wrapper


def sync_manager(method):
    """ decorator that runs method while holding the instance's manager_lock """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.manager_lock:
            return method(self, *args, **kwargs)
    return wrapper


class Connection():
    """ one session with a remote dashboard table server

    Requests are written as text lines to the server's stdin; two reader
    threads queue the lines arriving on its stdout and stderr so replies
    can be fetched one at a time.
    """

    def __init__(self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None ):
        """ provides protocol to the table server

        owner is stored as given, ssh_client and session are the ssh client
        and channel the server runs on, and stdin, stdout and stderr are the
        file objects attached to it.  Both reader threads are started before
        the constructor returns.
        """
        self.ssh_client = ssh_client
        self.session = session
        # clients registered through open()
        self.clients = []
        # lines queued by the reader threads, oldest first
        self.stdout_lines = []
        self.stderr_lines = []
        self.owner = owner
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        # connection_lock serializes the public calls, reader_lock guards the two queues
        self.connection_lock = threading.RLock()
        self.reader_lock = threading.RLock()
        self.stdout_reader_thread = threading.Thread(target=self.reader,args=(self.stdout,self.stdout_lines))
        self.stdout_reader_thread.start()
        self.stderr_reader_thread = threading.Thread(target=self.reader,args=(self.stderr,self.stderr_lines))
        self.stderr_reader_thread.start()

    def reader( self, stream, lines ):
        """ worker thread body: append each line read from stream to lines until the session reports an exit status, then append whatever is left """
        while not self.session.exit_status_ready():
            line = stream.readline()
            with self.reader_lock:
                lines.append(line)
        with self.reader_lock:
            lines += stream.readlines()

    @sync_connection
    def get_stdout_line( self ):
        """ fetch the oldest queued stdout line, polling once a second until one is available """
        while True:
            with self.reader_lock:
                if len(self.stdout_lines):
                    return self.stdout_lines.pop(0)
            time.sleep(1)

    @sync_connection
    def get_stderr_line( self ):
        """ fetch the oldest queued stderr line, polling once a second until one is available """
        while True:
            with self.reader_lock:
                if len(self.stderr_lines):
                    return self.stderr_lines.pop(0)
            time.sleep(1)

    @sync_connection
    def open( self, client ):
        """ register this client as a user of this connection """
        if client not in self.clients:
            self.clients.append(client)

    @sync_connection
    def table(self, table_def):
        """ send request to create a new remote table, returns the next stdout line as the response """
        print("table:%s"%table_def,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def refresh(self, table_name):
        """ send request to refresh a remote table named table_name and return the next stdout line as the response """
        print("refresh:%s"%table_name,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def get(self, table_name):
        """ send request to fetch table_name and return the next stdout line as the response """
        print("get:%s"%table_name,file=self.stdin,flush=True)
        return self.get_stdout_line()

    @sync_connection
    def exit(self):
        """ send the exit request to the server; no reply is read and an empty string is returned """
        print("exit",file=self.stdin,flush=True)
        return ""

    @sync_connection
    def close(self,client):
        """ close this client's use of this connection """
        if client in self.clients:
            self.clients.remove(client)


class ConnectionManager():
    """ pool of Connection objects keyed by ssh_spec, plus the remote setup needed to create them """

    def __init__(self):
        """ manages connections to servers and their initial setup """
        self.connections = {}
        self.manager_lock = threading.RLock()

    def __del__(self):
        """ just in case clean up any connections """
        self.shutdown()

    @sync_manager
    def shutdown( self ):
        """ shut down the connection manager and close all the pooled connections """
        for cn in self.connections:
            self.connections[cn].exit()
            self.connections[cn].ssh_client.close()
        self.connections = {}

    @sync_manager
    def connect(self,ssh_spec,client):
        """ create a connection to a server, or reuse the pooled one for ssh_spec

        Returns the Connection with client registered on it, or None when the
        keyring holds no password for the user on that server.  Raises
        ValueError when ssh_spec does not look like
        ssh://username@server_name:port_number, and re-raises whatever the ssh
        connection or the remote setup raises after closing the ssh client.
        """
        if ssh_spec in self.connections:
            connection = self.connections[ssh_spec]
            connection.open(client)
            return connection

        spec_match = re.match(r"ssh://([a-z_][a-z0-9_-]*\${0,1})@([^:]*):{0,1}(\d*){0,1}",ssh_spec)
        if spec_match is None:
            # previously this surfaced as an AttributeError on None
            raise ValueError("Invalid ssh_spec %r, expected ssh://username@server_name:port_number"%(ssh_spec,))
        username,server,port = spec_match.groups()

        password = keyring.get_password(server,username)
        if not password:
            return None

        ssh_client = SSHClient()
        ssh_client.load_system_host_keys()
        local_keys = os.path.expanduser('~/.ssh/known_hosts')
        if os.path.exists(local_keys):
            try:
                ssh_client.load_host_keys(local_keys)
            except Exception:
                # best effort: an unreadable known_hosts file must not prevent connecting
                pass

        try:
            ssh_client.connect( hostname=server, port=int(port if port else 22), username=username, password=password)
            if not self.setup( ssh_client ):
                raise Exception("Setup of remote dashboard failed")
            session,stdin,stdout,stderror = self.start_command(ssh_client,"~/.local/bin/dashboard --server")
        except Exception:
            # do not leak the ssh client when no Connection ends up owning it
            ssh_client.close()
            raise

        connection = Connection(self,ssh_client,session,stdin,stdout,stderror)
        connection.open(client)
        self.connections[ssh_spec] = connection
        return connection

    @sync_manager
    def start_command(self, ssh_client, command ):
        """ start a command and return a tuple with (channel,stdin,stdout,stderr) for running process """
        transport = ssh_client.get_transport()
        session = transport.open_session()
        session.exec_command(command)
        stdout = session.makefile("r",1)
        stderr = session.makefile_stderr("r",1)
        stdin = session.makefile_stdin("w",1)
        return (session,stdin,stdout,stderr)

    @sync_manager
    def run_command(self, ssh_client, command ):
        """ run a command, wait for it to exit and return the output (retcode,stdout_str,stderr_str) """
        session,stdin,stdout,stderr = self.start_command(ssh_client,command)
        stderr_output = StringIO()
        stdout_output = StringIO()
        # NOTE(review): stdout and stderr are read alternately with blocking
        # readline calls; confirm this cannot stall on a command that writes
        # heavily to only one of them
        while not session.exit_status_ready():
            stdout_output.write(stdout.readline())
            stderr_output.write(stderr.readline())
        stdout_output.write("".join(stdout.readlines()))
        stderr_output.write("".join(stderr.readlines()))
        exit_status = session.recv_exit_status()
        return (exit_status,stdout_output.getvalue(),stderr_output.getvalue())

    @sync_manager
    def setup(self, ssh_client ):
        """ check to see that dashboard is installed at our version and install it if needed, returns True or raises Exception(exit_status,stdout,stderr) when the install fails """
        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,"~/.local/bin/dashboard --version")
        stdout_str = stdout_str.strip()
        if stdout_str.startswith("dashboard version"):
            if stdout_str.endswith(__version__):
                return True

        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,'python3 -m pip install --upgrade "terminal-dashboard==%s"'%(__version__))
        if exit_status:
            raise Exception(exit_status,stdout_str,stderr_str)
        return True


_connection_manager = None


def get_connection_manager():
    """ return the connection manager, creating one if it doesn't exist """
    global _connection_manager
    if not _connection_manager:
        _connection_manager = ConnectionManager()
    return _connection_manager


def shutdown_connection_manager():
    """ shut down the connection manager if it was ever started """
    global _connection_manager
    if _connection_manager:
        _connection_manager.shutdown()
    _connection_manager = None


class RemoteDataTable( DataTable ):
    """ data table whose columns are copied from a table served by a remote dashboard """

    def __init__(self,ssh_spec=None,table_def=None,name=None,refresh_minutes=1):
        """ accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a JSON-serializable table definition with at least a "name" entry for the remote table, the local name for this table, and the number of minutes for refresh """
        DataTable.__init__(self,None,name,refresh_minutes)
        self.ssh_spec = ssh_spec
        self.table_def = table_def
        self.connection = None
        self.refresh()

    @synchronized
    def refresh(self):
        """ create a connection to the remote dashboard table server and refresh our internal state

        Returns without changing the table when no connection is available,
        when the server does not acknowledge the table definition, or when the
        fetched reply has no ':' separating the table name from its JSON.
        """
        if not self.connection:
            cm = get_connection_manager()
            connection = cm.connect(self.ssh_spec,self)
            if not connection:
                return
            response = connection.table(json.dumps(self.table_def))
            if not response.startswith("loaded:%s"%self.table_def["name"]):
                # leave self.connection unset so the definition is sent again on the next refresh
                return
            self.connection = connection

        table_data = self.connection.get(self.table_def["name"])
        _,separator,json_blob = table_data.partition(":")
        if not separator:
            # malformed reply: skip this refresh instead of raising from an unpack error
            return
        dt = from_json(StringIO(json_blob))

        _,cols = dt.get_bounds()
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))

        self.changed()
        DataTable.refresh(self)
Functions
def get_connection_manager(
)
return the connection manager, creating one if it doesn't exist
def get_connection_manager():
    """ return the shared connection manager, creating it on first use """
    global _connection_manager
    if _connection_manager:
        return _connection_manager
    _connection_manager = ConnectionManager()
    return _connection_manager
def shutdown_connection_manager(
)
shut down the connection manager if it was ever started
def shutdown_connection_manager():
    """ shut down the shared connection manager, if one was created, and forget it """
    global _connection_manager
    manager = _connection_manager
    if not manager:
        return
    manager.shutdown()
    _connection_manager = None
def sync_connection(
method)
def sync_connection(method):
    """ decorator that runs method while holding the instance's connection_lock

    The instance must expose connection_lock as a context manager; it is
    released when the call returns or raises.
    """
    @wraps(method)
    def locked_call(self, *args, **kwargs):
        lock = self.connection_lock
        with lock:
            result = method(self, *args, **kwargs)
        return result
    return locked_call
def sync_manager(
method)
def sync_manager(method):
    """ decorator that runs method while holding the instance's manager_lock

    The instance must expose manager_lock as a context manager; it is
    released when the call returns or raises.
    """
    @wraps(method)
    def locked_call(self, *args, **kwargs):
        lock = self.manager_lock
        with lock:
            result = method(self, *args, **kwargs)
        return result
    return locked_call
Classes
class Connection
class Connection():
    """ one session with a remote dashboard table server

    Requests are written as text lines to the server's stdin; two reader
    threads queue the lines arriving on its stdout and stderr so replies
    can be fetched one at a time.
    """

    def __init__(self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None ):
        """ provides protocol to the table server

        owner is stored as given, ssh_client and session are the ssh client
        and channel the server runs on, and stdin, stdout and stderr are the
        file objects attached to it.  Both reader threads are started before
        the constructor returns.
        """
        self.owner = owner
        self.ssh_client = ssh_client
        self.session = session
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        # clients registered through open()
        self.clients = []
        # lines queued by the reader threads, oldest first
        self.stdout_lines = []
        self.stderr_lines = []
        # connection_lock serializes the public calls, reader_lock guards the two queues
        self.connection_lock = threading.RLock()
        self.reader_lock = threading.RLock()
        stdout_worker = threading.Thread(target=self.reader, args=(self.stdout, self.stdout_lines))
        self.stdout_reader_thread = stdout_worker
        stdout_worker.start()
        stderr_worker = threading.Thread(target=self.reader, args=(self.stderr, self.stderr_lines))
        self.stderr_reader_thread = stderr_worker
        stderr_worker.start()

    def reader( self, stream, lines ):
        """ worker thread body: append each line read from stream to lines until the session reports an exit status, then append whatever is left """
        while True:
            if self.session.exit_status_ready():
                break
            text = stream.readline()
            with self.reader_lock:
                lines.append(text)
        with self.reader_lock:
            lines.extend(stream.readlines())

    @sync_connection
    def get_stdout_line( self ):
        """ fetch the oldest queued stdout line, polling once a second until one is available """
        while True:
            with self.reader_lock:
                try:
                    return self.stdout_lines.pop(0)
                except IndexError:
                    # nothing queued yet
                    pass
            time.sleep(1)

    @sync_connection
    def get_stderr_line( self ):
        """ fetch the oldest queued stderr line, polling once a second until one is available """
        while True:
            with self.reader_lock:
                try:
                    return self.stderr_lines.pop(0)
                except IndexError:
                    # nothing queued yet
                    pass
            time.sleep(1)

    @sync_connection
    def open( self, client ):
        """ register this client as a user of this connection, once """
        if client in self.clients:
            return
        self.clients.append(client)

    @sync_connection
    def table(self, table_def):
        """ send request to create a new remote table, returns the next stdout line as the response """
        request = "table:%s"%table_def
        print(request, file=self.stdin, flush=True)
        reply = self.get_stdout_line()
        return reply

    @sync_connection
    def refresh(self, table_name):
        """ send request to refresh a remote table named table_name and return the next stdout line as the response """
        request = "refresh:%s"%table_name
        print(request, file=self.stdin, flush=True)
        reply = self.get_stdout_line()
        return reply

    @sync_connection
    def get(self, table_name):
        """ send request to fetch table_name and return the next stdout line as the response """
        request = "get:%s"%table_name
        print(request, file=self.stdin, flush=True)
        reply = self.get_stdout_line()
        return reply

    @sync_connection
    def exit(self):
        """ send the exit request to the server; no reply is read and an empty string is returned """
        request = "exit"
        print(request, file=self.stdin, flush=True)
        return ""

    @sync_connection
    def close(self,client):
        """ remove this client from the users of this connection, if it is registered """
        if client not in self.clients:
            return
        self.clients.remove(client)
Ancestors (in MRO)
- Connection
- builtins.object
Static methods
def __init__(
self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None)
provides protocol to the table server
def __init__(self, owner=None, ssh_client=None, session=None, stdin=None, stdout=None, stderr=None ):
    """ provides protocol to the table server

    owner is stored as given, ssh_client and session are the ssh client and
    channel the server runs on, and stdin, stdout and stderr are the file
    objects attached to it.  Both reader threads are started before the
    constructor returns.
    """
    self.owner = owner
    self.ssh_client = ssh_client
    self.session = session
    self.stdin = stdin
    self.stdout = stdout
    self.stderr = stderr
    # clients registered through open()
    self.clients = []
    # lines queued by the reader threads, oldest first
    self.stdout_lines = []
    self.stderr_lines = []
    # connection_lock serializes the public calls, reader_lock guards the two queues
    self.connection_lock = threading.RLock()
    self.reader_lock = threading.RLock()
    stdout_worker = threading.Thread(target=self.reader, args=(self.stdout, self.stdout_lines))
    self.stdout_reader_thread = stdout_worker
    stdout_worker.start()
    stderr_worker = threading.Thread(target=self.reader, args=(self.stderr, self.stderr_lines))
    self.stderr_reader_thread = stderr_worker
    stderr_worker.start()
def close(
self, *args, **kwargs)
close this client's use of this connection
@sync_connection
def close(self,client):
    """ remove this client from the users of this connection, if it is registered """
    if client not in self.clients:
        return
    self.clients.remove(client)
def exit(
self, *args, **kwargs)
send the exit request that asks the server to terminate; the ssh client itself is closed by ConnectionManager.shutdown
@sync_connection
def exit(self):
    """ send the exit request to the server; no reply is read and an empty string is returned """
    request = "exit"
    print(request, file=self.stdin, flush=True)
    return ""
def get(
self, *args, **kwargs)
send request to fetch table_name and return response
@sync_connection
def get(self, table_name):
    """ send request to fetch table_name and return the next stdout line as the response """
    request = "get:%s"%table_name
    print(request, file=self.stdin, flush=True)
    reply = self.get_stdout_line()
    return reply
def get_stderr_line(
self, *args, **kwargs)
fetch a line from the queue of stderr_lines
@sync_connection
def get_stderr_line( self ):
    """ fetch the oldest queued stderr line, polling once a second until one is available """
    while True:
        with self.reader_lock:
            try:
                return self.stderr_lines.pop(0)
            except IndexError:
                # nothing queued yet
                pass
        time.sleep(1)
def get_stdout_line(
self, *args, **kwargs)
fetch a line from the queue of stdout_lines
@sync_connection
def get_stdout_line( self ):
    """ fetch the oldest queued stdout line, polling once a second until one is available """
    while True:
        with self.reader_lock:
            try:
                return self.stdout_lines.pop(0)
            except IndexError:
                # nothing queued yet
                pass
        time.sleep(1)
def open(
self, *args, **kwargs)
register this client as a user of this connection
@sync_connection
def open( self, client ):
    """ register this client as a user of this connection, once """
    if client in self.clients:
        return
    self.clients.append(client)
def reader(
self, stream, lines)
worker thread that reads lines from the given stream (stdout or stderr) and appends them to the given list (stdout_lines or stderr_lines)
def reader( self, stream, lines ):
    """ worker thread body: append each line read from stream to lines until the session reports an exit status, then append whatever is left

    Every append happens while holding reader_lock.
    """
    while True:
        if self.session.exit_status_ready():
            break
        text = stream.readline()
        with self.reader_lock:
            lines.append(text)
    with self.reader_lock:
        lines.extend(stream.readlines())
def refresh(
self, *args, **kwargs)
send request to refresh a remote table named table_name and return response
@sync_connection
def refresh(self, table_name):
    """ send request to refresh a remote table named table_name and return the next stdout line as the response """
    request = "refresh:%s"%table_name
    print(request, file=self.stdin, flush=True)
    reply = self.get_stdout_line()
    return reply
def table(
self, *args, **kwargs)
send request to create a new remote table, returns loaded response
@sync_connection
def table(self, table_def):
    """ send request to create a new remote table, returns the next stdout line as the response """
    request = "table:%s"%table_def
    print(request, file=self.stdin, flush=True)
    reply = self.get_stdout_line()
    return reply
Instance variables
var clients
var connection_lock
var owner
var reader_lock
var session
var ssh_client
var stderr
var stderr_lines
var stderr_reader_thread
var stdin
var stdout
var stdout_lines
var stdout_reader_thread
class ConnectionManager
class ConnectionManager():
    """ pool of Connection objects keyed by ssh_spec, plus the remote setup needed to create them """

    def __init__(self):
        """ manages connections to servers and their initial setup """
        self.connections = {}
        self.manager_lock = threading.RLock()

    def __del__(self):
        """ just in case clean up any connections """
        self.shutdown()

    @sync_manager
    def shutdown( self ):
        """ shut down the connection manager and close all the pooled connections """
        for connection in self.connections.values():
            connection.exit()
            connection.ssh_client.close()
        self.connections = {}

    @sync_manager
    def connect(self,ssh_spec,client):
        """ create a connection to a server, or reuse the pooled one for ssh_spec

        Returns the Connection with client registered on it, or None when the
        keyring holds no password for the user on that server.  Raises
        ValueError when ssh_spec does not look like
        ssh://username@server_name:port_number, and re-raises whatever the ssh
        connection or the remote setup raises after closing the ssh client.
        """
        if ssh_spec in self.connections:
            connection = self.connections[ssh_spec]
            connection.open(client)
            return connection

        spec_match = re.match(r"ssh://([a-z_][a-z0-9_-]*\${0,1})@([^:]*):{0,1}(\d*){0,1}",ssh_spec)
        if spec_match is None:
            # previously this surfaced as an AttributeError on None
            raise ValueError("Invalid ssh_spec %r, expected ssh://username@server_name:port_number"%(ssh_spec,))
        username,server,port = spec_match.groups()

        password = keyring.get_password(server,username)
        if not password:
            return None

        ssh_client = SSHClient()
        ssh_client.load_system_host_keys()
        local_keys = os.path.expanduser('~/.ssh/known_hosts')
        if os.path.exists(local_keys):
            try:
                ssh_client.load_host_keys(local_keys)
            except Exception:
                # best effort: an unreadable known_hosts file must not prevent connecting
                pass

        try:
            ssh_client.connect( hostname=server, port=int(port if port else 22), username=username, password=password)
            if not self.setup( ssh_client ):
                raise Exception("Setup of remote dashboard failed")
            session,stdin,stdout,stderror = self.start_command(ssh_client,"~/.local/bin/dashboard --server")
        except Exception:
            # do not leak the ssh client when no Connection ends up owning it
            ssh_client.close()
            raise

        connection = Connection(self,ssh_client,session,stdin,stdout,stderror)
        connection.open(client)
        self.connections[ssh_spec] = connection
        return connection

    @sync_manager
    def start_command(self, ssh_client, command ):
        """ start a command and return a tuple with (channel,stdin,stdout,stderr) for running process """
        transport = ssh_client.get_transport()
        session = transport.open_session()
        session.exec_command(command)
        stdout = session.makefile("r",1)
        stderr = session.makefile_stderr("r",1)
        stdin = session.makefile_stdin("w",1)
        return (session,stdin,stdout,stderr)

    @sync_manager
    def run_command(self, ssh_client, command ):
        """ run a command, wait for it to exit and return the output (retcode,stdout_str,stderr_str) """
        session,stdin,stdout,stderr = self.start_command(ssh_client,command)
        stderr_output = StringIO()
        stdout_output = StringIO()
        # NOTE(review): stdout and stderr are read alternately with blocking
        # readline calls; confirm this cannot stall on a command that writes
        # heavily to only one of them
        while not session.exit_status_ready():
            stdout_output.write(stdout.readline())
            stderr_output.write(stderr.readline())
        stdout_output.write("".join(stdout.readlines()))
        stderr_output.write("".join(stderr.readlines()))
        exit_status = session.recv_exit_status()
        return (exit_status,stdout_output.getvalue(),stderr_output.getvalue())

    @sync_manager
    def setup(self, ssh_client ):
        """ check to see that dashboard is installed at our version and install it if needed, returns True or raises Exception(exit_status,stdout,stderr) when the install fails """
        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,"~/.local/bin/dashboard --version")
        stdout_str = stdout_str.strip()
        if stdout_str.startswith("dashboard version"):
            if stdout_str.endswith(__version__):
                return True

        exit_status,stdout_str,stderr_str = self.run_command(ssh_client,'python3 -m pip install --upgrade "terminal-dashboard==%s"'%(__version__))
        if exit_status:
            raise Exception(exit_status,stdout_str,stderr_str)
        return True
Ancestors (in MRO)
- ConnectionManager
- builtins.object
Static methods
def __init__(
self)
manages connections to servers and their initial setup
def __init__(self):
    """ manages connections to servers and their initial setup

    Starts with an empty pool of connections keyed by ssh_spec and a
    re-entrant lock used to serialize the manager's methods.
    """
    self.manager_lock = threading.RLock()
    self.connections = {}
def connect(
self, *args, **kwargs)
create a connection to a server
@sync_manager
def connect(self,ssh_spec,client):
    """ create a connection to a server, or reuse the pooled one for ssh_spec

    Returns the Connection with client registered on it, or None when the
    keyring holds no password for the user on that server.  Raises
    ValueError when ssh_spec does not look like
    ssh://username@server_name:port_number, and re-raises whatever the ssh
    connection or the remote setup raises after closing the ssh client.
    """
    if ssh_spec in self.connections:
        connection = self.connections[ssh_spec]
        connection.open(client)
        return connection

    spec_match = re.match(r"ssh://([a-z_][a-z0-9_-]*\${0,1})@([^:]*):{0,1}(\d*){0,1}",ssh_spec)
    if spec_match is None:
        # previously this surfaced as an AttributeError on None
        raise ValueError("Invalid ssh_spec %r, expected ssh://username@server_name:port_number"%(ssh_spec,))
    username,server,port = spec_match.groups()

    password = keyring.get_password(server,username)
    if not password:
        return None

    ssh_client = SSHClient()
    ssh_client.load_system_host_keys()
    local_keys = os.path.expanduser('~/.ssh/known_hosts')
    if os.path.exists(local_keys):
        try:
            ssh_client.load_host_keys(local_keys)
        except Exception:
            # best effort: an unreadable known_hosts file must not prevent connecting
            pass

    try:
        ssh_client.connect( hostname=server, port=int(port if port else 22), username=username, password=password)
        if not self.setup( ssh_client ):
            raise Exception("Setup of remote dashboard failed")
        session,stdin,stdout,stderror = self.start_command(ssh_client,"~/.local/bin/dashboard --server")
    except Exception:
        # do not leak the ssh client when no Connection ends up owning it
        ssh_client.close()
        raise

    connection = Connection(self,ssh_client,session,stdin,stdout,stderror)
    connection.open(client)
    self.connections[ssh_spec] = connection
    return connection
def run_command(
self, *args, **kwargs)
run a command, wait for it to exit, and return its result as a tuple (retcode,stdout_str,stderr_str)
@sync_manager
def run_command(self, ssh_client, command ):
    """ run a command, wait for it to exit and return the output (retcode,stdout_str,stderr_str) """
    session, _, stdout, stderr = self.start_command(ssh_client, command)
    out_parts = []
    err_parts = []
    # NOTE(review): stdout and stderr are read alternately with blocking
    # readline calls; confirm this cannot stall on a command that writes
    # heavily to only one of them
    while not session.exit_status_ready():
        out_parts.append(stdout.readline())
        err_parts.append(stderr.readline())
    # collect whatever is still buffered once the exit status is available
    out_parts.extend(stdout.readlines())
    err_parts.extend(stderr.readlines())
    exit_status = session.recv_exit_status()
    return (exit_status, "".join(out_parts), "".join(err_parts))
def setup(
self, *args, **kwargs)
check to see that dashboard is installed and install it if needed
@sync_manager
def setup(self, ssh_client ):
    """ check to see that dashboard is installed at our version and install it if needed

    Returns True when the remote version matches or the install succeeds;
    raises Exception(exit_status,stdout,stderr) when the install command
    exits with a non-zero status.
    """
    _, version_out, _ = self.run_command(ssh_client, "~/.local/bin/dashboard --version")
    version_out = version_out.strip()
    if version_out.startswith("dashboard version") and version_out.endswith(__version__):
        return True

    install_command = 'python3 -m pip install --upgrade "terminal-dashboard==%s"'%(__version__)
    exit_status, stdout_str, stderr_str = self.run_command(ssh_client, install_command)
    if not exit_status:
        return True
    raise Exception(exit_status,stdout_str,stderr_str)
def shutdown(
self, *args, **kwargs)
shut down the connection manager and close all the pooled connections
@sync_manager
def shutdown( self ):
    """ shut down the connection manager and close all the pooled connections

    Each pooled connection is sent the exit request and its ssh client is
    closed; the pool is then replaced by an empty one.
    """
    for connection in self.connections.values():
        connection.exit()
        connection.ssh_client.close()
    self.connections = {}
def start_command(
self, *args, **kwargs)
start a command and return a tuple with (channel,stdin,stdout,stderr) for running process
@sync_manager
def start_command(self, ssh_client, command ):
    """ start a command and return a tuple with (channel,stdin,stdout,stderr) for running process

    The command runs on a new session opened on the ssh client's transport;
    the three file objects are created with a buffer size argument of 1.
    """
    channel = ssh_client.get_transport().open_session()
    channel.exec_command(command)
    out_file = channel.makefile("r",1)
    err_file = channel.makefile_stderr("r",1)
    in_file = channel.makefile_stdin("w",1)
    return (channel, in_file, out_file, err_file)
Instance variables
var connections
var manager_lock
class RemoteDataTable
class RemoteDataTable( DataTable ):
    """ data table whose columns are copied from a table served by a remote dashboard """

    def __init__(self,ssh_spec=None,table_def=None,name=None,refresh_minutes=1):
        """ accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a JSON-serializable table definition with at least a "name" entry for the remote table, the local name for this table, and the number of minutes for refresh """
        DataTable.__init__(self,None,name,refresh_minutes)
        self.ssh_spec = ssh_spec
        self.table_def = table_def
        self.connection = None
        self.refresh()

    @synchronized
    def refresh(self):
        """ create a connection to the remote dashboard table server and refresh our internal state

        Returns without changing the table when no connection is available,
        when the server does not acknowledge the table definition, or when the
        fetched reply has no ':' separating the table name from its JSON.
        """
        if not self.connection:
            cm = get_connection_manager()
            connection = cm.connect(self.ssh_spec,self)
            if not connection:
                return
            response = connection.table(json.dumps(self.table_def))
            if not response.startswith("loaded:%s"%self.table_def["name"]):
                # leave self.connection unset so the definition is sent again on the next refresh
                return
            self.connection = connection

        table_data = self.connection.get(self.table_def["name"])
        _,separator,json_blob = table_data.partition(":")
        if not separator:
            # malformed reply: skip this refresh instead of raising from an unpack error
            return
        dt = from_json(StringIO(json_blob))

        _,cols = dt.get_bounds()
        for idx in range(cols):
            self.replace_column(idx,dt.get_column(idx))

        self.changed()
        DataTable.refresh(self)
Ancestors (in MRO)
- RemoteDataTable
- data_sources.data_table.DataTable
- builtins.object
Static methods
def __init__(
self, ssh_spec=None, table_def=None, name=None, refresh_minutes=1)
accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a JSON-serializable table definition (indexed by "name" and serialized with json.dumps before it is sent) for the remote table, the local name for this table, and the number of minutes for refresh
def __init__(self,ssh_spec=None,table_def=None,name=None,refresh_minutes=1):
    """ accepts an ssh_spec to connect to of the form ssh://username@server_name:port_number, a JSON-serializable table definition with at least a "name" entry for the remote table, the local name for this table, and the number of minutes for refresh

    The table is refreshed once before the constructor returns.
    """
    DataTable.__init__(self, None, name, refresh_minutes)
    # no connection yet: the first refresh() asks the connection manager for one
    self.connection = None
    self.ssh_spec = ssh_spec
    self.table_def = table_def
    self.refresh()
def acquire_refresh_lock(
self)
acquire the refresh lock before reading/writing the table state
def acquire_refresh_lock(self):
    """ take the table's refresh lock; pair with release_refresh_lock around reads/writes of the table state """
    lock = self.refresh_lock
    lock.acquire()
def add_column(
self, *args, **kwargs)
@synchronized
def add_column(self,column):
    """ append column as the last column of the table

    The column is given its index, a default name of the form
    <table name>_<index> when it has no name, is registered by name and is
    told which table it belongs to.
    """
    position = len(self.columns)
    column.set_idx(position)
    if not column.get_name():
        default_name = "%s_%d"%(self.name,position)
        column.set_name(default_name)
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)
def changed(
self)
notify listeners that this table has been changed
def changed(self):
    """ notify listeners that this table has been changed

    Every registered listener is called with this table as its only
    argument, in registration order.
    """
    # iterate over a snapshot so a listener that registers or unregisters
    # during notification cannot make another listener be skipped
    for f in list(self.listeners):
        f(self)
def get(
self, *args, **kwargs)
@synchronized
def get(self, row, reference ):
    """ return what the column identified by reference (name or index) holds at row """
    column_index = self.map_column(reference)
    column = self.columns[column_index]
    return column.get(row)
def get_bounds(
self, *args, **kwargs)
return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols
@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

    rows is -1 when the table has no columns.
    """
    rows = -1
    for column in self.columns:
        size = column.size()
        # keep the current value only when it is valid and at least as large
        if not (rows >= 0 and size <= rows):
            rows = size
    cols = len(self.columns)
    return (rows,cols)
def get_column(
self, *args, **kwargs)
@synchronized
def get_column(self, reference):
    """ return the column identified by reference (name or index) """
    column_index = self.map_column(reference)
    return self.columns[column_index]
def get_columns(
self, *args, **kwargs)
return the list of columns
@synchronized
def get_columns(self):
    """ return the table's own list of columns (not a copy) """
    columns = self.columns
    return columns
def get_name(
self)
return the name of the table
def get_name(self):
    """ return this table's name """
    table_name = self.name
    return table_name
def get_names(
self, *args, **kwargs)
return a list of the names of the columns in order
@synchronized
def get_names(self):
    """ return a list with the name of every column, in column order """
    names = []
    for column in self.columns:
        names.append(column.get_name())
    return names
def get_refresh_timestamp(
self)
get the time that the table was last refreshed
def get_refresh_timestamp( self ):
    """ return the stored timestamp of the last refresh of this table """
    timestamp = self.refresh_timestamp
    return timestamp
def has_column(
self, *args, **kwargs)
@synchronized
def has_column(self, reference ):
    """ report whether reference identifies a column of this table

    A str reference is looked up among the registered column names, an int
    reference is compared against the number of columns, and any other type
    yields False.
    """
    if type(reference) is str:
        return reference in self.cnames
    elif type(reference) is int:
        # the original compared an undefined name (idx) here, so every
        # integer lookup raised NameError
        return reference < len(self.columns)
    else:
        return False
def insert_column(
self, *args, **kwargs)
@synchronized
def insert_column(self,idx,column):
    """ insert column at position idx, shifting later columns one place right

    When idx is past the end, the table is first padded by adding
    blank_column until idx is the next free position.  Shifted columns that
    still carry their default name <table name>_<old index> are renamed to
    match their new index, and every shifted column has its index updated.
    """
    # NOTE(review): blank_column is defined outside this block; presumably a
    # module-level placeholder column — confirm it is safe to add repeatedly
    while idx > len(self.columns):
        self.add_column(blank_column)

    if idx == len(self.columns):
        self.add_column(column)
        return

    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.insert(idx,column)
    self.cnames[column.get_name()] = column
    column.set_table(self)
    column.set_idx(idx)

    # the original loop tested and renamed the inserted column on every
    # iteration instead of the column at each shifted position
    for pos in range(idx+1,len(self.columns)):
        shifted = self.columns[pos]
        old_name = "%s_%d"%(self.name,pos-1)
        if shifted.get_name() == old_name:
            # drop the old key only when it still refers to this column; it may
            # already have been taken over by the column now at pos-1
            if old_name in self.cnames and self.cnames[old_name] is shifted:
                del self.cnames[old_name]
            shifted.set_name("%s_%d"%(self.name,pos))
            self.cnames[shifted.get_name()] = shifted
        shifted.set_idx(pos)
def listen(
self, listen_func)
register for notifications when a change event is raised on this table
def listen(self,listen_func):
    """ register listen_func to be called when a change event is raised on this table """
    registered = self.listeners
    registered.append(listen_func)
def map_column(
self, *args, **kwargs)
@synchronized
def map_column(self, reference ):
    """ translate a column reference into a column index

    A str is looked up among the registered column names and that column's
    index is returned; an int is returned unchanged.  Any other type raises
    TypeError.
    """
    kind = type(reference)
    if kind is str:
        return self.cnames[reference].get_idx()
    if kind is int:
        return reference
    raise TypeError("wrong type in mapping")
def perform_refresh(
self)
Thread worker that sleeps and refreshes the data on a schedule
def perform_refresh( self ):
    """ thread worker that wakes once a second and calls refresh() each time refresh_minutes have elapsed, until refresh_thread_stop is set """
    last_refresh = time.time()
    while True:
        if self.refresh_thread_stop:
            break
        elapsed = time.time() - last_refresh
        if elapsed >= self.refresh_minutes*60.0:
            self.refresh()
            last_refresh = time.time()
        time.sleep(1)
def put(
self, *args, **kwargs)
@synchronized
def put(self, row, reference, value):
    """ store value at row in the column identified by reference (name or index) """
    column_index = self.map_column(reference)
    column = self.columns[column_index]
    column.put(row,value)
def refresh(
self, *args, **kwargs)
create a connection to the remote dashboard table server and refresh our internal state
@synchronized
def refresh(self):
    """ create a connection to the remote dashboard table server and refresh our internal state

    Returns without changing the table when no connection is available,
    when the server does not acknowledge the table definition, or when the
    fetched reply has no ':' separating the table name from its JSON.
    """
    if not self.connection:
        cm = get_connection_manager()
        connection = cm.connect(self.ssh_spec,self)
        if not connection:
            return
        response = connection.table(json.dumps(self.table_def))
        if not response.startswith("loaded:%s"%self.table_def["name"]):
            # leave self.connection unset so the definition is sent again on the next refresh
            return
        self.connection = connection

    table_data = self.connection.get(self.table_def["name"])
    _,separator,json_blob = table_data.partition(":")
    if not separator:
        # malformed reply: skip this refresh instead of raising from an unpack error
        return
    dt = from_json(StringIO(json_blob))

    _,cols = dt.get_bounds()
    for idx in range(cols):
        self.replace_column(idx,dt.get_column(idx))

    self.changed()
    DataTable.refresh(self)
def release_refresh_lock(
self)
release the refresh lock after reading/writing the table state
def release_refresh_lock(self):
    """ give back the table's refresh lock taken with acquire_refresh_lock """
    lock = self.refresh_lock
    lock.release()
def replace_column(
self, *args, **kwargs)
@synchronized
def replace_column(self,idx,column):
    """ put column at position idx, replacing the column already there or appending when idx is the next free position

    The column is given its index, a default name of the form
    <table name>_<index> when it has no name, is registered by name and is
    told which table it belongs to.  The replaced column's name is
    unregistered first.
    """
    column.set_idx(idx)
    if not column.get_name():
        default_name = "%s_%d"%(self.name,idx)
        column.set_name(default_name)
    if idx != len(self.columns):
        previous_name = self.columns[idx].get_name()
        del self.cnames[previous_name]
        self.columns[idx] = column
    else:
        self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)
def start_refresh(
self)
Start the background refresh thread
def start_refresh( self ):
    """ stop any running background refresh thread, then start a new one running perform_refresh """
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    self.refresh_thread = worker
    worker.start()
def stop_refresh(
self)
Stop the background refresh thread
def stop_refresh( self ):
    """ ask the background refresh thread to stop, wait for it if it is running, then reset the thread state """
    self.refresh_thread_stop = True
    worker = self.refresh_thread
    if worker and worker.is_alive():
        worker.join()
    self.refresh_thread = None
    # clear the flag so a later start_refresh() can run again
    self.refresh_thread_stop = False
def unlisten(
self, listen_func)
unregister for notifications when a change event is raised on this table
def unlisten(self,listen_func):
    """ unregister listen_func from change notifications on this table """
    registered = self.listeners
    registered.remove(listen_func)
Instance variables
var connection
var ssh_spec
var table_def