
Timestamp incompatibility (NiFi's PutSQL)

I'm facing issues with the insertion of timestamps into a PostgreSQL database using the NiFi PutSQL processor.

More specifically, when trying to insert a date in the format '2018-01-31T19:01:09+00:00' into a timestamptz column, I get the following error message:

2018-04-01 19:29:40,091 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.standard.PutSQL PutSQL[id=7997503a-0162-1000-ee81-a0361cad5e0c] Failed to update database for StandardFlowFileRecord[uuid=d02e8b39-e564-4c37-a08a-dab8931e9890,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522615075930-15, container=default, section=15], offset=11492, length=163],offset=0,name=32836401373126,size=163] due to java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp; routing to failure: java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
    at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:711)
    at org.apache.nifi.processors.standard.PutSQL.lambda$null$5(PutSQL.java:313)
    at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
    at org.apache.nifi.processors.standard.PutSQL.lambda$new$6(PutSQL.java:311)
    at org.apache.nifi.processors.standard.PutSQL.lambda$new$9(PutSQL.java:354)
    at org.apache.nifi.processor.util.pattern.PutGroup.putFlowFiles(PutGroup.java:91)
    at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:101)
    at org.apache.nifi.processors.standard.PutSQL.lambda$onTrigger$20(PutSQL.java:574)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.standard.PutSQL.onTrigger(PutSQL.java:574)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.text.ParseException: Unparseable date: "2018-01-31T20:19:35+00:00"
    at java.text.DateFormat.parse(DateFormat.java:366)
    at org.apache.nifi.processors.standard.PutSQL.setParameter(PutSQL.java:911)
    at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:707)
    ... 21 common frames omitted

I have tested the insertion of '2018-01-31T19:01:09+00:00' into the timestamptz column from the command line and it works perfectly. I have also tried various alternative formats such as:

  • '2018-01-31 19:01:09+00:00'
  • '2018-01-31 19:01:09+00'
  • '2018-01-31T19:01:09+00'
  • '2018-01-31 19:01:09'

They all fail with the same error in NiFi, even though they are all inserted just fine when performing the INSERT from the command line.

Please find a screenshot of my flow attached. Let me know if you need any more details.

[Screenshot: part of the NiFi flow]

To be honest, I would prefer to avoid the Java conversion altogether, as leaving the datetime as a string and inserting it directly into the Postgres DB works just fine. I have tried forcing this by using an UpdateAttribute processor, but that led to additional errors.
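For what it's worth, here is a minimal psycopg2 sketch of what I mean by "inserting it directly" (the connection string, table and column names are purely illustrative); it is essentially the same statement that succeeds from the command line:

import psycopg2

# Illustrative connection details, table and column names; the point is only that
# Postgres accepts the ISO-8601 string for a timestamptz column without any conversion.
conn = psycopg2.connect('dbname=mydb user=myuser')
cur = conn.cursor()

cur.execute(
    'INSERT INTO my_table (created_utc) VALUES (%s)',
    ('2018-01-31T19:01:09+00:00',)
)

conn.commit()
cur.close()
conn.close()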

I have come across various questions regarding this topic, but I still don't get what is going on. Most notably:

  • PutSql - date format error
  • Nifi PutSQL Timestamp/Datetime error cannot be converted error
  • '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp error
  • Trying to solve the timestamp format conversion error by using the "UpdateAttribute" processor, i.e. ConvertJSONtoSQL -> UpdateAttribute -> PutSQL
Asked by vcovo

1 Answer

I resolved this by using an ExecuteStreamCommand processor which calls a Python script that converts a JSON line into its respective SQL INSERT statement. The table of interest in this case is reddit_post.

Code of the Python script (I'm aware that there is no need for the INSERT argument, but it is there because I plan on adding an UPDATE option later on):

import json
import argparse
import sys

# For command line arguments
parser = argparse.ArgumentParser(description='Converts JSON to respective SQL statement')
parser.add_argument('statement_type', type=str, nargs=1)
parser.add_argument('table_name', type=str, nargs=1)

# Reading the command line arguments
statement_type = parser.parse_args().statement_type[0]
table_name = parser.parse_args().table_name[0]

for line in sys.stdin:
  # Load JSON line
  json_line = json.loads(line)

  # Initialize the SQL statement (reset for every input line)
  statement = ''

  # Add table name and SQL syntax
  if statement_type == 'INSERT':
    statement += 'INSERT INTO {} '.format(table_name)

  # Add table parameters and SQL syntax
  statement += '({}) '.format(', '.join(json_line.keys()))

  # Add table values and SQL syntax
  # Strings are wrapped in single quotes (with embedded quotes doubled);
  # other data types are converted to strings for the join
  statement += "VALUES ({});".format(
    ', '.join(
      "'{0}'".format(value.replace("'", "''")) if isinstance(value, str) else str(value)
      for value in json_line.values()
    )
  )

  # Send statement to stdout
  print(statement)
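For example, assuming the script is invoked with the arguments INSERT and reddit_post, a hypothetical input line such as

{"id": 1, "title": "Don't panic", "created_utc": "2018-01-31T19:01:09+00:00"}

comes out as

INSERT INTO reddit_post (id, title, created_utc) VALUES (1, 'Don''t panic', '2018-01-31T19:01:09+00:00');

The field names here are made up; the script simply echoes whatever keys appear in the incoming JSON.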

Configuration of ExecuteStreamCommand (note that Argument Delimiter is set to a single space):

[Screenshot: ExecuteStreamCommand configuration]

[Screenshot: flow snippet]

I hope this can help someone who comes across a similar issue. If you have any advice on how to improve the script, the flow, or anything else, please don't hesitate to let me know!

Answered by vcovo