scuess 发表于 2015-12-25 15:58:12

Perl语言的多线程(一)

  Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。
  一   Thread->New
  该办法是传统的老办法,它与folk很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面。已实现共享数据。而随着技术的发展,本文不针对该方法做深入研究。
  二   IThread
  这种方法是通过新建一个新的perl interpreter。 默认情况下,所有的数据和变量是不被线程共享的。 如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:

[*]变量默认是不在线程中共享的。
[*]通过"use threads"引用命名空间,不能通过 eval, do, 或者 require。
[*]如果有变量需要共享,必须引用"threads::shared"。 并在定义变量的时候如下:
  my $var1 : shared = "value";
  以下是一个简单的使用perl 多线程的例子。
  

Code
#!/usr/local/bin/perl   
use threads;   

@domain   =   ("tom.com",   "chinadns.com",   "163.com",   "aol.com");   
for ($i=0;$i<4;$i++)
{   
    print   $i.'.'.$domain[$i].'   ';   
}   
print   "\n";   
   
my   $thr0   =   threads->new(\&checkwhois,   '0',   $domain);   
my   $thr1   =   threads->new(\&checkwhois,   '1',   $domain);   
my   $thr2   =   threads->new(\&checkwhois,   '2',   $domain);   
my   $thr3   =   threads->new(\&checkwhois,   '3',   $domain);   
   
sub   checkwhois()   
{   
    my ($l,$r)=@_;   
    my $i=0;   
    while($i<1000000)   
    {   
          $i*$i;   
          $i++;   
    }   
    print   "done--$l\t\n";   
    print   $l.$r."   query   successful!   \n";   
}

$thr0->join;
$thr1->join;   
$thr2->join;   
$thr3->join;   
  这个简单的perl主要是新建了4个子线程去做不同的事情,然后调用join方法等待他们执行完成并让线程自动回收。但有时,还是需要结合folk 做一些复杂的工作,下面是关于这个的例外一个demo。
  

Code
use strict;
use English;
use threads;
use threads::shared;

my $items = 20;
my $maxchild = 65;
my $pid;
my $forks : shared = 1;

print "startn\n";

my $item : shared = 0;
my $myid = 1;
my $main_pid = $PID;

print "$main_pid \n";

sub Process
{
    my $sid;
   
    {
      lock($item);
      $item++ if ($item < $items);
    }
   
    if($sid < $items)
    {
      print "Child process ($PID/$myid) start : $sid/$forks\n";
      print "$sid \n";
      sleep(1);
      print "Child process ($PID/$myid) end : $sid/$forks\n";
      return 1;
    }
    elsif($main_pid == $PID)
    {
      wait;
      exit 1;
    }
    else
    {
      print "Child process ($PID/$myid) exit : $sid/$forks\n";
      exit 1;
    }
}

while($item < $items)
{
    if(($forks < $maxchild) && ($PID == $main_pid))
    {
      if($pid = fork)
      {
            $| = 1;
            $forks ++;
            $myid++;
            print "Starting Sub Process : ($pid/$PID)\n";
      }
      elsif(defined $pid)
      {
            $| = 1;
            last unless (Process);
      }
      else
      {
            die "cann't fork: $!\n";
      }
    }
}  
  该实例使用了folk 和共享数据等比较高级的用法。
  在本文最后,给一个比较留下的perl 多线程的例子:上传文件到文件服务器ftp。
  

Code
#use strict;
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use threads;
use threads::shared;

my $maxthread=20;
# all running threads.
my $CurrentThreads : shared = 0;
# total files
my $total_files : shared = 0;
# succeed files
my $processed_files : shared = 0;
# skip files
my $skipped_files : shared = 0;
# ftp retry times
my $ftp_retrytimes : shared = 3;
# whether upload all the files or not, -1 indecate no and 1 indicate yes.
my $g_isAllFiles_uploadSuccess : shared = 1;

my $ftp_server="";
my $ftp_dir="";
my $ftp_uid="";
my $ftp_pw="";
my $ftp_timeout = 1800;
my $ftp_debug=0;
my @src_dir_files=();
my @src_dir_NameListFile=();
my @wc_exclude=("_vti", ".lob", "\\bak", "\\data", "server.inc");

my $logFileName = 'upload.log';
my $log_cnt=0;
my $span=0;

my $start_date = TimeString(time);
print $start_date . "\n";
my $g_uploadSuccess = 1;
my $g_strLastError="";

################################################################################
################ Convert between "\"(backlash) and "/"########################
################################################################################
sub BacklashToLash
{
    my ($s) = @_;
    $s = s/\\/\//gis;
    return $s;
}

sub LashToBacklash
{
    my ($s) = @_;
    $s = s/\//\\/gis;
    return $s;
}

################################################################################
####################### format the time strings###############################
################################################################################
sub TimeString
{
    my ($tm) = @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
    return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}

sub ShortTimeString
{
    my ($tm) = @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
    return sprintf("%04d-%02d-%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}

sub ScanDate
{
    # scan the date format "2009-03-29 09:09:51"
    my ($date) = @_;
    my ($year, $month, $day, $hour, $minute, $seconds);
   
    $year = substr($date, 0, 4);
    $month = substr($date, 5, 2);
    $day = substr($date, 8, 2);
    $hour = substr($date, 11, 2);
    $minute = substr($date, 14, 2);
    $seconds = substr($date, 17, 2);

    return ($year, $month, $day, $hour, $minute, $seconds);
}

################################################################################
############### get the directory of current file name########################
################################################################################
sub GetDirFromFileName
{
    my ($s) = @_;
    my $pos = rindex($s, "\\");
    return substr($s, 0, $pos);
}

################################################################################
######################## log method to log files##############################
################################################################################
my $HLOG;
sub LOG
{
    my ($text) = @_;
    my $time = TimeString(time);
   
    my $LOG_STEP = 10;
    FlushLogFile() if ($log_cnt % $LOG_STEP) == 0 or $log_cnt == 0;
    $log_cnt ++;
    print HLOG "[$time] $text\n";
}

sub OpenLogFile
{
    CloseLogFile();
    open(HLOG, ">>$logFileName") or die ("Open file error.");
}

sub CloseLogFile
{
    close(HLOG) if defined HLOG;
}

sub FlushLogFile
{
    CloseLogFile();
    OpenLogFile();
}

################################################################################
########################   Process File method    ##############################
################################################################################
sub ProcessFile
{
    # The total thread number add one
    {
      lock($CurrentThreads);
      $CurrentThreads++;
    }
   
    # get the thread
    my ($srcThread, $dstThread, $dstdirThread) = @_;
   
    # Increase file number.
    {
      lock($total_files);
      $total_files++;
      LOG("Processing $total_files \"$srcThread\" ");
    }
   
    my $need_upload = 0;
    my $bPutResult = 0;
   
    my $t1 = $lookup{$srcThread};
    my $t2 = TimeString(stat($srcThread)->mtime);
   
    if(not defined $t1)
    {
      $lookup{$srcThread} = $t2;
      $need_upload = 1;
    }
    else
    {
      # time longer than 5
      my $delta_sec = 10;
      $need_upload = 1 if $delta_sec > 5;
    }
   
    if($need_upload > 0)
    {      
      for(my $nProcessIndex = 1; $nProcessIndex < $ftp_retrytimes; $nProcessIndex++)
      {
            my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
            if($@)
            {
                $g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
                LOG("$g_strLastError\n");
            }
            else
            {
                $ftp->binary;
                LOG("The $nProcessIndex time to try upload file from \"$srcThread\" to \"$dstThread\". Current total thread number is $CurrentThreads");
               
                {
                  $bPutResult = 0;
                  $ftp->mkdir($dstdirThread, 1);
                  $ftp->put($srcThread, $dstThread) or $bPutResult = -1;
                }
               
                if($bPutResult < 0)
                {
                  LOG("The $nProcessIndex time to try upload file FAILED from \"$srcThread\" to \"$dstThread\" (des-dir : \"$dstdirThread\").");
                  if($@)
                  {
                        LOG("The reason is $@ \n");
                  }
                }
                else
                {
                  LOG("The $nProcessIndex time to try upload file SUCCEED from \"$srcThread\" to \"$dstThread\"");
                  {
                        lock($processed_files);
                        $processed_files++;
                  }
                  
                  #close the connect
                  $ftp->quit() if ($ftp);
                  last;
                }
            }
            $ftp->quit() if ($ftp);
      }
      
      if($bPutResult < 0)
      {
            # failed for $ftp_retrytimes and skipp
            {
                lock($skipped_files);
                $skipped_files ++;
                lock($g_isAllFiles_uploadSuccess);
                $g_isAllFiles_uploadSuccess = -1;
            }
      }
    }
    else
    {
      # skipp
      {
            lock($skipped_files);
            $skipped_files ++;
      }
    }
   
    # decrease current thread
    {
      lock($CurrentThreads);
      $CurrentThreads--;
    }
}

sub ProcessFiles
{
    my $srcdir = LashToBacklash($File::Find::dir);
    my $srcpath = LashToBacklash($File::Find::name);
    my $base = LashToBacklash($File::Find::topdir);
   
    foreach my $exclude (@wc_exclude)
    {
      if(index($srcpath, $exclude) > -1)
      {
            $File::Find::prune = 1 if -d $srcpath;
            return;
      }
    }
   
    if(-d $srcpath)
    {
      return;
    }
   
    my $dstdir = $srcdir;
    my $dstpath = $srcpath;
    $dstdir =~ s{\Q$base\E}{$ftp_dir}is;
    $dstpath =~ s{\Q$base\E}{$ftp_dir}is;
    $dstdir = BacklashToLash($dstdir);
    $dstpath = BacklashToLash($dstpath);

    # old way. one by one
    # processFile($srcpath, $dstpath, $detdir);
   
    # new waythreads
    while(1)
    {
      if($CurrentThreads < $maxthread)   
      {
            my $thread = threads->create('ProcessFile', $srcpath, $dstpath, $detdir);
            push(@$self, \$thread);
            $thread->detach();
      }
      else
      {
            LOG("-sleep 1 second");
            sleep 1;
      }
    }
}
################################################################################
########################   Main GOES HERE      ###############################
################################################################################

# step 1: try to login the ftp.
$start_date = time();
LOG("Connecting to the ftp server($ftp_server)");
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
    $g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
    LOG("$g_strLastError\n");
    $g_uploadSuccess = -1;
}
else
{
    $ftp->login($ftp_uid, $ftp_pw);
    if($@)
    {
      $g_strLastError = "Can't login to the FTP server, the reason: " . $@;
      LOG("$g_strLastError\n");
      $g_uploadSuccess = -1;
    }
    else
    {
      LOG("Connect ftp server successful!");
      $ftp->quit();
      
      # step 2: upload the files
      my %lookup;
      LOG("Start to upload files in directory(@src_dir_files)");
      find(\&ProcessFiles, @src_dir_files);
      LOG("The directoty(@src_dir_files) have been completed. The result: ");
      
      foreach my $thread (@$self)
      {
            print("Joining thread\n");
            $$thread->join();
      }
      
      #step 3:
      if($g_isAllFiles_uploadSuccess > 0)
      {
            LOG("+==================================================================+");
            LOG("Start to upload files in directory(@src_dir_NameListFile)");
            find(\&ProcessFiles, @src_dir_NameListFile);
            LOG("The directoty(@src_dir_NameListFile) have been completed. The result: ");
      
            foreach my $thread (@$self)
            {
                print("Joining thread\n");
                $$thread->join();
            }
            LOG("The directory (@rc_dir_NameListFile) has been completed.");
            LOG("+==================================================================+");
      }
      else
      {
            LOG("+==================================================================+");
            LOG("These files will not be upload for directory(@src_dir_files) failed.");
            LOG("+==================================================================+");
      }
      
      #Step 4: log time
      $span = time() - $start_date;
      LOG("Upload succeed! \nTime:$span second. the total files is $total_files. \
      \nSucceed are $processed_files and skipped are $skipped_files.\n");
    }
   
    CloseLogFile();
}
页: [1]
查看完整版本: Perl语言的多线程(一)