I need to define a grok pattern in an AWS Glue Classifier to capture a datestamp
with milliseconds in the datetime column of a file (which the AWS Glue Crawler
converts to string). I used the DATESTAMP_EVENTLOG pattern
predefined in AWS Glue and tried to add the milliseconds to it.
Classification: datetime
Grok pattern: %{DATESTAMP_EVENTLOG:string}
Custom patterns:
MILLISECONDS (\d){3,7}
DATESTAMP_EVENTLOG %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}.%{MILLISECONDS}
I still could not get the pattern to work. Any ideas?
A common misconception is that Classifiers can specify parse formats for individual data types. They cannot: Classifiers are for specifying file formats, in addition to the built-in ones like JSON, CSV, etc.
As user @lilline suggests, the best way to change a data type is with an ApplyMapping function.
When creating a Glue Job you can select the option: A proposed script generated by AWS Glue
Then when selecting the table from the Glue Catalog as a source, you can make changes to the datatypes, column names, etc.
The output code might look something like the following:
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("paymentid", "string", "paymentid", "string"), ("updateddateutc", "string", "updateddateutc", "timestamp"), ...], transformation_ctx = "applymapping1")
This effectively casts the updateddateutc string to a timestamp.
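To make the shape of the mappings argument concrete, here is a minimal plain-Python sketch. Each entry is a 4-tuple of (source name, source type, target name, target type), as in the generated line above. The helper build_mappings and its overrides argument are hypothetical names for illustration only and are not part of the AWS Glue API.

```python
def build_mappings(columns, overrides):
    """Build ApplyMapping-style 4-tuples.

    columns   -- list of (name, source_type) pairs, e.g. as reported by the crawler
    overrides -- dict of column name -> desired target type (hypothetical helper input)
    """
    mappings = []
    for name, source_type in columns:
        # Keep the source type unless an override asks for a cast.
        target_type = overrides.get(name, source_type)
        mappings.append((name, source_type, name, target_type))
    return mappings


columns = [("paymentid", "string"), ("updateddateutc", "string")]
mappings = build_mappings(columns, {"updateddateutc": "timestamp"})
print(mappings)
```

The resulting list could then be passed as the mappings argument of ApplyMapping.apply inside a Glue job.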
In order to create a Classifier you would need to specify each individual column in the file.
Classifier type: Grok
Classification: Name
Grok pattern: %{MY_TIMESTAMP}
Custom patterns: MY_TIMESTAMP (%{USERNAME:test}[,]%{YEAR:year}[-]%{MONTHNUM:mm}[-]%{MONTHDAY:dd} %{TIME:time})
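To see what such a pattern would capture, the following sketch approximates MY_TIMESTAMP with a Python regular expression. The sub-expressions are rough stand-ins for the grok building blocks (USERNAME, YEAR, MONTHNUM, MONTHDAY, TIME), not their exact definitions, and the sample line is made up for illustration.

```python
import re

# Approximate Python translation of the custom grok pattern.
# These are simplified assumptions, not the exact grok definitions.
MY_TIMESTAMP = re.compile(
    r"(?P<test>[a-zA-Z0-9._-]+),"                      # ~ %{USERNAME:test}[,]
    r"(?P<year>\d{4})-"                                # ~ %{YEAR:year}[-]
    r"(?P<mm>0?[1-9]|1[0-2])-"                         # ~ %{MONTHNUM:mm}[-]
    r"(?P<dd>0?[1-9]|[12]\d|3[01]) "                   # ~ %{MONTHDAY:dd}
    r"(?P<time>\d{1,2}:\d{2}:\d{2}(?:[.,]\d+)?)"       # ~ %{TIME:time}
)


def parse_line(line):
    """Return the named captures as a dict, or None if the line does not match."""
    match = MY_TIMESTAMP.match(line)
    return match.groupdict() if match else None


print(parse_line("alice,2019-03-07 12:34:56.789"))
```

Note that every capture comes back as a string. The pattern describes the layout of a line; it does not convert the value to a timestamp type.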
I was also not able to figure out how to do that with the classifiers, but I ended up converting the timestamp from string to datetime by writing a custom transformation in the mapping script (Python).
Below is my working code. col2 is a column that the Glue crawler specified as string, and here I'm converting it to a Python datetime.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "s3_events", table_name = "events", transformation_ctx = "datasource0")
def convert_dates(rec):
    rec["col2"] = datetime.strptime(rec["col2"], "%d.%m.%Y")
    return rec
custommapping1 = Map.apply(frame = datasource0, f = convert_dates, transformation_ctx = "custommapping1")
applymapping1 = ApplyMapping.apply(frame = custommapping1, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "date", "col2", "date")], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["col2", "col0", "col1"], transformation_ctx = "selectfields2")
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mydb", table_name = "mytable", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mydb", table_name = "mytable", transformation_ctx = "datasink5")
job.commit()
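For the format in the question (an ISO-style timestamp with a fractional part), convert_dates could be adapted along the following lines. This is a sketch under assumptions: the input layout YYYY-MM-DDTHH:MM:SS.fraction is taken from the question's pattern, parse_timestamp is a hypothetical helper, and the record is treated as a plain dict so the snippet runs outside Glue. Python's %f accepts at most six fractional digits, so longer fractions are truncated.

```python
from datetime import datetime


def parse_timestamp(value):
    """Parse 'YYYY-MM-DDTHH:MM:SS[.fraction]' into a datetime, or return None."""
    if not value:
        return None
    head, _, fraction = value.partition(".")
    # %f takes 1 to 6 digits, so cut longer fractions (e.g. 7 digits) down to 6.
    text = head + "." + (fraction[:6] if fraction else "0")
    try:
        return datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        # Leave unparseable values as None instead of failing the whole job.
        return None


def convert_dates(rec):
    rec["col2"] = parse_timestamp(rec["col2"])
    return rec


print(convert_dates({"col2": "2019-03-07T12:34:56.789"}))
```

Returning None for bad values is a design choice for this sketch; raising instead may be preferable if malformed timestamps should stop the job.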