Azure Data Factory v2 using utcnow() as a pipeline parameter

For context, I currently have a Data Factory v2 pipeline with a ForEach Activity that calls a Copy Activity. The Copy Activity simply copies data from an FTP server to a blob storage container.

Here is the pipeline JSON file:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "ForEach1",
                "type": "ForEach",
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.InputParams",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Copy1",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "FileSystemSource",
                                    "recursive": true
                                },
                                "sink": {
                                    "type": "BlobSink"
                                },
                                "enableStaging": false,
                                "cloudDataMovementUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "FtpDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "FtpFileName": "@item().FtpFileName",
                                        "FtpFolderPath": "@item().FtpFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "BlobDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "BlobFileName": "@item().BlobFileName",
                                        "BlobFolderPath": "@item().BlobFolderPath"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "InputParams": {
                "type": "Array",
                "defaultValue": [
                    {
                        "FtpFolderPath": "/Folder1/",
                        "FtpFileName": "@concat('File_',formatDateTime(utcnow(), 'yyyyMMdd'), '.txt')",
                        "BlobFolderPath": "blobfolderpath",
                        "BlobFileName": "blobfile1"
                    },
                    {
                        "FtpFolderPath": "/Folder2/",
                        "FtpFileName": "@concat('File_',formatDateTime(utcnow(), 'yyyyMMdd'), '.txt')",
                        "BlobFolderPath": "blobfolderpath",
                        "BlobFileName": "blobfile2"
                    }
                ]
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

The issue I am having is that when specifying pipeline parameters, it seems I cannot use system variables and functions the same way I can when, for example, specifying folder paths for a blob storage dataset. The consequence is that formatDateTime(utcnow(), 'yyyyMMdd') is not interpreted as a function call but rather as the literal string formatDateTime(utcnow(), 'yyyyMMdd').

To counter this, I am guessing I should use a trigger to execute my pipeline and pass the trigger's execution time to the pipeline as a parameter, e.g. trigger().startTime. But is this the only way? Am I simply doing something wrong in my pipeline's JSON?

asked Jun 22 '18 by Thomas Pouget

2 Answers

This should work: File_@{formatDateTime(utcnow(), 'yyyyMMdd')}

It works for more complex paths as well:

rootfolder/subfolder/@{formatDateTime(utcnow(),'yyyy')}/@{formatDateTime(utcnow(),'MM')}/@{formatDateTime(utcnow(),'dd')}/@{formatDateTime(utcnow(),'HH')}
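
The @{ ... } string-interpolation syntax is evaluated anywhere Data Factory evaluates expressions, for example directly on the dataset reference inside the Copy Activity rather than in a parameter default. A minimal sketch of the asker's inputs block with the file name built inline (the .txt extension is carried over from the original parameters; this placement is an assumption, not the only option):

"inputs": [
    {
        "referenceName": "FtpDataset",
        "type": "DatasetReference",
        "parameters": {
            "FtpFileName": "File_@{formatDateTime(utcnow(), 'yyyyMMdd')}.txt",
            "FtpFolderPath": "@item().FtpFolderPath"
        }
    }
]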
answered Nov 18 '22 by Dipen Lama


You can't put a dynamic expression in a parameter's default value. You should define the expression either when you create a trigger, or when you set the dataset parameters in the Copy Activity's source/sink. So, one option: create the dataset property FtpFileName with some default value in the source dataset, and then specify the dynamic expression for it in the Copy Activity's source settings.
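
A minimal sketch of what that source dataset could look like, with FtpFileName and FtpFolderPath declared as dataset parameters and referenced in typeProperties. The linked service name, the FileShare dataset type, and the default value are assumptions, inferred from the 2018-era schema implied by FileSystemSource in the question:

{
    "name": "FtpDataset",
    "properties": {
        "type": "FileShare",
        "linkedServiceName": {
            "referenceName": "FtpLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "FtpFolderPath": { "type": "String" },
            "FtpFileName": { "type": "String", "defaultValue": "placeholder.txt" }
        },
        "typeProperties": {
            "folderPath": "@dataset().FtpFolderPath",
            "fileName": "@dataset().FtpFileName"
        }
    }
}

The Copy Activity then overrides FtpFileName with the dynamic expression in its source settings, as in the snippet under the first answer.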


Another way is to define a pipeline parameter, and then supply the dynamic expression for that parameter when you define a trigger. Hope this is a clear answer to you. :)
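
A sketch of that trigger-based approach, assuming the pipeline declares a hypothetical WindowStart parameter and builds the file name with formatDateTime(pipeline().parameters.WindowStart, 'yyyyMMdd'); the trigger name and recurrence settings are also assumptions:

{
    "name": "DailyTrigger",
    "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Day",
                "interval": 1,
                "startTime": "2018-06-22T00:00:00Z",
                "timeZone": "UTC"
            }
        },
        "pipelines": [
            {
                "pipelineReference": {
                    "type": "PipelineReference",
                    "referenceName": "pipeline1"
                },
                "parameters": {
                    "WindowStart": "@trigger().startTime"
                }
            }
        ]
    }
}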

answered Nov 18 '22 by DraganB