Created
August 8, 2022 11:04
-
-
Save showsky/e7f35c7df6b738280cf6d3ddca6b984b to your computer and use it in GitHub Desktop.
Apache Beam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tw.com.feebee.staging; | |
import com.google.api.services.bigquery.model.TableRow; | |
import com.google.gson.Gson; | |
import com.google.gson.JsonArray; | |
import com.google.gson.JsonObject; | |
import com.google.gson.JsonParser; | |
import org.apache.beam.runners.direct.DirectOptions; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.io.Compression; | |
import org.apache.beam.sdk.io.TextIO; | |
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.transforms.*; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import java.util.ArrayList; | |
import java.util.List; | |
/** | |
* Created by showsky on 2022/8/8 | |
*/ | |
public class DemoWriteTableRow { | |
public static List<TableRow> buildData() { | |
ArrayList<TableRow> data = new ArrayList<>(); | |
for (int i = 0; i < 10; i++) { | |
TableRow tableRow = new TableRow(); | |
tableRow.set("name", "showsky" + i); | |
tableRow.set("position", String.valueOf(i)); | |
data.add(tableRow); | |
} | |
return data; | |
} | |
public static void main(String[] args) { | |
// 0. Init Pipeline | |
DirectOptions options = PipelineOptionsFactory | |
.fromArgs(args) | |
.withValidation() | |
.as(DirectOptions.class); | |
options.setTargetParallelism(5); | |
options.setTempLocation("./demo/temp"); | |
Pipeline p = Pipeline.create(options); | |
// 1. Read data | |
PCollection<TableRow> source = p.apply("Read data", Create.of(buildData())); | |
// 2. Add Group ID | |
PCollection<TableRow> collection1 = source.apply("App Group id", ParDo.of(new DoFn<TableRow, TableRow>() { | |
@ProcessElement | |
public void processElement(@Element TableRow tableRow, OutputReceiver<TableRow> out) { | |
int groupId = (int) (Math.random() * 2 + 1); | |
TableRow newTableRow = tableRow.clone(); | |
newTableRow.set("group_id", groupId); | |
out.output(newTableRow); | |
} | |
})); | |
// 3. Group By ID | |
PCollection<String> collection4 = collection1.apply("Group By Id", new PTransform<PCollection<TableRow>, PCollection<String>>() { | |
@Override | |
public PCollection<String> expand(PCollection<TableRow> input) { | |
return input.apply("1. Convert KV", ParDo.of(new DoFn<TableRow, KV<Integer, TableRow>>() { | |
@ProcessElement | |
public void processElement(@Element TableRow tableRow, OutputReceiver<KV<Integer, TableRow>> out) { | |
out.output(KV.of(Integer.valueOf(tableRow.get("group_id").toString()), tableRow)); | |
} | |
})).apply("2. Group By Key", GroupByKey.create()).apply("3. Convert Group Data to String", ParDo.of(new DoFn<KV<Integer, Iterable<TableRow>>, String>() { | |
@ProcessElement | |
public void processElement(@Element KV<Integer, Iterable<TableRow>> in, OutputReceiver<String> out) { | |
Gson gson = new Gson(); | |
JsonArray jsonArray = new JsonArray(); | |
for (TableRow tableRow : in.getValue()) { | |
TableRow newTableRow = tableRow.clone(); | |
newTableRow.remove("group_id"); | |
String jsonString = gson.toJson(newTableRow); | |
jsonArray.add(JsonParser.parseString(jsonString)); | |
} | |
JsonObject jsonObject = new JsonObject(); | |
jsonObject.addProperty("group_id", in.getKey()); | |
jsonObject.addProperty("size", jsonArray.size()); | |
jsonObject.add("data", jsonArray); | |
out.output(gson.toJson(jsonObject)); | |
} | |
})); | |
} | |
}); | |
// 3.1 Save to File | |
collection4.apply("Write Group Data", | |
TextIO.write().to("./demo/result") | |
.withoutSharding() | |
.withSuffix(".json") | |
); | |
// 4. TableRow convert json string | |
TableRowJsonCoder coder = TableRowJsonCoder.of(); | |
PCollection<String> collection2 = collection1.apply("Convert TableRow to Json String", ParDo.of(new DoFn<TableRow, String>() { | |
@ProcessElement | |
public void processElement(@Element TableRow tableRow, OutputReceiver<String> out) { | |
Gson gson = new Gson(); | |
out.output(gson.toJson(tableRow)); | |
} | |
})); | |
// 4. Save to File | |
String filenamePath = "./demo/output"; | |
collection2.apply("Write File", | |
TextIO.write() | |
.to(filenamePath) | |
.withSuffix(".json") | |
.withCompression(Compression.GZIP) | |
.withNumShards(2) | |
); | |
p.run().waitUntilFinish(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment