Top 3 XAML Controls for Windows 10

I’m taking a quick departure from my typical Azure and Big Data topics to post a video walkthrough of Kat Harris’s favorite XAML controls coming in Windows 10.  Kat’s demo really highlights the power of what’s possible with the new controls, and she dives deeper into it here.  Kat has also made her source code available on GitHub if you are interested.

My favorite XAML controls for Windows 10 are:

  • state triggers
  • split views
  • relative panel controls

Coincidentally, these are the three outlined below 🙂  The local meet-up group I presented this at really enjoyed the walkthrough, and I hope you do too!

 

~Andrew

 

3 “hacks” for Hadoop and HDInsight Clusters

Here are 3 useful “hacks” I’ve uncovered while developing Hadoop MapReduce jobs on HDInsight clusters.

1. Use Multiple Storage Accounts with your HDInsight Cluster

Chances are, if you are using an HDInsight cluster, you are dealing with lots and lots of data.  With the announcement of Azure Data Lake this won’t be a problem for long, but right now you may be pushing up against the limits of Azure Storage accounts.  This is easily fixed by storing your data across multiple storage accounts, dramatically increasing the maximum IOPS and capacity with each account added.  A common naming convention can be used to make the association between all of the storage accounts easier.  For example, the location below could be used, substituting the storage account name for each account that is associated with the cluster.

http://<storageaccount>.blob.core.windows.net/data/06-08-2015/<filename>

You may be thinking, “But Andrew, I spin my clusters up to do processing and I don’t want to manage a configuration file with all of this storage account information!”  Well, luckily you don’t have to…

2. Use an Azure Subscription to store storage account state

We can specify an Azure subscription just for the storage that will be associated with our cluster.  We can then use PowerShell to get a reference to each storage account and attach them to our cluster during creation.  I’ll even give you the script to do it 🙂

#Create the configuration for the new cluster
$HDIClusterConfig = New-AzureHDInsightClusterConfig -ClusterSizeInNodes $clusterNodes |
    Set-AzureHDInsightDefaultStorage -StorageAccountName "${defaultStorageAccountName}.blob.core.windows.net" -StorageAccountKey $defaultStorageAccountKey -StorageContainerName $defaultStorageContainerName |
    Add-AzureHDInsightStorage -StorageAccountName "${secondStorageAccountName}.blob.core.windows.net" -StorageAccountKey $secondStorageAccountKey |
    Add-AzureHDInsightMetastore -SqlAzureServerName "${metastoreServer}.database.windows.net" -DatabaseName $metastoreDatabase -Credential $metastoreCred -MetastoreType HiveMetastore
 
#Access the Subscription where data is stored
Set-AzureSubscription -SubscriptionName $dataStorageSubscriptionName
Select-AzureSubscription -SubscriptionName $dataStorageSubscriptionName
 
#Parse over the storage accounts in the subscription just selected and add them to the cluster config
Get-AzureStorageAccount |
    ForEach-Object { Get-AzureStorageKey -StorageAccountName $_.StorageAccountName } |
    ForEach-Object {
        $HDIClusterConfig = Add-AzureHDInsightStorage -StorageAccountKey $_.Primary -StorageAccountName $_.StorageAccountName -Config $HDIClusterConfig
    }
#Re-select the subscription where your cluster is hosted
Set-AzureSubscription -SubscriptionName $subscriptionName
Select-AzureSubscription -SubscriptionName $subscriptionName
 
#Spin up the cluster!
New-AzureHDInsightCluster -Name $clusterName -Location $clusterLocation -Credential $clusterCred -Config $HDIClusterConfig -Version 3.1
 

3. Create InputSplits with the blob file location, not the data

Occasionally there is a use case where the file you are processing cannot be split without losing some of the data’s integrity.  Think of XML/JSON data and the need to keep the file whole.  With these types of files you can create an InputSplit which includes only the location of the file in Azure Storage and not the data itself.  This string can then be passed to the map task, where the logic to read the data will live.  You’ll need a good grasp of MapReduce operations before continuing.  Now on to more code examples!

How to create your list of InputSplits:

ArrayList<InputSplit> ret = new ArrayList<InputSplit>();

/* Do this for each path we receive.  Creates a split per input path and hash slot,
   in this order (s = input path): (s1,1),(s2,1)…(sN,1),(s1,2)…(sN,2),(s1,3) etc.
*/
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits, numNameHashSplits - 1); i++) {
  for (Path inputPath : inputPaths) {
    ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
    System.out.println(i + " " + inputPath.toString());
  }
}
return ret;

Once the List<InputSplit> is assembled, each InputSplit is handed to a RecordReader class where each key/value pair is read and then passed to the map task.  The initialization of the record reader class uses the InputSplit, a string representing the location of a “folder” of invoices in blob storage, to return a list of all blobs within the folder (the blobs variable below).  The Java code below demonstrates the creation of the record reader for each hash slot and the resulting list of blobs in that location.

public class ParseDirectoryFileNameRecordReader
    extends RecordReader<IntWritable, Text> {

  private int nameHashSlot;
  private int numNameHashSlots;
  private Path myDir;
  private Path currentPath;
  private Iterator<ListBlobItem> blobs;
  private int currentLocation;

  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    myDir = ((ParseDirectoryInputSplit)split).getDirectoryPath();

    //getNameHashSlot tells us which slot this record reader is responsible for
    nameHashSlot = ((ParseDirectoryInputSplit)split).getNameHashSlot();

    //gets the total number of hash slots
    numNameHashSlots = getNumNameHashSplits(context.getConfiguration());

    //gets the input credentials for the storage account assigned to this record reader
    String inputCreds = getInputCreds(context.getConfiguration());

    //break the directory path apart to get the account name
    String[] authComponents = myDir.toUri().getAuthority().split("@");
    String accountName = authComponents[1].split("\\.")[0];
    String containerName = authComponents[0];
    String accountKey = Utils.returnInputkey(inputCreds, accountName);
    System.out.println("This mapper is assigned the following account: " + accountName);

    StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName, accountKey);
    CloudStorageAccount account = new CloudStorageAccount(creds);
    CloudBlobClient client = account.createCloudBlobClient();
    CloudBlobContainer container = client.getContainerReference(containerName);
    blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + "/",
        true, EnumSet.noneOf(BlobListingDetails.class), null, null).iterator();
    currentLocation = -1;
    return;
  }

Once initialized, the record reader is used to pass the next key to the map task.  This is controlled by the nextKeyValue method, which is called each time the framework needs the next record for the map function.  The Java code below demonstrates this.

 
 

//This checks whether the next blob is assigned to this task or to another mapper.  If it is assigned to this task, the location is passed to the mapper; otherwise return false.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  while (blobs.hasNext()) {
    ListBlobItem currentBlob = blobs.next();

    //Returns a number between 1 and the number of hash slots.  If it matches the number assigned to this mapper and its length is greater than 0, return the path to the map function
    if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
      String[] pathComponents = currentBlob.getUri().getPath().split("/");

      String pathWithoutContainer =
          currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);

      currentPath = new Path(myDir.toUri().getScheme(),
          myDir.toUri().getAuthority(), pathWithoutContainer);

      currentLocation++;
      return true;
    }
  }
  return false;
}

The logic in the map function is then simply as follows, with inputStream providing access to the entire XML file:

Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

//Input stream contains all data from the blob in the location provided by Text
FSDataInputStream inputStream = fs.open(inputFile);

Thanks to Mostafa for all the help getting this to work!   

Cool, right?!  Now you can scale your HDInsight clusters to unprecedented sizes as well as process XML and JSON objects reliably with the Hadoop framework.

Happy Coding

~Andrew

Streaming Analytics: IoT Data Producer

 

With everyone talking about IoT, the importance of telemetry data just continues to grow.  To successfully prepare for this paradigm shift, developers need a way to test their telemetry software without setting up a distributed network of test devices.  My favorite way of testing the applications I build uses a small C# application to throw data at an Azure event hub.  We can then see how well our architecture scales, whether our Complex Event Processing (CEP) queries are working, and get some real performance benchmarks.  Let’s step through an example using an ecommerce website where we are sending all of our transactions to an event hub.  Our application consists of a number of threads, each creating and sending JSON objects as fast as possible.  The thread itself isn’t overly interesting, so we will focus on how we coordinate the start of each task.  If it piques your interest, the full project can be found on my GitHub.
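Since we won’t dwell on it later, here is a rough idea of what each sender task does.  This is a sketch only; the real SingleTask.Run implementation lives in the GitHub project, and the payload fields below are made up for illustration.

//Sketch of a sender task.  Assumes Microsoft.ServiceBus.Messaging, Newtonsoft.Json, and System.Text.
public static class SingleTask
{
    public static void Run(List<EventHubSender> senders)
    {
        var random = new Random();
        int i = 0;
        while (true)
        {
            //Hypothetical ecommerce transaction payload
            var transaction = new
            {
                OrderId = Guid.NewGuid(),
                Amount = Math.Round(random.NextDouble() * 100, 2),
                TimestampUtc = DateTime.UtcNow
            };

            string json = JsonConvert.SerializeObject(transaction);
            var eventData = new EventData(Encoding.UTF8.GetBytes(json));

            //Send directly to one of the partitions this task was handed
            senders[i % senders.Count].Send(eventData);
            i++;
        }
    }
}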

 

The magic to getting the most out of your event hub is to send events directly to a partition and not just to the event hub endpoint.  You can create an EventHubSender that is associated with a specific partition and use that object to send your data.  Our problem arises when we have multiple threads and we want each one sending to a different subset of partitions.  This can be achieved by providing each thread a List<EventHubSender> populated with EventHubSenders already associated with a partition.  The list of all available EventHubSenders is created below.

 

//Creates a list of available event hub senders based upon the number of partitions
//of the event hub and the available threads
int x = 0;
List<EventHubSender> senderList = new List<EventHubSender>();
while (x < description.PartitionCount)
{
    EventHubSender partitionedSender = client.CreatePartitionedSender(description.PartitionIds[x]);
    senderList.Add(partitionedSender);
    x++;
}
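For context, the client and description objects above come from the Service Bus SDK.  A minimal sketch of how they might be obtained, assuming you already have a Service Bus connection string and the name of an existing event hub, is below.

//Sketch only - connectionString and eventHubName are assumed to exist
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
EventHubDescription description = namespaceManager.GetEventHub(eventHubName);
EventHubClient client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);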

 

The master list must then be split up according to the number of threads that will be sending data to the endpoint.  This can be done using a LINQ query that returns a List<List<EventHubSender>>, one sub-list of which can be associated with each thread.

 

var subLists = SplitToSublists(senderList);

private static List<List<EventHubSender>> SplitToSublists(List<EventHubSender> source)
{
    return source
        .Select((x, i) => new { Index = i, Value = x })
        .GroupBy(x => x.Index % numThreads)
        .Select(x => x.Select(v => v.Value).ToList())
        .ToList();
}

 

 

Lastly, a task must be started for each thread to produce data and send it to the partitions it has been assigned.

 

 

//create a list of tasks that independently send events to the event hub
List<Task> taskList = new List<Task>();
for (int i = 0; i < (int)numThreads; i++)
{
    int indexOfSublist = i;
    taskList.Add(new Task(() => SingleTask.Run(subLists[indexOfSublist])));
}

if (numThreads <= subLists.Count)
{
    Console.WriteLine("Created " + numThreads + " threads. Press enter to continue and produce data");
    Console.ReadKey();
}
else
{
    Console.WriteLine("The number of threads is greater than the number of sender arrays. Tasks will not start. Increase the number of sender arrays.");
    //This shouldn't happen but need this here or the project would close
    Console.Read();
}

// Start each task
taskList.ForEach(a => a.Start());

//Wait for all to end.
taskList.ForEach(a => a.Wait());

 

 

 

Using this approach, I was able to produce over 3,000 events per second on each thread I started.  My 12-partition event hub was able to keep up without a problem, and I was able to set up some Azure Stream Analytics queries too!

 

Happy IoT Developing

~Andrew

Microsoft Virtual Academy

Many of you probably know about Microsoft Virtual Academy but in case you don’t, here is a quick reminder.

What is Microsoft Virtual Academy?

Microsoft Virtual Academy is a place to go to learn about Microsoft technology from those who know it best.  Typically, talks are given by those with close ties to the product teams, or even the developers themselves.  They offer the ability to get a classroom setting without ever having to get out of your basketball shorts.  So order in some food, maybe grab a beverage, and get to watching!

 

Brand new to Hadoop?  Personally, I think modules 1, 2, 5, and 6 are enough to get you started:

http://www.microsoftvirtualacademy.com/training-courses/implementing-big-data-analysis

 

Already a Data Scientist?  Or an aspiring Data Scientist?

http://www.microsoftvirtualacademy.com/training-courses/getting-started-with-microsoft-azure-machine-learning

 

Have you been hearing a lot about Streaming Analytics and want to learn the concepts? Check out Module 1 and research Azure Stream Analytics to see the newest tech.

http://www.microsoftvirtualacademy.com/training-courses/breakthrough-insights-using-microsoft-sql-server-2012-training-scalable-data-warehouse

 

Trying to take your Excel skillz to the next level?  PowerBI Baby

http://www.microsoftvirtualacademy.com/training-courses/faster-insights-to-data-with-power-bi-jump-start

 

~Andrew

Hive Date Format Manipulation

Doing some work at a customer recently, I ran into a tricky problem around handling dates.  Within the data, we had dates expressed as yyyyMMdd.  After trying to create an external table over the file location, it was apparent that Hive didn’t want to play nicely with this format.  Unfortunately, when creating a DATE column, Hive expects the data to be in yyyy-MM-dd format and will import NULL when it is in any other format.  We were getting worried that we might have to build some custom code to reformat the data.

After looking closer at the date functions, it became apparent that it’s possible to convert a date string into a Unix timestamp while specifying the expected input format.  Once we have a Unix timestamp, it’s possible to convert that back into a date string in the yyyy-MM-dd format!

 

CREATE EXTERNAL TABLE dateExampleStaging (dt STRING, otherData STRING)
LOCATION 'wasb://<container>@<storage>.blob.core.windows.net/files/';

CREATE TABLE dateExampleFinal (dt DATE, otherData STRING);

INSERT INTO TABLE dateExampleFinal
SELECT
  from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd'),
  otherData
FROM
  dateExampleStaging;

 

Voilà!  Your date worries are no more.  A cool and easy trick.

Using OAuth to connect to Streaming Twitter API

Twitter just updated their APIs to v1.1 and, with that update, now require OAuth to access their streaming API.  OAuth has been required to access their REST API for some time now, but it is a new requirement for their streaming services.  The first step is to get your authentication token from Twitter.  Log in on their developers page and choose my applications from the dropdown menu.


Choose create new application and fill out the form.  If you do not have a website or callback URL, just add a placeholder for now.  After creating your application, scroll to the bottom of the page and click create my access token.


At the top of this page there are tabs; choose the OAuth tool and you will see the details of your token there.


We will use this token in the code snippet below.  This code is from the Twitter streaming class in the demo located at twitterbigdata.codeplex.com.

var oauth_consumer_key = "Enter your consumer key here";
var oauth_consumer_secret = "Enter your consumer secret key here";
var oauth_token = "Enter your token here";
var oauth_token_secret = "Enter your token secret key here";
var oauth_version = "1.0";
var oauth_signature_method = "HMAC-SHA1";

// unique request details
var oauth_nonce = Convert.ToBase64String(
    new ASCIIEncoding().GetBytes(DateTime.Now.Ticks.ToString()));
var timeSpan = DateTime.UtcNow
    - new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
var oauth_timestamp = Convert.ToInt64(timeSpan.TotalSeconds).ToString();
var resource_url = "https://stream.twitter.com/1.1/statuses/filter.json";

// create the oauth signature (this could be different for the normal Twitter API as well as any other social APIs)
var baseFormat = "oauth_consumer_key={0}&oauth_nonce={1}&oauth_signature_method={2}" +
    "&oauth_timestamp={3}&oauth_token={4}&oauth_version={5}&track={6}";
var baseString = string.Format(baseFormat,
    oauth_consumer_key,
    oauth_nonce,
    oauth_signature_method,
    oauth_timestamp,
    oauth_token,
    oauth_version,
    Uri.EscapeDataString(_config.Parameters)
);
baseString = string.Concat("POST&", Uri.EscapeDataString(resource_url), "&", Uri.EscapeDataString(baseString));
var compositeKey = string.Concat(Uri.EscapeDataString(oauth_consumer_secret),
    "&", Uri.EscapeDataString(oauth_token_secret));
string oauth_signature;
using (HMACSHA1 hasher = new HMACSHA1(ASCIIEncoding.ASCII.GetBytes(compositeKey)))
{
    oauth_signature = Convert.ToBase64String(
        hasher.ComputeHash(ASCIIEncoding.ASCII.GetBytes(baseString)));
}

// create the request header
var headerFormat = "OAuth oauth_nonce=\"{0}\", oauth_signature_method=\"{1}\", " +
    "oauth_timestamp=\"{2}\", oauth_consumer_key=\"{3}\", " +
    "oauth_token=\"{4}\", oauth_signature=\"{5}\", " +
    "oauth_version=\"{6}\"";
var authHeader = string.Format(headerFormat,
    Uri.EscapeDataString(oauth_nonce),
    Uri.EscapeDataString(oauth_signature_method),
    Uri.EscapeDataString(oauth_timestamp),
    Uri.EscapeDataString(oauth_consumer_key),
    Uri.EscapeDataString(oauth_token),
    Uri.EscapeDataString(oauth_signature),
    Uri.EscapeDataString(oauth_version)
);

// make the request
ServicePointManager.Expect100Continue = false;
var postBody = "track=" + _config.Parameters; // "screen_name=" + Uri.EscapeDataString(screen_name);
resource_url += "?" + postBody;
HttpWebRequest request = (HttpWebRequest)WebRequest.Create(resource_url);
request.Headers.Add("Authorization", authHeader);
request.Method = "POST";
request.ContentType = "application/x-www-form-urlencoded";
request.PreAuthenticate = true;
request.AllowWriteStreamBuffering = true;
WebResponse response = request.GetResponse();
return new StreamReader(response.GetResponseStream());
If you add your token and consumer information to the above code and use this as the Twitter streaming class in the project at twitterbigdata.codeplex.com, you will be able to access Twitter’s streaming API.  This can also be used as a base when using OAuth v1 with any other social sites/APIs.
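Once you have the StreamReader that the code above returns, consuming the stream is just a matter of reading lines, each of which is a JSON-encoded tweet.  A rough usage sketch, assuming the snippet above is wrapped in a method called GetStream (a hypothetical name), might look like this:

//GetStream is a hypothetical wrapper around the snippet above
using (StreamReader reader = GetStream())
{
    string line;
    while ((line = reader.ReadLine()) != null)
    {
        //Each line is a JSON tweet matching the track filter
        Console.WriteLine(line);
    }
}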

Use StreamInsight to Assign Sentiment Scores to a List of Items in a CSV (Part Two: Assign Sentiment)

In part one of this series, we discussed how to import a CSV file and then read from it using StreamInsight.  This post will take that a step further and show how to use the Sentiment140 API to assign sentiment to a string.  Within the foreach (activeInterval) loop there are two calls that are crucial to assigning sentiment (below).

//Score the current SocialBlurb's content attribute.
var result = sentiment.Analyze(activeInterval.l.CONTENT);
//Set the active SocialBlurb's Sentiment140_Mood to the sentiment received.
activeInterval.l.Sentiment140_Mood = (int) result.Mood;

Below is the Sentiment140 class that these calls relate to.  The Analyze function takes the content, appends it to a URL, makes an HTTP call, parses the result, and returns the sentiment score.

public class Sentiment140
{
    private string _jsonURL = @"http://www.sentiment140.com/api/classify";

    public SentimentAnalysisResult Analyze(string textToAnalyze)
    {
        //Format the url to include the text we want to analyze
        string url = string.Format("{0}?text={1}", this._jsonURL,
            HttpUtility.UrlEncode(textToAnalyze, System.Text.Encoding.UTF8));

        //Maximum url length check; set to neutral if larger than allowed
        if (url.Length > 600)
        {
            SentimentAnalysisResult results = new SentimentAnalysisResult() { Mood = SentimentScore.Neutral, Probability = 100 };
            return results;
        }

        //Create the HTTP request
        var request = HttpWebRequest.Create(url);
        SentimentAnalysisResult result = new SentimentAnalysisResult() { Mood = SentimentScore.Neutral, Probability = 100 };
        try
        {
            //Get the response from Sentiment140
            var response = request.GetResponse();
            using (var streamReader = new StreamReader(response.GetResponseStream()))
            {
                // Read from source
                var line = streamReader.ReadLine();
                // Parse
                var jObject = JObject.Parse(line);
                int polarity = jObject.SelectToken("results", true).SelectToken("polarity", true).Value<int>();
                switch (polarity)
                {
                    case 0:
                        result.Mood = SentimentScore.Negative;
                        break;
                    case 4:
                        result.Mood = SentimentScore.Positive;
                        break;
                    default: // 2 or others
                        result.Mood = SentimentScore.Neutral;
                        break;
                }
                response.Close();
            }
        }
        catch (System.Exception)
        {
            result.Mood = SentimentScore.Neutral;
        }
        return result;
    }
}
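The SentimentAnalysisResult and SentimentScore types used above aren’t shown in this post.  A minimal sketch consistent with how they are used (the exact shape in the real project may differ) could be:

//Sketch only - enum values chosen to mirror Sentiment140's polarity codes
public enum SentimentScore
{
    Negative = 0,
    Neutral = 2,
    Positive = 4
}

public class SentimentAnalysisResult
{
    public SentimentScore Mood { get; set; }
    public int Probability { get; set; }
}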

This specific project uses Sentiment140 to assign the sentiment scores but could just as easily use another sentiment engine that exposes API calls.  The framework would stay the same; only the URL and the parsing would change slightly to fit what is required by the new API.
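To make that concrete, here is a hedged sketch of an Analyze method against a made-up JSON sentiment endpoint.  The URL and the score field below are invented for illustration; only the endpoint and the parsing differ from the Sentiment140 class above.

//Hypothetical alternative engine - the URL and JSON shape are invented for illustration
public class OtherSentimentEngine
{
    private string _jsonURL = @"http://example.com/api/sentiment";

    public SentimentAnalysisResult Analyze(string textToAnalyze)
    {
        string url = string.Format("{0}?text={1}", this._jsonURL,
            HttpUtility.UrlEncode(textToAnalyze, System.Text.Encoding.UTF8));

        var result = new SentimentAnalysisResult() { Mood = SentimentScore.Neutral, Probability = 100 };
        try
        {
            var request = HttpWebRequest.Create(url);
            using (var streamReader = new StreamReader(request.GetResponse().GetResponseStream()))
            {
                var jObject = JObject.Parse(streamReader.ReadToEnd());
                //Only the parsing changes - map whatever score the engine returns onto SentimentScore
                double score = jObject.SelectToken("score", true).Value<double>();
                result.Mood = score < 0 ? SentimentScore.Negative
                            : score > 0 ? SentimentScore.Positive
                            : SentimentScore.Neutral;
            }
        }
        catch (System.Exception)
        {
            result.Mood = SentimentScore.Neutral;
        }
        return result;
    }
}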
Combining these two blog posts makes it possible to read from a CSV file using StreamInsight and add sentiment values to the content from the CSV.  This is a solution that fits nicely with customers who are using a service that crawls the web and returns mentions of their enterprise.  Most of these services provide some sentiment scoring capability, but if a customer wants to implement their own engine, a sentiment engine that is not offered through the service, or multiple engines, this solution is one we can offer to them.

Use StreamInsight to Assign Sentiment Scores to a List of Items in a CSV (Part One: Reading Input)

There will come a time when you need to read from a CSV file using StreamInsight.  It may be to match reference data with a current stream or utilize the processing power of StreamInsight to do many things at once.  This example uses the observable programming method to add input from a CSV to a list then enumerate through that list creating point events.

First things first, we need to get the data from the CSV file into a list.  I was using a “small” data set and was able to load the entire file into one list; when working with larger files, simply process the CSV file in chunks.  The CSV parser (KBCsv) that I used can be found here.  The code below reads in the CSV file and creates a list of the SocialBlurb class.

if (File.Exists(filepath))
{
    using (var reader = new CsvReader(filepath))
    {
        reader.ReadHeaderRecord();
        while (reader.HasMoreRecords)
        {
            var columnline = reader.ReadDataRecord();
            myList.Add(new SocialBlurb
            {
                ARTICLE_ID = columnline[0].Trim(),
                HEADLINE = columnline[1].Trim(),
                AUTHOR = columnline[2].Trim(),
                CONTENT = columnline[3].Trim()
            });
        }
    }
}
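The SocialBlurb class itself isn’t shown in either post.  A minimal sketch based on the properties referenced here and in part two might look like this:

//Sketch only - properties inferred from how SocialBlurb is used in these posts
public class SocialBlurb
{
    public string ARTICLE_ID { get; set; }
    public string HEADLINE { get; set; }
    public string AUTHOR { get; set; }
    public string CONTENT { get; set; }
    public int Sentiment140_Mood { get; set; }
}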
Now that we have a list of SocialBlurbs, we need to create a StreamInsight application, turn the list into a point stream, add CTI events, and do our processing.  The code for this is shown below.

using (var server = Server.Create("StreamInsightDefault"))
{
    //Create the SI application on the server
    var application = server.CreateApplication("My Application");

    //Query of all items in the list
    var listquery = from l in myList
                    select l;

    //Add CTIs to the point event stream by increasing start time
    var streamquery = listquery.AsEnumerable().ToPointStream(application,
        l => PointEvent.CreateInsert(DateTime.Now, new { l }),
        AdvanceTimeSettings.IncreasingStartTime);

    //Read the results as start time and payload (SocialBlurb)
    var results = from Pointevent in streamquery.ToPointEnumerable()
                  where Pointevent.EventKind != EventKind.Cti
                  select new
                  {
                      Pointevent.StartTime,
                      Pointevent.Payload.l
                  };

    //Do something with each point event (the sentiment part will come in a part 2 post 🙂 )
    foreach (var activeInterval in results)
    {
        var sentiment = new Sentiment140();
        var result = sentiment.Analyze(activeInterval.l.CONTENT);
        activeInterval.l.Sentiment140_Mood = (int) result.Mood;

        Console.WriteLine(activeInterval.l.AUTHOR);
        Console.WriteLine(activeInterval.l.CONTENT);
        Console.WriteLine(activeInterval.l.Sentiment140_Mood);
    }
}
This post explains how to read lines from a CSV file and turn that input into a point event stream.  My next post will detail the sentiment scoring class used above and point out how another sentiment engine could be implemented.