TextToBigQueryStreamingCustom
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
public class TextToBigQueryStreamingCustom {

  /** Default interval for polling files in GCS. */
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /**
   * The {@link TextToBigQueryStreamingOptions} interface provides the custom execution options
   * passed by the executor at the command-line.
   */
  public interface TextToBigQueryStreamingOptions extends PipelineOptions {
    String getInputFilePattern();

    void setInputFilePattern(String value);
  }
  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link
   * TextToBigQueryStreamingCustom#run(TextToBigQueryStreamingOptions)} method to start the
   * pipeline and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    // Parse the user options passed from the command-line.
    TextToBigQueryStreamingOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(TextToBigQueryStreamingOptions.class);

    run(options);
  }
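  // For blocking execution (the alternative described in the javadoc above), start the
  // pipeline via run() and wait on the result instead:
  //   PipelineResult result = run(options);
  //   result.waitUntilFinish();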
  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(TextToBigQueryStreamingOptions options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    // Continuously poll for files matching the input pattern.
    PCollection<MatchResult.Metadata> files =
        pipeline.apply(
            "ReadFromSource",
            FileIO.match()
                .filepattern(options.getInputFilePattern())
                .continuously(DEFAULT_POLL_INTERVAL, Growth.never()));

    PCollection<FileIO.ReadableFile> readMatches = files.apply("ReadMatches", FileIO.readMatches());

    // Read each matched file line by line, keyed by the file name.
    PCollection<KV<String, String>> lines =
        readMatches.apply(
            "ReadLines",
            ParDo.of(
                new DoFn<FileIO.ReadableFile, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(
                      @Element FileIO.ReadableFile file,
                      OutputReceiver<KV<String, String>> outputReceiver)
                      throws IOException {
                    try (InputStream inputStream = Channels.newInputStream(file.open());
                        BufferedReader reader =
                            new BufferedReader(
                                new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
                      // Read until end of file; reader.ready() only reports whether a read would
                      // block, so it is not a reliable end-of-file check.
                      String line;
                      while ((line = reader.readLine()) != null) {
                        // TODO: Add logic to decide whether to output the line or not.
                        // TODO: Convert the line to JSON if it is not already JSON.
                        outputReceiver.output(
                            KV.of(file.getMetadata().resourceId().getFilename(), line));
                      }
                    }
                  }
                }));
    PCollection<KV<String, TableRow>> convertedTableRows =
        lines.apply(
            "MapToTableRow",
            MapElements.into(
                    TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
                .via(json -> convertJsonToTableRow(json)));

    WriteResult writeResult =
        convertedTableRows.apply(
            "InsertIntoBigQuery",
            BigQueryIO.<KV<String, TableRow>>write()
                .to(
                    new DynamicDestinations<KV<String, TableRow>, String>() {
                      @Override
                      public String getDestination(
                          ValueInSingleWindow<KV<String, TableRow>> element) {
                        // Destination is keyed by the file name.
                        return element.getValue().getKey();
                      }

                      @Override
                      public TableDestination getTable(String fileName) {
                        // TODO: define project based on the file name
                        String project = "<project>";
                        String dataset = "<dataset>";
                        String table = fileName.split("\\.")[0];
                        return new TableDestination(
                            new TableReference()
                                .setProjectId(project)
                                .setDatasetId(dataset)
                                .setTableId(table),
                            "Table for file " + fileName);
                      }

                      @Override
                      public TableSchema getSchema(String fileName) {
                        // TODO: Function to return the TableSchema based on the fileName
                        return new TableSchema()
                            .setFields(
                                List.of(
                                    new TableFieldSchema().setName("id").setType("STRING"),
                                    new TableFieldSchema().setName("name").setType("STRING")));
                      }
                    })
                .withFormatFunction(kv -> kv.getValue())
                .withExtendedErrorInfo()
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    return pipeline.run();
  }
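  // Illustrative example of the routing above (an assumption, not part of the original gist):
  // a file named "users.json" whose lines look like {"id": "1", "name": "Alice"} would be
  // streamed into the table <project>:<dataset>.users, using the placeholder two-column
  // (id, name) schema returned by getSchema until the TODOs are implemented.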
  public static KV<String, TableRow> convertJsonToTableRow(KV<String, String> json) {
    TableRow row;
    // Parse the JSON line into a TableRow object.
    try (InputStream inputStream =
        new ByteArrayInputStream(json.getValue().getBytes(StandardCharsets.UTF_8))) {
      row = TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
    } catch (IOException e) {
      throw new RuntimeException("Failed to parse JSON into a TableRow: " + json, e);
    }
    return KV.of(json.getKey(), row);
  }
}
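For reference, a minimal sketch of how this pipeline might be launched. The only option the gist defines is --inputFilePattern; the runner, project, and region flags below are standard Beam/Dataflow options, and every value shown is a placeholder, not something taken from the gist.

// Hypothetical launcher; assumes the Dataflow runner dependency is on the classpath.
public class LaunchTextToBigQueryStreamingCustom {
  public static void main(String[] args) {
    TextToBigQueryStreamingCustom.main(
        new String[] {
          "--runner=DataflowRunner",                       // assumption: running on Dataflow
          "--project=<project>",                           // placeholder GCP project
          "--region=<region>",                             // placeholder Dataflow region
          "--inputFilePattern=gs://<bucket>/input/*.json"  // option defined by the gist
        });
  }
}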