You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bulk-processor/lib/NGCP/BulkProcessor/ServiceProxy.pm

506 lines
16 KiB

package NGCP::BulkProcessor::ServiceProxy;
use strict;
## no critic
use threads qw(yield);
use threads::shared; # qw(shared_clone);
use Thread::Queue;
use Time::HiRes qw(sleep);
use NGCP::BulkProcessor::Globals qw(
@jobservers
$jobnamespace
);
use NGCP::BulkProcessor::Logging qw(
getlogger
servicedebug
serviceinfo
);
use NGCP::BulkProcessor::LogError qw(
serviceerror
servicewarn
notimplementederror
);
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::Serialization qw(serialize deserialize);
use Encode qw(encode_utf8);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(new_async_do new_do);
#my $logger = getlogger(__PACKAGE__);
use Gearman::Client;
#sub RECEIVE_EXCEPTIONS {
# print "RECEIVE_EXCEPTIONS";
# return 1;
#}
use Gearman::Task;
my $timeout_secs_default = 0;
#my $try_timeout_secs_default = 0;
my $retry_count_default = 0;
my $high_priority_default = 0;
my $block_destroy_default = 1;
my $poll_interval_secs = 0.1;
my $instance_count = 0;
sub new {
my $class = shift;
my $self = bless {}, $class;
my ($serialization_format,$timeout_secs,$block_destroy) = @_;
$self->{serialization_format} = $serialization_format;
$self->{client} = undef;
$self->{timeout_secs} = ((defined $timeout_secs) ? $timeout_secs : $timeout_secs_default);
#$self->{try_timeout_secs} = $try_timeout_secs_default;
$self->{retry_count} = $retry_count_default;
$self->{high_priority} = $high_priority_default;
$self->{block_destroy} = ((defined $block_destroy) ? $block_destroy : $block_destroy_default);
$self->{arg} = undef;
#my $ret = undef;
$self->{ret} = undef; # = share($ret);
$self->{function} = undef;
#my $exception = undef;
$self->{exception} = undef; #share($exception);
$self->{on_error} = undef;
$self->{on_complete} = undef;
$self->{on_fail} = undef;
$self->{on_status} = undef;
my $async_running = 0;
$self->{async_running_ref} = share($async_running);
$self->{thread} = undef;
$self->{create_tid} = threadid();
$self->{tid} = $self->{create_tid};
$self->{wait_tid} = undef;
$self->{queue} = undef;
$self->{instance} = $instance_count;
$instance_count++;
#$self->{taskset} = undef;
#$self->{task} = undef;
servicedebug($self,'service proxy created, job servers ' . join(',',@jobservers),getlogger(__PACKAGE__));
return $self;
}
sub identifier {
my $self = shift;
return '(' . $self->{instance} . ') ' . (length($self->{function}) > 0 ? '\'' . $self->{function} . '\'' : __PACKAGE__);
}
sub new_async_do {
#my ($function_name,$on_complete,$on_error,@args) = @_;
#my $serialization_format = shift;
#my $timeout_secs = shift;
#my $block_destroy = shift;
my $proxy = __PACKAGE__->new(); #$serialization_format,$timeout_secs,$block_destroy);
if ($proxy->do_async(@_)) {
return $proxy;
}
return undef;
}
sub new_do {
#my ($function_name,$on_complete,$on_error,@args) = @_;
#my $serialization_format = shift;
#my $timeout_secs = shift;
#my $block_destroy = shift;
my $proxy = __PACKAGE__->new(); #$serialization_format,$timeout_secs,$block_destroy);
return $proxy->do(@_);
}
sub do_async {
my $self = shift;
my ($function_name,$on_complete,$on_error,@args) = @_;
if ($self->_check_async_running($on_error,'do_async \'' . $function_name . '\' failed because do_async \'' . $self->{function} . '\' is waiting',1)) {
return 0;
}
$self->{client} = undef;
$self->{function} = $function_name;
$self->{ret} = undef;
$self->{exception} = undef;
#$self->{taskset} = undef;
#$self->{task} = undef;
#$self->{thread} = undef;
#$self->{wait_tid} = undef;
$self->{on_error} = $on_error;
$self->{on_complete} = $on_complete;
$self->{on_fail} = undef;
$self->{on_status} = undef;
my $arg = serialize(\@args,$self->{serialization_format});
$self->{arg} = \$arg;
#if (!defined $self->{queue}) {
$self->{queue} = Thread::Queue->new();
#}
servicedebug($self,'start waiting do_async \'' . $function_name . '\', args length: ' . length(encode_utf8($arg)),getlogger(__PACKAGE__));
$self->{thread} = threads->create(\&_wait_thread,
{ proxy => $self,
#logger => $logger,
}
);
#$self->{wait_tid} = $self->{thread}->tid();
#$self->{thread}->detach();
return 1;
}
sub _get_task_opts {
my $self = shift;
return {
on_complete => undef,
on_fail => undef,
on_retry => undef,
on_status => undef,
on_exception => undef,
retry_count => $self->{retry_count},
high_priority => $self->{high_priority},
#timeout => $self->{timeout_secs}
};
}
sub _wait_thread {
my $context = shift;
#my $tid = threadid();
#${$context->{proxy}->{tid_ref}} = $tid;
my $proxy = $context->{proxy};
#$proxy->{create_tid} = undef;
$proxy->{wait_tid} = threadid();
$proxy->{tid} = $proxy->{wait_tid};
servicedebug($proxy,'wait thread tid ' . $proxy->{tid} . ' started',getlogger(__PACKAGE__));
my $async_running_ref = $proxy->{async_running_ref};
my $task_opts = $proxy->_get_task_opts();
$task_opts->{on_complete} = sub { $proxy->_on_complete(@_); };
$task_opts->{on_fail} = sub { $proxy->_on_fail(@_); };
$task_opts->{on_retry} = sub { $proxy->_on_retry(@_); };
$task_opts->{on_status} = sub { $proxy->_on_status(@_); };
$task_opts->{on_exception} = sub { $proxy->_on_exception(@_); };
$proxy->{client} = Gearman::Client->new(( job_servers => \@jobservers,
prefix => $jobnamespace,
exceptions => 1));
my $task = Gearman::Task->new($proxy->{function}, $proxy->{arg}, $task_opts);
if ($proxy->{timeout_secs} > 0) {
$task->timeout($proxy->{timeout_secs});
}
#$proxy->{task} = $task;
my $task_set = $proxy->{client}->new_task_set();
#$proxy->{taskset} = $task_set;
$task_set->add_task($task);
local $SIG{'KILL'} = sub {
servicedebug($proxy,'kill signal received, exiting wait thread tid ' . $proxy->{tid} . ' ...',getlogger(__PACKAGE__));
#{
# lock $async_running_ref;
# $$async_running_ref = 0;
#}
threads->exit();
};
servicedebug($proxy,'start waiting (do_async) ...',getlogger(__PACKAGE__));
$task_set->wait(timeout => $task->timeout);
#return wantarray ? @{$self->{ret}} : $self->{ret}->[0];
{
lock $async_running_ref;
$$async_running_ref = 0;
}
servicedebug($proxy,'shutting down wait thread tid ' . $proxy->{tid} . ' ...',getlogger(__PACKAGE__));
#threads->exit();
}
sub do {
my $self = shift;
my ($function_name,$on_error,@args) = @_;
if ($self->_check_async_running($on_error,'do \'' . $function_name . '\' failed because do_async \'' . $self->{function} . '\' is waiting',0)) {
return undef;
}
$self->{function} = $function_name;
$self->{ret} = undef;
$self->{exception} = undef;
#$self->{taskset} = undef;
#$self->{task} = undef;
#$self->{thread} = undef;
#$self->{wait_tid} = undef;
#$self->{queue} = undef;
$self->{on_error} = $on_error;
$self->{on_complete} = undef;
$self->{on_fail} = undef;
$self->{on_status} = undef;
my $arg = serialize(\@args,$self->{serialization_format});
$self->{arg} = \$arg;
my $task_opts = $self->_get_task_opts();
$task_opts->{on_complete} = sub { $self->_on_complete(@_); };
$task_opts->{on_fail} = sub { $self->_on_fail(@_); };
$task_opts->{on_retry} = sub { $self->_on_retry(@_); };
$task_opts->{on_status} = sub { $self->_on_status(@_); };
$task_opts->{on_exception} = sub { $self->_on_exception(@_); };
$self->{client} = Gearman::Client->new(( job_servers => \@jobservers,
prefix => $jobnamespace,
exceptions => 1));
my $task = Gearman::Task->new($function_name, \$arg, $task_opts);
#$self->{task} = $task;
if ($self->{timeout_secs} > 0) {
$task->timeout($self->{timeout_secs});
}
my $task_set = $self->{client}->new_task_set();
#$self->{taskset} = $task_set;
$task_set->add_task($task);
servicedebug($self,'start waiting do \'' . $function_name . '\', args length: ' . length(encode_utf8($arg)),getlogger(__PACKAGE__));
$task_set->wait(timeout => $task->timeout);
return wantarray ? @{$self->{ret}} : $self->{ret}->[0];
}
sub _enqueue_event {
my $self = shift;
my ($event,$args) = @_;
my $packet = {event => $event,
args => $args};
$self->{queue}->enqueue($packet);
servicedebug($self,'event ' . $event . ' enqueued, ' . $self->{queue}->pending() . ' event(s) pending',getlogger(__PACKAGE__));
}
sub _on_complete {
my $self = shift;
my $result_ref = shift;
if ($self->_is_wait_thread()) {
$self->_enqueue_event('_on_complete',[$result_ref]);
} elsif ($self->_is_create_thread()) {
my $result = $$result_ref;
$self->{ret} = deserialize($result,$self->{serialization_format});
servicedebug($self,'on_complete event received, result length: ' . length(encode_utf8($result)),getlogger(__PACKAGE__));
if (defined $self->{on_complete} and ref $self->{on_complete} eq 'CODE') {
&{$self->{on_complete}}(@{$self->{ret}});
}
}
}
sub _on_fail {
my $self = shift;
if ($self->_is_wait_thread()) {
$self->_enqueue_event('_on_fail');
} elsif ($self->_is_create_thread()) {
servicedebug($self,'on_fail event received',getlogger(__PACKAGE__));
if (defined $self->{on_fail} and ref $self->{on_fail} eq 'CODE') {
&{$self->{on_fail}}();
}
}
}
sub _on_retry {
my $self = shift;
if ($self->_is_wait_thread()) {
$self->_enqueue_event('_on_retry');
} elsif ($self->_is_create_thread()) {
servicedebug($self,'on_retry event received',getlogger(__PACKAGE__));
if (defined $self->{on_retry} and ref $self->{on_retry} eq 'CODE') {
&{$self->{on_retry}}();
}
}
}
sub _on_status {
my $self = shift;
my ($numerator, $denominator) = @_;
if ($self->_is_wait_thread()) {
$self->_enqueue_event('_on_status',[$numerator, $denominator]);
} elsif ($self->_is_create_thread()) {
servicedebug($self,'on_status event received: ' . $numerator . '/' . $denominator,getlogger(__PACKAGE__));
if (defined $self->{on_status} and ref $self->{on_status} eq 'CODE') {
&{$self->{on_status}}($numerator, $denominator);
}
}
}
sub _on_exception {
my $self = shift;
my $exception = shift;
$self->{exception} = $exception;
if ($self->_is_wait_thread()) {
$self->_enqueue_event('_on_exception',[$exception]);
#${$self->{async_running_ref}} = 0;
} elsif ($self->_is_create_thread()) {
if (defined $self->{on_error} and ref $self->{on_error} eq 'CODE') {
servicedebug($self,'on_exception event received: ' . $exception,getlogger(__PACKAGE__));
&{$self->{on_error}}($exception);
} else {
servicewarn($self,'on_exception event received: ' . $exception,getlogger(__PACKAGE__));
}
}
}
sub _check_async_running {
my $self = shift;
my ($on_error,$message,$async_running) = @_;
if ($self->_is_create_thread()) {
my $async_running_ref = $self->{async_running_ref};
lock $async_running_ref;
if ($$async_running_ref) {
if (defined $on_error and ref $on_error eq 'CODE') {
servicedebug($self,$message,getlogger(__PACKAGE__));
&$on_error($message);
} elsif (length($message) > 0) {
servicewarn($self,$message,getlogger(__PACKAGE__));
}
return 1;
} elsif ($async_running) {
$$async_running_ref = 1;
}
#} else {
# servicewarn($self,$message,getlogger(__PACKAGE__));
}
return 0;
}
sub _get_stop_wait_thread {
my $self = shift;
my $timeout_secs = shift;
my $async_running;
{
my $async_running_ref = $self->{async_running_ref};
lock $async_running_ref;
$async_running = $$async_running_ref;
}
if ((not $async_running and $self->{queue}->pending() == 0) or (defined $timeout_secs and $timeout_secs <= 0)) {
servicedebug($self,'stop waiting now (' .
($async_running ? 'wait thread running' : 'wait thread not running') .', '.
$self->{queue}->pending() . ' event(s) queued, ' .
((defined $timeout_secs) ? 'timeout in ' . sprintf('%.1f',$timeout_secs) . 'secs' : 'no timeout') . ')'
,getlogger(__PACKAGE__));
return 1;
}
return 0;
}
sub wait {
my $self = shift;
my $timeout_secs = shift;
if ($self->_is_create_thread()) { #$self->_check_async_running()) {
while (not $self->_get_stop_wait_thread($timeout_secs)) {
my $packet = $self->{queue}->dequeue_nb();
if (defined $packet) {
my $event = $packet->{event};
servicedebug($self,'event ' . $event . ' dequeued, ' . $self->{queue}->pending() . ' event(s) pending',getlogger(__PACKAGE__));
$self->$event(@{$packet->{args}});
yield();
} else {
if (defined $timeout_secs) {
$timeout_secs -= $poll_interval_secs;
}
sleep $poll_interval_secs;
}
}
my $killtread = 0;
{
my $async_running_ref = $self->{async_running_ref};
lock $async_running_ref;
$killtread = ($$async_running_ref and (defined $timeout_secs and $timeout_secs <= 0));
if ($killtread) {
servicedebug($self,'wait timeout exceeded (' . sprintf('%.1f',$timeout_secs) . '), killing wait thread ...',getlogger(__PACKAGE__));
$self->{thread}->kill('KILL')->detach();
$$async_running_ref = 0;
}
}
if (not $killtread) {
$self->{thread}->join();
servicedebug($self,'wait thread joined',getlogger(__PACKAGE__));
}
##if ($self->{thread}) {
# if ($killtread) {
# servicedebug($self,'killing thread XX',getlogger(__PACKAGE__));
# $self->{thread}->kill('KILL')->detach();
# } else {
# $self->{thread}->join();
# servicedebug($self,'thread joined',getlogger(__PACKAGE__));
# }
##}
$self->{queue} = undef;
$self->{thread} = undef;
$self->{wait_tid} = undef;
#if ($killtread) {
# #servicedebug($self,'killing thread XX',getlogger(__PACKAGE__));
# #$self->{thread}->kill('KILL')->detach();
#} else {
# $self->{thread}->join();
# servicedebug($self,'thread joined',getlogger(__PACKAGE__));
#}
#$self->{queue} = undef;
#$self->{thread} = undef;
##$self->{wait_tid} = undef;
#}
#} else {
# print "IGNORE WAIT??????????\n";
}
}
sub DESTROY {
my $self = shift;
if ($self->_is_create_thread()) {
servicedebug($self,'destroying proxy ...',getlogger(__PACKAGE__));
if ($self->{block_destroy}) {
$self->wait($self->{timeout_secs} > 0 ? $self->{timeout_secs} : undef);
} else {
$self->_check_async_running(undef,'do_async \'' . $self->{function} . '\' is still waiting');
}
servicedebug($self,'proxy destroyed',getlogger(__PACKAGE__));
}
}
sub _is_wait_thread {
my $self = shift;
return (defined $self->{wait_tid} and $self->{wait_tid} == threadid());
}
sub _is_create_thread {
my $self = shift;
return $self->{create_tid} == threadid();
}
1;