Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

SSIS steps to load CSV from Azure blob to Azure SQL

I need to connect to a CSV file in an Azure blob (source), then load the data into an Azure SQL Server table, then move the CSV file to a different (archive) Azure blob.

Without Azure, I would create a flat file connection to a local file, then a Data Flow task using Source Assistant & Destination Assistant to load the data into a SQL Server table, then a File System task in Control Flow to move the file to the Archive directory.

I'd like to do something similar, with a direct connection to the file in the Azure blob, then after the Data Flow task, do a file Move from one Azure blob (source) to another Azure Blob (archive).

The best I can figure is to use the Azure Blob Download task to move the CSV to the Azure VM where SSIS is running (by the way, can you get an Azure SSIS service without a VM?), then after download create a flat file connection & Data Flow to load the data, then perform an Azure Blob Upload task to the Archive Blob.

Seems there should be a way to connect to the source Azure blob file & read from it directly without having to download it first. Likewise, seems there should be a way to move files between Azure blob containers. Best I can come up with is the download/upload option, but that requires an intermediate location (local directory), and doesn't remove the source file after download. Are there SSIS Azure tools to do these tasks?

like image 861
Scott Duncan Avatar asked Mar 31 '17 00:03

Scott Duncan


People also ask

How do I connect Azure blob storage in SSIS?

The Azure Storage connection manager enables a SQL Server Integration Services (SSIS) package to connect to an Azure Storage account. The connection manager is a component of the SQL Server Integration Services (SSIS) Feature Pack for Azure. In the Add SSIS Connection Manager dialog box, select AzureStorage > Add.


2 Answers

Any ideas for the other half of the problem: moving the files from the Source blob to an Archive blob after processing them?

As I known, there is no in-build task for you to achieve this purpose. Based on my test, I assume that you could leverage Script Task and write code (VB or C#) to handle blobs directly. Here are my detailed steps, you could refer to them:

1) Use Azure Blob Source and OLE DB Destination under Data Flow for loading a CSV file from Azure Blob into Azure SQL database.

enter image description here

2) After successfully load the CSV data into SQL table, use a Script Task for moving the source blob to an archive blob.

enter image description here

I would invoke Blob Service REST API copy Blob and Delete Blob with Container SAS token, you could leverage Microsoft Azure Storage Explorer and follow this official tutorial to generate the SAS token for your blob container.

Assuming the source blob and destination blob are under the same container, then I add three variables (SourceBlobUrl,ContainerSasToken,ArchiveBlobUrl) as follows and add them as ReadOnlyVariables in Script Task Editor, you could refer to this tutorial for using Variables in the Script Task.

enter image description here

Click Edit Script button under Script Task Editor to launch the VSTA development environment in which you write your custom script. Here is the Main method under ScriptMain.cs as follows:

public async void Main()
{
    // TODO: Add your code here
    string sasToken = Dts.Variables["ContainerSasToken"].Value.ToString();
    string sourceBlobUrl = Dts.Variables["SourceBlobUrl"].Value.ToString();
    string archiveBlobUrl = Dts.Variables["ArchiveBlobUrl"].Value.ToString();

    try
    {
        HttpClient client = new HttpClient();
        client.DefaultRequestHeaders.Add("x-ms-copy-source", sourceBlobUrl + sasToken);
        //copy source blob to archive blob
        Dts.Log($"start copying blob from [{sourceBlobUrl}] to [{archiveBlobUrl}]...", 0, new byte[0]);
        HttpResponseMessage response = await client.PutAsync(archiveBlobUrl + sasToken, null);
        if (response.StatusCode == HttpStatusCode.Accepted || response.StatusCode == HttpStatusCode.Created)
        {
            client.DefaultRequestHeaders.Clear();
            Dts.Log($"start deleting blob [{sourceBlobUrl}]...", 0, new byte[0]);
            //delete source blob
            HttpResponseMessage result = await client.DeleteAsync(sourceBlobUrl + sasToken);
            if (result.StatusCode == HttpStatusCode.Accepted || result.StatusCode == HttpStatusCode.Created)
            {
                Dts.TaskResult = (int)ScriptResults.Success;
                return;
            }
        }
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
    catch (Exception ex)
    {
        Dts.Events.FireError(-1, "Script Task - Move source blob to an archive blob", ex.Message + "\r" + ex.StackTrace, String.Empty, 0);
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}

Result

enter image description here enter image description here

Additionally, you could also leverage Microsoft Azure Storage Client Library for .NET to access storage blob, at this point, you need to load the assembly in a SSIS script task that is not in the GAC, for more details you could refer to this official blog.

like image 92
Bruce Chen Avatar answered Sep 19 '22 18:09

Bruce Chen


As suggested by Bruce, I decided to use the Azure Storage Client Library, which is different from the solution presented by Bruce, so I am posting my working code for anyone who wants to take this approach.

I found two good references for this:

http://microsoft-ssis.blogspot.com/2015/10/azure-file-system-task-for-ssis.html

http://cc.davelozinski.com/code/csharp-azure-blob-storage-manager-class

First, in the Script task, Add Reference to the Microsoft.WindowsAzure.Storage assembly (C:\Program Files\Microsoft SDKs\Azure.NET SDK\v2.9\ToolsRef)

Second, add Namespace references:

    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Blob;

Finally, here is the full main method to move a file between Azure storage accounts:

    public void Main()
    {
        // Get parameter values
        string blobFile = Dts.Variables["$Package::NewClientFile"].Value.ToString();
        string containerName = Dts.Variables["$Project::ClientContainer"].Value.ToString();

        // get connections
        string connStrSource = Dts.Connections["azureadhocstorage"].AcquireConnection(Dts.Transaction).ToString();
        string connStrTarget = Dts.Connections["azurearchivestorage"].AcquireConnection(Dts.Transaction).ToString();

        try
        {
            // Retrieve storage accounts from connection string.
            CloudStorageAccount storageAcctSource = CloudStorageAccount.Parse(connStrSource);
            CloudStorageAccount storageAcctTarget = CloudStorageAccount.Parse(connStrTarget);

            // Create the blob clients
            CloudBlobClient blobClientSource = storageAcctSource.CreateCloudBlobClient();
            CloudBlobClient blobClientTarget = storageAcctTarget.CreateCloudBlobClient();

            // Create a reference to the container you want to delete
            CloudBlobContainer containerSource = blobClientSource.GetContainerReference(containerName);
            CloudBlobContainer containerTarget = blobClientTarget.GetContainerReference(containerName);

            // get blockblob (the files) references
            CloudBlockBlob blobBlockSource = containerSource.GetBlockBlobReference(blobFile);
            CloudBlockBlob blobBlockTarget = containerTarget.GetBlockBlobReference(blobFile);

            // copy the source to the target, waiting for it to finish (it is Asynchronous between separate accounts)
            blobBlockTarget.StartCopy(blobBlockSource);
            while (blobBlockTarget.CopyState.Status == CopyStatus.Pending)
            {
                // not done copying yet, so go to sleep
                System.Threading.Thread.Sleep(100);
                // refresh the copy status
                blobBlockTarget.FetchAttributes();
            }

            // delete the source
            blobBlockSource.Delete();

            // Show success in log
            bool fireAgain = true;
            Dts.Events.FireInformation(0, "Move Block Blob", containerName + ": " + blobFile + " was moved successfully", string.Empty, 0, ref fireAgain);

            // Close Script Task with Success
            Dts.TaskResult = (int)ScriptResults.Success;
        }
        catch (Exception ex)
        {
            // Show Failure in log
            Dts.Events.FireError(0, "Move Block Blob", ex.Message, string.Empty, 0);

            // Close Script Task with Failure
            Dts.TaskResult = (int)ScriptResults.Failure;
        }
    }
like image 33
Scott Duncan Avatar answered Sep 20 '22 18:09

Scott Duncan