10 Stimmen

Filterung von JMS-Nachrichtenempfängern nach JMSCorrelationID

Wie kann ich einen JMS-Warteschlangen-Listener in Java (JRE /JDK / J2EE 1.4) instanziieren, der nur Nachrichten empfängt, die einer bestimmten JMSCorrelationID entsprechen? Die Nachrichten, die ich abholen möchte, wurden in einer Warteschlange und nicht in einem Thema veröffentlicht, obwohl sich das bei Bedarf ändern kann.

Hier ist der Code, mit dem ich derzeit die Nachricht in die Warteschlange stelle:

/**
 * publishResponseToQueue publishes Requests to the Queue.
 *
 * @param   jmsQueueFactory             -Name of the queue-connection-factory
 * @param   jmsQueue                    -The queue name for the request
 * @param   response                     -A response object that needs to be published
 * 
 * @throws  ServiceLocatorException     -An exception if a request message
 *                                      could not be published to the Topic
 */
private void publishResponseToQueue( String jmsQueueFactory,
                                    String jmsQueue,
                                    Response response )
        throws ServiceLocatorException {

    if ( logger.isInfoEnabled() ) {
        logger.info( "Begin publishRequestToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + "," + response );
    }
    logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
                      "jmsQueue cannot be null" );
    logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
                      "jmsQueueFactory cannot be null" );
    logger.assertLog( response != null, "Request cannot be null" );

    try {

        Queue queue = (Queue)_context.lookup( jmsQueue );

        QueueConnectionFactory factory = (QueueConnectionFactory)
            _context.lookup( jmsQueueFactory );

        QueueConnection connection = factory.createQueueConnection();
        connection.start();
        QueueSession session = connection.createQueueSession( false,
                                    QueueSession.AUTO_ACKNOWLEDGE );

        ObjectMessage objectMessage = session.createObjectMessage();

        objectMessage.setJMSCorrelationID(response.getID());

        objectMessage.setObject( response );

        session.createSender( queue ).send( objectMessage );

        session.close();
        connection.close();

    } catch ( Exception e ) {
        //XC3.2  Added/Modified BEGIN
        logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
                                           "- Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        //XC3.2  Added/Modified END
    }

    if ( logger.isInfoEnabled() ) {
        logger.info( "End publishResponseToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + response );
    }

}  // end of publishResponseToQueue method

11voto

Robin Punkte 23622

Die Einrichtung der Warteschlangenverbindung ist dieselbe, aber sobald Sie die QueueSession haben, legen Sie den Selektor fest, wenn Sie einen Empfänger erstellen.

    QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");

dann

receiver.receive()

oder

receiver.setListener(myListener);

5voto

James Strachan Punkte 9078

BTW, während seine nicht die eigentliche Frage, die Sie gefragt - wenn Sie versuchen, Anfrage Antwort über JMS zu implementieren würde ich empfehlen Lesen dieses Artikels denn die JMS-API ist viel komplexer, als Sie sich vorstellen können, und eine effiziente Umsetzung ist viel schwieriger, als es aussieht.

Insbesondere JMS effizient zu nutzen Sie sollten es vermeiden, Verbraucher für eine einzige Nachricht zu schaffen usw.

Auch weil die JMS-API so komplex ist, um sie korrekt und effizient zu nutzen - insbesondere mit Pooling, Transaktionen und gleichzeitiger Verarbeitung - empfehle ich den Leuten die Middleware vor ihrem Anwendungscode verbergen z. B. durch die Verwendung von Apache Camel's Spring Remoting Implementierung für JMS

2voto

saptarshi Punkte 97

Ich hoffe, das hilft. Ich habe Open MQ verwendet.

package com.MQueues;

import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;

public class HelloProducerConsumer {

public static String queueName = "queue0";
public static String correlationId;

public static String getCorrelationId() {
    return correlationId;
}

public static void setCorrelationId(String correlationId) {
    HelloProducerConsumer.correlationId = correlationId;
}

public static String getQueueName() {
    return queueName;
}

public static void sendMessage(String threadName) {
    correlationId = UUID.randomUUID().toString();
    try {

        // Start connection
        QueueConnectionFactory cf = new QueueConnectionFactory();
        QueueConnection connection = cf.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        BasicQueue destination = (BasicQueue) session.createQueue(threadName);
        MessageProducer producer = session.createProducer(destination);
        connection.start();

        // create message to send
        TextMessage message = session.createTextMessage();
        message.setJMSCorrelationID(correlationId);
        message.setText(threadName + "(" + System.currentTimeMillis() 
                + ") " + correlationId +" from Producer");

        System.out.println(correlationId +" Send from Producer");
        producer.send(message);

        // close everything
        producer.close();
        session.close();
        connection.close();

    } catch (JMSException ex) {
        System.out.println("Error = " + ex.getMessage());
    }
}

public static void receivemessage(final String correlationId) {
    try {

        QueueConnectionFactory cf = new QueueConnectionFactory();
        QueueConnection connection = cf.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());

        connection.start();

        System.out.println("\n");
        System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
        long now = System.currentTimeMillis();

        // receive our message
        String filter = "JMSCorrelationID = '" + correlationId  + "'";
        QueueReceiver receiver = session.createReceiver(destination, filter);
        TextMessage m = (TextMessage) receiver.receive();
        System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());

        System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");

        session.close();
        connection.close();

    } catch (JMSException ex) {
        System.out.println("Error = " + ex.getMessage());
    }
}

public static void main(String args[]) {
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId1 = getCorrelationId();
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId2 = getCorrelationId();
    HelloProducerConsumer.sendMessage(getQueueName());
    String correlationId3 = getCorrelationId();

    HelloProducerConsumer.receivemessage(correlationId2);

    HelloProducerConsumer.receivemessage(correlationId1);

    HelloProducerConsumer.receivemessage(correlationId3);
}
}

0voto

Trying Punkte 13480
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, filter);

Hier erhält der Empfänger die Nachrichten, für die JMSCorrelationID ist gleich MessageID Dies ist sehr hilfreich für das Paradigma Anfrage/Antwort.

oder Sie können dies direkt auf einen beliebigen Wert setzen:

QueueReceiver receiver = session.createReceiver(queue,  "JMSCorrelationID ='"+id+"'";);

Dann können Sie entweder receiver.receive(2000); oder receiver.setMessageListener(this);

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X