For context, I currently have a Data Factory v2 pipeline with a ForEach Activity that calls a Copy Activity. The Copy Activity simply copies data from an FTP server to a blob storage container.
Here is the pipeline JSON file:
{
  "name": "pipeline1",
  "properties": {
    "activities": [
      {
        "name": "ForEach1",
        "type": "ForEach",
        "typeProperties": {
          "items": {
            "value": "@pipeline().parameters.InputParams",
            "type": "Expression"
          },
          "isSequential": true,
          "activities": [
            {
              "name": "Copy1",
              "type": "Copy",
              "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
              },
              "typeProperties": {
                "source": {
                  "type": "FileSystemSource",
                  "recursive": true
                },
                "sink": {
                  "type": "BlobSink"
                },
                "enableStaging": false,
                "cloudDataMovementUnits": 0
              },
              "inputs": [
                {
                  "referenceName": "FtpDataset",
                  "type": "DatasetReference",
                  "parameters": {
                    "FtpFileName": "@item().FtpFileName",
                    "FtpFolderPath": "@item().FtpFolderPath"
                  }
                }
              ],
              "outputs": [
                {
                  "referenceName": "BlobDataset",
                  "type": "DatasetReference",
                  "parameters": {
                    "BlobFileName": "@item().BlobFileName",
                    "BlobFolderPath": "@item().BlobFolderPath"
                  }
                }
              ]
            }
          ]
        }
      }
    ],
    "parameters": {
      "InputParams": {
        "type": "Array",
        "defaultValue": [
          {
            "FtpFolderPath": "/Folder1/",
            "FtpFileName": "@concat('File_',formatDateTime(utcnow(), 'yyyyMMdd'), '.txt')",
            "BlobFolderPath": "blobfolderpath",
            "BlobFileName": "blobfile1"
          },
          {
            "FtpFolderPath": "/Folder2/",
            "FtpFileName": "@concat('File_',formatDateTime(utcnow(), 'yyyyMMdd'), '.txt')",
            "BlobFolderPath": "blobfolderpath",
            "BlobFileName": "blobfile2"
          }
        ]
      }
    }
  },
  "type": "Microsoft.DataFactory/factories/pipelines"
}
The issue I am having is that when specifying pipeline parameters, it seems I cannot use system variables and functions the same way I can when, for example, specifying folder paths for a blob storage dataset.
As a result, formatDateTime(utcnow(), 'yyyyMMdd') is not evaluated as a function call; it is passed through as the literal string formatDateTime(utcnow(), 'yyyyMMdd').
To work around this, I am guessing I should use a trigger to execute my pipeline and pass the trigger's execution time to the pipeline as a parameter, e.g. trigger().startTime, but is this the only way? Am I simply doing something wrong in my pipeline's JSON?
This should work:
File_@{formatDateTime(utcnow(), 'yyyyMMdd')}
Or complex paths as well:
rootfolder/subfolder/@{formatDateTime(utcnow(),'yyyy')}/@{formatDateTime(utcnow(),'MM')}/@{formatDateTime(utcnow(),'dd')}/@{formatDateTime(utcnow(),'HH')}
You can't put a dynamic expression in a parameter's default value. You should define the expression either when you create a trigger or when you set the dataset parameters for the source/sink in the Copy Activity. For the second option, create a dataset parameter FtpFileName with some default value in the source dataset, and then specify the dynamic expression for it in the Copy Activity's source settings.
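As a rough sketch of that second option, the Copy Activity's inputs from the question could carry the expression directly instead of reading FtpFileName from the pipeline parameter's default value. This assumes every file follows the same File_yyyyMMdd.txt pattern, so FtpFileName would no longer be needed in InputParams; it has not been tested against the asker's factory.

```json
{
  "inputs": [
    {
      "referenceName": "FtpDataset",
      "type": "DatasetReference",
      "parameters": {
        "FtpFileName": "@concat('File_', formatDateTime(utcnow(), 'yyyyMMdd'), '.txt')",
        "FtpFolderPath": "@item().FtpFolderPath"
      }
    }
  ]
}
```

Because the expression now lives in the activity rather than in a default value, it should be evaluated on each run, while the folder path still comes from the ForEach item.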
Another way is to define a pipeline parameter and then assign a dynamic expression to that pipeline parameter when you define the trigger. Hope this is a clear answer. :)
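The trigger-based option might look roughly like the schedule trigger below. The trigger name DailyTrigger, the pipeline parameter RunDate, and the recurrence start time are placeholders made up for illustration and do not appear in the question's pipeline; only pipeline1 and trigger().startTime come from the question.

```json
{
  "name": "DailyTrigger",
  "properties": {
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2018-01-01T00:00:00Z",
        "timeZone": "UTC"
      }
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "pipeline1",
          "type": "PipelineReference"
        },
        "parameters": {
          "RunDate": "@trigger().startTime"
        }
      }
    ]
  }
}
```

With a String parameter like this declared on the pipeline, the file name could then be built inside the Copy Activity with something like @concat('File_', formatDateTime(pipeline().parameters.RunDate, 'yyyyMMdd'), '.txt').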