Ich habe ein Skript, das ein Verzeichnis durchläuft und alle Dateien mit einer bestimmten Endung (d. H. XML) nach bestimmten Zeichenfolgen durchsucht und diese ersetzt. Um dies zu erreichen, habe ich die Python Multiprocessing Library verwendet.
Als Beispiel verwende ich 1100 .xml-Dateien mit ca. 200 MB Daten. Die vollständige Ausführungszeit beträgt auf meinem MBP "15 15" 8 Minuten.
Aber nach ein paar Minuten wird Prozess für Prozess einschlafen, was ich in "oben" sehe (hier nach 7m ...).
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPR PGRP PPID STATE BOOSTS %CPU_ME %CPU_OTHRS
1007 Python 0.0 07:03.51 1 0 7 5196K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1006 Python 99.8 07:29.07 1/1 0 7 4840K 0B 0B 998 998 running *0[1] 0.00000 0.00000
1005 Python 0.0 02:10.02 1 0 7 4380K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1004 Python 0.0 04:24.44 1 0 7 4624K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1003 Python 0.0 04:25.34 1 0 7 4572K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
1002 Python 0.0 04:53.40 1 0 7 4612K 0B 0B 998 998 sleeping *0[1] 0.00000 0.00000
Jetzt erledigt nur noch ein Prozess die ganze Arbeit, während die anderen nach 4 Minuten eingeschlafen sind.
Code-Auszug
# set cpu pool to cores in computer
pool_size = multiprocessing.cpu_count()
# create pool
pool = multiprocessing.Pool(processes=pool_size)
# give pool function and input data - here for each file in file_list
pool_outputs = pool.map(check_file, file_list)
# if no more tasks are available: close all
pool.close()
pool.join()
Warum schlafen dann alle Prozesse?
Meine Vermutung: Die Dateiliste ist auf alle Worker im Pool aufgeteilt (jeweils die gleiche Anzahl), und einige haben nur "Glück", die kleinen Dateien zu erhalten - und sind daher früher fertig. Kann das wahr sein? Ich dachte nur, dass es eher wie eine Warteschlange funktioniert, damit jeder Arbeiter eine neue Datei bekommt, wenn es fertig ist - bis die Liste leer ist.
Antworten:
2 für die Antwort № 1Wie @ Felipe-Lema betonte, handelt es sich um ein klassisches RTFM.
Ich habe den erwähnten Teil des Skripts mit einer Multiprocessing Queue anstelle eines Pools überarbeitet und die Laufzeit verbessert:
def check_files(file_list):
"""Checks and replaces lines in files
@param file_list: list of files to search
@return counter: number of occurrence """
# as much workers as CPUs are available (HT included)
workers = multiprocessing.cpu_count()
# create two queues: one for files, one for results
work_queue = Queue()
done_queue = Queue()
processes = []
# add every file to work queue
for filename in file_list:
work_queue.put(filename)
# start processes
for w in xrange(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put("STOP")
# wait until all processes finished
for p in processes:
p.join()
done_queue.put("STOP")
# beautify results and return them
results = []
for status in iter(done_queue.get, "STOP"):
if status is not None:
results.append(status)
return results