I am trying to read and parse a JSON file in Apache Beam code.
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("/Users/xyz/eclipse-workspace/beam-project/myfirst.json"));
System.out.println("lines: " + lines);
Below is the sample JSON file from which I need to parse the testdata content:
myfirst.json
{
  "testdata":{
    "siteOwner":"xxx",
    "siteInfo":{
      "siteID":"id_member",
      "siteplatform":"web",
      "siteType":"soap",
      "siteURL":"www"
    }
  }
}
Could someone guide me on how to parse testdata and get the content from the above JSON file, and then stream that data using Beam?
First of all, I don't think it is possible (or at least common) to process "pretty-printed" JSON directly. Instead, JSON data is usually ingested as newline-delimited JSON (NDJSON), one complete record per line, so your input file should look like the following:
{"testdata":{"siteOwner":"xxx","siteInfo":{"siteID":"id_member","siteplatform":"web","siteType":"soap","siteURL":"www"}}}
{"testdata":{"siteOwner":"yyy","siteInfo":{"siteID":"id_member2","siteplatform":"web","siteType":"soap","siteURL":"www"}}}
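If your file really is pretty-printed, one way to get it into this shape is to collapse the indentation. Here is a minimal, naive sketch in plain Java (the class name `FlattenJson` and the helper `flatten` are just illustrative); it assumes no string value in the JSON contains a newline or significant leading/trailing whitespace, and a real converter would re-serialize through a proper JSON library (e.g. Gson or Jackson) instead:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenJson {

    // Naive sketch: collapse a pretty-printed JSON object into a single
    // NDJSON line by trimming each line's indentation and concatenating.
    // Assumption: no string value spans lines or carries meaningful
    // leading/trailing whitespace.
    static String flatten(List<String> prettyLines) {
        return prettyLines.stream()
                .map(String::trim)
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        List<String> pretty = Arrays.asList(
                "{",
                "  \"testdata\":{",
                "    \"siteOwner\":\"xxx\"",
                "  }",
                "}");
        System.out.println(flatten(pretty)); // {"testdata":{"siteOwner":"xxx"}}
    }
}
```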
After that, with your code above, lines gives you "a stream of lines". Next, you can map this "stream of lines" into a "stream of JSONs" by applying a parse function in a ParDo:
static class ParseJsonFn extends DoFn<String, Json> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // c.element() is one input line; you can do whatever you want with it here: parse, print, etc.
    // This function is applied to every element in your stream.
    // `Json` and `parseJson` are placeholders for your JSON library of choice
    // (e.g. Gson or Jackson); note the output type also needs a Beam coder.
    c.output(parseJson(c.element()));
  }
}
PCollection<Json> jsons = lines.apply(ParDo.of(new ParseJsonFn())); // now you have a "stream of JSONs"
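Inside such a parse function you then pull out the fields you need (siteOwner, siteID, etc.). As a dependency-free illustration of the idea, here is a hypothetical helper (`extractField` is my own name, not a Beam or JSON-library API) that grabs one string field out of a one-line JSON record with a regex; in real code you would use Gson or Jackson rather than a regex, which cannot handle escaping or nesting correctly in general:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExtractField {

    // Hypothetical sketch: find "<field>":"<value>" in a single-line JSON
    // record and return the value, or null if the field is absent.
    // A real pipeline should parse with a JSON library instead.
    static String extractField(String jsonLine, String field) {
        Matcher m = Pattern
                .compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"")
                .matcher(jsonLine);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String line = "{\"testdata\":{\"siteOwner\":\"xxx\","
                + "\"siteInfo\":{\"siteID\":\"id_member\"}}}";
        System.out.println(extractField(line, "siteOwner")); // prints xxx
        System.out.println(extractField(line, "siteID"));    // prints id_member
    }
}
```

You would call such a helper (or, better, a real JSON parser) from inside the processElement method shown above, then output a value type for the downstream transforms in your pipeline.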