/ / python многопроцесорни въпроси - python, процес, опашка, мултипроцесиране

python multiprocessing - python, process, queue, multiprocessing

Имам няколко проблема, които се появяват, когато използвам процеси и опашки.

Когато изпълня следния код, целевата функция просто получава елемент от главната опашка и я добавя към друга опашка, специфична за този процес.

import sys
import multiprocessing
from Queue import Empty

# This is just taking a number from the queue
# and adding it to another queue
def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(0)
to_queue.put(tmp)
print to_queue
except Empty:
break

# Create a master queue and fill it with numbers
main_queue = multiprocessing.Queue()
for i in xrange(100):
main_queue.put(i)

all_queues = []
processes = []
# Create processes
for i in xrange(5):
# Each process gets a queue that it will put numbers into
queue = multiprocessing.Queue()
# Keep up with the queue we are creating so we can get it later
all_queues.append(queue)
# Pass in our master queue and the queue we are transferring data to
process = multiprocessing.Process(target=my_callable,
args=(main_queue, queue))
# Keep up with the processes
processes.append(process)

for thread in processes:
thread.start()

for thread in processes:
thread.join()

Когато целевата функция отпечатва използваната опашка, ще забележите, че една опашка се използва почти изключително.

Ако след това вземете изхода и го отпечатате, ще видите, че повечето от номерата завършват под една опашка.

def queue_get_all(q):
items = []
maxItemsToRetreive = 100
for numOfItemsRetrieved in range(0, maxItemsToRetreive):
try:
if numOfItemsRetrieved == maxItemsToRetreive:
break
items.append(q.get_nowait())
except Empty, e:
break
return items

for tmp in all_queues:
print queue_get_all(tmp)

Какво причинява това? Има ли нещо в моя код, което трябва да правя, което ще изравни работата, която тези процеси правят?

OUTPUT

[0, 2, 3, 4, 5, 6, 7, 8]
[1, 9, 10]
[11, 14, 15, 16]
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[13]

Отговори:

2 за отговор № 1

Мисля, че имате два проблема тук:

def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(0)
to_queue.put(tmp)
print to_queue
except Empty:
break

От документите за получавам:

Премахнете и върнете елемент от опашката. Ако опционалният аргумент блок е True (по подразбиране) и timeout е None (по подразбиране), блокирайте, ако е необходимо, докато елементът е наличен. Ако timeout е положително число, то блокира най-много секунди за изчакване и повдига изключението Queue.Empty, ако в рамките на това време не е наличен елемент. В противен случай (блокът е False), върнете елемент, ако той е наличен веднага, иначе повдигнете изключението Queue.Empty (таймаутът се игнорира в този случай).

Тъй като вие минавате 0 като първи параметър, той е еквивалентен на get(False), Това го прави неблокиращ, което означава, ако това е такане може веднага да получи стойност, тя ще повдигне изключение за празно, което ще сложи край на работния ви процес. Тъй като всичките ви "работни" функции са идентични и се опитват да изтеглят от главната опашка едновременно, някои може да не успеят получи стойност веднага и ще умре.

Даване на .get() малък период на изчакване трябва да реши този проблем.

Вторият проблем е, че вашата "работа" функция отнема основно нула време за завършване. Дайте му малка пауза sleep(.2) да симулира някаква работа, която не е тривиална и тя ще разпредели по работниците:

def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(True, .1)
sleep(0.2)
to_queue.put(tmp)
except Empty:
break

РЕДАКТИРАНЕ:

Забравих да кажа, по принцип е по-добре за този тип проблеми да не разчитат на изтичането на .get() да сигнализира края на опашката. Получавате повече контрол, ако използвате някакъв тип маркер-обект "край на опашката", който преминавате в опашката, която казва на работниците, че е време да се откажат. По този начин можете да ги блокирате, чакайки нова команда или изходна "команда".