.NET for Apache Spark - Write and Read data from Azure Data Lake Storage Gen1
August 22, 2020
Introduction #
For the second article in the .NET for Apache Spark series, we’ll look at how to interact with Azure Data Lake Storage Gen1. We’ll create a DataFrame, write it to a Data Lake, and read the written data back into our application.
Prerequisites #
To get started:
- You’ll need an Azure Data Lake Storage Gen 1 account.
- Download and install .NET Core 3.1 SDK, JDK 8 and Apache Spark 2.4.1. You can have a look at .NET for Apache Spark - Write and Read data from Azure Blob Storage for instructions on how to set up these dependencies.
Steps #
The steps to be taken are shown in the outline below:
Section | Steps
---|---
Setup and Configure Environment | Step 1. Verify Installed Prerequisites<br>Step 2. Create a Service Principal<br>Step 3. Grant Service Principal access to Data Lake<br>Step 4. Save Service Principal & Data Lake name as Environment Variables
Build Application | Step 5. Create a new App<br>Step 6. Add Microsoft.Spark Package<br>Step 7. Write Code
Run Application | Step 8. Run your Application
Step 1. Verify Installed Prerequisites #
The first step is to ensure you have the required prerequisites installed. On Linux or macOS, run the following command:
dotnet --version; \
java -version; \
spark-shell --version; \
echo $DOTNET_WORKER_DIR;
Expected output
3.1.302
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1ubuntu1-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.6
/_/
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252
Branch HEAD
Compiled by user holden on 2020-05-29T23:47:51Z
Revision 807e0a484d1de767d1f02bd8a622da6450bdf940
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
/home/usman/bin/Microsoft.Spark.Worker
On Windows, run:
dotnet --version & ^
java -version & ^
spark-shell --version & ^
echo %DOTNET_WORKER_DIR%
Expected output
3.1.302
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.1
/_/
Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_211
Branch
Compiled by user on 2019-03-26T22:44:44Z
Revision
Url
Type --help for more information.
C:\bin\Microsoft.Spark.Worker\
The important bits highlighted above are the .NET Core SDK version (3.1.x), Java version (1.8.x), Spark (2.4.x) and Scala (2.11.x) versions, and the configured Spark worker directory (DOTNET_WORKER_DIR).
Step 2. Create a Service Principal #
We need to create a service principal which our application can use to access the Data Lake. This can be done via the Azure Portal, Azure Cloud Shell or the Azure CLI on your local machine.
For the Azure Portal, this guide from Microsoft Docs is detailed and covers all the required steps. Make sure to retrieve your Tenant ID (Directory ID), service principal’s Client ID (Application ID), and Client Secret.
For the Azure CLI method, follow the steps below to create your service principal and retrieve its credentials.
Login to your Azure account by running the command below and following the prompt.
az login
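If your account has access to multiple subscriptions, you can optionally select the one that contains your resource group first; <subscription-id> below is the same placeholder used in the next command.

az account set --subscription <subscription-id>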
Replace myApp in the command below with a name for your service principal, <subscription-id> with your Azure Subscription ID, and <group-name> with the name of your resource group. Subsequently, run this command.

az ad sp create-for-rbac --name "myApp" --role contributor --scopes /subscriptions/<subscription-id>/resourceGroups/<group-name> --sdk-auth
Note that white spaces invalidate the name of a service principal and are therefore not allowed.
The results of the executed command should look similar to this:
{ "clientId": "<client-id>", "clientSecret": "<client-secret>", "subscriptionId": "<subscription-id>", "tenantId": "<tenant-id>", "activeDirectoryEndpointUrl": "https://login.microsoftonline.com", "resourceManagerEndpointUrl": "https://management.azure.com/", "activeDirectoryGraphResourceId": "https://graph.windows.net/", "sqlManagementEndpointUrl": "https://management.core.windows.net:8443/", "galleryEndpointUrl": "https://gallery.azure.com/", "managementEndpointUrl": "https://management.core.windows.net/" }
Copy the values of clientId, clientSecret and tenantId from the JSON result.
Step 3. Grant Service Principal access to Data Lake #
Again, this can be achieved via the Data Lake’s Access control (IAM) settings in the Azure Portal. Alternatively, using the Azure CLI:
Get the Object ID (OID) of your service principal.
az ad sp show --id <client-id> --query objectId
Copy the output value.
Set a Read, Write and Execute ACL entry for the service principal. Replace <data-lake-name> with the name of your Data Lake and <object-id> with the value from the above command (without the quotes), then run this command.

az dls fs access set-entry --account <data-lake-name> --path / --acl-spec user:<object-id>:rwx
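Optionally, you can confirm the entry was applied by viewing the access information on the root path. This is just a quick check; the entry for your service principal’s object ID should appear in the returned ACL.

az dls fs access show --account <data-lake-name> --path /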
The final step is to recursively apply the updated permissions to all existing files in the Data Lake. Unfortunately, there’s no support for this in the Azure CLI, so you need to do it in the Azure Portal: go to Data Lake Gen 1 > Data explorer > Access > Advanced and click Apply to children.
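If you’d rather stay on the command line, a rough workaround (not an official recursive option) is to apply the same ACL entry to each existing child path individually. The sketch below rests on assumptions: it presumes the name field in the az dls fs list output holds each child’s path relative to the root, and that your data sits directly under the root; nested folders would need the same treatment per level.

# Hedged sketch: apply the ACL entry to each child of the root path.
# Verify the 'name' field against your own az dls fs list output first.
for child in $(az dls fs list --account <data-lake-name> --path / --query "[].name" -o tsv); do
  az dls fs access set-entry --account <data-lake-name> --path "/$child" --acl-spec user:<object-id>:rwx
done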
Step 4. Save Service Principal & Data Lake name as Environment Variables #
The next step is to save the service principal’s Client ID (ADLS_SP_CLIENT_ID), Client Secret (ADLS_SP_CLIENT_SECRET), Tenant ID (TENANT_ID) and Data Lake Storage name (ADLS_NAME) as environment variables.

On Linux or macOS:
export TENANT_ID="<tenant-id>"
export ADLS_NAME="<data-lake-name>"
export ADLS_SP_CLIENT_ID="<client-id>"
export ADLS_SP_CLIENT_SECRET="<client-secret>"
On Windows:
setx TENANT_ID "<tenant-id>"
setx ADLS_NAME "<data-lake-name>"
setx ADLS_SP_CLIENT_ID "<client-id>"
setx ADLS_SP_CLIENT_SECRET "<client-secret>"
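Note that setx only applies to new sessions on Windows, so open a fresh Command Prompt before continuing. As a quick sanity check, you can print one of the variables to confirm it is set, for example echo $ADLS_NAME on Linux/macOS or echo %ADLS_NAME% on Windows.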
Step 5. Create a new App #
With our development environment now configured, we can create a new application. Run the following to create a new .NET console app.

For C#:
dotnet new console -o mySparkDataLakeGen1App
cd mySparkDataLakeGen1App
For F#:
dotnet new console -o mySparkDataLakeGen1App -lang "F#"
cd mySparkDataLakeGen1App
Step 6. Add Microsoft.Spark Package #
Run the following command to install the Microsoft.Spark 0.12.1 package from NuGet.
dotnet add package Microsoft.Spark --version 0.12.1
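If you’d like to confirm the reference was added, you can list the project’s package references:

dotnet list package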
Step 7. Write Code #
Open Program.cs (C#) or Program.fs (F#) in Visual Studio. Add the required references.
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System.Collections.Generic;
open System
open Microsoft.Spark.Sql
open Microsoft.Spark.Sql.Types
Specify file path in the Data Lake where our DataFrame will be saved. To spice things up a bit, we’ll be using Parquet format for this example.
string filePath = $"adl://{args[1]}.azuredatalakestore.net/parquet/people.parquet";
We’ll be passing the values of our environment variables (TENANT_ID, ADLS_NAME, ADLS_SP_CLIENT_ID, ADLS_SP_CLIENT_SECRET) as arguments when running the app. In the C# version these values are accessible via args[0], args[1], args[2] and args[3] respectively; in the F# version they are matched into tenantId, dataLakeName, servicePrincipalClientId and servicePrincipalClientSecret (see the full program further below).

let filePath = sprintf "adl://%s.azuredatalakestore.net/parquet/people.parquet" dataLakeName
Create a new SparkSession with config for the Data Lake. Here, we’re specifying the FileSystem to be used for the adl scheme, and also providing the service principal credentials for authentication.

SparkSession spark = SparkSession
    .Builder()
    .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
    .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
    .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
    .Config("fs.adl.oauth2.client.id", args[2])
    .Config("fs.adl.oauth2.credential", args[3])
    .Config("fs.adl.oauth2.refresh.url", $"https://login.microsoftonline.com/{args[0]}/oauth2/token")
    .GetOrCreate();
let spark =
    SparkSession
        .Builder()
        .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
        .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
        .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
        .Config("fs.adl.oauth2.client.id", servicePrincipalClientId)
        .Config("fs.adl.oauth2.credential", servicePrincipalClientSecret)
        .Config("fs.adl.oauth2.refresh.url", sprintf "https://login.microsoftonline.com/%s/oauth2/token" tenantId)
        .GetOrCreate()
Create list of sample data.
var data = new List<GenericRow>
{
    new GenericRow(new object[] { 1, "John Doe" }),
    new GenericRow(new object[] { 2, "Jane Doe" }),
    new GenericRow(new object[] { 3, "Foo Bar" })
};
// Box the values so each row's array is an obj[]
let data =
    [ GenericRow([| box 1; box "John Doe" |])
      GenericRow([| box 2; box "Jane Doe" |])
      GenericRow([| box 3; box "Foo Bar" |]) ]
Create schema for sample data.
var schema = new StructType(new List<StructField>()
{
    new StructField("Id", new IntegerType()),
    new StructField("Name", new StringType()),
});
let schema =
    StructType([ StructField("Id", IntegerType())
                 StructField("Name", StringType()) ])
Create new DataFrame using sample data and schema.
DataFrame df = spark.CreateDataFrame(data, schema);
df.Show();
let df = spark.CreateDataFrame(data, schema)
df.Show()
Write the DataFrame to the Data Lake using the filePath we created earlier.

df.Write().Mode(SaveMode.Overwrite).Parquet(filePath);
df.Write().Mode(SaveMode.Overwrite).Parquet(filePath)
Read the saved files back from the Data Lake into a new DataFrame.
DataFrame readDf = spark.Read().Parquet(filePath);
readDf.Show();
let readDf = spark.Read().Parquet(filePath)
readDf.Show()
At the end of the steps above, your program should look similar to this:
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System.Collections.Generic;

namespace mySparkDataLakeGen1App
{
    class Program
    {
        static void Main(string[] args)
        {
            // Verify environment variables
            if (args.Length != 4)
            {
                Console.Error.WriteLine("Usage: $TENANT_ID $ADLS_NAME $ADLS_SP_CLIENT_ID $ADLS_SP_CLIENT_SECRET");
                Environment.Exit(1);
            }

            // Specify file path in Azure Data Lake Gen1
            string filePath =
                $"adl://{args[1]}.azuredatalakestore.net/parquet/people.parquet";

            // Create SparkSession
            SparkSession spark = SparkSession
                .Builder()
                .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
                .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
                .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
                .Config("fs.adl.oauth2.client.id", args[2])
                .Config("fs.adl.oauth2.credential", args[3])
                .Config("fs.adl.oauth2.refresh.url", $"https://login.microsoftonline.com/{args[0]}/oauth2/token")
                .GetOrCreate();

            // Create sample data
            var data = new List<GenericRow>
            {
                new GenericRow(new object[] { 1, "John Doe" }),
                new GenericRow(new object[] { 2, "Jane Doe" }),
                new GenericRow(new object[] { 3, "Foo Bar" })
            };

            // Create schema for sample data
            var schema = new StructType(new List<StructField>()
            {
                new StructField("Id", new IntegerType()),
                new StructField("Name", new StringType()),
            });

            // Create DataFrame using data and schema
            DataFrame df = spark.CreateDataFrame(data, schema);

            // Print DataFrame
            df.Show();

            // Write DataFrame to Azure Data Lake Gen1
            df.Write().Mode(SaveMode.Overwrite).Parquet(filePath);

            // Read saved DataFrame from Azure Data Lake Gen1
            DataFrame readDf = spark.Read().Parquet(filePath);

            // Print DataFrame
            readDf.Show();
        }
    }
}
open System
open Microsoft.Spark.Sql
open Microsoft.Spark.Sql.Types

[<EntryPoint>]
let main args =
    match args with
    | [| tenantId; dataLakeName; servicePrincipalClientId; servicePrincipalClientSecret |] ->

        // Specify file path in Azure Data Lake Gen1
        let filePath =
            sprintf "adl://%s.azuredatalakestore.net/parquet/people.parquet" dataLakeName

        // Create SparkSession
        let spark =
            SparkSession
                .Builder()
                .AppName("Azure Data Lake Storage example using .NET for Apache Spark")
                .Config("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
                .Config("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
                .Config("fs.adl.oauth2.client.id", servicePrincipalClientId)
                .Config("fs.adl.oauth2.credential", servicePrincipalClientSecret)
                .Config("fs.adl.oauth2.refresh.url", sprintf "https://login.microsoftonline.com/%s/oauth2/token" tenantId)
                .GetOrCreate()

        // Create sample data (values are boxed so each row's array is an obj[])
        let data =
            [ GenericRow([| box 1; box "John Doe" |])
              GenericRow([| box 2; box "Jane Doe" |])
              GenericRow([| box 3; box "Foo Bar" |]) ]

        // Create schema for sample data
        let schema =
            StructType([ StructField("Id", IntegerType())
                         StructField("Name", StringType()) ])

        // Create DataFrame using data and schema
        let df = spark.CreateDataFrame(data, schema)

        // Print DataFrame
        df.Show()

        // Write DataFrame to Azure Data Lake Gen1
        df.Write().Mode(SaveMode.Overwrite).Parquet(filePath)

        // Read saved DataFrame from Azure Data Lake Gen1
        let readDf = spark.Read().Parquet(filePath)

        // Print DataFrame
        readDf.Show()
        0
    | _ ->
        printfn "Usage: $TENANT_ID $ADLS_NAME $ADLS_SP_CLIENT_ID $ADLS_SP_CLIENT_SECRET"
        1
Make sure to save the changes.
Go back to Terminal / Command Prompt and run this command to build the app.
dotnet build
Step 8. Run your Application #
Navigate to your build output directory.

On Linux or macOS:
cd ./bin/Debug/netcoreapp3.1/
On Windows:
cd .\bin\Debug\netcoreapp3.1\
Submit the application to run on Apache Spark.

On Linux or macOS:
spark-submit \
--packages com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0 \
--class org.apache.spark.deploy.dotnet.DotnetRunner \
--master local microsoft-spark-2.4.x-0.12.1.jar \
./mySparkDataLakeGen1App $TENANT_ID $ADLS_NAME $ADLS_SP_CLIENT_ID $ADLS_SP_CLIENT_SECRET
On Windows:
spark-submit ^
--packages com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0 ^
--class org.apache.spark.deploy.dotnet.DotnetRunner ^
--master local microsoft-spark-2.4.x-0.12.1.jar ^
mySparkDataLakeGen1App %TENANT_ID% %ADLS_NAME% %ADLS_SP_CLIENT_ID% %ADLS_SP_CLIENT_SECRET%
There’s a known issue when running on Windows where the ShutdownHookManager fails to clear temp directories (Exception while deleting Spark temp dir). This doesn’t affect the functionality of the app. However, there’s a workaround if you’d like to temporarily hide the errors.

Here, we’re using the Maven package com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0. You can also supply the jar instead by downloading hadoop-azure-datalake-2.7.3-0.1.0.jar. Copy the downloaded jar to your build output directory and replace the line --packages com.rentpath:hadoop-azure-datalake:2.7.3-0.1.0 with --jars hadoop-azure-datalake-2.7.3-0.1.0.jar in the command above.

Expected output
...
+---+--------+
| Id|    Name|
+---+--------+
|  1|John Doe|
|  2|Jane Doe|
|  3| Foo Bar|
+---+--------+
...
+---+--------+
| Id|    Name|
+---+--------+
|  1|John Doe|
|  2|Jane Doe|
|  3| Foo Bar|
+---+--------+
...
The first table shows that our DataFrame was successfully created and saved to Azure Data Lake Storage Gen1. The saved files were then read back into the app and printed in the second table. You can also view the saved files in your Data Lake via the Azure Portal or Azure Storage Explorer.
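If you prefer the command line, you can also list the written Parquet files with the Azure CLI. A minimal check, assuming the ADLS_NAME variable from Step 4 is available in your shell (use %ADLS_NAME% on Windows):

az dls fs list --account $ADLS_NAME --path /parquet/people.parquet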
The sample project for this article can be found here on GitHub.