Perl 多線程理解
Thread:在使用多線程處理比較大的數據量的掃描,遇到讀寫文件可能死鎖的問題。
Perl 線程的生命周期
1.使用 threads 包的 create() 方法:use threads;
sub say_hello {
printf("Hello thread! @_.\n");
return( rand(10) );
}
my $t1 = threads->create( \&say_hello, "param1", "param2" );
my $t2 = threads->create( "say_hello", "param3", "param4" );
my $t3 = threads->create(
sub {
printf("Hello thread! @_\n");
return( rand(10) );
}, "param5", "param6" );
線程一旦被成功創建,它就立刻開始運行了,這個時候你面臨兩種選擇,分別是 join 或者 detach 這個新建線程。當然你也可以什么都不做,不過這可不是一個好習慣、
從字面上來理解,join 就是把新創建的線程結合到當前的主線程中來,把它當成是主線程的一部分,使他們合二為一。join 會觸發兩個動作,首先,主線程會索取新建線程執行結束以后的返回值;其次,新建線程在執行完畢并返回結果以后會自動釋放它自己所占用的系統資源。例如
join收割新建線程
#!/usr/bin/perl
#
use threads;
sub func {
sleep(1);
return(rand(10));
}
my $t1 = threads->create( \&func );
my $t2 = threads->create( \&func );
printf("do something in the main thread\n");
my $t1_res = $t1->join();
my $t2_res = $t2->join();
printf("t1_res = $t1_res\nt2_res = $t2_res\n"); 由此我們不難發現,調用 join 的時機是一個十分有趣的問題。如果調用 join 方法太早,新建線程尚未執行完畢,自然就無法返回任何結果,那么這個時候,主線程就不得不被阻塞,直到新建線程執行完畢之后,才能獲得返回值,然后資源會被釋放,join 才能結束,這在很大程度上破話了線程之間的并行性。相反,如果調用 join 方法太晚,新建線程早已執行完畢,由于一直沒有機會返回結果,它所占用的資源就一直無法得到釋放,直到被 join 為止,這在很大程度上浪費了寶貴的系統資源。因此,join 新建線程的最好時機應該是在它剛剛執行完畢的時候,這樣既不會阻塞當前線程的執行,又可以及時釋放新建線程所占用的系統資源。
foreach ( threads->list(threads::joinable) ){
$_->join( );
}
我們再來看看 detach 方法,這也許是最省心省力的處理方法了。從字面上來理解,detach 就是把新創建的線程與當前的主線程剝離開來,讓它從此和主線程無關。當你使用 detach 方法的時候,表明主線程并不關心新建線程執行以后返回的結果,新建線程執行完畢后 Perl 會自動釋放它所占用的資源。
detach剝離線程
#!/usr/bin/perl
#
use threads;
use Config;
sub say_hello {
my ( $name ) = @_;
printf("Hello World! I am $name.\n");
}
my $t1 = threads->create( \&say_hello, "Alex" );
$t1->detach();
printf("doing something in main thread\n");
sleep(1);
3.線程的消亡
大多數情況下,你希望你創建的線程正常退出,這就意味著線程所對應的函數體在執行完畢后返回并釋放資源。例如在清單 5 的示例中,新建線程被 join 以后的退出過程。可是,如果由于 detach 不當或者由于主線因某些意外的異常提前結束了,盡管它所創建的線程可能尚未執行完畢,但是他們還是會被強制中止,正所謂皮之不存,毛將焉附。這時你也許會得到一個類似于“Perl exited with active threads”的警告。
當然,你也可以顯示地調用 exit() 方法來結束一個線程,不過值得注意的是,默認情況下,如果你在一個線程中調用了 exit() 方法, 其他線程都會隨之一起結束,在很多情況下,這也許不是你想要的,如果你希望 exit() 方法只在調用它的線程內生效,那么你在創建該線程的時候就需要設置’ exit ’ => ’ thread_only ’。例如
為某個線程設置exit屬性#!/usr/bin/perl
#
use threads;
sub say_hello {
printf("Hello thread! @_.\n");
sleep(10);
printf("Bye\n");
}
sub quick_exit {
printf("I will be exit in no time\n");
exit(1);
}
my $t1 = threads->create( \&say_hello, "param1", "param2" );
my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit );
$t1->join();
$t2->join();
use threads ('exit' => 'threads_only');
sub func {
...
if( $condition ) {
exit(1);
}
}
my $t1 = threads->create( \&func );
my $t2 = threads->create( \&func );
$t1->join();
$t2->join();
共享與同步
threads::shared
#!/usr/bin/perl
#
use threads;
use threads::shared;
use strict;
my $var :shared = 0; # use :share tag to define
my @array :shared = (); # use :share tag to define
my %hash = ();
share(%hash); # use share() funtion to define
sub start {
$var = 100;
@array[0] = 200;
@array[1] = 201;
$hash{'1'} = 301;
$hash{'2'} = 302;
}
sub verify {
sleep(1); # make sure thread t1 execute firstly
printf("var = $var\n"); # var=100
for(my $i = 0; $i < scalar(@array); $i++) {
printf("array[$i] = $array[$i]\n"); # array[0]=200; array[1]=201
}
foreach my $key ( sort( keys(%hash) ) ) {
printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302
}
}
my $t1 = threads->create( \&start );
my $t2 = threads->create( \&verify );
$t1->join();
$t2->join(); 死鎖常是多線程程序中最隱蔽的問題,往往難以發現與調試,也增加了排查問題的難度。為了避免在程序中死鎖的問題,在程序中我們應該盡量避免同時獲取多個共享變量的鎖,如果無法避免,那么一是要盡量使用相同的順序來獲取多個共享變量的鎖,另外也要盡可能地細化上鎖的粒度,減少上鎖的時間。
use threads::shared;
# in thread 1
{
lock( $share ); # lock for 3 seconds
sleep(3); # other threads can not lock again
}
# unlock implicitly now after the block
# in thread 2
{
lock($share); # will be blocked, as already locked by thread 1
$share++; # after thread 1 quit from the block
}
# unlock implicitly now after the block
use threads;
use threads::shared;
{
lock(@share); # the array has been locked
lock(%hash); # the hash has been locked
sleep(3); # other threads can not lock again
}
{
lock($share[1]); # error will occur
lock($hash{key}); # error will occur
}
信號量:Thread::Semaphore
my $semaphore = Thread::Semaphore->new( $max_threads ); #信號量
my $mutex = Thread::Semaphore->new( 1 ); #互斥量
區別:
1. 互斥量用于線程的互斥,信號量用于線程的同步。
這是互斥量和信號量的根本區別,也就是互斥和同步之間的區別。
互斥:是指某一資源同時只允許一個訪問者對其進行訪問,具有唯一性和排它性。但互斥無法限制訪問者對資源的訪問順序,即訪問是無序的。
同步:是指在互斥的基礎上(大多數情況),通過其它機制實現訪問者對資源的有序訪問。在大多數情況下,同步已經實現了互斥,特別是所有寫入資源的情況必定是互斥的。少數情況是指可以允許多個訪問者同時訪問資源
以上區別是主要想記住的。
note:信號量可以用來實現互斥量的功能
2. 互斥量值只能為0/1,信號量值可以為非負整數。
也就是說,一個互斥量只能用于一個資源的互斥訪問,它不能實現多個資源的多線程互斥問題。信號量可以實現多個同類資源的多線程互斥和同步。當信號量為單值信號量是,也可以完成一個資源的互斥訪問。
3. 互斥量的加鎖和解鎖必須由同一線程分別對應使用,信號量可以由一個線程釋放,另一個線程得到。
use threads; use threads::shared; use Thread::Semaphore; my $s = Thread::Semaphore->new(); $s->down(); # P operation ... $s->up(); # V operation
Thread::Queue:生產者-消費者模型對多線程隊列的使用。
生產者可以不斷地在線程隊列上做 enqueue 操作,而消費者只需要不斷地在線程隊列上做 dequeue 操作,這就很簡單地實現了生產者和消費者之間同步的問題。
#!/usr/bin/perl
#
use threads;
use Thread::Queue;
my $q = Thread::Queue->new();
sub produce {
my $name = shift;
while(1) {
my $r = int(rand(100));
$q->enqueue($r);
printf("$name produce $r\n");
sleep(int(rand(3)));
}
}
sub consume {
my $name = shift;
while(my $r = $q->dequeue()) {
printf("consume $r\n");
}
}
my $producer1 = threads->create(\&produce, "producer1");
my $producer2 = threads->create(\&produce, "producer2");
my $consumer1 = threads->create(\&consume, "consumer2");
$producer1->join();
$producer2->join();
$consumer1->join();