Skip to content

Instantly share code, notes, and snippets.

@maximusfox
Last active May 12, 2020 23:02
Show Gist options
  • Save maximusfox/5e24ca404baa3057cb40f133c58eac84 to your computer and use it in GitHub Desktop.
Save maximusfox/5e24ca404baa3057cb40f133c58eac84 to your computer and use it in GitHub Desktop.
Async sockets file downloading example in Perl (doesn't work on Windows)
#!/usr/bin/env perl
use strict;
use warnings;
use Fcntl;
use IO::Select;
use IO::Socket::INET;
# Timeouts passed to IO::Select's can_write / can_read / has_exception in the
# main loop. A timeout of 0 polls the handles without waiting.
use constant TIMEOUT_CAN_WRITE => 0;
use constant TIMEOUT_CAN_READ => 0;
use constant TIMEOUT_CAN_EXCEPT => 0;
# A connection whose last_changes timestamp is at least this many seconds old
# is closed by the main loop.
use constant RESPONSE_TIMEOUT => 20;
# Maximum number of bytes requested from a socket per save_chunk() call.
use constant CUNK_SIZE => 2048;
# When true, a [CHUNK_SAVED] line is printed for every chunk written.
use constant CHUNK_DETAILS => 0;
# Download list. Each key is the local path the file is written to; the value
# describes where to fetch it from:
#   host, port - server to connect to
#   path       - request path (already URL-encoded)
#   param      - query string without the leading '?' ('' for none); the
#                connect loop prepends the '?' when it is non-empty
my $files_to_downloading = {
# http://localhost:8000/PT_Security/[PT_Securtiy]%20%D0%90%D1%82%D0%B0%D0%BA%D0%B8%20%D0%BD%D0%B0%20%D0%BA%D0%BB%D0%B8%D0%B5%D0%BD%D1%82%D0%BE%D0%B2..mp4
'data/1.mp4' => {
host => 'localhost',
port => 8000,
path => '/PT_Security/[PT_Securtiy]%20%D0%90%D1%82%D0%B0%D0%BA%D0%B8%20%D0%BD%D0%B0%20%D0%BA%D0%BB%D0%B8%D0%B5%D0%BD%D1%82%D0%BE%D0%B2..mp4',
param => ''
},
# http://localhost:8000/PT_Security/[PT_Secrurity]%20%D0%9E%D1%81%D0%BD%D0%BE%D0%B2%D1%8B%20Reverse%20Engineering.mp4
'data/2.mp4' => {
host => 'localhost',
port => 8000,
path => '/PT_Security/[PT_Secrurity]%20%D0%9E%D1%81%D0%BD%D0%BE%D0%B2%D1%8B%20Reverse%20Engineering.mp4',
param => ''
},
};
# Switch STDOUT to non-blocking mode.
# fcntl(F_GETFL) hands the current flags back as its return value (it does not
# store them in its third argument), so capture that value and OR O_NONBLOCK
# into it; this keeps every flag that is already set on the descriptor.
my $flags = fcntl(STDOUT, F_GETFL, 0)
    or die "[FATAL] Cannot get flags for STDOUT [$!]";
$flags |= O_NONBLOCK;
fcntl(STDOUT, F_SETFL, $flags)
    or die "[FATAL] Cannot set flags for STDOUT [$!]";
# Connect to the servers and open the local files for writing.
# @sockets collects every socket that was created; $socket_info maps a socket
# (by its stringified reference) to the per-download state used by the main loop.
my @sockets;
my $socket_info = {};
for my $local_path (keys %{$files_to_downloading}) {
    my $remote = $files_to_downloading->{$local_path};

    # Pre-process the query string: a missing one becomes '', a non-empty one
    # gets its leading '?'.
    $remote->{param} = '' unless defined $remote->{param};
    $remote->{param} = '?'.$remote->{param}
        if $remote->{param} ne '';

    my $socket = IO::Socket::INET->new(
        PeerAddr => $remote->{host},
        PeerPort => $remote->{port},
        Proto    => 'tcp',
        Blocking => 0
    );

    # The constructor returns undef on failure (e.g. the host cannot be
    # resolved). Report it and skip this download instead of registering an
    # undefined socket.
    unless ($socket) {
        syswrite STDOUT, sprintf (
            "[CONNECTION_ERROR] Cannot connect to [http://%s:%d%s%s] Error [%s]\n",
            $remote->{host}, $remote->{port},
            $remote->{path}, $remote->{param},
            $@ || $!
        );
        next;
    }

    sysopen (my $out_file, $local_path, O_RDWR | O_CREAT | O_APPEND | O_NONBLOCK )
        or die "Cannot open $local_path! [$!]";

    push @sockets, $socket;
    $socket_info->{$socket} = {
        # If the file is already partially downloaded, remember how many bytes
        # exist so that the request can ask for the remainder only.
        continue       => (-s $local_path) || 0,
        local_path     => $local_path,
        req_sent       => 0,
        out_fh         => $out_file,
        bytes_read     => 0,
        content_length => undef,
        last_changes   => time(),
    };
}
# Main event loop: runs until every socket has been removed from the select set.
# NOTE: the three select timeouts are the TIMEOUT_* constants; with their
# current value of 0 every pass polls the sockets without waiting.
my $select = IO::Select->new(@sockets);
while ($select->count > 0) {
    # Send the request on each socket as soon as it becomes writable.
    my @writetable = $select->can_write(TIMEOUT_CAN_WRITE);
    for my $socket (@writetable) {
        my $this_socket_info = $socket_info->{$socket};
        # The request is sent only once per socket.
        unless ($this_socket_info->{req_sent}) {
            my $local_path = $this_socket_info->{local_path};
            my $remote = $files_to_downloading->{$local_path};
            syswrite STDOUT, sprintf (
                "[SEND_REQUEST] Taret [http://%s:%d%s%s]\n",
                $remote->{host}, $remote->{port},
                $remote->{path}, $remote->{param}
            );
            send_request($socket, $remote, $this_socket_info->{continue});
            $this_socket_info->{req_sent} = 1;
            $this_socket_info->{last_changes} = time();
        }
    }

    my @readable = $select->can_read(TIMEOUT_CAN_READ);
    for my $socket (@readable) {
        my $this_socket_info = $socket_info->{$socket};
        my $local_path = $this_socket_info->{local_path};
        my $remote = $files_to_downloading->{$local_path};

        # On the first read from a socket, consume and parse the response header.
        # A dedicated flag records that this happened: relying on content_length
        # being defined would re-parse body data as a header whenever the server
        # sends no Content-Length.
        unless ($this_socket_info->{header_parsed}) {
            my $header = read_header($socket);
            my $headers_struct = header_parser($header);
            $this_socket_info->{header_parsed} = 1;
            $this_socket_info->{content_length} = $headers_struct->{'Content-Length'};
            $this_socket_info->{last_changes} = time();
            syswrite STDOUT, sprintf (
                "[PARS_RESPONSE] Taret [http://%s:%d%s%s] Size [%015d]\n",
                $remote->{host}, $remote->{port},
                $remote->{path}, $remote->{param},
                $this_socket_info->{content_length} || 0
            );

            # A status line that could not be parsed counts as an error (code 0).
            my $code    = defined $headers_struct->{Code}    ? $headers_struct->{Code}    : 0;
            my $message = defined $headers_struct->{Message} ? $headers_struct->{Message} : '';

            # Any status other than 200 or 206 is treated as a server-side error.
            if ($code != 200 and $code != 206) {
                # Report the error and drop the connection.
                syswrite STDOUT, sprintf (
                    "[SERVERSIDE_ERROR] Taret [http://%s:%d%s%s] Code [%d] Message [%s]\n",
                    $remote->{host}, $remote->{port},
                    $remote->{path}, $remote->{param},
                    $code, $message
                );
                close_connection($socket, $select);
                next;
            }

            # A 200 reply to a Range request means the server ignored the range
            # and is sending the whole file. Discard the partial data so the
            # complete body is not appended to it.
            if ($code == 200 and $this_socket_info->{continue} > 0) {
                unless (truncate($this_socket_info->{out_fh}, 0)) {
                    syswrite STDOUT, sprintf (
                        "[LOCAL_ERROR] Cannot truncate [%s] Error [%s]\n",
                        $local_path, $!
                    );
                    close_connection($socket, $select);
                    next;
                }
                $this_socket_info->{continue} = 0;
            }
        }

        # Read one chunk from this socket and append it to the output file.
        my $read = save_chunk($socket, $this_socket_info->{out_fh}, CUNK_SIZE);
        if (defined $read and $read > 0) {
            # Data was received.
            $this_socket_info->{bytes_read} += $read;
            $this_socket_info->{last_changes} = time();
            syswrite STDOUT, sprintf (
                "[CHUNK_SAVED] Remote [http://%s:%d%s%s] Local [%s] Chunk size [%015d] Total size [%015d]\n",
                $remote->{host}, $remote->{port},
                $remote->{path}, $remote->{param},
                $local_path, $read, $this_socket_info->{bytes_read}
            ) if CHUNK_DETAILS;
        } elsif (defined $read and $read <= 0) {
            # Nothing left to read: the download is finished.
            syswrite STDOUT, sprintf(
                "[DONE] Location [%s] Size [%015d] Downloaded now [%015d]\n",
                $this_socket_info->{local_path},
                $this_socket_info->{bytes_read} + $this_socket_info->{continue},
                $this_socket_info->{bytes_read}
            );
            close_connection($socket, $select);
        }
        # An undefined $read (read error, e.g. no data available yet) is left
        # for a later pass or for the timeout check below.
    }

    my @exception = $select->has_exception(TIMEOUT_CAN_EXCEPT);
    for my $socket (@exception) {
        my $local_path = $socket_info->{$socket}{local_path};
        my $remote = $files_to_downloading->{$local_path};
        close_connection($socket, $select);
        syswrite STDOUT, sprintf (
            "[CONNECTION_EXCEPT] Cannot connect to [http://%s:%d%s%s] Error [%s]\n",
            $remote->{host}, $remote->{port},
            $remote->{path}, $remote->{param},
            $!
        );
    }

    # Drop connections to servers that have been silent for too long.
    for my $socket ($select->handles) {
        my $this_socket_info = $socket_info->{$socket};
        my $local_path = $this_socket_info->{local_path};
        my $remote = $files_to_downloading->{$local_path};
        if (time()-$this_socket_info->{last_changes} >= RESPONSE_TIMEOUT) {
            # Close the connection because of the timeout.
            syswrite STDOUT, sprintf (
                "[TIMEOUT] Taret [http://%s:%d%s%s]\n",
                $remote->{host}, $remote->{port},
                $remote->{path}, $remote->{param}
            );
            close_connection($socket, $select);
        }
    }
}
print "[INFO] Finish!\n";
# Stop tracking a socket and close it.
#   $socket - the socket to shut down
#   $select - the IO::Select set the socket is registered in
# The socket's entry in the file-level $socket_info hash is reset to undef,
# which also drops the reference to its output file handle.
sub close_connection {
    my ($socket, $select) = @_;

    if (exists $socket_info->{$socket}) {
        $socket_info->{$socket} = undef;
    }

    $select->remove($socket);
    $socket->close;
}
# Read up to $size bytes from $socket and write them to $out_handler.
#   $socket      - handle to read from (sysread)
#   $out_handler - handle to write to (syswrite)
#   $size        - maximum number of bytes to read
# Returns the sysread result unchanged: the number of bytes read, 0 at end of
# stream, or undef on a read error.
sub save_chunk {
    my ($socket, $out_handler, $size) = @_;

    my $buf = '';
    my $rv = sysread($socket, $buf, $size);
    return $rv unless defined $rv and $rv > 0;

    # syswrite may write fewer bytes than requested, so keep going until the
    # whole chunk is written; a failure is reported rather than ignored.
    my $offset = 0;
    while ($offset < $rv) {
        my $written = syswrite($out_handler, $buf, $rv - $offset, $offset);
        unless ($written) {
            warn "[WARN] Cannot write chunk to the output file [$!]\n";
            last;
        }
        $offset += $written;
    }

    return $rv;
}
# Read an HTTP response header from $socket, one byte at a time so that
# nothing belonging to the body is consumed.
# Returns the header text without the terminating blank line. The result is
# always a defined string: '' when nothing could be read. Reading stops early
# if sysread returns 0 or fails, in which case the header may be incomplete.
sub read_header {
    my ($socket) = @_;

    my $header = '';
    my $buf;
    while (sysread $socket, $buf, 1) {
        $header .= $buf;
        # Only the last four bytes can complete the terminator, so compare
        # just those instead of scanning the whole header after every byte.
        last if length($header) >= 4
            and substr($header, -4) eq "\r\n\r\n";
    }

    $header =~ s/\r\n\r\n$//;
    return $header;
}
# Parse the text of an HTTP response header.
#   $text_header - status line plus header lines, as returned by read_header()
# Returns a hash reference with:
#   Protocol, Code, Message - parts of the status line (undef when the status
#                             line cannot be parsed; Message is '' when the
#                             reason phrase is empty)
#   <Header-Name>           - value of each header, stored under the name as
#                             received and also under its Capitalised-Words
#                             form, so that 'Content-Length' is found however
#                             the server spelled it
# Lines that are not "name: value" pairs are ignored.
sub header_parser {
    my ($text_header) = @_;
    my %result_headers = ();

    my @headers_text = split("\n", defined $text_header ? $text_header : '');
    s/[\r\n]//g for @headers_text;

    my $status_line = shift(@headers_text);
    $status_line = '' unless defined $status_line;
    @result_headers{qw(Protocol Code Message)} =
        $status_line =~ m/^(\S+)\s+(\d{3})(?:\s+(.*))?$/;
    $result_headers{Message} = ''
        if defined $result_headers{Code} and not defined $result_headers{Message};

    # Keys that hold the status line; a header must not replace them through
    # its normalised name.
    my %reserved = map { $_ => 1 } qw(Protocol Code Message);

    for my $line (@headers_text) {
        my ($name, $value) = $line =~ m/^([^:\s][^:]*?)\s*:\s*(.*?)\s*$/;
        next unless defined $name;

        $result_headers{$name} = $value;

        # Header names are case-insensitive: also expose the canonical form.
        my $canonical = join '-', map { ucfirst lc } split /-/, $name;
        $result_headers{$canonical} = $value unless $reserved{$canonical};
    }

    return \%result_headers;
}
# Build the GET request for one download and send it on $socket.
#   $socket           - connected socket (send is called on it)
#   $remote_file_path - hash ref with host, path and param of the remote file
#   $continue         - number of bytes already downloaded (0 for a fresh
#                       download); passed on to request_builder()
# Returns what $socket->send returned (undef on error). A failed or incomplete
# send is reported with a warning instead of being ignored.
sub send_request {
    my ($socket, $remote_file_path, $continue) = @_;

    my $request = request_builder(
        $remote_file_path->{host},
        $remote_file_path->{path},
        $remote_file_path->{param},
        $continue
    );

    my $sent = $socket->send($request);
    unless (defined $sent and $sent == length $request) {
        warn sprintf(
            "[WARN] Request to [%s] was not sent completely [%s]\n",
            $remote_file_path->{host},
            defined $sent
                ? "sent $sent of ".length($request)." bytes"
                : $!
        );
    }

    return $sent;
}
# Assemble the text of an HTTP/1.0 GET request.
#   $host     - value for the Host header
#   $path     - request path
#   $param    - query string, already prefixed with '?' (or '')
#   $continue - when greater than 0, a "Range: bytes=N-" header asks the
#               server to resume from that offset
# Returns the complete request, terminated by an empty line.
sub request_builder {
    my ($host, $path, $param, $continue) = @_;

    my @lines = (
        "GET $path$param HTTP/1.0",
        "Host: $host",
    );
    push @lines, "Range: bytes=$continue-" if $continue > 0;

    # The two trailing empty strings produce the final CRLF CRLF.
    return join("\r\n", @lines, '', '');
}
# Just an example of working with HTTP
package Web;
use strict;
use IO::Socket::INET;
use IO::Socket::SSL;
use Carp qw(croak);
use MIME::Base64 qw(encode_base64);
our $VERSION = 0.1;
# Constructor.
# Accepted options (all optional):
#   agent   - User-Agent string; a Mozilla/5.0 string is used when omitted
#   timeout - connection timeout, 20 when omitted
# A false value (0, '') for either option also selects the default.
sub new {
    my ($self, %cfg) = @_;

    my %state = (
        agent   => $cfg{agent} || 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.1b4pre)',
        timeout => $cfg{timeout} || 20,
    );

    return bless \%state, $self;
}
# Store the value that request() sends in the Cookie header.
#   $cookie - cookie string, used verbatim
# Returns the stored value.
sub set_cookie {
    my ($self, $cookie) = @_;
    return $self->{cookie} = $cookie;
}
# Append extra request headers.
#   %headers - name => value pairs; each is added to $self->{headers} as a
#              "Name: value" line ending in CRLF
# Repeated calls accumulate; nothing is replaced.
sub header {
    my ($self, %headers) = @_;

    for my $name (keys %headers) {
        $self->{headers} .= "$name: $headers{$name}\r\n";
    }
}
# Remember HTTP proxy credentials.
# Stores the Base64 form of "login:password" (without a trailing newline) in
# $self->{p_auth}; request() sends it in a Proxy-Authorization: Basic header.
sub proxy_auth {
    my ($self, $login, $passw) = @_;

    my $credentials = $login . ':' . $passw;
    $self->{p_auth} = encode_base64($credentials, '');
}
# Remember SOCKS credentials.
# Builds the authentication message that socks() writes to the proxy: a 0x01
# byte, then login and password, each preceded by one byte holding its length.
# The result is stored in $self->{s_auth}.
sub socks_auth {
    my ($self, $login, $passw) = @_;

    my @fields = map { chr(length($_)) . $_ } ($login, $passw);
    $self->{s_auth} = join '', pack('H*', '01'), @fields;
}
# Configure the proxy to connect through.
#   $proxy_type - request() treats 1 as an HTTP proxy (CONNECT for https) and
#                 2 as a SOCKS proxy
#   $proxy      - "host:port"
# Sets proxy_host and proxy_port; $self->{proxy} becomes $proxy_type only when
# type, host and port are all true values, otherwise 0 (no proxy).
sub proxy {
    my ($self, $proxy_type, $proxy) = @_;

    my ($host, $port) = split /:/, $proxy;
    $self->{proxy_host} = $host;
    $self->{proxy_port} = $port;

    my $usable = $proxy_type && $host && $port;
    $self->{proxy} = $usable ? $proxy_type : 0;
}
# Write all of $data to $socket, repeating syswrite after short writes.
# Returns 1 on success, 0 when a write fails.
sub _write_all {
    my ($socket, $data) = @_;

    my $total  = length $data;
    my $offset = 0;
    while ($offset < $total) {
        my $written = syswrite $socket, $data, $total - $offset, $offset;
        return 0 unless $written;
        $offset += $written;
    }
    return 1;
}

# Perform one HTTP or HTTPS request and return the raw response.
#
# Arguments:
#   $method  - HTTP method, e.g. 'GET' or 'POST'
#   $host    - scheme and host name, e.g. 'http://example.com'
#   $port    - port to connect to
#   $uri     - request path including any query string
#   $payload - POST body. A plain string is sent as
#              application/x-www-form-urlencoded; a hash ref is sent as
#              multipart/form-data, where an array-ref value means "upload
#              the file whose path is the first element"
#   $limit   - optional; stop reading once at least this many bytes arrived
#
# The connection honours the proxy configured with proxy(): type 1 is an HTTP
# proxy (CONNECT tunnel for https), type 2 a SOCKS proxy.
#
# Returns the response (status line, headers and body) as one string, or ''
# when connecting, tunnelling or sending fails. Croaks when $host does not
# start with http:// or https://.
sub request {
    my ($self, $method, $host, $port, $uri, $payload, $limit) = @_;

    my $target = defined $host ? $host : '';
    my $scheme;
    ($scheme, $host) = ($target =~ m{^(https?)://(.+)$});
    croak("Unsupported scheme: $target") unless defined $scheme;

    my $header =
        "$method $scheme://$host:$port$uri HTTP/1.0\r\n".
        "Host: $host\r\n".
        "User-Agent: $self->{agent}\r\n".
        ($self->{p_auth} ? "Proxy-Authorization: Basic $self->{p_auth}\r\n" : "").
        ($self->{cookie} ? "Cookie: $self->{cookie}\r\n" : "").
        ($self->{headers} ? $self->{headers} : "").
        "Connection: close\r\n";

    if (defined $payload && $method eq 'POST') {
        if (ref $payload eq 'HASH') {
            my $data = '';
            $header .= "Content-Type: multipart/form-data; boundary=2011\r\n";
            for my $key (keys %{$payload}) {
                my $value = $payload->{$key};
                if (ref $value eq 'ARRAY') {
                    # File upload: the part's filename is the path's last component.
                    my $file = $value->[0];
                    my ($filename) = $file =~ m{^(?:.*/|)(.+)$};
                    my $content = load_bin($file);
                    $content = '' unless defined $content;
                    $data .=
                        "--2011\r\n".
                        "Content-Disposition: form-data; name=\"$key\"; filename=\"$filename\"\r\n".
                        "Content-Type: application/octetstream\r\n\r\n".
                        $content."\r\n";
                }
                else {
                    $data .=
                        "--2011\r\n".
                        "Content-Disposition: form-data; name=\"$key\"\r\n\r\n".
                        "$value\r\n";
                }
            }
            $payload = $data."--2011--";
        }
        else {
            $header .= "Content-Type: application/x-www-form-urlencoded\r\n";
        }
        $header .= "Content-Length: ".length($payload)."\r\n\r\n".$payload;
    }
    else {
        # No body (GET, or any other method without a payload): the header
        # block still has to end with an empty line.
        $header .= "\r\n";
    }

    $self->{host} = $host;
    $self->{port} = $port;
    my $proxy_mode = $self->{proxy} || 0;

    # 'Proto' and 'Timeout' are the option names IO::Socket::INET understands.
    my $socket;
    if ($scheme eq 'https' && !$proxy_mode) {
        $socket = IO::Socket::SSL->new(
            PeerAddr => $self->{host},
            PeerPort => $self->{port},
            Proto    => 'tcp',
            Timeout  => $self->{timeout},
        );
    }
    else {
        $socket = IO::Socket::INET->new(
            PeerAddr => $proxy_mode ? $self->{proxy_host} : $self->{host},
            PeerPort => $proxy_mode ? $self->{proxy_port} : $self->{port},
            Proto    => 'tcp',
            Timeout  => $self->{timeout},
        );
    }

    unless ($socket) {
        warn "Can't connect to ".($proxy_mode ? $self->{proxy_host} : $self->{host})."\n";
        return '';
    }

    if ($scheme eq 'https' && $proxy_mode == 1) {
        # HTTP proxy: open a tunnel with CONNECT, then start TLS inside it.
        unless (_write_all($socket, "CONNECT $host:$port HTTP/1.0\r\n\r\n")) {
            close $socket;
            return '';
        }
        my $answ = '';
        while (sysread $socket, my $buf, 64) {
            $answ .= $buf;
            # Test the accumulated reply: the terminator may be split across reads.
            last if $answ =~ /\r\n\r\n/;
        }
        # Any 2xx status means the tunnel is open; the reason phrase varies
        # between proxies, so it is not inspected.
        unless ($answ =~ m{^HTTP/\d\.\d\s+2\d\d} && IO::Socket::SSL->start_SSL($socket)) {
            close $socket;
            return '';
        }
    }
    elsif ($proxy_mode == 2) {
        # SOCKS proxy: negotiate the connection, then start TLS if required.
        my $ready = socks($self, $socket);
        $ready = IO::Socket::SSL->start_SSL($socket)
            if $ready && $scheme eq 'https';
        unless ($ready) {
            close $socket;
            return '';
        }
    }

    unless (_write_all($socket, $header)) {
        close $socket;
        return '';
    }

    my $answ = '';
    while (sysread $socket, my $buf, 32) {
        $answ .= $buf;
        last if $limit && length $answ >= $limit;
    }
    close $socket;
    return $answ;
}
# Read exactly $want bytes from $socket.
# Returns the bytes, or undef when the stream ends or a read fails first.
sub _socks_read_exact {
    my ($socket, $want) = @_;

    my $data = '';
    while (length($data) < $want) {
        my $got = sysread $socket, $data, $want - length($data), length($data);
        return unless $got;
    }
    return $data;
}

# Negotiate a SOCKS5 connection to $self->{host}:$self->{port} over $socket,
# which must already be connected to the proxy.
# Offers "no authentication" and "username/password"; the latter is used only
# when credentials were stored with socks_auth().
# Returns 1 when the proxy reports success, undef otherwise.
sub socks {
    my ($self, $socket) = @_;

    # Greeting: version 5, two methods offered: 0x00 (none), 0x02 (user/pass).
    syswrite $socket, pack('H*', '05020002');

    # The method-selection reply is exactly two bytes: version, chosen method.
    my $reply = _socks_read_exact($socket, 2);
    return unless defined $reply;

    # Compare as strings: these are hex digits, and a numeric comparison would
    # treat e.g. '05e2' as equal to '0500'.
    my $method = unpack 'H*', $reply;
    if ($method eq '0502' && $self->{s_auth}) {
        syswrite $socket, $self->{s_auth};
        # Authentication reply: version byte, status byte (0 means success).
        my $auth = _socks_read_exact($socket, 2);
        return unless defined $auth;
        return if ord(substr $auth, 1, 1) != 0;
    }
    elsif ($method ne '0500') {
        return;
    }

    # CONNECT request with the address given as a domain name. Its length has
    # to fit into a single byte.
    my $host = $self->{host};
    return if !defined $host || $host eq '' || length($host) > 255;
    syswrite $socket,
        pack('H*', '05010003').chr(length $host).$host.pack('n', $self->{port});

    # Reply: version, status, reserved, address type, bound address, port.
    my $head = _socks_read_exact($socket, 4);
    return unless defined $head;
    my (undef, $status, undef, $atyp) = unpack 'C4', $head;
    return if $status != 0;

    # Consume the bound address and port as well, so that no reply bytes are
    # left in front of the data that follows on this socket.
    my $remaining;
    if ($atyp == 1) {
        $remaining = 4 + 2;              # IPv4 address + port
    }
    elsif ($atyp == 4) {
        $remaining = 16 + 2;             # IPv6 address + port
    }
    elsif ($atyp == 3) {
        my $len = _socks_read_exact($socket, 1);
        return unless defined $len;
        $remaining = ord($len) + 2;      # domain name + port
    }
    else {
        return;
    }
    return unless defined _socks_read_exact($socket, $remaining);

    return 1;
}
# Read a whole file in binary mode.
#   $file - path of the file
# Returns the file's content as one string. When the file cannot be opened a
# warning is issued and nothing (undef in scalar context) is returned.
sub load_bin {
    my ($file) = @_;

    # A lexical handle is closed automatically and cannot clash with another
    # handle of the same name elsewhere in the program.
    open my $fh, '<', $file or do {
        warn "Can't open file $file - $!\n";
        return;
    };
    binmode $fh;

    local $/;    # slurp mode: read everything in one go
    my $content = <$fh>;
    close $fh;

    return $content;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment