#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

#####################################################################
# (c) 2010-2011 by Sven Velt and team(ix) GmbH, Nuernberg, Germany  #
#                  sv@teamix.net                                    #
# (c) 2016-     by Sven Velt, Germany                               #
#                  sven-mymonplugins@velt.biz                       #
#                                                                   #
# This file is part of "velt.biz - My Monitoring Plugins"           #
# a fork of "team(ix) Monitoring Plugins" in 2015                   #
# URL: https://gogs.velt.biz/velt.biz/MyMonPlugins/                 #
#                                                                   #
# This file is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published #
# by the Free Software Foundation, either version 2 of the License, #
# or (at your option) any later version.                            #
#                                                                   #
# This file is distributed in the hope that it will be useful, but  #
# WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the      #
# GNU General Public License for more details.                      #
#                                                                   #
# You should have received a copy of the GNU General Public License #
# along with this file. If not, see <http://www.gnu.org/licenses/>. #
#####################################################################

import os
import subprocess

from .monitoringplugin import MonitoringPlugin


class SNMPMonitoringPlugin(MonitoringPlugin):
    """Monitoring plugin base class that adds SNMP GET/WALK helpers.

    Queries go either through the NET-SNMP Python bindings (module
    "netsnmp"), when they can be imported and "--nonetsnmp" was not given,
    or through the "snmpget"/"snmpwalk" command line tools.  Results are
    cached per instance, separately for GET and WALK requests.

    Errors are reported with ``self.back2nagios(3, message)``, which is
    inherited from MonitoringPlugin (presumably it terminates the plugin;
    the code below nevertheless returns right after calling it).
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base plugin and register the SNMP command line options.

        All arguments are passed through to ``MonitoringPlugin.__init__``.
        """
        # Explicit base class call instead of super(); kept as in the original
        # ("a little bit more flexible").
        MonitoringPlugin.__init__(self, *args, **kwargs)

        self.add_cmdlineoption('-H', '', 'host', 'Host to check', default='127.0.0.1')
        self.add_cmdlineoption('-P', '', 'snmpversion', 'SNMP protocol version', metavar='1', default='1')
        self.add_cmdlineoption('-C', '', 'snmpauth', 'SNMP v1/v2c community OR SNMP v3 quadruple', metavar='public', default='public')
        self.add_cmdlineoption('', '--snmpcmdlinepath', 'snmpcmdlinepath', 'Path to "snmpget" and "snmpwalk"', metavar='/usr/bin/', default='/usr/bin')
        # FIXME (marker carried over from the original author, no details recorded)
        self.add_cmdlineoption('', '--nonetsnmp', 'nonetsnmp', 'Do not use NET-SNMP python bindings', action='store_true')

        # Cache of already fetched values, keyed by (request type, OID).
        self.__SNMP_Cache = {}
        self.__use_netsnmp = False
        self.__prepared_snmp = False
        # The imported "netsnmp" module, once prepare_snmp() has loaded it.
        self.__netsnmp = None
        # Argument vectors (without the OID) for the command line tools.
        self.__ARGV_get = None
        self.__ARGV_walk = None

    def prepare_snmp(self):
        """Select the SNMP backend and set ``SNMPGET_wrapper``/``SNMPWALK_wrapper``.

        Parses the command line options first if that has not happened yet.
        With the NET-SNMP bindings, version "2c" is normalised to "2"; with
        the command line tools, version 2 is normalised to "2c".
        """
        if not self._cmdlineoptions_parsed:
            self.parse_cmdlineoptions()

        if not self.options.nonetsnmp:
            try:
                import netsnmp
            except ImportError:
                # Bindings not installed: fall back to the command line tools.
                pass
            else:
                # Keep a reference: the wrappers run outside this function's
                # scope and could not see a purely local import.
                self.__netsnmp = netsnmp
                self.__use_netsnmp = True

        if self.__use_netsnmp:
            self.verbose(1, 'Using NET-SNMP Python bindings')

            self.SNMPGET_wrapper = self.__SNMPGET_netsnmp
            self.SNMPWALK_wrapper = self.__SNMPWALK_netsnmp

            # The wrappers pass int(snmpversion) to the bindings.
            if self.options.snmpversion == '2c':
                self.options.snmpversion = '2'
        else:
            self.verbose(1, 'Using NET-SNMP command line tools')

            self.SNMPGET_wrapper = self.__SNMPGET_cmdline
            self.SNMPWALK_wrapper = self.__SNMPWALK_cmdline

            self.__prepare_cmdline()

        self.__prepared_snmp = True

    def __prepare_cmdline(self):
        """Build the "snmpget"/"snmpwalk" argument vectors and check the binaries.

        The commands are kept as argument lists and later run without a
        shell, so host, community and OID values cannot be interpreted as
        shell syntax.
        """
        version = self.options.snmpversion

        if version in (1, 2, '1', '2', '2c'):
            if version in (2, '2'):
                version = self.options.snmpversion = '2c'
            auth_args = ['-v', str(version), '-c', self.options.snmpauth]
        elif version in (3, '3'):
            # FIXME: Better error handling
            try:
                snmpauth = self.options.snmpauth.split(':')
                auth_args = ['-v', '3', '-l', snmpauth[0], '-u', snmpauth[1], '-a', snmpauth[2], '-A', snmpauth[3]]
            except (AttributeError, IndexError):
                self.back2nagios(3, 'Could not build SNMPv3 command line, need "SecLevel:SecName:AuthProtocol:AuthKey"')
                return
        else:
            self.back2nagios(3, 'Unknown SNMP version "' + str(version) + '"')
            return

        path_get = os.path.join(self.options.snmpcmdlinepath, 'snmpget')
        path_walk = os.path.join(self.options.snmpcmdlinepath, 'snmpwalk')

        self.__ARGV_get = [path_get, '-OqevtU'] + auth_args + [self.options.host]
        self.__ARGV_walk = [path_walk, '-OqevtU'] + auth_args + [self.options.host]

        self.verbose(3, 'Using commandline: ' + ' '.join(self.__ARGV_get + ['<OID>']))
        self.verbose(3, 'Using commandline: ' + ' '.join(self.__ARGV_walk + ['<OID>']))

        # Test if snmp(get|walk) are executable
        for fpath in (path_get, path_walk):
            if not (os.path.isfile(fpath) and os.access(fpath, os.X_OK)):
                self.back2nagios(3, 'Could not execute "%s"' % fpath)

    def find_index_for_value(self, list_indexes, list_values, wanted):
        """Return the entry of ``list_indexes`` at the position of ``wanted`` in ``list_values``.

        Returns None when the two sequences differ in length or when
        ``wanted`` does not occur in ``list_values``.
        """
        self.verbose(2, 'Look for "' + str(wanted) + '"')

        if len(list_indexes) != len(list_values):
            self.verbose(1, 'Length of index and value lists do not match!')
            return None

        try:
            index = list_indexes[list_values.index(wanted)]
        except ValueError:
            index = None

        # Compare against None explicitly: an index of 0 or '' is a valid hit.
        if index is not None:
            self.verbose(2, 'Found "' + str(wanted) +'" with index "' + str(index) + '"')
        else:
            self.verbose(2, 'Nothing found!')

        return index

    def find_in_table(self, oid_index, oid_values, wanted):
        """Walk ``oid_index`` and ``oid_values`` and return the index belonging to ``wanted``.

        Calls ``back2nagios(3, ...)`` when both walks return a different
        number of entries; otherwise behaves like find_index_for_value().
        """
        self.verbose(2, 'Look for "' + str(wanted) + '" in "' + str(oid_values) +'"')

        indexes = list(self.SNMPWALK(oid_index))
        values = list(self.SNMPWALK(oid_values))

        if len(indexes) != len(values):
            self.back2nagios(3, 'Different data from 2 SNMP Walks!')

        return self.find_index_for_value(indexes, values, wanted)

    def SNMPGET(self, baseoid, idx=None, exitonerror=True):
        """Fetch a single value via SNMP GET.

        baseoid     -- an OID string, or a list/tuple of three OID strings
                       ``(oid_64bit, oid_low32, oid_high32)`` describing one
                       counter.
        idx         -- optional index appended to the OID as ".<idx>";
                       ``None`` and ``''`` mean "no index".
        exitonerror -- passed to the backend; when false, failures yield None.

        For a list/tuple the result is an int: with SNMP v1 it is assembled
        from the low and high 32 bit values (negative values are wrapped by
        2**32), with v2/v2c/v3 element 0 is read directly.  For any other
        ``baseoid`` the backend's value is returned unchanged.
        """
        if not self.__prepared_snmp:
            self.prepare_snmp()

        if idx in ['', None]:
            suffix = ''
        else:
            suffix = '.' + str(idx)

        if isinstance(baseoid, (list, tuple)):
            version = self.options.snmpversion

            if version in (1, '1'):
                value_low = self.SNMPGET_wrapper(baseoid[1] + suffix, exitonerror=exitonerror)
                if value_low is None:
                    return None
                value_low = int(value_low)
                if value_low < 0:
                    value_low += 2 ** 32

                value_hi = self.SNMPGET_wrapper(baseoid[2] + suffix, exitonerror=exitonerror)
                if value_hi is None:
                    return None
                value_hi = int(value_hi)
                if value_hi < 0:
                    value_hi += 2 ** 32

                return value_hi * 2 ** 32 + value_low

            if version in (2, 3, '2', '2c', '3'):
                value = self.SNMPGET_wrapper(baseoid[0] + suffix, exitonerror=exitonerror)
                if value is None:
                    return None
                return int(value)

            # Unknown protocol version: nothing sensible to query.
            return None

        if isinstance(baseoid, str):
            return self.SNMPGET_wrapper(baseoid + suffix, exitonerror=exitonerror)

        # Any other type is handed to the backend as is; idx is ignored.
        return self.SNMPGET_wrapper(baseoid, exitonerror=exitonerror)

    def SNMPWALK(self, baseoid, exitonerror=True):
        """Walk the subtree below ``baseoid`` and return the backend's sequence of values."""
        if not self.__prepared_snmp:
            self.prepare_snmp()

        return self.SNMPWALK_wrapper(baseoid, exitonerror=exitonerror)

    # NOTE(review): both netsnmp wrappers always pass Community=snmpauth, so
    # the SNMPv3 quadruple is not split on this path -- confirm whether v3 is
    # meant to work with the Python bindings.

    def __SNMPGET_netsnmp(self, oid, exitonerror=True):
        """GET ``oid`` through the NET-SNMP Python bindings (cached).

        An empty/None answer is reported with back2nagios() when
        ``exitonerror`` is true; otherwise None is returned.
        """
        if not self.__prepared_snmp:
            self.prepare_snmp()

        cache_key = ('get', oid)
        if cache_key in self.__SNMP_Cache:
            self.verbose(2, "%40s -> (CACHED) %s" % (oid, self.__SNMP_Cache[cache_key]))
            return self.__SNMP_Cache[cache_key]

        result = self.__netsnmp.snmpget(oid, Version=int(self.options.snmpversion), DestHost=self.options.host, Community=self.options.snmpauth)[0]

        if not result:
            if exitonerror:
                self.back2nagios(3, 'Timeout or no answer from "%s" looking for "%s"' % (self.options.host, oid))
            return None

        self.__SNMP_Cache[cache_key] = result

        self.verbose(2, "%40s -> %s" % (oid, result))
        return result

    def __SNMPWALK_netsnmp(self, oid, exitonerror=True):
        """WALK ``oid`` through the NET-SNMP Python bindings (cached).

        An empty answer is reported with back2nagios() when ``exitonerror``
        is true; otherwise None is returned.
        """
        if not self.__prepared_snmp:
            self.prepare_snmp()

        cache_key = ('walk', oid)
        if cache_key in self.__SNMP_Cache:
            self.verbose(2, "%40s -> (CACHED) %s" % (oid, self.__SNMP_Cache[cache_key]))
            return self.__SNMP_Cache[cache_key]

        result = self.__netsnmp.snmpwalk(oid, Version=int(self.options.snmpversion), DestHost=self.options.host, Community=self.options.snmpauth)

        if not result:
            if exitonerror:
                self.back2nagios(3, 'Timeout or no answer from "%s" looking for "%s"' % (self.options.host, oid))
            return None

        self.__SNMP_Cache[cache_key] = result

        self.verbose(2, "%40s -> %s" % (oid, result))
        return result

    def __run_cmdline(self, cmd, oid, exitonerror):
        """Run ``cmd`` (an argument list) and return its stdout lines.

        stderr is discarded.  Returns None when the command cannot be
        started or exits with a non-zero status; in that case back2nagios()
        is called first if ``exitonerror`` is true.  Exit status 1 is
        reported as a timeout and 2 as "OID not found" (the wait statuses
        256 and 512 the previous os.popen() based code tested for).
        """
        try:
            with open(os.devnull, 'w') as devnull:
                proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull, universal_newlines=True)
                try:
                    lines = proc.stdout.readlines()
                finally:
                    proc.stdout.close()
                    retcode = proc.wait()
        except OSError:
            if exitonerror:
                self.back2nagios(3, 'Could not execute "%s"' % cmd[0])
            return None

        if retcode:
            if exitonerror:
                if retcode == 1:
                    self.back2nagios(3, 'Timeout - no SNMP answer from "' + self.options.host + '"')
                elif retcode == 2:
                    self.back2nagios(3, 'OID "' + str(oid) + '" not found')
                else:
                    self.back2nagios(3, 'Unknown error code "' + str(retcode) + '" from command line utils')
            return None

        return lines

    def __SNMPGET_cmdline(self, oid, exitonerror=True):
        """GET ``oid`` with the "snmpget" command line tool (cached).

        Returns the first output line with trailing whitespace and all
        double quotes removed, or None on a tolerated failure.
        """
        if not self.__prepared_snmp:
            self.prepare_snmp()

        cmd = self.__ARGV_get + [str(oid)]
        self.verbose(2, ' '.join(cmd))

        cache_key = ('get', oid)
        if cache_key in self.__SNMP_Cache:
            self.verbose(2, "(CACHED) %s" % (self.__SNMP_Cache[cache_key],))
            return self.__SNMP_Cache[cache_key]

        lines = self.__run_cmdline(cmd, oid, exitonerror)
        if lines is None:
            return None

        out = lines[0].rstrip().replace('"', '') if lines else ''

        self.__SNMP_Cache[cache_key] = out

        self.verbose(1, out)
        return out

    def __SNMPWALK_cmdline(self, oid, exitonerror=True):
        """WALK ``oid`` with the "snmpwalk" command line tool (cached).

        Returns the list of output lines, each with trailing whitespace and
        all double quotes removed, or None on a tolerated failure.
        """
        if not self.__prepared_snmp:
            self.prepare_snmp()

        cmd = self.__ARGV_walk + [str(oid)]
        self.verbose(2, ' '.join(cmd))

        cache_key = ('walk', oid)
        if cache_key in self.__SNMP_Cache:
            self.verbose(2, "(CACHED) %s" % (self.__SNMP_Cache[cache_key],))
            return self.__SNMP_Cache[cache_key]

        lines = self.__run_cmdline(cmd, oid, exitonerror)
        if lines is None:
            return None

        out = [line.rstrip().replace('"', '') for line in lines]

        self.__SNMP_Cache[cache_key] = out

        self.verbose(1, str(out))
        return out