Конфигурирам агент, който прочита от FTPсървър и изпращане на файлове към hdfs мивка. Моят голям проблем е, че искам да съхранявам файлове в hdfs с оригиналното им име. Опитах се с Spooldir източник и работи добре и е в състояние да съхранявате файлове в hdfs с техните basename, но фюмър агент смаже:
1) Ако файлът е записан в след като е бил поставен в директорията за прескачане, Flume ще отпечата грешка в своя регистрационен файл и ще прекрати обработката.
2) Ако името на файла бъде повторно използвано по-късно, Flume ще отпечата грешка в своя регистрационен файл и ще прекрати обработката.
Всъщност, източникът на spooldir не е подходящ за употребата ми. И така, има ли идея как да направите ftp източник запази името на файла, впоследствие hdfs съхранява файлове отделно според имената им.
Това е моят агент:
agent.sources = r1
agent.channels = c1
agent.sinks = k
#configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true
#configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path = hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize = 1000000
agent.sinks.k.hdfs.fileType = DataStream
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000
agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
Отговори:
0 за отговор № 1Току-що избухнах решение за проекта за FTP github на фюм:
KR, Филип
Има ли трик за това как да се определи факта, че собствеността% {basename} липсва?
0 за отговор № 2
Както казах, съгласно следния код актуализиране: Прикрепвам за източник на FFT Flood, ето как използвах променливата% {basename}:
##############
# COMPONENTS #
##############
myPrj.sources = source_01
myPrj.channels = channel_01
myPrj.sinks = sink_01
############
# BINDINGS #
############
myPrj.sources.source_01.channels = channel_01
myPrj.sinks.sink_01.channel = channel_01
###########
# CHANNEL #
###########
myPrj.channels.channel_01.type = memory
myPrj.channels.channel_01.capacity = 10000
myPrj.channels.channel_01.transactionCapacity = 10000
##########
# SOURCE #
##########
myPrj.sources.source_01.type = org.keedio.flume.source.ftp.source.Source
myPrj.sources.source_01.client.source = ftp
myPrj.sources.source_01.name.server = 127.0.0.1
myPrj.sources.source_01.user = myPrj
myPrj.sources.source_01.password = myPrj
myPrj.sources.source_01.port = 21
#myPrj.sources.source_01.security.enabled = true
#myPrj.sources.source_01.security.cipher = TLS
#myPrj.sources.source_01.security.certificate.enabled = true
#myPrj.sources.source_01.path.keystore = /paht/to/keystore
#myPrj.sources.source_01.store.pass = the_keyStore_password
myPrj.sources.source_01.run.discover.delay = 5000
myPrj.sources.source_01.flushlines = false
myPrj.sources.source_01.chunk.size = 33554432
myPrj.sources.source_01.folder = /home/foo/app-flume-ftp-hdfs
myPrj.sources.source_01.file.name = flume-ftp-hdfs.ser
myPrj.sources.source_01.fileHeader = true
myPrj.sources.source_01.basenameHeader = true
# Deserializer
myPrj.sources.source_01.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
myPrj.sources.source_01.deserializer.maxBlobLength = 33554432
########
# SINK #
########
#myPrj.sinks.sink_01.type = logger
myPrj.sinks.sink_01.type = hdfs
myPrj.sinks.sink_01.hdfs.path = /user/foo/ftp_source/%Y/%m/%d/%H/%M/%{basename}
myPrj.sinks.sink_01.hdfs.filePrefix = FTP_SOURCE
myPrj.sinks.sink_01.hdfs.useLocalTimeStamp = true
myPrj.sinks.sink_01.hdfs.rollCount = 0
myPrj.sinks.sink_01.hdfs.rollInterval = 0
myPrj.sinks.sink_01.hdfs.batchSize = 100
# Data compressed
#myPrj.sinks.sink_01.hdfs.rollSize = 33554432
#myPrj.sinks.sink_01.hdfs.codeC = gzip
#myPrj.sinks.sink_01.hdfs.fileType = CompressedStream
# Data no compressed
myPrj.sinks.sink_01.hdfs.rollSize = 33554432
myPrj.sinks.sink_01.hdfs.fileType = DataStream