/ / Pomijanie łańcuchów Disco zmniejsza się - python, mapreduce, disco

Pomijanie łańcuchów dyskotekowych zmniejsza się - python, mapreduce, disco

Niedawno znalazłem Disco Project i naprawdę podoba mi się w porównaniu do Hadoop, ale mam problem. Mój projekt jest tak skonfigurowany (chętnie wycięę / wkleję prawdziwy kod, jeśli to pomoże):

myfile.py

from disco.core import Job, result_iterator
import collections, sys
from disco.worker.classic.func import chain_reader
from disco.worker.classic.worker import Params

def helper1():
#do stuff

def helper2():
#do stuff
.
.
.
def helperN():
#do stuff

class A(Job):
@staticmethod
def map_reader(fd, params):
#Read input file
yield line

def map(self, line, params):
#Process lines into dictionary
#Iterate dictionary
yield k, v

def reduce(self, iter, out, params):
#iterate iter
#Process k, v into dictionary, aggregating values
#Process dictionry
#Iterate dictionary
out.add(k,v)

Class B(Job):

map_reader = staticmethod(chain_reader)
map = staticmethod(nop_map)

reduce(self, iter, out, params):
#Process iter
#iterate results
out.add(k,v)


if __name__ == "__main__":
from myfile import A, B
job1 = A().run(input=[input_filename], params=Params(k=k))
job2 = B().run(input=[job1.wait()], params=Params(k=k))
with open(output_filename, "w") as fp:
for count, line in result_iterator(job2.wait(show=True)):
fp.write(str(count) + "," + line + "n")

Mój problem polega na tym, że przepływ pracy całkowicie pomija redukcję A i zmniejsza się do B zmniejszając.

Jakieś pomysły, co tu się dzieje?

Odpowiedzi:

0 dla odpowiedzi № 1

To był łatwy, ale subtelny problem: nie miałem

show = True

do pracy1. Z jakiegoś powodu, przy ustawieniu show dla job2, pokazywał mi kroki map () i map-shuffle () z zadania1, ponieważ nie otrzymałem końcowego wyniku, którego oczekiwałem, a wejście do jednego z zadań2 wygląda źle , Przeskoczyłem do wniosku, że zadanie1 nie działało prawidłowo (zostało to dodatkowo potwierdzone, że zanim dodałem zadanie2, zweryfikowałem dokładność danych wyjściowych zadania1).