.NET for Apache Spark - Write and Read data from Azure Blob Storage

August 6, 2020
.NET, Development, Microsoft Azure
csharp, dotnet, fsharp, microsoft azure, spark, azure blob storage

Introduction #

In this article, we’ll look into how to create a DataFrame, write the DataFrame into Azure Blob Storage, and read the written data back into our application.

Prerequisites #

To get started, you'll need:

  - The .NET Core SDK (3.1.x)
  - Java 8 (1.8.x)
  - Apache Spark 2.4.x, with the Microsoft.Spark.Worker installed and DOTNET_WORKER_DIR configured
  - An Azure subscription with a Storage Account and a Blob container (this example uses a container named dotnet-spark)

Steps #

We’ve categorized the steps to be taken in the outline below:

Section                           Steps
Setup and Configure Environment   Step 1. Verify Installed Prerequisites
                                  Step 2. Retrieve Storage Account Access Key
                                  Step 3. Persist Storage Account Name & Access Key as Environment Variables
Build Application                 Step 4. Create a new App
                                  Step 5. Add Microsoft.Spark Package
                                  Step 6. Write Code
Run Application                   Step 7. Run your Application

Step 1. Verify Installed Prerequisites #

Let's make sure you have everything in place. On Linux, run the following command:

dotnet --version; \
java -version; \
spark-shell --version; \
echo $DOTNET_WORKER_DIR;

Expected output

3.1.302
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1ubuntu1-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252
Branch HEAD
Compiled by user holden on 2020-05-29T23:47:51Z
Revision 807e0a484d1de767d1f02bd8a622da6450bdf940
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
/home/usman/bin/Microsoft.Spark.Worker

On Windows, run:

dotnet --version & ^
java -version & ^
spark-shell --version & ^
echo %DOTNET_WORKER_DIR%

Expected output

3.1.302
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_211
Branch
Compiled by user  on 2019-03-26T22:44:44Z
Revision
Url
Type --help for more information.
C:\bin\Microsoft.Spark.Worker\

The important bits in the output above show the .NET Core SDK version (3.1.x), the Java version (1.8.x), the Spark (2.4.x) and Scala (2.11.x) versions, and the configured Spark Worker directory (DOTNET_WORKER_DIR).

Step 2. Retrieve Storage Account Access Key #

  1. You can access your Azure Storage Access Key by navigating to the Azure Portal.

  2. Go to Storage Accounts > storage-account-name > Settings > Access Keys.

  3. Copy the value of Key under the key1 field.
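
If you have the Azure CLI installed, the key can also be retrieved from the command line. A quick sketch, where <resource-group> is a placeholder for the resource group that holds your Storage Account:

az storage account keys list \
  --resource-group <resource-group> \
  --account-name <storage-account-name> \
  --query "[0].value" \
  --output tsv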

Step 3. Persist Storage Account Name & Access Key as Environment Variables #

To avoid persisting secrets in code, we’ll be saving the Storage Account Name and Access Key as environment variables.

Replace <storage-account-name> with the name of your Storage Account and <storage-account-key> with your Access Key, then run these commands.

On Linux:

export AZURE_STORAGE_ACCOUNT="<storage-account-name>"
export AZURE_STORAGE_KEY="<storage-account-key>"

On Windows:

setx AZURE_STORAGE_ACCOUNT "<storage-account-name>"
setx AZURE_STORAGE_KEY "<storage-account-key>"
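
Note that on Linux, export only sets the variables for the current shell session; add the lines to your shell profile (e.g. ~/.bashrc) if you want them to persist. On Windows, setx stores the variables permanently, but they only become visible in newly opened Command Prompt windows, so open a fresh prompt before continuing.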

Step 4. Create a new App #

Run the following to create a new .NET console app.

For C#:

dotnet new console -o mySparkBlobStorageApp
cd mySparkBlobStorageApp

For F#:

dotnet new console -o mySparkBlobStorageApp -lang "F#"
cd mySparkBlobStorageApp

Step 5. Add Microsoft.Spark Package #

Run the following to install the Microsoft.Spark package from NuGet.

dotnet add package Microsoft.Spark --version 0.10.0

Step 6. Write Code #

With our new project created, we’re now ready to write some code.

  1. Open Program.cs (C#) or Program.fs (F#) in Visual Studio.

  2. Add required references.

    C#:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Types;
    using System.Collections.Generic;

    F#:

    open System
    open Microsoft.Spark.Sql
    open Microsoft.Spark.Sql.Types
    
  3. Specify the file path in Azure Storage where the DataFrame will be saved; dotnet-spark is the name of the container used in this example.

    C#:

    string filePath = 
        $"wasbs://dotnet-spark@{args[0]}.blob.core.windows.net/json/people.json";

    F#:

    let filePath = 
        sprintf "wasbs://dotnet-spark@%s.blob.core.windows.net/json/people.json" storageAccountName

    Because we want to avoid persisting secrets in code, we'll pass the values of our environment variables (AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_KEY) as arguments when running the app. In C# these values are accessible via args[0] and args[1] respectively; in F# we bind them to storageAccountName and storageAccountKey by pattern matching on the arguments (see the full program in step 10).
    
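    If you'd rather not pass these as command-line arguments at all, the same values could instead be read directly from the environment inside the app. A minimal C# sketch of that variation (not what the rest of this walkthrough uses):

    // Read the values persisted in step 3 straight from the environment
    // (requires using System;). Returns null if the variable isn't set.
    string storageAccountName = Environment.GetEnvironmentVariable("AZURE_STORAGE_ACCOUNT");
    string storageAccountKey = Environment.GetEnvironmentVariable("AZURE_STORAGE_KEY");
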
  4. Create a new SparkSession with configs for Azure Storage. The first config tells Spark which FileSystem implementation to use for the wasbs scheme; the second supplies the Storage Account credentials.

    C#:

    SparkSession spark = SparkSession
        .Builder()
        .AppName("Azure Storage example using .NET for Apache Spark")
        .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .Config($"fs.azure.account.key.{args[0]}.blob.core.windows.net", args[1])
        .GetOrCreate();

    F#:

    let spark =
        SparkSession.Builder()
            .AppName("Azure Storage example using .NET for Apache Spark")
            .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
            .Config(
                sprintf "fs.azure.account.key.%s.blob.core.windows.net" storageAccountName,
                storageAccountKey)
            .GetOrCreate()
    
  5. Create a list of sample data.

    C#:

    var data = new List<GenericRow>
    {
        new GenericRow(new object[] { 1, "John Doe" }),
        new GenericRow(new object[] { 2, "Jane Doe" }),
        new GenericRow(new object[] { 3, "Foo Bar" })
    };

    F#:

    let data = 
        // Box each value so the array is obj[], as GenericRow expects
        [ GenericRow([| box 1; box "John Doe" |])
          GenericRow([| box 2; box "Jane Doe" |])
          GenericRow([| box 3; box "Foo Bar" |]) ]
    
  6. Create a schema for the sample data.

    C#:

    var schema = new StructType(new List<StructField>()
    {
        new StructField("Id", new IntegerType()),
        new StructField("Name", new StringType()),
    });

    F#:

    let schema = 
        StructType
            ([ StructField("Id", IntegerType())
               StructField("Name", StringType()) ])
    
  7. Create new DataFrame using sample data and schema.

    C#:

    DataFrame df = spark.CreateDataFrame(data, schema);
    df.Show();

    F#:

    let df = spark.CreateDataFrame(data, schema)
    df.Show()
    
  8. Write DataFrame to Azure Storage using the filePath we created earlier.

    C#:

    df.Write().Mode(SaveMode.Overwrite).Json(filePath);

    F#:

    df.Write().Mode(SaveMode.Overwrite).Json(filePath)
    
  9. Read saved DataFrame from Azure Storage.

    C#:

    DataFrame readDf = spark.Read().Json(filePath);
    readDf.Show();

    F#:

    let readDf = spark.Read().Json(filePath)
    readDf.Show()
    
  10. At the end of the steps above, your program should look similar to this:

    C# (Program.cs):

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Types;
    using System.Collections.Generic;
    
    namespace mySparkBlobStorageApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Verify environment variables
                if (args.Length != 2)
                {
                    Console.Error.WriteLine("Usage: $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY");
                    Environment.Exit(1);
                }
    
                // Specify file path in Azure Storage
                string filePath =
                    $"wasbs://dotnet-spark@{args[0]}.blob.core.windows.net/json/people.json";
                
                // Create SparkSession
                SparkSession spark = SparkSession
                    .Builder()
                    .AppName("Azure Storage example using .NET for Apache Spark")
                    .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
                    .Config($"fs.azure.account.key.{args[0]}.blob.core.windows.net", args[1])
                    .GetOrCreate();
    
                // Create sample data
                var data = new List<GenericRow>
                {
                    new GenericRow(new object[] { 1, "John Doe" }),
                    new GenericRow(new object[] { 2, "Jane Doe" }),
                    new GenericRow(new object[] { 3, "Foo Bar" })
                };
    
                // Create schema for sample data
                var schema = new StructType(new List<StructField>()
                {
                    new StructField("Id", new IntegerType()),
                    new StructField("Name", new StringType()),
                });
    
                // Create DataFrame using data and schema
                DataFrame df = spark.CreateDataFrame(data, schema);
    
                // Print DataFrame
                df.Show();
    
                // Write DataFrame to Azure Storage
                df.Write().Mode(SaveMode.Overwrite).Json(filePath);
    
                // Read saved DataFrame from Azure Storage
                DataFrame readDf = spark.Read().Json(filePath);
    
                // Print DataFrame
                readDf.Show();
            }
        }
    }
    
    F# (Program.fs):

    open System
    open Microsoft.Spark.Sql
    open Microsoft.Spark.Sql.Types
    
    [<EntryPoint>]
    let main args =
        match args with
        | [| storageAccountName; storageAccountKey |] ->
    
            // Specify file path in Azure Storage
            let filePath = 
                sprintf "wasbs://dotnet-spark@%s.blob.core.windows.net/json/people.json" storageAccountName
    
            // Create SparkSession
            let spark =
                SparkSession.Builder()
                    .AppName("Azure Storage example using .NET for Apache Spark")
                    .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
                    .Config(
                        sprintf "fs.azure.account.key.%s.blob.core.windows.net" storageAccountName,
                        storageAccountKey)
                    .GetOrCreate()
    
            // Create sample data
            let data = 
                [ GenericRow([|1; "John Doe"|]) 
                  GenericRow([|2; "Jane Doe"|]) 
                  GenericRow([|3; "Foo Bar"|]) ]
    
            // Create schema for sample data
            let schema = 
                StructType
                    ([ StructField("Id", IntegerType())
                       StructField("Name", StringType()) ])
    
            // Create DataFrame using data and schema
            let df = spark.CreateDataFrame(data, schema)
    
            // Print DataFrame
            df.Show()
    
            // Write DataFrame to Azure Storage
            df.Write().Mode(SaveMode.Overwrite).Json(filePath)
    
            // Read saved DataFrame from Azure Storage
            let readDf = spark.Read().Json(filePath)
    
            // Print DataFrame
            readDf.Show()
    
            0
        | _ ->
            printfn "Usage: $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY"
            1
    
    Make sure to save the changes.
  11. Go back to Terminal / Command Prompt and run this command to build the app.

    dotnet build
    

Step 7. Run your Application #

  1. Navigate to your build output directory.

    On Linux:

    cd ./bin/Debug/netcoreapp3.1/

    On Windows:

    cd .\bin\Debug\netcoreapp3.1\
    
  2. Submit the application to run on Apache Spark.

    On Linux:

    spark-submit \
    --packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:3.1.0 \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local microsoft-spark-2.4.x-0.10.0.jar \
    ./mySparkBlobStorageApp $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY

    On Windows:

    spark-submit ^
    --packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:3.1.0 ^
    --class org.apache.spark.deploy.dotnet.DotnetRunner ^
    --master local microsoft-spark-2.4.x-0.10.0.jar ^
    mySparkBlobStorageApp %AZURE_STORAGE_ACCOUNT% %AZURE_STORAGE_KEY%
    
    There's a known issue when running on Windows where the ShutdownHookManager fails to clear temp directories ("Exception while deleting Spark temp dir"). This doesn't affect the functionality of the app, but there's a workaround if you'd like to temporarily hide the errors.

    Note that we’re referencing Maven coordinates org.apache.hadoop:hadoop-azure:2.7.3 and com.microsoft.azure:azure-storage:3.1.0. These packages are required by Spark and Hadoop for Blob Storage access.

    Alternatively, you can supply the jars yourself by downloading hadoop-azure-2.7.3.jar and azure-storage-3.1.0.jar. Copy the downloaded jars to your build output directory and replace the line --packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:3.1.0 with --jars hadoop-azure-2.7.3.jar,azure-storage-3.1.0.jar in your spark-submit command, as shown below.
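
    Putting that together, the Linux variant of the command with local jars would look like this:

    spark-submit \
    --jars hadoop-azure-2.7.3.jar,azure-storage-3.1.0.jar \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local microsoft-spark-2.4.x-0.10.0.jar \
    ./mySparkBlobStorageApp $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY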

    Expected output

    ...
    +---+--------+
    | Id|    Name|
    +---+--------+
    |  1|John Doe|
    |  2|Jane Doe|
    |  3| Foo Bar|
    +---+--------+
    ...
    +---+--------+
    | Id|    Name|
    +---+--------+
    |  1|John Doe|
    |  2|Jane Doe|
    |  3| Foo Bar|
    +---+--------+
    ...
    

The first table shows that our DataFrame was successfully created and saved to Azure Storage. The saved files were then read back into the app and printed as the second table.

You can also view the saved files in your Storage Account via Azure Portal or Azure Storage Explorer.

The sample project for this article can be found here on GitHub.

Additional Resources #
