# Require IO::Async::Stream 0.59 or newer; ->VERSION throws if the loaded
# module is older than the requested version.
IO::Async::Stream->VERSION(
'0.59'
);
# HTTP line terminator (CR LF), used both when composing requests and when
# matching response data.
my
$CRLF
=
"\x0d\x0a"
;
# Per-request state record. One is created by request() and pushed onto
# $self->{request_queue}; its fields are read and assigned through accessors
# elsewhere in this file (e.g. `$ctx->on_read = ...`, `$ctx->is_done++`).
# named_constructor => 1 matches the name => value call style used in request().
# NOTE(review): `struct` is presumably provided by Struct::Dumb - confirm
# against the file's imports, which are not visible here.
struct
RequestContext
=> [
qw( req on_read stall_timer resp_header resp_bytes on_done is_done f )
],
named_constructor
=> 1;
# True when the installed HTTP::Message already strips trailing whitespace
# from a parsed header value. When false, the response-header reader trims
# leading/trailing whitespace from every header value itself.
use
constant
HTTP_MESSAGE_TRIMS_LWS
=> HTTP::Message->parse(
"Name: value "
)->header(
"Name"
) eq
"value"
;
# Object initialiser: lets the parent class initialise itself with the same
# arguments, then starts the in-flight request counter at zero.
sub _init {
    my ( $self, @init_args ) = @_;

    $self->SUPER::_init(@init_args);

    # No requests have been written to this connection yet.
    $self->{requests_in_flight} = 0;
}
# Apply configuration parameters.
#
# Consumes the keys pipeline, max_in_flight, ready_queue, decode_content and
# is_proxy (stored directly on the object) and forwards every remaining
# parameter to the parent class.
#
# If an on_closed callback is supplied it is wrapped so that, on close, all
# queued requests are failed and the ready queue is dropped before the
# caller's callback runs.
#
# Croaks unless max_in_flight has been defined (zero is acceptable).
sub configure {
    my ( $self, %params ) = @_;

    for my $key (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
        next unless exists $params{$key};
        $self->{$key} = delete $params{$key};
    }

    my $on_closed = $params{on_closed};
    if ($on_closed) {
        $params{on_closed} = sub {
            my ($self) = @_;

            $self->debug_printf("CLOSED in-flight=$self->{requests_in_flight}");

            # Nothing still queued can complete on a closed connection.
            $self->error_all("Connection closed");

            undef $self->{ready_queue};
            $on_closed->($self);
        };
    }

    croak "max_in_flight parameter required, may be zero"
        unless defined $self->{max_in_flight};

    return $self->SUPER::configure(%params);
}
# Decide whether another request may be written while earlier ones are still
# outstanding.
#
# True only when pipelining is enabled (pipeline), has been marked as usable
# (can_pipeline), and either max_in_flight is false (no limit) or fewer than
# max_in_flight requests are currently in flight.
sub should_pipeline {
    my ($self) = @_;

    # Both switches must be on; otherwise hand back the falsy value itself.
    my $enabled = $self->{pipeline} && $self->{can_pipeline};
    return $enabled unless $enabled;

    my $limit = $self->{max_in_flight};
    return !$limit || $self->{requests_in_flight} < $limit;
}
# Start connecting the underlying stream.
#
# Accepts the parent class's connect arguments; socktype defaults to "stream"
# but can be overridden because the caller's arguments come later in the list.
# Must not be called in void context (dies with "VOID ->connect").
# Returns whatever ->on_done returns on the parent's connect result.
sub connect {
    my ( $self, %args ) = @_;

    # Label used only for the debug line: the path if one was given,
    # otherwise host:service.
    my $key;
    if ( defined $args{path} ) {
        $key = $args{path};
    }
    else {
        $key = "$args{host}:$args{service}";
    }

    $self->debug_printf("CONNECT $key");

    die "VOID ->connect" unless defined wantarray;

    my $connected = $self->SUPER::connect( socktype => "stream", %args );

    return $connected->on_done( sub {
        $self->debug_printf("CONNECTED");
    } );
}
# Offer this connection to callers waiting in the ready queue.
#
# Does nothing when there is no ready queue. Otherwise:
#  - when pipelining is allowed, waiters are resolved one after another for
#    as long as the queue is non-empty and should_pipeline stays true;
#  - else, when the connection is idle, the first non-cancelled waiter is
#    resolved and the loop stops;
#  - else only a debug line is printed.
# Cancelled waiters are discarded. A waiter's pending `connecting` operation,
# if any, is cancelled before its future is completed with this connection.
sub ready {
    my ($self) = @_;

    my $queue = $self->{ready_queue};
    return unless $queue;

    # Take the next queue entry; returns true if its future was resolved,
    # false if the entry had been cancelled and was merely dropped.
    my $hand_over = sub {
        my $ready = shift @$queue;

        my $f = $ready->future;
        return 0 if $f->is_cancelled;

        $ready->connecting->cancel if $ready->connecting;
        $f->done($self);
        return 1;
    };

    if ( $self->should_pipeline ) {
        $self->debug_printf("READY pipelined");
        $hand_over->() while @$queue && $self->should_pipeline;
    }
    elsif ( @$queue and $self->is_idle ) {
        $self->debug_printf("READY non-pipelined");
        while (@$queue) {
            last if $hand_over->();
        }
    }
    else {
        $self->debug_printf(
            "READY cannot-run queue=%d idle=%s",
            scalar @$queue,
            $self->is_idle ? "Y" : "N",
        );
    }
}
# True when no requests are currently in flight on this connection.
sub is_idle {
    my ($self) = @_;

    my $outstanding = $self->{requests_in_flight};
    return $outstanding == 0;
}
# Stream read handler: feeds incoming bytes to the request at the head of
# the request queue.
#
# Arguments: $buffref (reference to the read buffer) and $closed (true once
# the stream has reached EOF).
#
# Each queued context carries its own on_read handler. That handler's return
# value is interpreted as follows:
#  - a defined non-reference value is returned to the caller unchanged;
#  - a reference becomes the context's new on_read handler, and 1 is returned;
#  - undef means the request has finished (the context must be marked done);
#    it is dequeued, and 1 is returned if the stream is still open and data
#    remains, otherwise an empty return.
# Data arriving while no request is queued is reported via invoke_error and
# discarded.
sub on_read {
    my ( $self, $buffref, $closed ) = @_;

    while ( my $head = $self->{request_queue}[0] ) {
        # Drop contexts that have already completed.
        if ( $head->is_done ) {
            shift @{ $self->{request_queue} };
            next;
        }

        $head->stall_timer->reset if $head->stall_timer;

        my $ret = $head->on_read->( $self, $buffref, $closed, $head );

        if ( !defined $ret ) {
            $head->is_done or die "ARGH: undef return without being marked done";

            shift @{ $self->{request_queue} };
            return 1 if !$closed and length $$buffref;
            return;
        }

        return $ret unless ref $ret;

        # A reference is the next-stage reader for this request.
        $head->on_read = $ret;
        return 1;
    }

    # Reaching here implies the queue is empty.
    return if $closed or !length $$buffref;

    $self->invoke_error(
        "Spurious on_read of connection while idle",
        http_connection => read => $$buffref
    );
    $$buffref = "";
}
# Write-side EOF handler: fails every queued request with a
# "Connection closed" http failure.
sub on_write_eof {
    my ($self) = @_;

    # Failure message, category "http", then two undef detail slots.
    my @failure = ( "Connection closed", http => undef, undef );

    $self->error_all(@failure);
}
# Empty the request queue, failing each request that has not yet finished.
#
# All arguments are passed straight to ->fail on each request's future.
# Requests already marked done, or whose future is already ready, are only
# removed from the queue.
sub error_all {
    my ( $self, @failure ) = @_;

    while ( my $ctx = shift @{ $self->{request_queue} } ) {
        next if $ctx->is_done;
        next if $ctx->f->is_ready;

        $ctx->f->fail(@failure);
    }
}
# Queue an HTTP request on this connection and begin writing it.
#
# Named arguments:
#   request           - HTTP::Request object (required)
#   on_header         - CODE ref invoked with the parsed response header; its
#                       return value is used as the body-chunk callback (required)
#   request_body      - extra body data passed to ->write after the headers
#   expect_continue   - if true, send "Expect: 100-continue" and hold back
#                       request_body until a 1xx response is seen
#   stall_timeout     - if true, a StallTimer with this delay guards the request
#   on_body_write     - CODE ref called with the running total of body bytes written
#   on_done           - stored in the request context for use on completion
#   previous_response - forwarded to the response-header reader
#
# Returns the future representing the response. Cancelling that future
# closes the connection immediately.
#
# Croaks if on_header is missing or request is not an HTTP::Request.
sub request {
    my ( $self, %args ) = @_;

    my $on_header = $args{on_header};
    croak "Expected 'on_header' as a CODE ref" unless $on_header;

    my $req = $args{request};
    croak "Expected 'request' as a HTTP::Request reference"
        unless ref $req and $req->isa("HTTP::Request");

    $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );

    my $request_body    = $args{request_body};
    my $expect_continue = !!$args{expect_continue};
    my $method          = $req->method;

    # init_header only sets a header the caller has not already provided.
    $req->init_header( "Content-Length", length $req->content )
        if $method eq "POST" or $method eq "PUT" or length $req->content;

    $req->init_header( "Expect", "100-continue" ) if $expect_continue;

    $req->init_header( "Accept-Encoding", "gzip" ) if $self->{decode_content};

    my $f = $self->loop->new_future->set_label( "$method " . $req->uri );

    $f->on_cancel( sub {
        $self->debug_printf("CLOSE on_cancel");
        $self->close_now;
    } );

    my $stall_timer;
    if ( $args{stall_timeout} ) {
        $stall_timer = Net::Async::HTTP::StallTimer->new(
            delay  => $args{stall_timeout},
            future => $f,
        );
        $self->add_child($stall_timer);

        # Detach the timer as soon as the request future is ready; clearing
        # the variable also disarms every closure below that tests it.
        $f->on_ready( sub {
            $self->remove_child($stall_timer) if $stall_timer;
            undef $stall_timer;
        } );
    }

    my $ctx = RequestContext(
        req         => $req,
        on_read     => undef,
        stall_timer => $stall_timer,
        resp_header => undef,
        resp_bytes  => 0,
        on_done     => $args{on_done},
        is_done     => 0,
        f           => $f,
    );
    push @{ $self->{request_queue} }, $ctx;

    my $on_body_write;
    if ( $stall_timer or $args{on_body_write} ) {
        my $user_on_body_write = $args{on_body_write};
        my $total_written      = 0;

        $on_body_write = sub {
            $stall_timer->reset if $stall_timer;

            # $_[1] is the second argument handed to the on_write callback;
            # the user callback receives the running total.
            $user_on_body_write->( $total_written += $_[1] ) if $user_on_body_write;
        };
    }

    my $write_request_body;
    if ( defined $request_body ) {
        $write_request_body = sub {
            my ($self) = @_;
            $self->write( $request_body, on_write => $on_body_write );
        };
    }

    # Work on a copy so Host/Authorization additions do not alter the
    # caller's request object.
    my $headers = $req->headers->clone;

    my $path;
    if ( $method eq "CONNECT" ) {
        $path = $req->uri->as_string;
    }
    else {
        my $uri = $req->uri;

        if ( $self->{is_proxy} ) {
            $path = $uri->as_string;
        }
        else {
            $path = $uri->path_query;
            $path = "/$path" unless $path =~ m{^/};
        }

        my $authority = $uri->authority;
        if ( defined $authority
            and my ( $user, $pass, $host ) = $authority =~ m/^(.*?):(.*)@(.*)$/ )
        {
            # user:pass@host - send the credentials as Basic auth instead.
            $headers->init_header( Host => $host );
            $headers->authorization_basic( $user, $pass );
        }
        else {
            $headers->init_header( Host => $authority );
        }
    }

    my $protocol = $req->protocol || "HTTP/1.1";

    my @headers = ("$method $path $protocol");
    $headers->scan( sub {
        my ( $name, $value ) = @_;
        $name =~ s/^://;    # strip a leading ':' from the header name
        push @headers, "$name: $value";
    } );

    if ($stall_timer) {
        $stall_timer->start;
        $stall_timer->reason = "writing request";
    }

    my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;

    my $head_bytes = join( $CRLF, @headers ) . $CRLF . $CRLF;
    $self->write( $head_bytes, on_write => $on_header_write );

    $self->write( $req->content, on_write => $on_body_write )
        if length $req->content;

    # With expect_continue the body is written later by the header reader.
    $write_request_body->($self)
        if $write_request_body and !$expect_continue;

    if ($stall_timer) {
        # Empty write whose on_flush callback marks the end of the request.
        $self->write( "", on_flush => sub {
            return unless $stall_timer;
            $stall_timer->reset;
            $stall_timer->reason = "waiting for response";
        } );
    }

    $self->{requests_in_flight}++;

    $ctx->on_read = $self->_mk_on_read_header(
        $args{previous_response},
        $expect_continue ? $write_request_body : undef,
        $on_header,
    );

    return $f;
}
# Build the reader that parses a response header for one request.
#
# Arguments (after the invocant, which is ignored):
#   $previous_response  - attached to the parsed header via ->previous when true
#   $write_request_body - CODE ref run when a 1xx response arrives (or undef)
#   $on_header          - CODE ref called with the parsed header; must return
#                         the body-chunk callback
#
# The returned closure is called as ( $self, $buffref, $closed, $ctx ) and
# returns:
#   0        - header incomplete, or the request has just been failed
#   1        - a provisional (1xx) header was consumed; read again
#   CODE ref - the reader for the response body
#   undef    - (via $on_done) the response has no body and is complete
sub _mk_on_read_header {
    shift;    # invocant unused; the closure receives its own $self
    my ( $previous_response, $write_request_body, $on_header ) = @_;

    sub {
        my ( $self, $buffref, $closed, $ctx ) = @_;

        my $req         = $ctx->req;
        my $stall_timer = $ctx->stall_timer;
        my $f           = $ctx->f;

        if ($stall_timer) {
            $stall_timer->reason = "receiving response header";
            $stall_timer->reset;
        }

        # A valid response must at least begin with "HTTP".
        if ( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
            $self->debug_printf("ERROR fail");
            $f->fail( "Did not receive HTTP response from server", http => undef, $req )
                unless $f->is_cancelled;
            $self->close_now;

            # Stop here: parsing the rejected data would end up calling
            # fail/done on a future that has already been failed.
            return 0;
        }

        # The pattern is kept on one line so that it contains no literal
        # newlines: the header ends at the first empty line (CRLF CRLF).
        unless ( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
            if ($closed) {
                $self->debug_printf("ERROR closed");
                $f->fail( "Connection closed while awaiting header", http => undef, $req )
                    unless $f->is_cancelled;
                $self->close_now;
            }
            return 0;
        }

        # $+[0] is the end offset of the match, i.e. the header's byte length.
        $ctx->resp_bytes += $+[0];

        my $header = HTTP::Response->parse($1);
        ( my $status_line = $header->status_line ) =~ s/\r$//;

        $ctx->resp_header = $header;

        # Trim header values ourselves if HTTP::Message does not do it.
        unless (HTTP_MESSAGE_TRIMS_LWS) {
            my @headers;
            $header->scan( sub {
                my ( $name, $value ) = @_;
                s/^\s+//, s/\s+$// for $value;
                push @headers, $name => $value;
            } );
            $header->header(@headers) if @headers;
        }

        # HTTP/1.1 or any later 1.x protocol enables pipelining.
        my $protocol = $header->protocol;
        if ( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
            $self->{can_pipeline} = 1;
        }

        # 1xx is not the final response: release the held-back body (if any)
        # and wait for the next header.
        if ( $header->code =~ m/^1/ ) {
            $self->debug_printf( "HEADER [provisional] %s", $status_line );
            $write_request_body->($self) if $write_request_body;
            return 1;
        }

        $header->request($req);
        $header->previous($previous_response) if $previous_response;

        $self->debug_printf( "HEADER %s", $status_line );

        my $on_body_chunk = $on_header->($header);

        my $code = $header->code;

        my $content_encoding = $header->header("Content-Encoding");

        # If the encoding can be decoded here, keep the original header value
        # under X-Original-Content-Encoding and remove Content-Encoding.
        my $decoder;
        if (    $content_encoding
            and $decoder = Net::Async::HTTP->can_decode($content_encoding) )
        {
            $header->init_header(
                "X-Original-Content-Encoding" => $header->remove_header("Content-Encoding") );
        }

        # Without a Connection header, assume keep-alive when pipelining is
        # possible and close otherwise.
        my $connection_close =
            lc( $header->header("Connection")
                || ( $self->{can_pipeline} ? "keep-alive" : "close" ) ) eq "close";

        if ($connection_close) {
            $self->{max_in_flight} = 1;
        }
        elsif ( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
            # Only ever lower the limit to the server's advertised maximum.
            my ($max) = ( $keep_alive =~ m/max=(\d+)/ );
            $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
        }

        # Deliver one piece of body data, decoding it first when required.
        # Returns 1 on success, undef after a decode failure.
        my $on_more = sub {
            my ($chunk) = @_;

            if ( $decoder and not eval { $chunk = $decoder->($chunk); 1 } ) {
                $self->debug_printf("ERROR decode failed");
                $f->fail( "Decode error $@", http => undef, $req );
                $self->close;
                return undef;
            }

            $on_body_chunk->($chunk);

            return 1;
        };

        # Finish the response: flush the decoder, complete the future and
        # update the connection's bookkeeping. Always returns undef.
        my $on_done = sub {
            my ($ctx) = @_;

            $ctx->is_done++;

            $self->close if $connection_close;

            # Calling the decoder with no arguments yields its final output.
            my $final;
            if ( $decoder and not eval { $final = $decoder->(); 1 } ) {
                $self->debug_printf("ERROR decode failed");
                $f->fail( "Decode error $@", http => undef, $req );
                $self->close;
                return undef;
            }
            $on_body_chunk->($final) if defined $final and length $final;

            # Calling the chunk callback with no arguments yields the response.
            my $response = $on_body_chunk->();
            my $e = eval { $f->done($response) unless $f->is_cancelled; 1 } ? undef : $@;

            $ctx->on_done->($ctx) if $ctx->on_done;

            $self->{requests_in_flight}--;
            $self->debug_printf("DONE remaining in-flight=$self->{requests_in_flight}");
            $self->ready;

            # Report an exception from ->done only after bookkeeping is finished.
            if ( defined $e ) {
                chomp $e;
                $self->invoke_error( $e, perl => );
            }

            return undef;
        };

        # These responses never carry a body.
        if (   $req->method eq "HEAD"
            or $code =~ m/^1..$/
            or $code eq "204"
            or $code eq "304" )
        {
            $self->debug_printf("BODY done [none]");
            return $on_done->($ctx);
        }

        my $transfer_encoding = $header->header("Transfer-Encoding");
        my $content_length    = $header->content_length;

        if ( defined $transfer_encoding and $transfer_encoding eq "chunked" ) {
            $self->debug_printf("BODY chunks");

            $stall_timer->reason = "receiving body chunks" if $stall_timer;
            return $self->_mk_on_read_chunked( $on_more, $on_done );
        }
        elsif ( defined $content_length ) {
            $self->debug_printf("BODY length $content_length");

            if ( $content_length == 0 ) {
                $self->debug_printf("BODY done [length=0]");
                return $on_done->($ctx);
            }

            $stall_timer->reason = "receiving body" if $stall_timer;
            return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
        }
        else {
            $self->debug_printf("BODY until EOF");

            $stall_timer->reason = "receiving body until EOF" if $stall_timer;
            return $self->_mk_on_read_until_eof( $on_more, $on_done );
        }
    };
}
# Build the reader for a body sent with chunked transfer encoding.
#
# Arguments (after the invocant, which is ignored):
#   $on_more - CODE ref given each chunk's data; its result is returned
#   $on_done - CODE ref passed on to the trailer reader
#
# The returned closure is called as ( $self, $buffref, $closed, $ctx ). It
# alternates between reading a chunk-size line and reading that many bytes
# plus the trailing CRLF. Returns 1 after a non-zero chunk-size line, the
# trailer reader after a zero-size chunk, the result of $on_more after a
# chunk body, and 0 when more data is needed or the request has been failed.
sub _mk_on_read_chunked {
    shift;    # invocant unused; the closure receives its own $self
    my ( $on_more, $on_done ) = @_;

    # Size of the chunk whose body is awaited; undef while a chunk-size
    # line is expected.
    my $chunk_length;

    sub {
        my ( $self, $buffref, $closed, $ctx ) = @_;

        my $req = $ctx->req;
        my $f   = $ctx->f;

        # Pattern kept on one line so it contains no literal newlines.
        if ( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
            my $header = $1;
            $ctx->resp_bytes += $+[0];

            # The line must start with hex digits; anything after is ignored.
            unless ( $header =~ s/^([A-Fa-f0-9]+).*// ) {
                $f->fail( "Corrupted chunk header", http => undef, $req )
                    unless $f->is_cancelled;
                $self->close_now;
                return 0;
            }

            $chunk_length = hex($1);
            return 1 if $chunk_length;

            # A zero-length chunk ends the body; only the trailer remains.
            return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
        }

        # Wait for the whole chunk plus its CRLF, which the length excludes.
        if ( defined $chunk_length and length($$buffref) >= $chunk_length + 2 ) {
            my $chunk = substr( $$buffref, 0, $chunk_length, "" );
            $ctx->resp_bytes += length $chunk;

            unless ( $$buffref =~ s/^$CRLF// ) {
                $self->debug_printf("ERROR chunk without CRLF");
                $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req )
                    unless $f->is_cancelled;
                $self->close;

                # The request has failed: do not count a stale match offset
                # or hand the chunk to the body callback.
                return 0;
            }

            $ctx->resp_bytes += $+[0];

            undef $chunk_length;

            return $on_more->($chunk);
        }

        if ($closed) {
            $self->debug_printf("ERROR closed");
            $f->fail( "Connection closed while awaiting chunk", http => undef, $req )
                unless $f->is_cancelled;
        }
        return 0;
    };
}
# Build the reader for the trailer that follows the final (zero-length) chunk.
#
# Called as ( $invocant, $req, $on_more, $on_done, $f ); only $on_done is
# used here, the closure takes the request and future from its own $ctx.
#
# The returned closure is called as ( $self, $buffref, $closed, $ctx ). It
# consumes one CRLF-terminated line per call: returns 0 when no complete line
# is buffered, 1 after a non-empty trailer line, and the result of $on_done
# once the empty line ending the trailer is read.
sub _mk_on_read_chunk_trailer {
    shift;    # invocant unused; the closure receives its own $self
    my ( undef, $on_more, $on_done ) = @_;

    # Accumulated trailer text; collected but not otherwise used here.
    my $trailer = "";

    sub {
        my ( $self, $buffref, $closed, $ctx ) = @_;

        my $req = $ctx->req;
        my $f   = $ctx->f;

        if ($closed) {
            $self->debug_printf("ERROR closed");
            $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $req )
                unless $f->is_cancelled;
        }

        # Pattern kept on one line so it contains no literal newlines.
        $$buffref =~ s/^(.*)$CRLF// or return 0;
        $trailer .= $1;
        $ctx->resp_bytes += $+[0];

        # A non-empty line means more trailer lines may follow.
        return 1 if length $1;

        $self->debug_printf("BODY done [chunked]");
        return $on_done->($ctx);
    };
}
# Build the reader for a body of known length.
#
# Arguments (after the invocant, which is ignored):
#   $content_length - number of body bytes still expected
#   $on_more        - CODE ref given each piece of body data; a false result
#                     makes the reader return undef immediately
#   $on_done        - CODE ref called with the context once all bytes arrived
#
# The returned closure is called as ( $self, $buffref, $closed, $ctx ). It
# returns the result of $on_done when the body is complete, otherwise 0
# (after failing the request if the stream has closed early).
sub _mk_on_read_length {
    my ( undef, $content_length, $on_more, $on_done ) = @_;

    return sub {
        my ( $self, $buffref, $closed, $ctx ) = @_;

        # Take at most the outstanding number of bytes from the buffer.
        my $content = substr( $$buffref, 0, $content_length, "" );
        my $taken   = length $content;

        $content_length  -= $taken;
        $ctx->resp_bytes += $taken;

        $on_more->($content) or return undef;

        if ( $content_length == 0 ) {
            $self->debug_printf("BODY done [length]");
            return $on_done->($ctx);
        }

        if ($closed) {
            my $f = $ctx->f;

            $self->debug_printf("ERROR closed");
            $f->fail( "Connection closed while awaiting body", http => undef, $ctx->req )
                unless $f->is_cancelled;
        }

        return 0;
    };
}
# Build the reader for a body that is terminated by the connection closing.
#
# Arguments (after the invocant, which is ignored):
#   $on_more - CODE ref given each piece of body data; a false result makes
#              the reader return undef immediately
#   $on_done - CODE ref called with the context once the stream has closed
#
# The returned closure is called as ( $self, $buffref, $closed, $ctx ). It
# passes everything in the buffer to $on_more, then returns 0 while the
# stream is open or the result of $on_done once it has closed.
sub _mk_on_read_until_eof {
    my ( undef, $on_more, $on_done ) = @_;

    return sub {
        my ( $self, $buffref, $closed, $ctx ) = @_;

        # Take the whole buffer and leave it empty.
        ( my $content, $$buffref ) = ( $$buffref, "" );

        $ctx->resp_bytes += length $content;

        $on_more->($content) or return undef;

        return 0 if !$closed;

        $self->debug_printf("BODY done [eof]");
        return $on_done->($ctx);
    };
}
# A module file must finish by evaluating to a true value; 0x55AA is simply
# a non-zero constant serving that purpose.
0x55AA;