2 Stimmen

Immer nur die letzten 10 Nachrichten im ActiveMQ Topic sicherstellen

Wir haben ein Problem in ActiveMQ, wo wir eine große Anzahl von Nachrichten haben, die nicht von Topics abfallen. Die Topics sind auf nicht-persistent, nicht-dauerhaft eingestellt. Unsere Activemq.xml-Datei lautet

<beans>

  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">

<!--
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
    </persistenceAdapter>
-->

        <transportConnectors>
            <transportConnector uri="vm://localhost"/>
        </transportConnectors>

  </broker>

</beans>

und unsere Topic-Definition in messaging-config.xml lautet

<destination id="traceChannel">

    <properties>

        <network>
        <session-timeout>10</session-timeout>
    </network>

        <server>
            <message-time-to-live>10000</message-time-to-live>
            <durable>false</durable>
            <durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
        </server>

        <jms>
            <destination-type>Topic</destination-type>
            <message-type>javax.jms.ObjectMessage</message-type>
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
            <delivery-mode>NON_PERSISTENT</delivery-mode>
            <message-priority>DEFAULT_PRIORITY</message-priority>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
            <initial-context-environment>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>tcp://localhost:61616</value>
                </property>
            </initial-context-environment>
        </jms>
    </properties>

    <channels>
        <channel ref="rtmps" />
    </channels>

    <adapter ref="trace" />

</destination>

Was ich versuche zu erreichen, ist, dass nur die letzten 10 Nachrichten auf den Themen zu jeder Zeit als verlassen es läuft über Nacht Ergebnisse in über 150K Nachrichten auf das Thema, obwohl es nur eine sehr kleine Anzahl sein sollte.

5voto

Henryk Konsek Punkte 8856

Soweit ich weiß, sollten Nachrichten, die an nicht dauerhafte Themen ohne Abonnenten gesendet werden, verworfen werden. Nur aktuell registrierte Verbraucher erhalten die Kopie der Nachricht.

Wie überprüfen Sie, ob das Thema diese 150K Nachrichten enthält? Über JMX?

Ungeachtet der Tatsache, dass Ihr nicht dauerhaftes Thema diese 150K Nachrichten nicht zwischenspeichern sollte, können Sie die Menge der pro Verbraucher gespeicherten Nachrichten mit Hilfe der Broker-Richtlinie begrenzen:

<broker>
...    
  <pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="10"/>
  </pendingMessageLimitStrategy>
...
</broker>

2voto

srodriguez Punkte 1777

Ich bin mir nicht sicher, ob das, was Sie wollen, archiviert werden kann. Sie können mehrere Dinge tun:

Zunächst einmal können Sie Ihre Nachrichten mit einer Lebenszeit versehen:

public ITopicPublisher CreateTopicPublisher(string selector)
        {
            try
            {
                IMessageProducer producer = m_session.CreateProducer(m_topic);
                ActiveMQPublisher publisher = new ActiveMQPublisher(producer);

                // here we put a time to live to 1min for eg
                TimeSpan messageTTL = TimeSpan.FromMilliseconds(60000);
                publisher.TimeToLive = messageTTL;

                if (!String.IsNullOrEmpty(selector))
                {
                    publisher.IsSelector = true;
                    publisher.Selector = selector;
                }
                return publisher;
            }
            catch (Exception ex)
            {
                Logger.Exception(ex);
                throw;
            }
        }

Morgens erhalten Sie immer noch die 150k Nachrichten, aber sobald sich ein Verbraucher mit Ihrem Thema verbindet, sind die abgelaufenen Nachrichten verschwunden.

Vielleicht möchten Sie auch einen Blick auf die Räumungsstrategie werfen ( Räumungsstrategie )

Edit: Mir ist gerade eine Sache aufgefallen. Du sagst: "Wenn man es über Nacht laufen lässt, hat man über 150.000 Nachrichten zu dem Thema, obwohl es nur eine sehr geringe Anzahl enthalten sollte." In einem Topic gibt es kein Konzept von Nachrichten, die abgeholt werden müssen. Wenn niemand ein Thema abonniert hat, wird die Nachricht, die dorthin geschickt wird, verworfen. Trotzdem erhöht sich die "Message enqueued" um eins. Wenn Sie nur "die letzten 10 Nachrichten" gesendet haben wollen, sollten Sie vielleicht eine Warteschlange anstelle eines Themas verwenden.

1voto

Claudio Punkte 1828

Ich weiß nicht, ob es funktioniert, haven-t bekam die Chance noch zu testen, aber bitte werfen Sie einen Blick auf constantPendingMessageLimitStrategy in hier http://activemq.apache.org/slow-consumer-handling.html

Viel Glück! Claudio

-1voto

CEJ Punkte 19

Ein paar Dinge, die aus diesem Beitrag nicht so klar hervorgehen

  • Basierend auf meinem Parsing und Kämpfe versuchen, dies hinzuzufügen.. es braucht ein bisschen mehr Kontext, so fügte ich es unten für diejenigen, die vielleicht nicht durch die ActiveMQ Google Kampf gehen wollen.
  • Das steht in meinem activemq.xml und ich sehe nicht, dass dies überhaupt funktioniert, also habe ich gehofft, dass ich einige Einblicke von anderen bekommen könnte, die mich vielleicht in die richtige Richtung weisen können.

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">"
                       topicPrefetch="10"/>
          <policyEntry topic=">"
                       producerFlowControl="false"/>
          <policyEntry topic=".>"
                       >
            <messageEvictionStrategy>
              <oldestMessageEvictionStrategy/>
            </messageEvictionStrategy>
    
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X