/ / Rücklieferung nach ACK - Rubin, Hasenmq, Amqp

Rücklieferung nach ACK - Ruby, Rabbitmq, amqp

Warum werden meine RabbitMQ-Nachrichten erneut an meinen Kunden gesendet, nachdem sie ACKd waren? Ich bin neu bei RabbitMQ, ich muss es missbrauchen, oder da ist etwas falsch mit dem Ruby-Amqp-Juwel.

Ich habe ein Ruby-Skript, das eine Warteschlange abonniertund bestätigt jede Nachricht. Wenn ich es den ganzen Weg durch die Nachrichten machen lasse, verschwinden die Nachrichten wirklich aus der Warteschlange; Sie werden nicht erneut gesendet. Aber wenn ich mein Skript vor allen ACKd abbringe und das Skript erneut starte, beginnt die Zustellung von der ersten Nachricht an neu.

Das Verhalten, das ich im Code sehe, spiegelt sich genau in der RabbitMQ Web-Management-Oberfläche wider, die Warteschlange enthält Nachrichten, und trotz der ACKs verschwinden sie nicht.

Hinweis: Ich habe ungefähr 5000 Nachrichten in die Warteschlange gestellt. Wenn ich dem Verbraucher eine signifikante Menge ACK geben lasse, scheinen tatsächlich einige Nachrichten aus der Warteschlange entfernt zu werden (im Gegensatz zu dem, was ich oben gesagt habe). Ich konnte dieses Phänomen nicht feststellen.

Ich benutze Ruby 1.9.3, RabbitMQ 2.8.7, und die Amqp Ruby Edelstein 0.9.8. Es passiert mit Hersteller und Verbraucher auf Ubuntu 12.0.4 oder Mac OS 10.7.4.

Was zum Teufel??

(Siehe Update am Ende dieser Nachricht)

Hier ist der Code für den Verbraucher:

# encoding: utf-8
require "rubygems"
require "amqp"
require "aws-sdk"

queue_name = "some.queue"
begin
AMQP.start("amqp://localhost:5672") do | connection |
channel  = AMQP::Channel.new(connection)
queue = channel.queue(queue_name, :durable => true)
queue.subscribe(:ack => true) do | metadata, payload |
metadata.ack
end
end
end

und hier ist der Produzent:

# encoding: utf-8
require "rubygems"
require "amqp"
require "aws-sdk"

msg = ARGV[0]
queue_name = "some.queue"
begin
AMQP.start("amqp://localhost:5672") do | connection |
channel  = AMQP::Channel.new(connection)
queue    = channel.queue(queue_name, :durable => true)
(1..5000).each do | x |
channel.default_exchange.publish x, :routing_key => queue_name, :persistent => true
end
end
end

Mit Wireshark stellte ich fest, dass die Acks, die ich sendete, nicht zum Broker geschickt wurden. Ich rief metadata.ack an, aber es wurden keine Pakete gesendet.

Basierend auf @Robthewolfs Rat, habe ich Kanal versucht.Vorabholen (1). Als ich diesen Anruf benutzt habe, ist es jedem Makler gelungen. Im Allgemeinen, wenn ich channel.prefetch (n) anrief, schickte ich einmal n (oder manchmal n + 1) acks, und schickte sie an den Broker.

Also habe ich eine neue Frage: Warum legt Prefetch () Parameter fest, wie viele Acks gesendet werden müssen, bevor sie schließlich an den Broker gesendet werden?

Antworten:

1 für die Antwort № 1

Alle nicht gepackten Nachrichten werden in die Warteschlange zurückgeleitet, wenn der Benutzer gestoppt wurde. Wenn nur ein paar gecastet werden, werden sie nicht in die Warteschlange zurückgebracht, aber die anderen werden es sein. Sie können verwenden channel.basicQos(1); um sicherzustellen, dass Sie nur 1 Element zu einem Zeitpunkt aus der Warteschlange lesen. Kein neues Objekt wird gelesen, bis das erste Objekt angekreuzt ist.