#!/usr/bin/python
# -*- coding: utf8 -*-
#
# library for communicating with an isc dhcp server over the omapi protocol
#
# Copyright (C) 2010-2012 Cygnus Networks GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
For an example see http://code.google.com/p/pypureomapi/wiki/Example.
"""

# Message format:
#
# authid (netint32)
# authlen (netint32)
# opcode (netint32)
# handle (netint32)
# tid (netint32)
# rid (netint32)
# message (dictionary)
# object (dictionary)
# signature (length is authlen)
#
# dictionary = entry* 0x00 0x00
# entry = key (net16str) value (net32str)

__author__ = "Helmut Grohne, Torge Szczepanek"
__copyright__ = "Cygnus Networks GmbH"
__licence__ = "GPL-3"
__version__ = "0.3"
__maintainer__ = "Torge Szczepanek"
__email__ = "info@cygnusnetworks.de"

__all__ = []

import binascii
import hashlib
import struct
import hmac
import io
import socket
import random
import operator
import sys

try:
    basestring
except NameError:
    # Python 3 has no basestring; str is the only text string type there.
    basestring = str

# Cryptographically strong source for transmission ids.
sysrand = random.SystemRandom()

__all__.extend("OMAPI_OP_OPEN OMAPI_OP_REFRESH OMAPI_OP_UPDATE".split())
__all__.extend("OMAPI_OP_NOTIFY OMAPI_OP_STATUS OMAPI_OP_DELETE".split())
OMAPI_OP_OPEN = 1
OMAPI_OP_REFRESH = 2
OMAPI_OP_UPDATE = 3
OMAPI_OP_NOTIFY = 4
OMAPI_OP_STATUS = 5
OMAPI_OP_DELETE = 6


def repr_opcode(opcode):
    """Returns a textual representation for the given opcode.

    @type opcode: int
    @rtype: str
    """
    opmap = {1: "open", 2: "refresh", 3: "update",
             4: "notify", 5: "status", 6: "delete"}
    return opmap.get(opcode, "unknown (%d)" % opcode)


__all__.append("OmapiError")
class OmapiError(Exception):
    """OMAPI exception base class."""


__all__.append("OmapiSizeLimitError")
class OmapiSizeLimitError(OmapiError):
    """Packet size limit reached."""
    def __init__(self):
        OmapiError.__init__(self, "Packet size limit reached.")


__all__.append("OmapiErrorNotFound")
class OmapiErrorNotFound(OmapiError):
    """Not found."""
    def __init__(self):
        OmapiError.__init__(self, "not found")


class OutBuffer:
    """Helper class for constructing network packets."""
    sizelimit = 65536

    def __init__(self):
        self.buff = io.BytesIO()

    def __len__(self):
        """Return the number of bytes in the buffer.

        @rtype: int
        """
        # On Py2.7 tell returns long, but __len__ is required to return int.
        return int(self.buff.tell())

    def add(self, data):
        """Append raw bytes to the buffer.

        @type data: bytes
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if len(self) + len(data) > self.sizelimit:
            raise OmapiSizeLimitError()
        self.buff.write(data)
        return self

    def add_net32int(self, integer):
        """Append an integer as four bytes in network byte order.

        @type integer: int
        @param integer: a 32bit unsigned integer
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if integer < 0 or integer >= (1 << 32):
            raise ValueError("not a 32bit unsigned integer")
        return self.add(struct.pack("!L", integer))

    def add_net16int(self, integer):
        """Append an integer as two bytes in network byte order.

        @type integer: int
        @param integer: a 16bit unsigned integer
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if integer < 0 or integer >= (1 << 16):
            raise ValueError("not a 16bit unsigned integer")
        return self.add(struct.pack("!H", integer))

    def add_net32string(self, string):
        """Append a string prefixed with its length as a 32bit integer.

        >>> r = b'\\x00\\x00\\x00\\x01x'
        >>> OutBuffer().add_net32string(b"x").getvalue() == r
        True

        @type string: bytes
        @param string: maximum length must fit in a 32bit integer
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if len(string) >= (1 << 32):
            raise ValueError("string too long")
        return self.add_net32int(len(string)).add(string)

    def add_net16string(self, string):
        """Append a string prefixed with its length as a 16bit integer.

        >>> OutBuffer().add_net16string(b"x").getvalue() == b'\\x00\\x01x'
        True

        @type string: bytes
        @param string: maximum length must fit in a 16bit integer
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if len(string) >= (1 << 16):
            raise ValueError("string too long")
        return self.add_net16int(len(string)).add(string)

    def add_bindict(self, items):
        """Append a dictionary (see the message format at the module top).

        >>> r = b'\\x00\\x03foo\\x00\\x00\\x00\\x03bar\\x00\\x00'
        >>> OutBuffer().add_bindict({b"foo": b"bar"}).getvalue() == r
        True

        @type items: [(bytes, bytes)] or {bytes: bytes}
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if not isinstance(items, list):
            items = items.items()
        for key, value in items:
            self.add_net16string(key).add_net32string(value)
        return self.add(b"\x00\x00")  # end marker

    def getvalue(self):
        """Return everything written to the buffer so far.

        >>> OutBuffer().add(b"sp").add(b"am").getvalue() == b"spam"
        True

        @rtype: bytes
        """
        return self.buff.getvalue()

    def consume(self, length):
        """Drop the first length bytes from the buffer.

        >>> OutBuffer().add(b"spam").consume(2).getvalue() == b"am"
        True

        @type length: int
        @returns: self
        """
        self.buff = io.BytesIO(self.getvalue()[length:])
        return self


class OmapiStartupMessage:
    """Class describing the protocol negotiation messages."""
    implemented_protocol_version = 100
    implemented_header_size = 4 * 6

    def __init__(self, protocol_version=None, header_size=None):
        """
        @type protocol_version: int or None
        @type header_size: int or None
        """
        if protocol_version is None:
            protocol_version = self.implemented_protocol_version
        if header_size is None:
            header_size = self.implemented_header_size
        self.protocol_version = protocol_version
        self.header_size = header_size

    def validate(self):
        """Checks whether this OmapiStartupMessage matches the implementation.

        @raises OmapiError:
        """
        if self.implemented_protocol_version != self.protocol_version:
            raise OmapiError("protocol mismatch")
        if self.implemented_header_size != self.header_size:
            raise OmapiError("header size mismatch")

    def as_string(self):
        """Return the serialized form of this message.

        @rtype: bytes
        """
        ret = OutBuffer()
        self.serialize(ret)
        return ret.getvalue()

    def serialize(self, outbuffer):
        """Serialize this OmapiStartupMessage to the given outbuffer.

        @type outbuffer: OutBuffer
        """
        outbuffer.add_net32int(self.protocol_version)
        outbuffer.add_net32int(self.header_size)


class OmapiAuthenticatorBase:
    """Base class for OMAPI authenticators.

    @cvar authlen: is the length of a signature as returned by the sign method
    @type authlen: int
    @cvar algorithm: is a textual name for the algorithm
    @type algorithm: str or None
    @ivar authid: is the authenticator id as assigned during the handshake
    @type authid: int
    """
    authlen = -1  # must be overwritten
    algorithm = None
    authid = -1  # will be an instance attribute

    def __init__(self):
        pass

    def auth_object(self):
        """
        @rtype: {bytes: bytes}
        @returns: object part of an omapi authentication message
        """
        raise NotImplementedError

    def sign(self, message):
        """
        @type message: bytes
        @rtype: bytes
        @returns: a signature of length self.authlen
        """
        raise NotImplementedError()


class OmapiNullAuthenticator(OmapiAuthenticatorBase):
    """Authenticator producing empty signatures (unauthenticated traffic)."""
    authlen = 0
    authid = 0  # always 0

    def __init__(self):
        OmapiAuthenticatorBase.__init__(self)

    def auth_object(self):
        return {}

    def sign(self, _):
        return b""


class OmapiHMACMD5Authenticator(OmapiAuthenticatorBase):
    """Authenticator signing messages with HMAC-MD5."""
    authlen = 16
    algorithm = b"hmac-md5.SIG-ALG.REG.INT."

    def __init__(self, user, key):
        """
        @type user: bytes
        @type key: bytes
        @param key: base64 encoded key
        @raises binascii.Error: for bad base64 encoding
        """
        OmapiAuthenticatorBase.__init__(self)
        self.user = user
        self.key = binascii.a2b_base64(key)

    def auth_object(self):
        return {b"name": self.user, b"algorithm": self.algorithm}

    def sign(self, message):
        """
        >>> authlen = OmapiHMACMD5Authenticator.authlen
        >>> len(OmapiHMACMD5Authenticator(b"foo", 16*b"x").sign(b"baz")) == authlen
        True

        @type message: bytes
        @rtype: bytes
        @returns: a signature of length self.authlen
        """
        # The digest must be named explicitly: the implicit MD5 default of
        # the hmac module was removed in Python 3.8 (TypeError otherwise).
        return hmac.new(self.key, message, hashlib.md5).digest()


__all__.append("OmapiMessage")
class OmapiMessage:
    """
    @type authid: int
    @ivar authid: The id of the message authenticator.
    @type opcode: int
    @ivar opcode: One out of
            OMAPI_OP_{OPEN,REFRESH,UPDATE,NOTIFY,STATUS,DELETE}.
    @type handle: int
    @ivar handle: The id of a handle acquired from a previous request or 0.
    @type tid: int
    @ivar tid: Transmission identifier.
    @type rid: int
    @ivar rid: Receive identifier (of a response is the tid of the request).
    @type message: [(bytes, bytes)]
    @ivar message: A list of (key, value) pairs.
    @type obj: [(bytes, bytes)]
    @ivar obj: A list of (key, value) pairs.
    @type signature: bytes
    @ivar signature: A signature on this message as generated by an
            authenticator.
    """
    def __init__(self, authid=0, opcode=0, handle=0, tid=0, rid=0,
                 message=None, obj=None, signature=b""):
        """
        Construct an OmapiMessage from the given fields. No error
        checking is performed.

        @type authid: int
        @type opcode: int
        @type handle: int
        @type tid: int
        @param tid: The special value -1 causes a tid to be generated
                randomly.
        @type rid: int
        @type message: [(bytes, bytes)]
        @type obj: [(bytes, bytes)]
        @type signature: bytes
        @rtype: OmapiMessage
        """
        self.authid, self.opcode, self.handle = authid, opcode, handle
        self.tid, self.rid = tid, rid
        self.message = message or []
        self.obj = obj or []
        self.signature = signature

        if self.tid == -1:
            self.generate_tid()

    def generate_tid(self):
        """Generate a random transmission id for this OMAPI message.

        >>> OmapiMessage(tid=-1).tid != OmapiMessage(tid=-1).tid
        True
        """
        self.tid = sysrand.randrange(0, 1 << 32)

    def serialize(self, outbuffer, forsigning=False):
        """Write this message to the given outbuffer.

        @type outbuffer: OutBuffer
        @type forsigning: bool
        @param forsigning: if true, the authid and the signature itself
                are left out (authlen is still written)
        @raises OmapiSizeLimitError:
        """
        if not forsigning:
            outbuffer.add_net32int(self.authid)
        outbuffer.add_net32int(len(self.signature))
        outbuffer.add_net32int(self.opcode)
        outbuffer.add_net32int(self.handle)
        outbuffer.add_net32int(self.tid)
        outbuffer.add_net32int(self.rid)
        outbuffer.add_bindict(self.message)
        outbuffer.add_bindict(self.obj)
        if not forsigning:
            outbuffer.add(self.signature)

    def as_string(self, forsigning=False):
        """Return the serialized form of this message.

        >>> len(OmapiMessage().as_string(True)) >= 24
        True

        @type forsigning: bool
        @rtype: bytes
        @raises OmapiSizeLimitError:
        """
        ret = OutBuffer()
        self.serialize(ret, forsigning)
        return ret.getvalue()

    def sign(self, authenticator):
        """Sign this OMAPI message.

        @type authenticator: OmapiAuthenticatorBase
        """
        self.authid = authenticator.authid
        # The signed representation contains authlen, which is taken from
        # len(self.signature); install a placeholder of the right length.
        self.signature = b"\0" * authenticator.authlen  # provide authlen
        self.signature = authenticator.sign(self.as_string(forsigning=True))
        assert len(self.signature) == authenticator.authlen

    def verify(self, authenticators):
        """Verify this OMAPI message.

        @type authenticators: {int: OmapiAuthenticatorBase}
        @rtype: bool
        @returns: False if the authid is unknown or the signature does not
                match, True otherwise
        """
        try:
            authenticator = authenticators[self.authid]
        except KeyError:
            return False
        expected = authenticator.sign(self.as_string(forsigning=True))
        # Constant-time comparison: the signature comes from the network.
        return hmac.compare_digest(expected, self.signature)

    @classmethod
    def open(cls, typename):
        """Create an OMAPI open message with given typename.

        @type typename: bytes
        @rtype: OmapiMessage
        """
        return cls(opcode=OMAPI_OP_OPEN, message=[(b"type", typename)],
                   tid=-1)

    @classmethod
    def update(cls, handle):
        """Create an OMAPI update message for the given handle.

        @type handle: int
        @rtype: OmapiMessage
        """
        return cls(opcode=OMAPI_OP_UPDATE, handle=handle, tid=-1)

    @classmethod
    def delete(cls, handle):
        """Create an OMAPI delete message for given handle.

        @type handle: int
        @rtype: OmapiMessage
        """
        return cls(opcode=OMAPI_OP_DELETE, handle=handle, tid=-1)

    def is_response(self, other):
        """Check whether this OMAPI message is a response to the given
        OMAPI message.

        @type other: OmapiMessage
        @rtype: bool
        """
        return self.rid == other.tid

    def update_object(self, update):
        """Replace or add the given entries in the object part.

        @type update: {bytes: bytes}
        """
        self.obj = [(key, value) for key, value in self.obj
                    if key not in update]
        self.obj.extend(update.items())

    def dump(self):
        """
        @rtype: str
        @returns: a human readable representation of the message
        """
        return "".join(("Omapi message attributes:\n",
                        "authid:\t\t%d\n" % self.authid,
                        "authlen:\t%d\n" % len(self.signature),
                        "opcode:\t\t%s\n" % repr_opcode(self.opcode),
                        "handle:\t\t%d\n" % self.handle,
                        "tid:\t\t%d\n" % self.tid,
                        "rid:\t\t%d\n" % self.rid,
                        "message:\t%r\n" % self.message,
                        "obj:\t\t%r\n" % self.obj,
                        "signature:\t%r\n" % self.signature))


# A "parser" in this module is an iterable that yields None as long as it
# needs more input and finally yields exactly one non-None result.

def parse_map(filterfun, parser):
    """Creates a new parser that passes the result of the given parser
    through the given filterfun.

    >>> list(parse_map(int, (None, "42")))
    [None, 42]

    @type filterfun: obj -> obj
    @param parser: parser
    @returns: parser
    """
    for element in parser:
        if element is None:
            yield None
        else:
            yield filterfun(element)
            break


def parse_chain(*args):
    """Creates a new parser that executes the passed parsers (args) with the
    previous results and yields a tuple of the results.

    >>> list(parse_chain(lambda: (None, 1), lambda one: (None, 2)))
    [None, None, (1, 2)]

    @param args: parsers
    @returns: parser
    """
    items = []
    for parser in args:
        for element in parser(*items):
            if element is None:
                yield None
            else:
                items.append(element)
                break
    yield tuple(items)


class InBuffer:
    """Helper class for incrementally parsing received network data."""
    sizelimit = 65536

    def __init__(self, initial=b""):
        """
        @type initial: bytes
        @param initial: initial value of the buffer
        @raises OmapiSizeLimitError:
        """
        self.buff = b""
        self.totalsize = 0
        if initial:
            self.feed(initial)

    def feed(self, data):
        """Append received data to the buffer.

        @type data: bytes
        @returns: self
        @raises OmapiSizeLimitError:
        """
        if self.totalsize + len(data) > self.sizelimit:
            raise OmapiSizeLimitError()
        self.buff += data
        self.totalsize += len(data)
        return self

    def resetsize(self):
        """This method is to be called after handling a packet to
        reset the total size to be parsed at once and that way not
        overflow the size limit.
        """
        self.totalsize = len(self.buff)

    def parse_fixedbuffer(self, length):
        """Parser resulting in the next length bytes of the buffer.

        @type length: int
        """
        while len(self.buff) < length:
            yield None
        result = self.buff[:length]
        self.buff = self.buff[length:]
        yield result

    def parse_net16int(self):
        """
        >>> hex(next(InBuffer(b"\\x01\\x02").parse_net16int()))
        '0x102'
        """
        return parse_map(lambda data: struct.unpack("!H", data)[0],
                         self.parse_fixedbuffer(2))

    def parse_net32int(self):
        """
        >>> hex(int(next(InBuffer(b"\\x01\\0\\0\\x02").parse_net32int())))
        '0x1000002'
        """
        return parse_map(lambda data: struct.unpack("!L", data)[0],
                         self.parse_fixedbuffer(4))

    def parse_net16string(self):
        """
        >>> next(InBuffer(b"\\0\\x03eggs").parse_net16string()) == b'egg'
        True
        """
        return parse_map(operator.itemgetter(1),
                         parse_chain(self.parse_net16int,
                                     self.parse_fixedbuffer))

    def parse_net32string(self):
        """
        >>> next(InBuffer(b"\\0\\0\\0\\x03eggs").parse_net32string()) == b'egg'
        True
        """
        return parse_map(operator.itemgetter(1),
                         parse_chain(self.parse_net32int,
                                     self.parse_fixedbuffer))

    def parse_bindict(self):
        """Parser resulting in a list of (key, value) pairs.

        >>> d = b"\\0\\x01a\\0\\0\\0\\x01b\\0\\0spam"
        >>> next(InBuffer(d).parse_bindict()) == [(b'a', b'b')]
        True
        """
        entries = []
        while True:
            key = None
            for key in self.parse_net16string():
                if key is not None:
                    break
                yield None  # need more input for the key
            if not key:
                # A zero-length key is the end marker of the dictionary.
                break
            value = None
            for value in self.parse_net32string():
                if value is not None:
                    break
                yield None  # need more input for the value
            entries.append((key, value))
        yield entries

    def parse_startup_message(self):
        """results in an OmapiStartupMessage

        >>> d = b"\\0\\0\\0\\x64\\0\\0\\0\\x18"
        >>> next(InBuffer(d).parse_startup_message()).validate()
        """
        return parse_map(lambda args: OmapiStartupMessage(*args),
                         parse_chain(self.parse_net32int,
                                     lambda _: self.parse_net32int()))

    def parse_message(self):
        """results in an OmapiMessage"""
        parser = parse_chain(
            self.parse_net32int,  # authid
            lambda *_: self.parse_net32int(),  # authlen
            lambda *_: self.parse_net32int(),  # opcode
            lambda *_: self.parse_net32int(),  # handle
            lambda *_: self.parse_net32int(),  # tid
            lambda *_: self.parse_net32int(),  # rid
            lambda *_: self.parse_bindict(),  # message
            lambda *_: self.parse_bindict(),  # object
            # args[1] is authlen, i.e. the length of the signature
            lambda *args: self.parse_fixedbuffer(args[1]))  # signature
        return parse_map(lambda args:  # skip authlen in args:
                         OmapiMessage(*(args[0:1] + args[2:])), parser)


if isinstance(bytes(b"x")[0], int):
    # Python 3: indexing/iterating bytes already gives integers.
    def bytes_to_int_seq(b):
        return b
    int_seq_to_bytes = bytes  # raises ValueError
else:
    # Python 2: bytes is str, elements are one character strings.
    def bytes_to_int_seq(b):
        return map(ord, b)

    def int_seq_to_bytes(s):
        return "".join(map(chr, s))  # raises ValueError


__all__.append("pack_ip")
def pack_ip(ipstr):
    """Converts an ip address given in dotted notation to a four byte
    string in network byte order.

    >>> len(pack_ip("127.0.0.1"))
    4
    >>> pack_ip("foo")
    Traceback (most recent call last):
    ...
    ValueError: given ip address has an invalid number of dots

    @type ipstr: str
    @rtype: bytes
    @raises ValueError: for badly formatted ip addresses
    """
    if not isinstance(ipstr, basestring):
        raise ValueError("given ip address is not a string")
    parts = ipstr.split('.')
    if len(parts) != 4:
        raise ValueError("given ip address has an invalid number of dots")
    parts = [int(part) for part in parts]  # raises ValueError
    return int_seq_to_bytes(parts)  # raises ValueError


__all__.append("unpack_ip")
def unpack_ip(fourbytes):
    """Converts an ip address given in a four byte string in network
    byte order to a string in dotted notation.

    >>> unpack_ip(b"dead")
    '100.101.97.100'
    >>> unpack_ip(b"alive")
    Traceback (most recent call last):
    ...
    ValueError: given buffer is not exactly four bytes long

    @type fourbytes: bytes
    @rtype: str
    @raises ValueError: for bad input
    """
    if not isinstance(fourbytes, bytes):
        raise ValueError("given buffer is not a string")
    if len(fourbytes) != 4:
        raise ValueError("given buffer is not exactly four bytes long")
    return ".".join(map(str, bytes_to_int_seq(fourbytes)))


__all__.append("pack_mac")
def pack_mac(macstr):
    """Converts a mac address given in colon delimited notation to a
    six byte string in network byte order.

    >>> pack_mac("30:31:32:33:34:35") == b'012345'
    True
    >>> pack_mac("bad")
    Traceback (most recent call last):
    ...
    ValueError: given mac addresses has an invalid number of colons

    @type macstr: str
    @rtype: bytes
    @raises ValueError: for badly formatted mac addresses
    """
    if not isinstance(macstr, basestring):
        raise ValueError("given mac addresses is not a string")
    parts = macstr.split(":")
    if len(parts) != 6:
        raise ValueError("given mac addresses has an invalid number of colons")
    parts = [int(part, 16) for part in parts]  # raises ValueError
    return int_seq_to_bytes(parts)  # raises ValueError


__all__.append("unpack_mac")
def unpack_mac(sixbytes):
    """Converts a mac address given in a six byte string in network
    byte order to a string in colon delimited notation.

    >>> unpack_mac(b"012345")
    '30:31:32:33:34:35'
    >>> unpack_mac(b"bad")
    Traceback (most recent call last):
    ...
    ValueError: given buffer is not exactly six bytes long

    @type sixbytes: bytes
    @rtype: str
    @raises ValueError: for bad input
    """
    if not isinstance(sixbytes, bytes):
        raise ValueError("given buffer is not a string")
    if len(sixbytes) != 6:
        raise ValueError("given buffer is not exactly six bytes long")
    return ":".join(map("%2.2x".__mod__, bytes_to_int_seq(sixbytes)))


__all__.append("Omapi")
class Omapi:
    """A connection to an OMAPI server offering host and lease operations."""
    def __init__(self, hostname, port, username=None, key=None, debug=False):
        """Connect to the server, perform the protocol handshake and, if
        both username and key are given, authenticate.

        @type hostname: str
        @type port: int
        @type username: bytes or None
        @type key: bytes or None
        @type debug: bool
        @param key: if given, it must be base64 encoded
        @raises binascii.Error: for bad base64 encoding
        @raises socket.error:
        @raises OmapiError:
        """
        self.hostname = hostname
        self.port = port
        self.authenticators = {0: OmapiNullAuthenticator()}
        self.defauth = 0
        self.debug = debug

        newauth = None
        if username is not None and key is not None:
            newauth = OmapiHMACMD5Authenticator(username, key)

        self.connection = socket.socket()
        self.inbuffer = InBuffer()
        try:
            self.connection.connect((hostname, port))
            self.send_protocol_initialization()
            self.recv_protocol_initialization()
            if newauth is not None:
                self.initialize_authenticator(newauth)
        except Exception:
            # The caller never gets hold of a half constructed object, so
            # the socket has to be released here (close is idempotent).
            self.close()
            raise

    def close(self):
        """Close the omapi connection if it is open."""
        if self.connection:
            self.connection.close()
            self.connection = None

    def check_connected(self):
        """Raise an OmapiError unless connected.

        @raises OmapiError:
        """
        if not self.connection:
            raise OmapiError("not connected")

    def recv_conn(self, length):
        """Receive up to length bytes of data from the connection.
        The connection is closed on socket errors.

        @type length: int
        @rtype: bytes
        @raises OmapiError: if not connected
        @raises socket.error:
        """
        self.check_connected()
        try:
            return self.connection.recv(length)
        except socket.error:
            self.close()
            raise

    def send_conn(self, data):
        """Send all of data to the connection.
        The connection is closed on socket errors.

        @type data: bytes
        @raises OmapiError: if not connected
        @raises socket.error:
        """
        self.check_connected()
        try:
            self.connection.sendall(data)
        except socket.error:
            self.close()
            raise

    def fill_inbuffer(self):
        """Receive some data from the connection and feed it to the
        input buffer. The connection is closed on failure.

        @raises OmapiError:
        @raises socket.error:
        """
        data = self.recv_conn(2048)
        if not data:
            # An empty read means the peer has closed the connection.
            self.close()
            raise OmapiError("connection closed")
        try:
            self.inbuffer.feed(data)
        except OmapiSizeLimitError:
            self.close()
            raise

    def send_protocol_initialization(self):
        """Send the startup message of this implementation.

        @raises OmapiError:
        @raises socket.error:
        """
        self.check_connected()
        self.send_conn(OmapiStartupMessage().as_string())

    def recv_protocol_initialization(self):
        """Receive the startup message of the server and validate it.
        The connection is closed on a mismatch.

        @raises OmapiError:
        @raises socket.error:
        """
        for result in self.inbuffer.parse_startup_message():
            if result is None:
                self.fill_inbuffer()
            else:
                self.inbuffer.resetsize()
                try:
                    result.validate()
                except OmapiError:
                    self.close()
                    raise

    def receive_message(self):
        """Read the next message from the connection.
        The connection is closed if the signature does not verify.

        @rtype: OmapiMessage
        @raises OmapiError:
        @raises socket.error:
        """
        for message in self.inbuffer.parse_message():
            if message is None:
                self.fill_inbuffer()
            else:
                self.inbuffer.resetsize()
                if not message.verify(self.authenticators):
                    self.close()
                    raise OmapiError("bad omapi message signature")
                if self.debug:
                    sys.stdout.write("debug recv\n")
                    sys.stdout.write(message.dump())
                return message

    def receive_response(self, message, insecure=False):
        """Read the response for the given message.

        @type message: OmapiMessage
        @type insecure: bool
        @param insecure: avoid an OmapiError about a wrong authenticator
        @rtype: OmapiMessage
        @raises OmapiError:
        @raises socket.error:
        """
        response = self.receive_message()
        if not response.is_response(message):
            raise OmapiError("received message is not the desired response")
        # signature already verified
        if response.authid != self.defauth and not insecure:
            raise OmapiError("received message is signed with wrong " +
                             "authenticator")
        return response

    def send_message(self, message, sign=True):
        """Sends the given message to the connection.

        @type message: OmapiMessage
        @type sign: bool
        @param sign: whether the message needs to be signed
        @raises OmapiError:
        @raises socket.error:
        """
        self.check_connected()
        if sign:
            message.sign(self.authenticators[self.defauth])
        if self.debug:
            sys.stdout.write("debug send\n")
            sys.stdout.write(message.dump())
        self.send_conn(message.as_string())

    def query_server(self, message):
        """Send the message and receive a response for it.

        @type message: OmapiMessage
        @rtype: OmapiMessage
        @raises OmapiError:
        @raises socket.error:
        """
        self.send_message(message)
        return self.receive_response(message)

    def initialize_authenticator(self, authenticator):
        """Register the authenticator with the server and make it the
        default for subsequent messages.

        @type authenticator: OmapiAuthenticatorBase
        @raises OmapiError:
        @raises socket.error:
        """
        msg = OmapiMessage.open(b"authenticator")
        msg.update_object(authenticator.auth_object())
        response = self.query_server(msg)
        if response.opcode != OMAPI_OP_UPDATE:
            raise OmapiError("received non-update response for open")
        # The handle of the response is used as the authenticator id.
        authid = response.handle
        if authid == 0:
            raise OmapiError("received invalid authid from server")
        self.authenticators[authid] = authenticator
        authenticator.authid = authid
        self.defauth = authid

    def add_host(self, ip, mac):
        """Create a host object with the given ip and mac address.

        @type ip: str
        @type mac: str
        @raises ValueError:
        @raises OmapiError:
        @raises socket.error:
        """
        msg = OmapiMessage.open(b"host")
        msg.message.append((b"create", struct.pack("!I", 1)))
        msg.message.append((b"exclusive", struct.pack("!I", 1)))
        msg.obj.append((b"hardware-address", pack_mac(mac)))
        msg.obj.append((b"hardware-type", struct.pack("!I", 1)))
        msg.obj.append((b"ip-address", pack_ip(ip)))
        response = self.query_server(msg)
        if response.opcode != OMAPI_OP_UPDATE:
            raise OmapiError("add failed")

    def del_host(self, mac):
        """Delete the host object with the given mac address.

        @type mac: str
        @raises ValueError:
        @raises OmapiErrorNotFound: if opening the host object fails
        @raises OmapiError:
        @raises socket.error:
        """
        msg = OmapiMessage.open(b"host")
        msg.obj.append((b"hardware-address", pack_mac(mac)))
        msg.obj.append((b"hardware-type", struct.pack("!I", 1)))
        response = self.query_server(msg)
        if response.opcode != OMAPI_OP_UPDATE:
            raise OmapiErrorNotFound()
        if response.handle == 0:
            raise OmapiError("received invalid handle from server")
        response = self.query_server(OmapiMessage.delete(response.handle))
        if response.opcode != OMAPI_OP_STATUS:
            raise OmapiError("delete failed")

    def lookup_ip(self, mac):
        """Look up the ip address of the lease with the given mac address.

        @type mac: str
        @rtype: str
        @raises ValueError:
        @raises OmapiErrorNotFound: if opening the lease fails or the
                response carries no ip-address
        @raises OmapiError:
        @raises socket.error:
        """
        msg = OmapiMessage.open(b"lease")
        msg.obj.append((b"hardware-address", pack_mac(mac)))
        response = self.query_server(msg)
        if response.opcode != OMAPI_OP_UPDATE:
            raise OmapiErrorNotFound()
        try:
            return unpack_ip(dict(response.obj)[b"ip-address"])
        except KeyError:  # ip-address
            raise OmapiErrorNotFound()

    def lookup_mac(self, ip):
        """Look up the mac address of the lease with the given ip address.

        @type ip: str
        @rtype: str
        @raises ValueError:
        @raises OmapiErrorNotFound: if opening the lease fails or the
                response carries no hardware-address
        @raises OmapiError:
        @raises socket.error:
        """
        msg = OmapiMessage.open(b"lease")
        msg.obj.append((b"ip-address", pack_ip(ip)))
        response = self.query_server(msg)
        if response.opcode != OMAPI_OP_UPDATE:
            raise OmapiErrorNotFound()
        try:
            return unpack_mac(dict(response.obj)[b"hardware-address"])
        except KeyError:  # hardware-address
            raise OmapiErrorNotFound()


if __name__ == '__main__':
    import doctest
    doctest.testmod()