From bfa657558136445a238c248816501138dca55df5 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Fri, 22 Sep 2017 19:42:55 +0200 Subject: [PATCH] TT#18883 implement teletek importer #8 + prepare/merge/clean callforward+voicemail Y/N data + add voip_cf_destinations dao + add voip_cf_destination_sets dao + voip_cf_mappings, destination_sets and destinations insert dao method + write callforwards + write callforward related preference values + not well-formed enough: fixed issues popped up when trying to view/edit created callfowards in panel + fix "database locked" error - properly handle SQLite's serialized transactions (single transaction at a time). + write report file for debugging/review - json graph of merged data of a subscriber - including fields info/warning/error messages for a subscriber - write it snychronized now + get rid of JSON::XS + extend NGCPRestApi connector to support file transfers + heuristic for missing "channels" (by subscriber number count) + prevent mysql deadlocks when writing to ngcp + strictly consider record order from imports + task end result stats polished + cleanup code a bit + add kmailio.location dao + add kmailio.location insert dao method + writing "permanent registrations" to kamailio.location + generate location "ruid" and "partition" according to kamailio Change-Id: Ief9a7634b4930e51d79ac5e963ba48769d3708ea --- Build.PL | 1 - debian/control | 1 - .../Dao/Trunk/billing/voip_numbers.pm | 17 + .../Dao/Trunk/kamailio/location.pm | 257 +++++++++++ .../provisioning/voip_cf_destination_sets.pm | 148 ++++++ .../provisioning/voip_cf_destinations.pm | 163 +++++++ .../Trunk/provisioning/voip_cf_mappings.pm | 49 ++ .../Dao/Trunk/provisioning/voip_dbaliases.pm | 28 ++ .../Trunk/provisioning/voip_preferences.pm | 7 + lib/NGCP/BulkProcessor/LogError.pm | 2 +- .../Projects/Migration/Teletek/Api.pm | 433 ------------------ .../Projects/Migration/Teletek/Check.pm | 67 +-- .../Teletek/Dao/import/AllowedCli.pm | 14 +- .../Teletek/Dao/import/CallForward.pm | 1 + .../Migration/Teletek/Dao/import/Clir.pm | 1 + .../Teletek/Dao/import/Registration.pm | 3 +- .../Teletek/Dao/import/Subscriber.pm | 38 +- .../Projects/Migration/Teletek/Import.pm | 27 +- .../Projects/Migration/Teletek/Preferences.pm | 3 +- .../Migration/Teletek/Provisioning.pm | 291 ++++++++++-- .../Projects/Migration/Teletek/Settings.pm | 124 ++--- .../Projects/Migration/Teletek/config.cfg | 2 - .../Projects/Migration/Teletek/process.pl | 129 ++---- .../Projects/Migration/Teletek/settings.cfg | 23 +- lib/NGCP/BulkProcessor/RestConnector.pm | 160 ++++++- lib/NGCP/BulkProcessor/RestProcessor.pm | 25 +- lib/NGCP/BulkProcessor/Serialization.pm | 9 +- lib/NGCP/BulkProcessor/SqlConnector.pm | 11 + lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm | 7 + .../BulkProcessor/SqlConnectors/MySQLDB.pm | 7 + .../BulkProcessor/SqlConnectors/OracleDB.pm | 7 + .../SqlConnectors/PostgreSQLDB.pm | 7 + .../SqlConnectors/SQLServerDB.pm | 7 + .../BulkProcessor/SqlConnectors/SQLiteDB.pm | 31 +- lib/NGCP/BulkProcessor/SqlProcessor.pm | 19 +- 35 files changed, 1354 insertions(+), 765 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/location.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destination_sets.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Api.pm diff --git a/Build.PL b/Build.PL index f6537b7..9c1fe00 100644 --- a/Build.PL +++ b/Build.PL @@ -37,7 +37,6 @@ my $builder = Module::Build->new( 'MIME::Base64' => 0, 'MIME::Lite' => 0, 'Net::SMTP' => 0, - 'JSON::XS' => 0, 'Data::Dump' => 0, 'YAML::XS' => 0, 'XML::Dumper' => '0.81', diff --git a/debian/control b/debian/control index 17afd1c..3172e91 100644 --- a/debian/control +++ b/debian/control @@ -38,7 +38,6 @@ Depends: libintl-perl, libio-compress-perl, libio-socket-ssl-perl, - libjson-xs-perl, liblog-log4perl-perl, libmail-imapclient-perl, libmarpa-r2-perl, diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm index b5eb7b5..8ff906c 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm @@ -109,6 +109,23 @@ sub forupdate_cc_ac_sn_subscriberid { return buildrecords_fromrows($rows,$load_recursive)->[0]; + #my $stmt = $db->paginate_sort_query('SELECT ' . $db->columnidentifier('id') . ' FROM ' . $table . ' WHERE ' . + # $db->columnidentifier('cc') . ' = ?' . + # ' AND ' . $db->columnidentifier('ac') . ' = ?' . + # ' AND ' . $db->columnidentifier('sn') . ' = ?' . + # ' AND (' . $db->columnidentifier('subscriber_id') . ' = ? OR ' . $db->columnidentifier('subscriber_id') . ' IS NULL)',undef,undef,[{ + # column => 'id', + # numeric => 1, + # dir => 1, + # }]); + #my @params = ($cc,$ac,$sn,$subscriber_id); + #foreach my $id (@{$xa_db->db_get_col($stmt,@params)}) { + # return buildrecords_fromrows([ + # $xa_db->db_get_row('SELECT * FROM ' . $table . ' WHERE ' . $db->columnidentifier('id') . ' = ? FOR UPDATE',$id) + # ],$load_recursive)->[0]; + #} + #return undef; + } sub release_subscriber_numbers { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/location.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/location.pm new file mode 100644 index 0000000..c169db0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/location.pm @@ -0,0 +1,257 @@ +package NGCP::BulkProcessor::Dao::Trunk::kamailio::location; +use strict; + +## no critic + +#use threads::shared qw(); + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_kamailio_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +use NGCP::BulkProcessor::Utils qw(threadid); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + insert_row + countby_usernamedomain + next_ruid +); + +my $tablename = 'location'; +my $get_db = \&get_kamailio_db; + +my $expected_fieldnames = [ + 'id', + 'username', + 'domain', + 'contact', + 'received', + 'path', + 'expires', + 'q', + 'callid', + 'cseq', + 'last_modified', + 'flags', + 'cflags', + 'user_agent', + 'socket', + 'methods', + 'ruid', + 'reg_id', + 'instance', + 'server_id', + 'connection_id', + 'keepalive', + 'partition', + +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +#/*! call-id used for ul_add and ul_rm_contact */ +#static str mi_ul_cid = str_init("dfjrewr12386fd6-343@Kamailio.mi"); +#/*! user agent used for ul_add */ +#static str mi_ul_ua = str_init("Kamailio MI Server"); + +my $default_expires = 0; #4294967295 +my $default_path = ''; +my $default_q = 1.0; +my $default_cseq = 1; +my $default_callid = 'dfjrewr12386fd6-343@Kamailio.mi'; +my $default_useragent = 'SIP Router MI Server'; #'Kamailio MI Server'; +#\kamailio-master\src\lib\srutils\sruid.c +my $ruid_time = time(); +my $ruid_counter = 0; +my $ruid_format = 'ulcx-%x-%x-%x'; +my $partition_counter = 0; +my $max_partitions = undef; #>30...; + +sub next_ruid { + return sprintf($ruid_format,$ruid_time,threadid(),$ruid_counter++); +} + +sub _get_partition { + my $partition = $partition_counter + threadid(); + $partition_counter++; + if (defined $max_partitions and $max_partitions > 0) { + return $partition % $max_partitions; + } + return $partition; +} + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub countby_usernamedomain { + + my ($username,$domain) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + my @terms = (); + if (defined $username) { + push(@terms,'username = ?'); + push(@params,$username); + } + if (defined $domain) { + push(@terms,'domain = ?'); + push(@params,$domain); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($username, + $domain, + $contact, + $q, + $expires, + $ruid) = @params{qw/ + username + domain + contact + q + expires + ruid + /}; + + $expires //= $default_expires; + $q //= $default_q; + $ruid //= next_ruid(); + my $partition = _get_partition(); + my $path = $default_path; + my $cseq = $default_cseq; + my $callid = $default_callid; + my $useragent = $default_useragent; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('username') . ', ' . + $db->columnidentifier('domain') . ', ' . + $db->columnidentifier('contact') . ', ' . + $db->columnidentifier('path') . ', ' . + $db->columnidentifier('q') . ', ' . + $db->columnidentifier('last_modified') . ', ' . + $db->columnidentifier('expires') . ', ' . + $db->columnidentifier('cseq') . ', ' . + $db->columnidentifier('callid') . ', ' . + $db->columnidentifier('user_agent') . ', ' . + $db->columnidentifier('partition') . ', ' . + $db->columnidentifier('ruid') . ') VALUES (' . + '?, ' . + '?, ' . + '?, ' . + '?, ' . + '?, ' . + 'FROM_UNIXTIME(0), ' . + 'FROM_UNIXTIME(?), ' . + '?, ' . + '?, ' . + '?, ' . + '?, ' . + '?)', + $username, + $domain, + $contact, + $path, + $q, + $expires, + $cseq, + $callid, + $useragent, + $partition, + $ruid, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destination_sets.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destination_sets.pm new file mode 100644 index 0000000..cff3857 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destination_sets.pm @@ -0,0 +1,148 @@ +package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_provisioning_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row + insert_record +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + countby_subscriberid_type + insert_row +); + +my $tablename = 'voip_cf_destination_sets'; +my $get_db = \&get_provisioning_db; + +my $expected_fieldnames = [ + 'id', + 'subscriber_id', + 'name', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub countby_subscriberid_type { + + my ($subscriber_id,$type,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + my @terms = (); + if ($subscriber_id) { + push(@terms,$db->columnidentifier('subscriber_id') . ' = ?'); + push(@params,$subscriber_id); + } + if ($type) { + push(@terms,$db->columnidentifier('type') . ' = ?'); + push(@params,$type); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($subscriber_id, + $name) = @params{qw/ + subscriber_id + name + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('subscriber_id') . ', ' . + $db->columnidentifier('name') .') VALUES (' . + '?, ' . + '?)' + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm new file mode 100644 index 0000000..ee634ef --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm @@ -0,0 +1,163 @@ +package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_provisioning_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row + insert_record +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + countby_subscriberid_type + insert_row +); + +my $tablename = 'voip_cf_destinations'; +my $get_db = \&get_provisioning_db; + +my $expected_fieldnames = [ + 'id', + 'destination_set_id', + 'destination', + 'priority', + 'timeout', + 'announcement_id', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub countby_subscriberid_type { + + my ($subscriber_id,$type,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + my @terms = (); + if ($subscriber_id) { + push(@terms,$db->columnidentifier('subscriber_id') . ' = ?'); + push(@params,$subscriber_id); + } + if ($type) { + push(@terms,$db->columnidentifier('type') . ' = ?'); + push(@params,$type); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($destination_set_id, + $destination, + $priority, + $timeout, + $announcement_id) = @params{qw/ + destination_set_id + destination + priority + timeout + announcement_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('destination_set_id') . ', ' . + $db->columnidentifier('destination') . ', ' . + $db->columnidentifier('priority') . ', ' . + $db->columnidentifier('timeout') . ', ' . + $db->columnidentifier('announcement_id') .') VALUES (' . + '?, ' . + '?, ' . + '?, ' . + '?, ' . + 'NULL)' + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm index 2273ed3..be212dd 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm @@ -11,6 +11,7 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::SqlProcessor qw( checktableinfo copy_row + insert_record ); use NGCP::BulkProcessor::SqlRecord qw(); @@ -25,6 +26,8 @@ our @EXPORT_OK = qw( $CFT_TYPE $CFU_TYPE $CFNA_TYPE + + insert_row ); my $tablename = 'voip_cf_mappings'; @@ -40,6 +43,8 @@ my $expected_fieldnames = [ my $indexes = {}; +my $insert_unique_fields = []; + our $CFB_TYPE = 'cfb'; our $CFT_TYPE = 'cft'; our $CFU_TYPE = 'cfu'; @@ -84,6 +89,50 @@ sub countby_subscriberid_type { } +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($subscriber_id, + $type, + $destination_set_id, + $time_set_id) = @params{qw/ + subscriber_id + type + destination_set_id + time_set_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('subscriber_id') . ', ' . + $db->columnidentifier('type') . ', ' . + $db->columnidentifier('destination_set_id') . ', ' . + $db->columnidentifier('time_set_id') . ') VALUES (' . + '?, ' . + '?, ' . + '?, ' . + '?)', + $subscriber_id, + $type, + $destination_set_id, + $time_set_id + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm index dfcb5c1..0c14edf 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm @@ -31,6 +31,7 @@ our @EXPORT_OK = qw( findby_subscriberid_username findby_domainid_username + countby_subscriberidisprimary ); my $tablename = 'voip_dbaliases'; @@ -102,6 +103,33 @@ sub findby_domainid_username { } +sub countby_subscriberidisprimary { + + my ($subscriber_id,$is_primary) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + my @terms = (); + if (defined $subscriber_id) { + push(@terms,$db->columnidentifier('subscriber_id') . ' = ?'); + push(@params,$subscriber_id); + } + if (defined $is_primary) { + push(@terms,$db->columnidentifier('is_primary') . ' = ?'); + push(@params,$is_primary); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + sub delete_dbaliases { my ($xa_db,$subscriber_id,$usernames) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm index 6b63256..61cd5c0 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm @@ -38,6 +38,9 @@ our @EXPORT_OK = qw( $ALLOWED_IPS_GRP_ATTRIBUTE $CONCURRENT_MAX_TOTAL_ATTRIBUTE $CONCURRENT_MAX_PER_ACCOUNT + + @CF_ATTRIBUTES + $RINGTIMEOUT_ATTRIBUTE ); #$FORCE_OUTBOUND_CALLS_TO_PEER @@ -88,6 +91,10 @@ our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total'; our $CONCURRENT_MAX_PER_ACCOUNT_ATTRIBUTE = 'concurrent_max_per_account'; our $CLIR_ATTRIBUTE = 'clir'; +our @CF_ATTRIBUTES = qw(cfu cft cfna cfb); #skip sms for now + +our $RINGTIMEOUT_ATTRIBUTE = 'ringtimeout'; + sub new { my $class = shift; diff --git a/lib/NGCP/BulkProcessor/LogError.pm b/lib/NGCP/BulkProcessor/LogError.pm index 9b16ed1..c777e98 100644 --- a/lib/NGCP/BulkProcessor/LogError.pm +++ b/lib/NGCP/BulkProcessor/LogError.pm @@ -341,7 +341,7 @@ sub restwarn { sub restrequesterror { - my ($restapi, $message, $request, $logger) = @_; + my ($restapi, $message, $request, $data, $logger) = @_; $message = _getrestconnectorinstanceprefix($restapi) . _getrestconnectidentifiermessage($restapi,$message); if (defined $logger) { $logger->error($message); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Api.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Api.pm deleted file mode 100644 index ad477fd..0000000 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Api.pm +++ /dev/null @@ -1,433 +0,0 @@ -package NGCP::BulkProcessor::Projects::Migration::Teletek::Api; -use strict; - -## no critic - -use threads::shared qw(); -#use List::Util qw(); - -use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( - $dry - $skip_errors - $batch - - $domain_name - $reseller_id - $subsciber_username_prefix - - $set_call_forwards_multithreading - $set_call_forwards_numofthreads - $cfb_priorities - $cfb_timeouts - $cfu_priorities - $cfu_timeouts - $cft_priorities - $cft_timeouts - $cfna_priorities - $cfna_timeouts - $cfnumber_exclude_pattern - $cfnumber_trim_pattern - $ringtimeout -); - -use NGCP::BulkProcessor::Logging qw ( - getlogger - processing_info - processing_debug -); -use NGCP::BulkProcessor::LogError qw( - rowprocessingerror - rowprocessingwarn -); - -use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); - -use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); - -use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); - -use NGCP::BulkProcessor::RestRequests::Trunk::CallForwards qw(); - -use NGCP::BulkProcessor::ConnectorPool qw( - get_xa_db -); - -use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw( - destroy_all_dbs -); - -use NGCP::BulkProcessor::Utils qw(threadid); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - set_call_forwards -); - -sub set_call_forwards { - - my $static_context = {}; - my $result = _set_call_forwards_checks($static_context); - - destroy_all_dbs(); - my $warning_count :shared = 0; - return ($result && NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::process_records( - static_context => $static_context, - process_code => sub { - my ($context,$records,$row_offset) = @_; - my $rownum = $row_offset; - foreach my $imported_subscriber (@$records) { - $rownum++; - next unless _reset_set_call_forward_context($context,$imported_subscriber,$rownum); - _set_call_forward($context); - } - - #return 0; - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_xa_db(); - $context->{error_count} = 0; - $context->{warning_count} = 0; - # below is not mandatory.. - _check_insert_tables(); - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - load_recursive => 0, - multithreading => $set_call_forwards_multithreading, - numofthreads => $set_call_forwards_numofthreads, - ),$warning_count); -} - -sub _check_insert_tables { - -} - -sub _invoke_api { - my ($context,$api_code) = @_; - - eval { - $context->{db}->db_begin(); - #rowprocessingwarn($context->{tid},'AutoCommit is on' ,getlogger(__PACKAGE__)) if $context->{db}->{drh}->{AutoCommit}; - - my $existing_billing_voip_subscribers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_domainid_username_states($context->{db}, - $context->{billing_domain}->{id},$context->{username},{ 'NOT IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE}); - if ((scalar @$existing_billing_voip_subscribers) == 0) { - - #if ($context->{subscriberdelta} eq - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::deleted_delta) { - # _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, and no active subscriber found',1); - #} else { - # _warn($context,"($context->{rownum}) no active subscriber found for susbcriber " . $context->{cli}); - #} - } elsif ((scalar @$existing_billing_voip_subscribers) == 1) { - $context->{billing_voip_subscriber} = $existing_billing_voip_subscribers->[0]; - $context->{provisioning_voip_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid( - $context->{db},$context->{billing_voip_subscriber}->{uuid}); - if (defined $context->{provisioning_voip_subscriber}) { - #if ($context->{subscriberdelta} eq - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::deleted_delta) { - # - # _warn($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, but active subscriber found'); - - #} else { - if (defined $api_code and 'CODE' eq ref $api_code) { - &$api_code($context); - } - #} - } else { - if ($skip_errors) { - _warn($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli}); - } else { - _error($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli}); - } - } - } else { - rowprocessingwarn($context->{tid},"($context->{rownum}) " . 'multiple (' . (scalar @$existing_billing_voip_subscribers) . ') existing billing subscribers with username ' . $context->{username} . ' found, skipping' ,getlogger(__PACKAGE__)); - } - - if ($dry) { - $context->{db}->db_rollback(0); - } else { - $context->{db}->db_commit(); - } - - }; - my $err = $@; - if ($err) { - eval { - $context->{db}->db_rollback(1); - }; - if ($skip_errors) { - _warn($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err); - } else { - _error($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err); - } - } - -} - -sub _set_call_forward { - my ($context) = @_; - _invoke_api($context,\&_set_cf_simple); -} - -sub _checks { - - my ($context) = @_; - - my $result = 1; - - #my $userpasswordcount = 0; - #eval { - # $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::countby_fqdn(); - #}; - #if ($@ or $userpasswordcount == 0) { - # rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - #my $subscribercount = 0; - #my $subscriber_barring_profiles = []; - #eval { - # $subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_subscribernumber(); - # $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::list_barringprofiles(); - #}; - #if ($@ or $subscribercount == 0) { - # rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - - #eval { - # $context->{billing_domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($domain_name); - # if (defined $context->{billing_domain} - # and NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers::countby_domainid_resellerid($context->{billing_domain}->{id},$reseller_id) == 0) { - # undef $context->{billing_domain}; - # } - #}; - #if ($@ or not defined $context->{billing_domain}) { - # rowprocessingerror(threadid(),'cannot find billing domain',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - - return $result; - -} - -sub _set_call_forwards_checks { - my ($context) = @_; - - my $result = _checks($context); - - #my $optioncount = 0; - #eval { - # $optioncount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOption::countby_subscribernumber_option(); - #}; - #if ($@ or $optioncount == 0) { - # rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - - return $result; -} - -sub _set_cf_simple { - - my ($context) = @_; - - my $result = 0; - #my $cf_path = NGCP::BulkProcessor::RestRequests::Trunk::CallForwards::get_item_path($context->{billing_voip_subscriber}->{id}); - #eval { - # my $callforwards; - # if ($dry) { - # $callforwards = NGCP::BulkProcessor::RestRequests::Trunk::CallForwards::get_item($context->{billing_voip_subscriber}->{id}); - # } else { - # $callforwards = NGCP::BulkProcessor::RestRequests::Trunk::CallForwards::set_item( - # $context->{billing_voip_subscriber}->{id},$context->{call_forwards}); - # } - # $result = (defined $callforwards ? 1 : 0); - #}; - #if ($@ or not $result) { - # if ($skip_errors) { - # _warn($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': could not ' . ($dry ? 'fetch' : 'set') . ' call forwards ' . $cf_path); - # } else { - # _error($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': could not ' . ($dry ? 'fetch' : 'set') . ' call forwards ' . $cf_path); - # } - #} else { - # _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': call forwards ' . $cf_path . ($dry ? ' fetched' : ' set')); - #} - return $result; - -} - -sub _reset_context { - - my ($context,$imported_subscriber,$rownum) = @_; - - my $result = 1; - - $context->{rownum} = $rownum; - - #$context->{cli} = $imported_subscriber->subscribernumber(); - #$context->{e164} = {}; - #$context->{e164}->{cc} = substr($context->{cli},0,3); - #$context->{e164}->{ac} = ''; - #$context->{e164}->{sn} = substr($context->{cli},3); - - #$context->{subscriberdelta} = $imported_subscriber->{delta}; - - #my $userpassword = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::findby_fqdn($context->{cli}); - #if (defined $userpassword) { - # $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username}; - # $context->{password} = $userpassword->{password}; - # $context->{userpassworddelta} = $userpassword->{delta}; - #} else { - # # once full username+passwords is available: - # delete $context->{username}; - # delete $context->{password}; - # delete $context->{userpassworddelta}; - # if ($context->{subscriberdelta} eq - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::deleted_delta) { - # - # } else { - # $result &= 0; - # - # if ($skip_errors) { - # # for now, as username+passwords are incomplete: - # _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); - # } else { - # _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); - # } - # } - #} - - #delete $context->{billing_voip_subscriber}; - #delete $context->{provisioning_voip_subscriber}; - - return $result; - -} - -sub _reset_set_call_forward_context { - - my ($context,$imported_subscriber,$rownum) = @_; - - my $result = _reset_context($context,$imported_subscriber,$rownum); - - #my $call_forwards = {}; - #if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::countby_subscribernumber_option_optionsetitem( - # $context->{cli}, { 'IN' => [ - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_BUSY_OPTION_SET, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ALL_CALLS_OPTION_SET, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_NO_ANSWER_OPTION_SET, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_UNAVAILABLE_OPTION_SET, - # ]}) > 0) { - - # $call_forwards->{cfb} = _prepare_callforward($context,$cfb_priorities,$cfb_timeouts, - # NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem( - # $context->{cli}, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_BUSY_OPTION_SET, - # )); - - # $call_forwards->{cfu} = _prepare_callforward($context,$cfu_priorities,$cfu_timeouts, - # NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem( - # $context->{cli}, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ALL_CALLS_OPTION_SET, - # )); - - # $call_forwards->{cft} = _prepare_callforward($context,$cft_priorities,$cft_timeouts, - # NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem( - # $context->{cli}, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_NO_ANSWER_OPTION_SET, - # )); - # $call_forwards->{cft}->{ringtimeout} = $ringtimeout if defined $call_forwards->{cft}; - - # $call_forwards->{cfna} = _prepare_callforward($context,$cfna_priorities,$cfna_timeouts, - # NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem( - # $context->{cli}, - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_UNAVAILABLE_OPTION_SET, - # )); - #} else { - # _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' never had call forwards, skipping',1); - # $call_forwards->{cfb} = undef; - # $call_forwards->{cfu} = undef; - # $call_forwards->{cft} = undef; - # $call_forwards->{cfna} = undef; - # $result = 0; - #} - #$context->{call_forwards} = $call_forwards; - - return $result; - -} - -sub _prepare_callforward { - - my ($context,$priorities,$timeouts,$cf_option_set_items) = @_; - my @destinations = (); - #my $i = 0; - #foreach my $cf_option_set_item (@$cf_option_set_items) { - # if (defined $cf_option_set_item and $cf_option_set_item->{delta} ne - # $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::deleted_delta) { - # if (defined $cfnumber_exclude_pattern and $cf_option_set_item->{optionsetitem} =~ $cfnumber_exclude_pattern) { - # _warn($context,"($context->{rownum}) " . $cf_option_set_item->{option} . " '" . $cf_option_set_item->{optionsetitem} . "' of subscriber " . $context->{cli} . ': exclude pattern match'); - # } else { - # my $destination = $cf_option_set_item->{optionsetitem}; - # if (defined $cfnumber_trim_pattern) { - # $destination =~ s/$cfnumber_trim_pattern//; - # if ($cf_option_set_item->{optionsetitem} ne $destination) { - # _info($context,"($context->{rownum}) " . $cf_option_set_item->{option} . " '" . $cf_option_set_item->{optionsetitem} . "' of subscriber " . $context->{cli} . ": trim pattern match, changed to to '$destination'"); - # } - # } - # push(@destinations, { - # destination => $destination, - # priority => (defined $priorities->[$i] ? $priorities->[$i] : $priorities->[-1]), - # timeout => (defined $timeouts->[$i] ? $timeouts->[$i] : $timeouts->[-1]), - # }); - # $i++; - # } - # } - #} - if ((scalar @destinations) > 0) { - return { destinations => \@destinations , times => [], }; - } else { - return undef; - } - -} - -sub _error { - - my ($context,$message) = @_; - $context->{error_count} = $context->{error_count} + 1; - rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); - -} - -sub _warn { - - my ($context,$message) = @_; - $context->{warning_count} = $context->{warning_count} + 1; - rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); - -} - -sub _info { - - my ($context,$message,$debug) = @_; - if ($debug) { - processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); - } else { - processing_info($context->{tid},$message,getlogger(__PACKAGE__)); - } -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm index 7cd27cd..04f8695 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm @@ -5,14 +5,6 @@ use strict; no strict 'refs'; - - -#use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( -# $reseller_id -# $domain_name -# $billing_profile_id -#); - use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); @@ -23,10 +15,6 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::products qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); -#use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); @@ -36,8 +24,11 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw(); +use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); @@ -48,8 +39,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw(); use NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles qw(); -#use NGCP::BulkProcessor::RestRequests::Trunk::NcosLevels qw(); -#use NGCP::BulkProcessor::RestRequests::Trunk::LnpCarriers qw(); require Exporter; our @ISA = qw(Exporter); @@ -107,15 +96,6 @@ sub check_billing_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers'); $result &= $check_result; push(@$messages,$message); - #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers'); - #$result &= $check_result; push(@$messages,$message); - - #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers'); - #if (not $check_result) { - # ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers'); - #} - #$result &= $check_result; push(@$messages,$message); - return $result; } @@ -183,6 +163,12 @@ sub check_provisioning_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings'); $result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations'); + $result &= $check_result; push(@$messages,$message); + return $result; } @@ -200,6 +186,8 @@ sub check_kamailio_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users'); $result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::location'); + $result &= $check_result; push(@$messages,$message); return $result; @@ -231,38 +219,7 @@ sub check_rest_get_items { my $message_prefix = 'NGCP id\'s/constants - '; - #($check_result,$message, my $reseller) = _check_rest_get_item($message_prefix, - # 'NGCP::BulkProcessor::RestRequests::Trunk::Resellers', - # $reseller_id, - # 'name'); - #$result &= $check_result; push(@$messages,$message); - - #($check_result,$message, my $domain) = _check_rest_get_item($message_prefix, - # 'NGCP::BulkProcessor::RestRequests::Trunk::Domains', - # { 'domain' => $domain_name, 'reseller_id' => $reseller_id }, - # 'domain', - # 'get_item_filtered', - # 'get_item_filter_path'); - #$result &= $check_result; push(@$messages,$message); - - #($check_result,$message, my $domain) = _check_rest_get_item($message_prefix, - # 'NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles', - # $billing_profile_id, - # 'handle'); - #$result &= $check_result; push(@$messages,$message); - - #foreach my $level (values %$barring_profiles) { - # if (defined $level and length($level) > 0) { - # ($check_result,$message, my $ncos_level) = _check_rest_get_item($message_prefix, - # 'NGCP::BulkProcessor::RestRequests::Trunk::NcosLevels', - # { 'level' => $level, 'reseller_id' => $reseller_id }, - # 'level', - # 'get_item_filtered', - # 'get_item_filter_path'); - # $result &= $check_result; push(@$messages,$message); - # } - #} - + return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm index 58cfbc4..4e58fc0 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm @@ -67,6 +67,7 @@ our @fieldnames = ( #calculated fields at the end! 'rownum', + 'filenum', 'filename', ); @@ -170,10 +171,19 @@ sub findby_sipusername { #return [] unless (defined $cc or defined $ac or defined $sn); my $rows = $db->db_get_all_arrayref( - 'SELECT * FROM ' . + $db->paginate_sort_query('SELECT * FROM ' . $table . ' WHERE ' . - $db->columnidentifier('sip_username') . ' = ?' + $db->columnidentifier('sip_username') . ' = ?', + undef,undef,[{ + column => 'filenum', + numeric => 1, + dir => 1, + },{ + column => 'rownum', + numeric => 1, + dir => 1, + }]) ,$sip_username); return buildrecords_fromrows($rows,$load_recursive); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/CallForward.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/CallForward.pm index 609d183..2a03fcb 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/CallForward.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/CallForward.pm @@ -68,6 +68,7 @@ our @fieldnames = ( #calculated fields at the end! "sip_username", 'rownum', + 'filenum', 'filename', ); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm index ff17df9..639a0a5 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm @@ -64,6 +64,7 @@ our @fieldnames = ( #calculated fields at the end! 'rownum', + 'filenum', 'filename', ); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm index 5cacc3e..34bf6a8 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm @@ -59,6 +59,7 @@ our @fieldnames = ( #calculated fields at the end! 'rownum', + 'filenum', 'filename', ); @@ -68,7 +69,7 @@ my $expected_fieldnames = [ ]; # table creation: -my $primarykey_fieldnames = [ 'sip_username' ]; +my $primarykey_fieldnames = [ 'sip_username' ]; #, 'domain', 'contact' ]; my $indexes = { $tablename . '_rownum' => [ 'rownum(11)' ], $tablename . '_delta' => [ 'delta(7)' ], diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm index d430e39..450598f 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm @@ -93,6 +93,7 @@ our @fieldnames = ( 'rownum', 'range', 'contact_hash', + 'filenum', 'filename', ); my $expected_fieldnames = [ @@ -196,11 +197,20 @@ sub findby_domain_sipusername { #return [] unless (defined $cc or defined $ac or defined $sn); my $rows = $db->db_get_all_arrayref( - 'SELECT * FROM ' . + $db->paginate_sort_query('SELECT * FROM ' . $table . ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . - ' AND ' . $db->columnidentifier('sip_username') . ' = ?' + ' AND ' . $db->columnidentifier('sip_username') . ' = ?', + undef,undef,[{ + column => 'filenum', + numeric => 1, + dir => 1, + },{ + column => 'rownum', + numeric => 1, + dir => 1, + }]) ,$domain,$sip_username); return buildrecords_fromrows($rows,$load_recursive); @@ -238,11 +248,20 @@ sub findby_domain_webusername { #return [] unless (defined $cc or defined $ac or defined $sn); my $rows = $db->db_get_all_arrayref( - 'SELECT * FROM ' . + $db->paginate_sort_query('SELECT * FROM ' . $table . ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . - ' AND ' . $db->columnidentifier('web_username') . ' = ?' + ' AND ' . $db->columnidentifier('web_username') . ' = ?', + undef,undef,[{ + column => 'filenum', + numeric => 1, + dir => 1, + },{ + column => 'rownum', + numeric => 1, + dir => 1, + }]) ,$domain,$web_username); return buildrecords_fromrows($rows,$load_recursive); @@ -404,7 +423,16 @@ sub process_records { destroy_reader_dbs_code => \&destroy_all_dbs, multithreading => $multithreading, tableprocessing_threads => $numofthreads, - 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols), + #'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols), + 'select' => $db->paginate_sort_query('SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),undef,undef,[{ + column => 'filenum', + numeric => 1, + dir => 1, + },{ + column => 'rownum', + numeric => 1, + dir => 1, + }]), 'select_count' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table, ); } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm index 94972d9..1ced891 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm @@ -82,7 +82,9 @@ sub import_subscriber { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; + my $filenum = 0; foreach my $file (@files) { + $filenum++; $result &= $importer->process( file => $file, process_code => sub { @@ -98,6 +100,7 @@ sub import_subscriber { $record->{ac} //= ''; $record->{sn} //= ''; $record->{rownum} = $rownum; + $record->{filenum} = $filenum; $record->{filename} = $file; my %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); @@ -229,7 +232,9 @@ sub import_allowedcli { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; + my $filenum = 0; foreach my $file (@files) { + $filenum++; $result &= $importer->process( file => $file, process_code => sub { @@ -245,6 +250,7 @@ sub import_allowedcli { $record->{ac} //= ''; $record->{sn} //= ''; $record->{rownum} = $rownum; + $record->{filenum} = $filenum; $record->{filename} = $file; if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) { @@ -470,7 +476,9 @@ sub import_clir { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; + my $filenum = 0; foreach my $file (@files) { + $filenum++; $result &= $importer->process( file => $file, process_code => sub { @@ -483,6 +491,7 @@ sub import_clir { $row = [ map { local $_ = $_; trim($_); } @$row ]; my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir->new($row); $record->{rownum} = $rownum; + $record->{filenum} = $filenum; $record->{filename} = $file; if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) { @@ -623,7 +632,9 @@ sub import_callforward { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; + my $filenum = 0; foreach my $file (@files) { + $filenum++; $result &= $importer->process( file => $file, process_code => sub { @@ -639,6 +650,7 @@ sub import_callforward { $record->{ac} //= ''; $record->{sn} //= ''; $record->{rownum} = $rownum; + $record->{filenum} = $filenum; $record->{filename} = $file; if (my $subscriber = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { @@ -767,15 +779,6 @@ sub _insert_callforward_rows { } } - - - - - - - - - sub import_registration { my (@files) = @_; @@ -792,7 +795,9 @@ sub import_registration { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; + my $filenum = 0; foreach my $file (@files) { + $filenum++; $result &= $importer->process( file => $file, process_code => sub { @@ -805,6 +810,7 @@ sub import_registration { $row = [ map { local $_ = $_; trim($_); } @$row ]; my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration->new($row); $record->{rownum} = $rownum; + $record->{filenum} = $filenum; $record->{filename} = $file; if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) { @@ -930,9 +936,6 @@ sub _insert_registration_rows { } - - - sub _error { my ($context,$message) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm index 27ec874..c29b3a0 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm @@ -12,8 +12,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $dry $skip_errors - - ); use NGCP::BulkProcessor::Logging qw ( @@ -63,6 +61,7 @@ our @EXPORT_OK = qw( ); + sub cleanup_aig_sequence_ids { my ($context) = @_; eval { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm index 16842b0..cbba029 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm @@ -7,10 +7,13 @@ use threads::shared qw(); use String::MkPasswd qw(); #use List::Util qw(); +use JSON qw(); +use Tie::IxHash; + use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $dry $skip_errors - + $report_filename @@ -18,10 +21,15 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $provision_subscriber_numofthreads $webpassword_length $webusername_length + $default_channels_map $reseller_mapping $barring_profiles + $cf_default_priority + $cf_default_timeout + $cft_default_ringtimeout + ); use NGCP::BulkProcessor::Logging qw ( @@ -32,11 +40,14 @@ use NGCP::BulkProcessor::Logging qw ( use NGCP::BulkProcessor::LogError qw( rowprocessingerror rowprocessingwarn + fileerror ); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::products qw(); @@ -56,8 +67,12 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw(); +use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Subscribers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw(); @@ -95,6 +110,13 @@ my $split_ipnets_pattern = join('|',( quotemeta(';'), #quotemeta('/') )); +my $cf_types_pattern = '^' . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFB_TYPE . '|' + . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE . '|' + . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFU_TYPE . '|' + . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE . '$'; + +my $db_lock :shared = undef; +my $file_lock :shared = undef; sub provision_subscribers { @@ -103,18 +125,27 @@ sub provision_subscribers { destroy_all_dbs(); my $warning_count :shared = 0; - #my $updated_password_count :shared = 0; + my %nonunique_contacts :shared = (); return ($result && NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::process_records( static_context => $static_context, process_code => sub { my ($context,$records,$row_offset) = @_; $context->{rowcount} = $row_offset; + my @report_data = (); foreach my $domain_sipusername (@$records) { $context->{rowcount} += 1; next unless _provision_susbcriber($context, NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_sipusername(@$domain_sipusername)); + push(@report_data,_get_report_obj($context)); } cleanup_aig_sequence_ids($context); + if (defined $report_filename) { + lock $file_lock; + open(my $fh, '>>', $report_filename) or fileerror('cannot open file ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__)); + binmode($fh); + print $fh JSON::to_json(\@report_data,{ allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, }); + close $fh; + } return 1; }, init_process_context_code => sub { @@ -122,42 +153,63 @@ sub provision_subscribers { $context->{db} = &get_xa_db(); $context->{error_count} = 0; $context->{warning_count} = 0; - #$context->{updated_password_count} = 0; + $context->{nonunique_contacts} = {}; # below is not mandatory.. - _check_insert_tables(); + #_check_insert_tables(); }, uninit_process_context_code => sub { my ($context)= @_; undef $context->{db}; - _warn($context,'non-unique contacts: ' . join("\n",keys %{$context->{nonunique_contacts}})) - if (scalar keys %{$context->{nonunique_contacts}}) > 0; destroy_all_dbs(); { lock $warning_count; $warning_count += $context->{warning_count}; - #$updated_password_count += $context->{updated_password_count}; + } + { + lock %nonunique_contacts; + foreach my $sip_username (keys %{$context->{nonunique_contacts}}) { + $nonunique_contacts{$sip_username} = $context->{nonunique_contacts}->{$sip_username}; + } } }, load_recursive => 0, multithreading => $provision_subscriber_multithreading, numofthreads => $provision_subscriber_numofthreads, - ),$warning_count); + ),$warning_count,\%nonunique_contacts); } - -sub _check_insert_tables { - - NGCP::BulkProcessor::Dao::Trunk::billing::contacts::check_table(); - NGCP::BulkProcessor::Dao::Trunk::billing::contracts::check_table(); - NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::check_table(); - NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances::check_table(); - NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::check_table(); - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::check_table(); - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::check_table(); - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::check_table(); - NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::check_table(); - +sub _get_report_obj { + my ($context) = @_; + my %dump = (); + tie(%dump, 'Tie::IxHash'); + foreach my $key (sort keys %$context) { + $dump{$key} = $context->{$key} if 'CODE' ne ref $context->{$key}; + } + foreach my $key (qw/ + sip_account_product + reseller + billing_profile + reseller_map + domain_map + domain + now + error_count + warning_count + attributes + ncos_level_map + ncos_level + nonunique_contacts + tid + db + blocksize + errorstates + queue + readertid + /) { + delete $dump{$key}; + } + return \%dump; } sub _provision_susbcriber { @@ -166,6 +218,7 @@ sub _provision_susbcriber { return 0 unless _provision_susbcriber_init_context($context,$subscriber_group); eval { + lock $db_lock; $context->{db}->db_begin(); #_warn($context,'AutoCommit is on') if $context->{db}->{drh}->{AutoCommit}; @@ -180,9 +233,14 @@ sub _provision_susbcriber { _update_contact($context); _update_contract($context); - _update_subscriber($context); - _create_aliases($context); + #{ + # lock $db_lock; #concurrent writes to voip_numbers causes deadlocks + _update_subscriber($context); + _create_aliases($context); + #} _update_preferences($context); + _set_registrations($context); + _set_callforwards($context); #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it #in this main provisioning loop, or align it in separate run-modes, according to the files given. @@ -477,6 +535,29 @@ sub _provision_subscribers_checks { processing_info(threadid(),"adm_ncos_id attribute found",getlogger(__PACKAGE__)); } + foreach my $cf_attribute (@NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CF_ATTRIBUTES) { + eval { + $context->{attributes}->{$cf_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($cf_attribute); + }; + if ($@ or not defined $context->{attributes}->{$cf_attribute}) { + rowprocessingerror(threadid(),"cannot find $cf_attribute attribute",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$cf_attribute attribute found",getlogger(__PACKAGE__)); + } + } + + eval { + $context->{attributes}->{ringtimeout} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::RINGTIMEOUT_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{ringtimeout}) { + rowprocessingerror(threadid(),'cannot find ringtimeout attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"ringtimeout attribute found",getlogger(__PACKAGE__)); + } + return $result; } @@ -828,12 +909,80 @@ sub _create_aliases { return $result; } +sub _set_registrations { + + my ($context) = @_; + my $result = 1; + foreach my $registration (@{$context->{registrations}}) { + #print "blah"; + $registration->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::location::insert_row($context->{db}, + %$registration); + _info($context,"permanent registration $registration->{contact} added",1); + } + return $result; + +} + +sub _set_callforwards { + + my ($context) = @_; + my $result = 1; + foreach my $type (keys %{$context->{callforwards}}) { + #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); + #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); + + my $destination_set_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets::insert_row($context->{db},{ + subscriber_id => $context->{prov_subscriber}->{id}, + name => "quickset_$type", + }); + foreach my $callforward (@{$context->{callforwards}->{$type}}) { + $callforward->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations::insert_row($context->{db},{ + %$callforward, + destination_set_id => $destination_set_id, + }); + } + my $cf_mapping_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::insert_row($context->{db},{ + subscriber_id => $context->{prov_subscriber}->{id}, + type => $type, + destination_set_id => $destination_set_id, + #time_set_id + }); + + $context->{preferences}->{$type} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{$type}, + $cf_mapping_id), value => $cf_mapping_id }; + + if (defined $context->{ringtimeout}) { + $context->{preferences}->{ringtimeout} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{ringtimeout}, + $context->{ringtimeout}), value => $context->{ringtimeout} }; + } + _info($context,"$type created (destination(s) " . join(', ',(map { $_->{destination}; } @{$context->{callforwards}->{$type}})) . ")",1); + + $context->{callforwards}->{$type} = { + destination_set => { + destinations => $context->{callforwards}->{$type}, + id => $destination_set_id, + }, + id => $cf_mapping_id, + }; + } + return $result; + +} + sub _provision_susbcriber_init_context { my ($context,$subscriber_group) = @_; my $result = 1; + $context->{log_info} = []; + $context->{log_warning} = []; + $context->{log_error} = []; + my $first = $subscriber_group->[0]; unless (defined $first->{sip_username} and length($first->{sip_username}) > 0) { @@ -869,6 +1018,7 @@ sub _provision_susbcriber_init_context { my %contact_dupes = (); my %allowed_ips = (); my %barrings = (); + my $voicemail = 0; foreach my $subscriber (@$subscriber_group) { my $number = $subscriber->{cc} . $subscriber->{ac} . $subscriber->{sn}; if (not exists $number_dupes{$number}) { @@ -877,12 +1027,13 @@ sub _provision_susbcriber_init_context { ac => $subscriber->{ac}, sn => $subscriber->{sn}, number => $number, - delta => $subscriber->{delta}, + #delta => $subscriber->{delta}, additional => 0, + filename => $subscriber->{filename}, }); $number_dupes{$number} = 1; } else { - _warn($context,'duplicate number $number (subscriber table) ignored'); + _warn($context,"duplicate number $number ($subscriber->{filename}) ignored"); } if (not exists $contact_dupes{$subscriber->{contact_hash}}) { @@ -908,7 +1059,8 @@ sub _provision_susbcriber_init_context { }; $contact_dupes{$subscriber->{contact_hash}} = 1; } else { - _warn($context,'non-unique contact hash, skipped'); + _warn($context,'non-unique contact data, skipped'); + $context->{nonunique_contacts}->{$context->{prov_subscriber}->{username}} += 1; } } @@ -953,6 +1105,18 @@ sub _provision_susbcriber_init_context { $barrings{$subscriber->{barrings}} = 1; } + $voicemail = stringtobool($subscriber->{voicemail}) unless $voicemail; + + } + unless (defined $context->{channels}) { + my $default_channels = 1; + foreach my $numbers (sort { $a <=> $b } keys %$default_channels_map) { + if ((scalar @numbers) > $numbers) { + $default_channels = $default_channels_map->{$numbers}; + } + } + _info($context,"using $default_channels channels by default for " . (scalar @numbers) . ' numbers',1); + $context->{channels} = $default_channels; } unless (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0) { @@ -1006,12 +1170,13 @@ sub _provision_susbcriber_init_context { ac => $allowed_cli->{ac}, sn => $allowed_cli->{sn}, number => $number, - delta => $allowed_cli->{delta}, + #delta => $allowed_cli->{delta}, additional => 1, + filename => $allowed_cli->{filename}, }); $number_dupes{$number} = 1; } else { - _warn($context,'duplicate number $number (allowed_cli table) ignored'); + _warn($context,"duplicate number $number ($allowed_cli->{filename}) ignored"); } } @@ -1055,6 +1220,72 @@ sub _provision_susbcriber_init_context { $context->{clir} = stringtobool($clir->{clir}); } + $context->{ringtimeout} = undef; + my %cfsimple = (); + my $callforwards = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward::findby_sipusername($first->{sip_username}); + if ((scalar @$callforwards) > 0 or $voicemail) { + my %vmcf = (); + my %maxpriority = (); + foreach my $callforward (@$callforwards) { + my $type = lc($callforward->{type}); + if ($type =~ /$cf_types_pattern/) { + unless (defined $callforward->{destination} and length($callforward->{destination}) > 0) { + _warn($context,"empty callforward destination, ignoring"); + next; + } + if ($callforward->{destination} =~ /voicemail/i) { + $callforward->{destination} = 'sip:vm' . ('cfb' eq $type ? 'b' : 'u') . $context->{numbers}->{primary}->{number} . '@voicebox.local'; + $vmcf{$type} = 1 unless $vmcf{$type}; + } elsif ($callforward->{destination} !~ /^\d+$/i) { + _warn($context,"invalid callforward destination '$callforward->{destination}', ignoring"); + next; + } else { #todo: allow sip uri destinations + $callforward->{destination} .= '@' . $context->{domain}->{domain}; + } + $callforward->{priority} //= $cf_default_priority; + $callforward->{timeout} //= $cf_default_timeout; + $callforward->{ringtimeout} //= $cft_default_ringtimeout if 'cft' eq $type; + $context->{ringtimeout} = $callforward->{ringtimeout} if ('cft' eq $type and (not defined $context->{ringtimeout} or $callforward->{ringtimeout} > $context->{ringtimeout})); + + $cfsimple{$type} = [] unless exists $cfsimple{$type}; + push(@{$cfsimple{$type}},{ + destination => $callforward->{destination}, + priority => $callforward->{priority}, + timeout => $callforward->{timeout}, + }); + #$vmcf{$type} = ($callforward->{destination} =~ /voicemail/i) unless $vmcf{$type}; + $maxpriority{$type} = $callforward->{priority} if (not defined $maxpriority{$type} or $callforward->{priority} > $maxpriority{$type}); + } else { + _warn($context,"invalid callforward type '$type', ignoring"); + } + } + if ($voicemail) { + foreach my $type (($NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE, + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE)) { + next if $vmcf{$type}; + $cfsimple{$type} = [] unless exists $cfsimple{$type}; + push(@{$cfsimple{$type}},{ + destination => 'sip:vmu' . $context->{numbers}->{primary}->{number} . '@voicebox.local', + priority => (defined $maxpriority{$type} ? $maxpriority{$type} + 1 : $cf_default_priority), + timeout => $cf_default_timeout, + }); + $context->{ringtimeout} = $cft_default_ringtimeout if ('cft' eq $type and not defined $context->{ringtimeout}); # or $cft_default_ringtimeout > $context->{ringtimeout})); + } + } + } + $context->{callforwards} = \%cfsimple; + + my @registrations = (); + if (my $registration = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::findby_sipusername($first->{sip_username})) { + #todo: check/transform, multiple contacts + push(@registrations,{ + username => $registration->{sip_username}, + domain => $registration->{domain}, + contact => $registration->{sip_contact}, + ruid => NGCP::BulkProcessor::Dao::Trunk::kamailio::location::next_ruid(), + }); + } + $context->{registrations} = \@registrations; #$context->{counts} = {} unless defined $context->{counts}; @@ -1062,6 +1293,7 @@ sub _provision_susbcriber_init_context { } + sub _generate_webpassword { return String::MkPasswd::mkpasswd( -length => $webpassword_length, @@ -1086,6 +1318,7 @@ sub _error { my ($context,$message) = @_; $context->{error_count} = $context->{error_count} + 1; + push(@{$context->{log_error}},$message) if exists $context->{log_error}; if ($context->{prov_subscriber}) { $message = ($context->{prov_subscriber}->{username} ? $context->{prov_subscriber}->{username} : '') . ': ' . $message; } @@ -1097,6 +1330,7 @@ sub _warn { my ($context,$message) = @_; $context->{warning_count} = $context->{warning_count} + 1; + push(@{$context->{log_warning}},$message) if exists $context->{log_warning}; if ($context->{prov_subscriber}) { $message = ($context->{prov_subscriber}->{username} ? $context->{prov_subscriber}->{username} : '') . ': ' . $message; } @@ -1107,6 +1341,7 @@ sub _warn { sub _info { my ($context,$message,$debug) = @_; + push(@{$context->{log_info}},$message) if exists $context->{log_info}; if ($context->{prov_subscriber}) { $message = ($context->{prov_subscriber}->{username} ? $context->{prov_subscriber}->{username} : '') . ': ' . $message; } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm index 38476ba..5ab3130 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm @@ -18,6 +18,7 @@ use NGCP::BulkProcessor::Logging qw( use NGCP::BulkProcessor::LogError qw( fileerror + filewarn configurationwarn configurationerror ); @@ -26,7 +27,7 @@ use NGCP::BulkProcessor::LoadConfig qw( split_tuple parse_regexp ); -use NGCP::BulkProcessor::Utils qw(prompt); +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits); #format_number check_ipnet require Exporter; @@ -39,6 +40,7 @@ our @EXPORT_OK = qw( $input_path $output_path + $report_filename $defaultsettings $defaultconfig @@ -85,43 +87,27 @@ our @EXPORT_OK = qw( $provision_subscriber_numofthreads $webpassword_length $webusername_length + $default_channels_map - - $set_call_forwards_multithreading - $set_call_forwards_numofthreads - $cfb_priorities - $cfb_timeouts - $cfu_priorities - $cfu_timeouts - $cft_priorities - $cft_timeouts - $cfna_priorities - $cfna_timeouts - $cfnumber_exclude_pattern - $cfnumber_trim_pattern - $ringtimeout - - $set_preference_bulk_multithreading - $set_preference_bulk_numofthreads + $cf_default_priority + $cf_default_timeout + $cft_default_ringtimeout ); -#$concurrent_max_total -# $set_allowed_ips_multithreading -# $set_allowed_ips_numofthreads -# $allowed_ips our $defaultconfig = 'config.cfg'; our $defaultsettings = 'settings.cfg'; our $input_path = $working_path . 'input/'; our $output_path = $working_path . 'output/'; +our $report_filename = undef; our $force = 0; our $dry = 0; our $skip_errors = 0; our $run_id = ''; our $import_db_file = _get_import_db_file($run_id,'import'); -our $import_multithreading = $enablemultithreading; +our $import_multithreading = 0; #$enablemultithreading; our @subscriber_filenames = (); our $subscriber_import_numofthreads = $cpucount; @@ -158,27 +144,15 @@ our $provision_subscriber_multithreading = $enablemultithreading; our $provision_subscriber_numofthreads = $cpucount; our $webpassword_length = 8; our $webusername_length = 8; -#our $set_allowed_ips_multithreading = $enablemultithreading; -#our $set_allowed_ips_numofthreads = $cpucount; -#our $allowed_ips = []; - -our $set_call_forwards_multithreading = $enablemultithreading; -our $set_call_forwards_numofthreads = $cpucount; -our $cfb_priorities = []; -our $cfb_timeouts = []; -our $cfu_priorities = []; -our $cfu_timeouts = []; -our $cft_priorities = []; -our $cft_timeouts = []; -our $cfna_priorities = []; -our $cfna_timeouts = []; -our $cfnumber_exclude_pattern = undef; -our $cfnumber_trim_pattern = undef; -our $ringtimeout = undef; - -#our $set_preference_bulk_multithreading = $enablemultithreading; -#our $set_preference_bulk_numofthreads = $cpucount; -#our $concurrent_max_total = undef; +our $default_channels_map = { + 0 => 1, + 4 => 10, + 8 => 25, # "more than 10 numbers" => concurrent_max = 25 +}; + +our $cf_default_priority = 1; +our $cf_default_timeout = 300; +our $cft_default_ringtimeout = 20; sub update_settings { @@ -192,11 +166,23 @@ sub update_settings { #&$configurationinfocode("testinfomessage",$configlogger); $result &= _prepare_working_paths(1); + if ($data->{report_filename}) { + $report_filename = $output_path . sprintf('/' . $data->{report_filename},timestampdigits()); + if (-e $report_filename and (unlink $report_filename) == 0) { + filewarn('cannot remove ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__)); + $report_filename = undef; + } + } else { + $report_filename = undef; + } $dry = $data->{dry} if exists $data->{dry}; $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; $import_db_file = _get_import_db_file($run_id,'import'); $import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading}; + #if ($import_multithreading) { + # configurationerror($configfile,"import_multithreading must be disabled to preserve record order",getlogger(__PACKAGE__)); + #} @subscriber_filenames = _get_import_filenames(\@subscriber_filenames,$data,'subscriber_filenames'); $subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_import_numofthreads'); @@ -239,45 +225,11 @@ sub update_settings { configurationerror($configfile,'webusername_length greater than 7 required',getlogger(__PACKAGE__)); $result = 0; } - #$set_allowed_ips_multithreading = $data->{set_allowed_ips_multithreading} if exists $data->{set_allowed_ips_multithreading}; - #$set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads'); - #$allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips}; - #foreach my $ipnet (@$allowed_ips) { - # if (not check_ipnet($ipnet)) { - # configurationerror($configfile,"invalid allowed_ip '$ipnet'",getlogger(__PACKAGE__)); - # $result = 0; - # } - #} - - $set_call_forwards_multithreading = $data->{set_call_forwards_multithreading} if exists $data->{set_call_forwards_multithreading}; - $set_call_forwards_numofthreads = _get_numofthreads($cpucount,$data,'set_call_forwards_numofthreads'); - $cfb_priorities = [ split_tuple($data->{cfb_priorities}) ] if exists $data->{cfb_priorities}; - $cfb_timeouts = [ split_tuple($data->{cfb_timeouts}) ] if exists $data->{cfb_timeouts}; - $cfu_priorities = [ split_tuple($data->{cfu_priorities}) ] if exists $data->{cfu_priorities}; - $cfu_timeouts = [ split_tuple($data->{cfu_timeouts}) ] if exists $data->{cfu_timeouts}; - $cft_priorities = [ split_tuple($data->{cft_priorities}) ] if exists $data->{cft_priorities}; - $cft_timeouts = [ split_tuple($data->{cft_timeouts}) ] if exists $data->{cft_timeouts}; - $cfna_priorities = [ split_tuple($data->{cfna_priorities}) ] if exists $data->{cfna_priorities}; - $cfna_timeouts = [ split_tuple($data->{cfna_timeouts}) ] if exists $data->{cfna_timeouts}; - $cfnumber_exclude_pattern = $data->{cfnumber_exclude_pattern} if exists $data->{cfnumber_exclude_pattern}; - ($regexp_result,$cfnumber_exclude_pattern) = parse_regexp($cfnumber_exclude_pattern,$configfile); - $result &= $regexp_result; - $cfnumber_trim_pattern = $data->{cfnumber_trim_pattern} if exists $data->{cfnumber_trim_pattern}; - ($regexp_result,$cfnumber_trim_pattern) = parse_regexp($cfnumber_trim_pattern,$configfile); - $result &= $regexp_result; - $ringtimeout = $data->{ringtimeout} if exists $data->{ringtimeout}; - if (not defined $ringtimeout or $ringtimeout <= 0) { - configurationerror($configfile,'ringtimeout greater than 0 required',getlogger(__PACKAGE__)); - $result = 0; - } + #$default_channels = $data->{default_channels} if exists $data->{default_channels}; - #$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading}; - #$set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads'); - #$concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total}; - #if (defined $concurrent_max_total and $concurrent_max_total <= 0) { - # configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__)); - # $result = 0; - #} + $cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority}; + $cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout}; + $cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout}; return $result; @@ -303,10 +255,10 @@ sub _prepare_working_paths { sub _get_numofthreads { my ($default_value,$data,$key) = @_; - my $_numofthreads = $default_value; - $_numofthreads = $data->{$key} if exists $data->{$key}; - $_numofthreads = $cpucount if $_numofthreads > $cpucount; - return $_numofthreads; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; } sub _get_import_db_file { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/config.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/config.cfg index 8b69bdd..9c66de3 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/config.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/config.cfg @@ -6,8 +6,6 @@ enablemultithreading = 0 ##gearman/service listener config: jobservers = 127.0.0.1:4730 -#provisioning_conf = /etc/ngcp-panel/provisioning.conf - ##NGCP MySQL connectivity - "accounting" db: accounting_host = 192.168.0.84 accounting_port = 3306 diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl index dac5b00..de2ecc3 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl @@ -77,13 +77,11 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); - -use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); -use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); -use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw(); -use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); +use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); + use NGCP::BulkProcessor::Projects::Migration::Teletek::Check qw( check_billing_db_tables check_provisioning_db_tables @@ -104,16 +102,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Provisioning qw( provision_subscribers ); -#use NGCP::BulkProcessor::Projects::Migration::Teletek::Preferences qw( -# set_allowed_ips -# -# set_preference_bulk -#); - -#use NGCP::BulkProcessor::Projects::Migration::Teletek::Api qw( -# set_call_forwards -#); - scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet my @TASK_OPTS = (); @@ -409,9 +397,9 @@ sub import_allowedcli_task { $stats .= "\n removed: $deleted_count rows"; }; if ($err or !$result) { - push(@$messages,"importing allowed clis INCOMPLETE$stats"); + push(@$messages,"importing allowed clis (additional numbers) INCOMPLETE$stats"); } else { - push(@$messages,"importing allowed clis completed$stats"); + push(@$messages,"importing allowed clis (additional numbers) completed$stats"); } destroy_all_dbs(); #every task should leave with closed connections. return $result; @@ -428,13 +416,13 @@ sub import_truncate_allowedcli_task { my $err = $@; my $stats = ''; eval { - $stats .= "\n total allowed cli records: " . + $stats .= "\n total allowed cli (additional numbers) records: " . NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::countby_ccacsn() . ' rows'; }; if ($err or !$result) { - push(@$messages,"truncating imported allowed clis INCOMPLETE$stats"); + push(@$messages,"truncating imported allowed clis (additional numbers) INCOMPLETE$stats"); } else { - push(@$messages,"truncating imported allowed clis completed$stats"); + push(@$messages,"truncating imported allowed clis (additional numbers) completed$stats"); } destroy_all_dbs(); #every task should leave with closed connections. return $result; @@ -502,7 +490,6 @@ sub import_truncate_clir_task { } - sub import_callforward_task { my ($messages) = @_; @@ -563,9 +550,6 @@ sub import_truncate_callforward_task { } - - - sub import_registration_task { my ($messages) = @_; @@ -625,99 +609,48 @@ sub import_truncate_registration_task { } - - - - - sub create_subscriber_task { my ($messages) = @_; - my ($result,$warning_count) = (0,0); + my ($result,$warning_count,$nonunique_contacts) = (0,0,{}); eval { - ($result,$warning_count) = provision_subscribers(); + ($result,$warning_count,$nonunique_contacts) = provision_subscribers(); }; my $err = $@; my $stats = ": $warning_count warnings"; eval { - #$stats .= "\n total contracts: " . - # NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,$reseller_id) . ' rows'; - #my $active_count = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid( - # $NGCP::BulkProcessor::Dao::Trunk::billing::contracts::ACTIVE_STATE, - # $reseller_id - #); - #$stats .= "\n active: $active_count rows"; - #my $terminated_count = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid( - # $NGCP::BulkProcessor::Dao::Trunk::billing::contracts::TERMINATED_STATE, - # $reseller_id - #); - #$stats .= "\n terminated: $terminated_count rows"; - - #$stats .= "\n total subscribers: " . - # NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,$reseller_id) . ' rows'; - #$active_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid( - # $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE, - # $reseller_id - #); - #$stats .= "\n active: $active_count rows"; - #$terminated_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid( - # $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE, - # $reseller_id - #); - #$stats .= "\n terminated: $terminated_count rows"; + $stats .= "\n total contracts: " . + NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,undef) . ' rows'; + $stats .= "\n total subscribers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,undef) . ' rows'; + + $stats .= "\n total aliases: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,undef) . ' rows'; + $stats .= "\n primary aliases: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,1) . ' rows'; + + $stats .= "\n call forwards: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,undef) . ' rows'; + + $stats .= "\n registrations: " . + NGCP::BulkProcessor::Dao::Trunk::kamailio::location::countby_usernamedomain(undef,undef) . ' rows'; + + $stats .= "\n non-unique contacts skipped:\n " . join("\n ",keys %$nonunique_contacts) + if (scalar keys %$nonunique_contacts) > 0; }; if ($err or !$result) { push(@$messages,"create subscribers INCOMPLETE$stats"); } else { push(@$messages,"create subscribers completed$stats"); - #if (not $dry and $reprovision_upon_password_change and $updated_password_count > 0) { - # push(@$messages,"THERE WERE $updated_password_count UPDATED PASSWORDS. YOU MIGHT WANT TO RESTART SEMS NOW ..."); - #} + if (not $dry) { + push(@$messages,"YOU MIGHT WANT TO RESTART KAMAILIO FOR PERMANENT REGISTRATIONS TO COME INTO EFFECT"); + } } destroy_all_dbs(); #every task should leave with closed connections. return $result; } -#sub set_call_forwards_task { -# -# my ($messages,$mode) = @_; -# my ($result,$warning_count) = (0,0); -# eval { -# if ($batch) { -# ($result,$warning_count) = set_call_forwards_batch($mode); -# } else { -# ($result,$warning_count) = set_call_forwards($mode); -# } -# }; -# my $err = $@; -# my $stats = ($skip_errors ? ": $warning_count warnings" : ''); -# eval { -# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFU_TYPE . "': " . -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef, -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFU_TYPE) . ' rows'; -# -# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFB_TYPE . "': " . -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef, -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFB_TYPE) . ' rows'; -# -# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE . "': " . -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef, -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE) . ' rows'; -# -# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE . "': " . -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef, -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE) . ' rows'; -# }; -# if ($err or !$result) { -# push(@$messages,"set subscribers\' call forwards INCOMPLETE$stats"); -# } else { -# push(@$messages,"set subscribers\' call forwards completed$stats"); -# } -# destroy_all_dbs(); #every task should leave with closed connections. -# return $result; -# -#} #END { # # this should not be required explicitly, but prevents Log4Perl's diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg index 1e79358..18d54db 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg @@ -40,22 +40,13 @@ ignore_registration_unique = 0 registration_import_single_row_txn = 1 provision_subscriber_multithreading = 1 -#provision_subscriber_numofthreads = 6 +provision_subscriber_numofthreads = 2 webpassword_length = 8 webusername_length = 8 +report_filename = provision.txt +#report_filename = provision_%s.json +#default_channels = 1 - - -set_call_forwards_multithreading = 1 -#set_call_forwards_numofthreads = 6 -cfb_priorities = -cfb_timeouts = 300 -cfu_priorities = 1 -cfu_timeouts = 300 -cft_priorities = 1 -cft_timeouts = 300 -cfna_priorities = 1 -cfna_timeouts = 300 -ringtimeout = 20 -#cfnumber_exclude_pattern = -cfnumber_trim_pattern = ^05\d{2} +cf_default_priority: 1 +cf_default_timeout: 300 +cft_default_ringtimeout: 20 diff --git a/lib/NGCP/BulkProcessor/RestConnector.pm b/lib/NGCP/BulkProcessor/RestConnector.pm index 5e55334..376bcb5 100644 --- a/lib/NGCP/BulkProcessor/RestConnector.pm +++ b/lib/NGCP/BulkProcessor/RestConnector.pm @@ -25,6 +25,7 @@ require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( _add_headers + convert_bools ); #my $logger = getlogger(__PACKAGE__); @@ -88,6 +89,11 @@ sub baseuri { } +sub path { + my $self = shift; + return $self->{path}; +} + sub _clearrequestdata { my $self = shift; @@ -188,11 +194,15 @@ sub _get_request_uri { if (defined $path_query) { if (blessed($path_query) and $path_query->isa('URI')) { $path_query = $path_query->path_query(); + if (defined $self->{path} and length($self->{path}) > 0) { + $path_query =~ s!^$self->{path}!!; + } } } else { $path_query = ''; } if (defined $self->{path} and length($self->{path}) > 0) { + #$path_query =~ s!^$self->{path}!!; $path_query =~ s!^/!!; $path_query = $self->{path} . $path_query; } @@ -244,6 +254,33 @@ sub _post { } +sub _post_raw { + + my $self = shift; + my ($path_query_request,$data,$headers) = @_; + $self->_clearrequestdata(); + $self->{requestdata} = $data; + if (blessed($path_query_request) and $path_query_request->isa('HTTP::Request')) { + $self->{req} = $path_query_request; + $self->_log_request($self->{req}); + } else { + $self->{req} = HTTP::Request->new('POST',$self->_get_request_uri($path_query_request)); + _add_headers($self->{req},$headers); + $self->_log_request($self->{req}); + $self->{req}->content($data); + } + $self->{res} = $self->_ua_request($self->{req}); + $self->_log_response($self->{res}); + eval { + $self->{responsedata} = $self->_decode_post_response($self->{res}->decoded_content()); + }; + if ($@) { + restresponseerror($self,'error decoding POST response content: ' . $@,$self->{res},getlogger(__PACKAGE__)); + } + return $self->{res}; + +} + sub post { my $self = shift; notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); @@ -281,11 +318,62 @@ sub _get { } +sub _get_raw { + + my $self = shift; + my ($path_query,$headers) = @_; + $self->_clearrequestdata(); + $self->{req} = HTTP::Request->new('GET',$self->_get_request_uri($path_query)); + _add_headers($self->{req},$headers); + $self->_log_request($self->{req}); + $self->{res} = $self->_ua_request($self->{req}); + $self->_log_response($self->{res}); + return $self->{res}; + +} + sub get { my $self = shift; notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } +sub _add_head_headers { + my $self = shift; + my ($req,$headers) = @_; + _add_headers($req,$headers); +} + +sub _decode_head_response { + my $self = shift; + my ($data) = @_; + return $self->_decode_response_content($data); +} + +sub _head { + + my $self = shift; + my ($path_query,$headers) = @_; + $self->_clearrequestdata(); + $self->{req} = HTTP::Request->new('HEAD',$self->_get_request_uri($path_query)); + $self->_add_head_headers($self->{req},$headers); + $self->_log_request($self->{req}); + $self->{res} = $self->_ua_request($self->{req}); + $self->_log_response($self->{res}); + eval { + $self->{responsedata} = $self->_decode_head_response($self->{res}->decoded_content()); + }; + if ($@) { + restresponseerror($self,'error decoding HEAD response content: ' . $@,$self->{res},getlogger(__PACKAGE__)); + } + return $self->{res}; + +} + +sub head { + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); +} + sub _add_patch_headers { my $self = shift; my ($req,$headers) = @_; @@ -381,6 +469,7 @@ sub _put { } + sub put { my $self = shift; notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); @@ -435,9 +524,37 @@ sub _get_page_size_query_param { notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } +sub _get_total_count_expected_query_param { + my $self = shift; + my ($total_count_expected) = @_; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); +} + +sub _get_sf_query_param { + my $self = shift; + my ($sf) = @_; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); +} + sub get_collection_page_query_uri { my $self = shift; - my ($collection_path_query,$page_size,$page_num) = @_; + my $collection_path_query = shift; + my $page_size; + my $page_num; + my $total_count_expected; + my $sf; + if (ref $_[0]) { + my $p = shift; + $page_size = $p->{page_size}; + $page_num = $p->{page_num}; + $total_count_expected = 1; + $sf = shift; + } else { + ($page_size,$page_num) = @_; + $total_count_expected = 0; + $sf = undef; + } + #my ($collection_path_query,$page_size,$page_num) = @_; #if ($page_size <= 0) { # resterror($self,"positive collection page size required",getlogger(__PACKAGE__)); #} @@ -447,10 +564,15 @@ sub get_collection_page_query_uri { my $page_uri = $self->_get_request_uri($collection_path_query); my $page_size_query_param = $self->_get_page_size_query_param($page_size); my $page_num_query_param = $self->_get_page_num_query_param($page_num); + my $total_count_expected_query_param = $self->_get_total_count_expected_query_param($total_count_expected); + my $sf_query_param; + $sf_query_param = $self->_get_sf_query_param($sf) if defined $sf; my @query_params = (); push(@query_params,$page_uri->query()) if $page_uri->query(); push(@query_params,$page_size_query_param) if defined $page_size_query_param && length($page_size_query_param) > 0; push(@query_params,$page_num_query_param) if defined $page_num_query_param && length($page_num_query_param) > 0; + push(@query_params,$total_count_expected_query_param) if defined $total_count_expected_query_param && length($total_count_expected_query_param) > 0; + push(@query_params,$sf_query_param) if defined $sf_query_param && length($sf_query_param) > 0; $page_uri->query(join('&',@query_params)); @@ -498,4 +620,40 @@ sub get_defaultcollectionpagesize { notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } +sub get_firscollectionpagenum { + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); +} + +sub convert_bools { + my %unrecognized; + + local *_convert_bools = sub { + my $ref_type = ref($_[0]); + if (!$ref_type) { + # Nothing. + } + elsif ($ref_type eq 'HASH') { + _convert_bools($_) for values(%{ $_[0] }); + } + elsif ($ref_type eq 'ARRAY') { + _convert_bools($_) for @{ $_[0] }; + } + elsif ( + $ref_type eq 'JSON::PP::Boolean' # JSON::PP + || $ref_type eq 'Types::Serialiser::Boolean' # JSON::XS + ) { + $_[0] = $_[0] ? 1 : 0; + } + else { + ++$unrecognized{$ref_type}; + } + }; + + &_convert_bools; + + carp("Encountered an object of unrecognized type $_") + for sort values(%unrecognized); +} + 1; diff --git a/lib/NGCP/BulkProcessor/RestProcessor.pm b/lib/NGCP/BulkProcessor/RestProcessor.pm index f44f870..68812ac 100644 --- a/lib/NGCP/BulkProcessor/RestProcessor.pm +++ b/lib/NGCP/BulkProcessor/RestProcessor.pm @@ -27,7 +27,7 @@ use NGCP::BulkProcessor::LogError qw( restprocessingfailed ); -use NGCP::BulkProcessor::Utils qw(threadid urlencode urldecode); +use NGCP::BulkProcessor::Utils qw(threadid); require Exporter; our @ISA = qw(Exporter); @@ -36,6 +36,7 @@ our @EXPORT_OK = qw( copy_row process_collection get_query_string + override_fields ); my $collectionprocessing_threadqueuelength = 10; @@ -55,12 +56,18 @@ sub get_query_string { } else { $query .= '&'; } - #$query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape($filters->{$param}); - $query .= urlencode($param) . '=' . urlencode($filters->{$param}); + $query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape_utf8($filters->{$param}); } return $query; }; +sub override_fields { + my ($item,$load_recursive) = @_; + foreach my $override (keys %{$load_recursive->{_overrides}}) { + $item->{$override} = $load_recursive->{_overrides}->{$override}; + } +} + sub init_item { my ($item,$fieldnames) = @_; @@ -102,6 +109,7 @@ sub copy_row { $item->{$fieldname} = undef; } } else { + $item->{$fieldname} = $row; #scalar last; } } @@ -114,6 +122,7 @@ sub process_collection { my %params = @_; my ($get_restapi, $path_query, + $post_data, $headers, $extract_collection_items_params, $process_code, @@ -125,6 +134,7 @@ sub process_collection { $collectionprocessing_threads) = @params{qw/ get_restapi path_query + post_data headers extract_collection_items_params process_code @@ -163,6 +173,7 @@ sub process_collection { headers => $headers, blocksize => $blocksize, extract_collection_items_params => $extract_collection_items_params, + post_data => $post_data, }); for (my $i = 0; $i < $collectionprocessing_threads; $i++) { @@ -215,7 +226,9 @@ sub process_collection { my $i = 0; while (1) { fetching_items($restapi,$path_query,$i,$blocksize,getlogger(__PACKAGE__)); - my $collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount),$headers); + my $collection_page; + $collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$headers) unless $post_data; + $collection_page = $restapi->post($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$post_data,$headers) if $post_data; my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$extract_collection_items_params); my $realblocksize = scalar @$rowblock; if ($realblocksize > 0) { @@ -292,7 +305,9 @@ sub _reader { while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer fetching_items($restapi,$context->{path_query},$i,$blocksize,getlogger(__PACKAGE__)); - my $collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount),$context->{headers}); + my $collection_page; + $collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{headers}) unless $context->{post_data}; + $collection_page = $restapi->post($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{post_data},$context->{headers}) if $context->{post_data}; my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$context->{extract_collection_items_params}); my $realblocksize = scalar @$rowblock; my %packet :shared = (); diff --git a/lib/NGCP/BulkProcessor/Serialization.pm b/lib/NGCP/BulkProcessor/Serialization.pm index a234380..57c5c18 100755 --- a/lib/NGCP/BulkProcessor/Serialization.pm +++ b/lib/NGCP/BulkProcessor/Serialization.pm @@ -45,7 +45,8 @@ use MIME::Base64 qw(encode_base64 decode_base64); #http://blogs.perl.org/users/steven_haryanto/2010/09/comparison-of-perl-serialization-modules.html use Storable; # qw( nfreeze thaw ); -use JSON::XS; # qw(encode_json decode_json); +use JSON qw(); +#use JSON::XS; # qw(encode_json decode_json); use Data::Dump; # qw(dump); $Data::Dump::INDENT = ' '; @@ -166,12 +167,14 @@ sub deserialize_xml { sub serialize_json { my $input_ref = shift; - return JSON::XS::encode_json($input_ref); + #return JSON::XS::encode_json($input_ref); + return JSON::to_json($input_ref,{ allow_blessed => 1, convert_blessed => 1, pretty => 0 }); } sub deserialize_json { my $input_ref = shift; - return JSON::XS::decode_json($input_ref); + #return JSON::XS::decode_json($input_ref); + return JSON::from_json($$input_ref); } sub serialize_yaml { diff --git a/lib/NGCP/BulkProcessor/SqlConnector.pm b/lib/NGCP/BulkProcessor/SqlConnector.pm index 5b2d71e..7f2236f 100644 --- a/lib/NGCP/BulkProcessor/SqlConnector.pm +++ b/lib/NGCP/BulkProcessor/SqlConnector.pm @@ -882,6 +882,10 @@ sub db_get_begin { if ($transactional) { #$self->lock_tables({ $tablename => 'WRITE' }); $self->db_begin(); + } else { + my $offset = shift; + my $limit = shift; + $query = $self->paginate_sort_query($query,$offset,$limit,undef); } $self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query); @@ -902,6 +906,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return 0; + +} + sub db_get_rowblock { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm index bcab1bb..84d841b 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm @@ -486,6 +486,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return $rowblock_transactional; + +} + sub truncate_table { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm index 53ab0bb..9a3360c 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm @@ -438,6 +438,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return $rowblock_transactional; + +} + sub insert_ignore_phrase { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm index a392959..c5bab61 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm @@ -465,6 +465,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return $rowblock_transactional; + +} + sub truncate_table { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm index 285ab7a..4e84497 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm @@ -469,6 +469,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return $rowblock_transactional; + +} + sub truncate_table { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm index 3dbb8f9..d03cc50 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm @@ -459,6 +459,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return $rowblock_transactional; + +} + sub truncate_table { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm index 541efec..a38bf5c 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm @@ -75,7 +75,7 @@ my $LongTruncOk = 0; #my $lock_do_chunk = 0; #1; #my $lock_get_chunk = 0; #1; -my $rowblock_transactional = 1; +my $rowblock_transactional = 0; #SQLite transactions are always serializable. @@ -341,7 +341,11 @@ sub getfieldnames { my $self = shift; my $tablename = shift; - my @fieldnames = keys %{$self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name')}; + #my @fieldnames = keys %{$self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name')}; + my @fieldnames = (); + foreach my $field (@{$self->db_get_all_arrayref('PRAGMA table_info(' . $tablename . ')')}) { + push(@fieldnames,$field->{name}); + } return \@fieldnames; } @@ -351,13 +355,21 @@ sub getprimarykeycols { my $self = shift; my $tablename = shift; #return $self->db_get_col('SHOW FIELDS FROM ' . $tablename); - my $fieldinfo = $self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name'); + #my $fieldinfo = $self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name'); + #my @keycols = (); + #foreach my $fieldname (keys %$fieldinfo) { + # if ($fieldinfo->{$fieldname}->{'pk'}) { + # push @keycols,$fieldname; + # } + #} + my @keycols = (); - foreach my $fieldname (keys %$fieldinfo) { - if ($fieldinfo->{$fieldname}->{'pk'}) { - push @keycols,$fieldname; + foreach my $field (@{$self->db_get_all_arrayref('PRAGMA table_info(' . $tablename . ')')}) { + if ($field->{'pk'}) { + push(@keycols,$field->{name}); } } + return \@keycols; } @@ -488,6 +500,13 @@ sub multithreading_supported { } +sub rowblock_transactional { + + my $self = shift; + return $rowblock_transactional; + +} + sub insert_ignore_phrase { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlProcessor.pm b/lib/NGCP/BulkProcessor/SqlProcessor.pm index 799eb10..27e75ec 100644 --- a/lib/NGCP/BulkProcessor/SqlProcessor.pm +++ b/lib/NGCP/BulkProcessor/SqlProcessor.pm @@ -905,12 +905,14 @@ sub transfer_table { #$target_db = &$get_target_db($writer_connection_name); eval { - $db->db_get_begin($selectstatement,@$values); #$tablename + $db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename my $i = 0; while (1) { fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)); + $db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional; my $rowblock = $db->db_get_rowblock($blocksize); + $db->db_finish() unless $db->rowblock_transactional; my $realblocksize = scalar @$rowblock; if ($realblocksize > 0) { writing_rows($target_db,$targettablename,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__)); @@ -931,7 +933,7 @@ sub transfer_table { last; } } - $db->db_finish(); + $db->db_finish() if $db->rowblock_transactional; }; @@ -1084,7 +1086,6 @@ sub process_table { } my $errorstate = $RUNNING; - #my $blocksize; if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved @@ -1209,12 +1210,14 @@ sub process_table { &$init_process_context_code($context); } - $db->db_get_begin($selectstatement,@$values); #$tablename + $db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename my $i = 0; while (1) { fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)); + $db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional; my $rowblock = $db->db_get_rowblock($blocksize); + $db->db_finish() unless $db->rowblock_transactional; my $realblocksize = scalar @$rowblock; if ($realblocksize > 0) { processing_rows($tid,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__)); @@ -1233,7 +1236,7 @@ sub process_table { last; } } - $db->db_finish(); + $db->db_finish() if $db->rowblock_transactional; }; @@ -1374,7 +1377,7 @@ sub _reader { my $blockcount = 0; eval { $reader_db = &{$context->{get_db}}(); #$reader_connection_name); - $reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}); #$context->{tablename} + $reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}) if $reader_db->rowblock_transactional; #$context->{tablename} tablethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__)); while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up #yield(); @@ -1384,7 +1387,9 @@ sub _reader { my $state = $RUNNING; #start at first while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__)); + $reader_db->db_get_begin($context->{selectstatement},$i,$context->{blocksize},@{$context->{values_ref}}) unless $reader_db->rowblock_transactional; my $rowblock = $reader_db->db_get_rowblock($context->{blocksize}); + $reader_db->db_finish() unless $reader_db->rowblock_transactional; my $realblocksize = scalar @$rowblock; #my $packet = {rows => $rowblock, # size => $realblocksize, @@ -1419,7 +1424,7 @@ sub _reader { (($state & $ERROR) == 0 ? 'no defunct thread(s)' : 'defunct thread(s)') . ') ...' ,getlogger(__PACKAGE__)); } - $reader_db->db_finish(); + $reader_db->db_finish() if $reader_db->rowblock_transactional; }; tablethreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); # stop the consumer: