/ / как да запазите оригиналните базисни имена на файловете в ftp източник на фюм агент - hadoop, ftp, sftp, flume, flume-ng

τη30 not30 τη not not year not30 τη30 not τη30 τη30 τη year year not τη year τη not year τη τη year not not year not not year year30 τη τη30 τη τη30 not30 τη30 not year year30 τη30 not30 τη30 τη30 τη τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 not τη30 τη τη τη30 τη year not not year year not year not not year not not year not not30 τη30 not τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 τη30 not τη30 τη30 τη τη30 not not not not year not not not not year not not year not not

Конфигурирам агент, който прочита от FTPсървър и изпращане на файлове към hdfs мивка. Моят голям проблем е, че искам да съхранявам файлове в hdfs с оригиналното им име. Опитах се с Spooldir източник и работи добре и е в състояние да съхранявате файлове в hdfs с техните basename, но фюмър агент смаже:

1) Ако файлът е записан в след като е бил поставен в директорията за прескачане, Flume ще отпечата грешка в своя регистрационен файл и ще прекрати обработката.

2) Ако името на файла бъде повторно използвано по-късно, Flume ще отпечата грешка в своя регистрационен файл и ще прекрати обработката.

Всъщност, източникът на spooldir не е подходящ за употребата ми. И така, има ли идея как да направите ftp източник запази името на файла, впоследствие hdfs съхранява файлове отделно според имената им.

Това е моят агент:

agent.sources = r1
agent.channels = c1
agent.sinks = k

#configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true

#configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path =  hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize =    1000000
agent.sinks.k.hdfs.fileType = DataStream

agent.channels.c1.type = memory
agent.channels.c1.capacity =  1000000
agent.channels.c1.transactionCapacity =   1000000

agent.sources.r1.channels = c1
agent.sinks.k.channel = c1

Отговори:

0 за отговор № 1

Току-що избухнах решение за проекта за FTP github на фюм:

KR, Филип

Има ли трик за това как да се определи факта, че собствеността% {basename} липсва?


0 за отговор № 2

Както казах, съгласно следния код актуализиране: Прикрепвам за източник на FFT Flood, ето как използвах променливата% {basename}:

##############
# COMPONENTS #
##############
myPrj.sources = source_01
myPrj.channels = channel_01
myPrj.sinks = sink_01

############
# BINDINGS #
############
myPrj.sources.source_01.channels = channel_01
myPrj.sinks.sink_01.channel = channel_01

###########
# CHANNEL #
###########
myPrj.channels.channel_01.type = memory
myPrj.channels.channel_01.capacity = 10000
myPrj.channels.channel_01.transactionCapacity = 10000

##########
# SOURCE #
##########
myPrj.sources.source_01.type = org.keedio.flume.source.ftp.source.Source

myPrj.sources.source_01.client.source = ftp
myPrj.sources.source_01.name.server = 127.0.0.1
myPrj.sources.source_01.user = myPrj
myPrj.sources.source_01.password = myPrj
myPrj.sources.source_01.port = 21

#myPrj.sources.source_01.security.enabled = true
#myPrj.sources.source_01.security.cipher = TLS
#myPrj.sources.source_01.security.certificate.enabled = true
#myPrj.sources.source_01.path.keystore = /paht/to/keystore
#myPrj.sources.source_01.store.pass = the_keyStore_password

myPrj.sources.source_01.run.discover.delay = 5000
myPrj.sources.source_01.flushlines = false
myPrj.sources.source_01.chunk.size = 33554432

myPrj.sources.source_01.folder = /home/foo/app-flume-ftp-hdfs
myPrj.sources.source_01.file.name = flume-ftp-hdfs.ser

myPrj.sources.source_01.fileHeader = true
myPrj.sources.source_01.basenameHeader = true

# Deserializer
myPrj.sources.source_01.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
myPrj.sources.source_01.deserializer.maxBlobLength = 33554432

########
# SINK #
########
#myPrj.sinks.sink_01.type = logger
myPrj.sinks.sink_01.type = hdfs
myPrj.sinks.sink_01.hdfs.path = /user/foo/ftp_source/%Y/%m/%d/%H/%M/%{basename}
myPrj.sinks.sink_01.hdfs.filePrefix = FTP_SOURCE
myPrj.sinks.sink_01.hdfs.useLocalTimeStamp = true
myPrj.sinks.sink_01.hdfs.rollCount = 0
myPrj.sinks.sink_01.hdfs.rollInterval = 0
myPrj.sinks.sink_01.hdfs.batchSize = 100

# Data compressed
#myPrj.sinks.sink_01.hdfs.rollSize = 33554432
#myPrj.sinks.sink_01.hdfs.codeC = gzip
#myPrj.sinks.sink_01.hdfs.fileType = CompressedStream

# Data no compressed
myPrj.sinks.sink_01.hdfs.rollSize = 33554432
myPrj.sinks.sink_01.hdfs.fileType = DataStream