Created
November 11, 2016 14:48
-
-
Save dehora/fd777139c90dbb57233f11d521575564 to your computer and use it in GitHub Desktop.
Example Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spike; | |
import java.util.concurrent.TimeUnit; | |
import nakadi.Cursor; | |
import nakadi.LoggingStreamObserverProvider; | |
import nakadi.LoggingStreamOffsetObserver; | |
import nakadi.NakadiClient; | |
import nakadi.Response; | |
import nakadi.StreamConfiguration; | |
import nakadi.StreamProcessor; | |
class StreamsMain { | |
public static void main(String[] args) throws Exception { | |
final NakadiClient client = NakadiClient.newBuilder() | |
.baseURI("http://localhost:9080") | |
.build(); | |
healthcheck(client); | |
streams(client); | |
} | |
private static void streams(NakadiClient client) throws Exception { | |
StreamConfiguration sc = new StreamConfiguration() | |
.eventTypeName("et-1") | |
.cursors(new Cursor("0", "741")) | |
.batchLimit(2) | |
.batchFlushTimeout(3, TimeUnit.SECONDS) | |
; | |
StreamProcessor processor = client.resources().streamBuilder() | |
.streamConfiguration(sc) | |
.streamObserverFactory(new LoggingStreamObserverProvider()) | |
.streamOffsetObserver(new LoggingStreamOffsetObserver()) | |
.build(); | |
processor.start(); | |
//Thread.sleep(500000); | |
//processor.stop(); | |
} | |
private static void healthcheck(NakadiClient client) { | |
final Response healthcheck1 = client.resources().health().healthcheckThrowing(); | |
System.out.println(healthcheck1.statusCode() + " " + healthcheck1.reason()); | |
System.out.println(healthcheck1.headers()); | |
System.out.println(healthcheck1.responseBody().asString()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment