|
用perl做数据库迁移,从MSSQL到MYSQL(三)--V1.1版~多线程+handlerSocket
从前边的程序的运行情况来看,程序是可以运行的,但速度太扯了,在读写1000W条之前速度还是可以的(大概2000条/秒左右),但过了1000W之后(变成400条/秒左右),当然这个与SQL SERVER读取,网络还有服务器等性能都是有关系的,但,这速度,不晓得有测试过的朋友受不受不了,我反正是受不了的,于是想了下,单线程慢,咱得改吧。改成多线程,多进程嘛。
另外再啰嗦一句,经小弟实测,改之后,效率真是快很多。。。。
不再啰嗦,直接上代码吧。

1 #!/usr/bin/perl
2 use DBI;
3 use Switch;
4 use strict;
5 use Net::HandlerSocket;
6 use threads;
7 use Time::HiRes 'time';
8
9 my $source_name = "##ODBC配置##";
10 my $source_user_name = "##隐去的MSSQL数据库用户名##";
11 my $source_user_psd = '##隐去的MSSQL数据库密码##';
12
13 my $aim_ip='##隐去的目标MySQL数据库IP##';
14 my $aim_db_name = "##隐去的目标MySQL数据库名##";
15 ##关于HandlerSocket的安装,配置神马的,小编不想啰嗦,网上很多现成的东西。只是很纠结,以前HandlerSocket有一个很好的官网,不晓得怎么回事,被干了~~
16 my $hs_port = 9999;
17
18 my $dbh=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd);
19 #获取所有的用户表,不导有地理字段的表
20 #地理字段的表,数据量不大,使用前边那个单线程的东西就OK了
21 my $sth=$dbh->prepare("select name,object_id from sys.all_objects ao where type='U' and not exists(
22 select 1 from sys.all_columns col where col.object_id=ao.object_id and system_type_id=240)");
23 $sth->execute();
24
25 ##线程数。。。这个很纠结,小弟的服务器,在导的时候,5个线程以上,服务器会挂起~~~~
26 ##于是想想,把线程改成进程,都不行~~哎!看来我这基础知识还差啊。
27 my $threads_cnt=(not defined $ARGV[0])?5:$ARGV[0];
28 ##每次导的记录数
29 my $per_records=(not defined $ARGV[1])?3000:$ARGV[1];
30
31 my @data;
32 while (@data=$sth->fetchrow_array())
33 {
34 ##测试时用
35 # @data=$sth->fetchrow_array();
36 my ($select_columns,$insert_columns,$column_count,$sort_column,$column_types);
37 #获取某个表的列,并构建 查询,插入,列总数,列类型
38 ##输入参数如下:
39 ###data[0]:表名,data[1]:对像ID
40 ##返回参数描述如下:
41 ###$select_columns:构建SELECT的时候,列字符串
42 ###$insert_columns:构建insert的时候,列字符串。之所以要把这两分开,因为有些类型在select的时候,会用到列属性方法,例如geometry.STAsText()
43 ###$column_count:列数,其实可以从@$column_types得到,但@$columns_types是后边加的,此参数也就没有去掉
44 ###$sort_column:用来排序的字段,因为总结了一下,一般第一个字段都是标识字段,主键,因此,这里只取的第一个字段
45 ###$columns_types:列的类型列表,一个数组。因为sql server里边的某些类型的值,在进mysql的时候,需要做处理,例如geometry
46 #######另外再啰嗦一句,很少见有能同时返回多个值的东东(当然,可能是我把C#忘得差不多了)
47 ($select_columns,$insert_columns,$column_count,$sort_column,$column_types)=get_columns($data[0],$data[1]);
48 #查询结果。如果是导入失败,会返回False,否则为空
49 ##传入参数?说明请参照上边的输出参数
50 my $relt = export_data_in ($select_columns,$insert_columns,$column_count,$sort_column,$data[0],$column_types);
51
52 }
53
54
55
56 $dbh->disconnect;
57
58
59 #插入数据
60 sub export_data_in
61 {
62 #构建SQL
63 my($select_columns,$insert_columns,$columns_count,$sort_column,$table_name,$column_types) = @_;
64 my $rows_count=0;
65 my $dbh2=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd);
66 my $sth_sc=$dbh2->prepare("select count(1) from $table_name");
67 $sth_sc->execute();
68 my @data_count=$sth_sc->fetchrow_array();
69 my $begin_cnt = 0;
70 ##这里不-1,会报21
71 my $end_cnt = $per_records - 1;
72 while($begin_cnt <= @data_count[0])
73 {
74 my @threads;
75 ##循环开启导数据线程
76 for(my $count=1;$count<=$threads_cnt;$count++)
77 {
78 ##基本,下边的SQL语句成了本程序最大的性能瓶颈了。小弟的测试中,前1000W条数据还好,但,在1000W条之后,此SQL语句的查询性能急剧下降,当然,小弟是在远程测试上边导的,
79 ##与没有对MSSQL数据库进行性能优化也有很大关系。。
80 my $sql_select="select *
81 FROM
82 (
83 SELECT $select_columns,ROW_NUMBER() OVER (ORDER BY $sort_column) AS RowNum
84 FROM $table_name
85 ) as t
86 where t.RowNum BETWEEN $begin_cnt and $end_cnt";
87
88 print "exporting data $table_name;total:@data_count[0];now:$begin_cnt \n";
89 ##开线程。参数请参照上边的描述
90 my $res0=threads->new(\&export_data, $table_name,$sql_select,$insert_columns,$columns_count,$column_types);
91 push(@threads,$res0);
92 $begin_cnt = $begin_cnt + $per_records;
93 $end_cnt = $end_cnt + $per_records;
94 }
95 ##回收
96 foreach(@threads)
97 {
98 $_->join;
99 }
100 }
101 # $dbh2->disconnect;
102 }
103
104 sub export_data
105 {
106 my $startTime=time;
107 my ($table_name,$sql_select,$insert_columns,$columns_count,$column_types)=@_;
108 my $dbh_mssql=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd);
109
110 my $sth_select=$dbh_mssql->prepare($sql_select);
111 $sth_select->execute();
112 $sth_select->{LongTruncOk}=1;
113 ##生成标识ID
114 my $gid=rand(3200);
115 my $data_str="";
116
117 my $select_data;
118 ##还是改成fetchrow_arrayref(),小弟测试了下,这个的速度,真不是之前fetchrow_array能比的
119 ##另外,我这种拼handlerSocket多条插入语句的方式,应该还是有点问题。我记得有一种更优化的方式是:insert,但由于找不到资料,只好作罢。
120 while($select_data=$sth_select->fetchrow_arrayref())
121 {
122 if($data_str ne "")
123 {
124 $data_str="$data_str,";
125 }
126
127 $data_str=$data_str."[$gid,'+',['".join("','",@{$select_data})."']]";
128
129 }
130 printf("读出时间%.1f seconds.\n",time-$startTime);
131 $startTime=time;
132 ##测试的时候,查看数据的语句。
133 # print "\n",$data_str,"\n";
134 if($data_str ne "")
135 {
136 $data_str="[$data_str]";
137 my $args = { host => $aim_ip, port => $hs_port };
138 my $hs = new Net::HandlerSocket($args);
139 my $res = $hs->open_index($gid, $aim_db_name, $table_name, 'PRIMARY', "$insert_columns");
140 die $hs->get_error() if $res != 0;
141 ##这里不加EVAL不行的,不信?您试
142 $res = $hs->execute_multi(eval($data_str));
143 die $hs->get_error() if $hs->get_error() != 0;
144 $hs->close();
145 }
146 undef $data_str;
147 printf("写入时间%.1f seconds.\n",time-$startTime);
148
149 ##这里啰嗦一下,也给大家展示一下我的结果 ^-^
150 # exporting data t_p_areagroup_plate_userdiy_l;total:42758121;now:12825000
151 # exporting data t_p_areagroup_plate_userdiy_l;total:42758121;now:12830000
152 # exporting data t_p_areagroup_plate_userdiy_l;total:42758121;now:12835000
153 # exporting data t_p_areagroup_plate_userdiy_l;total:42758121;now:12840000
154 # exporting data t_p_areagroup_plate_userdiy_l;total:42758121;now:12845000
155 # 读出时间18.9 seconds.
156 # 写入时间1.3 seconds.
157 # 读出时间23.3 seconds.
158 # 写入时间1.4 seconds.
159 # 读出时间23.7 seconds.
160 # 写入时间1.1 seconds.
161 # 读出时间25.6 seconds.
162 # 写入时间0.6 seconds.
163 # 读出时间25.6 seconds.
164 # 写入时间0.9 seconds.
165 ##怎么样,写的速度够快吧,这就是TMD HandlerSocket,而且,还不用去考滤锁。
166 }
167
168 sub get_columns
169 {
170 print "loading columns of $_[0] \n";
171 my $sql="select col.name,tp.name from sys.all_columns col
172 inner join sys.types tp on col.system_type_id=tp.system_type_id and col.user_type_id=tp.user_type_id
173 where object_id=$_[1]";
174 my $dbh2=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd);
175 my $cols=$dbh2 -> prepare($sql);
176 $cols->execute();
177 my $cols_insert = "";
178 my $cols_select = "";
179 my $cols_count = 0;
180 my $sort_column="";
181 my @cols_types;
182 my @col;
183 while(@col= $cols->fetchrow_array())
184 {
185 my ($col_name,$type_name)=@col;
186 @cols_types[$cols_count]=$type_name;
187 if($cols_count>0)
188 {
189 $cols_insert="$cols_insert,";
190 $cols_select="$cols_select ,";
191 }
192 else
193 {
194 $sort_column="[$col_name]";
195 }
196 if($type_name eq "hierarchyid")
197 {
198 $cols_select = "$cols_select [$col_name].ToString() as $col_name";
199 $cols_insert = "$cols_insert$col_name";
200 }
201 else
202 {
203 $cols_select="$cols_select [$col_name]";
204 $cols_insert = "$cols_insert$col_name";
205 }
206 $cols_count++;
207 }
208 $dbh2->disconnect;
209 ($cols_select,$cols_insert,$cols_count,$sort_column,\@cols_types);
210 }
211
212
调用方法(将运行结果放到out.log):
1 nohup perl export_data_muti_thread_v0.5.pl 10 5000 > out.log &
另外再啰嗦一句。。。cnblogs的回复真不多,哪怕是拍砖也好呀。别这么死气沉沉的。 |
|