$Gearman::ResponseParser::VERSION
= version->declare(
"2.004.015"
);
sub
new {
my
$class
=
shift
;
my
%opts
=
@_
;
my
$src
=
delete
$opts
{
'source'
};
die
"unsupported arguments '@{[keys %opts]}'"
if
%opts
;
my
$self
=
bless
{
source
=>
$src
,
},
$class
;
$self
->
reset
;
return
$self
;
}
sub
source {
my
$self
=
shift
;
return
$self
->{source};
}
sub
on_packet {
my
(
$self
,
$packet
,
$parser
) =
@_
;
die
"SUBCLASSES SHOULD OVERRIDE THIS"
;
}
sub
on_error {
my
(
$self
,
$errmsg
,
$parser
) =
@_
;
die
"SUBCLASSES SHOULD OVERRIDE THIS"
;
}
sub
reset
{
my
$self
=
shift
;
$self
->{header} =
''
;
$self
->{pkt} =
undef
;
}
sub
parse_data {
my
(
$self
,
$data
) =
@_
;
my
$dataref
=
ref
$data
?
$data
: \
$data
;
my
$err
=
sub
{
my
$code
=
shift
;
$self
->on_error(
$code
);
return
undef
;
};
while
(
my
$lendata
=
length
$$data
) {
my
$hdr_len
=
length
$self
->{header};
unless
(
$hdr_len
== 12) {
my
$need
= 12 -
$hdr_len
;
$self
->{header} .=
substr
(
$$dataref
, 0,
$need
,
''
);
next
unless
length
$self
->{header} == 12;
my
(
$magic
,
$type
,
$len
) =
unpack
(
"a4NN"
,
$self
->{header});
return
$err
->(
"malformed_magic"
)
unless
$magic
eq
"\0RES"
;
my
$blob
=
""
;
$self
->{pkt} = {
type
=> Gearman::Util::cmd_name(
$type
),
len
=>
$len
,
blobref
=> \
$blob
,
};
next
;
}
my
$need
=
$self
->{pkt}{len} -
length
(${
$self
->{pkt}{blobref} });
my
$to_copy
=
$lendata
>
$need
?
$need
:
$lendata
;
${
$self
->{pkt}{blobref} } .=
substr
(
$$dataref
, 0,
$to_copy
,
''
);
if
(
$to_copy
==
$need
) {
$self
->on_packet(
$self
->{pkt},
$self
);
$self
->
reset
;
}
}
if
(
defined
(
$self
->{pkt})
&&
length
(${
$self
->{pkt}{blobref} }) ==
$self
->{pkt}{len})
{
$self
->on_packet(
$self
->{pkt},
$self
);
$self
->
reset
;
}
}
sub
eof
{
my
$self
=
shift
;
$self
->on_error(
"EOF"
);
}
sub
parse_sock {
my
(
$self
,
$sock
) =
@_
;
my
$res
= Gearman::Util::read_res_packet(
$sock
, \
my
$err
);
if
(
$err
) {
$self
->on_error(
"read_error: ${$err}"
);
return
;
}
$self
->{pkt} =
$res
;
if
(
defined
(
$self
->{pkt})
&&
length
(${
$self
->{pkt}{blobref} }) ==
$self
->{pkt}{len})
{
$self
->on_packet(
$self
->{pkt},
$self
);
$self
->
reset
;
}
}
1;