@oluies
Created November 1, 2017 13:44

beam_elastic.java

public static void main(String[] args) throws IOException {

    // Register and parse the custom pipeline options from the command line.
    PipelineOptionsFactory.register(PipelineCmdLineOptions.class);
    PipelineCmdLineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(PipelineCmdLineOptions.class);


    // Config config = ConfigFactory.parseFile(new File(args[0]));
    // LOG.info(config.root().render(ConfigRenderOptions.concise().setFormatted(true)));

    // options.setJobName("NBI Kafka to Elastic");

    /*
    ConnectionConfiguration connectionConfiguration = null;
    try {
        connectionConfiguration = getConnectionConfiguration(options, ElasticsearchIOReadOrWrite.WRITE);
    } catch (IOException e) {
        final String msg = "FATAL: Connection to elasticsearch " + options.getElasticsearchServer()
                + ":" + options.getElasticsearchHttpPort() + " failed";
        System.err.format(msg);
        LOG.error(msg, e);
        // return;
    }
    */

    // Elasticsearch connection: cluster address, index, and type for the documents to be written.
    ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.create(
            new String[] {"http://elasticsearch:9200"}, "bigindex_write", "bigindex");

    Pipeline p = Pipeline.create(options);

    // Read LineageEvent records from the "lineage" Kafka topic.
    PTransform<PBegin, PCollection<KV<String, LineageEvent>>> kafka = KafkaIO.<String, LineageEvent>read()
            .withBootstrapServers("kafka:9092")
            .withTopic("lineage")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder(LinageEventDeserializer.class, AvroCoder.of(LineageEvent.class))

            // .withTimestampFn(new LinageEventTimeStampFunction())

            // Start from the beginning of the topic when no committed offset exists.
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
            // .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "http://registry:8081"))
            // .updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object) "true"))

            // Uncomment to turn the unbounded Kafka source into a bounded one (useful for sinks
            // such as files that cannot handle unbounded input); as written it would stop after 10 records.
            // .withMaxNumRecords(10)

            // Drop the Kafka metadata and keep only the key/value pairs.
            .withoutMetadata();

    // Maximum number of documents per Elasticsearch bulk request.
    long ELASTIC_BATCH_SIZE = 1000;

    p.apply(kafka)
            .apply(Values.<LineageEvent>create())
            .apply("FormatLinageEvents", ParDo.of(new DoFn<LineageEvent, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // ElasticsearchIO.write() expects one JSON document per element;
                    // here the event's toString() form is logged and emitted as the document.
                    LineageEvent e = c.element();
                    final String s = e.toString();
                    LOG.info(s);
                    c.output(s);
                }
            }))
            .apply(ElasticsearchIO.write()
                    .withConnectionConfiguration(connectionConfiguration)
                    .withMaxBatchSize(ELASTIC_BATCH_SIZE));



    // Execute the pipeline and block until it finishes.
    p.run().waitUntilFinish();
}
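
Note: two project-specific classes used above are not included in this gist: PipelineCmdLineOptions (registered with PipelineOptionsFactory) and LinageEventDeserializer (the Kafka value deserializer). The sketches below are illustrative assumptions, not the original implementations. The getElasticsearchServer()/getElasticsearchHttpPort() getters are taken from the commented-out connection block above; the default values are guesses.

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical sketch: the real PipelineCmdLineOptions is not part of this gist.
public interface PipelineCmdLineOptions extends PipelineOptions {

    @Description("Elasticsearch host name")
    @Default.String("elasticsearch")   // assumed default
    String getElasticsearchServer();
    void setElasticsearchServer(String value);

    @Description("Elasticsearch HTTP port")
    @Default.Integer(9200)             // assumed default
    int getElasticsearchHttpPort();
    void setElasticsearchHttpPort(int value);
}

With such an interface in place, the hard-coded ConnectionConfiguration above could be built from options.getElasticsearchServer() and options.getElasticsearchHttpPort() instead of literals.

A plain Avro value deserializer could look like the following, assuming the producer writes standard Avro binary for LineageEvent (the commented-out schema.registry.url property hints the original may instead use a Confluent schema-registry-aware deserializer):

import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical sketch: the real LinageEventDeserializer is not part of this gist.
public class LinageEventDeserializer implements Deserializer<LineageEvent> {

    private final SpecificDatumReader<LineageEvent> reader =
            new SpecificDatumReader<>(LineageEvent.class);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public LineageEvent deserialize(String topic, byte[] data) {
        try {
            // Decode the raw bytes as Avro binary into the generated LineageEvent class.
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (java.io.IOException e) {
            throw new SerializationException("Failed to decode LineageEvent", e);
        }
    }

    @Override
    public void close() { }
}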