/ / python multiprocessing issues - python, process, queue, multiprocessing

python multiprocessing problémy - python, process, queue, multiprocessing

Mám niekoľko problémov, keď používam procesy a fronty.

Keď spustím nasledujúci kód, cieľová funkcia jednoducho získa položku z hlavného frontu a pridá ju do inej fronty špecifickej pre daný proces.

import sys
import multiprocessing
from Queue import Empty

# This is just taking a number from the queue
# and adding it to another queue
def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(0)
to_queue.put(tmp)
print to_queue
except Empty:
break

# Create a master queue and fill it with numbers
main_queue = multiprocessing.Queue()
for i in xrange(100):
main_queue.put(i)

all_queues = []
processes = []
# Create processes
for i in xrange(5):
# Each process gets a queue that it will put numbers into
queue = multiprocessing.Queue()
# Keep up with the queue we are creating so we can get it later
all_queues.append(queue)
# Pass in our master queue and the queue we are transferring data to
process = multiprocessing.Process(target=my_callable,
args=(main_queue, queue))
# Keep up with the processes
processes.append(process)

for thread in processes:
thread.start()

for thread in processes:
thread.join()

Keď cieľová funkcia vytlačí použitú frontu, všimnete si, že jedna fronta sa používa takmer výlučne.

Ak vyberiete výstup a vytlačíte, uvidíte, že väčšina čísel skončí pod jedinou frontu.

def queue_get_all(q):
items = []
maxItemsToRetreive = 100
for numOfItemsRetrieved in range(0, maxItemsToRetreive):
try:
if numOfItemsRetrieved == maxItemsToRetreive:
break
items.append(q.get_nowait())
except Empty, e:
break
return items

for tmp in all_queues:
print queue_get_all(tmp)

Čo to spôsobuje? Existuje niečo, čo by som mal robiť v mojom kóde, ktorý by dokonca vyriešil prácu, ktorú tieto procesy robia?

VÝKON

[0, 2, 3, 4, 5, 6, 7, 8]
[1, 9, 10]
[11, 14, 15, 16]
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[13]

odpovede:

2 pre odpoveď č. 1

Myslím, že tu máte dva problémy:

def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(0)
to_queue.put(tmp)
print to_queue
except Empty:
break

Z dokumentov pre dostať:

Odstráňte a vráťte položku z frontu. Ak je voliteľný blok args pravdivý (predvolený) a časový limit je Žiadny (predvolený), v prípade potreby zablokujte, kým nie je k dispozícii položka. Ak je časový limit pozitívnym číslom, zablokuje sa vo väčšine sekúnd a zvyšuje výnimku Queue.Empty, ak v tom čase nebola k dispozícii žiadna položka. V opačnom prípade (blok je nepravdivý) vráťte položku, ak je okamžite k dispozícii, inak zvýšte výnimku Queue.Empty (časový limit je v tomto prípade ignorovaný).

Odkedy prechádzate 0 ako prvý parameter je ekvivalentný get(False), To z neho robí neblokovanie, čo znamená, ak to budenemôže získať hodnotu okamžite, že sa zvýši Prázdna výnimka, ktorá ukončí váš pracovný proces.Vzhľadom k tomu, všetky vaše "pracovné" funkcie sú identické a pokúsiť sa vytiahnuť z hlavnej fronty súčasne, niektorí nemusia byť schopní získajte okamžite hodnotu a zomriete.

Poskytovanie .get() malý časový limit by mal vyriešiť tento problém.

Druhým problémom je, že funkcia "pracovať" má v podstate nulový čas na dokončenie. Dajte mu trochu pauzu sleep(.2) simulovať nejakú nekvalifikovanú prácu a rozdelí sa medzi pracovníkov:

def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(True, .1)
sleep(0.2)
to_queue.put(tmp)
except Empty:
break

EDIT:

Zabudol som povedať, všeobecne je lepšie, aby sa tento typ problému nespoliehal na časový limit .get() na označenie konca frontu. Dostanete väčšiu kontrolu, ak použijete určitý typ objektu "end of queue", ktorý prejdete do frontu, ktorý informuje pracovníkov, že je čas prestať. Týmto spôsobom ich môžete blokovať a čakať na nové vstupy alebo príkazy na ukončenie.