Monthly Archives: June 2015

3 “hacks” for Hadoop and HDInsight Clusters

Here are 3 useful “hacks” I’ve uncovered while developing Hadoop MapReduce jobs on HDInsight clusters.

1. Use Multiple Storage Accounts with your HDInsight Cluster

Chances are that if you are using an HDInsight cluster, you are dealing with lots and lots of data.  With the announcement of Azure Data Lake this won’t be a problem for long, but right now you may be pushing up against the limits on Azure Storage accounts.  This is easily fixed by storing your data across multiple storage accounts: each account you add raises the maximum IOPS and capacity available to the cluster.  A common naming convention makes it easier to associate all of the storage accounts.  For example, the same location could be used in every account, substituting the name of each storage account that is associated with the cluster.
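As a rough sketch of what such a convention might look like, the snippet below builds the same container and folder location for each storage account.  It assumes the `wasb://container@account.blob.core.windows.net/path` form that HDInsight uses to address Azure Blob Storage; the account names, container, and folder are hypothetical placeholders, not values from my cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds one shared location per attached storage account.
final class WasbPaths {
    private WasbPaths() {}

    // Returns wasb://<container>@<account>.blob.core.windows.net/<folder>
    static String uri(String container, String account, String folder) {
        String path = folder.startsWith("/") ? folder : "/" + folder;
        return "wasb://" + container + "@" + account + ".blob.core.windows.net" + path;
    }

    // Applies the same container/folder convention to every account name.
    static List<String> forAccounts(String container, String folder, List<String> accounts) {
        List<String> uris = new ArrayList<>();
        for (String account : accounts) {
            uris.add(uri(container, account, folder));
        }
        return uris;
    }

    public static void main(String[] args) {
        // Placeholder account names; substitute the accounts attached to your cluster.
        List<String> accounts = Arrays.asList("invoicedata01", "invoicedata02");
        for (String u : forAccounts("data", "invoices/2015", accounts)) {
            System.out.println(u);
        }
    }
}
```

Because only the account name changes, a job can be pointed at every account by looping over the account list rather than hard-coding each path.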


You may be thinking, “But Andrew, I spin my clusters up to do processing and I don’t want to manage a configuration file with all of this storage account information!”  Well, luckily you don’t have to…

2. Use an Azure Subscription to store storage account state

We can specify an Azure subscription just for the storage that will be associated with our cluster.  We can then use PowerShell to get a reference to each storage account and attach them to our cluster during creation.  I’ll even give you the script to do it 🙂

# Create the configuration for the new cluster
$HDIClusterConfig = New-AzureHDInsightClusterConfig -ClusterSizeInNodes $clusterNodes |
    Set-AzureHDInsightDefaultStorage -StorageAccountName "${defaultStorageAccountName}" -StorageAccountKey $defaultStorageAccountKey -StorageContainerName $defaultStorageContainerName |
    Add-AzureHDInsightStorage -StorageAccountName "${secondStorageAccountName}" -StorageAccountKey $secondStorageAccountKey |
    Add-AzureHDInsightMetastore -SqlAzureServerName "${metastoreServer}" -DatabaseName $metastoreDatabase -Credential $metastoreCred -MetastoreType HiveMetastore

# Access the subscription where the data is stored
Set-AzureSubscription -SubscriptionName $dataStorageSubscriptionName
Select-AzureSubscription -SubscriptionName $dataStorageSubscriptionName

# Iterate over the storage accounts in the subscription just selected and add each one to the cluster config
Get-AzureStorageAccount |
    ForEach-Object { Get-AzureStorageKey -StorageAccountName $_.StorageAccountName } |
    ForEach-Object {
        $HDIClusterConfig = Add-AzureHDInsightStorage -StorageAccountKey $_.Primary -StorageAccountName $_.StorageAccountName -Config $HDIClusterConfig
    }

# Re-select the subscription where your cluster is hosted
Set-AzureSubscription -SubscriptionName $subscriptionName
Select-AzureSubscription -SubscriptionName $subscriptionName

# Spin up the cluster!
New-AzureHDInsightCluster -Name $clusterName -Location $clusterLocation -Credential $clusterCred -Config $HDIClusterConfig -Version 3.1

3. Create Input splits with blob file location, not data

Occasionally there is a use case where the file you are processing cannot be split without losing some of the data’s integrity.  Think of XML/JSON data and the need to keep the file whole.  With these types of files you can create an InputSplit which includes only the location of the file in Azure Storage and not the data itself.  This string can then be passed to the map task, where the logic to read the data will live.  You’ll need a good grasp of MapReduce operations before continuing.  Now on to more code examples!

How to create your list of InputSplits:

ArrayList<InputSplit> ret = new ArrayList<InputSplit>();

// Do this for each path we receive.  Creates a list of splits in this order
// (s = input path): (s1,1),(s2,1)...(sN,1),(s1,2)...(sN,2),(s1,3)... etc.
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits, numNameHashSplits1); i++) {
  for (Path inputPath : inputPaths) {
    ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
    System.out.println(i + " " + inputPath.toString());
  }
}
return ret;

Once the List<InputSplit> is assembled, each InputSplit is handed to a RecordReader class, which reads each key/value pair and passes it to the map task.  The initialization of the RecordReader uses the InputSplit, a string representing the location of a “folder” of invoices in blob storage, to return a list of all blobs within that folder (the blobs variable below).  The Java code below shows the record reader being initialized for its hash slot and building the list of blobs in that location.

public class ParseDirectoryFileNameRecordReader
    extends RecordReader<IntWritable, Text> {
  private int nameHashSlot;
  private int numNameHashSlots;
  private Path myDir;
  private Path currentPath;
  private Iterator<ListBlobItem> blobs;
  private int currentLocation;

  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    myDir = ((ParseDirectoryInputSplit) split).getDirectoryPath();

    // getNameHashSlot tells us which slot this record reader is responsible for
    nameHashSlot = ((ParseDirectoryInputSplit) split).getNameHashSlot();

    // gets the total number of hash slots
    numNameHashSlots = getNumNameHashSplits(context.getConfiguration());

    // gets the input credentials for the storage account assigned to this record reader
    String inputCreds = getInputCreds(context.getConfiguration());

    // break up the directory path (container@account.blob.core.windows.net) to get the account name
    String[] authComponents = myDir.toUri().getAuthority().split("@");
    String accountName = authComponents[1].split("\\.")[0];
    String containerName = authComponents[0];
    String accountKey = Utils.returnInputkey(inputCreds, accountName);
    System.out.println("This mapper is assigned the following account:" + accountName);

    StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName, accountKey);
    CloudStorageAccount account = new CloudStorageAccount(creds);
    CloudBlobClient client = account.createCloudBlobClient();
    CloudBlobContainer container = client.getContainerReference(containerName);

    // flat listing of every blob under the "folder" this split points at
    blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + "/",
        true, EnumSet.noneOf(BlobListingDetails.class), null, null).iterator();
    currentLocation = -1;
  }
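To make the path handling in initialize easier to follow, here is a small standalone sketch that uses only java.net.URI.  It assumes the split location has the form `wasb://container@account.blob.core.windows.net/folder`; the class name WasbLocation and the sample URI are made up for illustration and are not part of the job above.

```java
import java.net.URI;

// Hypothetical helper mirroring the string handling in initialize().
final class WasbLocation {
    final String container;
    final String account;
    final String prefix;

    private WasbLocation(String container, String account, String prefix) {
        this.container = container;
        this.account = account;
        this.prefix = prefix;
    }

    static WasbLocation parse(String location) {
        URI uri = URI.create(location);
        // The authority looks like "container@account.blob.core.windows.net"
        String[] authComponents = uri.getAuthority().split("@");
        String container = authComponents[0];
        String account = authComponents[1].split("\\.")[0];
        // Drop the leading "/" and add a trailing "/" so the value works as a blob prefix
        String prefix = uri.getPath().substring(1) + "/";
        return new WasbLocation(container, account, prefix);
    }

    public static void main(String[] args) {
        WasbLocation loc = parse("wasb://data@invoicedata01.blob.core.windows.net/invoices/2015");
        System.out.println(loc.container + " " + loc.account + " " + loc.prefix);
    }
}
```

The account name picks the right key out of the job configuration, and the prefix is what gets handed to the blob listing call.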

Once initialized, the record reader passes the next key to the map task.  This is controlled by the nextKeyValue method, which is called each time the map task needs its next record.  The Java code below demonstrates this.


// Checks whether the next key/value is assigned to this task or to another mapper.
// If it is assigned to this task the location is passed to the mapper; otherwise return false.
public boolean nextKeyValue() throws IOException, InterruptedException {
  while (blobs.hasNext()) {
    ListBlobItem currentBlob = blobs.next();

    // The name hash is a number between 1 and the number of hash slots. If it matches the
    // number assigned to this mapper and the blob's length is greater than 0, return the path
    // to the map function.
    if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
      String[] pathComponents = currentBlob.getUri().getPath().split("/");

      String pathWithoutContainer =
          currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);

      currentPath = new Path(myDir.toUri().getScheme(),
          myDir.toUri().getAuthority(), pathWithoutContainer);

      return true;
    }
  }
  return false;
}
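The doesBlobMatchNameHash helper isn’t shown above, so here is one plausible way such a check could work.  This is a sketch under my own assumptions (slots numbered 1 through N, slot chosen from the hash of the blob name), with hypothetical names, and not necessarily how the real job does it.

```java
// Hypothetical sketch: assigns each blob name to exactly one of numSlots slots.
final class NameHashSlots {
    private NameHashSlots() {}

    // Returns a slot number between 1 and numSlots (inclusive) for the blob name.
    static int slotFor(String blobName, int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException("numSlots must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(blobName.hashCode(), numSlots) + 1;
    }

    // True when the blob belongs to the record reader that owns mySlot.
    static boolean matches(String blobName, int mySlot, int numSlots) {
        return slotFor(blobName, numSlots) == mySlot;
    }

    public static void main(String[] args) {
        String[] names = {"invoices/a.xml", "invoices/b.xml", "invoices/c.xml"};
        for (String name : names) {
            System.out.println(name + " -> slot " + slotFor(name, 4));
        }
    }
}
```

Since every record reader lists the same folder but keeps only the names that hash to its own slot, each blob is processed by exactly one mapper without any coordination between tasks.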

The logic in the map function is then simply as follows, with inputStream containing the entire XML string:

Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

// Input stream contains all data from the blob in the location provided by the Text value
FSDataInputStream inputStream = fs.open(inputFile);

Thanks to Mostafa for all the help getting this to work!   

Cool, right!?  Now you can scale your HDInsight clusters to unprecedented size as well as process XML and JSON objects reliably with the Hadoop framework.

Happy Coding


Streaming Analytics: IoT Data Producer


With everyone talking about IoT, the importance of telemetry data just continues to grow.  To successfully prepare for this paradigm shift, developers need a way to test their telemetry software without setting up a distributed network of test devices.  My favorite way of testing the applications I build uses a small C# application to throw data at an Azure event hub.  We can then see how well our architecture scales, check whether our Complex Event Processing (CEP) queries are working, and get some real performance benchmarks.  Let’s step through an example using an ecommerce website where we are sending all of our transactions to an event hub.  Our application consists of a number of threads, each creating and sending JSON objects as fast as possible.  The thread itself isn’t overly interesting, so we will focus on how we coordinate the start of each task.  If it piques your interest, the full project can be found on my GitHub.


The trick to getting the most out of your event hub is to send events directly to a partition and not just to the event hub endpoint.  You can create an EventHubSender that is associated with a specific partition and use that object to send your data.  The problem arises when we have multiple threads and we want each to send to a different subset of partitions.  This can be achieved by providing each thread a List<EventHubSender> object populated with EventHubSenders that are already associated with a partition.  The list of all available EventHubSenders is created below.


//Creates a list of available event hub senders based upon the number of partitions of the event hub
// and available threads
int x = 0;
List<EventHubSender> senderList = new List<EventHubSender>();
while (x < description.PartitionCount)
{
    EventHubSender partitionedSender = client.CreatePartitionedSender(description.PartitionIds[x]);
    senderList.Add(partitionedSender);
    x++;
}


The master list must then be split up according to the number of threads that will be sending data to the endpoint.  This can be done using a LINQ query that returns a List<List<EventHubSender>>, one inner list for each thread.


var subLists = SplitToSublists(senderList);

private static List<List<EventHubSender>> SplitToSublists(List<EventHubSender> source)
{
    return source
        .Select((x, i) => new { Index = i, Value = x })
        .GroupBy(x => x.Index % numThreads)
        .Select(x => x.Select(v => v.Value).ToList())
        .ToList();
}
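To see what the split produces, here is the same idea sketched in plain Java rather than C#, with integers standing in for the senders.  It assumes the grouping is by index modulo the thread count, so items are dealt out round-robin; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a round-robin split: item i goes to sublist (i % numThreads).
final class RoundRobinSplit {
    private RoundRobinSplit() {}

    static <T> List<List<T>> splitToSublists(List<T> source, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        // Like a group-by, only non-empty sublists are produced
        int groups = Math.min(numThreads, source.size());
        List<List<T>> result = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            result.add(new ArrayList<T>());
        }
        for (int i = 0; i < source.size(); i++) {
            result.get(i % numThreads).add(source.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < 12; p++) {
            partitions.add(p);
        }
        // 12 partitions over 4 threads: each thread gets 3 partitions
        System.out.println(splitToSublists(partitions, 4));
    }
}
```

Note that asking for more threads than there are partitions yields fewer sublists than threads, which is why the thread count needs to be checked against the number of sublists before any tasks are started.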



Lastly, a task must be started to produce data and send it to the partition that is assigned.



//create a list of tasks that independently send events to the event hub
List<Task> taskList = new List<Task>();
for (int i = 0; i < (int)numThreads; i++)
{
    int indexOfSublist = i;
    taskList.Add(new Task(() => SingleTask.Run(subLists[indexOfSublist])));
}

if (numThreads …)
{
    Console.WriteLine("…" + numThreads + " threads. Press enter to continue and produce data");
    Console.ReadKey();

    // Start Each Event
    taskList.ForEach(a => a.Start());

    //Wait for all to end.
    taskList.ForEach(a => a.Wait());
}
else
{
    Console.WriteLine("… of threads … Tasks will not start. … number of sender arrays.");
    Console.Read();
}

// This shouldn't happen but need this here or the project would close




Using this approach I was able to produce over 3,000 events per second on each started thread.  My 12-partition event hub kept up with no problem, and I was able to set up some Azure Stream Analytics queries too!


Happy IoT Developing