.NET for Apache Spark - Write and Read data from Azure Data Lake Storage Gen1

August 22, 2020
.NET, Development, Microsoft Azure
azure data lake storage, csharp, dotnet, fsharp, microsoft azure, spark

Introduction #

For the second article in the .NET for Apache Spark series, we’ll be looking at how to interact with Azure Data Lake Storage Gen1. We’ll create a DataFrame, write it to the Data Lake, and read the written data back into our application.

Prerequisites #

To get started, make sure you have the environment from the first article in this series set up: the .NET Core SDK (3.1.x), Java 8, Apache Spark 2.4.x and the Microsoft.Spark.Worker. You’ll also need an Azure subscription with a Data Lake Storage Gen1 account, plus the Azure CLI if you want to follow the CLI steps below.

Steps #

The steps to be taken are shown in the outline below:

Section                          Steps
Setup and Configure Environment  Step 1. Verify Installed Prerequisites
                                 Step 2. Create a Service Principal
                                 Step 3. Grant Service Principal access to Data Lake
                                 Step 4. Save Service Principal & Data Lake name as Environment Variables
Build Application                Step 5. Create a new App
                                 Step 6. Add Microsoft.Spark Package
                                 Step 7. Write Code
Run Application                  Step 8. Run your Application

Step 1. Verify Installed Prerequisites #

The first step is ensuring you have the required prerequisites installed. On Linux or macOS, run the following command:

dotnet --version; \
java -version; \
spark-shell --version; \
echo $DOTNET_WORKER_DIR;

Expected output

3.1.302
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1ubuntu1-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252
Branch HEAD
Compiled by user holden on 2020-05-29T23:47:51Z
Revision 807e0a484d1de767d1f02bd8a622da6450bdf940
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
/home/usman/bin/Microsoft.Spark.Worker

On Windows (Command Prompt), run:

dotnet --version & ^
java -version & ^
spark-shell --version & ^
echo %DOTNET_WORKER_DIR%

Expected output

3.1.302
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_211
Branch
Compiled by user  on 2019-03-26T22:44:44Z
Revision
Url
Type --help for more information.
C:\bin\Microsoft.Spark.Worker\

The important bits in the output above are the .NET Core SDK version (3.1.x), the Java version (1.8.x), the Spark (2.4.x) and Scala (2.11.x) versions, and the configured Spark worker directory (DOTNET_WORKER_DIR).

Step 2. Create a Service Principal #

We need to create a service principal which our application can use to access the Data Lake. This can be done via Azure Portal, Azure Cloud Shell or using Azure CLI on your local machine.

For Azure Portal, this guide from Microsoft Docs is detailed and covers all steps required. Make sure to retrieve your Tenant ID (Directory ID), service principal’s Client ID (Application ID), and Client Secret.

For the Azure CLI method, follow the steps below to create your service principal and retrieve its credentials.

  1. Login to your Azure account by running the command below and following the prompt.

    az login
    
  2. Replace myApp in the command below with a name for your service principal, <subscription-id> with your Azure subscription ID, and <group-name> with the name of your resource group, then run the command.

    az ad sp create-for-rbac --name "myApp" --role contributor --scopes /subscriptions/<subscription-id>/resourceGroups/<group-name> --sdk-auth
    
    Note that white space is not allowed in the service principal’s name.

    The results of the executed command should look similar to this:

    {
        "clientId": "<client-id>",
        "clientSecret": "<client-secret>",
        "subscriptionId": "<subscription-id>",
        "tenantId": "<tenant-id>",
        "activeDirectoryEndpointUrl": "https://login.microsoftonline.com",
        "resourceManagerEndpointUrl": "https://management.azure.com/",
        "activeDirectoryGraphResourceId": "https://graph.windows.net/",
        "sqlManagementEndpointUrl": "https://management.core.windows.net:8443/",
        "galleryEndpointUrl": "https://gallery.azure.com/",
        "managementEndpointUrl": "https://management.core.windows.net/"
    }
    
  3. Copy the values of clientId, clientSecret and tenantId from the JSON result (if you’d rather script this, see the sketch below).
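
    A small extraction sketch (this assumes the jq CLI is installed and isn’t part of the original walkthrough): capture the JSON output once and pull out the three values.

    # Sketch: run create-for-rbac once, keep the JSON, then extract the values we need.
    sp_json=$(az ad sp create-for-rbac --name "myApp" --role contributor \
        --scopes /subscriptions/<subscription-id>/resourceGroups/<group-name> --sdk-auth)
    echo "$sp_json" | jq -r '.clientId, .clientSecret, .tenantId'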

Step 3. Grant Service Principal access to Data Lake #

Again, this can be done via the Data Lake’s Access control (IAM) settings in the Azure Portal. Alternatively, using the Azure CLI:

  1. Get the Object ID (OID) of your service principal.

    az ad sp show --id <client-id> --query objectId
    

    Copy the output value.

  2. Set a read, write and execute ACL entry for the service principal. Replace <data-lake-name> with the name of your Data Lake and <object-id> with the value from the previous command (without the quotes), then run this command.

    az dls fs access set-entry --account <data-lake-name> --path / --acl-spec user:<object-id>:rwx
    
  3. The final step is to recursively apply the updated permissions to all existing files in the Data Lake. Unfortunately, the Azure CLI doesn’t support this.

    You need to go to your Data Lake Storage Gen1 account in the Azure Portal: Data explorer > Access > Advanced, then click Apply to children.

Step 4. Save Service Principal & Data Lake name as Environment Variables #

The next step is to save the service principal’s Client ID (ADLS_SP_CLIENT_ID), Client Secret (ADLS_SP_CLIENT_SECRET), Tenant ID (TENANT_ID) and Data Lake Storage name (ADLS_NAME) as environment variables.

On Linux or macOS:

export TENANT_ID="<tenant-id>"
export ADLS_NAME="<data-lake-name>"
export ADLS_SP_CLIENT_ID="<client-id>"
export ADLS_SP_CLIENT_SECRET="<client-secret>"

On Windows:

setx TENANT_ID "<tenant-id>"
setx ADLS_NAME "<data-lake-name>"
setx ADLS_SP_CLIENT_ID "<client-id>"
setx ADLS_SP_CLIENT_SECRET "<client-secret>"
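
Later, these values will be passed to the app as command-line arguments via spark-submit. As an aside, a minimal variant sketch (not used in this walkthrough, and requiring using System;) could read them directly from the environment instead:

// Variant sketch (assumption, not part of this walkthrough): read the values
// straight from the environment instead of taking them as command-line arguments.
string tenantId = Environment.GetEnvironmentVariable("TENANT_ID");
string dataLakeName = Environment.GetEnvironmentVariable("ADLS_NAME");
string clientId = Environment.GetEnvironmentVariable("ADLS_SP_CLIENT_ID");
string clientSecret = Environment.GetEnvironmentVariable("ADLS_SP_CLIENT_SECRET");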

Step 5. Create a new App #

With our development environment now configured, we can create a new application. Run the following to create a new .NET console app.

For C#:

dotnet new console -o mySparkDataLakeGen1App
cd mySparkDataLakeGen1App

For F#:

dotnet new console -o mySparkDataLakeGen1App -lang "F#"
cd mySparkDataLakeGen1App

Step 6. Add Microsoft.Spark Package #

Run the following command to install the Microsoft.Spark 0.12.1 package from NuGet.

dotnet add package Microsoft.Spark --version 0.12.1
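
This adds a package reference to your project file; the resulting entry in mySparkDataLakeGen1App.csproj (or .fsproj) should look roughly like this:

<ItemGroup>
  <PackageReference Include="Microsoft.Spark" Version="0.12.1" />
</ItemGroup>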

Step 7. Write Code #

  1. Open Program.cs (C#) or Program.fs (F#) in Visual Studio. In each of the snippets below, the C# version is shown first, followed by the F# equivalent.

  2. Add required references.

    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Types;
    using System.Collections.Generic;
    
    open System
    open Microsoft.Spark.Sql
    open Microsoft.Spark.Sql.Types
    
  3. Specify the file path in the Data Lake where our DataFrame will be saved. To spice things up a bit, we’ll be using the Parquet format for this example.

    string filePath =
        $"adl://{args[1]}.azuredatalakestore.net/parquet/people.parquet";
    
    We’ll be passing the values of our environment variables (TENANT_ID, ADLS_NAME, ADLS_SP_CLIENT_ID, ADLS_SP_CLIENT_SECRET) as arguments when running the app. These values will be accessible via args[0], args[1], args[2] and args[3] respectively.
    let filePath = 
        sprintf "adl://%s.azuredatalakestore.net/parquet/people.parquet" dataLakeName
    
  4. Create new Spark Session with config for the Data Lake. Here, we’re specifying the FileSystem to be used for the adl scheme, and also providing the service principal credentials for authentication.

    SparkSession spark = SparkSession
        .Builder()
        .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
        .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
        .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
        .Config("fs.adl.oauth2.client.id", args[2])
        .Config("fs.adl.oauth2.credential", args[3])
        .Config("fs.adl.oauth2.refresh.url", $"https://login.microsoftonline.com/{args[0]}/oauth2/token")
        .GetOrCreate();
    
    let spark =
        SparkSession.Builder()
            .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
            .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
            .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
            .Config("fs.adl.oauth2.client.id",servicePrincipalClientId)
            .Config("fs.adl.oauth2.credential", servicePrincipalClientSecret)
            .Config("fs.adl.oauth2.refresh.url", 
                sprintf "https://login.microsoftonline.com/%s/oauth2/token" tenantId)
            .GetOrCreate()
    
  5. Create list of sample data.

    var data = new List<GenericRow>
    {
        new GenericRow(new object[] { 1, "John Doe" }),
        new GenericRow(new object[] { 2, "Jane Doe" }),
        new GenericRow(new object[] { 3, "Foo Bar" })
    };
    
    let data = 
        [ GenericRow([| box 1; box "John Doe" |])
          GenericRow([| box 2; box "Jane Doe" |])
          GenericRow([| box 3; box "Foo Bar" |]) ]
    
  6. Create schema for sample data.

    var schema = new StructType(new List<StructField>()
    {
        new StructField("Id", new IntegerType()),
        new StructField("Name", new StringType()),
    });
    
    let schema = 
        StructType
            ([ StructField("Id", IntegerType())
               StructField("Name", StringType()) ])
    
  7. Create new DataFrame using sample data and schema.

    DataFrame df = spark.CreateDataFrame(data, schema);
    df.Show();
    
    let df = spark.CreateDataFrame(data, schema)
    df.Show()
    
  8. Write DataFrame to the Data Lake using the filePath we created earlier.

    df.Write().Mode(SaveMode.Overwrite).Parquet(filePath);
    
    df.Write().Mode(SaveMode.Overwrite).Parquet(filePath)
    
  9. Read the saved files back from the Data Lake into a new DataFrame.

    DataFrame readDf = spark.Read().Parquet(filePath);
    readDf.Show();
    
    let readDf = spark.Read().Parquet(filePath)
    readDf.Show()
    
  10. At the end of the steps above, your program should look similar to this:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Types;
    using System.Collections.Generic;
    
    namespace mySparkDataLakeGen1App
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Verify environment variables
                if (args.Length != 4)
                {
                    Console.Error.WriteLine("Usage: $TENANT_ID $ADLS_NAME $ADLS_SP_CLIENT_ID $ADLS_SP_CLIENT_SECRET");
                    Environment.Exit(1);
                }
    
                // Specify file path in Azure Data Lake Gen1
                string filePath =
                    $"adl://{args[1]}.azuredatalakestore.net/parquet/people.parquet";
    
                // Create SparkSession
                SparkSession spark = SparkSession
                    .Builder()
                    .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
                    .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
                    .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
                    .Config("fs.adl.oauth2.client.id", args[2])
                    .Config("fs.adl.oauth2.credential", args[3])
                    .Config("fs.adl.oauth2.refresh.url", $"https://login.microsoftonline.com/{args[0]}/oauth2/token")
                    .GetOrCreate();
    
                // Create sample data
                var data = new List<GenericRow>
                {
                    new GenericRow(new object[] { 1, "John Doe"}),
                    new GenericRow(new object[] { 2, "Jane Doe"}),
                    new GenericRow(new object[] { 3, "Foo Bar"})
                };
    
                // Create schema for sample data
                var schema = new StructType(new List<StructField>()
                {
                    new StructField("Id", new IntegerType()),
                    new StructField("Name", new StringType()),
                });
    
                // Create DataFrame using data and schema
                DataFrame df = spark.CreateDataFrame(data, schema);
    
                // Print DataFrame
                df.Show();
    
                // Write DataFrame to Azure Data Lake Gen1
                df.Write().Mode(SaveMode.Overwrite).Parquet(filePath);
    
                // Read saved DataFrame from Azure Data Lake Gen1
                DataFrame readDf = spark.Read().Parquet(filePath);
    
                // Print DataFrame
                readDf.Show();
            }
        }
    }
    
    open System
    open Microsoft.Spark.Sql
    open Microsoft.Spark.Sql.Types
    
    [<EntryPoint>]
    let main args =
        match args with
        | [| tenantId; dataLakeName; servicePrincipalClientId; servicePrincipalClientSecret |] ->
    
            // Specify file path in Azure Data Lake Gen1
            let filePath = 
                sprintf "adl://%s.azuredatalakestore.net/parquet/people.parquet" dataLakeName
    
            // Create SparkSession
            let spark =
                SparkSession.Builder()
                    .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
                    .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
                    .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
                    .Config("fs.adl.oauth2.client.id",servicePrincipalClientId)
                    .Config("fs.adl.oauth2.credential", servicePrincipalClientSecret)
                    .Config("fs.adl.oauth2.refresh.url", 
                        sprintf "https://login.microsoftonline.com/%s/oauth2/token" tenantId)
                    .GetOrCreate()
    
            // Create sample data
            let data = 
                [ GenericRow([| box 1; box "John Doe" |])
                  GenericRow([| box 2; box "Jane Doe" |])
                  GenericRow([| box 3; box "Foo Bar" |]) ]
    
            // Create schema for sample data
            let schema = 
                StructType
                    ([ StructField("Id", IntegerType())
                       StructField("Name", StringType()) ])
    
            // Create DataFrame using data and schema
            let df = spark.CreateDataFrame(data, schema)
    
            // Print DataFrame
            df.Show()
    
            // Write DataFrame to Azure Data Lake Gen1
            df.Write().Mode(SaveMode.Overwrite).Parquet(filePath)
    
            // Read saved DataFrame from Azure Data Lake Gen1
            let readDf = spark.Read().Parquet(filePath)
    
            // Print DataFrame
            readDf.Show()
    
            0
        | _ ->
            printfn "Usage: $TENANT_ID $ADLS_NAME $ADLS_SP_CLIENT_ID $ADLS_SP_CLIENT_SECRET"
            1
    
    Make sure to save the changes.
  11. Go back to Terminal / Command Prompt and run this command to build the app.

    dotnet build
    

Step 8. Run your Application #

  1. Navigate to your build output directory.

    On Linux or macOS:

    cd ./bin/Debug/netcoreapp3.1/

    On Windows:

    cd .\bin\Debug\netcoreapp3.1\
    
  2. Submit the application to run on Apache Spark.

    On Linux or macOS:

    spark-submit \
    --packages com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0 \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local microsoft-spark-2.4.x-0.12.1.jar \
    ./mySparkDataLakeGen1App $TENANT_ID $ADLS_NAME $ADLS_SP_CLIENT_ID $ADLS_SP_CLIENT_SECRET

    On Windows:

    spark-submit ^
    --packages com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0 ^
    --class org.apache.spark.deploy.dotnet.DotnetRunner ^
    --master local microsoft-spark-2.4.x-0.12.1.jar ^
    mySparkDataLakeGen1App %TENANT_ID% %ADLS_NAME% %ADLS_SP_CLIENT_ID% %ADLS_SP_CLIENT_SECRET%
    
    There’s a known issue when running on Windows where the ShutdownHookManager fails to clear temp directories (Exception while deleting Spark temp dir). This doesn’t affect the functionality of the app. However, there’s a workaround if you’d like to temporarily hide the errors.
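
    One common way to hide those messages (a sketch of the usual log4j approach; this is an assumption, not something from the original post) is to lower the log level for the offending loggers in %SPARK_HOME%\conf\log4j.properties:

    # Sketch (assumption): silence the noisy shutdown messages on Windows.
    log4j.logger.org.apache.spark.util.ShutdownHookManager=OFF
    log4j.logger.org.apache.spark.SparkEnv=ERROR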

    Here, we’re using the Maven package com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0. You can also supply the jar instead by downloading hadoop-azure-datalake-2.7.3-0.1.0.jar. Copy the downloaded jar to your build output directory and replace the line --packages com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0 with --jars hadoop-azure-datalake-2.7.3-0.1.0.jar in the command above.

    Expected output

    ...
    +---+--------+
    | Id|    Name|
    +---+--------+
    |  1|John Doe|
    |  2|Jane Doe|
    |  3| Foo Bar|
    +---+--------+
    ...
    +---+--------+
    | Id|    Name|
    +---+--------+
    |  1|John Doe|
    |  2|Jane Doe|
    |  3| Foo Bar|
    +---+--------+
    ...
    

The first table shows that our DataFrame was successfully created and saved to Azure Data Lake Storage Gen1. The saved files were then read back into the app and printed in the second table. You can also view the saved files in your Data Lake via the Azure Portal or Azure Storage Explorer.
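
If you prefer the command line, a quick sketch using the Azure CLI (assuming the same <data-lake-name> as before) lists what was written:

# Sketch: list the Parquet files the app just wrote.
az dls fs list --account <data-lake-name> --path /parquet/people.parquet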

The sample project for this article can be found here on GitHub.

Additional Resources #
