# Hands back its first argument unchanged; any additional arguments are
# ignored. Called with no arguments it yields undef.
sub _identity {
    my ($value) = @_;
    return $value;
}
# Expose _identity to the domain language under three names. Each entry pairs
# the registered name with the single tag handed to register_fn after it
# (presumably the accepted argument type -- confirm against
# Registry::register_fn).
for my $registration (
    [ 'type_agg',       'AGGREGATOR' ],
    [ 'type_valuation', 'VALUATION' ],
    [ 'type_scalar',    'SCALAR' ],
) {
    my ($fn_name, $arg_tag) = @{$registration};
    App::RecordStream::DomainLanguage::Registry::register_fn(\&_identity, $fn_name, $arg_tag);
}
# Wraps a snippet in a DomainLanguage::Value carrying two possibilities, added
# in this order:
#   'VALUATION' - a Valuation::Sub whose callback evaluates the snippet as
#                 'SCALAR' with '$r' bound to the callback's first argument;
#   'SNIPPET'   - the snippet object itself.
# Returns the populated Value object.
sub _snippet_upgrade {
    my ($snippet) = @_;

    my $value = App::RecordStream::DomainLanguage::Value->new();

    # Callback used when the snippet is treated as a valuation.
    my $evaluate_with_record = sub {
        my ($record) = @_;
        return $snippet->evaluate_as('SCALAR', { '$r' => $record });
    };

    $value->add_possibility(
        'VALUATION',
        App::RecordStream::DomainLanguage::Valuation::Sub->new($evaluate_with_record),
    );
    $value->add_possibility('SNIPPET', $snippet);

    return $value;
}
# Make _snippet_upgrade available to the domain language as 'snip', registered
# with the single tag 'SNIPPET'.
App::RecordStream::DomainLanguage::Registry::register_fn(
    \&_snippet_upgrade,
    'snip',
    'SNIPPET',
);
# Builds a Valuation::Sub whose callback returns its first argument untouched.
# Takes no arguments of its own.
sub _rec_valuation {
    my $pass_through = sub {
        my ($record) = @_;
        return $record;
    };

    return App::RecordStream::DomainLanguage::Valuation::Sub->new($pass_through);
}
# Register _rec_valuation under its long and short names; no tags follow the
# name in either registration.
for my $alias ('record', 'rec') {
    App::RecordStream::DomainLanguage::Registry::register_fn(\&_rec_valuation, $alias);
}
# Turns a raw value into a valuation object:
#   - an unblessed code reference (ref() eq 'CODE') is wrapped in a
#     Valuation::Sub;
#   - anything else is passed to Valuation::KeySpec.
sub _raw_valuation {
    my ($v) = @_;

    return ref($v) eq 'CODE'
        ? App::RecordStream::DomainLanguage::Valuation::Sub->new($v)
        : App::RecordStream::DomainLanguage::Valuation::KeySpec->new($v);
}
# Register _raw_valuation under its long and short names, each with the single
# tag 'SCALAR'.
for my $alias ('valuation', 'val') {
    App::RecordStream::DomainLanguage::Registry::register_fn(\&_raw_valuation, $alias, 'SCALAR');
}
# Builds an InjectInto::Subrefs aggregator out of snippets.
#
# Arguments:
#   $initial_snippet - evaluated as 'SCALAR' with no variables bound;
#   $combine_snippet - evaluated as 'SCALAR' with '$a' bound to the callback's
#                      first argument and '$r' to its second;
#   $squish_snippet  - optional; evaluated as 'SCALAR' with '$a' bound to the
#                      callback's argument. When omitted (or false) a new
#                      Snippet built from the text '$a' is used instead.
#
# Returns the object produced by InjectInto::Subrefs->new, which receives the
# three callbacks in the order initial, combine, squish.
sub inject_into_aggregator {
    my ($initial_snippet, $combine_snippet, $squish_snippet) = @_;
    $squish_snippet ||= App::RecordStream::DomainLanguage::Snippet->new('$a');

    return App::RecordStream::Aggregator::InjectInto::Subrefs->new(
        # initial: arguments are ignored
        sub {
            return $initial_snippet->evaluate_as('SCALAR');
        },
        # combine
        sub {
            my ($accumulator, $record) = @_;
            return $combine_snippet->evaluate_as(
                'SCALAR',
                { '$a' => $accumulator, '$r' => $record },
            );
        },
        # squish
        sub {
            my ($accumulator) = @_;
            return $squish_snippet->evaluate_as('SCALAR', { '$a' => $accumulator });
        },
    );
}
# Register inject_into_aggregator under every combination of
# {ii, inject_into} x {agg, aggregator}, joined by '_'. Each name is
# registered twice: first with two 'SNIPPET' tags, then with three.
for my $ii_name ('ii', 'inject_into') {
    for my $agg_name ('agg', 'aggregator') {
        my $fn_name = join '_', $ii_name, $agg_name;

        for my $snippet_count (2, 3) {
            App::RecordStream::DomainLanguage::Registry::register_fn(
                \&inject_into_aggregator,
                $fn_name,
                ('SNIPPET') x $snippet_count,
            );
        }
    }
}
# Builds a MapReduce::Subrefs aggregator out of snippets.
#
# Arguments:
#   $map_snippet    - evaluated as 'SCALAR' with '$r' bound to the callback's
#                     argument;
#   $reduce_snippet - evaluated as 'SCALAR' with '$a' and '$b' bound to the
#                     callback's first and second arguments;
#   $squish_snippet - optional; evaluated as 'SCALAR' with '$a' bound to the
#                     callback's argument. When omitted (or false) a new
#                     Snippet built from the text '$a' is used instead.
#
# Returns the object produced by MapReduce::Subrefs->new, which receives the
# three callbacks in the order map, reduce, squish.
sub map_reduce_aggregator {
    my ($map_snippet, $reduce_snippet, $squish_snippet) = @_;
    $squish_snippet ||= App::RecordStream::DomainLanguage::Snippet->new('$a');

    return App::RecordStream::Aggregator::MapReduce::Subrefs->new(
        # map
        sub {
            my ($record) = @_;
            return $map_snippet->evaluate_as('SCALAR', { '$r' => $record });
        },
        # reduce
        sub {
            my ($left, $right) = @_;
            return $reduce_snippet->evaluate_as(
                'SCALAR',
                { '$a' => $left, '$b' => $right },
            );
        },
        # squish
        sub {
            my ($accumulator) = @_;
            return $squish_snippet->evaluate_as('SCALAR', { '$a' => $accumulator });
        },
    );
}
# Register map_reduce_aggregator under every combination of
# {mr, map_reduce} x {agg, aggregator}, joined by '_'. Each name is
# registered twice: first with two 'SNIPPET' tags, then with three.
for my $mr_name ('mr', 'map_reduce') {
    for my $agg_name ('agg', 'aggregator') {
        my $fn_name = join '_', $mr_name, $agg_name;

        for my $snippet_count (2, 3) {
            App::RecordStream::DomainLanguage::Registry::register_fn(
                \&map_reduce_aggregator,
                $fn_name,
                ('SNIPPET') x $snippet_count,
            );
        }
    }
}
# Wraps an aggregator so that it only sees records accepted by a snippet.
#
# Arguments:
#   $match_snippet - evaluated as 'SCALAR' with '$r' bound to each record; a
#                    true result lets the record through;
#   $aggregator    - object whose initial/combine/squish methods are delegated
#                    to.
#
# Returns the object produced by InjectInto::Subrefs->new, which receives the
# callbacks in the order initial, combine, squish.
sub _subset_agg {
    my ($match_snippet, $aggregator) = @_;

    return App::RecordStream::Aggregator::InjectInto::Subrefs->new(
        # initial: straight delegation
        sub {
            return $aggregator->initial();
        },
        # combine: rejected records leave the cookie untouched
        sub {
            my ($cookie, $record) = @_;

            return $cookie
                unless $match_snippet->evaluate_as('SCALAR', { '$r' => $record });

            # scalar() keeps combine() in scalar context regardless of caller.
            return scalar $aggregator->combine($cookie, $record);
        },
        # squish: straight delegation
        sub {
            my ($cookie) = @_;
            return $aggregator->squish($cookie);
        },
    );
}
# Register _subset_agg under its long and short names, each with the tags
# 'SNIPPET' then 'AGGREGATOR'.
for my $alias ('subset_aggregator', 'subset_agg') {
    App::RecordStream::DomainLanguage::Registry::register_fn(
        \&_subset_agg,
        $alias,
        'SNIPPET',
        'AGGREGATOR',
    );
}
# Wraps an aggregator so that its squished result is post-processed by a
# snippet.
#
# Arguments:
#   $aggregator - object whose initial/combine/squish methods are delegated to;
#   $snippet    - evaluated as 'SCALAR' with '$r' bound to the aggregator's
#                 squished result.
#
# Returns the object produced by InjectInto::Subrefs->new, which receives the
# callbacks in the order initial, combine, squish.
#
# NOTE(review): the squished value is bound to '$r' here, while the other
# squish callbacks in this file bind their cookie to '$a' -- confirm that the
# difference is intended.
sub _xform_agg {
    my ($aggregator, $snippet) = @_;

    return App::RecordStream::Aggregator::InjectInto::Subrefs->new(
        # initial: straight delegation
        sub {
            return $aggregator->initial();
        },
        # combine: straight delegation
        sub {
            my ($cookie, $record) = @_;
            return $aggregator->combine($cookie, $record);
        },
        # squish: delegate, then hand the result to the snippet
        sub {
            my ($cookie) = @_;

            # scalar() keeps squish() in scalar context inside the hash.
            return $snippet->evaluate_as(
                'SCALAR',
                { '$r' => scalar $aggregator->squish($cookie) },
            );
        },
    );
}
# Make _xform_agg available to the domain language as 'xform', registered with
# the tags 'AGGREGATOR' then 'SNIPPET'.
App::RecordStream::DomainLanguage::Registry::register_fn(
    \&_xform_agg,
    'xform',
    'AGGREGATOR',
    'SNIPPET',
);

# A module file must end by returning a true value.
1;