You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
506 lines
16 KiB
506 lines
16 KiB
package NGCP::BulkProcessor::ServiceProxy;
|
|
use strict;
|
|
|
|
## no critic
|
|
|
|
use threads qw(yield);
|
|
use threads::shared; # qw(shared_clone);
|
|
use Thread::Queue;
|
|
|
|
use Time::HiRes qw(sleep);
|
|
|
|
use NGCP::BulkProcessor::Globals qw(
|
|
@jobservers
|
|
$jobnamespace
|
|
);
|
|
|
|
use NGCP::BulkProcessor::Logging qw(
|
|
getlogger
|
|
servicedebug
|
|
serviceinfo
|
|
);
|
|
use NGCP::BulkProcessor::LogError qw(
|
|
serviceerror
|
|
servicewarn
|
|
notimplementederror
|
|
);
|
|
|
|
use NGCP::BulkProcessor::Utils qw(threadid);
|
|
use NGCP::BulkProcessor::Serialization qw(serialize deserialize);
|
|
use Encode qw(encode_utf8);
|
|
|
|
require Exporter;
|
|
our @ISA = qw(Exporter);
|
|
our @EXPORT_OK = qw(new_async_do new_do);
|
|
|
|
#my $logger = getlogger(__PACKAGE__);
|
|
|
|
use Gearman::Client;
|
|
#sub RECEIVE_EXCEPTIONS {
|
|
# print "RECEIVE_EXCEPTIONS";
|
|
# return 1;
|
|
#}
|
|
use Gearman::Task;
|
|
|
|
my $timeout_secs_default = 0;
|
|
#my $try_timeout_secs_default = 0;
|
|
my $retry_count_default = 0;
|
|
my $high_priority_default = 0;
|
|
|
|
my $block_destroy_default = 1;
|
|
|
|
my $poll_interval_secs = 0.1;
|
|
|
|
my $instance_count = 0;
|
|
|
|
sub new {
|
|
|
|
my $class = shift;
|
|
my $self = bless {}, $class;
|
|
my ($serialization_format,$timeout_secs,$block_destroy) = @_;
|
|
|
|
$self->{serialization_format} = $serialization_format;
|
|
$self->{client} = undef;
|
|
|
|
$self->{timeout_secs} = ((defined $timeout_secs) ? $timeout_secs : $timeout_secs_default);
|
|
#$self->{try_timeout_secs} = $try_timeout_secs_default;
|
|
$self->{retry_count} = $retry_count_default;
|
|
$self->{high_priority} = $high_priority_default;
|
|
|
|
$self->{block_destroy} = ((defined $block_destroy) ? $block_destroy : $block_destroy_default);
|
|
|
|
$self->{arg} = undef;
|
|
#my $ret = undef;
|
|
$self->{ret} = undef; # = share($ret);
|
|
$self->{function} = undef;
|
|
#my $exception = undef;
|
|
$self->{exception} = undef; #share($exception);
|
|
$self->{on_error} = undef;
|
|
|
|
$self->{on_complete} = undef;
|
|
$self->{on_fail} = undef;
|
|
$self->{on_status} = undef;
|
|
|
|
my $async_running = 0;
|
|
$self->{async_running_ref} = share($async_running);
|
|
$self->{thread} = undef;
|
|
$self->{create_tid} = threadid();
|
|
$self->{tid} = $self->{create_tid};
|
|
$self->{wait_tid} = undef;
|
|
$self->{queue} = undef;
|
|
|
|
$self->{instance} = $instance_count;
|
|
$instance_count++;
|
|
|
|
#$self->{taskset} = undef;
|
|
#$self->{task} = undef;
|
|
|
|
servicedebug($self,'service proxy created, job servers ' . join(',',@jobservers),getlogger(__PACKAGE__));
|
|
|
|
return $self;
|
|
|
|
}
|
|
|
|
sub identifier {
|
|
my $self = shift;
|
|
return '(' . $self->{instance} . ') ' . (length($self->{function}) > 0 ? '\'' . $self->{function} . '\'' : __PACKAGE__);
|
|
}
|
|
|
|
sub new_async_do {
|
|
#my ($function_name,$on_complete,$on_error,@args) = @_;
|
|
#my $serialization_format = shift;
|
|
#my $timeout_secs = shift;
|
|
#my $block_destroy = shift;
|
|
my $proxy = __PACKAGE__->new(); #$serialization_format,$timeout_secs,$block_destroy);
|
|
if ($proxy->do_async(@_)) {
|
|
return $proxy;
|
|
}
|
|
return undef;
|
|
}
|
|
sub new_do {
|
|
#my ($function_name,$on_complete,$on_error,@args) = @_;
|
|
#my $serialization_format = shift;
|
|
#my $timeout_secs = shift;
|
|
#my $block_destroy = shift;
|
|
my $proxy = __PACKAGE__->new(); #$serialization_format,$timeout_secs,$block_destroy);
|
|
return $proxy->do(@_);
|
|
}
|
|
|
|
sub do_async {
|
|
my $self = shift;
|
|
my ($function_name,$on_complete,$on_error,@args) = @_;
|
|
|
|
if ($self->_check_async_running($on_error,'do_async \'' . $function_name . '\' failed because do_async \'' . $self->{function} . '\' is waiting',1)) {
|
|
return 0;
|
|
}
|
|
|
|
$self->{client} = undef;
|
|
|
|
$self->{function} = $function_name;
|
|
$self->{ret} = undef;
|
|
$self->{exception} = undef;
|
|
|
|
#$self->{taskset} = undef;
|
|
#$self->{task} = undef;
|
|
#$self->{thread} = undef;
|
|
#$self->{wait_tid} = undef;
|
|
|
|
$self->{on_error} = $on_error;
|
|
$self->{on_complete} = $on_complete;
|
|
$self->{on_fail} = undef;
|
|
$self->{on_status} = undef;
|
|
|
|
my $arg = serialize(\@args,$self->{serialization_format});
|
|
$self->{arg} = \$arg;
|
|
|
|
#if (!defined $self->{queue}) {
|
|
$self->{queue} = Thread::Queue->new();
|
|
#}
|
|
|
|
servicedebug($self,'start waiting do_async \'' . $function_name . '\', args length: ' . length(encode_utf8($arg)),getlogger(__PACKAGE__));
|
|
$self->{thread} = threads->create(\&_wait_thread,
|
|
|
|
{ proxy => $self,
|
|
#logger => $logger,
|
|
}
|
|
|
|
);
|
|
#$self->{wait_tid} = $self->{thread}->tid();
|
|
#$self->{thread}->detach();
|
|
|
|
return 1;
|
|
}
|
|
|
|
sub _get_task_opts {
|
|
my $self = shift;
|
|
return {
|
|
on_complete => undef,
|
|
on_fail => undef,
|
|
on_retry => undef,
|
|
on_status => undef,
|
|
on_exception => undef,
|
|
retry_count => $self->{retry_count},
|
|
high_priority => $self->{high_priority},
|
|
#timeout => $self->{timeout_secs}
|
|
};
|
|
}
|
|
|
|
sub _wait_thread {
|
|
|
|
my $context = shift;
|
|
#my $tid = threadid();
|
|
#${$context->{proxy}->{tid_ref}} = $tid;
|
|
my $proxy = $context->{proxy};
|
|
#$proxy->{create_tid} = undef;
|
|
$proxy->{wait_tid} = threadid();
|
|
$proxy->{tid} = $proxy->{wait_tid};
|
|
servicedebug($proxy,'wait thread tid ' . $proxy->{tid} . ' started',getlogger(__PACKAGE__));
|
|
my $async_running_ref = $proxy->{async_running_ref};
|
|
|
|
my $task_opts = $proxy->_get_task_opts();
|
|
$task_opts->{on_complete} = sub { $proxy->_on_complete(@_); };
|
|
$task_opts->{on_fail} = sub { $proxy->_on_fail(@_); };
|
|
$task_opts->{on_retry} = sub { $proxy->_on_retry(@_); };
|
|
$task_opts->{on_status} = sub { $proxy->_on_status(@_); };
|
|
$task_opts->{on_exception} = sub { $proxy->_on_exception(@_); };
|
|
|
|
$proxy->{client} = Gearman::Client->new(( job_servers => \@jobservers,
|
|
prefix => $jobnamespace,
|
|
exceptions => 1));
|
|
|
|
my $task = Gearman::Task->new($proxy->{function}, $proxy->{arg}, $task_opts);
|
|
if ($proxy->{timeout_secs} > 0) {
|
|
$task->timeout($proxy->{timeout_secs});
|
|
}
|
|
#$proxy->{task} = $task;
|
|
|
|
my $task_set = $proxy->{client}->new_task_set();
|
|
#$proxy->{taskset} = $task_set;
|
|
$task_set->add_task($task);
|
|
|
|
local $SIG{'KILL'} = sub {
|
|
servicedebug($proxy,'kill signal received, exiting wait thread tid ' . $proxy->{tid} . ' ...',getlogger(__PACKAGE__));
|
|
#{
|
|
# lock $async_running_ref;
|
|
# $$async_running_ref = 0;
|
|
#}
|
|
threads->exit();
|
|
|
|
};
|
|
|
|
servicedebug($proxy,'start waiting (do_async) ...',getlogger(__PACKAGE__));
|
|
$task_set->wait(timeout => $task->timeout);
|
|
#return wantarray ? @{$self->{ret}} : $self->{ret}->[0];
|
|
{
|
|
lock $async_running_ref;
|
|
$$async_running_ref = 0;
|
|
}
|
|
|
|
servicedebug($proxy,'shutting down wait thread tid ' . $proxy->{tid} . ' ...',getlogger(__PACKAGE__));
|
|
#threads->exit();
|
|
}
|
|
|
|
sub do {
|
|
my $self = shift;
|
|
my ($function_name,$on_error,@args) = @_;
|
|
|
|
if ($self->_check_async_running($on_error,'do \'' . $function_name . '\' failed because do_async \'' . $self->{function} . '\' is waiting',0)) {
|
|
return undef;
|
|
}
|
|
|
|
$self->{function} = $function_name;
|
|
$self->{ret} = undef;
|
|
$self->{exception} = undef;
|
|
|
|
#$self->{taskset} = undef;
|
|
#$self->{task} = undef;
|
|
#$self->{thread} = undef;
|
|
#$self->{wait_tid} = undef;
|
|
#$self->{queue} = undef;
|
|
|
|
$self->{on_error} = $on_error;
|
|
$self->{on_complete} = undef;
|
|
$self->{on_fail} = undef;
|
|
$self->{on_status} = undef;
|
|
|
|
my $arg = serialize(\@args,$self->{serialization_format});
|
|
$self->{arg} = \$arg;
|
|
|
|
my $task_opts = $self->_get_task_opts();
|
|
$task_opts->{on_complete} = sub { $self->_on_complete(@_); };
|
|
$task_opts->{on_fail} = sub { $self->_on_fail(@_); };
|
|
$task_opts->{on_retry} = sub { $self->_on_retry(@_); };
|
|
$task_opts->{on_status} = sub { $self->_on_status(@_); };
|
|
$task_opts->{on_exception} = sub { $self->_on_exception(@_); };
|
|
|
|
$self->{client} = Gearman::Client->new(( job_servers => \@jobservers,
|
|
prefix => $jobnamespace,
|
|
exceptions => 1));
|
|
|
|
my $task = Gearman::Task->new($function_name, \$arg, $task_opts);
|
|
#$self->{task} = $task;
|
|
if ($self->{timeout_secs} > 0) {
|
|
$task->timeout($self->{timeout_secs});
|
|
}
|
|
|
|
my $task_set = $self->{client}->new_task_set();
|
|
#$self->{taskset} = $task_set;
|
|
$task_set->add_task($task);
|
|
|
|
servicedebug($self,'start waiting do \'' . $function_name . '\', args length: ' . length(encode_utf8($arg)),getlogger(__PACKAGE__));
|
|
$task_set->wait(timeout => $task->timeout);
|
|
return wantarray ? @{$self->{ret}} : $self->{ret}->[0];
|
|
|
|
}
|
|
|
|
sub _enqueue_event {
|
|
my $self = shift;
|
|
my ($event,$args) = @_;
|
|
my $packet = {event => $event,
|
|
args => $args};
|
|
$self->{queue}->enqueue($packet);
|
|
servicedebug($self,'event ' . $event . ' enqueued, ' . $self->{queue}->pending() . ' event(s) pending',getlogger(__PACKAGE__));
|
|
}
|
|
|
|
sub _on_complete {
|
|
my $self = shift;
|
|
my $result_ref = shift;
|
|
if ($self->_is_wait_thread()) {
|
|
$self->_enqueue_event('_on_complete',[$result_ref]);
|
|
} elsif ($self->_is_create_thread()) {
|
|
my $result = $$result_ref;
|
|
$self->{ret} = deserialize($result,$self->{serialization_format});
|
|
servicedebug($self,'on_complete event received, result length: ' . length(encode_utf8($result)),getlogger(__PACKAGE__));
|
|
if (defined $self->{on_complete} and ref $self->{on_complete} eq 'CODE') {
|
|
&{$self->{on_complete}}(@{$self->{ret}});
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
|
|
sub _on_fail {
|
|
my $self = shift;
|
|
if ($self->_is_wait_thread()) {
|
|
$self->_enqueue_event('_on_fail');
|
|
} elsif ($self->_is_create_thread()) {
|
|
servicedebug($self,'on_fail event received',getlogger(__PACKAGE__));
|
|
if (defined $self->{on_fail} and ref $self->{on_fail} eq 'CODE') {
|
|
&{$self->{on_fail}}();
|
|
}
|
|
}
|
|
}
|
|
|
|
sub _on_retry {
|
|
my $self = shift;
|
|
if ($self->_is_wait_thread()) {
|
|
$self->_enqueue_event('_on_retry');
|
|
} elsif ($self->_is_create_thread()) {
|
|
servicedebug($self,'on_retry event received',getlogger(__PACKAGE__));
|
|
if (defined $self->{on_retry} and ref $self->{on_retry} eq 'CODE') {
|
|
&{$self->{on_retry}}();
|
|
}
|
|
}
|
|
}
|
|
|
|
sub _on_status {
|
|
my $self = shift;
|
|
my ($numerator, $denominator) = @_;
|
|
if ($self->_is_wait_thread()) {
|
|
$self->_enqueue_event('_on_status',[$numerator, $denominator]);
|
|
} elsif ($self->_is_create_thread()) {
|
|
servicedebug($self,'on_status event received: ' . $numerator . '/' . $denominator,getlogger(__PACKAGE__));
|
|
if (defined $self->{on_status} and ref $self->{on_status} eq 'CODE') {
|
|
&{$self->{on_status}}($numerator, $denominator);
|
|
}
|
|
}
|
|
}
|
|
|
|
sub _on_exception {
|
|
my $self = shift;
|
|
my $exception = shift;
|
|
$self->{exception} = $exception;
|
|
if ($self->_is_wait_thread()) {
|
|
$self->_enqueue_event('_on_exception',[$exception]);
|
|
#${$self->{async_running_ref}} = 0;
|
|
} elsif ($self->_is_create_thread()) {
|
|
if (defined $self->{on_error} and ref $self->{on_error} eq 'CODE') {
|
|
servicedebug($self,'on_exception event received: ' . $exception,getlogger(__PACKAGE__));
|
|
&{$self->{on_error}}($exception);
|
|
} else {
|
|
servicewarn($self,'on_exception event received: ' . $exception,getlogger(__PACKAGE__));
|
|
}
|
|
}
|
|
}
|
|
|
|
sub _check_async_running {
|
|
my $self = shift;
|
|
my ($on_error,$message,$async_running) = @_;
|
|
if ($self->_is_create_thread()) {
|
|
my $async_running_ref = $self->{async_running_ref};
|
|
lock $async_running_ref;
|
|
if ($$async_running_ref) {
|
|
if (defined $on_error and ref $on_error eq 'CODE') {
|
|
servicedebug($self,$message,getlogger(__PACKAGE__));
|
|
&$on_error($message);
|
|
} elsif (length($message) > 0) {
|
|
servicewarn($self,$message,getlogger(__PACKAGE__));
|
|
}
|
|
return 1;
|
|
} elsif ($async_running) {
|
|
$$async_running_ref = 1;
|
|
}
|
|
#} else {
|
|
# servicewarn($self,$message,getlogger(__PACKAGE__));
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
sub _get_stop_wait_thread {
|
|
my $self = shift;
|
|
my $timeout_secs = shift;
|
|
my $async_running;
|
|
{
|
|
my $async_running_ref = $self->{async_running_ref};
|
|
lock $async_running_ref;
|
|
$async_running = $$async_running_ref;
|
|
}
|
|
if ((not $async_running and $self->{queue}->pending() == 0) or (defined $timeout_secs and $timeout_secs <= 0)) {
|
|
servicedebug($self,'stop waiting now (' .
|
|
($async_running ? 'wait thread running' : 'wait thread not running') .', '.
|
|
$self->{queue}->pending() . ' event(s) queued, ' .
|
|
((defined $timeout_secs) ? 'timeout in ' . sprintf('%.1f',$timeout_secs) . 'secs' : 'no timeout') . ')'
|
|
,getlogger(__PACKAGE__));
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
sub wait {
|
|
my $self = shift;
|
|
my $timeout_secs = shift;
|
|
if ($self->_is_create_thread()) { #$self->_check_async_running()) {
|
|
while (not $self->_get_stop_wait_thread($timeout_secs)) {
|
|
my $packet = $self->{queue}->dequeue_nb();
|
|
if (defined $packet) {
|
|
my $event = $packet->{event};
|
|
servicedebug($self,'event ' . $event . ' dequeued, ' . $self->{queue}->pending() . ' event(s) pending',getlogger(__PACKAGE__));
|
|
$self->$event(@{$packet->{args}});
|
|
yield();
|
|
} else {
|
|
if (defined $timeout_secs) {
|
|
$timeout_secs -= $poll_interval_secs;
|
|
}
|
|
sleep $poll_interval_secs;
|
|
}
|
|
}
|
|
my $killtread = 0;
|
|
{
|
|
my $async_running_ref = $self->{async_running_ref};
|
|
lock $async_running_ref;
|
|
$killtread = ($$async_running_ref and (defined $timeout_secs and $timeout_secs <= 0));
|
|
if ($killtread) {
|
|
servicedebug($self,'wait timeout exceeded (' . sprintf('%.1f',$timeout_secs) . '), killing wait thread ...',getlogger(__PACKAGE__));
|
|
$self->{thread}->kill('KILL')->detach();
|
|
$$async_running_ref = 0;
|
|
}
|
|
}
|
|
if (not $killtread) {
|
|
$self->{thread}->join();
|
|
servicedebug($self,'wait thread joined',getlogger(__PACKAGE__));
|
|
}
|
|
##if ($self->{thread}) {
|
|
# if ($killtread) {
|
|
# servicedebug($self,'killing thread XX',getlogger(__PACKAGE__));
|
|
# $self->{thread}->kill('KILL')->detach();
|
|
# } else {
|
|
# $self->{thread}->join();
|
|
# servicedebug($self,'thread joined',getlogger(__PACKAGE__));
|
|
# }
|
|
##}
|
|
$self->{queue} = undef;
|
|
$self->{thread} = undef;
|
|
$self->{wait_tid} = undef;
|
|
|
|
#if ($killtread) {
|
|
# #servicedebug($self,'killing thread XX',getlogger(__PACKAGE__));
|
|
# #$self->{thread}->kill('KILL')->detach();
|
|
#} else {
|
|
# $self->{thread}->join();
|
|
# servicedebug($self,'thread joined',getlogger(__PACKAGE__));
|
|
#}
|
|
#$self->{queue} = undef;
|
|
#$self->{thread} = undef;
|
|
##$self->{wait_tid} = undef;
|
|
#}
|
|
#} else {
|
|
# print "IGNORE WAIT??????????\n";
|
|
}
|
|
}
|
|
|
|
sub DESTROY {
|
|
|
|
my $self = shift;
|
|
if ($self->_is_create_thread()) {
|
|
servicedebug($self,'destroying proxy ...',getlogger(__PACKAGE__));
|
|
if ($self->{block_destroy}) {
|
|
$self->wait($self->{timeout_secs} > 0 ? $self->{timeout_secs} : undef);
|
|
} else {
|
|
$self->_check_async_running(undef,'do_async \'' . $self->{function} . '\' is still waiting');
|
|
}
|
|
servicedebug($self,'proxy destroyed',getlogger(__PACKAGE__));
|
|
}
|
|
}
|
|
|
|
sub _is_wait_thread {
|
|
my $self = shift;
|
|
return (defined $self->{wait_tid} and $self->{wait_tid} == threadid());
|
|
}
|
|
|
|
sub _is_create_thread {
|
|
my $self = shift;
|
|
return $self->{create_tid} == threadid();
|
|
}
|
|
|
|
1;
|