From a2d365e3df8233c8183677695e87c88e376653cb Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Fri, 5 Aug 2016 15:42:58 +0200 Subject: [PATCH] MT#18663 MT#20893 row bulk processing framework WIP #14 +support for handling multiple ngcp table versions at runtime +setting allowed_ips Change-Id: I7e87456d561407fd35b4518ed53a3c435c2e7798 --- .../Dao/Trunk/billing/billing_mappings.pm | 11 +- .../Dao/Trunk/billing/billing_profiles.pm | 9 +- .../Dao/Trunk/billing/contacts.pm | 11 +- .../Dao/Trunk/billing/contract_balances.pm | 11 +- .../Dao/Trunk/billing/contracts.pm | 11 +- .../Dao/Trunk/billing/domain_resellers.pm | 9 +- .../Dao/Trunk/billing/domains.pm | 9 +- .../Dao/Trunk/billing/lnp_numbers.pm | 15 +- .../Dao/Trunk/billing/lnp_providers.pm | 9 +- .../Dao/Trunk/billing/ncos_levels.pm | 9 +- .../Dao/Trunk/billing/products.pm | 11 +- .../Dao/Trunk/billing/voip_numbers.pm | 13 +- .../Dao/Trunk/billing/voip_subscribers.pm | 13 +- .../Dao/Trunk/kamailio/voicemail_users.pm | 11 +- .../Trunk/provisioning/voip_aig_sequence.pm | 142 ++++++++ .../provisioning/voip_allowed_ip_groups.pm | 170 +++++++++ .../Trunk/provisioning/voip_cf_mappings.pm | 9 +- .../Dao/Trunk/provisioning/voip_dbaliases.pm | 11 +- .../Dao/Trunk/provisioning/voip_domains.pm | 9 +- .../Trunk/provisioning/voip_preferences.pm | 15 +- .../Trunk/provisioning/voip_subscribers.pm | 11 +- .../provisioning/voip_usr_preferences.pm | 15 +- .../Dao/mr441/billing/lnp_providers.pm | 116 ++++++ .../Projects/Migration/IPGallery/Check.pm | 12 + .../Migration/IPGallery/Dao/import/Batch.pm | 17 +- .../IPGallery/Dao/import/FeatureOption.pm | 15 +- .../Dao/import/FeatureOptionSetItem.pm | 15 +- .../Migration/IPGallery/Dao/import/Lnp.pm | 17 +- .../IPGallery/Dao/import/Subscriber.pm | 17 +- .../IPGallery/Dao/import/UsernamePassword.pm | 15 +- .../Projects/Migration/IPGallery/Lnp.pm | 62 +++- .../Migration/IPGallery/Preferences.pm | 342 +++++++++--------- .../Projects/Migration/IPGallery/Settings.pm | 19 + .../Projects/Migration/IPGallery/process.pl | 120 ++++-- .../Projects/Migration/IPGallery/settings.cfg | 7 +- lib/NGCP/BulkProcessor/RestItem.pm | 1 + .../RestRequests/Trunk/BillingFees.pm | 4 +- .../RestRequests/Trunk/BillingProfiles.pm | 4 +- .../RestRequests/Trunk/BillingZones.pm | 4 +- .../RestRequests/Trunk/CallForwards.pm | 4 +- .../RestRequests/Trunk/Contracts.pm | 4 +- .../RestRequests/Trunk/CustomerContacts.pm | 4 +- .../RestRequests/Trunk/Customers.pm | 4 +- .../RestRequests/Trunk/Domains.pm | 4 +- .../RestRequests/Trunk/LnpCarriers.pm | 4 +- .../RestRequests/Trunk/NcosLevels.pm | 4 +- .../RestRequests/Trunk/Resellers.pm | 4 +- .../RestRequests/Trunk/Subscribers.pm | 4 +- .../RestRequests/Trunk/SystemContacts.pm | 4 +- lib/NGCP/BulkProcessor/Service.pm | 8 +- lib/NGCP/BulkProcessor/Service/TestService.pm | 13 +- lib/NGCP/BulkProcessor/SqlProcessor.pm | 178 +++++---- lib/NGCP/BulkProcessor/SqlRecord.pm | 3 +- 53 files changed, 1025 insertions(+), 538 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_aig_sequence.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/mr441/billing/lnp_providers.pm diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm index afa16d9..dcb5120 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm @@ -47,11 +47,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -66,7 +63,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -134,7 +131,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm index c2276dc..9528686 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm @@ -56,11 +56,8 @@ my $indexes = {}; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -115,7 +112,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm index f5499ed..56a3157 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm @@ -72,11 +72,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -91,7 +88,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -152,7 +149,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm index 21a8523..7610b20 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm @@ -53,11 +53,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -72,7 +69,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -140,7 +137,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm index 564a6ad..cedb880 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm @@ -67,11 +67,8 @@ our $TERMINATED_STATE = 'terminated'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -114,7 +111,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -172,7 +169,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domain_resellers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domain_resellers.pm index 3fd7d0d..46c5145 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domain_resellers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domain_resellers.pm @@ -37,11 +37,8 @@ my $indexes = {}; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -106,7 +103,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm index 8260084..0919490 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm @@ -36,11 +36,8 @@ my $indexes = {}; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -95,7 +92,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm index fced976..d5787ce 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm @@ -53,11 +53,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -116,7 +113,7 @@ sub update_row { my ($xa_db,$data) = @_; check_table(); - return update_record($get_db,$xa_db,$tablename,$data); + return update_record($get_db,$xa_db,__PACKAGE__,$data); } @@ -125,7 +122,7 @@ sub delete_row { my ($xa_db,$data) = @_; check_table(); - return delete_record($get_db,$xa_db,$tablename,$data); + return delete_record($get_db,$xa_db,__PACKAGE__,$data); } @@ -136,7 +133,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -193,7 +190,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm index 0c68fdc..4f2c730 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm @@ -44,11 +44,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -110,7 +107,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm index 97845a8..730492c 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm @@ -39,11 +39,8 @@ my $indexes = {}; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -99,7 +96,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm index 1310379..ad98fe7 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm @@ -45,11 +45,8 @@ our $SIP_ACCOUNT_HANDLE = 'SIP_ACCOUNT'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -73,7 +70,7 @@ sub findby_resellerid_handle { push(@params,$handle); } my $rows = $db->db_get_all_arrayref($stmt,@params); - + return buildrecords_fromrows($rows,$load_recursive); } @@ -108,7 +105,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm index 3cac82b..1604401 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm @@ -60,11 +60,8 @@ our $ACTIVE_STATE = 'active'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -134,7 +131,7 @@ sub update_row { my ($xa_db,$data) = @_; check_table(); - return update_record($get_db,$xa_db,$tablename,$data); + return update_record($get_db,$xa_db,__PACKAGE__,$data); } @@ -145,7 +142,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -219,7 +216,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm index 90cbafd..0cf25c2 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm @@ -63,11 +63,8 @@ our $ACTIVE_STATE = 'active'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -138,7 +135,7 @@ sub update_row { my ($xa_db,$data) = @_; check_table(); - return update_record($get_db,$xa_db,$tablename,$data); + return update_record($get_db,$xa_db,__PACKAGE__,$data); } @@ -149,7 +146,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -224,7 +221,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm index d165b05..f0027c5 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm @@ -68,11 +68,8 @@ my $default_tz = 'vienna'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -87,7 +84,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -153,7 +150,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_aig_sequence.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_aig_sequence.pm new file mode 100644 index 0000000..d29bd97 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_aig_sequence.pm @@ -0,0 +1,142 @@ +package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted + rowinsertskipped + rowupdateskipped + rowupdated +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_provisioning_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_stmt + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + get_id + forupdate_increment + +); + +my $tablename = 'voip_aig_sequence'; +my $get_db = \&get_provisioning_db; + +my $expected_fieldnames = [ + 'id', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +my $start_value = 1; +my $increment = 1; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub get_id { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT ' . $db->columnidentifier('id') . ' FROM ' . $table; + return $db->db_get_value($stmt); + +} + +sub forupdate_increment { + + my ($xa_db) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT ' . $db->columnidentifier('id') . ' FROM ' . $table . ' FOR UPDATE'; + my $id = $xa_db->db_get_value($stmt); + if (defined $id) { + $stmt = 'UPDATE ' . $table . ' SET ' . $db->columnidentifier('id') . ' = ? WHERE ' . $db->columnidentifier('id') . ' = ?'; + if ($xa_db->db_do($stmt,$id + $increment,$id)) { + rowupdated($db,$tablename,getlogger(__PACKAGE__)); + return $id + $increment; + } else { + rowupdateskipped($db,$tablename,0,getlogger(__PACKAGE__)); + return undef; + } + } else { + $stmt = insert_stmt($db,__PACKAGE__); + if ($xa_db->db_do($stmt,$start_value)) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $start_value; + } else { + rowinsertskipped($db,$tablename,getlogger(__PACKAGE__)); + return undef; + } + } + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm new file mode 100644 index 0000000..5f487ff --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm @@ -0,0 +1,170 @@ +package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowsdeleted + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_provisioning_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_stmt + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + insert_rows + delete_groupid + + countby_groupid_ipnet + +); + +my $tablename = 'voip_allowed_ip_groups'; +my $get_db = \&get_provisioning_db; + +my $expected_fieldnames = [ + 'id', + 'group_id', + 'ipnet', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub insert_rows { + + my ($xa_db,$group_id,$ipnets) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + + my $stmt = insert_stmt($db,__PACKAGE__); + + my @ids = (); + foreach my $ipnet (@$ipnets) { + if ($xa_db->db_do($stmt,undef,$group_id,$ipnet)) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + push(@ids,$xa_db->db_last_insert_id()); + } + } + + return \@ids; +} + +sub delete_groupid { + + my ($xa_db,$group_id) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'DELETE FROM ' . $table . ' WHERE ' . + $db->columnidentifier('group_id') . ' = ?'; + my @params = ($group_id); + + my $count; + if ($count = $xa_db->db_do($stmt,@params)) { + rowsdeleted($db,$tablename,$count,$count,getlogger(__PACKAGE__)); + return 1; + } else { + rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__)); + return 0; + } + +} + +sub countby_groupid_ipnet { + + my ($group_id,$ipnet) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + my @terms = (); + if ($group_id) { + push(@terms,$db->columnidentifier('group_id') . ' = ?'); + push(@params,$group_id); + } + if ($ipnet) { + push(@terms,$db->columnidentifier('ipnet') . ' = ?'); + push(@params,$ipnet); + } + + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm index a20a189..2273ed3 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm @@ -48,11 +48,8 @@ our $CFNA_TYPE = 'cfna'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -117,7 +114,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm index 672ce0c..613a30b 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm @@ -51,11 +51,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -140,7 +137,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -204,7 +201,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm index 4420d4c..65195e6 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm @@ -36,11 +36,8 @@ my $indexes = {}; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -95,7 +92,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm index 3011afa..05c12d4 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm @@ -35,6 +35,8 @@ our @EXPORT_OK = qw( $PEER_AUTH_REGISTER $FORCE_INBOUND_CALLS_TO_PEER + $ALLOWED_IPS_GRP_ATTRIBUTE + $CONCURRENT_MAX_TOTAL_ATTRIBUTE ); #$FORCE_OUTBOUND_CALLS_TO_PEER @@ -79,14 +81,15 @@ our $PEER_AUTH_REGISTER = 'peer_auth_register'; our $FORCE_INBOUND_CALLS_TO_PEER = 'force_inbound_calls_to_peer'; #our $FORCE_OUTBOUND_CALLS_TO_PEER = 'force_outbound_calls_to_peer'; +our $ALLOWED_IPS_GRP_ATTRIBUTE = 'allowed_ips_grp'; + +our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total'; + sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -141,7 +144,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm index 2d9eb83..62a1e55 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm @@ -60,11 +60,8 @@ my $insert_unique_fields = []; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -97,7 +94,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -197,7 +194,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm index a2de9a9..c16f78e 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm @@ -61,11 +61,8 @@ our $FALSE = undef; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -131,7 +128,7 @@ sub update_row { my ($xa_db,$data) = @_; check_table(); - return update_record($get_db,$xa_db,$tablename,$data); + return update_record($get_db,$xa_db,__PACKAGE__,$data); } @@ -140,7 +137,7 @@ sub delete_row { my ($xa_db,$data) = @_; check_table(); - return delete_record($get_db,$xa_db,$tablename,$data); + return delete_record($get_db,$xa_db,__PACKAGE__,$data); } @@ -186,7 +183,7 @@ sub insert_row { if ('HASH' eq ref $_[0]) { my ($data,$insert_ignore) = @_; check_table(); - if (insert_record($db,$xa_db,$tablename,$data,$insert_ignore,$insert_unique_fields)) { + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { return $xa_db->db_last_insert_id(); } } else { @@ -248,7 +245,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Dao/mr441/billing/lnp_providers.pm b/lib/NGCP/BulkProcessor/Dao/mr441/billing/lnp_providers.pm new file mode 100644 index 0000000..496ea50 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr441/billing/lnp_providers.pm @@ -0,0 +1,116 @@ +package NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + findby_prefix +); + +my $tablename = 'lnp_providers'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'name', + 'prefix', + #'authoritative', + #'skip_rewrite', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_prefix { + + my ($prefix,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my @params = (); + my @terms = (); + if ($prefix) { + push(@terms,$db->columnidentifier('prefix') . ' = ?'); + push(@params,$prefix); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Check.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Check.pm index 8290ba8..46e866f 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Check.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Check.pm @@ -24,11 +24,14 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); +use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); @@ -107,6 +110,9 @@ sub check_billing_db_tables { $result &= $check_result; push(@$messages,$message); ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers'); + if (not $check_result) { + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers'); + } $result &= $check_result; push(@$messages,$message); return $result; @@ -167,6 +173,12 @@ sub check_provisioning_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences'); $result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups'); + $result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases'); $result &= $check_result; push(@$messages,$message); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Batch.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Batch.pm index fb9614f..12e09f5 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Batch.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Batch.pm @@ -68,11 +68,8 @@ our $added_delta = 'ADDED'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -86,8 +83,8 @@ sub create_table { my $db = &$get_db(); - registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); } @@ -245,7 +242,7 @@ sub process_records { return process_table( get_db => $get_db, - tablename => $tablename, + class => __PACKAGE__, process_code => sub { my ($context,$rowblock,$row_offset) = @_; return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); @@ -263,7 +260,7 @@ sub getinsertstatement { my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename,$insert_ignore); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); } @@ -298,7 +295,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOption.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOption.pm index e2da414..d52fb7c 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOption.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOption.pm @@ -65,11 +65,8 @@ our $added_delta = 'ADDED'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -83,8 +80,8 @@ sub create_table { my $db = &$get_db(); - registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); } @@ -235,7 +232,7 @@ sub getinsertstatement { my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename,$insert_ignore); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); } @@ -271,7 +268,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOptionSetItem.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOptionSetItem.pm index 93b0d98..6051d5e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOptionSetItem.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/FeatureOptionSetItem.pm @@ -82,11 +82,8 @@ our $FORWARD_UNAVAILABLE_OPTION_SET = 'Forward_Unavailable'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -100,8 +97,8 @@ sub create_table { my $db = &$get_db(); - registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); } @@ -264,7 +261,7 @@ sub getinsertstatement { my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename,$insert_ignore); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); } @@ -301,7 +298,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Lnp.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Lnp.pm index 2342680..fce010e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Lnp.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Lnp.pm @@ -76,11 +76,8 @@ our $IN_TYPE = 'In'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -94,8 +91,8 @@ sub create_table { my $db = &$get_db(); - registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); } @@ -328,7 +325,7 @@ sub process_records { return process_table( get_db => $get_db, - tablename => $tablename, + class => __PACKAGE__, process_code => sub { my ($context,$rowblock,$row_offset) = @_; return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); @@ -346,7 +343,7 @@ sub getinsertstatement { my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename,$insert_ignore); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); } @@ -382,7 +379,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Subscriber.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Subscriber.pm index 5e2358e..f836b7e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Subscriber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/Subscriber.pm @@ -80,11 +80,8 @@ our $added_delta = 'ADDED'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -98,8 +95,8 @@ sub create_table { my $db = &$get_db(); - registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); } @@ -283,7 +280,7 @@ sub process_records { return process_table( get_db => $get_db, - tablename => $tablename, + class => __PACKAGE__, process_code => sub { my ($context,$rowblock,$row_offset) = @_; return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); @@ -301,7 +298,7 @@ sub getinsertstatement { my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename,$insert_ignore); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); } @@ -338,7 +335,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/UsernamePassword.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/UsernamePassword.pm index 4223543..0e46604 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/UsernamePassword.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/import/UsernamePassword.pm @@ -71,11 +71,8 @@ our $added_delta = 'ADDED'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, - $tablename, - $expected_fieldnames,$indexes); - - bless($self,$class); + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); copy_row($self,shift,$expected_fieldnames); @@ -89,8 +86,8 @@ sub create_table { my $db = &$get_db(); - registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); } @@ -223,7 +220,7 @@ sub getinsertstatement { my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename,$insert_ignore); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); } @@ -260,7 +257,7 @@ sub gettablename { sub check_table { return checktableinfo($get_db, - $tablename, + __PACKAGE__,$tablename, $expected_fieldnames, $indexes); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Lnp.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Lnp.pm index 608648c..e3c29fb 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Lnp.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Lnp.pm @@ -11,6 +11,7 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw( $create_lnps_multithreading $create_lnps_numofthreads + $create_lnp_block_txn ); use NGCP::BulkProcessor::Logging qw ( @@ -26,6 +27,7 @@ use NGCP::BulkProcessor::LogError qw( use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Lnp qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); +use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); use NGCP::BulkProcessor::ConnectorPool qw( @@ -56,25 +58,48 @@ sub create_lnps { process_code => sub { my ($context,$records,$row_offset) = @_; my $rownum = $row_offset; - eval { - $context->{db}->db_begin(); + if ($create_lnp_block_txn) { + eval { + $context->{db}->db_begin(); + foreach my $imported_lnp (@$records) { + $rownum++; + next unless _reset_context($context,$imported_lnp,$rownum); + _create_lnp($context); + } + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + die($err) if !$skip_errors; + } + } else { foreach my $imported_lnp (@$records) { $rownum++; next unless _reset_context($context,$imported_lnp,$rownum); - _create_lnp($context); - } - if ($dry) { - $context->{db}->db_rollback(0); - } else { - $context->{db}->db_commit(); + eval { + $context->{db}->db_begin(); + _create_lnp($context); + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + die($err) if !$skip_errors; + } } - }; - my $err = $@; - if ($err) { - eval { - $context->{db}->db_rollback(1); - }; - die($err) if !$skip_errors; } #return 0; @@ -186,6 +211,13 @@ sub _create_lnps_checks { eval { $lnp_providers = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_prefix($prefix); }; + if ($@) { + rowprocessingwarn(threadid(),"falling back to mr4.4.1 lnp_providers table definition ...",getlogger(__PACKAGE__)); + eval { + $lnp_providers = NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers::findby_prefix($prefix); + }; + }; + if ($@ or (scalar @$lnp_providers) != 1) { rowprocessingerror(threadid(),"cannot find a (unique) lnp carrier with prefix '$prefix'",getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Preferences.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Preferences.pm index 942b2e4..d31939e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Preferences.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Preferences.pm @@ -25,6 +25,9 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw( $set_allowed_ips_multithreading $set_allowed_ips_numofthreads $allowed_ips + + $set_preference_bulk_multithreading + $set_preference_bulk_numofthreads ); use NGCP::BulkProcessor::Logging qw ( @@ -49,6 +52,8 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch qw() use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw(); use NGCP::BulkProcessor::ConnectorPool qw( get_xa_db @@ -72,8 +77,12 @@ our @EXPORT_OK = qw( set_allowed_ips set_allowed_ips_batch + set_preference_bulk + set_preference_bulk_batch + clear_preferences set_preference + get_preference $INIT_PEER_AUTH_MODE $SWITCHOVER_PEER_AUTH_MODE @@ -688,27 +697,6 @@ sub _reset_set_peer_auth_context { } - - - - - - - - - - - - - - - - - - - - - sub set_allowed_ips { my ($mode) = @_; @@ -820,59 +808,14 @@ sub _set_allowed_ips_checks { my $result = _checks($context); eval { - $context->{peer_auth_user_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_USER); + $context->{allowed_ips_grp_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE); }; - if ($@ or not defined $context->{peer_auth_user_attribute}) { - rowprocessingerror(threadid(),'cannot find peer_auth_user attribute',getlogger(__PACKAGE__)); + if ($@ or not defined $context->{allowed_ips_grp_attribute}) { + rowprocessingerror(threadid(),'cannot find allowed_ips_grp attribute',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. } - eval { - $context->{peer_auth_pass_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS); - }; - if ($@ or not defined $context->{peer_auth_pass_attribute}) { - rowprocessingerror(threadid(),'cannot find peer_auth_pass attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } - - eval { - $context->{peer_auth_realm_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REALM); - }; - if ($@ or not defined $context->{peer_auth_realm_attribute}) { - rowprocessingerror(threadid(),'cannot find peer_auth_realm attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } - - eval { - $context->{peer_auth_register_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REGISTER); - }; - if ($@ or not defined $context->{peer_auth_register_attribute}) { - rowprocessingerror(threadid(),'cannot find peer_auth_register attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } - - eval { - $context->{force_inbound_calls_to_peer_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_INBOUND_CALLS_TO_PEER); - }; - if ($@ or not defined $context->{force_inbound_calls_to_peer_attribute}) { - rowprocessingerror(threadid(),'cannot find force_inbound_calls_to_peer attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } - - #eval { - # $context->{force_outbound_calls_to_peer_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - # $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_OUTBOUND_CALLS_TO_PEER); - #}; - #if ($@ or not defined $context->{force_outbound_calls_to_peer_attribute}) { - # rowprocessingerror(threadid(),'cannot find force_outbound_calls_to_peer attribute',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - return $result; } @@ -880,55 +823,33 @@ sub _set_allowed_ips_preferences { my ($context) = @_; - #if ($INIT_PEER_AUTH_MODE eq $context->{mode}) { - # - # $context->{peer_auth_user_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_user_attribute},$context->{username}); - # $context->{peer_auth_pass_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_pass_attribute},$context->{password}); - # $context->{peer_auth_realm_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_realm_attribute},$context->{realm}); - # - # $context->{peer_auth_register_attribute_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_register_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::FALSE); - # $context->{force_inbound_calls_to_peer_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{force_inbound_calls_to_peer_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE); - # - # _info($context,"($context->{rownum}) " . $context->{mode} . ' peer authentication preferences for subscriber ' . $context->{cli},1); - # - #} elsif ($SWITCHOVER_PEER_AUTH_MODE eq $context->{mode}) { - # - # $context->{peer_auth_user_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_user_attribute},$context->{username}); - # $context->{peer_auth_pass_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_pass_attribute},$context->{password}); - # $context->{peer_auth_realm_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_realm_attribute},$context->{realm}); - # - # $context->{peer_auth_register_attribute_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_register_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE); - # $context->{force_inbound_calls_to_peer_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{force_inbound_calls_to_peer_attribute},$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::FALSE); - # - # _info($context,"($context->{rownum}) " . $context->{mode} . ' peer authentication preferences for subscriber ' . $context->{cli},1); - # - #} elsif ($CLEAR_PEER_AUTH_MODE eq $context->{mode}) { - # - # $context->{peer_auth_user_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_user_attribute},undef); - # $context->{peer_auth_pass_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_pass_attribute},undef); - # $context->{peer_auth_realm_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_realm_attribute},undef); - # - # $context->{peer_auth_register_attribute_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{peer_auth_register_attribute},undef); - # $context->{force_inbound_calls_to_peer_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, - # $context->{force_inbound_calls_to_peer_attribute},undef); - # - # _info($context,"($context->{rownum}) " . $context->{mode} . ' peer authentication preferences for subscriber ' . $context->{cli},1); - # - #} + my $subscriber_id = $context->{provisioning_voip_subscriber}->{id}; + my $attribute = $context->{allowed_ips_grp_attribute}; + + my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid( + $context->{db},$subscriber_id,$attribute->{id})->[0]; + + if (defined $allowed_ips_grp_attribute_preference) { + $context->{allowed_ip_group_id} = $allowed_ips_grp_attribute_preference->{value}; + $context->{allowed_ips_grp_attribute_preference_id} = $allowed_ips_grp_attribute_preference->{id}; + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$context->{allowed_ip_group_id}); + _info($context,"($context->{rownum}) " . 'allowed ips group for subscriber ' . $context->{cli} . ' exists, ipnets deleted',1); + } else { + $context->{allowed_ip_group_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::forupdate_increment($context->{db}); + _info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1); + } + + $context->{allowed_ips_grp_ipnet_ids} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$context->{allowed_ip_group_id},$context->{allowed_ips}); + _info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1); + + if (not defined $allowed_ips_grp_attribute_preference) { + $context->{allowed_ips_grp_attribute_preference_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db}, + attribute_id => $attribute->{id}, + subscriber_id => $subscriber_id, + value => $context->{allowed_ip_group_id}, + ); + _info($context,"($context->{rownum}) " . 'new allowed ips group preference value for subscriber ' . $context->{cli} . ' added',1); + } } @@ -938,75 +859,160 @@ sub _reset_set_allowed_ips_context { my $result = _reset_context($context,$imported_subscriber,$rownum); - $context->{realm} = $peer_auth_realm; - #$context->{mode} = $mode; - - delete $context->{peer_auth_user_preference_id}; - delete $context->{peer_auth_pass_preference_id}; - delete $context->{peer_auth_realm_preference_id}; + $context->{allowed_ips} = $allowed_ips; - delete $context->{peer_auth_register_attribute_preference_id}; - delete $context->{force_inbound_calls_to_peer_preference_id}; + delete $context->{allowed_ip_group_id}; + delete $context->{allowed_ips_grp_attribute_preference_id}; + delete $context->{allowed_ips_grp_ipnet_ids}; return $result; } +sub set_preference_bulk { + my ($bulk_attribute_name,$value) = @_; + my $static_context = { bulk_attribute_name => $bulk_attribute_name, value => $value }; + my $result = _set_preference_bulk_checks($static_context); + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + my $rownum = $row_offset; + foreach my $imported_subscriber (@$records) { + $rownum++; + next unless _reset_set_preference_bulk_context($context,$imported_subscriber,$rownum); + _set_preference_bulk($context); + } + #return 0; + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + # below is not mandatory.. + _check_insert_tables(); + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 0, + multithreading => $set_preference_bulk_multithreading, + numofthreads => $set_preference_bulk_numofthreads, + ),$warning_count); +} +sub set_preference_bulk_batch { + my ($bulk_attribute_name,$value) = @_; + my $static_context = { bulk_attribute_name => $bulk_attribute_name, value => $value }; + my $result = _set_preference_bulk_checks($static_context); + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::process_records( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + my $rownum = $row_offset; + foreach my $record (@$records) { + $rownum++; + my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number}); + if (defined $imported_subscriber) { + next unless _reset_set_preference_bulk_context($context,$imported_subscriber,$rownum); + _set_preference_bulk($context); + } else { + if ($skip_errors) { + _warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number}); + next; + } else { + _error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number}); + } + } + } + #return 0; + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + # below is not mandatory.. + _check_insert_tables(); + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 0, + multithreading => $set_preference_bulk_multithreading, + numofthreads => $set_preference_bulk_numofthreads, + ),$warning_count); +} +sub _set_preference_bulk { + my ($context) = @_; + _set_subscriber_preference($context,\&_set_preferences_bulk); +} +sub _set_preference_bulk_checks { + my ($context) = @_; + my $result = _checks($context); + eval { + $context->{bulk_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $context->{bulk_attribute_name}); + }; + if ($@ or not defined $context->{bulk_attribute}) { + rowprocessingerror(threadid(),'cannot find ' . $context->{bulk_attribute_name} . ' attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } + return $result; +} +sub _set_preferences_bulk { + my ($context) = @_; + $context->{bulk_attribute_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, + $context->{bulk_attribute},$context->{value}); + _info($context,"($context->{rownum}) " . 'set ' . $context->{bulk_attribute_name} . ' preference value for subscriber ' . $context->{cli},1); +} +sub _reset_set_preference_bulk_context { + my ($context,$imported_subscriber,$rownum) = @_; + my $result = _reset_context($context,$imported_subscriber,$rownum); + delete $context->{bulk_attribute_id}; + return $result; - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +} sub clear_preferences { @@ -1020,10 +1026,9 @@ sub clear_preferences { sub set_preference { my ($context,$subscriber_id,$attribute,$value) = @_; - my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db}, - $subscriber_id,$attribute->{id}); - if ($attribute->{max_occur} == 1) { + my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db}, + $subscriber_id,$attribute->{id}); if (defined $value) { if ((scalar @$old_preferences) == 1) { NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::update_row($context->{db},{ @@ -1061,11 +1066,18 @@ sub set_preference { } +sub get_preference { + my ($context,$subscriber_id,$attribute) = @_; + my $preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db}, + $subscriber_id,$attribute->{id}); + if ($attribute->{max_occur} == 1) { + return $preferences->[0]; + } else { + return $preferences; + } - - - +} sub _error { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm index c9921e1..c0c01a7 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm @@ -118,6 +118,11 @@ our @EXPORT_OK = qw( $create_lnps_multithreading $create_lnps_numofthreads + $create_lnp_block_txn + + $set_preference_bulk_multithreading + $set_preference_bulk_numofthreads + $concurrent_max_total ); our $defaultconfig = 'config.cfg'; @@ -203,6 +208,11 @@ our $ringtimeout = undef; our $create_lnps_multithreading = $enablemultithreading; our $create_lnps_numofthreads = $cpucount; +our $create_lnp_block_txn = 0; + +our $set_preference_bulk_multithreading = $enablemultithreading; +our $set_preference_bulk_numofthreads = $cpucount; +our $concurrent_max_total = undef; sub update_settings { @@ -310,6 +320,15 @@ sub update_settings { $create_lnps_multithreading = $data->{create_lnps_multithreading} if exists $data->{create_lnps_multithreading}; $create_lnps_numofthreads = _get_import_numofthreads($cpucount,$data,'create_lnps_numofthreads'); + $create_lnp_block_txn = $data->{create_lnp_block_txn} if exists $data->{create_lnp_block_txn}; + + $set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading}; + $set_preference_bulk_numofthreads = _get_import_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads'); + $concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total}; + if (defined $concurrent_max_total and $concurrent_max_total <= 0) { + configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__)); + $result = 0; + } return $result; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl index 4d8c1f9..4bee18a 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl @@ -31,6 +31,8 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw( $reseller_id $barring_profiles_yml $barring_profiles + $allowed_ips + $concurrent_max_total ); use NGCP::BulkProcessor::Logging qw( init_log @@ -75,10 +77,13 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); +use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::Check qw( @@ -112,6 +117,9 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Preferences qw( set_allowed_ips set_allowed_ips_batch + set_preference_bulk + set_preference_bulk_batch + $INIT_PEER_AUTH_MODE $SWITCHOVER_PEER_AUTH_MODE $CLEAR_PEER_AUTH_MODE @@ -180,12 +188,15 @@ push(@TASK_OPTS,$switchover_peer_auth_task_opt); my $clear_peer_auth_task_opt = 'clear_peer_auth'; push(@TASK_OPTS,$clear_peer_auth_task_opt); -#my $set_allowed_ips_task_opt = 'set_allowed_ips'; -#push(@TASK_OPTS,$set_allowed_ips_task_opt); +my $set_allowed_ips_task_opt = 'set_allowed_ips'; +push(@TASK_OPTS,$set_allowed_ips_task_opt); my $set_call_forwards_task_opt = 'set_call_forwards'; push(@TASK_OPTS,$set_call_forwards_task_opt); +my $set_concurrent_max_total_task_opt = 'set_concurrent_max_total'; +push(@TASK_OPTS,$set_concurrent_max_total_task_opt); + my $create_lnps_task_opt = 'create_lnps'; push(@TASK_OPTS,$create_lnps_task_opt); @@ -301,12 +312,12 @@ sub main() { $completion |= 1; } - #} elsif (lc($set_allowed_ips_task_opt) eq lc($task)) { - # if (taskinfo($set_allowed_ips_task_opt,$result,1) and ($result = batchinfo($result))) { - # next unless check_dry(); - # $result &= set_allowed_ips_task(\@messages); - # $completion |= 1; - # } + } elsif (lc($set_allowed_ips_task_opt) eq lc($task)) { + if (taskinfo($set_allowed_ips_task_opt,$result,1) and ($result = batchinfo($result))) { + next unless check_dry(); + $result &= set_allowed_ips_task(\@messages); + $completion |= 1; + } } elsif (lc($set_call_forwards_task_opt) eq lc($task)) { if (taskinfo($set_call_forwards_task_opt,$result,1) and ($result = batchinfo($result))) { @@ -315,6 +326,15 @@ sub main() { $completion |= 1; } + } elsif (lc($set_concurrent_max_total_task_opt) eq lc($task)) { + if (taskinfo($set_concurrent_max_total_task_opt,$result,1) and ($result = batchinfo($result))) { + next unless check_dry(); + $result &= set_preference_bulk_task(\@messages, + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE, + $concurrent_max_total); + $completion |= 1; + } + } elsif (lc($create_lnps_task_opt) eq lc($task)) { if (taskinfo($create_lnps_task_opt,$result)) { next unless check_dry(); @@ -878,20 +898,20 @@ sub set_peer_auth_task { my $force_inbound_calls_to_peer_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_INBOUND_CALLS_TO_PEER); - $stats .= "\n " . $peer_auth_user_attribute->{attribute} . "': " . + $stats .= "\n '" . $peer_auth_user_attribute->{attribute} . "': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, $peer_auth_user_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $peer_auth_pass_attribute->{attribute} . "': " . + $stats .= "\n '" . $peer_auth_pass_attribute->{attribute} . "': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, $peer_auth_pass_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $peer_auth_realm_attribute->{attribute} . "': " . + $stats .= "\n '" . $peer_auth_realm_attribute->{attribute} . "': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, $peer_auth_realm_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $peer_auth_register_attribute->{attribute} . "': " . + $stats .= "\n '" . $peer_auth_register_attribute->{attribute} . "': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, $peer_auth_register_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $force_inbound_calls_to_peer_attribute->{attribute} . "': " . + $stats .= "\n '" . $force_inbound_calls_to_peer_attribute->{attribute} . "': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, $force_inbound_calls_to_peer_attribute->{id},undef) . ' rows'; }; @@ -922,33 +942,17 @@ sub set_allowed_ips_task { my $err = $@; my $stats = ($skip_errors ? ": $warning_count warnings" : ''); eval { - my $peer_auth_user_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_USER); - my $peer_auth_pass_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS); - my $peer_auth_realm_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REALM); - my $peer_auth_register_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_REGISTER); - my $force_inbound_calls_to_peer_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::FORCE_INBOUND_CALLS_TO_PEER); - $stats .= "\n " . $peer_auth_user_attribute->{attribute} . "': " . - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, - $peer_auth_user_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $peer_auth_pass_attribute->{attribute} . "': " . + my $allowed_ips_grp_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE); + $stats .= "\n '" . $allowed_ips_grp_attribute->{attribute} . "': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, - $peer_auth_pass_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $peer_auth_realm_attribute->{attribute} . "': " . - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, - $peer_auth_realm_attribute->{id},undef) . ' rows'; + $allowed_ips_grp_attribute->{id},undef) . ' rows'; + foreach my $ipnet (@$allowed_ips) { + $stats .= "\n '$ipnet': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::countby_groupid_ipnet(undef,$ipnet) . ' rows'; + } + $stats .= "\n voip_aig_sequence: " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::get_id(); - $stats .= "\n " . $peer_auth_register_attribute->{attribute} . "': " . - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, - $peer_auth_register_attribute->{id},undef) . ' rows'; - $stats .= "\n " . $force_inbound_calls_to_peer_attribute->{attribute} . "': " . - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, - $force_inbound_calls_to_peer_attribute->{id},undef) . ' rows'; }; if ($err or !$result) { push(@$messages,"set subscribers\' allowed_ips preference INCOMPLETE$stats"); @@ -1009,12 +1013,21 @@ sub create_lnps_task { }; my $err = $@; my $stats = ($skip_errors ? ": $warning_count warnings" : ''); + my $lnp_providers = []; + eval { + $lnp_providers = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_prefix(); + }; + if ($@) { + eval { + $lnp_providers = NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers::findby_prefix(); + }; + } eval { $stats .= "\n lnp_numbers: " . NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows'; - foreach my $lnp_provider (@{NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_prefix()}) { + foreach my $lnp_provider (@$lnp_providers) { $stats .= "\n '" . $lnp_provider->{name} . "': " . NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number($lnp_provider->{id}) . ' rows'; } @@ -1030,6 +1043,37 @@ sub create_lnps_task { } +sub set_preference_bulk_task { + + my ($messages,$bulk_attribute_name,$value) = @_; + my ($result,$warning_count) = (0,0); + eval { + if ($batch) { + ($result,$warning_count) = set_preference_bulk_batch($bulk_attribute_name,$value); + } else { + ($result,$warning_count) = set_preference_bulk($bulk_attribute_name,$value); + } + }; + my $err = $@; + my $stats = ($skip_errors ? ": $warning_count warnings" : ''); + eval { + my $bulk_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($bulk_attribute_name); + + $stats .= "\n '" . $bulk_attribute->{attribute} . "': " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, + $bulk_attribute->{id},undef) . ' rows'; + + }; + if ($err or !$result) { + push(@$messages,"set subscribers\' $bulk_attribute_name preference INCOMPLETE$stats"); + } else { + push(@$messages,"set subscribers\' $bulk_attribute_name preference completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + #END { # # this should not be required explicitly, but prevents Log4Perl's # # "rootlogger not initialized error upon exit.. diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg index 39cc035..9d6d0c4 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg @@ -45,7 +45,7 @@ peer_auth_realm = myrealm set_allowed_ips_multithreading = 1 #set_allowed_ips_numofthreads = 6 -allowed_ips=127.0.0.1/24 +allowed_ips=127.0.0.1/24,127.0.0.2/24 set_call_forwards_multithreading = 1 #set_call_forwards_numofthreads = 6 @@ -61,3 +61,8 @@ ringtimeout = 20 create_lnps_multithreading = 1 #create_lnps_numofthreads = 6 +create_lnp_block_txn = 0 + +set_preference_bulk_multithreading = 1 +#set_preference_bulk_numofthreads = 6 +concurrent_max_total = 2 diff --git a/lib/NGCP/BulkProcessor/RestItem.pm b/lib/NGCP/BulkProcessor/RestItem.pm index fb890c9..2a3a6d5 100644 --- a/lib/NGCP/BulkProcessor/RestItem.pm +++ b/lib/NGCP/BulkProcessor/RestItem.pm @@ -13,6 +13,7 @@ our @EXPORT_OK = qw(); sub new { + my $base_class = shift; my $class = shift; my $self = bless {}, $class; return init_item($self,@_); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm index 31850f7..90ce912 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm @@ -53,9 +53,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm index 468de9a..86401c8 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm @@ -57,9 +57,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm index 0291328..1d7d769 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm @@ -41,9 +41,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/CallForwards.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/CallForwards.pm index d1b58e5..f47425d 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/CallForwards.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/CallForwards.pm @@ -44,9 +44,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm index b032ffe..642af73 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm @@ -47,9 +47,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm index 1ce4825..f8615f8 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm @@ -65,9 +65,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm index 300c4be..4a9f1d7 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm @@ -59,9 +59,7 @@ our $TERMINATED_STATE = 'terminated'; sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm index 6ad8b30..1e93323 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm @@ -50,9 +50,7 @@ my $get_item_filter_path_query = sub { sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm index 5188399..c0b0624 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm @@ -52,9 +52,7 @@ my $get_item_filter_path_query = sub { sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm index b506b34..ae856ee 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm @@ -53,9 +53,7 @@ my $get_item_filter_path_query = sub { sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm index 5bb9466..624c97e 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm @@ -44,9 +44,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm index 62d6f58..96b806b 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm @@ -63,9 +63,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm index 93719b2..096e86e 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm @@ -64,9 +64,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::RestItem->new($fieldnames); - - bless($self,$class); + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); copy_row($self,shift,$fieldnames); diff --git a/lib/NGCP/BulkProcessor/Service.pm b/lib/NGCP/BulkProcessor/Service.pm index 0460fc1..5018ea9 100755 --- a/lib/NGCP/BulkProcessor/Service.pm +++ b/lib/NGCP/BulkProcessor/Service.pm @@ -43,10 +43,11 @@ my $instance_counts = {}; sub new { - #my $class = shift; + my $base_class = shift; + my $class = shift; #my $self = bless {}, $class; - my ($class,$functions,$derived_class,$serialization_format,$no_autostart) = @_; - my $self = bless {}, $derived_class; + my ($functions,$serialization_format,$no_autostart) = @_; + my $self = bless {}, $class; $self->{worker} = undef; $self->{functions} = $functions; @@ -78,6 +79,7 @@ sub new { #$self = share($self); #autostart?? + servicedebug($self,$class . ' service created',getlogger(__PACKAGE__)); return $self; diff --git a/lib/NGCP/BulkProcessor/Service/TestService.pm b/lib/NGCP/BulkProcessor/Service/TestService.pm index 8c7ed6a..bd96847 100755 --- a/lib/NGCP/BulkProcessor/Service/TestService.pm +++ b/lib/NGCP/BulkProcessor/Service/TestService.pm @@ -5,7 +5,7 @@ use strict; use NGCP::BulkProcessor::Logging qw(getlogger servicedebug); -use NGCP::BulkProcessor::Service; +use NGCP::BulkProcessor::Service qw(); #use test::csv_table; # qw(test_table_bycolumn1); #use test::mysql_table; @@ -42,15 +42,8 @@ my $functions = { sub new { - #my $class = shift; - #my $self = NGCP::BulkProcessor::Service->new($functions,$class); - - #bless($self,$class); - - #return $self; - - my $self = NGCP::BulkProcessor::Service->new($functions,@_); - servicedebug($self,__PACKAGE__ . ' service created',getlogger(__PACKAGE__)); + my $class = shift; + my $self = NGCP::BulkProcessor::Service->new($class,$functions,@_); return $self; } diff --git a/lib/NGCP/BulkProcessor/SqlProcessor.pm b/lib/NGCP/BulkProcessor/SqlProcessor.pm index 3421a75..8964b14 100644 --- a/lib/NGCP/BulkProcessor/SqlProcessor.pm +++ b/lib/NGCP/BulkProcessor/SqlProcessor.pm @@ -81,6 +81,7 @@ our @EXPORT_OK = qw( #transfer_record #transfer_records +my $table_names = {}; my $table_expected_fieldnames = {}; my $table_fieldnames_cached = {}; my $table_primarykeys = {}; @@ -107,17 +108,17 @@ my $ERROR = 4; sub init_record { - my ($record,$get_db,$tablename,$expected_fieldnames,$target_indexes) = @_; + my ($record,$class,$get_db,$tablename,$expected_fieldnames,$target_indexes) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); - checktableinfo($db,$tablename,$expected_fieldnames,$target_indexes); + checktableinfo($db,$class,$tablename,$expected_fieldnames,$target_indexes); - if (defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}) { # and ref $table_fieldnames_cached->{$connectidentifier}->{$tablename} eq 'ARRAY') { + if (defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}) { # and ref $table_fieldnames_cached->{$connectidentifier}->{$tablename} eq 'ARRAY') { # if there are fieldnames defined, we make a member variable for each and set it to undef - foreach my $fieldname (@{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}}) { + foreach my $fieldname (@{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}}) { $record->{$fieldname} = undef; } } @@ -170,6 +171,12 @@ sub cleartableinfo { my $found = 0; + if (exists $table_names->{$tid}) { + if (exists $table_names->{$tid}->{$connectidentifier}) { + delete $table_names->{$tid}->{$connectidentifier}; + $found = 1; + } + } if (exists $table_expected_fieldnames->{$tid}) { if (exists $table_expected_fieldnames->{$tid}->{$connectidentifier}) { delete $table_expected_fieldnames->{$tid}->{$connectidentifier}; @@ -195,6 +202,10 @@ sub cleartableinfo { } } + if ((scalar keys %{$table_names->{$tid}}) == 0) { + delete $table_names->{$tid}; + $found = 1; + } if ((scalar keys %{$table_expected_fieldnames->{$tid}}) == 0) { delete $table_expected_fieldnames->{$tid}; $found = 1; @@ -220,13 +231,20 @@ sub cleartableinfo { sub registertableinfo { # to prepare creation of non-existent tables.. - my ($get_db,$tablename,$fieldnames,$indexes,$keycols) = @_; + my ($get_db,$class,$tablename,$fieldnames,$indexes,$keycols) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); + if (not exists $table_names->{$tid}) { + $table_names->{$tid} = {}; + } + if (not exists $table_names->{$tid}->{$connectidentifier}) { + $table_names->{$tid}->{$connectidentifier} = {}; + } + $table_names->{$tid}->{$connectidentifier}->{$class} = $tablename; if (not exists $table_expected_fieldnames->{$tid}) { $table_expected_fieldnames->{$tid} = {}; } @@ -234,8 +252,9 @@ sub registertableinfo { # to prepare creation of non-existent tables.. # create an empty category for the connection if none exists yet: $table_expected_fieldnames->{$tid}->{$connectidentifier} = {}; } + # we prefer to always update the expected fieldnames (that come from a derived class) - $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} = $fieldnames // []; + $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $fieldnames // []; if (not exists $table_fieldnames_cached->{$tid}) { $table_fieldnames_cached->{$tid} = {}; @@ -252,7 +271,7 @@ sub registertableinfo { # to prepare creation of non-existent tables.. # create an empty primary key column name cache for the connection if none exists yet: $table_primarykeys->{$tid}->{$connectidentifier} = {}; } - $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename} = $keycols // []; + $table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $keycols // []; if (not exists $table_target_indexes->{$tid}) { $table_target_indexes->{$tid} = {}; @@ -262,13 +281,13 @@ sub registertableinfo { # to prepare creation of non-existent tables.. $table_target_indexes->{$tid}->{$connectidentifier} = {}; } # we prefer to always update the target table indexes (that come from a derived class) - $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = $indexes // {}; + $table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $indexes // {}; } sub checktableinfo { - my ($get_db,$tablename,$expected_fieldnames,$target_indexes) = @_; + my ($get_db,$class,$tablename,$expected_fieldnames,$target_indexes) = @_; my $result = 1; @@ -277,6 +296,13 @@ sub checktableinfo { my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); + if (not exists $table_names->{$tid}) { + $table_names->{$tid} = {}; + } + if (not exists $table_names->{$tid}->{$connectidentifier}) { + $table_names->{$tid}->{$connectidentifier} = {}; + } + $table_names->{$tid}->{$connectidentifier}->{$class} = $tablename; if (not exists $table_expected_fieldnames->{$tid}) { #$table_expected_fieldnames->{$tid} = shared_clone({}); $table_expected_fieldnames->{$tid} = {}; @@ -287,8 +313,8 @@ sub checktableinfo { $table_expected_fieldnames->{$tid}->{$connectidentifier} = {}; } # we prefer to always update the expected fieldnames (that come from a derived class) - #$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($expected_fieldnames); - $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} = $expected_fieldnames // []; + #$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = shared_clone($expected_fieldnames); + $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $expected_fieldnames // []; if (not exists $table_fieldnames_cached->{$tid}) { #$table_fieldnames_cached->{$tid} = shared_clone({}); @@ -300,24 +326,26 @@ sub checktableinfo { $table_fieldnames_cached->{$tid}->{$connectidentifier} = {}; } - if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename}) { + if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}) { # query the database for fieldnames of the table if we don't have a cache entry yet: - #$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($db->getfieldnames($tablename)); - $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename} = $db->getfieldnames($tablename); - #my $fieldnames = $db->getfieldnames($tablename); - if (!defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename} - or (scalar @{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}}) == 0 - or setcontains($table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename},1)) { + my $fieldnames = $db->getfieldnames($tablename); + if (!defined $fieldnames + or (scalar @{$fieldnames}) == 0 + or setcontains($table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$fieldnames,1)) { #fieldnames are case insensitive in general - # if not expected fieldnames are given or queried fieldnames match, we log this: - #$table_fieldnames_cached->{$connectidentifier}->{$tablename} = $table_expected_fieldnames->{$connectidentifier}->{$tablename}; + $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 1, }; fieldnamesaquired($db,$tablename,getlogger(__PACKAGE__)); } else { # otherwise we log a failure (exit? - see Logging Module) #$table_fieldnames_cached->{$connectidentifier}->{$tablename} = {}; #$fieldnames; - fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename},getlogger(__PACKAGE__)); + $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 0, }; + fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$fieldnames,getlogger(__PACKAGE__)); $result = 0; } + } elsif (not $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{ok}) { + fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}, + $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{fieldnames},getlogger(__PACKAGE__)); + $result = 0; } if (not exists $table_primarykeys->{$tid}) { @@ -329,11 +357,11 @@ sub checktableinfo { #$table_primarykeys->{$tid}->{$connectidentifier} = shared_clone({}); $table_primarykeys->{$tid}->{$connectidentifier} = {}; } - if (not exists $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}) { + if (not exists $table_primarykeys->{$tid}->{$connectidentifier}->{$class}) { # query the database for primary keys of the table if we don't have them cached yet: - #$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($db->getprimarykeycols($tablename)); - $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename} = $db->getprimarykeycols($tablename); - primarykeycolsaquired($db,$tablename,$table_primarykeys->{$tid}->{$connectidentifier}->{$tablename},getlogger(__PACKAGE__)); + #$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = shared_clone($db->getprimarykeycols($class)); + $table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $db->getprimarykeycols($tablename); + primarykeycolsaquired($db,$tablename,$table_primarykeys->{$tid}->{$connectidentifier}->{$class},getlogger(__PACKAGE__)); } if (not exists $table_target_indexes->{$tid}) { @@ -346,8 +374,8 @@ sub checktableinfo { $table_target_indexes->{$tid}->{$connectidentifier} = {}; } # we prefer to always update the target table indexes (that come from a derived class) - #$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($target_indexes); - $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = $target_indexes // {}; + #$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = shared_clone($target_indexes); + $table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $target_indexes // {}; return $result; @@ -355,7 +383,7 @@ sub checktableinfo { sub create_targettable { - my ($get_db,$tablename,$get_target_db,$targettablename,$truncate,$defer_indexes,$texttable_engine) = @_; + my ($get_db,$class,$get_target_db,$targetclass,$targettablename,$truncate,$defer_indexes,$texttable_engine) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; @@ -369,22 +397,22 @@ sub create_targettable { } my $result = $target_db->create_texttable($targettablename, - $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}, - $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}, - $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename}, + $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}, + $table_primarykeys->{$tid}->{$connectidentifier}->{$class}, + $table_target_indexes->{$tid}->{$connectidentifier}->{$class}, # 'ifnotexists' is always true $truncate, $defer_indexes, $texttable_engine); - checktableinfo($target_db,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$defer_indexes ? undef : $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename}); + checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$defer_indexes ? undef : $table_target_indexes->{$tid}->{$connectidentifier}->{$class}); return $result; } sub delete_records { - my ($get_db,$get_xa_db,$tablename,$keyfields,$equal,$vals_table) = @_; + my ($get_db,$get_xa_db,$class,$keyfields,$equal,$vals_table) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db); @@ -392,8 +420,9 @@ sub delete_records { my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; - my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; + my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class}; + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; if (defined $expected_fieldnames and (defined $keyfields and @@ -463,7 +492,7 @@ sub delete_records { sub insert_record { - my ($get_db,$get_xa_db,$tablename,$row,$insert_ignore,$unique_count_fields) = @_; + my ($get_db,$get_xa_db,$class,$row,$insert_ignore,$unique_count_fields) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db); @@ -472,8 +501,9 @@ sub insert_record { my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; - #my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; + #my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class}; + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; if (defined $expected_fieldnames and defined $row) { @@ -530,7 +560,7 @@ sub insert_record { sub delete_record { - my ($get_db,$get_xa_db,$tablename,$row) = @_; + my ($get_db,$get_xa_db,$class,$row) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db); @@ -539,8 +569,9 @@ sub delete_record { my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; - my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; + my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class}; + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; if (defined $expected_fieldnames and defined $row) { @@ -603,7 +634,7 @@ sub delete_record { sub update_record { - my ($get_db,$get_xa_db,$tablename,$row) = @_; + my ($get_db,$get_xa_db,$class,$row) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db); @@ -612,8 +643,9 @@ sub update_record { my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; - my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; + my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class}; + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; if (defined $expected_fieldnames and defined $row) { @@ -683,11 +715,12 @@ sub update_record { sub insert_stmt { - my ($get_db,$tablename,$insert_ignore) = @_; + my ($get_db,$class,$insert_ignore) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; return 'INSERT ' . ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '') . 'INTO ' . $db->tableidentifier($tablename) . ' (' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ') VALUES (' . substr(',?' x scalar @$expected_fieldnames,1) . ')'; @@ -698,8 +731,9 @@ sub transfer_table { my %params = @_; my ($get_db, - $tablename, + $class, $get_target_db, + $targetclass, $targettablename, $truncate_targettable, $create_indexes, @@ -712,8 +746,9 @@ sub transfer_table { $select, $values) = @params{qw/ get_db - tablename + class get_target_db + targetclass targettablename truncate_targettable create_indexes @@ -734,6 +769,10 @@ sub transfer_table { my $db = &$get_db($reader_connection_name,1); my $target_db = &$get_target_db(); #$writer_connection_name); + my $connectidentifier = $db->connectidentifier(); + my $tid = threadid(); + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; + my $countstatement; if (defined $selectcount) { $countstatement = $selectcount; @@ -756,11 +795,9 @@ sub transfer_table { $create_indexes = ((defined $create_indexes) ? $create_indexes : $transfer_defer_indexes); - if (create_targettable($db,$tablename,$target_db,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine)) { + if (create_targettable($db,$class,$target_db,$targetclass,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine)) { - my $connectidentifier = $db->connectidentifier(); - my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; my @fieldnames = @$expected_fieldnames; @@ -774,7 +811,7 @@ sub transfer_table { $selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename) } - my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')'; + my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')'; my $blocksize; @@ -813,6 +850,7 @@ sub transfer_table { threadqueuelength => $tabletransfer_threadqueuelength, get_db => $get_db, tablename => $tablename, + class => $class, selectstatement => $selectstatement, blocksize => $blocksize, rowcount => $rowcount, @@ -831,6 +869,7 @@ sub transfer_table { #writererrorstate_ref => \$writererrorstate, get_target_db => $get_target_db, targettablename => $targettablename, + targetclass => $targetclass, insertstatement => $insertstatement, blocksize => $blocksize, rowcount => $rowcount, @@ -937,16 +976,16 @@ sub transfer_table { eval { $target_db->create_primarykey($targettablename, - $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}, - $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}); + $table_primarykeys->{$tid}->{$connectidentifier}->{$class}, + $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}); $target_db->create_indexes($targettablename, - $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename}, - $table_primarykeys->{$tid}->{$connectidentifier}->{$tablename}); + $table_target_indexes->{$tid}->{$connectidentifier}->{$class}, + $table_primarykeys->{$tid}->{$connectidentifier}->{$class}); - delete $table_primarykeys->{$tid}->{$target_db->connectidentifier()}->{$targettablename}; - checktableinfo($target_db,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename}); + delete $table_primarykeys->{$tid}->{$target_db->connectidentifier()}->{$targetclass}; + checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$table_target_indexes->{$tid}->{$connectidentifier}->{$class}); $target_db->vacuum($targettablename); @@ -982,7 +1021,7 @@ sub process_table { my %params = @_; my ($get_db, - $tablename, + $class, $process_code, $static_context, $init_process_context_code, @@ -994,7 +1033,7 @@ sub process_table { $select, $values) = @params{qw/ get_db - tablename + class process_code static_context init_process_context_code @@ -1013,6 +1052,11 @@ sub process_table { my $db = &$get_db($reader_connection_name,1); + my $connectidentifier = $db->connectidentifier(); + my $tid = threadid(); + my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class}; + my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}; + my $countstatement; if (defined $selectcount) { $countstatement = $selectcount; @@ -1029,17 +1073,7 @@ sub process_table { return; } - my $errorstate = $RUNNING; - - my $connectidentifier = $db->connectidentifier(); - my $tid = threadid(); - my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; - my @fieldnames = @$expected_fieldnames; - - #my $setfieldnames = join(', ',@fieldnames); - #my $valueplaceholders = substr(',?' x scalar @fieldnames,1); - my $selectstatement; if (length($select) > 0) { $selectstatement = $select; @@ -1047,6 +1081,7 @@ sub process_table { $selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename); } + my $errorstate = $RUNNING; my $blocksize; if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved @@ -1084,6 +1119,7 @@ sub process_table { threadqueuelength => $tableprocessing_threadqueuelength, get_db => $get_db, tablename => $tablename, + class => $class, selectstatement => $selectstatement, blocksize => $blocksize, rowcount => $rowcount, diff --git a/lib/NGCP/BulkProcessor/SqlRecord.pm b/lib/NGCP/BulkProcessor/SqlRecord.pm index 46a4ad3..a5871fa 100644 --- a/lib/NGCP/BulkProcessor/SqlRecord.pm +++ b/lib/NGCP/BulkProcessor/SqlRecord.pm @@ -13,9 +13,10 @@ our @EXPORT_OK = qw(); sub new { + my $base_class = shift; my $class = shift; my $self = bless {}, $class; - return init_record($self,@_); + return init_record($self,$class,@_); }