Join two JSON files in Google Cloud Platform with Dataflow

I want to join two different JSON files, keep only the female employees, select only the fields we are interested in, and write the output to another JSON file.

I am trying to implement this on Google Cloud Platform using Dataflow. Can someone please provide sample Java code that produces this result?

Employee JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

Department JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

The expected output JSON file should look like this:

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}
Koushik Chandra asked Jul 24 '17 18:07


1 Answer

You can do this either with CoGroupByKey (which involves a shuffle) or with side inputs, if your departments collection is significantly smaller than your employees collection.

I will give you code in Python, but you can use the same pipeline in Java.


With side inputs, you will:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

  2. Then take the employees PCollection as the main input and use each employee's emp_dept value to look up the matching department JSON in that dictionary.

Like so:

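# Assumes `import apache_beam as beam`, a pipeline `p`, and LoadDepts/LoadEmps
# transforms that yield one dict per JSON record.
departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  # dict.update() returns None, so build and return a new merged dict.
  # Employees reference their department through the emp_dept field.
  joined = dict(employee)
  joined.update(dept_dict[employee['emp_dept']])
  return joined

joined_dicts = employees | beam.Map(join_emp_dept, dept_dict=deps_si)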
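
The snippet above joins every employee and does not apply the question's "female only" condition. As a minimal sketch, the per-element logic can be written as plain functions and checked without a runner. The name `is_female` and the in-memory `dept_dict` are assumptions for illustration, not part of the original answer; in a pipeline, `is_female` would be passed to `beam.Filter` ahead of the joining `beam.Map`.

```python
import json

# Sample records copied from the question.
EMPLOYEE_LINES = [
    '{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}',
    '{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}',
]
DEPARTMENT_LINES = [
    '{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}',
    '{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}',
    '{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}',
]


def is_female(employee):
    """Predicate (hypothetical name) that could be handed to beam.Filter."""
    return employee.get("emp_gender") == "female"


def join_emp_dept(employee, dept_dict):
    """Return a new dict combining an employee with its department record."""
    joined = dict(employee)
    joined.update(dept_dict[employee["emp_dept"]])
    return joined


# Plain-Python stand-in for the side input: dept_id -> department record.
dept_dict = {d["dept_id"]: d for d in map(json.loads, DEPARTMENT_LINES)}

employees = [json.loads(line) for line in EMPLOYEE_LINES]
joined = [join_emp_dept(e, dept_dict) for e in employees if is_female(e)]

print(joined[0]["emp_name"], joined[0]["dept_name"])  # Adam Account
```

Filtering before the join keeps the lookup work to the records that will actually be written.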

With CoGroupByKey, you key both collections by department ID (dept_id for departments, emp_dept for employees) and group them. The result is a PCollection of key-value pairs where the key is the department ID and the value holds two iterables: the department records, and the employees in that department.
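
To make that grouped shape concrete, here is a plain-Python imitation of the grouping on the question's sample data (records trimmed to the fields needed here). It does not use Beam: the helper `co_group` is a hypothetical stand-in for `beam.CoGroupByKey`, which yields iterables rather than necessarily lists.

```python
from collections import defaultdict

employees = [
    {"emp_id": "OrgEmp#1", "emp_name": "Adam", "emp_dept": "OrgDept#1"},
    {"emp_id": "OrgEmp#1", "emp_name": "Scott", "emp_dept": "OrgDept#3"},
]
departments = [
    {"dept_id": "OrgDept#1", "dept_name": "Account"},
    {"dept_id": "OrgDept#2", "dept_name": "IT"},
    {"dept_id": "OrgDept#3", "dept_name": "HR"},
]


def co_group(tagged):
    """Group several keyed collections by key, keeping one list per tag."""
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


grouped = co_group({
    "employees": [(e["emp_dept"], e) for e in employees],
    "departments": [(d["dept_id"], d) for d in departments],
})

# Print each key with the number of employees and departments grouped under it.
for key, value in sorted(grouped.items()):
    print(key, len(value["employees"]), len(value["departments"]))
```

A department with no employees (OrgDept#2 here) still appears, with an empty employees list, so taking the product of the two lists yields nothing for it. In Beam itself, the grouping is done by the answer's pipeline that follows.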

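import itertools

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

# Employees reference their department through the emp_dept field.
employees = (p | LoadEmps()
               | 'key_emp' >> beam.Map(lambda emp: (emp['emp_dept'], emp)))

def join_lists(kv):
  # Python 3 does not allow tuple parameters, so unpack explicitly.
  _, v = kv
  return itertools.product(v['employees'], v['departments'])

def merge_dicts(pair):
  # dict.update() returns None, so return a new merged dict instead.
  emp_dict, dept_dict = pair
  return {**emp_dict, **dept_dict}

# filter_fields is not defined in this answer; it should keep only the
# fields wanted in the output.
joined_dicts = (
    {'employees': employees, 'departments': departments}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(merge_dicts)
    | 'filterfields' >> beam.Map(filter_fields)
)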
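
The pipeline above refers to `filter_fields` without defining it. One possible version, as a sketch: the field list is taken from the expected output in the question, while the names `OUTPUT_FIELDS` and `to_json_line` are assumptions for illustration.

```python
import json

# Fields present in the question's expected output, in that order.
OUTPUT_FIELDS = ("emp_id", "emp_name", "dept_name", "emp_salary")


def filter_fields(record):
    """Keep only the output fields of a joined employee/department dict."""
    return {field: record[field] for field in OUTPUT_FIELDS}


def to_json_line(record):
    """Serialize one record as a single JSON line."""
    # Compact separators match the formatting of the sample files.
    return json.dumps(record, separators=(",", ":"))


# A joined record built from the question's sample data.
joined = {
    "emp_id": "OrgEmp#1", "emp_name": "Adam", "emp_dept": "OrgDept#1",
    "emp_country": "USA", "emp_gender": "female", "emp_birth_year": "1980",
    "emp_salary": "$100000", "dept_id": "OrgDept#1", "dept_name": "Account",
    "dept_start_year": "1950",
}

print(to_json_line(filter_fields(joined)))
```

In a pipeline, both functions would typically be applied with `beam.Map`, followed by a text sink such as `beam.io.WriteToText`. A `beam.Filter` on `emp_gender`, placed before the employees are keyed, would restrict the output to female employees as the question asks.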
Pablo answered Sep 28 '22 03:09