It's easy to write multi-threaded programs with Python. I've always done
it using a pool of worker threads that take their input from one queue and
send their output to another. A separate thread uses their output queue as its
own input queue and processes the results in some manner, e.g. by inserting
them into a database. Here's a basic framework for doing this:
import Queue
import threading
import time
import random


class Worker(threading.Thread):
    """A worker thread."""

    def __init__(self, input, output):
        self._get_job = input.get
        if output:
            self._put_job = output.put
        threading.Thread.__init__(self)

    def run(self):
        """Get a job and process it.  Stop when there are no more jobs."""
        while True:
            job = self._get_job()
            if job is None:
                break
            self._process_job(job)

    def _process_job(self, job):
        """Do useful work here."""
        time.sleep(random.random())
        result = job + 1
        self._put_job(result)


class Recorder(Worker):
    def _process_job(self, job):
        """Override Worker's _process_job method.  Just print our input."""
        print job


def main():
    NUM_WORKERS = 20
    job_queue = Queue.Queue(0)
    results_queue = Queue.Queue(0)

    # Create our pool of worker threads
    for x in range(NUM_WORKERS):
        Worker(job_queue, results_queue).start()

    # Create our single recording thread
    Recorder(results_queue, None).start()

    # Give the workers some numbers to crunch
    for x in range(NUM_WORKERS * 2):
        job_queue.put(x)

    # Insert end-of-job markers so each worker knows when to stop
    for x in range(NUM_WORKERS):
        job_queue.put(None)

    # Wait for all workers to end (only the main and recording threads remain)
    while threading.activeCount() > 2:
        time.sleep(0.1)

    # Tell the recording thread it can stop
    results_queue.put(None)

    # Wait for the recording thread to stop
    while threading.activeCount() == 2:
        time.sleep(0.1)


if __name__ == '__main__':
    main()
As you can see, this doesn't do anything useful by itself, but you can easily subclass Worker and override its _process_job method to make it do whatever you want, as in the sketch below. Any questions?
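For example, here's a minimal sketch of such a subclass. The Squarer name and the squaring step are just illustrative choices; it reuses Worker's constructor and queues unchanged.

class Squarer(Worker):
    def _process_job(self, job):
        """Square the input and pass the result downstream."""
        result = job * job
        self._put_job(result)

To use it, you'd start Squarer(job_queue, results_queue).start() in main() instead of the plain Worker threads, and the rest of the framework stays the same.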
Take care.