微软在Hadoop出现不久就已经开始关注,在Azure云服务出现后,微软实现一个Windows平台的Hadoop发行版名为HDInsight,并且在Azure中提供了名为HDInsight的大数据服务。在应用开发中,微软提供了名为Microsoft HDInsight Emulator for Windows Azure的工具来支持开发过程中的调试。这个工具已经过时,不过现在仍然可以使用Web Platform Installer 5.0来安装。实际上这个时期的微软版Hadoop - HDInsight是基于Hortonworks Data Platform for Windows开发的,HDInsight在这个时期也是Windows和Azure平台专属。
这个时期微软还提供了一系列的Framework用于简化使用C#开发Hadoop(准确的说是微软版Hadoop - HDInsight),最初放在Codeplex上,称为Microsoft .NET SDK For Hadoop,现在里面有部分API已经过时了,像是Microsoft.Hadoop.Client等。这些API提供使用C#编写MapReduce的功能,但仅能用于基于Windows的HDInsight。 当下
后来微软拥抱开源,拥抱Linux,微软的产品也不再单一的限于Windows平台或着以.NET作为开发框架。对于HDInsight的开发微软依然继续与Hortonworks合作,但这时的HDInsight(3.4版以后)只有Linux版本。(微软拥抱Linux,Hortonworks的HDP发行版都没有Windows版了)运行在Azure上的HDInsight服务也都是跑在Linux服务器上。对于本地开发可以使用Hortonworks提供的HDP(Hortonworks Data Platform) SandBox作为虚拟环境。HDP SandBox提供了3种不同运行环境的镜像,VirtualBox、VMware及Docker。楼主比较喜欢Docker,有关Docker版HDP SandBox的配置及使用VS连接这个虚拟环境的方法详见此文。
随着微软拥抱Linux,基于Windows的大数据集群不再被支持(HDP for Windows也不再有后续版本了),微软也全面转向基于Linux的大数据集群(包括部署在Azure中的HDInsights也都是运行在Linux系统之上),这些C#写的API都不再被支持(主要原因还是这些基于.NET Framework的程序无法运行在Linux上,只能等未来.NET Core普及了)。
Hadoop API for .NET是微软推出的第一套用于Hadoop的.NET库。Hadoop组件中Hadoop Streaming用来支持与其它语言协同工作完成MapReduce(按国际惯例下文缩写为MR)任务的编写。Hadoop API for .NET包装了Hadoop Streaming方便使用.NET平台语言来编写MR任务。
使用Hadoop API for .NET:
可以更方便的提交MR任务,而不用通过Hadoop Streaming命令行
提供了Mapper、Reducer及Combiner更好的包装类作为基类,方便MR的编写
自动包含依赖的.NET程序集一起作为streaming任务提交
提供StreamingUnit进行对Mapper、Reducer及Combiner的本地单元测试
通过JSON序列化及反序列化,支持输入输出的强类型
Hadoop API for .NET支持的存储包括HDFS和Azure Blob Storage。输入内容的格式就是惯常的\n\r分割行(记录),\r分割列。
配置测试运行环境
要运行这个示例,需要在Windows上安装前面提到的Microsoft HDInsight Emulator for Windows Azure。这个工具需要通过Microsoft Web Platform Installer来安装,搜索hdinsight,结果第一项就是我们要安装的工具:
点击添加 - 安装,耐心等候个半小时(需要下载大约1.2G的文件)
这其中最重要的部分就是用于Windows平台的HDP(Hortonworks Data Platform for Windows),其所使用的是OpenJDK1.7的一个分支 - AZUL公司的Zulu。
安装完成后,桌面会多出3个快捷方式:
static void Main(string[] args)
{
Console.WriteLine("The application is running ...");
var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = ExistingClusterUsername, Password = ExistingClusterPassword };
_hdiJobManagementClient = new HDInsightJobManagementClient(ExistingClusterUri, clusterCredentials);
SubmitMRJob();
Console.WriteLine("Press ENTER to continue ...");
Console.ReadLine();
}
private static void SubmitMRJob()
{
var paras = new MapReduceStreamingJobSubmissionParameters
{
Files = new List<string>() { "/example/app/NetMapper.exe", "/example/app/NetReducer.exe" },
Mapper = "NetMapper.exe",
Reducer = "NetReducer.exe",
Input= "/example/data/gutenberg/davinci.txt",
Output = "/example/data/StreamingOutput/wc.txt"
};
Console.WriteLine("Submitting the MR job to the cluster...");
var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
var jobId = jobResponse.JobSubmissionJsonResponse.Id;
Console.WriteLine("Response status code is " + jobResponse.StatusCode);
Console.WriteLine("JobId is " + jobId);
Console.WriteLine("Waiting for the job completion ...");
// Wait for job completion
var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
while (!jobDetail.Status.JobComplete)
{
Thread.Sleep(1000);
jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
}
// Get job output
var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
DefaultStorageContainerName);
var output = (jobDetail.ExitValue == 0)
? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
: _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure
Console.WriteLine("Job output is: ");
using (var reader = new StreamReader(output, Encoding.UTF8))
{
string value = reader.ReadToEnd();
Console.WriteLine(value);
}
}
public> {
static void Main(string[] args)
{
ILoggerFactory loggerFactory = new LoggerFactory()
.AddConsole()
.AddDebug();
ILogger logger = loggerFactory.CreateLogger<Program>();
logger.LogInformation(
"This is a test of the emergency broadcast system.");
private static void SubmitMRJob()
{
var paras = new MapReduceStreamingJobSubmissionParameters
{
Files = new List<string>()
{
"/example/coreapp",
},
Mapper = "dotnet coreapp/NetCoreMapper.dll",
Reducer = "dotnet coreapp/NetCoreReducer.dll",
Input = "/example/data/gutenberg/davinci.txt",
Output = "/example/data/StreamingOutput/wc.txt"
};
Console.WriteLine("Submitting the MR job to the cluster...");
var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
var jobId = jobResponse.JobSubmissionJsonResponse.Id;
Console.WriteLine("Response status code is " + jobResponse.StatusCode);
Console.WriteLine("JobId is " + jobId);
Console.WriteLine("Waiting for the job completion ...");
// Wait for job completion
var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
while (!jobDetail.Status.JobComplete)
{
Thread.Sleep(1000);
jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
}
// Get job output
var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
DefaultStorageContainerName);
var output = (jobDetail.ExitValue == 0)
? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
: _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure
Console.WriteLine("Job output is: ");
using (var reader = new StreamReader(output, Encoding.UTF8))
{
string value = reader.ReadToEnd();
Console.WriteLine(value);
}