I'd been aware of the Twisted
project for a while before I looked at it. When I did look, I was
overwhelmed by the amount of stuff therein, and I put off looking further.
I knew there was good stuff in there, but I just didn't have the time
or the motivation to dive into it.
Then I read a small example of using it to implement an echo
server in Alex Martelli's wonderful Python in a Nutshell.
I decided I needed to look at Twisted again. I wrote a few very small
client and server programs for a co-worker. They worked! But I still
didn't understand how to use some of the fundamental concepts of Twisted
even though I could easily produce results using it. That bothered me.
Next came OSCON 2003 where I enrolled in a Twisted tutorial given by
Itamar Shtull-Trauring, one of the developers of Twisted. The good news was
that my understanding of Twisted expanded, especially in the area of
Factories and Protocols. The bad news was that I still didn't get one of
Twisted's key concepts: deferreds. Ugh. When I got back to work, I
resolved to make time for learning more about this powerful framework.
Meanwhile, I had projects to implement, one of which was perfect for
Twisted. It was an application which polled almost all of our network
devices via SNMP and stashed the data away for us to look at as needed.
There were two problems: there was no SNMP module for Twisted, and
even if there had been, I wasn't confident enough to write the program
using Twisted anyway. So I wrote it using threads and queues. If you know
Python's Queue module, you know this is fairly simple. It's up and running
now, but performance isn't that good, IMO. While polling is running,
it loads its host machine quite heavily, and a complete polling cycle
takes longer than I would like.
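If you haven't used that pattern, here's a rough sketch of it in Python 3, where the module is spelled queue. It isn't my actual poller. poll_device() is a hypothetical stand-in for a blocking SNMP query, and the worker count is arbitrary. A fixed pool of threads pulls hosts off a job queue and pushes results onto a second queue. Each thread gets one None sentinel, which tells it to exit.

```python
import queue
import threading


def poll_device(host):
    """Hypothetical stand-in for a blocking SNMP query against one device."""
    return "uptime-for-%s" % host


def poll_all(hosts, num_workers=4):
    """Poll every host with a fixed pool of worker threads; return {host: value}."""
    jobs = queue.Queue()
    results = queue.Queue()

    def worker():
        while True:
            host = jobs.get()
            if host is None:            # sentinel: no more work for this thread
                jobs.task_done()
                return
            try:
                results.put((host, poll_device(host)))
            finally:
                jobs.task_done()

    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for host in hosts:
        jobs.put(host)
    for _ in threads:
        jobs.put(None)                  # one sentinel per worker
    jobs.join()                         # wait until every job has been processed

    out = {}
    while not results.empty():          # safe: all producers have finished
        host, value = results.get()
        out[host] = value
    return out
```

The cost of this design is one OS thread per concurrent query. An event-driven framework like Twisted instead multiplexes all the sockets in a single thread.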
Last month, TwistedSNMP
was announced on the Twisted mailing list. I grabbed it and started
examining it. Of course it contained all the tricky bits that I didn't
yet understand, but I was determined to figure it out. I started out
by writing some code that attempts to find a working SNMP community
name for a given device. That code looked like this:
from twistedsnmp.snmpprotocol import SNMPProtocol
from twisted.internet import reactor

class CredFinder:
    def __init__(self, host, creds, version='v1'):
        self.creds = creds
        self.finished = False
        self.success = False
        oid = ['.1.3.6.1.2.1.1.3.0']
        self.port = reactor.connectUDP(host, 161,
                                       SNMPProtocol(host,
                                                    community=creds,
                                                    snmpVersion=version))
        self.df = self.port.protocol.get(oid)
        self.df.addCallbacks(self._onSuccess, self._onFailure)

    def _onSuccess(self, value):
        self.finished = True
        self.success = True

    def _onFailure(self, reason):
        self.finished = True

if __name__ == '__main__':
    creds = ['public1', 'foobar1', 'bad1', 'good1', 'public2', 'foobar2',
             'bad2', 'good2', 'public3', 'foobar3', 'bad3', 'good3', '1LIwbUcMbp*']
    wait_time = 0.01
    d = {}
    host = 'mkt14-sw.dccs.upenn.edu'
    for cred in creds:
        d[cred] = CredFinder(host, cred)
    good_one = []

    def checkDone():
        l = []
        for cred, finder in d.iteritems():
            if finder.finished and finder.success:
                good_one.append(cred)
                print 'found it!'
                reactor.stop()
                return
            elif finder.finished:
                # this community name failed; stop tracking it
                l.append(cred)
        for k in l:
            del d[k]
        if not d:
            reactor.stop()
        else:
            # some queries are still outstanding; poll again shortly
            reactor.callLater(wait_time, checkDone)

    reactor.callLater(wait_time, checkDone)
    reactor.run()
    print good_one
Terrible. I know. :-) At the time, I was just trying to get something
that worked, which this did. I got some feedback from an expert and
rewrote the entire mess as follows:
"""
Find SNMP Community names asyncronous with a rate limit.
Notes:
It's a bad idea to start and stop the reactor. We should
return a deferred and let the caller handle the reactor.
The order of Credentials used isn't guaranteed.
SNMPProtocol was modified to save a reference to its
callback to _timeout. This allows us to cancel those
callbacks.
"""
from twistedsnmp.snmpprotocol import SNMPProtocol
from twisted.internet import reactor
def unique(l):
"""Returns list of unique items in l"""
return dict(zip(l,l)).keys()
class SNMPCredFinder(object):
"""
SNMPCredFinder
Send up to max_jobs number of SNMP queries, each using a different
community name in an attempt to find which community name works.
"""
max_jobs = 6
wait_time = 0
oid = ['.1.3.6.1.2.1.1.3.0']
def __init__(self, host, cred_list, snmp_version='v1'):
self.host = host
self.cred_list = unique(cred_list)
self.snmp_version = snmp_version
self.answer = None
self.protos = []
self.attempts = 0
def _succeeded(self, value, cred):
"""when a community name works, we stop"""
self.answer = cred
# cancel outstanding requests
for proto in self.protos:
proto.tid.cancel()
reactor.callLater(0, reactor.stop)
def _failed(self, reason, cred):
"""when a community name fails, we carry on until done"""
self.attempts -= 1
# If we've tried all community names, we're done
if not self.cred_list and self.attempts == 0:
reactor.stop()
def _add_creds(self):
"""send out more queries, up to our limit."""
if not self.cred_list:
return
num_free = self.max_jobs - self.attempts
while num_free > 0 and self.cred_list:
num_free -= 1
self._add_attempt(self.cred_list.pop(0))
if self.cred_list:
reactor.callLater(self.wait_time, self._add_creds)
def _add_attempt(self, cred):
"""start a new query attempt"""
ver = self.snmp_version
proto = SNMPProtocol(self.host, community=cred, snmpVersion=ver)
# save the SNMPProtocol so we can cancel its _timeout later
self.protos.append(proto)
port = reactor.connectUDP(self.host, 161, proto)
df = port.protocol.get(self.oid)
df.addCallback(self._succeeded, cred)
df.addErrback(self._failed, cred)
self.attempts += 1
def get_answer(self):
"""Returns a working community name, or None if it can't be found"""
if not self.cred_list:
return self.answer
# only rate limit if max_jobs > 1
if self.max_jobs < 1:
self.max_jobs = len(self.cred_list)
reactor.callLater(0, self._add_creds)
# blocks here
reactor.run()
return self.answer
def test():
import sys
creds = [
'public1', 'foobar1', 'bad1', 'good1', 'public2', 'foobar2',
'bad2', 'good2', 'public3', 'foobar3', 'bad3', 'good3',
'fuhgitaboutit',
]
try:
host = sys.argv[1]
except IndexError:
host = 'localhost'
job1 = SNMPCredFinder(host, creds)
job1.max_jobs = 0
answer = job1.get_answer()
print answer
job2 = SNMPCredFinder(host, creds)
job2.max_jobs = 5
answer = job2.get_answer()
print answer
if __name__ == '__main__':
test()
This probably looks better in many respects (it's cleaner, has docstrings,
etc.), but it's really a misuse of Twisted, because each instance of my
SNMPCredFinder class starts and stops Twisted's event loop, aka the
reactor. I learned later from some Twisted gurus that this is a bad idea,
and running my code showed the problem pretty clearly on its own: it
raised exceptions.
The problem was that after the SNMPCredFinder found a working community
name, it stopped the reactor, leaving all its other SNMPProtocol
instances' "outstanding jobs" hanging around. Each of those
SNMPProtocols had been sending SNMP packets with community names
that were wrong, so the device wouldn't respond. When they sent their
first query, they had also scheduled a retry to run after a timeout
period using Twisted's reactor.callLater() method. SNMP runs over UDP,
which gives you no way of knowing whether a packet reached its
destination, so resending queries is quite normal.
If I only used one instance of my SNMPCredFinder, I wouldn't see the
problem. However, if my code created a second instance after one had
succeeded, one exception would be raised for each "outstanding job" left
over from the previous SNMPProtocols as they tried to re-send the SNMP
packet on a socket that no longer existed (having been garbage-collected
by Python, I presume).
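Twisted isn't part of the standard library, so here's a small sketch of the same effect using Python 3's asyncio event loop as a stand-in for the reactor. This is an analogy, not Twisted itself. resend() is hypothetical and plays the part of an SNMPProtocol retry scheduled with callLater(). The retry is still pending after the loop stops, and it fires as soon as the loop runs again.

```python
import asyncio

fired = []


def resend(community):
    """Hypothetical stand-in for an SNMPProtocol retry of an unanswered query."""
    fired.append(community)


loop = asyncio.new_event_loop()
# An unanswered query schedules its own retry, much like callLater() does.
loop.call_later(0.5, resend, "bad1")
# Another query "succeeds" and stops the loop before the retry is due.
loop.call_soon(loop.stop)
loop.run_forever()
after_first = list(fired)            # the retry has not fired yet
print("after first run:", after_first)

# Running the loop again (say, for a second finder) lets the stale retry fire.
loop.run_until_complete(asyncio.sleep(1.0))
print("after second run:", fired)
loop.close()
```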
I had to add code in the SNMPProtocol class to deal with the side
effects of this misuse. It now keeps a reference (tid) to the delayed
call to its _timeout, so that call can be cancelled and the protocol stops
sending further queries. You can see how I use it in the
SNMPCredFinder._succeeded() method. This did get rid of the exceptions,
but I knew I shouldn't be abusing Twisted like this.
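The fix boils down to keeping the handle that the scheduling call returns and cancelling it once the answer is known. Here's a sketch of that idea. It again uses asyncio's call_later() in place of reactor.callLater(). RetryingQuery and its interval are hypothetical names for illustration, not part of TwistedSNMP.

```python
import asyncio


class RetryingQuery:
    """Hypothetical query that re-sends itself every `interval` seconds until cancelled."""

    def __init__(self, loop, community, interval, log):
        self.loop = loop
        self.community = community
        self.interval = interval
        self.log = log
        self.handle = None           # plays the role of the saved `tid`
        self._send()

    def _send(self):
        self.log.append(self.community)          # "send" one query
        # Schedule the next re-send and remember the handle so it can be cancelled.
        self.handle = self.loop.call_later(self.interval, self._send)

    def cancel(self):
        if self.handle is not None:
            self.handle.cancel()
            self.handle = None


async def main():
    loop = asyncio.get_running_loop()
    log = []
    queries = [RetryingQuery(loop, c, 0.02, log) for c in ("bad1", "bad2")]
    await asyncio.sleep(0.05)        # let each query re-send a couple of times
    for q in queries:                # a working community was found: stop the rest
        q.cancel()
    sent = len(log)
    await asyncio.sleep(0.05)        # no further re-sends should happen now
    return sent, len(log)


sent_before, sent_after = asyncio.run(main())
print(sent_before, sent_after)
```

Cancellation behaves differently in the two frameworks. asyncio's cancel() is a no-op on a handle that has already run. Twisted's DelayedCall.cancel() raises AlreadyCalled or AlreadyCancelled in that case, so with Twisted it's worth checking active() before cancelling.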
The "real" answer was to use deferreds. Yes, the dreaded d-word with
which I had struggled each time I had looked at Twisted. By this time,
however, I had finally come to understand deferreds. I rewrote
my credential finder like this:
from twistedsnmp.snmpprotocol import SNMPProtocol
from twisted.internet import reactor, defer

def unique(l):
    """Returns list of unique items in l"""
    return dict(zip(l, l)).keys()

class SNMPCredFinder(object):
    """
    SNMPCredFinder

    Send up to max_jobs number of SNMP queries, each using a different
    community name in an attempt to find which community name works.
    """
    max_jobs = 6
    wait_time = 0
    oid = ['.1.3.6.1.2.1.1.3.0']

    def __init__(self, host, cred_list, snmp_version='v1'):
        self.host = host
        self.cred_list = unique(cred_list)
        self.snmp_version = snmp_version
        self.attempts = 0

    def _succeeded(self, value, cred):
        """when a community name works, we stop"""
        # don't start any more attempts
        self.cred_list = []
        # a Deferred can only fire once; ignore any later successes
        if not self.answer.called:
            self.answer.callback((self.host, cred))

    def _failed(self, reason):
        """when a community name fails, we carry on until done"""
        self.attempts -= 1
        if not self.cred_list and self.attempts == 0 and not self.answer.called:
            print 'ending on failure'
            self.answer.errback((self.host, ''))

    def _add_creds(self):
        """send out more queries, up to our limit."""
        if not self.cred_list:
            return
        num_free = self.max_jobs - self.attempts
        while num_free > 0 and self.cred_list:
            num_free -= 1
            self._add_attempt(self.cred_list.pop(0))
        if self.cred_list:
            reactor.callLater(self.wait_time, self._add_creds)

    def _add_attempt(self, cred):
        """start a new query attempt"""
        ver = self.snmp_version
        proto = SNMPProtocol(self.host, community=cred, snmpVersion=ver)
        port = reactor.connectUDP(self.host, 161, proto)
        df = port.protocol.get(self.oid)
        df.addCallback(self._succeeded, cred)
        df.addErrback(self._failed)
        self.attempts += 1

    def get_answer(self):
        """Returns a deferred that fires with (host, cred)"""
        self.answer = defer.Deferred()
        if not self.cred_list:
            self.answer.errback((self.host, ''))
            return self.answer
        # max_jobs < 1 means "no rate limit": send every query at once
        if self.max_jobs < 1:
            self.max_jobs = len(self.cred_list)
        reactor.callLater(0, self._add_creds)
        return self.answer

def test():
    import sys
    creds = [
        'public1', 'foobar1', 'bad1', 'good1', 'public2', 'foobar2',
        'bad2', 'good2', 'public3', 'foobar3', 'bad3', 'good3',
        'fuhgitaboutit',
    ]
    try:
        host = sys.argv[1]
    except IndexError:
        host = 'localhost'

    def _onSuccess(result):
        print '%s: %s' % result

    def _onFailure(reason):
        # errback() wraps (host, '') in a Failure; the tuple is in reason.value
        print 'no working community name: %s' % (reason.value,)

    job1 = SNMPCredFinder(host, creds)
    job1.max_jobs = 0
    answer = job1.get_answer()
    answer.addCallbacks(_onSuccess, _onFailure)
    job2 = SNMPCredFinder(host, creds)
    job2.max_jobs = 5
    answer2 = job2.get_answer()
    answer2.addCallbacks(_onSuccess, _onFailure)
    # the caller, not SNMPCredFinder, decides when the reactor stops
    defer.DeferredList([answer, answer2]).addBoth(lambda _: reactor.stop())
    reactor.run()

if __name__ == '__main__':
    test()
The new SNMPCredFinder.get_answer() method doesn't return a string;
it returns a deferred, and the reactor is controlled outside of the
class. Everything still works, and I don't get any exceptions raised.
I imagine extra SNMP queries are still being sent, since I don't cancel
them. Once I confirm that, I'll fix it.
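For comparison, here's roughly the same shape using Python 3's asyncio rather than Twisted. It's a different framework, so treat it as an analogy and not a port. A coroutine plays the part of the deferred, and an asyncio.Semaphore stands in for the max_jobs bookkeeping in _add_creds(). The caller runs the event loop. probe() is a hypothetical placeholder for the real SNMP GET. Unlike my class, this sketch also cancels the outstanding attempts as soon as one community name works.

```python
import asyncio


async def probe(host, community):
    """Hypothetical stand-in for an SNMP GET of sysUpTime (.1.3.6.1.2.1.1.3.0).

    Returns True if the device answers for this community name."""
    await asyncio.sleep(0.01)            # pretend network round trip
    return community == "good2"          # pretend only one community works


async def find_community(host, communities, max_jobs=6):
    """Return (host, community) for the first community that works.

    Raises LookupError if none of them do."""
    # dict.fromkeys() drops duplicates but, unlike unique(), keeps the order
    unique_creds = list(dict.fromkeys(communities))
    # mirror SNMPCredFinder: max_jobs < 1 means "no rate limit"
    limit = asyncio.Semaphore(max_jobs if max_jobs >= 1 else max(1, len(unique_creds)))

    async def attempt(community):
        async with limit:                # at most max_jobs probes in flight
            ok = await probe(host, community)
        return community if ok else None

    tasks = [asyncio.create_task(attempt(c)) for c in unique_creds]
    try:
        for next_done in asyncio.as_completed(tasks):
            community = await next_done
            if community is not None:
                return host, community
        raise LookupError("no working community name for %s" % host)
    finally:
        # Cancel whatever is still waiting or in flight once we have an answer.
        for task in tasks:
            task.cancel()


creds = ["public1", "bad1", "good2", "bad2"]
# The caller, not find_community(), decides how the event loop is run.
print(asyncio.run(find_community("localhost", creds, max_jobs=2)))  # prints ('localhost', 'good2')
```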
I plan to actually use my SNMPCredFinder class in a project that I'm
working on, and I still need to redo my polling program to use Twisted
rather than threads. I assume an async version of it will be more
efficient, but I'll have to wait until it's done to test that theory. The
good news is that I now have the tools and knowledge I need to make it
happen.
The deferred-returning version also doesn't have a way to stop itself,
which makes it hard to use as a library from non-Twisted code (and that's
what the rest of my stuff currently is). I have a few ideas on how to
solve this problem, but that's another post.
Take care.