
data_sources.logs_data module

module that aggregates data from log files and provides a set of data tables

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that aggregates data from a log file and provides a set of data tables """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import statistics
from dateutil import parser
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized

format_map = { date_type : format_date, int_type : format_int, float_type : format_float, string_type : format_string }

class ActionCell(Cell):
    def __init__(self,type,value,format,action):
        Cell.__init__(self,type,None,format)
        self.action = action
        self.values = []
        self.put_value(value)

    def default_value(self):
        if self.type == date_type:
            return datetime.min
        elif self.type == int_type:
            return 0
        elif self.type == float_type:
            return 0.0
        elif self.type == string_type:
            return ""

    def put_value(self,value):
        if value is None:
            self.value = self.default_value()
        else:
            self.values.append(value)
            try:
                if self.action == "key":
                    self.value = value
                elif self.action == "avg":
                    self.value = statistics.mean(self.values)
                elif self.action == "mode":
                    self.value = statistics.mode(self.values)
                elif self.action == "median":
                    self.value = statistics.median(self.values)
                elif self.action == "min":
                    self.value = min(self.values)
                elif self.action == "max":
                    self.value = max(self.values)
                elif self.action == "sum":
                    self.value = sum(self.values)
                elif self.action.startswith("count("):
                    regex = self.action.split("(")[1].split(")")[0]
                    if self.value is None:
                        self.value = self.default_value()
                    if re.match(regex,str(value)):
                        self.value += 1
            except:
                self.value = self.default_value()

class Value():
    """ structure for mapped values """
    def __init__(self,column_name,type,action,value):
        """ initialize the value structure with mapping information and value from log """
        self.column_name = column_name
        self.type = type
        self.action = action
        self.value = value

    def get_value(self):
        """ based on type return the value from the log """
        if self.type == date_type:
            try:
                return datetime.fromtimestamp(float(self.value))
            except:
                return parser.parse(self.value)
        elif self.type == int_type:
            if self.action.startswith("count("):
                return self.value
            else:
                return int(self.value)
        elif self.type == float_type:
            return float(self.value)
        elif self.type == string_type:
            return self.value
        else:
            return str(self.value)

    def to_cell(self):
        """ construct and return a cell based on type, action and value """
        return ActionCell( self.type, self.get_value(), format_map[self.type], self.action )

class LogDataTable( DataTable ):
    """ class that collects a time based aggregation of data from the syslog into a data_table """
    def __init__(self,log_glob=None,log_map=None,log_lookback=None,refresh_minutes=10):
        """ Initialize the LogDataTable with a file glob pattern to collect the
        matching logs on this machine, a timespan to aggregate for, aggregation
        bucket in hours, a refresh interval for updating in minutes and a
        log_map of the structure [{
            "line_regex" : "python regex with groups to match columns",
            "num_buckets" : "number of buckets for this key",
            "bucket_size" : "size of a bucket",
            "bucket_type" : "type of buckets",
            "column_map" : [
                [group_number 1..n,
                "Column Name",
                "type one of _int,_float,_string,_date",
                "action one of key,avg,min,max,count(value),mode,median"],
                ...]},...]
        the key action is special and indicates that this is the bucket key for this type of line
        log_lookback is of the form [ days, hours, minutes ] all must be specified """
        self.log_glob = log_glob
        self.log_map = log_map
        self.log_lookback = log_lookback
        self.file_map = {}

        DataTable.__init__(self,None,
            "LogDataTable: %s, %d minutes refresh"%(
                self.log_glob,
                refresh_minutes),
                refresh_minutes)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """

        def get_bucket( line_spec,value ):
            if not self.has_column(value.column_name):
                self.add_column(Column(name=value.column_name))
            bc = self.get_column(value.column_name)
            for idx in range(bc.size()):
                if bc.get(idx).get_value() >= value.get_value():
                    break
            else:
                idx = bc.size()
            if idx < bc.size():
                if line_spec["bucket_type"] == string_type:
                    if bc.get(idx).get_value() != value.get_value():
                        bc.ins(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                return idx
            elif idx == 0 and bc.size() > 0:
                diff = bc.get(idx).get_value() - value.get_value()
                if line_spec["bucket_type"] == date_type:
                    while diff > timedelta(minutes=line_spec["bucket_size"]):
                        new_bucket = bc.get(idx).get_value() - timedelta(minutes=line_spec["bucket_size"])
                        bc.ins(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                        diff = bc.get(idx).get_value() - value.get_value()
                    return idx
                elif line_spec["bucket_type"] == string_type:
                    bc.ins(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                    return idx
                else:
                    while diff > line_spec["bucket_size"]:
                        new_bucket = bc.get(idx).get_value() - line_spec["bucket_size"]
                        bc.ins(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                        diff = bc.get(idx).get_value() - value.get_value()
                    return idx
            elif idx == bc.size():
                if line_spec["bucket_type"] == string_type:
                    bc.put(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                    return idx
                else:
                    while True:
                        if idx > 0:
                            prev_bucket = bc.get(idx-1).get_value()
                        else:
                            prev_bucket = value.get_value()

                        if line_spec["bucket_type"] == date_type:
                            new_bucket = prev_bucket + timedelta(minutes=line_spec["bucket_size"])
                        else:
                            new_bucket = prev_bucket + line_spec["bucket_size"]

                        bc.put(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                        if value.get_value() < new_bucket:
                            return idx
                        idx = bc.size()

        def put_value( value, bidx ):
            if not self.has_column(value.column_name):
                self.add_column(Column(name=value.column_name))
            cc = self.get_column(value.column_name)
            if bidx < cc.size():
                c = cc.get(bidx)
                if c.type == blank_type:
                    cc.put(bidx,value.to_cell())
                else:
                    cc.get(bidx).put_value(value.get_value())
            else:
                cc.put(bidx,value.to_cell())

        def prune_buckets( line_spec ):
            for group,column_name,type,action in line_spec["column_map"]:
                if self.has_column(column_name):
                    cc = self.get_column(column_name)
                    while cc.size() > line_spec["num_buckets"]:
                        cc.delete(0)

        def top_buckets( line_spec ):
            columns = []
            key_idx = None
            idx = 0
            for group,column_name,type,action in line_spec["column_map"]:
                columns.append(self.get_column(column_name))
                if action == "key":
                    key_idx = idx
                idx += 1

            sort_rows = []
            for idx in range(columns[key_idx].size()):
                values = []
                for cidx in range(len(columns)):
                    if cidx != key_idx:
                        values.append(columns[cidx].get(idx).get_value())
                values.append(idx)
                sort_rows.append(values)

            sort_rows.sort(reverse=True)
            new_columns = []
            for group,column_name,type,action in line_spec["column_map"]:
                new_columns.append(Column(name=column_name))

            for ridx in range(min(len(sort_rows),line_spec["num_buckets"])):
                for cidx in range(len(columns)):
                    new_columns[cidx].put(sort_rows[ridx][-1],columns[cidx].get(sort_rows[ridx][-1]))

            for c in new_columns:
                self.replace_column(self.map_column(c.get_name()),c)

        lb_days,lb_hours,lb_minutes = self.log_lookback
        start_time = datetime.now() - timedelta(days=lb_days,hours=lb_hours,minutes=lb_minutes)

        log_files = glob.glob(self.log_glob)

        for lf in log_files:
            lfp = 0
            stat = os.stat(lf)
            if stat.st_mtime < start_time.timestamp():
                continue

            if lf in self.file_map:
                lft,lfp = self.file_map[lf]
                if stat.st_mtime <= lft:
                    continue

            if lf.endswith(".gz"):
                lf_f = gzip.open(lf,"rt",encoding="utf-8")
            else:
                lf_f = open(lf,"r",encoding="utf-8")

            lf_f.seek(lfp,0)

            for line in lf_f:
                line = line.strip()
                for line_spec in self.log_map:
                    m = re.match(line_spec["line_regex"],line)
                    if m:
                        values = []
                        key_idx = None
                        for group,column_name,type,action in line_spec["column_map"]:
                            values.append(Value( column_name, type, action, m.group(group) ))
                            if action == "key":
                                key_idx = len(values)-1
                        bidx = get_bucket(line_spec,values[key_idx])
                        for v in values:
                            if v.action != "key":
                                put_value( v, bidx )
                        if values[key_idx].type != string_type:
                            prune_buckets(line_spec)

            self.file_map[lf] = (stat.st_mtime,lf_f.tell())

        for line_spec in self.log_map:
            key_idx = None
            idx = 0
            for group,column_name,type,action in line_spec["column_map"]:
                if action == "key":
                    key_idx = idx
                    break
                idx += 1

            kg,kn,kt,ka = line_spec["column_map"][key_idx]
            kc = self.get_column(kn)
            for idx in range(kc.size()):
                for fg,fn,ft,fa in line_spec["column_map"]:
                    if fn != kn:
                        fc = self.get_column(fn)
                        cc = fc.get(idx)
                        if cc.type == blank_type:
                            fc.put(idx,ActionCell(ft,None,format_map[ft],fa))

            if kt == string_type:
                top_buckets( line_spec )

        self.changed()

        DataTable.refresh(self)

Module variables

var blank_type

var date_type

var float_type

var format_map

var int_type

var string_type

Classes

class ActionCell

class ActionCell(Cell):
    def __init__(self,type,value,format,action):
        Cell.__init__(self,type,None,format)
        self.action = action
        self.values = []
        self.put_value(value)

    def default_value(self):
        if self.type == date_type:
            return datetime.min
        elif self.type == int_type:
            return 0
        elif self.type == float_type:
            return 0.0
        elif self.type == string_type:
            return ""

    def put_value(self,value):
        if value is None:
            self.value = self.default_value()
        else:
            self.values.append(value)
            try:
                if self.action == "key":
                    self.value = value
                elif self.action == "avg":
                    self.value = statistics.mean(self.values)
                elif self.action == "mode":
                    self.value = statistics.mode(self.values)
                elif self.action == "median":
                    self.value = statistics.median(self.values)
                elif self.action == "min":
                    self.value = min(self.values)
                elif self.action == "max":
                    self.value = max(self.values)
                elif self.action == "sum":
                    self.value = sum(self.values)
                elif self.action.startswith("count("):
                    regex = self.action.split("(")[1].split(")")[0]
                    if self.value is None:
                        self.value = self.default_value()
                    if re.match(regex,str(value)):
                        self.value += 1
            except:
                self.value = self.default_value()

Ancestors (in MRO)

  • ActionCell
  • data_sources.data_table.Cell
  • builtins.object

Methods

def __init__(self, type, value, format, action)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self,type,value,format,action):
    Cell.__init__(self,type,None,format)
    self.action = action
    self.values = []
    self.put_value(value)

def default_value(self)

def default_value(self):
    if self.type == date_type:
        return datetime.min
    elif self.type == int_type:
        return 0
    elif self.type == float_type:
        return 0.0
    elif self.type == string_type:
        return ""

def get_float_value(self)

def get_float_value(self):
    if self.type in [float_type,int_type]:
        return float(self.value)
    elif self.type == date_type:
        return self.value.timestamp()
    else:
        return 0.0

def get_format(self)

def get_format(self):
    return self.format

def get_type(self)

def get_type(self):
    return self.type

def get_value(self)

def get_value(self):
    return self.value

def put_value(self, value)

def put_value(self,value):
    if value is None:
        self.value = self.default_value()
    else:
        self.values.append(value)
        try:
            if self.action == "key":
                self.value = value
            elif self.action == "avg":
                self.value = statistics.mean(self.values)
            elif self.action == "mode":
                self.value = statistics.mode(self.values)
            elif self.action == "median":
                self.value = statistics.median(self.values)
            elif self.action == "min":
                self.value = min(self.values)
            elif self.action == "max":
                self.value = max(self.values)
            elif self.action == "sum":
                self.value = sum(self.values)
            elif self.action.startswith("count("):
                regex = self.action.split("(")[1].split(")")[0]
                if self.value is None:
                    self.value = self.default_value()
                if re.match(regex,str(value)):
                    self.value += 1
        except:
            self.value = self.default_value()

def set_format(self, format)

def set_format(self,format):
    self.format = format

Instance variables

var action

var values

class LogDataTable

class that collects a time-based aggregation of data from log files into a data_table

class LogDataTable( DataTable ):
    """ class that collects a time based aggregation of data from the syslog into a data_table """
    def __init__(self,log_glob=None,log_map=None,log_lookback=None,refresh_minutes=10):
        """ Initialize the LogDataTable with a file glob pattern to collect the
        matching logs on this machine, a timespan to aggregate for, aggregation
        bucket in hours, a refresh interval for updating in minutes and a
        log_map of the structure [{
            "line_regex" : "python regex with groups to match columns",
            "num_buckets" : "number of buckets for this key",
            "bucket_size" : "size of a bucket",
            "bucket_type" : "type of buckets",
            "column_map" : [
                [group_number 1..n,
                "Column Name",
                "type one of _int,_float,_string,_date",
                "action one of key,avg,min,max,count(value),mode,median"],
                ...]},...]
        the key action is special and indicates that this is the bucket key for this type of line
        log_lookback is of the form [ days, hours, minutes ] all must be specified """
        self.log_glob = log_glob
        self.log_map = log_map
        self.log_lookback = log_lookback
        self.file_map = {}

        DataTable.__init__(self,None,
            "LogDataTable: %s, %d minutes refresh"%(
                self.log_glob,
                refresh_minutes),
                refresh_minutes)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """

        def get_bucket( line_spec,value ):
            if not self.has_column(value.column_name):
                self.add_column(Column(name=value.column_name))
            bc = self.get_column(value.column_name)
            for idx in range(bc.size()):
                if bc.get(idx).get_value() >= value.get_value():
                    break
            else:
                idx = bc.size()
            if idx < bc.size():
                if line_spec["bucket_type"] == string_type:
                    if bc.get(idx).get_value() != value.get_value():
                        bc.ins(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                return idx
            elif idx == 0 and bc.size() > 0:
                diff = bc.get(idx).get_value() - value.get_value()
                if line_spec["bucket_type"] == date_type:
                    while diff > timedelta(minutes=line_spec["bucket_size"]):
                        new_bucket = bc.get(idx).get_value() - timedelta(minutes=line_spec["bucket_size"])
                        bc.ins(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                        diff = bc.get(idx).get_value() - value.get_value()
                    return idx
                elif line_spec["bucket_type"] == string_type:
                    bc.ins(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                    return idx
                else:
                    while diff > line_spec["bucket_size"]:
                        new_bucket = bc.get(idx).get_value() - line_spec["bucket_size"]
                        bc.ins(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                        diff = bc.get(idx).get_value() - value.get_value()
                    return idx
            elif idx == bc.size():
                if line_spec["bucket_type"] == string_type:
                    bc.put(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                    return idx
                else:
                    while True:
                        if idx > 0:
                            prev_bucket = bc.get(idx-1).get_value()
                        else:
                            prev_bucket = value.get_value()

                        if line_spec["bucket_type"] == date_type:
                            new_bucket = prev_bucket + timedelta(minutes=line_spec["bucket_size"])
                        else:
                            new_bucket = prev_bucket + line_spec["bucket_size"]

                        bc.put(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                        if value.get_value() < new_bucket:
                            return idx
                        idx = bc.size()

        def put_value( value, bidx ):
            if not self.has_column(value.column_name):
                self.add_column(Column(name=value.column_name))
            cc = self.get_column(value.column_name)
            if bidx < cc.size():
                c = cc.get(bidx)
                if c.type == blank_type:
                    cc.put(bidx,value.to_cell())
                else:
                    cc.get(bidx).put_value(value.get_value())
            else:
                cc.put(bidx,value.to_cell())

        def prune_buckets( line_spec ):
            for group,column_name,type,action in line_spec["column_map"]:
                if self.has_column(column_name):
                    cc = self.get_column(column_name)
                    while cc.size() > line_spec["num_buckets"]:
                        cc.delete(0)

        def top_buckets( line_spec ):
            columns = []
            key_idx = None
            idx = 0
            for group,column_name,type,action in line_spec["column_map"]:
                columns.append(self.get_column(column_name))
                if action == "key":
                    key_idx = idx
                idx += 1

            sort_rows = []
            for idx in range(columns[key_idx].size()):
                values = []
                for cidx in range(len(columns)):
                    if cidx != key_idx:
                        values.append(columns[cidx].get(idx).get_value())
                values.append(idx)
                sort_rows.append(values)

            sort_rows.sort(reverse=True)
            new_columns = []
            for group,column_name,type,action in line_spec["column_map"]:
                new_columns.append(Column(name=column_name))

            for ridx in range(min(len(sort_rows),line_spec["num_buckets"])):
                for cidx in range(len(columns)):
                    new_columns[cidx].put(sort_rows[ridx][-1],columns[cidx].get(sort_rows[ridx][-1]))

            for c in new_columns:
                self.replace_column(self.map_column(c.get_name()),c)

        lb_days,lb_hours,lb_minutes = self.log_lookback
        start_time = datetime.now() - timedelta(days=lb_days,hours=lb_hours,minutes=lb_minutes)

        log_files = glob.glob(self.log_glob)

        for lf in log_files:
            lfp = 0
            stat = os.stat(lf)
            if stat.st_mtime < start_time.timestamp():
                continue

            if lf in self.file_map:
                lft,lfp = self.file_map[lf]
                if stat.st_mtime <= lft:
                    continue

            if lf.endswith(".gz"):
                lf_f = gzip.open(lf,"rt",encoding="utf-8")
            else:
                lf_f = open(lf,"r",encoding="utf-8")

            lf_f.seek(lfp,0)

            for line in lf_f:
                line = line.strip()
                for line_spec in self.log_map:
                    m = re.match(line_spec["line_regex"],line)
                    if m:
                        values = []
                        key_idx = None
                        for group,column_name,type,action in line_spec["column_map"]:
                            values.append(Value( column_name, type, action, m.group(group) ))
                            if action == "key":
                                key_idx = len(values)-1
                        bidx = get_bucket(line_spec,values[key_idx])
                        for v in values:
                            if v.action != "key":
                                put_value( v, bidx )
                        if values[key_idx].type != string_type:
                            prune_buckets(line_spec)

            self.file_map[lf] = (stat.st_mtime,lf_f.tell())

        for line_spec in self.log_map:
            key_idx = None
            idx = 0
            for group,column_name,type,action in line_spec["column_map"]:
                if action == "key":
                    key_idx = idx
                    break
                idx += 1

            kg,kn,kt,ka = line_spec["column_map"][key_idx]
            kc = self.get_column(kn)
            for idx in range(kc.size()):
                for fg,fn,ft,fa in line_spec["column_map"]:
                    if fn != kn:
                        fc = self.get_column(fn)
                        cc = fc.get(idx)
                        if cc.type == blank_type:
                            fc.put(idx,ActionCell(ft,None,format_map[ft],fa))

            if kt == string_type:
                top_buckets( line_spec )

        self.changed()

        DataTable.refresh(self)

Ancestors (in MRO)

  • LogDataTable
  • data_sources.data_table.DataTable
  • builtins.object

Methods

def __init__(self, log_glob=None, log_map=None, log_lookback=None, refresh_minutes=10)

Initialize the LogDataTable with a file glob pattern that selects the matching logs on this machine, a lookback timespan to aggregate over, a refresh interval for updating in minutes and a log_map of the structure:

[{ "line_regex" : "python regex with groups to match columns",
   "num_buckets" : "number of buckets for this key",
   "bucket_size" : "size of a bucket (minutes for date buckets)",
   "bucket_type" : "type of the bucket key column",
   "column_map" : [
       [group_number 1..n,
        "Column Name",
        "type one of _int,_float,_string,_date",
        "action one of key,avg,min,max,sum,count(regex),mode,median"],
       ...]},...]

The key action is special and indicates that this column is the bucket key for this type of line. log_lookback is of the form [ days, hours, minutes ]; all must be specified.

def __init__(self,log_glob=None,log_map=None,log_lookback=None,refresh_minutes=10):
    """ Initialize the LogDataTable with a file glob pattern to collect the
    matching logs on this machine, a timespan to aggregate for, aggregation
    bucket in hours, a refresh interval for updating in minutes and a
    log_map of the structure [{
        "line_regex" : "python regex with groups to match columns",
        "num_buckets" : "number of buckets for this key",
        "bucket_size" : "size of a bucket",
        "bucket_type" : "type of buckets",
        "column_map" : [
            [group_number 1..n,
            "Column Name",
            "type one of _int,_float,_string,_date",
            "action one of key,avg,min,max,count(value),mode,median"],
            ...]},...]
    the key action is special and indicates that this is the bucket key for this type of line
    log_lookback is of the form [ days, hours, minutes ] all must be specified """
    self.log_glob = log_glob
    self.log_map = log_map
    self.log_lookback = log_lookback
    self.file_map = {}
    DataTable.__init__(self,None,
        "LogDataTable: %s, %d minutes refresh"%(
            self.log_glob,
            refresh_minutes),
            refresh_minutes)
    self.refresh()

def acquire_refresh_lock(self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ acquire the refresh lock before reading/writing the table state """
    self.refresh_lock.acquire()

def add_column(self, *args, **kwargs)

@synchronized
def add_column(self,column):
    idx = len(self.columns)
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def changed(self)

notify listeners that this table has been changed

def changed(self):
    """ notify listeners that this table has been changed """
    for f in self.listeners:
        f(self)

def get(self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    return self.columns[self.map_column(reference)].get(row)

def get_bounds(self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """
    cols = len(self.columns)
    rows = -1
    for c in self.columns:
        size = c.size()
        if rows < 0 or size > rows:
            rows = size
    return (rows,cols)

def get_column(self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    return self.columns[self.map_column(reference)]

def get_columns(self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ return the list of columns """
    return self.columns

def get_name(self)

return the name of the table

def get_name(self):
    """ return the name of the table """
    return self.name

def get_names(self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ return a list of the names of the columns in order"""
    return [c.get_name() for c in self.columns]

def get_refresh_timestamp(self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ get the time that the table was last refreshed """
    return self.refresh_timestamp

def has_column(self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    if isinstance(reference, str):
        return reference in self.cnames
    elif isinstance(reference, int):
        return reference < len(self.columns)
    else:
        return False

def insert_column(self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        while idx < len(self.columns):
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1

def listen(self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ register for notifications when a change event is raised on this table """
    self.listeners.append(listen_func)

def map_column(self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    if isinstance(reference, str):
        return self.cnames[reference].get_idx()
    elif isinstance(reference, int):
        return reference
    else:
        raise TypeError("wrong type in mapping")

def perform_refresh(self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Thread worker that sleeps and refreshes the data on a schedule """
    start_time = time.time()
    while not self.refresh_thread_stop:
        if time.time() - start_time >= self.refresh_minutes*60.0:
            self.refresh()
            start_time = time.time()
        time.sleep(1)

def put(self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    self.columns[self.map_column(reference)].put(row,value)

def refresh(self, *args, **kwargs)

refresh or rebuild tables

@synchronized
def refresh( self ):
    """ refresh or rebuild tables """
    def get_bucket( line_spec,value ):
        if not self.has_column(value.column_name):
            self.add_column(Column(name=value.column_name))
        bc = self.get_column(value.column_name)
        for idx in range(bc.size()):
            if bc.get(idx).get_value() >= value.get_value():
                break
        else:
            idx = bc.size()
        if idx < bc.size():
            if line_spec["bucket_type"] == string_type:
                if bc.get(idx).get_value() != value.get_value():
                    bc.ins(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
            return idx
        elif idx == 0 and bc.size() > 0:
            diff = bc.get(idx).get_value() - value.get_value()
            if line_spec["bucket_type"] == date_type:
                while diff > timedelta(minutes=line_spec["bucket_size"]):
                    new_bucket = bc.get(idx).get_value() - timedelta(minutes=line_spec["bucket_size"])
                    bc.ins(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                    diff = bc.get(idx).get_value() - value.get_value()
                return idx
            elif line_spec["bucket_type"] == string_type:
                bc.ins(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                return idx
            else:
                while diff > line_spec["bucket_size"]:
                    new_bucket = bc.get(idx).get_value() - line_spec["bucket_size"]
                    bc.ins(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                    diff = bc.get(idx).get_value() - value.get_value()
                return idx
        elif idx == bc.size():
            if line_spec["bucket_type"] == string_type:
                bc.put(idx,Cell(line_spec["bucket_type"],value.get_value(),format_map[line_spec["bucket_type"]]))
                return idx
            else:
                while True:
                    if idx > 0:
                        prev_bucket = bc.get(idx-1).get_value()
                    else:
                        prev_bucket = value.get_value()
                    if line_spec["bucket_type"] == date_type:
                        new_bucket = prev_bucket + timedelta(minutes=line_spec["bucket_size"])
                    else:
                        new_bucket = prev_bucket + line_spec["bucket_size"]
                    bc.put(idx,Cell(line_spec["bucket_type"],new_bucket,format_map[line_spec["bucket_type"]]))
                    if value.get_value() < new_bucket:
                        return idx
                    idx = bc.size()
    def put_value( value, bidx ):
        if not self.has_column(value.column_name):
            self.add_column(Column(name=value.column_name))
        cc = self.get_column(value.column_name)
        if bidx < cc.size():
            c = cc.get(bidx)
            if c.type == blank_type:
                cc.put(bidx,value.to_cell())
            else:
                cc.get(bidx).put_value(value.get_value())
        else:
            cc.put(bidx,value.to_cell())
    def prune_buckets( line_spec ):
        for group,column_name,type,action in line_spec["column_map"]:
            if self.has_column(column_name):
                cc = self.get_column(column_name)
                while cc.size() > line_spec["num_buckets"]:
                    cc.delete(0)
    def top_buckets( line_spec ):
        columns = []
        key_idx = None
        idx = 0
        for group,column_name,type,action in line_spec["column_map"]:
            columns.append(self.get_column(column_name))
            if action == "key":
                key_idx = idx
            idx += 1
        sort_rows = []
        for idx in range(columns[key_idx].size()):
            values = []
            for cidx in range(len(columns)):
                if cidx != key_idx:
                    values.append(columns[cidx].get(idx).get_value())
            values.append(idx)
            sort_rows.append(values)
        sort_rows.sort(reverse=True)
        new_columns = []
        for group,column_name,type,action in line_spec["column_map"]:
            new_columns.append(Column(name=column_name))
        for ridx in range(min(len(sort_rows),line_spec["num_buckets"])):
            for cidx in range(len(columns)):
                new_columns[cidx].put(sort_rows[ridx][-1],columns[cidx].get(sort_rows[ridx][-1]))
        for c in new_columns:
            self.replace_column(self.map_column(c.get_name()),c)
    lb_days,lb_hours,lb_minutes = self.log_lookback
    start_time = datetime.now() - timedelta(days=lb_days,hours=lb_hours,minutes=lb_minutes)
    log_files = glob.glob(self.log_glob)
    for lf in log_files:
        lfp = 0
        stat = os.stat(lf)
        if stat.st_mtime < start_time.timestamp():
            continue
        if lf in self.file_map:
            lft,lfp = self.file_map[lf]
            if stat.st_mtime <= lft:
                continue
        if lf.endswith(".gz"):
            lf_f = gzip.open(lf,"rt",encoding="utf-8")
        else:
            lf_f = open(lf,"r",encoding="utf-8")
        lf_f.seek(lfp,0)
        for line in lf_f:
            line = line.strip()
            for line_spec in self.log_map:
                m = re.match(line_spec["line_regex"],line)
                if m:
                    values = []
                    key_idx = None
                    for group,column_name,type,action in line_spec["column_map"]:
                        values.append(Value( column_name, type, action, m.group(group) ))
                        if action == "key":
                            key_idx = len(values)-1
                    bidx = get_bucket(line_spec,values[key_idx])
                    for v in values:
                        if v.action != "key":
                            put_value( v, bidx )
                    if values[key_idx].type != string_type:
                        prune_buckets(line_spec)
        self.file_map[lf] = (stat.st_mtime,lf_f.tell())
    for line_spec in self.log_map:
        key_idx = None
        idx = 0
        for group,column_name,type,action in line_spec["column_map"]:
            if action == "key":
                key_idx = idx
                break
            idx += 1
        kg,kn,kt,ka = line_spec["column_map"][key_idx]
        kc = self.get_column(kn)
        for idx in range(kc.size()):
            for fg,fn,ft,fa in line_spec["column_map"]:
                if fn != kn:
                    fc = self.get_column(fn)
                    cc = fc.get(idx)
                    if cc.type == blank_type:
                        fc.put(idx,ActionCell(ft,None,format_map[ft],fa))
        if kt == string_type:
            top_buckets( line_spec )
    self.changed()
    DataTable.refresh(self)

def release_refresh_lock(self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ release the refresh lock after reading/writing the table state """
    self.refresh_lock.release()

def replace_column(self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        del self.cnames[self.columns[idx].get_name()]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)

def start_refresh(self)

Start the background refresh thread

def start_refresh( self ):
    """ Start the background refresh thread """
    self.stop_refresh()
    self.refresh_thread = threading.Thread(target=self.perform_refresh)
    self.refresh_thread.start()

def stop_refresh(self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Stop the background refresh thread """
    self.refresh_thread_stop = True
    if self.refresh_thread and self.refresh_thread.is_alive():
        self.refresh_thread.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False

def unlisten(self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ unregister for notifications when a change event is raised on this table """
    self.listeners.remove(listen_func)

Instance variables

var file_map

var log_glob

var log_lookback

var log_map

class Value

structure for mapped values

class Value():
    """ structure for mapped values """
    def __init__(self,column_name,type,action,value):
        """ initialize the value structure with mapping information and value from log """
        self.column_name = column_name
        self.type = type
        self.action = action
        self.value = value

    def get_value(self):
        """ based on type return the value from the log """
        if self.type == date_type:
            try:
                return datetime.fromtimestamp(float(self.value))
            except:
                return parser.parse(self.value)
        elif self.type == int_type:
            if self.action.startswith("count("):
                return self.value
            else:
                return int(self.value)
        elif self.type == float_type:
            return float(self.value)
        elif self.type == string_type:
            return self.value
        else:
            return str(self.value)

    def to_cell(self):
        """ construct and return a cell based on type, action and value """
        return ActionCell( self.type, self.get_value(), format_map[self.type], self.action )

Ancestors (in MRO)

  • Value
  • builtins.object

Methods

def __init__(self, column_name, type, action, value)

initialize the value structure with mapping information and value from log

def __init__(self,column_name,type,action,value):
    """ initialize the value structure with mapping information and value from log """
    self.column_name = column_name
    self.type = type
    self.action = action
    self.value = value

def get_value(self)

based on type return the value from the log

def get_value(self):
    """ based on type return the value from the log """
    if self.type == date_type:
        try:
            return datetime.fromtimestamp(float(self.value))
        except:
            return parser.parse(self.value)
    elif self.type == int_type:
        if self.action.startswith("count("):
            return self.value
        else:
            return int(self.value)
    elif self.type == float_type:
        return float(self.value)
    elif self.type == string_type:
        return self.value
    else:
        return str(self.value)

def to_cell(self)

construct and return a cell based on type, action and value

def to_cell(self):
    """ construct and return a cell based on type, action and value """
    return ActionCell( self.type, self.get_value(), format_map[self.type], self.action )

Instance variables

var action

var column_name

var type

var value