Created
October 16, 2015 17:10
-
-
Save chirino/2266b8b12df3d2058292 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples; | |
import io.vertx.core.AsyncResult; | |
import io.vertx.core.AsyncResultHandler; | |
import io.vertx.core.Handler; | |
import io.vertx.core.Vertx; | |
import io.vertx.core.buffer.Buffer; | |
import io.vertx.core.net.NetClient; | |
import io.vertx.core.net.NetSocket; | |
import org.apache.qpid.proton.Proton; | |
import org.apache.qpid.proton.amqp.messaging.AmqpValue; | |
import org.apache.qpid.proton.amqp.messaging.Target; | |
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; | |
import org.apache.qpid.proton.amqp.transport.SenderSettleMode; | |
import org.apache.qpid.proton.engine.Connection; | |
import org.apache.qpid.proton.engine.Delivery; | |
import org.apache.qpid.proton.engine.Sender; | |
import org.apache.qpid.proton.engine.Session; | |
import org.apache.qpid.proton.engine.Transport; | |
import org.apache.qpid.proton.message.Message; | |
import java.nio.ByteBuffer; | |
import java.nio.charset.StandardCharsets; | |
/** | |
* | |
*/ | |
public class AmqpExample { | |
public static void main(String[] args) throws InterruptedException { | |
// Connection are logical and kinda independent of the Socket (they can even switch sockets). | |
final Connection connection = Proton.connection(); | |
connection.setContainer("client-id:1"); | |
connection.open(); | |
Session session = connection.session(); | |
session.open(); | |
Target target = new Target(); | |
target.setAddress("queue://foo"); | |
Sender sender = session.sender("link1"); | |
sender.setTarget(target); | |
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); | |
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); | |
sender.open(); | |
Delivery delivery1 = sendMessage(sender, "msg:1", "Helo World"); | |
Vertx vertx = Vertx.vertx(); | |
NetClient client = connect(vertx, connection, "localhost", 5672); | |
// The proton engine API is not thread safe.. so at this | |
// point this would not be safe since we need to synchronize /w the | |
// actions the NetClient is doing. | |
Delivery delivery2 = sendMessage(sender, "msg:2", "Helo World 2"); | |
Thread.sleep(1000); | |
} | |
private static NetClient connect(Vertx vertx, final Connection connection, String localhost, int port) { | |
// Since the connection is not yet attached to a socket, all the above actions queued up | |
final NetClient client = vertx.createNetClient(); | |
client.connect(port, localhost, new AsyncResultHandler<NetSocket>() { | |
final Transport transport = Proton.transport(); | |
NetSocket socket; | |
ByteBuffer pendingFromVertx; | |
@Override | |
public void handle(AsyncResult<NetSocket> event) { | |
if (event.succeeded()) { | |
transport.bind(connection); | |
socket = event.result(); | |
socket.handler(new Handler<Buffer>() { | |
@Override | |
public void handle(Buffer event) { | |
assert pendingFromVertx == null; | |
pendingFromVertx = ByteBuffer.wrap(event.getBytes()); | |
socket.pause(); | |
// Have proton process bytes from the network | |
pumpPendingFromVertx(); | |
// at this point we really should call back to user logic | |
// to handle processing the updated AMQP state. | |
// Proton my want to reply in response to those bytes.. so | |
// send bytes out of proton into vert.x | |
pumpPendingToVertx(); | |
} | |
}); | |
socket.drainHandler(new Handler<Void>() { | |
@Override | |
public void handle(Void event) { | |
pumpPendingToVertx(); | |
} | |
}); | |
pumpPendingToVertx(); | |
} else { | |
System.out.println("Failed to connect: " + event.cause()); | |
System.exit(1); | |
} | |
} | |
private void pumpPendingToVertx() { | |
ByteBuffer outputBuffer = transport.getOutputBuffer(); | |
while( !socket.writeQueueFull() && outputBuffer.hasRemaining() ) { | |
byte buffer[] = new byte[outputBuffer.remaining()]; | |
outputBuffer.get(buffer); | |
socket.write(Buffer.buffer(buffer)); | |
transport.outputConsumed(); | |
} | |
} | |
private void pumpPendingFromVertx() { | |
// Lets push bytes from vert.x to proton engine. | |
ByteBuffer inputBuffer = transport.getInputBuffer(); | |
while (pendingFromVertx.hasRemaining() && inputBuffer.hasRemaining()) { | |
inputBuffer.put(pendingFromVertx.get()); | |
} | |
transport.processInput().checkIsOk(); | |
if (!pendingFromVertx.hasRemaining()) { | |
pendingFromVertx = null; | |
socket.resume(); | |
} | |
} | |
}); | |
return client; | |
} | |
static private Delivery sendMessage(Sender sender, String deliveryTag, String messageBody) { | |
int BUFFER_SIZE = 1024; | |
Message m = Proton.message(); | |
m.setBody(new AmqpValue(messageBody)); | |
byte[] encodedMessage = new byte[BUFFER_SIZE]; | |
int len = m.encode(encodedMessage, 0, BUFFER_SIZE); | |
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); | |
Delivery serverDelivery = sender.delivery(tag); | |
sender.send(encodedMessage, 0, len); | |
sender.advance(); | |
return serverDelivery; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment