设为首页 收藏本站
查看: 1084|回复: 0

[经验分享] Perl语言的多线程(一)

[复制链接]

尚未签到

发表于 2015-12-25 15:58:12 | 显示全部楼层 |阅读模式
  Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。
  一   Thread->New
  该办法是传统的老办法,它与folk很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面。已实现共享数据。而随着技术的发展,本文不针对该方法做深入研究。
  二   IThread
  这种方法是通过新建一个新的perl interpreter。 默认情况下,所有的数据和变量是不被线程共享的。 如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:

  • 变量默认是不在线程中共享的。
  • 通过"use threads"引用命名空间,不能通过 eval, do, 或者 require。
  • 如果有变量需要共享,必须引用"threads::shared"。 并在定义变量的时候如下:
  my $var1 : shared = "value";
  以下是一个简单的使用perl 多线程的例子。
  

DSC0000.gif DSC0001.gif Code
#!/usr/local/bin/perl   
use threads;   

@domain   =   ("tom.com",   "chinadns.com",   "163.com",   "aol.com");   
for ($i=0;$i<4;$i++)
{   
    print   $i.'.'.$domain[$i].'     ';   
}   
print   "\n";   
   
my   $thr0   =   threads->new(\&checkwhois,   '0',   $domain[0]);   
my   $thr1   =   threads->new(\&checkwhois,   '1',   $domain[1]);   
my   $thr2   =   threads->new(\&checkwhois,   '2',   $domain[2]);   
my   $thr3   =   threads->new(\&checkwhois,   '3',   $domain[3]);   
   
sub   checkwhois()   
{   
    my ($l,$r)=@_;   
    my $i=0;   
    while($i<1000000)   
    {   
          $i*$i;   
          $i++;   
    }   
    print   "done  --$l\t\n";   
    print   $l.$r."   query   successful!   \n";   
}

$thr0->join;  
$thr1->join;   
$thr2->join;   
$thr3->join;   
  这个简单的perl主要是新建了4个子线程去做不同的事情,然后调用join方法等待他们执行完成并让线程自动回收。但有时,还是需要结合folk 做一些复杂的工作,下面是关于这个的例外一个demo。
  

Code
use strict;
use English;
use threads;
use threads::shared;

my $items = 20;
my $maxchild = 65;
my $pid;
my $forks : shared = 1;

print "startn\n";

my $item : shared = 0;
my $myid = 1;
my $main_pid = $PID;

print "$main_pid \n";

sub Process
{
    my $sid;
   
    {
        lock($item);
        $item++ if ($item < $items);
    }
   
    if($sid < $items)
    {
        print "Child process ($PID/$myid) start : $sid/$forks\n";
        print "$sid \n";
        sleep(1);
        print "Child process ($PID/$myid) end : $sid/$forks\n";
        return 1;
    }
    elsif($main_pid == $PID)
    {
        wait;
        exit 1;
    }
    else
    {
        print "Child process ($PID/$myid) exit : $sid/$forks\n";
        exit 1;
    }
}

while($item < $items)
{
    if(($forks < $maxchild) && ($PID == $main_pid))
    {
        if($pid = fork)
        {
            $| = 1;
            $forks ++;
            $myid++;
            print "Starting Sub Process : ($pid/$PID)\n";
        }
        elsif(defined $pid)
        {
            $| = 1;
            last unless (Process);
        }
        else
        {
            die "cann't fork: $!\n";
        }
    }
}  
  该实例使用了folk 和共享数据等比较高级的用法。
  在本文最后,给一个比较留下的perl 多线程的例子:上传文件到文件服务器ftp。
  

Code
#use strict;
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use threads;
use threads::shared;

my $maxthread=20;
# all running threads.
my $CurrentThreads : shared = 0;
# total files
my $total_files : shared = 0;
# succeed files
my $processed_files : shared = 0;
# skip files
my $skipped_files : shared = 0;
# ftp retry times
my $ftp_retrytimes : shared = 3;
# whether upload all the files or not, -1 indecate no and 1 indicate yes.
my $g_isAllFiles_uploadSuccess : shared = 1;

my $ftp_server="";
my $ftp_dir="";
my $ftp_uid="";
my $ftp_pw="";
my $ftp_timeout = 1800;
my $ftp_debug=0;
my @src_dir_files=();
my @src_dir_NameListFile=();
my @wc_exclude=("_vti", ".lob", "\\bak", "\\data", "server.inc");

my $logFileName = 'upload.log';
my $log_cnt=0;
my $span=0;

my $start_date = TimeString(time);
print $start_date . "\n";
my $g_uploadSuccess = 1;
my $g_strLastError="";

################################################################################
################ Convert between "\"(backlash) and "/"  ########################
################################################################################
sub BacklashToLash
{
    my ($s) = @_;
    $s = s/\\/\//gis;
    return $s;
}

sub LashToBacklash
{
    my ($s) = @_;
    $s = s/\//\\/gis;
    return $s;
}

################################################################################
####################### format the time strings  ###############################
################################################################################
sub TimeString
{
    my ($tm) = @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
    return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}

sub ShortTimeString
{
    my ($tm) = @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
    return sprintf("%04d-%02d-%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}

sub ScanDate
{
    # scan the date format "2009-03-29 09:09:51"
    my ($date) = @_;
    my ($year, $month, $day, $hour, $minute, $seconds);
   
    $year = substr($date, 0, 4);
    $month = substr($date, 5, 2);
    $day = substr($date, 8, 2);
    $hour = substr($date, 11, 2);
    $minute = substr($date, 14, 2);
    $seconds = substr($date, 17, 2);

    return ($year, $month, $day, $hour, $minute, $seconds);
}

################################################################################
############### get the directory of current file name  ########################
################################################################################
sub GetDirFromFileName
{
    my ($s) = @_;
    my $pos = rindex($s, "\\");
    return substr($s, 0, $pos);
}

################################################################################
######################## log method to log files  ##############################
################################################################################
my $HLOG;
sub LOG
{
    my ($text) = @_;
    my $time = TimeString(time);
   
    my $LOG_STEP = 10;
    FlushLogFile() if ($log_cnt % $LOG_STEP) == 0 or $log_cnt == 0;
    $log_cnt ++;
    print HLOG "[$time] $text\n";
}

sub OpenLogFile
{
    CloseLogFile();
    open(HLOG, ">>$logFileName") or die ("Open file error.");  
}

sub CloseLogFile
{
    close(HLOG) if defined HLOG;
}

sub FlushLogFile
{
    CloseLogFile();
    OpenLogFile();
}

################################################################################
########################   Process File method    ##############################
################################################################################
sub ProcessFile
{
    # The total thread number add one
    {
        lock($CurrentThreads);
        $CurrentThreads++;
    }
   
    # get the thread
    my ($srcThread, $dstThread, $dstdirThread) = @_;
   
    # Increase file number.
    {
        lock($total_files);
        $total_files++;
        LOG("Processing $total_files \"$srcThread\" DSC0002.gif ");
    }
   
    my $need_upload = 0;
    my $bPutResult = 0;
   
    my $t1 = $lookup{$srcThread};
    my $t2 = TimeString(stat($srcThread)->mtime);
   
    if(not defined $t1)
    {
        $lookup{$srcThread} = $t2;
        $need_upload = 1;
    }
    else
    {
        # time longer than 5
        my $delta_sec = 10;
        $need_upload = 1 if $delta_sec > 5;
    }
   
    if($need_upload > 0)
    {        
        for(my $nProcessIndex = 1; $nProcessIndex < $ftp_retrytimes; $nProcessIndex++)
        {
            my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
            if($@)
            {
                $g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
                LOG("$g_strLastError\n");
            }
            else
            {
                $ftp->binary;
                LOG("The $nProcessIndex time to try upload file from \"$srcThread\" to \"$dstThread\". Current total thread number is $CurrentThreads");
               
                {
                    $bPutResult = 0;
                    $ftp->mkdir($dstdirThread, 1);
                    $ftp->put($srcThread, $dstThread) or $bPutResult = -1;
                }
               
                if($bPutResult < 0)
                {
                    LOG("The $nProcessIndex time to try upload file FAILED from \"$srcThread\" to \"$dstThread\" (des-dir : \"$dstdirThread\").");
                    if($@)
                    {
                        LOG("The reason is $@ \n");
                    }
                }
                else
                {
                    LOG("The $nProcessIndex time to try upload file SUCCEED from \"$srcThread\" to \"$dstThread\"");
                    {
                        lock($processed_files);
                        $processed_files++;
                    }
                    
                    #close the connect
                    $ftp->quit() if ($ftp);
                    last;
                }
            }
            $ftp->quit() if ($ftp);
        }
        
        if($bPutResult < 0)
        {
            # failed for $ftp_retrytimes and skipp
            {
                lock($skipped_files);
                $skipped_files ++;
                lock($g_isAllFiles_uploadSuccess);
                $g_isAllFiles_uploadSuccess = -1;
            }
        }
    }
    else
    {
        # skipp
        {
            lock($skipped_files);
            $skipped_files ++;
        }
    }
   
    # decrease current thread
    {
        lock($CurrentThreads);
        $CurrentThreads--;
    }
}

sub ProcessFiles
{
    my $srcdir = LashToBacklash($File::Find::dir);
    my $srcpath = LashToBacklash($File::Find::name);
    my $base = LashToBacklash($File::Find::topdir);
   
    foreach my $exclude (@wc_exclude)
    {
        if(index($srcpath, $exclude) > -1)
        {
            $File::Find::prune = 1 if -d $srcpath;
            return;
        }
    }
   
    if(-d $srcpath)
    {
        return;
    }
   
    my $dstdir = $srcdir;
    my $dstpath = $srcpath;
    $dstdir =~ s{\Q$base\E}{$ftp_dir}is;
    $dstpath =~ s{\Q$base\E}{$ftp_dir}is;
    $dstdir = BacklashToLash($dstdir);
    $dstpath = BacklashToLash($dstpath);

    # old way. one by one
    # processFile($srcpath, $dstpath, $detdir);
   
    # new way  threads
    while(1)
    {
        if($CurrentThreads < $maxthread)   
        {
            my $thread = threads->create('ProcessFile', $srcpath, $dstpath, $detdir);
            push(@$self, \$thread);
            $thread->detach();
        }
        else
        {
            LOG("-sleep 1 second");
            sleep 1;
        }
    }
}
################################################################################
########################     Main GOES HERE      ###############################
################################################################################

# step 1: try to login the ftp.
$start_date = time();
LOG("Connecting to the ftp server($ftp_server)");
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
    $g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
    LOG("$g_strLastError\n");
    $g_uploadSuccess = -1;
}
else
{
    $ftp->login($ftp_uid, $ftp_pw);
    if($@)
    {
        $g_strLastError = "Can't login to the FTP server, the reason: " . $@;
        LOG("$g_strLastError\n");
        $g_uploadSuccess = -1;
    }
    else
    {
        LOG("Connect ftp server successful!");
        $ftp->quit();
        
        # step 2: upload the files
        my %lookup;
        LOG("Start to upload files in directory(@src_dir_files)");
        find(\&ProcessFiles, @src_dir_files);
        LOG("The directoty(@src_dir_files) have been completed. The result: ");
        
        foreach my $thread (@$self)
        {
            print("Joining thread\n");
            $$thread->join();
        }
        
        #step 3:
        if($g_isAllFiles_uploadSuccess > 0)
        {
            LOG("+==================================================================+");
            LOG("Start to upload files in directory(@src_dir_NameListFile)");
            find(\&ProcessFiles, @src_dir_NameListFile);
            LOG("The directoty(@src_dir_NameListFile) have been completed. The result: ");
        
            foreach my $thread (@$self)
            {
                print("Joining thread\n");
                $$thread->join();
            }
            LOG("The directory (@rc_dir_NameListFile) has been completed.");
            LOG("+==================================================================+");
        }
        else
        {
            LOG("+==================================================================+");
            LOG("These files will not be upload for directory(@src_dir_files) failed.");
            LOG("+==================================================================+");
        }
        
        #Step 4: log time
        $span = time() - $start_date;
        LOG("Upload succeed! \nTime:$span second. the total files is $total_files. \
        \nSucceed are $processed_files and skipped are $skipped_files.\n");
    }
   
    CloseLogFile();
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-156317-1-1.html 上篇帖子: perl正则表达式[转] 下篇帖子: [转] perl 半角 全角 转换
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表