
Setting Hive Variables in Hadoop

 

Taking Hive from the world of demos into production almost always means setting Hive variables within your production scripts. You can set Hive variables for table names, locations, partitions, and more. For this example we are going to use some sample data that comes on an HDInsight cluster to play with variables.

Continue reading

3 “hacks” for Hadoop and HDInsight Clusters

Here are 3 useful “hacks” I’ve uncovered while developing Hadoop MapReduce jobs on HDInsight clusters.

1. Use Multiple Storage Accounts with your HDInsight Cluster

Chances are that if you are using an HDInsight cluster you are dealing with lots and lots of data.  With the announcement of Azure Data Lake this won’t be a problem for long, but right now you may be pushing up against the limits of an Azure Storage account.  This is easily fixed by spreading your data across multiple storage accounts, dramatically increasing the maximum IOPS and capacity with each account you add.  A common naming convention makes it easy to keep the storage accounts associated with the cluster.  For example, the location below could be used, substituting the storage account name for each account associated with the cluster.

http://<storageaccount>.blob.core.windows.net/data/06-08-2015/<filename>
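
With that convention in place, a job driver can simply loop over the accounts attached to the cluster and add the same dated folder from each one as an input path. The Java snippet below is only a minimal sketch of that idea; the account names are hypothetical and it assumes the wasb:// scheme HDInsight uses for blob storage.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MultiAccountInputExample {

    //Adds the same dated folder from every storage account as a job input
    public static void addInputs(Job job) throws IOException {
        //Hypothetical storage account names that follow the shared naming convention
        String[] accounts = { "mydatastore01", "mydatastore02", "mydatastore03" };
        for (String account : accounts) {
            //Same container ("data") and dated folder in each account
            FileInputFormat.addInputPath(job,
                new Path("wasb://data@" + account + ".blob.core.windows.net/06-08-2015/"));
        }
    }
}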

You may be thinking, “But Andrew, I spin my clusters up to do processing and I don’t want to manage a configuration file with all of this storage account information!”  Well luckily you don’t have to…

2. Use an Azure subscription to store storage account state

We can specify an Azure subscription just for the storage that will be associated with our cluster.  We can then use PowerShell to get a reference to each storage account and attach them to our cluster during creation.  I’ll even give you the script to do it 🙂

#Create the configuration for the new cluster
$HDIClusterConfig = New-AzureHDInsightClusterConfig -ClusterSizeInNodes $clusterNodes |
    Set-AzureHDInsightDefaultStorage -StorageAccountName "${defaultStorageAccountName}.blob.core.windows.net" -StorageAccountKey $defaultStorageAccountKey -StorageContainerName $defaultStorageContainerName |
    Add-AzureHDInsightStorage -StorageAccountName "${secondStorageAccountName}.blob.core.windows.net" -StorageAccountKey $secondStorageAccountKey |
    Add-AzureHDInsightMetastore -SqlAzureServerName "${metastoreServer}.database.windows.net" -DatabaseName $metastoreDatabase -Credential $metastoreCred -MetastoreType HiveMetastore

#Switch to the subscription where the data is stored
Select-AzureSubscription -SubscriptionName $dataStorageSubscriptionName

#Loop over the storage accounts in that subscription and add each one to the cluster configuration
Get-AzureStorageAccount |
    ForEach-Object { Get-AzureStorageKey -StorageAccountName $_.StorageAccountName } |
    ForEach-Object {
        $HDIClusterConfig = Add-AzureHDInsightStorage -StorageAccountKey $_.Primary -StorageAccountName $_.StorageAccountName -Config $HDIClusterConfig
    }

#Re-select the subscription where your cluster is hosted
Select-AzureSubscription -SubscriptionName $subscriptionName

#Spin up the cluster!
New-AzureHDInsightCluster -Name $clusterName -Location $clusterLocation -Credential $clusterCred -Config $HDIClusterConfig -Version 3.1


3. Create InputSplits with the blob file location, not the data

Occasionally there is a use case where the file you are processing cannot be split without losing some of the data’s integrity.  Think of XML or JSON data and the need to keep each file whole.  With these types of files you can create an InputSplit that includes only the location of the file in Azure Storage, not the data itself.  That string can then be passed to the map task, where the logic to read the data will live.  You’ll need a good grasp of how MapReduce operates before continuing.  Now on to more code examples!

How to create your list of InputSplits:

ArrayList<InputSplit> ret = new ArrayList<InputSplit>();

/* Do this for each path we receive.  Creates one split per (input path, hash slot) pair,
   in this order (s = input path): (s1,1),(s2,1)…(sN,1),(s1,2)…(sN,2),(sN,3) etc. */
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits, numNameHashSplits - 1); i++) {
    for (Path inputPath : inputPaths) {
        ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
        System.out.println(i + " " + inputPath.toString());
    }
}
return ret;
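
The ParseDirectoryInputSplit class itself is not shown in the post, so here is a minimal sketch of what it could look like, assuming it only needs to carry the directory path and the hash slot to the record reader. The field and method names mirror the calls made elsewhere in the post; the rest is an assumption.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class ParseDirectoryInputSplit extends InputSplit implements Writable {

    private String directoryPath;
    private int nameHashSlot;

    //Required no-arg constructor so Hadoop can deserialize the split via reflection
    public ParseDirectoryInputSplit() {
    }

    public ParseDirectoryInputSplit(String directoryPath, int nameHashSlot) {
        this.directoryPath = directoryPath;
        this.nameHashSlot = nameHashSlot;
    }

    //Used by the record reader to list the blobs under this "folder"
    public Path getDirectoryPath() {
        return new Path(directoryPath);
    }

    //Used by the record reader to decide which blobs belong to it
    public int getNameHashSlot() {
        return nameHashSlot;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        //The split carries no data, only a pointer to it
        return 0;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        //Blob storage is remote, so there is no data locality to report
        return new String[0];
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(directoryPath);
        out.writeInt(nameHashSlot);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        directoryPath = in.readUTF();
        nameHashSlot = in.readInt();
    }
}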

Once the List<InputSplit> is assembled, each InputSplit is handed to a RecordReader class, where each key/value pair is read and then passed to the map task.  The initialization of the record reader uses the InputSplit, a string representing the location of a “folder” of invoices in blob storage, to return a list of all blobs within that folder, the blobs variable below.  The Java code below demonstrates the creation of the record reader for each hash slot and the resulting list of blobs in that location.

public class ParseDirectoryFileNameRecordReader
        extends RecordReader<IntWritable, Text> {

    private int nameHashSlot;
    private int numNameHashSlots;
    private Path myDir;
    private Path currentPath;
    private Iterator<ListBlobItem> blobs;
    private int currentLocation;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        myDir = ((ParseDirectoryInputSplit) split).getDirectoryPath();

        //getNameHashSlot tells us which slot this record reader is responsible for
        nameHashSlot = ((ParseDirectoryInputSplit) split).getNameHashSlot();

        //gets the total number of hash slots
        numNameHashSlots = getNumNameHashSplits(context.getConfiguration());

        //gets the input credentials for the storage account assigned to this record reader
        String inputCreds = getInputCreds(context.getConfiguration());

        //break the directory path apart to get the account and container names
        String[] authComponents = myDir.toUri().getAuthority().split("@");
        String accountName = authComponents[1].split("\\.")[0];
        String containerName = authComponents[0];
        String accountKey = Utils.returnInputkey(inputCreds, accountName);
        System.out.println("This mapper is assigned the following account: " + accountName);

        StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName, accountKey);
        CloudStorageAccount account = new CloudStorageAccount(creds);
        CloudBlobClient client = account.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference(containerName);
        blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + "/", true,
                EnumSet.noneOf(BlobListingDetails.class), null, null).iterator();
        currentLocation = -1;
    }
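
The helpers getNumNameHashSplits, getInputCreds, and Utils.returnInputkey are referenced above but not shown in the post. A minimal sketch of the first two, assuming they live inside the record reader class and simply read values the job driver placed in the configuration (the configuration keys here are hypothetical):

//Reads the total number of hash slots the driver configured for the job
private static int getNumNameHashSplits(Configuration conf) {
    return conf.getInt("parse.directory.num.hash.splits", 1);
}

//Reads the storage credentials string the driver placed in the configuration,
//which Utils.returnInputkey then searches for the key of a given account
private static String getInputCreds(Configuration conf) {
    return conf.get("parse.directory.input.creds");
}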

Once initialized, the record reader is used to pass the next key/value pair to the map task.  This is controlled by the nextKeyValue method, which the framework calls each time it needs the next record.  The below Java code demonstrates this.

 
 

//This checks whether the next blob is assigned to this task or to another mapper.
//If it is assigned to this task, the location is passed to the mapper; otherwise return false.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    while (blobs.hasNext()) {
        ListBlobItem currentBlob = blobs.next();

        //doesBlobMatchNameHash returns a number between 1 and the number of hash slots. If it matches
        //the number assigned to this mapper and the blob's length is greater than 0, return the path to the map function
        if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
            String[] pathComponents = currentBlob.getUri().getPath().split("/");

            String pathWithoutContainer =
                    currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);

            currentPath = new Path(myDir.toUri().getScheme(),
                    myDir.toUri().getAuthority(), pathWithoutContainer);

            currentLocation++;
            return true;
        }
    }
    return false;
}
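
The rest of the record reader is not shown in the post; the sketch below assumes the straightforward implementation, handing the blob’s path to the mapper as the value and a simple counter as the key.

@Override
public IntWritable getCurrentKey() throws IOException, InterruptedException {
    //A simple counter of how many blobs this reader has handed out so far
    return new IntWritable(currentLocation);
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    //The mapper receives only the blob's path; it opens and reads the blob itself
    return new Text(currentPath.toString());
}

@Override
public float getProgress() throws IOException, InterruptedException {
    //The total blob count is not known up front, so no meaningful progress is reported
    return 0;
}

@Override
public void close() throws IOException {
    //Nothing to release; the blob listing iterator needs no explicit cleanup
}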

The logic in the map function is then simply as follows, with inputStream giving access to the entire XML document:

Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

//Input stream contains all data from the blob in the location provided by Text
FSDataInputStream inputStream = fs.open(inputFile);
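
For completeness, here is a minimal sketch of how that snippet might sit inside the map method. The mapper class name and output types are assumptions, and the blob is read whole into memory because the file is intentionally kept intact.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//Hypothetical mapper that receives the blob path from the record reader above
public class ParseDirectoryMapper extends Mapper<IntWritable, Text, Text, Text> {

    @Override
    protected void map(IntWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Path inputFile = new Path(value.toString());
        FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

        //Input stream contains all data from the blob in the location provided by Text
        try (FSDataInputStream inputStream = fs.open(inputFile)) {
            //Read the whole blob into a single string
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int bytesRead;
            while ((bytesRead = inputStream.read(chunk)) != -1) {
                buffer.write(chunk, 0, bytesRead);
            }
            String document = buffer.toString("UTF-8");

            //Parse the XML/JSON document here and emit whatever pairs the job needs, e.g.
            context.write(new Text(inputFile.getName()), new Text(document));
        }
    }
}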

Thanks to Mostafa for all the help getting this to work!   

Cool, right!?  Now you can scale your HDInsight clusters to unprecedented size as well as process XML and JSON objects reliably with the Hadoop framework. 

Happy Coding

~Andrew