while( 1 )
{
# join any threads that have finished and can be joined
foreach ( threads->list(threads::joinable) )
{
$_->join( );
}
# if the queue is empty, decide whether we are finished or just waiting
my $pending = $queue->pending();
if( $pending == 0 )
{
# no URLs queued and no threads still running, so the job is finished
if( threads->list(threads::running) == 0 )
{
print "All done!\n";
last;
}
# running threads may still enqueue more URLs, so wait for them and check again
else
{
sleep 1;
next;
}
}
# there are URLs waiting: start a worker thread for each slot the semaphore
# grants, using the non-blocking down_nb() so the main loop never stalls
while( $semaphore->down_nb() )
{
threads->create( \&ProcessUrl );
}
}
# wait for and join any remaining threads before exiting
foreach ( threads->list() )
{
$_->join( );
}
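# worker thread body: pull URLs off the shared queue and scrape each page
# for links until the queue is (momentarily) empty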
sub ProcessUrl
{
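# Web::Scraper rule: collect the href attribute of every <a> element
# into an array ref stored under the 'links' key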
my $scraper = scraper
{
process '//a', 'links[]' => '@href';
};
my $res;
my $link;
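# dequeue_nb() returns undef instead of blocking when the queue is empty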
while( my $url = $queue->dequeue_nb() )
{
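# scrape() dies on fetch or parse errors, so wrap it in eval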
eval
{
$res = $scraper->scrape( URI->new($url) )->{'links'};
};
if( $@ )
{
warn "$@\n";
next;
}
next if (! defined $res );
#print "there are ".scalar(threads->list(threads::running))." threads, ", $queue->pending(), " urls need process.n";