.NET for Apache Spark - Write and Read data from Azure Blob Storage
August 6, 2020
Introduction #
In this article, we’ll look into how to create a DataFrame, write the DataFrame into Azure Blob Storage, and read the written data back into our application.
Prerequisites #
To get started:
- You’ll need an Azure Storage account.
- Complete the .NET for Apache Spark Tutorial - Get started in 10 minutes, which shows you how to download and install the .NET Core 3.1 SDK, JDK 8, and Apache Spark 2.4.1.
- You’ll also need to download and install Visual Studio 2019 or Visual Studio Code.
Steps #
We’ve categorized the steps to be taken in the outline below:
Section | Steps |
---|---|
Setup and Configure Environment | Step 1. Verify Installed Prerequisites; Step 2. Retrieve Storage Account Access Key; Step 3. Persist Storage Account Name & Access Key as Environment Variables |
Build Application | Step 4. Create a new App; Step 5. Add Microsoft.Spark Package; Step 6. Write Code |
Run Application | Step 7. Run your Application |
Step 1. Verify Installed Prerequisites #
Let’s make sure you have everything in place. Run the following command. On Linux:
dotnet --version; \
java -version; \
spark-shell --version; \
echo $DOTNET_WORKER_DIR;
Expected output
3.1.302
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1ubuntu1-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.6
/_/
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252
Branch HEAD
Compiled by user holden on 2020-05-29T23:47:51Z
Revision 807e0a484d1de767d1f02bd8a622da6450bdf940
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
/home/usman/bin/Microsoft.Spark.Worker
On Windows:
dotnet --version & ^
java -version & ^
spark-shell --version & ^
echo %DOTNET_WORKER_DIR%
Expected output
3.1.302
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.1
/_/
Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_211
Branch
Compiled by user on 2019-03-26T22:44:44Z
Revision
Url
Type --help for more information.
C:\bin\Microsoft.Spark.Worker\
The important bits highlighted above show the .NET Core SDK version (3.1.x), Java version (1.8.x), Spark (2.4.x) and Scala (2.11.x) versions, and the configured Spark Worker directory (DOTNET_WORKER_DIR).
Step 2. Retrieve Storage Account Access Key #
You can retrieve your Azure Storage Access Key from the Azure Portal. Go to Storage Accounts > storage-account-name > Settings > Access Keys, and copy the value of Key under the key1 field.
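If you have the Azure CLI installed, you can also retrieve the key from the terminal. A minimal sketch, where <resource-group> is the resource group that contains your Storage Account:
az storage account keys list \
    --account-name <storage-account-name> \
    --resource-group <resource-group> \
    --query "[0].value" \
    --output tsv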
Step 3. Persist Storage Account Name & Access Key as Environment Variables #
To avoid persisting secrets in code, we’ll be saving the Storage Account Name and Access Key as environment variables.
Replace <storage-account-name> with the name of your Storage Account and <storage-account-key> with your Access Key, then run these commands. On Linux:
export AZURE_STORAGE_ACCOUNT="<storage-account-name>"
export AZURE_STORAGE_KEY="<storage-account-key>"
On Windows:
setx AZURE_STORAGE_ACCOUNT "<storage-account-name>"
setx AZURE_STORAGE_KEY "<storage-account-key>"
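Note that setx only persists the variables for future sessions, so on Windows open a new Command Prompt before continuing. You can confirm the variables are set with a quick echo (Linux, then Windows):
echo $AZURE_STORAGE_ACCOUNT
echo %AZURE_STORAGE_ACCOUNT%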
Step 4. Create a new App #
Run the following to create a new .NET console app. For C#:
dotnet new console -o mySparkBlobStorageApp
cd mySparkBlobStorageApp
For F#:
dotnet new console -o mySparkBlobStorageApp -lang "F#"
cd mySparkBlobStorageApp
Step 5. Add Microsoft.Spark Package #
Run the following to install the Microsoft.Spark package from NuGet.
dotnet add package Microsoft.Spark --version 0.10.0
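This adds a PackageReference entry to your project file (mySparkBlobStorageApp.csproj or mySparkBlobStorageApp.fsproj):
<PackageReference Include="Microsoft.Spark" Version="0.10.0" />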
Step 6. Write Code #
With our new project created, we’re now ready to write some code.
Open Program.cs (C#) or Program.fs (F#) in Visual Studio and add the required references.
C#:
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System.Collections.Generic;
F#:
open System
open Microsoft.Spark.Sql
open Microsoft.Spark.Sql.Types
Specify the file path in Azure Storage where the DataFrame will be saved. dotnet-spark is the name of the container used in this example.
C#:
string filePath = $"wasbs://dotnet-spark@{args[0]}.blob.core.windows.net/json/people.json";
Because we want to avoid persisting secrets in code, we’ll pass the values of our environment variables (AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_KEY) as arguments when running the app. These values will be accessible via args[0] and args[1] respectively in C#, and as storageAccountName and storageAccountKey after pattern matching in F#.
F#:
let filePath = sprintf "wasbs://dotnet-spark@%s.blob.core.windows.net/json/people.json" storageAccountName
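For example, with a hypothetical Storage Account named mystorageaccount, the resulting path would be:
wasbs://dotnet-spark@mystorageaccount.blob.core.windows.net/json/people.json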
Create a new Spark Session with configs for Azure Storage. The first config specifies the FileSystem implementation Spark should use for the wasbs scheme; the second supplies the Storage Account credentials.
C#:
SparkSession spark = SparkSession
    .Builder()
    .AppName("Azure Storage example using .NET for Apache Spark")
    .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .Config($"fs.azure.account.key.{args[0]}.blob.core.windows.net", args[1])
    .GetOrCreate();
F#:
let spark =
    SparkSession.Builder()
        .AppName("Azure Storage example using .NET for Apache Spark")
        .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .Config(
            sprintf "fs.azure.account.key.%s.blob.core.windows.net" storageAccountName,
            storageAccountKey)
        .GetOrCreate()
Create a list of sample data.
C#:
var data = new List<GenericRow>
{
    new GenericRow(new object[] { 1, "John Doe" }),
    new GenericRow(new object[] { 2, "Jane Doe" }),
    new GenericRow(new object[] { 3, "Foo Bar" })
};
F#:
let data =
    [ GenericRow([| 1; "John Doe" |])
      GenericRow([| 2; "Jane Doe" |])
      GenericRow([| 3; "Foo Bar" |]) ]
Create a schema for the sample data.
C#:
var schema = new StructType(new List<StructField>()
{
    new StructField("Id", new IntegerType()),
    new StructField("Name", new StringType()),
});
F#:
let schema =
    StructType
        ([ StructField("Id", IntegerType())
           StructField("Name", StringType()) ])
Create a new DataFrame using the sample data and schema.
C#:
DataFrame df = spark.CreateDataFrame(data, schema);
df.Show();
F#:
let df = spark.CreateDataFrame(data, schema)
df.Show()
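To confirm the schema was applied as intended, you can also print it; DataFrame exposes a PrintSchema method for this (the F# call is identical):
// Prints the column names and types as a tree.
df.PrintSchema();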
Write the DataFrame to Azure Storage using the filePath we created earlier.
C#:
df.Write().Mode(SaveMode.Overwrite).Json(filePath);
F#:
df.Write().Mode(SaveMode.Overwrite).Json(filePath)
Read the saved DataFrame back from Azure Storage.
C#:
DataFrame readDf = spark.Read().Json(filePath);
readDf.Show();
F#:
let readDf = spark.Read().Json(filePath)
readDf.Show()
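The same round trip works for other formats supported by the DataFrameWriter and DataFrameReader APIs. A minimal C# sketch using Parquet instead of JSON, where parquetPath is a hypothetical path you’d build the same way as filePath:
// Hypothetical Parquet variant of the JSON round trip above.
df.Write().Mode(SaveMode.Overwrite).Parquet(parquetPath);
spark.Read().Parquet(parquetPath).Show();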
At the end of the steps above, your program should look similar to this:
C#:
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System.Collections.Generic;

namespace mySparkBlobStorageApp
{
    class Program
    {
        static void Main(string[] args)
        {
            // Verify environment variables
            if (args.Length != 2)
            {
                Console.Error.WriteLine("Usage: $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY");
                Environment.Exit(1);
            }

            // Specify file path in Azure Storage
            string filePath =
                $"wasbs://dotnet-spark@{args[0]}.blob.core.windows.net/json/people.json";

            // Create SparkSession
            SparkSession spark = SparkSession
                .Builder()
                .AppName("Azure Storage example using .NET for Apache Spark")
                .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
                .Config($"fs.azure.account.key.{args[0]}.blob.core.windows.net", args[1])
                .GetOrCreate();

            // Create sample data
            var data = new List<GenericRow>
            {
                new GenericRow(new object[] { 1, "John Doe" }),
                new GenericRow(new object[] { 2, "Jane Doe" }),
                new GenericRow(new object[] { 3, "Foo Bar" })
            };

            // Create schema for sample data
            var schema = new StructType(new List<StructField>()
            {
                new StructField("Id", new IntegerType()),
                new StructField("Name", new StringType()),
            });

            // Create DataFrame using data and schema
            DataFrame df = spark.CreateDataFrame(data, schema);

            // Print DataFrame
            df.Show();

            // Write DataFrame to Azure Storage
            df.Write().Mode(SaveMode.Overwrite).Json(filePath);

            // Read saved DataFrame from Azure Storage
            DataFrame readDf = spark.Read().Json(filePath);

            // Print DataFrame
            readDf.Show();
        }
    }
}
F#:
open System
open Microsoft.Spark.Sql
open Microsoft.Spark.Sql.Types

[<EntryPoint>]
let main args =
    match args with
    | [| storageAccountName; storageAccountKey |] ->
        // Specify file path in Azure Storage
        let filePath =
            sprintf "wasbs://dotnet-spark@%s.blob.core.windows.net/json/people.json" storageAccountName

        // Create SparkSession
        let spark =
            SparkSession.Builder()
                .AppName("Azure Storage example using .NET for Apache Spark")
                .Config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
                .Config(
                    sprintf "fs.azure.account.key.%s.blob.core.windows.net" storageAccountName,
                    storageAccountKey)
                .GetOrCreate()

        // Create sample data
        let data =
            [ GenericRow([| 1; "John Doe" |])
              GenericRow([| 2; "Jane Doe" |])
              GenericRow([| 3; "Foo Bar" |]) ]

        // Create schema for sample data
        let schema =
            StructType
                ([ StructField("Id", IntegerType())
                   StructField("Name", StringType()) ])

        // Create DataFrame using data and schema
        let df = spark.CreateDataFrame(data, schema)

        // Print DataFrame
        df.Show()

        // Write DataFrame to Azure Storage
        df.Write().Mode(SaveMode.Overwrite).Json(filePath)

        // Read saved DataFrame from Azure Storage
        let readDf = spark.Read().Json(filePath)

        // Print DataFrame
        readDf.Show()

        0
    | _ ->
        printfn "Usage: $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY"
        1
Make sure to save the changes.
Go back to Terminal / Command Prompt and run this command to build the app.
dotnet build
Step 7. Run your Application #
Navigate to your build output directory. On Linux:
cd ./bin/Debug/netcoreapp3.1/
On Windows:
cd .\bin\Debug\netcoreapp3.1\
Submit the application to run on Apache Spark. On Linux:
spark-submit \
    --packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:3.1.0 \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local \
    microsoft-spark-2.4.x-0.10.0.jar \
    ./mySparkBlobStorageApp $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY
On Windows:
spark-submit ^
    --packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:3.1.0 ^
    --class org.apache.spark.deploy.dotnet.DotnetRunner ^
    --master local ^
    microsoft-spark-2.4.x-0.10.0.jar ^
    mySparkBlobStorageApp %AZURE_STORAGE_ACCOUNT% %AZURE_STORAGE_KEY%
There’s a known issue when running on Windows where the ShutdownHookManager fails to clear temp directories ("Exception while deleting Spark temp dir"). This doesn’t affect the functionality of the app, but there’s a workaround if you’d like to temporarily hide the errors.

Note that we’re referencing the Maven coordinates org.apache.hadoop:hadoop-azure:2.7.3 and com.microsoft.azure:azure-storage:3.1.0. These packages are required by Spark and Hadoop for Blob Storage access.

Alternatively, you can supply the jars yourself by downloading hadoop-azure-2.7.3.jar and azure-storage-3.1.0.jar. Copy the downloaded jars to your build output directory and replace the line --packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:3.1.0 with --jars hadoop-azure-2.7.3.jar,azure-storage-3.1.0.jar in your spark-submit command, as shown in the sketch below.
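A sketch of the Linux command using the local jars instead of Maven packages:
spark-submit \
    --jars hadoop-azure-2.7.3.jar,azure-storage-3.1.0.jar \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local \
    microsoft-spark-2.4.x-0.10.0.jar \
    ./mySparkBlobStorageApp $AZURE_STORAGE_ACCOUNT $AZURE_STORAGE_KEY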
Expected output
...
+---+--------+
| Id|    Name|
+---+--------+
|  1|John Doe|
|  2|Jane Doe|
|  3| Foo Bar|
+---+--------+
...
+---+--------+
| Id|    Name|
+---+--------+
|  1|John Doe|
|  2|Jane Doe|
|  3| Foo Bar|
+---+--------+
...
The first table shows that our DataFrame was successfully created and saved to Azure Storage. The saved files were then read back into the app and printed in the second table.
You can also view the saved files in your Storage Account via Azure Portal or Azure Storage Explorer.
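If you have the Azure CLI installed, you can also list the saved blobs from the terminal; this sketch relies on az picking up the AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_KEY environment variables we set in Step 3:
az storage blob list --container-name dotnet-spark --output table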
The sample project for this article can be found here on GitHub.