
How to load data from Cloud Storage into BigQuery using Java

I want to load data from Google Cloud Storage into a table in BigQuery. Here is my code that creates the job:

public class LoadStorageToBigQuery {

// ///////////////////////
// USER GENERATED VALUES: you must fill in values specific to your
// application.
//
// Visit the Google API Console to create a Project and generate an
// OAuth 2.0 Client ID and Secret (http://code.google.com/apis/console).
// Then, add the Project ID below, and update the clientsecrets.json file
// with your client_id and client_secret
//
// ///////////////////////
private static final String PROJECT_ID = "gavd.com:compute";
private static final String CLIENTSECRETS_LOCATION = "/client_secrets.json";
 private static final String RESOURCE_PATH =
          ("E:/Work On/ads/Cloud/Dev/Source/BigQueryDemo02" + CLIENTSECRETS_LOCATION).replace(
              '/', File.separatorChar);
static GoogleClientSecrets clientSecrets = loadClientSecrets();

// Static variables for API scope, callback URI, and HTTP/JSON functions
private static final List<String> SCOPES = Arrays
        .asList("https://www.googleapis.com/auth/bigquery");
private static final String REDIRECT_URI = "urn:ietf:wg:oauth:2.0:oob";
private static final HttpTransport TRANSPORT = new NetHttpTransport();
private static final JsonFactory JSON_FACTORY = new JacksonFactory();

private static GoogleAuthorizationCodeFlow flow = null;

/**
 * @param args
 * @throws IOException
 * @throws InterruptedException
 */
public static void main(String[] args) throws IOException,
        InterruptedException {
    System.out.println(CLIENTSECRETS_LOCATION);
    // Create a new BigQuery client authorized via OAuth 2.0 protocol
    Bigquery bigquery = createAuthorizedClient();

    // Print out available datasets to the console
    listDatasets(bigquery, "publicdata");
    JobReference jobId = startQuery(bigquery, PROJECT_ID);
    System.out.println("Job ID = " + jobId); 
    JobReference jobRef = startQuery(bigquery, PROJECT_ID);
    checkQueryResults(bigquery, PROJECT_ID, jobRef);

}

/**
 * Creates an authorized BigQuery client service using the OAuth 2.0
 * protocol
 * 
 * This method first creates a BigQuery authorization URL, then prompts the
 * user to visit this URL in a web browser to authorize access. The
 * application will wait for the user to paste the resulting authorization
 * code at the command line prompt.
 * 
 * @return an authorized BigQuery client
 * @throws IOException
 */
public static Bigquery createAuthorizedClient() throws IOException {

    String authorizeUrl = new GoogleAuthorizationCodeRequestUrl(
            clientSecrets, REDIRECT_URI, SCOPES).setState("").build();

    System.out
            .println("Paste this URL into a web browser to authorize BigQuery Access:\n"
                    + authorizeUrl);

    System.out.println("... and type the code you received here: ");
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String authorizationCode = in.readLine();

    // Exchange the auth code for an access token and refresh token
    Credential credential = exchangeCode(authorizationCode);

    return Bigquery.builder(TRANSPORT, JSON_FACTORY)
            .setHttpRequestInitializer(credential)
            .setApplicationName("Your User Agent Here").build();
}

/**
 * Display all BigQuery Datasets associated with a Project
 * 
 * @param bigquery
 *            an authorized BigQuery client
 * @param projectId
 *            a string containing the current project ID
 * @throws IOException
 */
public static void listDatasets(Bigquery bigquery, String projectId)
        throws IOException {
    Datasets.List datasetRequest = bigquery.datasets().list(projectId);
    DatasetList datasetList = datasetRequest.execute();
    if (datasetList.getDatasets() != null) {
        List<DatasetList.Datasets> datasets = datasetList.getDatasets();
        System.out.println("Available datasets\n----------------");
        System.out.println( " B = " + datasets.toString());
        for (DatasetList.Datasets dataset : datasets) {
            System.out.format("%s\n", dataset.getDatasetReference()
                    .getDatasetId());
        }
    }
}


public static JobReference startQuery(Bigquery bigquery, String projectId) throws IOException {

    Job job = new Job();
    JobConfiguration config = new JobConfiguration();
    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
    config.setLoad(loadConfig);

    job.setConfiguration(config);

    // Set where you are importing from (i.e. the Google Cloud Storage paths).
    List<String> sources = new ArrayList<String>();
    sources.add("gs://gms_cloud_project/bigquery_data/06_13_2014/namesbystate.csv");
    loadConfig.setSourceUris(sources);
    //state:STRING,sex:STRING,year:INTEGER,name:STRING,occurrence:INTEGER
    // Describe the resulting table you are importing to:
    TableReference tableRef = new TableReference();
    tableRef.setDatasetId("gimasys_database");
    tableRef.setTableId("table_test");
    tableRef.setProjectId(projectId);
    loadConfig.setDestinationTable(tableRef);

    List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
    TableFieldSchema fieldState = new TableFieldSchema();
    fieldState.setName("state");
    fieldState.setType("STRING");
    TableFieldSchema fieldSex = new TableFieldSchema();
    fieldSex.setName("sex");
    fieldSex.setType("STRING");
    TableFieldSchema fieldName = new TableFieldSchema();
    fieldName.setName("name");
    fieldName.setType("STRING");
    TableFieldSchema fieldYear = new TableFieldSchema();
    fieldYear.setName("year");
    fieldYear.setType("INTEGER");
    TableFieldSchema fieldOccur = new TableFieldSchema();
    fieldOccur.setName("occurrence");
    fieldOccur.setType("INTEGER");
    fields.add(fieldState);
    fields.add(fieldSex);
    fields.add(fieldName);
    fields.add(fieldYear);
    fields.add(fieldOccur);
    TableSchema schema = new TableSchema();
    schema.setFields(fields);
    loadConfig.setSchema(schema);

    // Also set custom delimiter or header rows to skip here....
    // [not shown].

    Insert insert = bigquery.jobs().insert(projectId, job);
    insert.setProjectId(projectId);
    JobReference jobRef =  insert.execute().getJobReference();

    // ... see rest of codelab for waiting for job to complete.
    return jobRef;
    //return jobId;
}

/**
 * Polls the status of a BigQuery job, returns Job reference if "Done"
 * 
 * @param bigquery
 *            an authorized BigQuery client
 * @param projectId
 *            a string containing the current project ID
 * @param jobId
 *            a reference to an inserted query Job
 * @return a reference to the completed Job
 * @throws IOException
 * @throws InterruptedException
 */
private static Job checkQueryResults(Bigquery bigquery, String projectId,
        JobReference jobId) throws IOException, InterruptedException {
    // Variables to keep track of total query time
    long startTime = System.currentTimeMillis();
    long elapsedTime;

    while (true) {
        Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId())
                .execute();
        elapsedTime = System.currentTimeMillis() - startTime;
        System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
                jobId.getJobId(), pollJob.getStatus().getState());
        if (pollJob.getStatus().getState().equals("DONE")) {
            return pollJob;
        }
        // Pause execution for one second before polling the job status again,
        // to reduce unnecessary calls to the BigQuery API and lower overall
        // application bandwidth.
        Thread.sleep(1000);
    }
}

/**
 * Helper to load client ID/Secret from file.
 * 
 * @return a GoogleClientSecrets object based on a clientsecrets.json
 */
private static GoogleClientSecrets loadClientSecrets() {
    try {
        System.out.println("A");
        System.out.println(CLIENTSECRETS_LOCATION);
        GoogleClientSecrets clientSecrets = GoogleClientSecrets.load(new JacksonFactory(),
                    new FileInputStream(new File(
                        RESOURCE_PATH)));
        return clientSecrets;
    } catch (Exception e) {
        System.out.println("Could not load file Client_Screts");
        e.printStackTrace();
    }
    return null;
}

/**
 * Exchange the authorization code for OAuth 2.0 credentials.
 * 
 * @return an authorized Google Auth flow
 */
static Credential exchangeCode(String authorizationCode) throws IOException {
    GoogleAuthorizationCodeFlow flow = getFlow();
    GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
            .setRedirectUri(REDIRECT_URI).execute();
    return flow.createAndStoreCredential(response, null);
}

/**
 * Build an authorization flow and store it as a static class attribute.
 * 
 * @return a Google Auth flow object
 */
static GoogleAuthorizationCodeFlow getFlow() {
    if (flow == null) {
        HttpTransport httpTransport = new NetHttpTransport();
        JacksonFactory jsonFactory = new JacksonFactory();

        flow = new GoogleAuthorizationCodeFlow.Builder(httpTransport,
                jsonFactory, clientSecrets, SCOPES)
                .setAccessType("offline").setApprovalPrompt("force")
                .build();
    }
    return flow;
}

}

The job status reported "DONE" and the table was created, but when I check my project in the Google console I don't see any error, issue, or exception, yet no data was loaded into my table (table size is 0 B). I also tried creating the table without any data first and then loading data into it, but that doesn't work either.

Job ID = {"jobId":"job_MqfuhuAU1Ms0GIOSbiePFGlc6TE","projectId":"ads.com:compute"}
Job status (451ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (2561ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (6812ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (8273ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (9695ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (11146ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (12466ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (13948ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (15392ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (16796ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (18296ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: RUNNING
Job status (19755ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: RUNNING
Job status (21587ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: DONE

I would appreciate any help,

Thanks

asked Jun 17 '14 by user3737020


1 Answer

pollJob.getStatus().getState().equals("DONE") would tell you when the job is done but it does not give you the exit code.

You should check the error result explicitly with pollJob.getStatus().getErrorResult():

    while (true) {
        Job pollJob = getBigQuery().jobs().get(projectId, jobId.getJobId()).execute();
        elapsedTime = System.currentTimeMillis() - startTime;
        if (pollJob.getStatus().getErrorResult() != null) {
            // The job ended with an error.
            System.out.format("Job %s ended with error %s\n", jobId.getJobId(),
                    pollJob.getStatus().getErrorResult().getMessage());
            throw new RuntimeException(String.format("Job %s ended with error %s",
                    jobId.getJobId(), pollJob.getStatus().getErrorResult().getMessage()));
        }
        System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
                jobId.getJobId(), pollJob.getStatus().getState());

        if (pollJob.getStatus().getState().equals("DONE")) {
            break;
        }
        Thread.sleep(5000);
    }
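
Even when the state reaches DONE, a load job can still carry per-record problems (for example CSV rows that don't match the schema) in the errors list, and the load statistics tell you how many rows actually landed in the table. Below is a rough sketch of that kind of check, assuming the same google-api-services-bigquery model classes used in the question (com.google.api.services.bigquery.model.Job and ErrorProto); the helper name logLoadJobResult is made up for illustration:

    // Hypothetical helper: inspect a finished load job for errors and row counts.
    private static void logLoadJobResult(Job completedJob) {
        // errorResult is only set when the job failed as a whole.
        ErrorProto fatal = completedJob.getStatus().getErrorResult();
        if (fatal != null) {
            System.out.format("Load failed: %s (%s)\n",
                    fatal.getMessage(), fatal.getReason());
        }

        // Per-record problems (e.g. rows that don't match the schema) show up
        // in the errors list, sometimes even when the job state is DONE.
        List<ErrorProto> errors = completedJob.getStatus().getErrors();
        if (errors != null) {
            for (ErrorProto e : errors) {
                System.out.format("  %s at %s: %s\n",
                        e.getReason(), e.getLocation(), e.getMessage());
            }
        }

        // For a successful load, outputRows says how many rows were written.
        if (completedJob.getStatistics() != null
                && completedJob.getStatistics().getLoad() != null) {
            System.out.format("Rows loaded: %d\n",
                    completedJob.getStatistics().getLoad().getOutputRows());
        }
    }

If the row count is 0 and there are no errors, double-check that the Cloud Storage URI and the dataset/table IDs point where you expect.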
answered Nov 15 '22 by East2West