MT#63182 /api/subscribers add lockwait and deadlock detection

* API::check_deadlock() add lockwait detection.
* lockwait and deadlock detection is now in place for ALL
  /api/subscribers endpoint methods. because of logic complexity
  it uses own deadlock detection instead of relying on Entities.
  It however uses $self->check_deadlock() and similar logic.

Change-Id: Idc1876910711563fe14c878541e350ad373d6d31
(cherry picked from commit 3bb3d9e4e4)
(cherry picked from commit 09d011af45)
mr13.4.1
Kirill Solomko 4 months ago
parent 5c0e6cc7d1
commit 124c343049

@ -315,57 +315,69 @@ sub GET :Allow {
my $rows = $c->request->params->{rows} // 10;
my $schema = $c->model('DB');
$schema->set_transaction_isolation('READ COMMITTED');
my $guard = $schema->txn_scope_guard;
{
my $subscribers_rs = $self->item_rs($c);
(my $total_count, $subscribers_rs, my $subscribers_rows) = $self->paginate_order_collection($c, $subscribers_rs);
my $subscribers = NGCP::Panel::Utils::Contract::acquire_contract_rowlocks(
c => $c,
rs => $subscribers_rs,
contract_id_field => 'contract_id',
skip_locked => ($c->request->header('X-Delay-Commit') ? 0 : 1),
);
my $now = NGCP::Panel::Utils::DateTime::current_local;
my (@embedded, @links, %contract_map);
my ($form) = $self->get_form($c);
$self->expand_prepare_collection($c);
for my $subscriber (@$subscribers) {
my $contract = $subscriber->contract;
NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $contract,
now => $now) if !exists $contract_map{$contract->id}; #apply underrun lock level
$contract_map{$contract->id} = 1;
my $resource = $self->resource_from_item($c, $subscriber, $form);
push @embedded, $self->hal_from_item($c, $subscriber, $resource, $form);
push @links, Data::HAL::Link->new(
relation => 'ngcp:'.$self->resource_name,
href => sprintf('%s%d', $self->dispatch_path, $subscriber->id),
TX_START:
$c->clear_errors;
try {
my $guard = $schema->txn_scope_guard;
{
my $subscribers_rs = $self->item_rs($c);
(my $total_count, $subscribers_rs, my $subscribers_rows) = $self->paginate_order_collection($c, $subscribers_rs);
my $subscribers = NGCP::Panel::Utils::Contract::acquire_contract_rowlocks(
c => $c,
rs => $subscribers_rs,
contract_id_field => 'contract_id',
skip_locked => ($c->request->header('X-Delay-Commit') ? 0 : 1),
);
}
$self->expand_collection_fields($c, \@embedded);
$self->delay_commit($c,$guard);
push @links,
Data::HAL::Link->new(
relation => 'curies',
href => 'http://purl.org/sipwise/ngcp-api/#rel-{rel}',
name => 'ngcp',
templated => true,
),
Data::HAL::Link->new(relation => 'profile', href => 'http://purl.org/sipwise/ngcp-api/'),
$self->collection_nav_links($c, $page, $rows, $total_count, $c->request->path, $c->request->query_params);
my $now = NGCP::Panel::Utils::DateTime::current_local;
my (@embedded, @links, %contract_map);
my ($form) = $self->get_form($c);
$self->expand_prepare_collection($c);
for my $subscriber (@$subscribers) {
my $contract = $subscriber->contract;
NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $contract,
now => $now) if !exists $contract_map{$contract->id}; #apply underrun lock level
$contract_map{$contract->id} = 1;
my $resource = $self->resource_from_item($c, $subscriber, $form);
push @embedded, $self->hal_from_item($c, $subscriber, $resource, $form);
push @links, Data::HAL::Link->new(
relation => 'ngcp:'.$self->resource_name,
href => sprintf('%s%d', $self->dispatch_path, $subscriber->id),
);
}
$self->expand_collection_fields($c, \@embedded);
$self->delay_commit($c,$guard);
push @links,
Data::HAL::Link->new(
relation => 'curies',
href => 'http://purl.org/sipwise/ngcp-api/#rel-{rel}',
name => 'ngcp',
templated => true,
),
Data::HAL::Link->new(relation => 'profile', href => 'http://purl.org/sipwise/ngcp-api/'),
$self->collection_nav_links($c, $page, $rows, $total_count, $c->request->path, $c->request->query_params);
my $hal = Data::HAL->new(
embedded => [@embedded],
links => [@links],
);
$hal->resource({
total_count => $total_count,
});
my $response = HTTP::Response->new(HTTP_OK, undef,
HTTP::Headers->new($hal->http_headers(skip_links => 1)), $hal->as_json);
$c->response->headers($response->headers);
$c->response->body($response->content);
return;
my $hal = Data::HAL->new(
embedded => [@embedded],
links => [@links],
);
$hal->resource({
total_count => $total_count,
});
my $response = HTTP::Response->new(HTTP_OK, undef,
HTTP::Headers->new($hal->http_headers(skip_links => 1)), $hal->as_json);
$c->response->headers($response->headers);
$c->response->body($response->content);
return;
}
} catch($e) {
if ($self->check_deadlock($c, $e)) {
goto TX_START;
}
unless ($c->has_errors) {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, 'Internal Server Error', $e);
last;
}
}
return;
}
@ -373,119 +385,133 @@ sub GET :Allow {
sub POST :Allow {
my ($self, $c) = @_;
my $schema = $c->model('DB');
$schema->set_transaction_isolation('READ COMMITTED');
my $guard = $schema->txn_scope_guard;
{
my $resource = $self->get_valid_post_data(
c => $c,
media_type => 'application/json',
);
last unless $resource;
TX_START:
$c->clear_errors;
try {
my $guard = $schema->txn_scope_guard;
{
my $resource = $self->get_valid_post_data(
c => $c,
media_type => 'application/json',
);
last unless $resource;
my $r = $self->prepare_resource($c, $schema, $resource);
last unless($r);
my $subscriber;
my $customer = $r->{customer};
my $alias_numbers = $r->{alias_numbers};
my $preferences = $r->{preferences};
my $groups = $r->{groups};
my $groupmembers = $r->{groupmembers};
$resource = $r->{resource};
my $error_info = { extended => {} };
my $r = $self->prepare_resource($c, $schema, $resource);
last unless($r);
my $subscriber;
my $customer = $r->{customer};
my $alias_numbers = $r->{alias_numbers};
my $preferences = $r->{preferences};
my $groups = $r->{groups};
my $groupmembers = $r->{groupmembers};
$resource = $r->{resource};
my $error_info = { extended => {} };
try {
my ($uuid_bin, $uuid_string);
UUID::generate($uuid_bin);
UUID::unparse($uuid_bin, $uuid_string);
try {
my ($uuid_bin, $uuid_string);
UUID::generate($uuid_bin);
UUID::unparse($uuid_bin, $uuid_string);
my @events_to_create = ();
my $event_context = { events_to_create => \@events_to_create };
$subscriber = NGCP::Panel::Utils::Subscriber::create_subscriber(
c => $c,
schema => $schema,
contract => $r->{customer},
params => $resource,
preferences => $preferences,
admin_default => 0,
event_context => $event_context,
error => $error_info,
);
if($resource->{status} eq 'locked') {
NGCP::Panel::Utils::Subscriber::lock_provisoning_voip_subscriber(
c => $c,
prov_subscriber => $subscriber->provisioning_voip_subscriber,
level => $resource->{lock} || 4,
my @events_to_create = ();
my $event_context = { events_to_create => \@events_to_create };
$subscriber = NGCP::Panel::Utils::Subscriber::create_subscriber(
c => $c,
schema => $schema,
contract => $r->{customer},
params => $resource,
preferences => $preferences,
admin_default => 0,
event_context => $event_context,
error => $error_info,
);
} else {
NGCP::Panel::Utils::Subscriber::lock_provisoning_voip_subscriber(
c => $c,
prov_subscriber => $subscriber->provisioning_voip_subscriber,
level => $resource->{lock} || 0,
) if exists $resource->{lock};
NGCP::Panel::Utils::ProfilePackages::underrun_lock_subscriber(c => $c, subscriber => $subscriber);
}
NGCP::Panel::Utils::Subscriber::update_subscriber_numbers(
c => $c,
schema => $schema,
alias_numbers => $alias_numbers,
reseller_id => $customer->contact->reseller_id,
subscriber_id => $subscriber->id,
);
$subscriber->discard_changes; # reload row because of new number
NGCP::Panel::Utils::Subscriber::manage_pbx_groups(
c => $c,
schema => $schema,
groups => $groups,
groupmembers => $groupmembers,
customer => $customer,
subscriber => $subscriber,
);
NGCP::Panel::Utils::Events::insert_deferred(
c => $c, schema => $schema,
events_to_create => \@events_to_create,
);
} catch(DBIx::Class::Exception $e where { /Duplicate entry '([^']+)' for key ('number_idx'|'webuser_dom_idx')/ }) {
$e =~ /Duplicate entry '([^']+)' for key ('number_idx'|'webuser_dom_idx')/;
my $log_error;
my @http_errors;
if ($2 eq '\'number_idx\'') {
$log_error = "failed to create subscriber, number " . $c->qs($1) . " already exists";
@http_errors = ("Number '" . $1 . "' already exists.", "Number already exists.");
}
elsif ($2 eq '\'webuser_dom_idx\'') {
$log_error = "failed to create subscriber, webusername-domain combination " . $c->qs($1) . " already exists";
@http_errors = ("Webusername-Domain combination '" . $1 . "' already exists.", "Webusername-Domain combination already exists.");
}
$self->error($c, HTTP_UNPROCESSABLE_ENTITY, $http_errors[0], $http_errors[1], $log_error);
last;
} catch($e) {
if (ref $error_info->{extended} eq 'HASH' && $error_info->{extended}->{response_code}) {
$self->error($c,
$error_info->{extended}->{response_code},
$error_info->{extended}->{description},
$error_info->{extended}->{error});
last;
} else {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, "Failed to create subscriber", $e);
if($resource->{status} eq 'locked') {
NGCP::Panel::Utils::Subscriber::lock_provisoning_voip_subscriber(
c => $c,
prov_subscriber => $subscriber->provisioning_voip_subscriber,
level => $resource->{lock} || 4,
);
} else {
NGCP::Panel::Utils::Subscriber::lock_provisoning_voip_subscriber(
c => $c,
prov_subscriber => $subscriber->provisioning_voip_subscriber,
level => $resource->{lock} || 0,
) if exists $resource->{lock};
NGCP::Panel::Utils::ProfilePackages::underrun_lock_subscriber(c => $c, subscriber => $subscriber);
}
NGCP::Panel::Utils::Subscriber::update_subscriber_numbers(
c => $c,
schema => $schema,
alias_numbers => $alias_numbers,
reseller_id => $customer->contact->reseller_id,
subscriber_id => $subscriber->id,
);
$subscriber->discard_changes; # reload row because of new number
NGCP::Panel::Utils::Subscriber::manage_pbx_groups(
c => $c,
schema => $schema,
groups => $groups,
groupmembers => $groupmembers,
customer => $customer,
subscriber => $subscriber,
);
NGCP::Panel::Utils::Events::insert_deferred(
c => $c, schema => $schema,
events_to_create => \@events_to_create,
);
} catch(DBIx::Class::Exception $e where { /Duplicate entry '([^']+)' for key ('number_idx'|'webuser_dom_idx')/ }) {
$e =~ /Duplicate entry '([^']+)' for key ('number_idx'|'webuser_dom_idx')/;
my $log_error;
my @http_errors;
if ($2 eq '\'number_idx\'') {
$log_error = "failed to create subscriber, number " . $c->qs($1) . " already exists";
@http_errors = ("Number '" . $1 . "' already exists.", "Number already exists.");
}
elsif ($2 eq '\'webuser_dom_idx\'') {
$log_error = "failed to create subscriber, webusername-domain combination " . $c->qs($1) . " already exists";
@http_errors = ("Webusername-Domain combination '" . $1 . "' already exists.", "Webusername-Domain combination already exists.");
}
$self->error($c, HTTP_UNPROCESSABLE_ENTITY, $http_errors[0], $http_errors[1], $log_error);
last;
} catch($e) {
if ($self->check_deadlock($c, $e)) {
goto TX_START;
}
if (ref $error_info->{extended} eq 'HASH' && $error_info->{extended}->{response_code}) {
$self->error($c,
$error_info->{extended}->{response_code},
$error_info->{extended}->{description},
$error_info->{extended}->{error});
last;
} else {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, "Failed to create subscriber", $e);
last;
}
}
}
last unless $self->add_create_journal_item_hal($c,sub {
my $self = shift;
my ($c) = @_;
my ($_form) = $self->get_form($c);
my $_subscriber = $self->item_by_id($c, $subscriber->id);
my $_resource = $self->resource_from_item($c, $_subscriber, $_form);
return $self->hal_from_item($c,$_subscriber,$_resource,$_form); });
last unless $self->add_create_journal_item_hal($c,sub {
my $self = shift;
my ($c) = @_;
my ($_form) = $self->get_form($c);
my $_subscriber = $self->item_by_id($c, $subscriber->id);
my $_resource = $self->resource_from_item($c, $_subscriber, $_form);
return $self->hal_from_item($c,$_subscriber,$_resource,$_form); });
$guard->commit;
$guard->commit;
$c->response->status(HTTP_CREATED);
$c->response->header(Location => sprintf('%s%d', $self->dispatch_path, $subscriber->id));
$c->response->body(q());
$c->response->status(HTTP_CREATED);
$c->response->header(Location => sprintf('%s%d', $self->dispatch_path, $subscriber->id));
$c->response->body(q());
}
} catch($e) {
if ($self->check_deadlock($c, $e)) {
goto TX_START;
}
unless ($c->has_errors) {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, 'Internal Server Error', $e);
last;
}
}
return;
}

@ -39,31 +39,43 @@ sub journal_query_params {
sub GET :Allow {
my ($self, $c, $id) = @_;
$c->model('DB')->set_transaction_isolation('READ COMMITTED');
my $guard = $c->model('DB')->txn_scope_guard;
{
last unless $self->valid_id($c, $id);
my $subscriber = $self->item_by_id($c, $id);
last unless $self->resource_exists($c, subscriber => $subscriber);
my $balance = NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $subscriber->contract,
); #apply underrun lock level
my ($form) = $self->get_form($c);
my $resource = $self->resource_from_item($c, $subscriber, $form);
my $hal = $self->hal_from_item($c, $subscriber, $resource, $form);
$guard->commit; #potential db write ops in hal_from
my $response = HTTP::Response->new(HTTP_OK, undef, HTTP::Headers->new(
(map { # XXX Data::HAL must be able to generate links with multiple relations
s|rel="(http://purl.org/sipwise/ngcp-api/#rel-resellers)"|rel="item $1"|r =~
s/rel=self/rel="item self"/r;
} $hal->http_headers),
), $hal->as_json);
$c->response->headers($response->headers);
$c->response->body($response->content);
return;
TX_START:
$c->clear_errors;
try {
my $guard = $c->model('DB')->txn_scope_guard;
{
last unless $self->valid_id($c, $id);
my $subscriber = $self->item_by_id($c, $id);
last unless $self->resource_exists($c, subscriber => $subscriber);
my $balance = NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $subscriber->contract,
); #apply underrun lock level
my ($form) = $self->get_form($c);
my $resource = $self->resource_from_item($c, $subscriber, $form);
my $hal = $self->hal_from_item($c, $subscriber, $resource, $form);
$guard->commit; #potential db write ops in hal_from
my $response = HTTP::Response->new(HTTP_OK, undef, HTTP::Headers->new(
(map { # XXX Data::HAL must be able to generate links with multiple relations
s|rel="(http://purl.org/sipwise/ngcp-api/#rel-resellers)"|rel="item $1"|r =~
s/rel=self/rel="item self"/r;
} $hal->http_headers),
), $hal->as_json);
$c->response->headers($response->headers);
$c->response->body($response->content);
return;
}
} catch($e) {
if ($self->check_deadlock($c, $e)) {
goto TX_START;
}
unless ($c->has_errors) {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, 'Internal Server Error', $e);
last;
}
}
return;
}
@ -75,38 +87,50 @@ sub PUT :Allow {
my $schema = $c->model('DB');
$schema->set_transaction_isolation('READ COMMITTED');
my $guard = $schema->txn_scope_guard;
{
my $preference = $self->require_preference($c);
last unless $preference;
TX_START:
$c->clear_errors;
try {
my $guard = $schema->txn_scope_guard;
{
my $preference = $self->require_preference($c);
last unless $preference;
my $subscriber = $self->item_by_id($c, $id);
last unless $self->resource_exists($c, subscriber => $subscriber);
my $balance = NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $subscriber->contract,
); #apply underrun lock level
$c->stash->{subscriber} = $subscriber; # password validation
my $resource = $self->get_valid_put_data(
c => $c,
id => $id,
media_type => 'application/json',
);
last unless $resource;
my $r = $self->prepare_resource($c, $schema, $resource, $subscriber);
last unless $r;
my $subscriber = $self->item_by_id($c, $id);
last unless $self->resource_exists($c, subscriber => $subscriber);
my $balance = NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $subscriber->contract,
); #apply underrun lock level
$c->stash->{subscriber} = $subscriber; # password validation
my $resource = $self->get_valid_put_data(
c => $c,
id => $id,
media_type => 'application/json',
);
last unless $resource;
my $r = $self->prepare_resource($c, $schema, $resource, $subscriber);
last unless $r;
$resource = $r->{resource};
my ($form) = $self->get_form($c);
$subscriber = $self->update_item($c, $schema, $subscriber, $r, $resource, $form);
last unless $subscriber;
$resource = $self->resource_from_item($c, $subscriber, $form);
my $hal = $self->hal_from_item($c, $subscriber, $resource, $form);
last unless $self->add_update_journal_item_hal($c,$hal);
$resource = $r->{resource};
$guard->commit;
$self->return_representation($c, 'hal' => $hal, 'preference' => $preference );
my ($form) = $self->get_form($c);
$subscriber = $self->update_item($c, $schema, $subscriber, $r, $resource, $form);
last unless $subscriber;
$resource = $self->resource_from_item($c, $subscriber, $form);
my $hal = $self->hal_from_item($c, $subscriber, $resource, $form);
last unless $self->add_update_journal_item_hal($c,$hal);
$guard->commit;
$self->return_representation($c, 'hal' => $hal, 'preference' => $preference );
}
} catch($e) {
if ($self->check_deadlock($c, $e)) {
goto TX_START;
}
unless ($c->has_errors) {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, 'Internal Server Error', $e);
last;
}
}
return;
}
@ -118,46 +142,58 @@ sub PATCH :Allow {
my $schema = $c->model('DB');
$schema->set_transaction_isolation('READ COMMITTED');
my $guard = $schema->txn_scope_guard;
{
my $preference = $self->require_preference($c);
last unless $preference;
TX_START:
$c->clear_errors;
try {
my $guard = $schema->txn_scope_guard;
{
my $preference = $self->require_preference($c);
last unless $preference;
my $subscriber = $self->item_by_id($c, $id);
last unless $self->resource_exists($c, subscriber => $subscriber);
my $balance = NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $subscriber->contract,
); #apply underrun lock level
$c->stash->{subscriber} = $subscriber; # password validation
my $json = $self->get_valid_patch_data(
c => $c,
id => $id,
media_type => 'application/json-patch+json',
ops => ["add", "replace", "copy", "remove"],
);
last unless $json;
my $subscriber = $self->item_by_id($c, $id);
last unless $self->resource_exists($c, subscriber => $subscriber);
my $balance = NGCP::Panel::Utils::ProfilePackages::get_contract_balance(c => $c,
contract => $subscriber->contract,
); #apply underrun lock level
$c->stash->{subscriber} = $subscriber; # password validation
my $json = $self->get_valid_patch_data(
c => $c,
id => $id,
media_type => 'application/json-patch+json',
ops => ["add", "replace", "copy", "remove"],
);
last unless $json;
my $patch_mode = 1;
my ($form) = $self->get_form($c);
my $old_resource = $self->resource_from_item($c, $subscriber, $form, $patch_mode);
$old_resource = clone($old_resource);
my $resource = $self->apply_patch($c, $old_resource, $json);
last unless $resource;
my $update = 1;
my $r = $self->prepare_resource($c, $schema, $resource, $subscriber, $patch_mode);
last unless $r;
$resource = $r->{resource};
$subscriber = $self->update_item($c, $schema, $subscriber, $r, $resource, $form);
last unless $subscriber;
$resource = $self->resource_from_item($c, $subscriber, $form);
my $hal = $self->hal_from_item($c, $subscriber, $resource, $form);
last unless $self->add_update_journal_item_hal($c,$hal);
my $patch_mode = 1;
my ($form) = $self->get_form($c);
my $old_resource = $self->resource_from_item($c, $subscriber, $form, $patch_mode);
$old_resource = clone($old_resource);
my $resource = $self->apply_patch($c, $old_resource, $json);
last unless $resource;
$guard->commit;
$self->return_representation($c, 'hal' => $hal, 'preference' => $preference );
my $update = 1;
my $r = $self->prepare_resource($c, $schema, $resource, $subscriber, $patch_mode);
last unless $r;
$resource = $r->{resource};
$subscriber = $self->update_item($c, $schema, $subscriber, $r, $resource, $form);
last unless $subscriber;
$resource = $self->resource_from_item($c, $subscriber, $form);
my $hal = $self->hal_from_item($c, $subscriber, $resource, $form);
last unless $self->add_update_journal_item_hal($c,$hal);
$guard->commit;
$self->return_representation($c, 'hal' => $hal, 'preference' => $preference );
}
} catch($e) {
if ($self->check_deadlock($c, $e)) {
goto TX_START;
}
unless ($c->has_errors) {
$self->error($c, HTTP_INTERNAL_SERVER_ERROR, 'Internal Server Error', $e);
last;
}
}
return;
}

@ -2207,17 +2207,20 @@ sub check_deadlock {
return 0 unless $error;
#my $lock_retry = $error =~ /Lock wait timeout exceeded; try restarting transaction/;
my $lockwait_retry = $error =~ /Lock wait timeout exceeded; try restarting transaction/;
my $deadlock_retry = $error =~ /Deadlock found when trying to get lock; try restarting transaction/;
return 0 unless $deadlock_retry;
return 0 unless $deadlock_retry or $lockwait_retry;
my $attempt = $c->stash->{deadlock_retry_attempt} //= 1;
my $lockwait_err = "lock timeout detected, retry transaction attempt=$attempt/$max_attempts";
my $deadlock_err = "deadlock detected, retry transaction attempt=$attempt/$max_attempts";
return 0 if $attempt > $max_attempts;
NGCP::Panel::Utils::Message::info(
c => $c,
type => 'api_retry',
log => "deadlock detected, retry transaction attempt=$attempt/$max_attempts",
log => ($lockwait_retry and $lockwait_err or $deadlock_err),
);
$c->stash->{deadlock_retry_attempt} = $attempt+1;
return 1;

Loading…
Cancel
Save