#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#####################################################################
# (c) 2010-2011 by Sven Velt and team(ix) GmbH, Nuernberg, Germany #
# sv@teamix.net #
# (c) 2016- by Sven Velt, Germany #
# sven-mymonplugins@velt.biz #
# #
# This file is part of "velt.biz - My Monitoring Plugins" #
# a fork of "team(ix) Monitoring Plugins" in 2015 #
# URL: https://gogs.velt.biz/velt.biz/MyMonPlugins/ #
# #
# This file is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published #
# by the Free Software Foundation, either version 2 of the License, #
# or (at your option) any later version. #
# #
# This file is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this file. If not, see <http://www.gnu.org/licenses/>. #
#####################################################################
import datetime
import optparse
import os
import re
import sys
class MonitoringPlugin(object):
    """Helper class for writing Nagios-style monitoring plugins.

    It bundles command line parsing (via ``optparse``), threshold/range
    evaluation, conversion between human readable and plain numbers, and
    the assembly of the final plugin output (status line, optional
    multi-line output and performance data) together with the exit code.

    Keyword arguments accepted by the constructor: ``pluginname``,
    ``version``, ``tagforstatusline`` and ``description``.
    """

    # Return code -> status word printed in the status line.
    RETURNSTRINGS = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN", 127: "UNKNOWN"}
    # Status word -> return code.
    RETURNCODE = {'OK': 0, 'WARNING': 1, 'CRITICAL': 2, 'UNKNOWN': 3}

    # Order in which return codes win when several checks are combined:
    # the first code of this list that occurs is the overall result.
    returncode_priority = [2, 1, 3, 0]

    # Unit prefixes, indexed by their power of 1024 resp. 1000.
    powers_binary = ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']
    powers_binary_lower = [p.lower() for p in powers_binary]
    powers_si = ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']
    powers_si_lower = [p.lower() for p in powers_si]

    def __init__(self, *args, **kwargs):
        """Set up empty output buffers and the option parser.

        Positional arguments are accepted but ignored; see the class
        docstring for the recognised keyword arguments.
        """
        self.__pluginname = kwargs.get('pluginname') or ''
        self.__version = kwargs.get('version') or None
        # '|' separates text from performance data in the final output,
        # so it must never appear inside the tag.
        self.__tagforstatusline = (kwargs.get('tagforstatusline') or '').replace('|', ' ')
        self.__description = kwargs.get('description') or None

        # Buffers that exit() hands over to back2nagios().
        self.__output = []
        self.__multilineoutput = []
        self.__performancedata = []
        self.__returncode = []

        # "Brain": remembered checks and de-duplicated performance data.
        self.__brain_checks = []
        self.__brain_perfdata = []
        self.__brain_perfdatalabels = []

        self.__optparser = optparse.OptionParser(version=self.__version, description=self.__description)
        self._cmdlineoptions_parsed = False

    def add_cmdlineoption(self, shortoption, longoption, dest, help, **kwargs):
        """Register a command line option with the internal option parser.

        Passing ``help=None`` hides the option from the generated help.
        Extra keyword arguments are forwarded to ``add_option``.
        """
        if help is None:
            help = optparse.SUPPRESS_HELP
        self.__optparser.add_option(shortoption, longoption, dest=dest, help=help, **kwargs)

    def parse_cmdlineoptions(self):
        """Parse ``sys.argv`` once and store ``self.options`` / ``self.args``.

        A counting ``-v/--verbose`` option is added automatically.
        Subsequent calls are no-ops.
        """
        if self._cmdlineoptions_parsed:
            return
        self.__optparser.add_option('-v', '--verbose', dest='verbose', help='Verbosity, more for more ;-)', default=0, action='count')
        (self.options, self.args) = self.__optparser.parse_args()
        self._cmdlineoptions_parsed = True

    def args2checks(self):
        """Expand the positional arguments into a list of check tuples.

        Every argument has the form ``checks,targets,warn,crit`` (split on
        ``options.separator``, default ``','``); missing fields are filled
        with ``''``. ``checks`` and ``targets`` may each hold several
        entries joined by ``options.subseparator`` (default ``'+'``).

        Returns a list of ``(check, target, field3, field4)`` tuples, one
        for every check/target combination.
        """
        options = getattr(self, 'options', None)
        separator = getattr(options, 'separator', ',')
        subseparator = getattr(options, 'subseparator', '+')

        checks = []
        for arg in self.args:
            # Fix length to 4, fill with ''
            quad = (arg.split(separator) + ['', '', ''])[:4]
            # The first two fields may each carry a list of values.
            for idx in (0, 1):
                if subseparator in quad[idx]:
                    quad[idx] = quad[idx].split(subseparator)
                else:
                    quad[idx] = [quad[idx]]
            for target in quad[1]:
                for check in quad[0]:
                    checks.append((check, target, quad[2], quad[3]))
        return checks

    def range_to_limits(self, range):
        """Translate a range string into ``(low, high, negate)``.

        * a leading ``@`` sets ``negate`` to ``True``
        * ``low:high`` - an empty ``low`` means ``0``, a ``low`` starting
          with ``~`` means negative infinity, an empty ``high`` means
          positive infinity
        * an empty string means unbounded in both directions
        * a plain number ``n`` means ``0`` to ``n``

        Raises ``ValueError`` when a limit is not a number or when the
        range contains more than one ``:``.
        """
        negate = len(range) > 0 and range[0] == '@'
        if negate:
            range = range[1:]

        if ':' in range:
            (low, high) = range.split(':')
            if not low:
                low = 0.0
            elif low[0] == '~':
                low = float('-infinity')
            else:
                low = float(low)
            high = float(high) if high else float('infinity')
        elif len(range) == 0:
            low = float('-infinity')
            high = float('infinity')
        else:
            # Just a number
            low = 0.0
            high = float(range)
        return (low, high, negate)

    def value_in_range(self, value, range):
        """Return whether ``value`` satisfies ``range``.

        An empty or ``None`` range always yields ``True``. Otherwise the
        result is ``low <= value <= high``, inverted for ``@`` ranges.
        """
        if range in ['', None]:
            return True
        (low, high, negate) = self.range_to_limits(range)
        inside = not (value < low or value > high)
        return (not inside) if negate else inside

    def value_wc_to_returncode(self, value, range_warn, range_crit):
        """Map ``value`` to 2 (outside crit range), 1 (outside warn range) or 0."""
        if not self.value_in_range(value, range_crit):
            return 2
        if not self.value_in_range(value, range_warn):
            return 1
        return 0

    def is_float(self, string):
        """Return ``True`` if ``float(string)`` succeeds, else ``False``.

        ``None`` and other non-convertible objects yield ``False`` instead
        of raising, so optional thresholds can be tested safely.
        """
        try:
            float(string)
        except (TypeError, ValueError):
            return False
        return True

    def special_value_wc_to_returncode(self, value, warn, crit):
        """Like :meth:`value_wc_to_returncode`, with support for WARN > CRIT.

        If both thresholds are plain numbers and ``warn`` is larger than
        ``crit``, they are rewritten to ``@0:<n>`` so that values *inside*
        ``0..n`` trigger the state.
        """
        if self.is_float(warn) and self.is_float(crit) and float(warn) > float(crit):
            # Test if value is *smaller* than thresholds
            warn = '@0:' + warn
            crit = '@0:' + crit
        return self.value_wc_to_returncode(value, warn, crit)

    def add_output(self, value):
        """Append ``value`` to the status line parts."""
        self.__output.append(value)

    def add_multilineoutput(self, value):
        """Append ``value`` (a string or a list of strings) to the multi-line output."""
        self.__multilineoutput.append(value)

    def format_performancedata(self, label, value, unit, *args, **kwargs):
        """Build one performance data item as a string.

        The result is ``label=value[unit];warn;crit;min;max`` where the
        four trailing fields come from the keyword arguments of the same
        name and are left empty when missing or ``None``. Labels that
        contain ``=``, ``'`` or a space are wrapped in single quotes.
        """
        label = label.strip()
        if re.search('[=\' ]', label):
            label = '\'' + label + '\''
        perfdata = label + '=' + str(value)
        if unit:
            perfdata += str(unit).strip()
        for key in ['warn', 'crit', 'min', 'max']:
            perfdata += ';'
            if kwargs.get(key) is not None:
                perfdata += str(kwargs[key])
        return perfdata

    def add_performancedata(self, perfdata):
        """Append an already formatted performance data string."""
        self.__performancedata.append(perfdata)

    def format_add_performancedata(self, label, value, unit, *args, **kwargs):
        """Format a performance data item and append it in one step."""
        self.add_performancedata(self.format_performancedata(label, value, unit, *args, **kwargs))

    def add_returncode(self, value):
        """Append ``value`` to the collected return codes."""
        self.__returncode.append(value)

    def tagtarget(self, tag, target):
        """Return ``'tag:target'``, or just ``'tag'`` when ``target`` is empty."""
        if target:
            return str(tag) + ':' + str(target)
        return str(tag)

    def remember_check(self, tag, returncode, output, multilineoutput=None, perfdata=None, target=None):
        """Store the result of one check for later use by :meth:`brain2output`.

        ``perfdata`` is an iterable of dicts (see :meth:`remember_perfdata`).
        Returns the dict that was stored.
        """
        check = {
            'tag': tag,
            'returncode': returncode,
            'output': output,
            'multilineoutput': multilineoutput,
            'perfdata': perfdata,
            'target': target,
        }
        self.remember_perfdata(perfdata)
        self.__brain_checks.append(check)
        return check

    def remember_perfdata(self, perfdata=None):
        """Store performance data dicts, keeping one entry per ``'label'``.

        A later entry with an already known label replaces the earlier
        one in place. The dicts are later expanded as keyword arguments
        for :meth:`format_add_performancedata`.
        """
        if not perfdata:
            return
        for pd in perfdata:
            label = pd['label']
            if label in self.__brain_perfdatalabels:
                self.__brain_perfdata[self.__brain_perfdatalabels.index(label)] = pd
            else:
                self.__brain_perfdata.append(pd)
                self.__brain_perfdatalabels.append(label)

    def dump_brain(self):
        """Return ``(remembered checks, remembered performance data)``."""
        return (self.__brain_checks, self.__brain_perfdata)

    def brain2output(self):
        """Turn the remembered checks into output, return codes and perfdata.

        With exactly one remembered check its output is used as is. In
        every other case each check contributes a line to the multi-line
        output and the status line summarises the checks per return code,
        in ``returncode_priority`` order. A missing return code counts
        as 0.
        """
        if len(self.__brain_checks) == 1:
            check = self.__brain_checks[0]
            self.add_output(check.get('output'))
            if check.get('multilineoutput'):
                self.add_multilineoutput(check.get('multilineoutput'))
            self.add_returncode(check.get('returncode') or 0)
        else:
            # One bucket of "tag:target" names per return code 0..3.
            out = [[], [], [], []]
            for check in self.__brain_checks:
                tagtarget = self.tagtarget(check['tag'], check.get('target'))
                returncode = check.get('returncode') or 0
                self.add_returncode(returncode)
                out[returncode].append(tagtarget)
                self.add_multilineoutput(self.RETURNSTRINGS[returncode] + ' ' + tagtarget + ' - ' + check.get('output'))
                if check.get('multilineoutput'):
                    self.add_multilineoutput(check.get('multilineoutput'))

            statusline = [
                str(len(out[retcode])) + ' ' + self.RETURNSTRINGS[retcode] + ': ' + ' '.join(out[retcode])
                for retcode in self.returncode_priority
                if out[retcode]
            ]
            self.add_output(', '.join(statusline))

        for pd in self.__brain_perfdata:
            self.format_add_performancedata(**pd)

    @staticmethod
    def _value_to_human(value, powers, base, overflow_prefix, unit):
        """Scale ``value`` down by ``base`` until it fits and append the prefix."""
        for power in powers:
            if value < base:
                return "%3.1f%s%s" % (value, power, unit)
            value /= base
        if float(value) not in [float('inf'), float('-inf')]:
            return "%3.1f%s%s" % (value, overflow_prefix, unit)
        # Infinite values cannot be scaled; hand them back unformatted.
        return value

    def value_to_human_binary(self, value, unit=''):
        """Format ``value`` with a binary (1024 based) prefix, e.g. ``'2.0KiB'``."""
        return self._value_to_human(value, self.powers_binary, 1024.0, 'Yi', unit)

    def value_to_human_si(self, value, unit=''):
        """Format ``value`` with an SI (1000 based) prefix, e.g. ``'1.5kB'``."""
        return self._value_to_human(value, self.powers_si, 1000.0, 'Y', unit)

    def seconds_to_hms(self, seconds):
        """Format a number of seconds as ``H:MM:SS``."""
        # divmod keeps everything integral; plain '/' is float division in
        # Python 3 and made the seconds field always come out as 00.
        hours, remainder = divmod(int(seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return '%i:%02i:%02i' % (hours, minutes, seconds)

    def seconds_to_timedelta(self, seconds):
        """Return ``seconds`` (truncated to an int) as ``datetime.timedelta``."""
        return datetime.timedelta(seconds=int(seconds))

    def human_to_number(self, value, total=None, unit=('',)):
        """Convert a human readable value string into a float.

        * a single trailing character found in ``unit`` (case-insensitive;
          a string or a list/tuple of strings) is stripped first
        * plain numbers are returned as float
        * ``'<n>%'`` is resolved against ``total`` (itself converted with
          this method if needed); without a total the bare number is
          returned, and ``0.0`` if total is zero
        * SI prefixes (``k``, ``M``, ...) multiply by powers of 1000,
          binary prefixes (``Ki``, ``Mi``, ...) by powers of 1024;
          the prefix is matched case-insensitively
        * empty values and anything unrecognised are returned unchanged
        """
        if total and not self.is_float(total):
            total = self.human_to_number(total, unit=unit)

        if isinstance(unit, (list, tuple)):
            units = [u.lower() for u in unit]
        elif isinstance(unit, str):
            units = [unit.lower()]
        else:
            units = ['']

        # Nothing to convert (e.g. the open side of a range like ':10').
        if not value:
            return value
        if value.lower()[-1] in units:
            value = value[:-1]
            if not value:
                return value

        if self.is_float(value):
            return float(value)

        if value[-1] == '%':
            if total:
                return float(value[:-1]) / 100.0 * float(total)
            if total in [0, 0.0]:
                return 0.0
            return float(value[:-1])  # FIXME: Good idea?

        si_prefix = value[-1].lower()
        if si_prefix in self.powers_si_lower:
            return 1000.0 ** self.powers_si_lower.index(si_prefix) * float(value[:-1])

        binary_prefix = value[-2:].lower()
        if binary_prefix in self.powers_binary_lower:
            return 1024.0 ** self.powers_binary_lower.index(binary_prefix) * float(value[:-2])

        return value

    @staticmethod
    def _number_to_str(number):
        """Render ``number`` without a redundant trailing ``.0``."""
        text = '%s' % number
        # Only touch plain decimal notation: stripping zeros from
        # '1e+20' would silently turn it into '1e+2'.
        if isinstance(number, float) and '.' in text and 'e' not in text.lower():
            text = text.rstrip('0').rstrip('.')
        return text

    def range_dehumanize(self, range, total=None, unit=('',)):
        """Rewrite a range so that both limits are plain numbers.

        Each side of the range is passed through :meth:`human_to_number`;
        a leading ``@`` is preserved. Returns ``''`` for an empty range.
        A verbose message (level 3) is emitted when the range changed.
        """
        if not range:
            return ''

        original = range
        newrange = ''
        if range[0] == '@':
            newrange = '@'
            range = range[1:]

        parts = range.split(':')
        newrange += self._number_to_str(self.human_to_number(parts[0], total, unit))
        if len(parts) > 1:
            newrange += ':' + self._number_to_str(self.human_to_number(parts[1], total, unit))

        # Compare against the untouched input so that '@' ranges are not
        # reported as changed when only the prefix differs.
        if original != newrange:
            self.verbose(3, 'Changed range/thresold from "' + original + '" to "' + newrange + '"')
        return newrange

    def verbose(self, level, output, prefix=None):
        """Print ``output`` if the verbosity is at least ``level``.

        Every printed line starts with ``V<level>:``, ``level`` spaces and
        the optional ``prefix``. ``output`` may be a string (possibly
        multi-line), a list of lines or any other printable object. If
        the command line has not been parsed yet the verbosity is 0.
        """
        verbosity = getattr(getattr(self, 'options', None), 'verbose', 0) or 0
        if level > verbosity:
            return

        bol = 'V' + str(level) + ':' + ' ' * level
        if prefix:
            bol += '%s' % prefix

        if isinstance(output, str):
            print(bol + output.strip().replace('\n', '\n' + bol))
        elif isinstance(output, list):
            print('\n'.join('%s%s' % (bol, line) for line in output))
        else:
            print('%s%s' % (bol, output))

    def max_returncode(self, returncodes):
        """Return the most important code of ``returncodes``.

        Importance follows ``returncode_priority``. If none of the
        prioritised codes is present (including an empty list) the last
        entry of the priority list, i.e. 0, is returned.
        """
        for rc in self.returncode_priority:
            if rc in returncodes:
                return rc
        return self.returncode_priority[-1]

    def exit(self):
        """Print everything collected so far and exit with the worst return code."""
        returncode = self.max_returncode(self.__returncode)
        self.back2nagios(returncode, statusline=self.__output, multiline=self.__multilineoutput, performancedata=self.__performancedata)

    def back2nagios(self, returncode, statusline=None, multiline=None, performancedata=None, subtag=None, exit=True):
        """Assemble the plugin output and either exit or return it.

        The output is ``TAG(subtag) STATUS - statusline``, followed by the
        multi-line output on separate lines and, after a ``|``, the
        performance data. ``statusline`` and ``performancedata`` may be a
        string or a list/tuple of strings; ``multiline`` may additionally
        contain nested lists/tuples of lines.

        With ``exit=True`` (default) the output is printed and the process
        exits with ``returncode``; otherwise ``(returncode, output)`` is
        returned. ``returncode`` must be a key of ``RETURNSTRINGS``.
        """
        # FIXME: Make 'returncode' also accept strings
        # Build status line
        out = self.__tagforstatusline
        if subtag:
            out += '(' + subtag.replace('|', ' ') + ')'
        out += ' ' + self.RETURNSTRINGS[returncode]

        # Check if there's a status line text and build it
        if statusline:
            out += ' - '
            if isinstance(statusline, str):
                # NOTE(review): unlike the list form, a plain string is not
                # cleaned of '|' here - confirm whether that is intended.
                out += statusline
            elif isinstance(statusline, (list, tuple)):
                out += ', '.join(statusline).replace('|', ' ')

        # Check if we have multi line output and build it
        if multiline:
            if isinstance(multiline, str):
                out += '\n' + multiline.replace('|', ' ')
            elif isinstance(multiline, (list, tuple)):
                # Entries may be single lines or lists of lines (that is
                # what brain2output() collects), so flatten per entry
                # instead of guessing from the first element.
                lines = []
                for entry in multiline:
                    if isinstance(entry, (list, tuple)):
                        lines.extend(entry)
                    else:
                        lines.append(entry)
                out += '\n' + '\n'.join(lines).replace('|', ' ')

        # Check if there's perfdata
        if performancedata:
            out += '|'
            if isinstance(performancedata, str):
                out += performancedata
            elif isinstance(performancedata, (list, tuple)):
                out += ' '.join(performancedata).replace('|', ' ')

        # Exit program or return output line(s)
        if exit:
            print(out)
            sys.exit(returncode)
        return (returncode, out)
##############################################################################
def main():
    """Print a series of sample outputs as a manual smoke test.

    Exercises ``MonitoringPlugin.back2nagios`` with the different return
    codes, sub tags, multi-line variants and performance data. The last
    call prints its line and terminates the process with exit code 0.
    """
    from pprint import pprint

    myplugin = MonitoringPlugin(pluginname='check_testplugin', tagforstatusline='TEST')

    pprint(myplugin.back2nagios(0, 'Nr. 01: Simple plugin', exit=False))
    pprint(myplugin.back2nagios(0, 'Nr. 02: Simple plugin with sub tag', subtag='MySubTag', exit=False))

    pprint(myplugin.back2nagios(0, 'Nr. 10: Exit Code OK', exit=False))
    pprint(myplugin.back2nagios(1, 'Nr. 11: Exit Code WARNING', exit=False))
    pprint(myplugin.back2nagios(2, 'Nr. 12: Exit Code CRITICAL', exit=False))
    pprint(myplugin.back2nagios(3, 'Nr. 13: Exit Code UNKNOWN', exit=False))

    multiline_samples = [
        ('Nr. 20: Plugin with string-based multiline output', 'Line 2\nLine 3\nLine4'),
        ('Nr. 21: Plugin with list-based multiline output', ['Line 2', 'Line 3', 'Line4']),
        ('Nr. 22: Plugin with tuple-based multiline output', ('Line 2', 'Line 3', 'Line4')),
    ]
    for statusline, multiline in multiline_samples:
        ret = myplugin.back2nagios(0, statusline, multiline, exit=False)
        print(ret[1])
        print('Returncode: ' + str(ret[0]))

    # add_performancedata() takes one pre-formatted string, so the items
    # are formatted first; the same list is passed on explicitly so that
    # the sample line really shows performance data.
    perfdata = [
        myplugin.format_performancedata('Val1', 42, ''),
        myplugin.format_performancedata('Val2', 23, 'c', warn=10, crit=20, min=0, max=100),
        myplugin.format_performancedata('Val 3', '2342', 'c', warn=10, crit=20, min=0, max=100),
    ]
    for item in perfdata:
        myplugin.add_performancedata(item)
    pprint(myplugin.back2nagios(0, 'Nr. 30: With perfdatas', performancedata=perfdata, exit=False))

    myplugin.back2nagios(0, 'Nr. 99: Exit test suite with OK')


if __name__ == '__main__':
    main()
#vim: ts=4 sw=4 sts=4