|
内容概览:
动机
目标
设计
解决方案说明与下载
测试示例
A. 动机
SSIS提供了一强大的可视化的包编辑功能,太过于依懒可视化功能,使程序不够灵活,可重性低。需要把一些简单的,可重用性高的功能迁移到后台去做,而不必要再每个包添加可重用性较高的步骤,这样为以后的唯护减少众多的麻烦
B. 目标:
1、在包中添加连接管理器
2、在包中添加参数
3、在包中添加包配置
4、在包控制流中添加任务
5、在数据流中添加适配器(源与目标)
6、创建适配器这间的管道
7、处理适配器的输入列与输出列的映射
现在做的很简单,希望给有兴趣的朋友一个参考,由于没有向SSIS一样有可视化的数据验证提示功能,可能在创建时不小时就出现COM异常。
非常希望有兴趣的朋友给点意见,和有兴趣朋友去扩展它,让它更强大些,到时别忘了给我一份参考,也让我爽一把!
C. 简洁版创建SQL Server 数据集成包辅助类库设计
如下(图1)所示,类库中包含四大模块: 控制流(ControlFlow),数据流(DataFlow),连接管理器(Connections),外部数据源(ExternalMetadata) 。
控制流模块中中实现功能包含:
1.添加执行SQL任务
2.添加执行包任务
3.创建包中任务之间的连接
4.添加参数集合
5.添加包配置
数据流模块中实现功能包含:
1.创建一个数据流适配器组件
2.创建组件之间路径
3.获取数据流中的组件
4.映射上流数据流组件输出列与下流数据流组件的输入列
5.映射源适配器的输出列
6.映射目标适配器的输入输出列
连接管理器模块实现的功能包含:
1.向包中添加连接器
2.初始化平面文件连接管理器的配置
3.获取创建连接器的短名称
外部数据源
支持平面文件,源文件,OLEDB类型数据源。
(图1)
连接管理器模块
控制流模块
数据流模块
A. 解决方案说明与下载:
解决方案中包含有两个项目,一个是实现创建SSIS包对象辅助类库,一个是测试类库的一个示例。创建SSIS包对象辅助类库代码分四大块:添加连接管理器相关方法的类,添加控制流中任务中的相关方法类,添加数据流中组件的相关方法类,和提供创建包中对象的配置信息实体。
解决方案下载
E 测试示例
在这个测试中,主要是用创建好的Simple.SSIS辅助类去创建两个测式示例:一个是父包调子包,一个是数据清洗例子。父包调子包,父包中包含一个执行包任务,执行包的的任务就是执行已创建好的子包,子包中包含一个执行SQL任务。
数据清洗任务中,数据从一个文本文件中清洗到数据库中,过程中不做任何处理,如果数据库支持的数据
数据清洗示例中文本中的数据TestData.TXT
20080501|89|one
20080502|98|two
20080506|100|three
20080507|110|four
数据清洗示例中数据库表
Code
CREATE TABLE [dbo].[TestTable](
[col1] [nvarchar](100) NULL,
[col2] [int] NULL,
[col3] [nvarchar](100) NULL
) ON [PRIMARY]
Code
using System;
using System.Collections.Generic;
using System.Text;
using c = Simple.SSIS.Connections;
using Simple.SSIS.ControlFlow;
using Simple.SSIS.DataFlow;
using Microsoft.SqlServer.Dts.Runtime;
namespace Simple.SSISTest
{
class Program
{
static void Main(string[] args)
{
TestParent();
TestDataFlow();
}
///
/// 执行包的方法
///
///
public static void Excute(Package package)
{
DTSExecResult result = package.Execute();
//设置执行包后的信息
string message = null;
if (result.Equals(DTSExecResult.Success))
{
message = "包执行成功";
}
if (result.Equals(DTSExecResult.Failure))
{
for (int i = 0; i < package.Errors.Count; i++)
{
message += package.Errors.Description;
}
}
//输出包执行后信息
Console.Write(message);
Console.Read();
}
///
/// 保存包的方法
///
///
///
public static void SavePackage(string path, Package package)
{
Application app = new Application();
app.SaveToXml(path, package,null);
}
///
///创建一个父包执行子包的测试例子
///
static void TestParent()
{
Package parent = new Package();
Package child = new Package();
//创建一个父包
buildParent(parent);
//创建一个子包
buildChild(parent);
//保存包
SavePackage(@"d:\parent.dtsx",parent);
SavePackage(@"d:\child.dtsx", child);
//执行包
Excute(parent);
}
///
/// 创建一个数据清洗的测试例子,从平面文件到数据库
///
static void TestDataFlow( )
{
Package package = new Package();
//创建一个OLEDB连接器的配置实例
c.ConnectionConfig oledbCon = new c.ConnectionConfig();
oledbCon.Name = "OledbCon";
oledbCon.ConnectionString = "Data Source=.;Initial Catalog=AdventureWorks;Provider=SQLOLEDB.1;Integrated Security=SSPI;Persist Security Info=False;Auto Translate=False;";
oledbCon.Type = c.ConnectionType.OLEDB;
//创建一个平面文件连接器的配置实例
c.ConnectionConfig fileCon = new c.ConnectionConfig();
fileCon.Name = "FlatFileCon";
//fileCon.ConnectionString = @"D:\wokerflow\GROUP_MON_PRODUCT.200805.txt";
fileCon.ConnectionString = @"E:\MyProject\Excel\Simple.SSIS\TestData.txt";
fileCon.Type = c.ConnectionType.FLATFILE;
//向包中添加平面文件连接器
c.Connections.AddConnection(package, fileCon);
//向包中添加OLEDB连接管理器
c.Connections.AddConnection(package, oledbCon);
//初始化平面文件的列
c.Connections.InitFlatFileConnection(package,GetFlatFileConfig());
//向包控制流中添加一个数据流会任
ControlFlow.AddDataFlowTask(package,"dataFlow");
//创建一个数据流适配器配置实例,配置信息为平面文件源适配器
AdapterComponentConfig adSour=new AdapterComponentConfig() ;
adSour.AdapterType= AdapterType.FlatFileSource;
adSour.ConnectionName="FlatFileCon" ;
adSour.DataFlowTaskName="dataFlow" ;
adSour.Name="sourceFlat" ;
//创建一个数据流适配器配置实例,配置信息为OLEDB目标适配器
AdapterComponentConfig adDes = new AdapterComponentConfig();
adDes.Name = "desOLEDB";
adDes.DataFlowTaskName = "dataFlow";
adDes.ConnectionName = "OledbCon";
adDes.AdapterType = AdapterType.OLEDBDestination;
// 配置适配器的属性,访问模型为表或视图
Property pr_AccessMode = new Property();
pr_AccessMode.Name = "AccessMode";
pr_AccessMode.Value = 0;
// 配置适配器的属性,配置目标表的名称
Property pr_OpenRowset = new Property();
pr_OpenRowset.Name = "OpenRowset";
pr_OpenRowset.Value = "TestTable";
//向配适器中添加属性
adDes.SetProperty.Add("OpenRowset",pr_OpenRowset);
adDes.SetProperty.Add("AccessMode", pr_AccessMode);
//数据流中添加源和目标适配器
DataFlow.AddAdapterComponent(package, adSour);
DataFlow.AddAdapterComponent(package,adDes);
BuildComponentPathConfig pathConf = new BuildComponentPathConfig();
pathConf.DataFlowName = "dataFlow";
pathConf.FromComponentIndex = "sourceFlat";
pathConf.ToComponentIndex = "desOLEDB";
pathConf.FromComponentOutputIndex = 0;
pathConf.ToComponentInputIndex = 0;
//连接源和目标适配器
DataFlow.BuildComponentPath(package, pathConf);
//初始化源适配器列的映射
DataFlow.MapSourceAdapterComponentColumns(package, "dataFlow", "sourceFlat");
//初始化目标适配器列的映射
DataFlow.MapDestinationAdapterComponentColumns(package, "dataFlow", "desOLEDB");
SavePackage(@"d:\mypackage.dtsx",package);
Excute(package);
}
static c.FlatFileConnectionConfig GetFlatFileConfig()
{
// 创建一个平面文件连接器的配置实例
c.FlatFileConnectionConfig config = new c.FlatFileConnectionConfig();
config.Name = "FlatFileCon";
//配置行界定符为回车和换行
config.RowDelimiter = "\r\n";
Dictionary colums = new Dictionary();
c.FlatFileColumn col;
//创建一个列配置实例
col = new c.FlatFileColumn();
//配置列的界定符为"|"
col.ColumnDelimiter = "|";
col.Name = "col1";
colums.Add(col.Name, col);
col = new c.FlatFileColumn();
col.ColumnDelimiter = "|";
col.Name = "col2";
//col.DataType = c.DataType.Int;
colums.Add(col.Name, col);
//添加最后一列
col = new c.FlatFileColumn();
col.ColumnDelimiter = "\r\n";//最后一列的界定符为回车换行
col.Name = "col3" ;
colums.Add(col.Name, col);
config.Columns = colums;
return config;
}
///
/// 创建一个父包
///
///
static void buildParent(Package package)
{
c.ConnectionConfig fileCon = new c.ConnectionConfig();
fileCon.Name = "FileCon";
fileCon.ConnectionString = @"d:\child.dtsx";
fileCon.Type = c.ConnectionType.FILE;
//向包中添加一个源文件管理器
c.Connections.AddConnection(package,fileCon);
//向包中添加一个执行包任务
ControlFlow.AddExecutePackageTask(package,"FileCon");
}
///
/// 创建一个子包
///
///
static void buildChild(Package package)
{
c.ConnectionConfig oledbCon = new c.ConnectionConfig();
oledbCon.Name = "OledbCon";
oledbCon.ConnectionString = "Data Source=.;Initial Catalog=AdventureWorks;Provider=SQLOLEDB.1;Integrated Security=SSPI;Persist Security Info=False;Auto Translate=False;";
oledbCon.Type = c.ConnectionType.OLEDB;
// 向包中添加一个OLEDB连接器
c.Connections.AddConnection(package, oledbCon);
//向包中添加一个执着行SQL任务
SQLTaskConfig config = new SQLTaskConfig();
config.ConnectionName = "OledbCon";
config.ResultSetType = ResultSetType.None;
config.SqlStatementSourceType = SqlStatementSourceType.DirectInput;
config.SqlStatementSource = "select 'this is user' as test ";
}
}
}
结束!
|
|