I "m implementáciu výrobcu-spotrebiteľa vzor v python pomocou multiprocessing.Pool
a multiprocessing.Queue
, Spotrebitelia sú predvídané procesy, ktoré využívajú gevent
rozdávať viac úloh.
Tu je upravená verzia kódu:
import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time
# Task queue
queue = Queue()
def init_worker ():
# Ignore signals in worker
signal.signal( signal.SIGTERM, signal.SIG_IGN )
signal.signal( signal.SIGINT, signal.SIG_IGN )
signal.signal( signal.SIGQUIT, signal.SIG_IGN )
# One of the worker task
def worker_task1( ):
while True:
try:
m = queue.get( timeout = 2 )
# Break out if producer says quit
if m == "QUIT":
print "TIME TO QUIT"
break
except QueueEmpty:
pass
# Worker
def work( ):
gevent.joinall([
gevent.spawn( worker_task1 ),
])
pool = Pool( 2, init_worker )
for i in xrange( 2 ):
pool.apply_async( work )
try:
while True:
queue.put( "Some Task" )
time.sleep( 2 )
except KeyboardInterrupt as e:
print "STOPPING"
# Signal all workers to quit
for i in xrange( 2 ):
queue.put( "QUIT" )
pool.join()
Keď sa ho pokúsim ukončiť, dostanem nasledujúci stav:
- Rodičovský proces čaká na pripojenie sa jedného z detí.
- Jedno z detí je v zaniknutom stave. Tak skončil, ale rodič čaká na ďalšie dieťa až do konca.
- Iné dieťa zobrazuje:
futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...
.
Aký je teda správny spôsob, ako takýto proces ukončiť čisto?
odpovede:
13 pre odpoveď č. 1Prišiel som na problém. Podľa dokumentácia pre multiprocessing.Pool.join()
, pool
musí byť close()ed
predtým, ako to bude možné join()ed
, pridanie pool.close()
pred pool.join()
problém vyriešil.