MT#18663 MT#20893 row bulk processing framework WIP #14

+support for handling multiple ngcp table versions
 at runtime
+setting allowed_ips

Change-Id: I7e87456d561407fd35b4518ed53a3c435c2e7798
changes/38/7538/4
Rene Krenn 9 years ago
parent 0332ec4fc3
commit a2d365e3df

@ -47,11 +47,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -66,7 +63,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -134,7 +131,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -56,11 +56,8 @@ my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -115,7 +112,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -72,11 +72,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -91,7 +88,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -152,7 +149,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -53,11 +53,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -72,7 +69,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -140,7 +137,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -67,11 +67,8 @@ our $TERMINATED_STATE = 'terminated';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -114,7 +111,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -172,7 +169,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -37,11 +37,8 @@ my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -106,7 +103,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -36,11 +36,8 @@ my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -95,7 +92,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -53,11 +53,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -116,7 +113,7 @@ sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,$tablename,$data);
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
@ -125,7 +122,7 @@ sub delete_row {
my ($xa_db,$data) = @_;
check_table();
return delete_record($get_db,$xa_db,$tablename,$data);
return delete_record($get_db,$xa_db,__PACKAGE__,$data);
}
@ -136,7 +133,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -193,7 +190,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -44,11 +44,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -110,7 +107,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -39,11 +39,8 @@ my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -99,7 +96,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -45,11 +45,8 @@ our $SIP_ACCOUNT_HANDLE = 'SIP_ACCOUNT';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -73,7 +70,7 @@ sub findby_resellerid_handle {
push(@params,$handle);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
@ -108,7 +105,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -60,11 +60,8 @@ our $ACTIVE_STATE = 'active';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -134,7 +131,7 @@ sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,$tablename,$data);
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
@ -145,7 +142,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -219,7 +216,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -63,11 +63,8 @@ our $ACTIVE_STATE = 'active';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -138,7 +135,7 @@ sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,$tablename,$data);
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
@ -149,7 +146,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -224,7 +221,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -68,11 +68,8 @@ my $default_tz = 'vienna';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -87,7 +84,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -153,7 +150,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -0,0 +1,142 @@
package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
rowinsertskipped
rowupdateskipped
rowupdated
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_stmt
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
get_id
forupdate_increment
);
my $tablename = 'voip_aig_sequence';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
];
my $indexes = {};
my $insert_unique_fields = [];
my $start_value = 1;
my $increment = 1;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub get_id {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT ' . $db->columnidentifier('id') . ' FROM ' . $table;
return $db->db_get_value($stmt);
}
sub forupdate_increment {
my ($xa_db) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT ' . $db->columnidentifier('id') . ' FROM ' . $table . ' FOR UPDATE';
my $id = $xa_db->db_get_value($stmt);
if (defined $id) {
$stmt = 'UPDATE ' . $table . ' SET ' . $db->columnidentifier('id') . ' = ? WHERE ' . $db->columnidentifier('id') . ' = ?';
if ($xa_db->db_do($stmt,$id + $increment,$id)) {
rowupdated($db,$tablename,getlogger(__PACKAGE__));
return $id + $increment;
} else {
rowupdateskipped($db,$tablename,0,getlogger(__PACKAGE__));
return undef;
}
} else {
$stmt = insert_stmt($db,__PACKAGE__);
if ($xa_db->db_do($stmt,$start_value)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $start_value;
} else {
rowinsertskipped($db,$tablename,getlogger(__PACKAGE__));
return undef;
}
}
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,170 @@
package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowsdeleted
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_stmt
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_rows
delete_groupid
countby_groupid_ipnet
);
my $tablename = 'voip_allowed_ip_groups';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'group_id',
'ipnet',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub insert_rows {
my ($xa_db,$group_id,$ipnets) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $stmt = insert_stmt($db,__PACKAGE__);
my @ids = ();
foreach my $ipnet (@$ipnets) {
if ($xa_db->db_do($stmt,undef,$group_id,$ipnet)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
push(@ids,$xa_db->db_last_insert_id());
}
}
return \@ids;
}
sub delete_groupid {
my ($xa_db,$group_id) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'DELETE FROM ' . $table . ' WHERE ' .
$db->columnidentifier('group_id') . ' = ?';
my @params = ($group_id);
my $count;
if ($count = $xa_db->db_do($stmt,@params)) {
rowsdeleted($db,$tablename,$count,$count,getlogger(__PACKAGE__));
return 1;
} else {
rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__));
return 0;
}
}
sub countby_groupid_ipnet {
my ($group_id,$ipnet) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
my @terms = ();
if ($group_id) {
push(@terms,$db->columnidentifier('group_id') . ' = ?');
push(@params,$group_id);
}
if ($ipnet) {
push(@terms,$db->columnidentifier('ipnet') . ' = ?');
push(@params,$ipnet);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -48,11 +48,8 @@ our $CFNA_TYPE = 'cfna';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -117,7 +114,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -51,11 +51,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -140,7 +137,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -204,7 +201,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -36,11 +36,8 @@ my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -95,7 +92,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -35,6 +35,8 @@ our @EXPORT_OK = qw(
$PEER_AUTH_REGISTER
$FORCE_INBOUND_CALLS_TO_PEER
$ALLOWED_IPS_GRP_ATTRIBUTE
$CONCURRENT_MAX_TOTAL_ATTRIBUTE
);
#$FORCE_OUTBOUND_CALLS_TO_PEER
@ -79,14 +81,15 @@ our $PEER_AUTH_REGISTER = 'peer_auth_register';
our $FORCE_INBOUND_CALLS_TO_PEER = 'force_inbound_calls_to_peer';
#our $FORCE_OUTBOUND_CALLS_TO_PEER = 'force_outbound_calls_to_peer';
our $ALLOWED_IPS_GRP_ATTRIBUTE = 'allowed_ips_grp';
our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -141,7 +144,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -60,11 +60,8 @@ my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -97,7 +94,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -197,7 +194,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -61,11 +61,8 @@ our $FALSE = undef;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -131,7 +128,7 @@ sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,$tablename,$data);
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
@ -140,7 +137,7 @@ sub delete_row {
my ($xa_db,$data) = @_;
check_table();
return delete_record($get_db,$xa_db,$tablename,$data);
return delete_record($get_db,$xa_db,__PACKAGE__,$data);
}
@ -186,7 +183,7 @@ sub insert_row {
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) {
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
@ -248,7 +245,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -0,0 +1,116 @@
package NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
findby_prefix
);
my $tablename = 'lnp_providers';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'name',
'prefix',
#'authoritative',
#'skip_rewrite',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_prefix {
my ($prefix,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my @params = ();
my @terms = ();
if ($prefix) {
push(@terms,$db->columnidentifier('prefix') . ' = ?');
push(@params,$prefix);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -24,11 +24,14 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
@ -107,6 +110,9 @@ sub check_billing_db_tables {
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers');
if (not $check_result) {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers');
}
$result &= $check_result; push(@$messages,$message);
return $result;
@ -167,6 +173,12 @@ sub check_provisioning_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases');
$result &= $check_result; push(@$messages,$message);

@ -68,11 +68,8 @@ our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -86,8 +83,8 @@ sub create_table {
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
@ -245,7 +242,7 @@ sub process_records {
return process_table(
get_db => $get_db,
tablename => $tablename,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
@ -263,7 +260,7 @@ sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
@ -298,7 +295,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -65,11 +65,8 @@ our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -83,8 +80,8 @@ sub create_table {
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
@ -235,7 +232,7 @@ sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
@ -271,7 +268,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -82,11 +82,8 @@ our $FORWARD_UNAVAILABLE_OPTION_SET = 'Forward_Unavailable';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -100,8 +97,8 @@ sub create_table {
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
@ -264,7 +261,7 @@ sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
@ -301,7 +298,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -76,11 +76,8 @@ our $IN_TYPE = 'In';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -94,8 +91,8 @@ sub create_table {
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
@ -328,7 +325,7 @@ sub process_records {
return process_table(
get_db => $get_db,
tablename => $tablename,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
@ -346,7 +343,7 @@ sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
@ -382,7 +379,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -80,11 +80,8 @@ our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -98,8 +95,8 @@ sub create_table {
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
@ -283,7 +280,7 @@ sub process_records {
return process_table(
get_db => $get_db,
tablename => $tablename,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
@ -301,7 +298,7 @@ sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
@ -338,7 +335,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -71,11 +71,8 @@ our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
@ -89,8 +86,8 @@ sub create_table {
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
@ -223,7 +220,7 @@ sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
@ -260,7 +257,7 @@ sub gettablename {
sub check_table {
return checktableinfo($get_db,
$tablename,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);

@ -11,6 +11,7 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$create_lnps_multithreading
$create_lnps_numofthreads
$create_lnp_block_txn
);
use NGCP::BulkProcessor::Logging qw (
@ -26,6 +27,7 @@ use NGCP::BulkProcessor::LogError qw(
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Lnp qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw();
use NGCP::BulkProcessor::ConnectorPool qw(
@ -56,25 +58,48 @@ sub create_lnps {
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
eval {
$context->{db}->db_begin();
if ($create_lnp_block_txn) {
eval {
$context->{db}->db_begin();
foreach my $imported_lnp (@$records) {
$rownum++;
next unless _reset_context($context,$imported_lnp,$rownum);
_create_lnp($context);
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
die($err) if !$skip_errors;
}
} else {
foreach my $imported_lnp (@$records) {
$rownum++;
next unless _reset_context($context,$imported_lnp,$rownum);
_create_lnp($context);
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
eval {
$context->{db}->db_begin();
_create_lnp($context);
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
die($err) if !$skip_errors;
}
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
die($err) if !$skip_errors;
}
#return 0;
@ -186,6 +211,13 @@ sub _create_lnps_checks {
eval {
$lnp_providers = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_prefix($prefix);
};
if ($@) {
rowprocessingwarn(threadid(),"falling back to mr4.4.1 lnp_providers table definition ...",getlogger(__PACKAGE__));
eval {
$lnp_providers = NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers::findby_prefix($prefix);
};
};
if ($@ or (scalar @$lnp_providers) != 1) {
rowprocessingerror(threadid(),"cannot find a (unique) lnp carrier with prefix '$prefix'",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..

@ -25,6 +25,9 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$set_allowed_ips_multithreading
$set_allowed_ips_numofthreads
$allowed_ips
$set_preference_bulk_multithreading
$set_preference_bulk_numofthreads
);
use NGCP::BulkProcessor::Logging qw (
@ -49,6 +52,8 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch qw()
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
@ -72,8 +77,12 @@ our @EXPORT_OK = qw(
set_allowed_ips
set_allowed_ips_batch
set_preference_bulk
set_preference_bulk_batch
clear_preferences
set_preference
get_preference
$INIT_PEER_AUTH_MODE
$SWITCHOVER_PEER_AUTH_MODE
@ -688,27 +697,6 @@ sub _reset_set_peer_auth_context {
}
sub set_allowed_ips {
my ($mode) = @_;
@ -820,59 +808,14 @@ sub _set_allowed_ips_checks {
my $result = _checks($context);
eval {
$context->{peer_auth_user_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_USER);
$context->{allowed_ips_grp_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE);
};
if ($@ or not defined $context->{peer_auth_user_attribute}) {
rowprocessingerror(threadid(),'cannot find peer_auth_user attribute',getlogger(__PACKAGE__));
if ($@ or not defined $context->{allowed_ips_grp_attribute}) {
rowprocessingerror(threadid(),'cannot find allowed_ips_grp attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
eval {
$context->{peer_auth_pass_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS);
};
if ($@ or not defined $context->{peer_auth_pass_attribute}) {
rowprocessingerror(threadid(),'cannot find peer_auth_pass attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
eval {
$context->{peer_auth_realm_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REALM);
};
if ($@ or not defined $context->{peer_auth_realm_attribute}) {
rowprocessingerror(threadid(),'cannot find peer_auth_realm attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
eval {
$context->{peer_auth_register_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REGISTER);
};
if ($@ or not defined $context->{peer_auth_register_attribute}) {
rowprocessingerror(threadid(),'cannot find peer_auth_register attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
eval {
$context->{force_inbound_calls_to_peer_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_INBOUND_CALLS_TO_PEER);
};
if ($@ or not defined $context->{force_inbound_calls_to_peer_attribute}) {
rowprocessingerror(threadid(),'cannot find force_inbound_calls_to_peer attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
#eval {
# $context->{force_outbound_calls_to_peer_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_OUTBOUND_CALLS_TO_PEER);
#};
#if ($@ or not defined $context->{force_outbound_calls_to_peer_attribute}) {
# rowprocessingerror(threadid(),'cannot find force_outbound_calls_to_peer attribute',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
return $result;
}
@ -880,55 +823,33 @@ sub _set_allowed_ips_preferences {
my ($context) = @_;
#if ($INIT_PEER_AUTH_MODE eq $context->{mode}) {
#
# $context->{peer_auth_user_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_user_attribute},$context->{username});
# $context->{peer_auth_pass_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_pass_attribute},$context->{password});
# $context->{peer_auth_realm_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_realm_attribute},$context->{realm});
#
# $context->{peer_auth_register_attribute_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_register_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::FALSE);
# $context->{force_inbound_calls_to_peer_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{force_inbound_calls_to_peer_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE);
#
# _info($context,"($context->{rownum}) " . $context->{mode} . ' peer authentication preferences for subscriber ' . $context->{cli},1);
#
#} elsif ($SWITCHOVER_PEER_AUTH_MODE eq $context->{mode}) {
#
# $context->{peer_auth_user_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_user_attribute},$context->{username});
# $context->{peer_auth_pass_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_pass_attribute},$context->{password});
# $context->{peer_auth_realm_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_realm_attribute},$context->{realm});
#
# $context->{peer_auth_register_attribute_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_register_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE);
# $context->{force_inbound_calls_to_peer_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{force_inbound_calls_to_peer_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::FALSE);
#
# _info($context,"($context->{rownum}) " . $context->{mode} . ' peer authentication preferences for subscriber ' . $context->{cli},1);
#
#} elsif ($CLEAR_PEER_AUTH_MODE eq $context->{mode}) {
#
# $context->{peer_auth_user_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_user_attribute},undef);
# $context->{peer_auth_pass_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_pass_attribute},undef);
# $context->{peer_auth_realm_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_realm_attribute},undef);
#
# $context->{peer_auth_register_attribute_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{peer_auth_register_attribute},undef);
# $context->{force_inbound_calls_to_peer_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{force_inbound_calls_to_peer_attribute},undef);
#
# _info($context,"($context->{rownum}) " . $context->{mode} . ' peer authentication preferences for subscriber ' . $context->{cli},1);
#
#}
my $subscriber_id = $context->{provisioning_voip_subscriber}->{id};
my $attribute = $context->{allowed_ips_grp_attribute};
my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid(
$context->{db},$subscriber_id,$attribute->{id})->[0];
if (defined $allowed_ips_grp_attribute_preference) {
$context->{allowed_ip_group_id} = $allowed_ips_grp_attribute_preference->{value};
$context->{allowed_ips_grp_attribute_preference_id} = $allowed_ips_grp_attribute_preference->{id};
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$context->{allowed_ip_group_id});
_info($context,"($context->{rownum}) " . 'allowed ips group for subscriber ' . $context->{cli} . ' exists, ipnets deleted',1);
} else {
$context->{allowed_ip_group_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::forupdate_increment($context->{db});
_info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1);
}
$context->{allowed_ips_grp_ipnet_ids} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$context->{allowed_ip_group_id},$context->{allowed_ips});
_info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1);
if (not defined $allowed_ips_grp_attribute_preference) {
$context->{allowed_ips_grp_attribute_preference_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db},
attribute_id => $attribute->{id},
subscriber_id => $subscriber_id,
value => $context->{allowed_ip_group_id},
);
_info($context,"($context->{rownum}) " . 'new allowed ips group preference value for subscriber ' . $context->{cli} . ' added',1);
}
}
@ -938,75 +859,160 @@ sub _reset_set_allowed_ips_context {
my $result = _reset_context($context,$imported_subscriber,$rownum);
$context->{realm} = $peer_auth_realm;
#$context->{mode} = $mode;
delete $context->{peer_auth_user_preference_id};
delete $context->{peer_auth_pass_preference_id};
delete $context->{peer_auth_realm_preference_id};
$context->{allowed_ips} = $allowed_ips;
delete $context->{peer_auth_register_attribute_preference_id};
delete $context->{force_inbound_calls_to_peer_preference_id};
delete $context->{allowed_ip_group_id};
delete $context->{allowed_ips_grp_attribute_preference_id};
delete $context->{allowed_ips_grp_ipnet_ids};
return $result;
}
sub set_preference_bulk {
my ($bulk_attribute_name,$value) = @_;
my $static_context = { bulk_attribute_name => $bulk_attribute_name, value => $value };
my $result = _set_preference_bulk_checks($static_context);
destroy_all_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
foreach my $imported_subscriber (@$records) {
$rownum++;
next unless _reset_set_preference_bulk_context($context,$imported_subscriber,$rownum);
_set_preference_bulk($context);
}
#return 0;
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
multithreading => $set_preference_bulk_multithreading,
numofthreads => $set_preference_bulk_numofthreads,
),$warning_count);
}
sub set_preference_bulk_batch {
my ($bulk_attribute_name,$value) = @_;
my $static_context = { bulk_attribute_name => $bulk_attribute_name, value => $value };
my $result = _set_preference_bulk_checks($static_context);
destroy_all_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::process_records(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_preference_bulk_context($context,$imported_subscriber,$rownum);
_set_preference_bulk($context);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
#return 0;
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
multithreading => $set_preference_bulk_multithreading,
numofthreads => $set_preference_bulk_numofthreads,
),$warning_count);
}
sub _set_preference_bulk {
my ($context) = @_;
_set_subscriber_preference($context,\&_set_preferences_bulk);
}
sub _set_preference_bulk_checks {
my ($context) = @_;
my $result = _checks($context);
eval {
$context->{bulk_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$context->{bulk_attribute_name});
};
if ($@ or not defined $context->{bulk_attribute}) {
rowprocessingerror(threadid(),'cannot find ' . $context->{bulk_attribute_name} . ' attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
return $result;
}
sub _set_preferences_bulk {
my ($context) = @_;
$context->{bulk_attribute_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
$context->{bulk_attribute},$context->{value});
_info($context,"($context->{rownum}) " . 'set ' . $context->{bulk_attribute_name} . ' preference value for subscriber ' . $context->{cli},1);
}
sub _reset_set_preference_bulk_context {
my ($context,$imported_subscriber,$rownum) = @_;
my $result = _reset_context($context,$imported_subscriber,$rownum);
delete $context->{bulk_attribute_id};
return $result;
}
sub clear_preferences {
@ -1020,10 +1026,9 @@ sub clear_preferences {
sub set_preference {
my ($context,$subscriber_id,$attribute,$value) = @_;
my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db},
$subscriber_id,$attribute->{id});
if ($attribute->{max_occur} == 1) {
my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db},
$subscriber_id,$attribute->{id});
if (defined $value) {
if ((scalar @$old_preferences) == 1) {
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::update_row($context->{db},{
@ -1061,11 +1066,18 @@ sub set_preference {
}
sub get_preference {
my ($context,$subscriber_id,$attribute) = @_;
my $preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db},
$subscriber_id,$attribute->{id});
if ($attribute->{max_occur} == 1) {
return $preferences->[0];
} else {
return $preferences;
}
}
sub _error {

@ -118,6 +118,11 @@ our @EXPORT_OK = qw(
$create_lnps_multithreading
$create_lnps_numofthreads
$create_lnp_block_txn
$set_preference_bulk_multithreading
$set_preference_bulk_numofthreads
$concurrent_max_total
);
our $defaultconfig = 'config.cfg';
@ -203,6 +208,11 @@ our $ringtimeout = undef;
our $create_lnps_multithreading = $enablemultithreading;
our $create_lnps_numofthreads = $cpucount;
our $create_lnp_block_txn = 0;
our $set_preference_bulk_multithreading = $enablemultithreading;
our $set_preference_bulk_numofthreads = $cpucount;
our $concurrent_max_total = undef;
sub update_settings {
@ -310,6 +320,15 @@ sub update_settings {
$create_lnps_multithreading = $data->{create_lnps_multithreading} if exists $data->{create_lnps_multithreading};
$create_lnps_numofthreads = _get_import_numofthreads($cpucount,$data,'create_lnps_numofthreads');
$create_lnp_block_txn = $data->{create_lnp_block_txn} if exists $data->{create_lnp_block_txn};
$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading};
$set_preference_bulk_numofthreads = _get_import_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads');
$concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total};
if (defined $concurrent_max_total and $concurrent_max_total <= 0) {
configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__));
$result = 0;
}
return $result;

@ -31,6 +31,8 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$reseller_id
$barring_profiles_yml
$barring_profiles
$allowed_ips
$concurrent_max_total
);
use NGCP::BulkProcessor::Logging qw(
init_log
@ -75,10 +77,13 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Check qw(
@ -112,6 +117,9 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Preferences qw(
set_allowed_ips
set_allowed_ips_batch
set_preference_bulk
set_preference_bulk_batch
$INIT_PEER_AUTH_MODE
$SWITCHOVER_PEER_AUTH_MODE
$CLEAR_PEER_AUTH_MODE
@ -180,12 +188,15 @@ push(@TASK_OPTS,$switchover_peer_auth_task_opt);
my $clear_peer_auth_task_opt = 'clear_peer_auth';
push(@TASK_OPTS,$clear_peer_auth_task_opt);
#my $set_allowed_ips_task_opt = 'set_allowed_ips';
#push(@TASK_OPTS,$set_allowed_ips_task_opt);
my $set_allowed_ips_task_opt = 'set_allowed_ips';
push(@TASK_OPTS,$set_allowed_ips_task_opt);
my $set_call_forwards_task_opt = 'set_call_forwards';
push(@TASK_OPTS,$set_call_forwards_task_opt);
my $set_concurrent_max_total_task_opt = 'set_concurrent_max_total';
push(@TASK_OPTS,$set_concurrent_max_total_task_opt);
my $create_lnps_task_opt = 'create_lnps';
push(@TASK_OPTS,$create_lnps_task_opt);
@ -301,12 +312,12 @@ sub main() {
$completion |= 1;
}
#} elsif (lc($set_allowed_ips_task_opt) eq lc($task)) {
# if (taskinfo($set_allowed_ips_task_opt,$result,1) and ($result = batchinfo($result))) {
# next unless check_dry();
# $result &= set_allowed_ips_task(\@messages);
# $completion |= 1;
# }
} elsif (lc($set_allowed_ips_task_opt) eq lc($task)) {
if (taskinfo($set_allowed_ips_task_opt,$result,1) and ($result = batchinfo($result))) {
next unless check_dry();
$result &= set_allowed_ips_task(\@messages);
$completion |= 1;
}
} elsif (lc($set_call_forwards_task_opt) eq lc($task)) {
if (taskinfo($set_call_forwards_task_opt,$result,1) and ($result = batchinfo($result))) {
@ -315,6 +326,15 @@ sub main() {
$completion |= 1;
}
} elsif (lc($set_concurrent_max_total_task_opt) eq lc($task)) {
if (taskinfo($set_concurrent_max_total_task_opt,$result,1) and ($result = batchinfo($result))) {
next unless check_dry();
$result &= set_preference_bulk_task(\@messages,
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE,
$concurrent_max_total);
$completion |= 1;
}
} elsif (lc($create_lnps_task_opt) eq lc($task)) {
if (taskinfo($create_lnps_task_opt,$result)) {
next unless check_dry();
@ -878,20 +898,20 @@ sub set_peer_auth_task {
my $force_inbound_calls_to_peer_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_INBOUND_CALLS_TO_PEER);
$stats .= "\n " . $peer_auth_user_attribute->{attribute} . "': " .
$stats .= "\n '" . $peer_auth_user_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_user_attribute->{id},undef) . ' rows';
$stats .= "\n " . $peer_auth_pass_attribute->{attribute} . "': " .
$stats .= "\n '" . $peer_auth_pass_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_pass_attribute->{id},undef) . ' rows';
$stats .= "\n " . $peer_auth_realm_attribute->{attribute} . "': " .
$stats .= "\n '" . $peer_auth_realm_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_realm_attribute->{id},undef) . ' rows';
$stats .= "\n " . $peer_auth_register_attribute->{attribute} . "': " .
$stats .= "\n '" . $peer_auth_register_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_register_attribute->{id},undef) . ' rows';
$stats .= "\n " . $force_inbound_calls_to_peer_attribute->{attribute} . "': " .
$stats .= "\n '" . $force_inbound_calls_to_peer_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$force_inbound_calls_to_peer_attribute->{id},undef) . ' rows';
};
@ -922,33 +942,17 @@ sub set_allowed_ips_task {
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
eval {
my $peer_auth_user_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_USER);
my $peer_auth_pass_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS);
my $peer_auth_realm_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REALM);
my $peer_auth_register_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REGISTER);
my $force_inbound_calls_to_peer_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_INBOUND_CALLS_TO_PEER);
$stats .= "\n " . $peer_auth_user_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_user_attribute->{id},undef) . ' rows';
$stats .= "\n " . $peer_auth_pass_attribute->{attribute} . "': " .
my $allowed_ips_grp_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE);
$stats .= "\n '" . $allowed_ips_grp_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_pass_attribute->{id},undef) . ' rows';
$stats .= "\n " . $peer_auth_realm_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_realm_attribute->{id},undef) . ' rows';
$allowed_ips_grp_attribute->{id},undef) . ' rows';
foreach my $ipnet (@$allowed_ips) {
$stats .= "\n '$ipnet': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::countby_groupid_ipnet(undef,$ipnet) . ' rows';
}
$stats .= "\n voip_aig_sequence: " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::get_id();
$stats .= "\n " . $peer_auth_register_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$peer_auth_register_attribute->{id},undef) . ' rows';
$stats .= "\n " . $force_inbound_calls_to_peer_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$force_inbound_calls_to_peer_attribute->{id},undef) . ' rows';
};
if ($err or !$result) {
push(@$messages,"set subscribers\' allowed_ips preference INCOMPLETE$stats");
@ -1009,12 +1013,21 @@ sub create_lnps_task {
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
my $lnp_providers = [];
eval {
$lnp_providers = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_prefix();
};
if ($@) {
eval {
$lnp_providers = NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers::findby_prefix();
};
}
eval {
$stats .= "\n lnp_numbers: " .
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows';
foreach my $lnp_provider (@{NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_prefix()}) {
foreach my $lnp_provider (@$lnp_providers) {
$stats .= "\n '" . $lnp_provider->{name} . "': " .
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number($lnp_provider->{id}) . ' rows';
}
@ -1030,6 +1043,37 @@ sub create_lnps_task {
}
sub set_preference_bulk_task {
my ($messages,$bulk_attribute_name,$value) = @_;
my ($result,$warning_count) = (0,0);
eval {
if ($batch) {
($result,$warning_count) = set_preference_bulk_batch($bulk_attribute_name,$value);
} else {
($result,$warning_count) = set_preference_bulk($bulk_attribute_name,$value);
}
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
eval {
my $bulk_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($bulk_attribute_name);
$stats .= "\n '" . $bulk_attribute->{attribute} . "': " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
$bulk_attribute->{id},undef) . ' rows';
};
if ($err or !$result) {
push(@$messages,"set subscribers\' $bulk_attribute_name preference INCOMPLETE$stats");
} else {
push(@$messages,"set subscribers\' $bulk_attribute_name preference completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..

@ -45,7 +45,7 @@ peer_auth_realm = myrealm
set_allowed_ips_multithreading = 1
#set_allowed_ips_numofthreads = 6
allowed_ips=127.0.0.1/24
allowed_ips=127.0.0.1/24,127.0.0.2/24
set_call_forwards_multithreading = 1
#set_call_forwards_numofthreads = 6
@ -61,3 +61,8 @@ ringtimeout = 20
create_lnps_multithreading = 1
#create_lnps_numofthreads = 6
create_lnp_block_txn = 0
set_preference_bulk_multithreading = 1
#set_preference_bulk_numofthreads = 6
concurrent_max_total = 2

@ -13,6 +13,7 @@ our @EXPORT_OK = qw();
sub new {
my $base_class = shift;
my $class = shift;
my $self = bless {}, $class;
return init_item($self,@_);

@ -53,9 +53,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -57,9 +57,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -41,9 +41,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -44,9 +44,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -47,9 +47,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -65,9 +65,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -59,9 +59,7 @@ our $TERMINATED_STATE = 'terminated';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -50,9 +50,7 @@ my $get_item_filter_path_query = sub {
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -52,9 +52,7 @@ my $get_item_filter_path_query = sub {
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -53,9 +53,7 @@ my $get_item_filter_path_query = sub {
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -44,9 +44,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -63,9 +63,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -64,9 +64,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($fieldnames);
bless($self,$class);
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);

@ -43,10 +43,11 @@ my $instance_counts = {};
sub new {
#my $class = shift;
my $base_class = shift;
my $class = shift;
#my $self = bless {}, $class;
my ($class,$functions,$derived_class,$serialization_format,$no_autostart) = @_;
my $self = bless {}, $derived_class;
my ($functions,$serialization_format,$no_autostart) = @_;
my $self = bless {}, $class;
$self->{worker} = undef;
$self->{functions} = $functions;
@ -78,6 +79,7 @@ sub new {
#$self = share($self);
#autostart??
servicedebug($self,$class . ' service created',getlogger(__PACKAGE__));
return $self;

@ -5,7 +5,7 @@ use strict;
use NGCP::BulkProcessor::Logging qw(getlogger servicedebug);
use NGCP::BulkProcessor::Service;
use NGCP::BulkProcessor::Service qw();
#use test::csv_table; # qw(test_table_bycolumn1);
#use test::mysql_table;
@ -42,15 +42,8 @@ my $functions = {
sub new {
#my $class = shift;
#my $self = NGCP::BulkProcessor::Service->new($functions,$class);
#bless($self,$class);
#return $self;
my $self = NGCP::BulkProcessor::Service->new($functions,@_);
servicedebug($self,__PACKAGE__ . ' service created',getlogger(__PACKAGE__));
my $class = shift;
my $self = NGCP::BulkProcessor::Service->new($class,$functions,@_);
return $self;
}

@ -81,6 +81,7 @@ our @EXPORT_OK = qw(
#transfer_record
#transfer_records
my $table_names = {};
my $table_expected_fieldnames = {};
my $table_fieldnames_cached = {};
my $table_primarykeys = {};
@ -107,17 +108,17 @@ my $ERROR = 4;
sub init_record {
my ($record,$get_db,$tablename,$expected_fieldnames,$target_indexes) = @_;
my ($record,$class,$get_db,$tablename,$expected_fieldnames,$target_indexes) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
checktableinfo($db,$tablename,$expected_fieldnames,$target_indexes);
checktableinfo($db,$class,$tablename,$expected_fieldnames,$target_indexes);
if (defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}) { # and ref $table_fieldnames_cached->{$connectidentifier}->{$tablename} eq 'ARRAY') {
if (defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}) { # and ref $table_fieldnames_cached->{$connectidentifier}->{$tablename} eq 'ARRAY') {
# if there are fieldnames defined, we make a member variable for each and set it to undef
foreach my $fieldname (@{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}}) {
foreach my $fieldname (@{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}}) {
$record->{$fieldname} = undef;
}
}
@ -170,6 +171,12 @@ sub cleartableinfo {
my $found = 0;
if (exists $table_names->{$tid}) {
if (exists $table_names->{$tid}->{$connectidentifier}) {
delete $table_names->{$tid}->{$connectidentifier};
$found = 1;
}
}
if (exists $table_expected_fieldnames->{$tid}) {
if (exists $table_expected_fieldnames->{$tid}->{$connectidentifier}) {
delete $table_expected_fieldnames->{$tid}->{$connectidentifier};
@ -195,6 +202,10 @@ sub cleartableinfo {
}
}
if ((scalar keys %{$table_names->{$tid}}) == 0) {
delete $table_names->{$tid};
$found = 1;
}
if ((scalar keys %{$table_expected_fieldnames->{$tid}}) == 0) {
delete $table_expected_fieldnames->{$tid};
$found = 1;
@ -220,13 +231,20 @@ sub cleartableinfo {
sub registertableinfo { # to prepare creation of non-existent tables..
my ($get_db,$tablename,$fieldnames,$indexes,$keycols) = @_;
my ($get_db,$class,$tablename,$fieldnames,$indexes,$keycols) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
if (not exists $table_names->{$tid}) {
$table_names->{$tid} = {};
}
if (not exists $table_names->{$tid}->{$connectidentifier}) {
$table_names->{$tid}->{$connectidentifier} = {};
}
$table_names->{$tid}->{$connectidentifier}->{$class} = $tablename;
if (not exists $table_expected_fieldnames->{$tid}) {
$table_expected_fieldnames->{$tid} = {};
}
@ -234,8 +252,9 @@ sub registertableinfo { # to prepare creation of non-existent tables..
# create an empty category for the connection if none exists yet:
$table_expected_fieldnames->{$tid}->{$connectidentifier} = {};
}
# we prefer to always update the expected fieldnames (that come from a derived class)
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} = $fieldnames // [];
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $fieldnames // [];
if (not exists $table_fieldnames_cached->{$tid}) {
$table_fieldnames_cached->{$tid} = {};
@ -252,7 +271,7 @@ sub registertableinfo { # to prepare creation of non-existent tables..
# create an empty primary key column name cache for the connection if none exists yet:
$table_primarykeys->{$tid}->{$connectidentifier} = {};
}
$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename} = $keycols // [];
$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $keycols // [];
if (not exists $table_target_indexes->{$tid}) {
$table_target_indexes->{$tid} = {};
@ -262,13 +281,13 @@ sub registertableinfo { # to prepare creation of non-existent tables..
$table_target_indexes->{$tid}->{$connectidentifier} = {};
}
# we prefer to always update the target table indexes (that come from a derived class)
$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = $indexes // {};
$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $indexes // {};
}
sub checktableinfo {
my ($get_db,$tablename,$expected_fieldnames,$target_indexes) = @_;
my ($get_db,$class,$tablename,$expected_fieldnames,$target_indexes) = @_;
my $result = 1;
@ -277,6 +296,13 @@ sub checktableinfo {
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
if (not exists $table_names->{$tid}) {
$table_names->{$tid} = {};
}
if (not exists $table_names->{$tid}->{$connectidentifier}) {
$table_names->{$tid}->{$connectidentifier} = {};
}
$table_names->{$tid}->{$connectidentifier}->{$class} = $tablename;
if (not exists $table_expected_fieldnames->{$tid}) {
#$table_expected_fieldnames->{$tid} = shared_clone({});
$table_expected_fieldnames->{$tid} = {};
@ -287,8 +313,8 @@ sub checktableinfo {
$table_expected_fieldnames->{$tid}->{$connectidentifier} = {};
}
# we prefer to always update the expected fieldnames (that come from a derived class)
#$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($expected_fieldnames);
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} = $expected_fieldnames // [];
#$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = shared_clone($expected_fieldnames);
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $expected_fieldnames // [];
if (not exists $table_fieldnames_cached->{$tid}) {
#$table_fieldnames_cached->{$tid} = shared_clone({});
@ -300,24 +326,26 @@ sub checktableinfo {
$table_fieldnames_cached->{$tid}->{$connectidentifier} = {};
}
if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename}) {
if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}) {
# query the database for fieldnames of the table if we don't have a cache entry yet:
#$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($db->getfieldnames($tablename));
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename} = $db->getfieldnames($tablename);
#my $fieldnames = $db->getfieldnames($tablename);
if (!defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}
or (scalar @{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}}) == 0
or setcontains($table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename},1)) {
my $fieldnames = $db->getfieldnames($tablename);
if (!defined $fieldnames
or (scalar @{$fieldnames}) == 0
or setcontains($table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$fieldnames,1)) {
#fieldnames are case insensitive in general
# if not expected fieldnames are given or queried fieldnames match, we log this:
#$table_fieldnames_cached->{$connectidentifier}->{$tablename} = $table_expected_fieldnames->{$connectidentifier}->{$tablename};
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 1, };
fieldnamesaquired($db,$tablename,getlogger(__PACKAGE__));
} else {
# otherwise we log a failure (exit? - see Logging Module)
#$table_fieldnames_cached->{$connectidentifier}->{$tablename} = {}; #$fieldnames;
fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename},getlogger(__PACKAGE__));
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 0, };
fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$fieldnames,getlogger(__PACKAGE__));
$result = 0;
}
} elsif (not $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{ok}) {
fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{fieldnames},getlogger(__PACKAGE__));
$result = 0;
}
if (not exists $table_primarykeys->{$tid}) {
@ -329,11 +357,11 @@ sub checktableinfo {
#$table_primarykeys->{$tid}->{$connectidentifier} = shared_clone({});
$table_primarykeys->{$tid}->{$connectidentifier} = {};
}
if (not exists $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}) {
if (not exists $table_primarykeys->{$tid}->{$connectidentifier}->{$class}) {
# query the database for primary keys of the table if we don't have them cached yet:
#$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($db->getprimarykeycols($tablename));
$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename} = $db->getprimarykeycols($tablename);
primarykeycolsaquired($db,$tablename,$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename},getlogger(__PACKAGE__));
#$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = shared_clone($db->getprimarykeycols($class));
$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $db->getprimarykeycols($tablename);
primarykeycolsaquired($db,$tablename,$table_primarykeys->{$tid}->{$connectidentifier}->{$class},getlogger(__PACKAGE__));
}
if (not exists $table_target_indexes->{$tid}) {
@ -346,8 +374,8 @@ sub checktableinfo {
$table_target_indexes->{$tid}->{$connectidentifier} = {};
}
# we prefer to always update the target table indexes (that come from a derived class)
#$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($target_indexes);
$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = $target_indexes // {};
#$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = shared_clone($target_indexes);
$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $target_indexes // {};
return $result;
@ -355,7 +383,7 @@ sub checktableinfo {
sub create_targettable {
my ($get_db,$tablename,$get_target_db,$targettablename,$truncate,$defer_indexes,$texttable_engine) = @_;
my ($get_db,$class,$get_target_db,$targetclass,$targettablename,$truncate,$defer_indexes,$texttable_engine) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
@ -369,22 +397,22 @@ sub create_targettable {
}
my $result = $target_db->create_texttable($targettablename,
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},
$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename},
$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename},
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},
$table_primarykeys->{$tid}->{$connectidentifier}->{$class},
$table_target_indexes->{$tid}->{$connectidentifier}->{$class},
# 'ifnotexists' is always true
$truncate,
$defer_indexes,
$texttable_engine);
checktableinfo($target_db,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$defer_indexes ? undef : $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename});
checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$defer_indexes ? undef : $table_target_indexes->{$tid}->{$connectidentifier}->{$class});
return $result;
}
sub delete_records {
my ($get_db,$get_xa_db,$tablename,$keyfields,$equal,$vals_table) = @_;
my ($get_db,$get_xa_db,$class,$keyfields,$equal,$vals_table) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
@ -392,8 +420,9 @@ sub delete_records {
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
if (defined $expected_fieldnames and
(defined $keyfields and
@ -463,7 +492,7 @@ sub delete_records {
sub insert_record {
my ($get_db,$get_xa_db,$tablename,$row,$insert_ignore,$unique_count_fields) = @_;
my ($get_db,$get_xa_db,$class,$row,$insert_ignore,$unique_count_fields) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
@ -472,8 +501,9 @@ sub insert_record {
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
#my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
#my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
if (defined $expected_fieldnames and defined $row) {
@ -530,7 +560,7 @@ sub insert_record {
sub delete_record {
my ($get_db,$get_xa_db,$tablename,$row) = @_;
my ($get_db,$get_xa_db,$class,$row) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
@ -539,8 +569,9 @@ sub delete_record {
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
if (defined $expected_fieldnames and defined $row) {
@ -603,7 +634,7 @@ sub delete_record {
sub update_record {
my ($get_db,$get_xa_db,$tablename,$row) = @_;
my ($get_db,$get_xa_db,$class,$row) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
@ -612,8 +643,9 @@ sub update_record {
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
if (defined $expected_fieldnames and defined $row) {
@ -683,11 +715,12 @@ sub update_record {
sub insert_stmt {
my ($get_db,$tablename,$insert_ignore) = @_;
my ($get_db,$class,$insert_ignore) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
return 'INSERT ' . ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '') . 'INTO ' . $db->tableidentifier($tablename) . ' (' .
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) .
') VALUES (' . substr(',?' x scalar @$expected_fieldnames,1) . ')';
@ -698,8 +731,9 @@ sub transfer_table {
my %params = @_;
my ($get_db,
$tablename,
$class,
$get_target_db,
$targetclass,
$targettablename,
$truncate_targettable,
$create_indexes,
@ -712,8 +746,9 @@ sub transfer_table {
$select,
$values) = @params{qw/
get_db
tablename
class
get_target_db
targetclass
targettablename
truncate_targettable
create_indexes
@ -734,6 +769,10 @@ sub transfer_table {
my $db = &$get_db($reader_connection_name,1);
my $target_db = &$get_target_db(); #$writer_connection_name);
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
my $countstatement;
if (defined $selectcount) {
$countstatement = $selectcount;
@ -756,11 +795,9 @@ sub transfer_table {
$create_indexes = ((defined $create_indexes) ? $create_indexes : $transfer_defer_indexes);
if (create_targettable($db,$tablename,$target_db,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine)) {
if (create_targettable($db,$class,$target_db,$targetclass,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine)) {
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my @fieldnames = @$expected_fieldnames;
@ -774,7 +811,7 @@ sub transfer_table {
$selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename)
}
my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')';
my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')';
my $blocksize;
@ -813,6 +850,7 @@ sub transfer_table {
threadqueuelength => $tabletransfer_threadqueuelength,
get_db => $get_db,
tablename => $tablename,
class => $class,
selectstatement => $selectstatement,
blocksize => $blocksize,
rowcount => $rowcount,
@ -831,6 +869,7 @@ sub transfer_table {
#writererrorstate_ref => \$writererrorstate,
get_target_db => $get_target_db,
targettablename => $targettablename,
targetclass => $targetclass,
insertstatement => $insertstatement,
blocksize => $blocksize,
rowcount => $rowcount,
@ -937,16 +976,16 @@ sub transfer_table {
eval {
$target_db->create_primarykey($targettablename,
$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename},
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename});
$table_primarykeys->{$tid}->{$connectidentifier}->{$class},
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class});
$target_db->create_indexes($targettablename,
$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename},
$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename});
$table_target_indexes->{$tid}->{$connectidentifier}->{$class},
$table_primarykeys->{$tid}->{$connectidentifier}->{$class});
delete $table_primarykeys->{$tid}->{$target_db->connectidentifier()}->{$targettablename};
checktableinfo($target_db,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename});
delete $table_primarykeys->{$tid}->{$target_db->connectidentifier()}->{$targetclass};
checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$table_target_indexes->{$tid}->{$connectidentifier}->{$class});
$target_db->vacuum($targettablename);
@ -982,7 +1021,7 @@ sub process_table {
my %params = @_;
my ($get_db,
$tablename,
$class,
$process_code,
$static_context,
$init_process_context_code,
@ -994,7 +1033,7 @@ sub process_table {
$select,
$values) = @params{qw/
get_db
tablename
class
process_code
static_context
init_process_context_code
@ -1013,6 +1052,11 @@ sub process_table {
my $db = &$get_db($reader_connection_name,1);
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my $countstatement;
if (defined $selectcount) {
$countstatement = $selectcount;
@ -1029,17 +1073,7 @@ sub process_table {
return;
}
my $errorstate = $RUNNING;
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
my @fieldnames = @$expected_fieldnames;
#my $setfieldnames = join(', ',@fieldnames);
#my $valueplaceholders = substr(',?' x scalar @fieldnames,1);
my $selectstatement;
if (length($select) > 0) {
$selectstatement = $select;
@ -1047,6 +1081,7 @@ sub process_table {
$selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename);
}
my $errorstate = $RUNNING;
my $blocksize;
if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved
@ -1084,6 +1119,7 @@ sub process_table {
threadqueuelength => $tableprocessing_threadqueuelength,
get_db => $get_db,
tablename => $tablename,
class => $class,
selectstatement => $selectstatement,
blocksize => $blocksize,
rowcount => $rowcount,

@ -13,9 +13,10 @@ our @EXPORT_OK = qw();
sub new {
my $base_class = shift;
my $class = shift;
my $self = bless {}, $class;
return init_record($self,@_);
return init_record($self,$class,@_);
}

Loading…
Cancel
Save