Как использовать потоки в Perl?

#multithreading #perl

#многопоточность #perl

Вопрос:

Я хочу использовать потоки в Perl, чтобы увеличить скорость моей программы… например, я хочу использовать 20 потоков в этом коде:

 use IO::Socket;
my $in_file2 = 'rang.txt';
open DAT,$in_file2;
my @ip=<DAT>;
close DAT;
chomp(@ip);
foreach my $ip(@ip)
{
    $host = IO::Socket::INET->new(
        PeerAddr => $ip,
        PeerPort => 80,
        proto    => 'tcp',
        Timeout=> 1
    ) 
    and open(OUT, ">>port.txt");
    print OUT $ip."n";
    close(OUT);
}
 

В приведенном выше коде мы даем список IP-адресов и сканируем заданный порт. Я хочу использовать потоки в этом коде. Есть ли какой-либо другой способ увеличить скорость моего кода?

Спасибо.

Ответ №1:

Вместо использования потоков вы можете захотеть заглянуть в AnyEvent::Socket, или Coro::Socket, или POE, или Parallel::ForkManager .

Ответ №2:

Ответ №3:

Perl может выполнять как потоковую обработку, так и разветвление. «потоки» официально не рекомендуются — в немалой степени потому, что они недостаточно понятны и — возможно, немного нелогично — не являются легкими, как потоки в некоторых языках программирования.

Если вы особенно заинтересованы в потоках, «рабочая» модель потоковой обработки работает намного лучше, чем создание потока для каждой задачи. Вы можете сделать последнее на некоторых языках — в perl это очень неэффективно.

Таким образом, вы могли бы сделать что-то вроде этого:

 #!/usr/bin/env perl

use strict;
use warnings;

use threads;
use Thread::Queue;
use IO::Socket;

my $nthreads = 20;

my $in_file2 = 'rang.txt';

my $work_q   = Thread::Queue->new;
my $result_q = Thread::Queue->new;

sub ip_checker {
    while ( my $ip = $work_q->dequeue ) {
        chomp($ip);
        $host = IO::Socket::INET->new(
            PeerAddr => $ip,
            PeerPort => 80,
            proto    => 'tcp',
            Timeout  => 1
        );
        if ( defined $host ) {
            $result_q->enqueue($ip);
        }
    }
}

sub file_writer {
    open( my $output_fh, ">>", "port.txt" ) or die $!;
    while ( my $ip = $result_q->dequeue ) {
        print {$output_fh} "$ipn";
    }
    close($output_fh);
}


for ( 1 .. $nthreads ) {
    push( @workers, threads->create( amp;ip_checker ) );
}
my $writer = threads->create( amp;file_writer );

open( my $dat, "<", $in_file2 ) or die $!;
$work_q->enqueue(<$dat>);
close($dat);
$work_q->end;

foreach my $thr (@workers) {
    $thr->join();
}

$result_q->end;
$writer->join();
 

При этом используется очередь для подачи набора (20) рабочих потоков со списком IP-адресов и прохождения их по ним, сопоставления и печати результатов через writer поток.

Но поскольку потоки больше не рекомендуются, лучшим способом может быть использование Parallel::ForkManager which с вашим кодом может выглядеть примерно так:

 #!/usr/bin/env perl

use strict;
use warnings;

use Fcntl qw ( :flock );
use IO::Socket;

my $in_file2 = 'rang.txt';
open( my $input,  "<", $in_file2 )  or die $!;
open( my $output, ">", "port.txt" ) or die $!;

my $manager = Parallel::ForkManager->new(20);
foreach my $ip (<$input>) {
    $manager->start and next;

    chomp($ip);
    my $host = IO::Socket::INET->new(
        PeerAddr => $ip,
        PeerPort => 80,
        proto    => 'tcp',
        Timeout  => 1
    );
    if ( defined $host ) {
        flock( $output, LOCK_EX );    #exclusive or write lock
        print {$output} $ip, "n";
        flock( $output, LOCK_UN );    #unlock
    }
    $manager->finish;
}
$manager->wait_all_children;
close($output);
close($input);
 

Вам нужно быть особенно осторожным с вводом-выводом файлов при многопроцессорной обработке, потому что весь смысл в том, что ваша последовательность выполнения больше не четко определена. Таким образом, безумно легко в конечном итоге получить разные потоки, которые закрывают файлы, открытые другим потоком, но не загруженные на диск.

Я отмечаю ваш код — вы, кажется, полагаетесь на сбой открытия файла, чтобы не печатать в нем. Это не очень приятное занятие, особенно когда ваш дескриптор файла не имеет лексической области.

Но в обеих парадигмах многопроцессорной обработки, которые я изложил выше (есть и другие, они наиболее распространены), вам все равно придется иметь дело с сериализацией ввода-вывода файлов. Обратите внимание, что ваши «результаты» будут в случайном порядке в обоих, потому что это будет очень сильно зависеть от того, когда задача завершится. Если это важно для вас, то вам нужно будет сопоставить и отсортировать после завершения ваших потоков или форков.

Вероятно, обычно лучше смотреть в сторону разветвления — как сказано выше, в threads документах:

«Потоки на основе интерпретатора», предоставляемые Perl, не являются быстрой и легкой системой для многозадачности, на которую можно было бы ожидать или надеяться. Потоки реализованы таким образом, что ими легко злоупотреблять. Мало кто знает, как правильно их использовать или сможет оказать помощь. Использование потоков на основе интерпретатора в perl официально не рекомендуется.