Eu tenho alguns trabalhos Samza correndo todos lendomensagens de um tópico do Kafka e escrever uma nova mensagem para um novo tópico. Para enviar as novas mensagens, estou usando o Samza "s construído em OutgoingMessageEnvelope. Também usando um MessageCollector para enviar a nova mensagem. É algo como isto:
collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))
Existe uma maneira que eu possa usar isso para adicionar partições ao tópico Kafka? Tal como o particionamento em um ID de usuário ou algo parecido.
Ou se há uma maneira melhor, eu adoraria ouvir isso!
Respostas:
3 para resposta № 1Você deve conseguir enviar mensagens usando um chave de particionamento,
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
Usando este método irá particionar seus dados. No entanto eu acho que se você está olhando para controlar o número de partições programaticamente, você deve usar a API kafka para criar / alterar o tópico como mencionado Aqui