data_sources.proc_data module

module that aggregates system information collected with psutil and provides a set of data tables

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that aggregates data from system information using psutil and provides a set of data tables """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import psutil
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized

class AverageCell(Cell):
    def __init__(self,type,value,format):
        self.type = type
        self.value = value
        self.format = format
        self.values = [ value ]

    def put_value(self,value):
        self.values.append(value)
        self.value = sum(self.values)/len(self.values)

class ProcDataTable( DataTable ):
    """ class that collects a time based aggregation of data from system process information into a data_table """
    def __init__(self,num_hours=24,bucket_hours=1,refresh_minutes=10):
        """ Initialize the ProcDataTable to collect system process information, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        DataTable.__init__(self,None,
            "Proc Data: for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.num_hours,
            self.bucket_hours,
            refresh_minutes),refresh_minutes)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """

        current_time = datetime.now()

        column_names = [ "Time Stamps", "CPU Percent", "Load Avg",
            "Total Virtual Memory", "Available Virtual Memory",
            "Filesystem Percent Full", "Filesystem Read Bytes", "Filesystem Write Bytes",
            "Network Sent Bytes","Network Received Bytes","Network Connections" ]

        def bucket_idx( timestamp, column ):
            # index of the first bucket whose end time is still ahead of
            # timestamp, or -1 if a new bucket needs to be appended
            for idx in range(column.size()):
                if column.get(idx).get_value() >= timestamp:
                    return idx
            return -1

        def append_bucket( timestamp, column ):
            column.put(column.size(),Cell(date_type,timestamp+timedelta( hours=self.bucket_hours),format_date))
            if column.size() > self.num_hours/self.bucket_hours:
                for cn in column_names:
                    self.get_column(cn).delete(0)
            return column.size()-1

        def add_average( column_name, idx, value ):
            column = self.get_column(column_name)
            if not column.size() or idx >= column.size():
                column.put(idx, AverageCell(float_type,value,format_float))
            else:
                column.get(idx).put_value(value)

        bidx = 0
        for cn in column_names:
            if not self.has_column(cn):
                self.add_column(Column(name=cn))
            if cn == "Time Stamps":
                bidx = bucket_idx( current_time, self.get_column(cn))
                if bidx < 0:
                    bidx = append_bucket( current_time, self.get_column(cn))
            elif cn == "CPU Percent":
                add_average(cn,bidx,psutil.cpu_percent())
            elif cn == "Load Avg":
                add_average(cn,bidx,psutil.getloadavg()[2])
            elif cn == "Total Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().total)
            elif cn == "Available Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().available)
            elif cn == "Filesystem Percent Full":
                add_average(cn,bidx,psutil.disk_usage("/").percent)
            elif cn == "Filesystem Read Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().read_bytes)
            elif cn == "Filesystem Write Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().write_bytes)
            elif cn == "Network Sent Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_sent)
            elif cn == "Network Received Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_recv)
            elif cn == "Network Connections":
                add_average(cn,bidx,float(len(psutil.net_connections())))
        self.changed()

        DataTable.refresh(self)
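
A minimal usage sketch, assuming the package is importable; the parameter values below are illustrative, not defaults:

from data_sources.proc_data import ProcDataTable

# cover 12 hours of history in 1 hour buckets, refreshing every 5 minutes;
# the constructor performs an initial refresh() so values exist immediately
table = ProcDataTable(num_hours=12, bucket_hours=1, refresh_minutes=5)

rows, cols = table.get_bounds()
print("table is %d rows x %d columns" % (rows, cols))
for name in table.get_names():
    column = table.get_column(name)
    # metric columns hold AverageCell values; "Time Stamps" holds bucket end times
    print(name, [column.get(i).get_value() for i in range(column.size())])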

Module variables

var blank_type

var date_type

var float_type

var int_type

var string_type

Classes

class AverageCell

class AverageCell(Cell):
    def __init__(self,type,value,format):
        self.type = type
        self.value = value
        self.format = format
        self.values = [ value ]

    def put_value(self,value):
        self.values.append(value)
        self.value = sum(self.values)/len(self.values)
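
Each AverageCell keeps every sample it is given and exposes their running mean as its value, which is how a metric column converges on the average of all samples taken during a bucket. A small illustration; the imports assume the package layout shown above:

from data_sources.data_table import float_type, format_float
from data_sources.proc_data import AverageCell

cell = AverageCell(float_type, 10.0, format_float)
cell.put_value(20.0)
cell.put_value(30.0)
print(cell.get_value())  # 20.0, the mean of 10.0, 20.0 and 30.0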

Ancestors (in MRO)

  • AverageCell
  • data_sources.data_table.Cell
  • builtins.object

Methods

def __init__(self, type, value, format)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self,type,value,format):
    self.type = type
    self.value = value
    self.format = format
    self.values = [ value ]

def get_float_value(self)

def get_float_value(self):
    if self.type in [float_type,int_type]:
        return float(self.value)
    elif self.type == date_type:
        return self.value.timestamp()
    else:
        return 0.0

def get_format(self)

def get_format(self):
    return self.format

def get_type(self)

def get_type(self):
    return self.type

def get_value(self)

def get_value(self):
    return self.value

def put_value(self, value)

def put_value(self,value):
    self.values.append(value)
    self.value = sum(self.values)/len(self.values)

def set_format(self, format)

def set_format(self,format):
    self.format = format

Instance variables

var format

var type

var value

var values

class ProcDataTable

class that collects a time-based aggregation of system process information into a data_table

class ProcDataTable( DataTable ):
    """ class that collects a time based aggregation of data from system process information into a data_table """
    def __init__(self,num_hours=24,bucket_hours=1,refresh_minutes=10):
        """ Initialize the ProcDataTable to collect system process information, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        DataTable.__init__(self,None,
            "Proc Data: for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.num_hours,
            self.bucket_hours,
            refresh_minutes),refresh_minutes)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """

        current_time = datetime.now()

        column_names = [ "Time Stamps", "CPU Percent", "Load Avg",
            "Total Virtual Memory", "Available Virtual Memory",
            "Filesystem Percent Full", "Filesystem Read Bytes", "Filesystem Write Bytes",
            "Network Sent Bytes","Network Received Bytes","Network Connections" ]

        def bucket_idx( timestamp, column ):
            # index of the first bucket whose end time is still ahead of
            # timestamp, or -1 if a new bucket needs to be appended
            for idx in range(column.size()):
                if column.get(idx).get_value() >= timestamp:
                    return idx
            return -1

        def append_bucket( timestamp, column ):
            column.put(column.size(),Cell(date_type,timestamp+timedelta( hours=self.bucket_hours),format_date))
            if column.size() > self.num_hours/self.bucket_hours:
                for cn in column_names:
                    self.get_column(cn).delete(0)
            return column.size()-1

        def add_average( column_name, idx, value ):
            column = self.get_column(column_name)
            if not column.size() or idx >= column.size():
                column.put(idx, AverageCell(float_type,value,format_float))
            else:
                column.get(idx).put_value(value)

        bidx = 0
        for cn in column_names:
            if not self.has_column(cn):
                self.add_column(Column(name=cn))
            if cn == "Time Stamps":
                bidx = bucket_idx( current_time, self.get_column(cn))
                if bidx < 0:
                    bidx = append_bucket( current_time, self.get_column(cn))
            elif cn == "CPU Percent":
                add_average(cn,bidx,psutil.cpu_percent())
            elif cn == "Load Avg":
                add_average(cn,bidx,psutil.getloadavg()[2])
            elif cn == "Total Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().total)
            elif cn == "Available Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().available)
            elif cn == "Filesystem Percent Full":
                add_average(cn,bidx,psutil.disk_usage("/").percent)
            elif cn == "Filesystem Read Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().read_bytes)
            elif cn == "Filesystem Write Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().write_bytes)
            elif cn == "Network Sent Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_sent)
            elif cn == "Network Received Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_recv)
            elif cn == "Network Connections":
                add_average(cn,bidx,float(len(psutil.net_connections())))
        self.changed()

        DataTable.refresh(self)

Ancestors (in MRO)

  • ProcDataTable
  • data_sources.data_table.DataTable
  • builtins.object

Methods

def __init__(self, num_hours=24, bucket_hours=1, refresh_minutes=10)

Initialize the ProcDataTable to collect system process information: num_hours is the timespan in hours to aggregate over, bucket_hours is the size of each aggregation bucket in hours, and refresh_minutes is the interval in minutes between automatic refreshes

def __init__(self,num_hours=24,bucket_hours=1,refresh_minutes=10):
    """ Initialize the ProcDataTable to collect system process information, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
    self.num_hours = num_hours
    self.bucket_hours = bucket_hours
    DataTable.__init__(self,None,
        "Proc Data: for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
        self.num_hours,
        self.bucket_hours,
        refresh_minutes),refresh_minutes)
    self.refresh()
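
The ratio num_hours / bucket_hours caps how many rows of history are kept (append_bucket drops the oldest row once the cap is exceeded). A sketch of two configurations with the same 24-row capacity; the values are illustrative:

from data_sources.proc_data import ProcDataTable

# both keep at most 24 rows: 24 one-hour buckets vs 24 two-hour buckets
day_by_hour = ProcDataTable(num_hours=24, bucket_hours=1, refresh_minutes=10)
two_days_coarse = ProcDataTable(num_hours=48, bucket_hours=2, refresh_minutes=10)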

def acquire_refresh_lock(self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ acquire the refresh lock before reading/writing the table state """
    self.refresh_lock.acquire()
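
Readers that take several values at once can hold the refresh lock so a background refresh cannot rewrite the table mid-read. A sketch of that pattern, assuming the refresh lock is reentrant so the @synchronized accessors can still be called while it is held:

from data_sources.proc_data import ProcDataTable

table = ProcDataTable()
table.acquire_refresh_lock()
try:
    cpu = table.get_column("CPU Percent")
    mem = table.get_column("Available Virtual Memory")
    # read the newest bucket of both columns as one consistent snapshot
    latest_cpu = cpu.get(cpu.size() - 1).get_value()
    latest_mem = mem.get(mem.size() - 1).get_value()
finally:
    table.release_refresh_lock()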

def add_column(self, *args, **kwargs)

@synchronized
def add_column(self,column):
    idx = len(self.columns)
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def changed(self)

notify listeners that this table has been changed

def changed(self):
    """ notify listeners that this table has been changed """
    for f in self.listeners:
        f(self)

def get(self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    return self.columns[self.map_column(reference)].get(row)

def get_bounds(self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """
    cols = len(self.columns)
    rows = -1
    for c in self.columns:
        size = c.size()
        if rows < 0 or size > rows:
            rows = size
    return (rows,cols)
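
get_bounds reports the widest column, so a full-table walk should still guard against ragged columns; a sketch:

from data_sources.proc_data import ProcDataTable

table = ProcDataTable()
rows, cols = table.get_bounds()
for r in range(rows):
    row = []
    for c in range(cols):
        column = table.get_column(c)
        # a column may be shorter than the widest one, so check its own size
        row.append(column.get(r).get_value() if r < column.size() else None)
    print(row)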

def get_column(self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    return self.columns[self.map_column(reference)]

def get_columns(self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ return the list of columns """
    return self.columns

def get_name(self)

return the name of the table

def get_name(self):
    """ return the name of the table """
    return self.name

def get_names(self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ return a list of the names of the columns in order"""
    return [c.get_name() for c in self.columns]

def get_refresh_timestamp(self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ get the time that the table was last refreshed """
    return self.refresh_timestamp

def has_column(self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    if type(reference) == str:
        return reference in self.cnames
    elif type(reference) == int:
        return reference < len(self.columns)
    else:
        return False

def insert_column(self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        while idx < len(self.columns):
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1

def listen(self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ register for notifications when a change event is raised on this table """
    self.listeners.append(listen_func)
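
A sketch of registering a change listener; the callback name is illustrative. changed() invokes each listener with the table, so a refresh ends up firing the callback:

from data_sources.proc_data import ProcDataTable

def on_change(table):
    # invoked from whichever thread drove the refresh
    print("refreshed:", table.get_name())

table = ProcDataTable()
table.listen(on_change)
table.refresh()            # triggers changed(), which calls on_change
table.unlisten(on_change)  # stop receiving notifications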

def map_column(self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    if type(reference) == str:
        return self.cnames[reference].get_idx()
    elif type(reference) == int:
        return reference
    else:
        raise TypeError("wrong type in mapping")
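
Because map_column accepts either form, columns can be addressed by name or by integer index interchangeably; a short sketch:

from data_sources.proc_data import ProcDataTable

table = ProcDataTable()
by_name = table.get_column("CPU Percent")
by_index = table.get_column(by_name.get_idx())
assert by_name is by_index  # both references resolve to the same Column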

def perform_refresh(self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Thread worker that sleeps and refreshes the data on a schedule """
    start_time = time.time()
    while not self.refresh_thread_stop:
        if time.time() - start_time >= self.refresh_minutes*60.0:
            self.refresh()
            start_time = time.time()
        time.sleep(1)

def put(self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    self.columns[self.map_column(reference)].put(row,value)

def refresh(self, *args, **kwargs)

refresh or rebuild tables

@synchronized
def refresh( self ):
    """ refresh or rebuild tables """
    current_time = datetime.now()
    column_names = [ "Time Stamps", "CPU Percent", "Load Avg",
        "Total Virtual Memory", "Available Virtual Memory",
        "Filesystem Percent Full", "Filesystem Read Bytes", "Filesystem Write Bytes",
        "Network Sent Bytes","Network Received Bytes","Network Connections" ]
    def bucket_idx( timestamp, column ):
        # index of the first bucket whose end time is still ahead of
        # timestamp, or -1 if a new bucket needs to be appended
        for idx in range(column.size()):
            if column.get(idx).get_value() >= timestamp:
                return idx
        return -1
    def append_bucket( timestamp, column ):
        column.put(column.size(),Cell(date_type,timestamp+timedelta( hours=self.bucket_hours),format_date))
        if column.size() > self.num_hours/self.bucket_hours:
            for cn in column_names:
                self.get_column(cn).delete(0)
        return column.size()-1
    def add_average( column_name, idx, value ):
        column = self.get_column(column_name)
        if not column.size() or idx >= column.size():
            column.put(idx, AverageCell(float_type,value,format_float))
        else:
            column.get(idx).put_value(value)
    bidx = 0
    for cn in column_names:
        if not self.has_column(cn):
            self.add_column(Column(name=cn))
        if cn == "Time Stamps":
            bidx = bucket_idx( current_time, self.get_column(cn))
            if bidx < 0:
                bidx = append_bucket( current_time, self.get_column(cn))
        elif cn == "CPU Percent":
            add_average(cn,bidx,psutil.cpu_percent())
        elif cn == "Load Avg":
            add_average(cn,bidx,psutil.getloadavg()[2])
        elif cn == "Total Virtual Memory":
            add_average(cn,bidx,psutil.virtual_memory().total)
        elif cn == "Available Virtual Memory":
            add_average(cn,bidx,psutil.virtual_memory().available)
        elif cn == "Filesystem Percent Full":
            add_average(cn,bidx,psutil.disk_usage("/").percent)
        elif cn == "Filesystem Read Bytes":
            add_average(cn,bidx,psutil.disk_io_counters().read_bytes)
        elif cn == "Filesystem Write Bytes":
            add_average(cn,bidx,psutil.disk_io_counters().write_bytes)
        elif cn == "Network Sent Bytes":
            add_average(cn,bidx,psutil.net_io_counters().bytes_sent)
        elif cn == "Network Received Bytes":
            add_average(cn,bidx,psutil.net_io_counters().bytes_recv)
        elif cn == "Network Connections":
            add_average(cn,bidx,float(len(psutil.net_connections())))
    self.changed()
    DataTable.refresh(self)

def release_refresh_lock(self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ release the refresh lock after reading/writing the table state """
    self.refresh_lock.release()

def replace_column(self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        del self.cnames[self.columns[idx].get_name()]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)

def start_refresh(self)

Start the background refresh thread

def start_refresh( self ):
    """ Start the background refresh thread """
    self.stop_refresh()
    self.refresh_thread = threading.Thread(target=self.perform_refresh)
    self.refresh_thread.start()

def stop_refresh(self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Stop the background refresh thread """
    self.refresh_thread_stop = True
    if self.refresh_thread and self.refresh_thread.is_alive():
        self.refresh_thread.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False
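
A sketch of the background refresh lifecycle; the sleep is only there to give the worker a chance to run:

import time
from data_sources.proc_data import ProcDataTable

table = ProcDataTable(refresh_minutes=1)
table.start_refresh()    # spawn the perform_refresh worker thread
try:
    time.sleep(90)       # allow at least one scheduled refresh
finally:
    table.stop_refresh() # signal the worker to exit and join it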

def unlisten(self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ unregister for notifications when a change event is raised on this table """
    self.listeners.remove(listen_func)

Instance variables

var bucket_hours

var num_hours