diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm
index 42b81ae..44f6826 100644
--- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm
+++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_domains.pm
@@ -29,6 +29,7 @@ our @EXPORT_OK = qw(
     insert_row
     findby_domain
+    findby_id
 );

 my $tablename = 'voip_domains';
@@ -72,6 +73,23 @@ sub findby_domain {

 }

+sub findby_id {
+
+    my ($id,$load_recursive) = @_;
+
+    check_table();
+    my $db = &$get_db();
+    my $table = $db->tableidentifier($tablename);
+
+    my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
+        $db->columnidentifier('id') . ' = ?';
+    my @params = ($id);
+    my $rows = $db->db_get_all_arrayref($stmt,@params);
+
+    return buildrecords_fromrows($rows,$load_recursive)->[0];
+
+}
+
 sub insert_row {

     my $db = &$get_db();
diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm
index 40f69bb..9c6f3cb 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm
@@ -15,7 +15,7 @@ use NGCP::BulkProcessor::SqlProcessor qw(
     copy_row
     insert_stmt
-
+    transfer_table
 );
 #process_table
 use NGCP::BulkProcessor::SqlRecord qw();
@@ -35,6 +35,7 @@ our @EXPORT_OK = qw(
     update_delta
     findby_delta
     countby_delta
+    copy_table

     $deleted_delta
     $updated_delta
@@ -241,6 +242,26 @@ sub buildrecords_fromrows {

 }

+sub copy_table {
+
+    my ($get_target_db) = @_;
+
+    check_table();
+    #checktableinfo($get_target_db,
+    #    __PACKAGE__,$tablename,
+    #    get_fieldnames(1),
+    #    $indexes);
+
+    return transfer_table(
+        get_db => $get_db,
+        class => __PACKAGE__,
+        get_target_db => $get_target_db,
+        targetclass => __PACKAGE__,
+        targettablename => $tablename,
+    );
+
+}
+
 #sub process_records {
 #
 #    my %params = @_;
diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm
index 2dee47b..acaf212 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm
@@ -41,6 +41,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
 );

 use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw();
+use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();

 use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw();

@@ -49,10 +50,11 @@ use NGCP::BulkProcessor::Utils qw(threadid trim);
 require Exporter;
 our @ISA = qw(Exporter);
 our @EXPORT_OK = qw(
-    load_registrations
+    load_registrations_file
+    load_registrations_all
 );

-sub load_registrations {
+sub load_registrations_file {

     my ($file) = @_;
     $file //= $usernames_filename;
@@ -131,6 +133,78 @@ sub load_registrations {

 }

+sub load_registrations_all {
+
+    my $result = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::create_table(0);
+
+    my $upsert = _locations_reset_delta();
+
+    destroy_all_dbs(); #close all db connections before forking..
+    destroy_stores();
+    my $warning_count :shared = 0;
+    return ($result && NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::process_records(
+        process_code => sub {
+            my ($context,$records,$row_offset) = @_;
+            my @data = ();
+            my @location_rows = ();
+            foreach my $record (@$records) {
+                if (_load_registrations_all_init_context($context,$record)) {
+                    foreach my $registration (@{$context->{registrations}}) {
+                        my %r = %{$registration->getvalue}; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::fieldnames};
+                        if ($context->{upsert}) {
+                            push(@row_ext,$registration->getvalue()->{ruid});
+                        } else {
+                            push(@row_ext,$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::added_delta);
+                        }
+                        push(@location_rows,\@row_ext);
+                    }
+                    if ($location_single_row_txn and (scalar @location_rows) > 0) {
+                        while (defined (my $location_row = shift @location_rows)) {
+                            if ($skip_errors) {
+                                eval { _insert_location_rows($context,[$location_row]); };
+                                _warn($context,$@) if $@;
+                            } else {
+                                _insert_location_rows($context,[$location_row]);
+                            }
+                        }
+                    }
+                }
+            }
+            if (not $location_single_row_txn and (scalar @location_rows) > 0) {
+                if ($skip_errors) {
+                    eval { _insert_location_rows($context,\@location_rows); };
+                    _warn($context,$@) if $@;
+                } else {
+                    _insert_location_rows($context,\@location_rows);
+                }
+            }
+            return 1;
+        },
+        init_process_context_code => sub {
+            my ($context)= @_;
+            $context->{db} = &get_sqlite_db();
+            #$context->{redis} = &get_location_store();
+            $context->{upsert} = $upsert;
+            $context->{error_count} = 0;
+            $context->{warning_count} = 0;
+        },
+        uninit_process_context_code => sub {
+            my ($context)= @_;
+            undef $context->{db};
+            #undef $context->{redis};
+            destroy_all_dbs();
+            destroy_stores();
+            {
+                lock $warning_count;
+                $warning_count += $context->{warning_count};
+            }
+        },
+        multithreading => $load_registrations_multithreading,
+        numofthreads => $load_registrations_numofthreads,
+    ),$warning_count);
+
+}
+
 sub _locations_reset_delta {
     my $upsert = 0;
     if (NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta() > 0) {
@@ -163,14 +237,14 @@ sub _insert_location_rows {
     }
 }

-sub _load_registrations_init_context() {
+sub _load_registrations_file_init_context() {

     my ($context,$username,$domain) = @_;
     $context->{username} = $username;
     $context->{domain} = $domain;
     my @registrations = ();
     my $result = 1;
-    $context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain($username,$domain,{ _entries => 1, });
+    $context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain($context->{username},$context->{domain},{ _entries => 1, });
     if ($context->{usrdom}) {
         foreach my $entry (@{$context->{usrdom}->{_entries}}) {
             push(@registrations,$entry); # if expiry > now
@@ -182,7 +256,24 @@ sub _load_registrations_init_context() {

 }

-
+sub _load_registrations_all_init_context() {
+
+    my ($context,$prov_subscriber) = @_;
+    $context->{username} = $prov_subscriber->{username};
+    $context->{domain} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_id($prov_subscriber->{domain_id})->{domain};
+    my @registrations = ();
+    my $result = 1;
+    $context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain(lc($context->{username}),$context->{domain},{ _entries => 1, });
+    if ($context->{usrdom}) {
+        foreach my $entry (@{$context->{usrdom}->{_entries}}) {
+            push(@registrations,$entry); # if expiry > now
+        }
+    }
+    $result = 0 unless scalar @registrations;
+    $context->{registrations} = \@registrations;
+    return $result;
+
+}

 sub _error {

diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm
index cecdc51..3ed5f6c 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm
@@ -8,7 +8,7 @@ use Cwd;
 use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');

 use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(
-
+    $csv_dir
     $sqlite_db_file
 );

@@ -17,7 +17,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
 );

-#use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
+use NGCP::BulkProcessor::SqlConnectors::CSVDB qw();
 use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);

 use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
@@ -27,6 +27,10 @@ our @ISA = qw(Exporter);
 our @EXPORT_OK = qw(
     get_sqlite_db
+    sqlite_db_tableidentifier
+
+    get_csv_db
+    csv_db_tableidentifier

     destroy_dbs
     destroy_all_dbs
@@ -34,6 +38,7 @@ our @EXPORT_OK = qw(
 );

 my $sqlite_dbs = {};
+my $csv_dbs = {};

 sub get_sqlite_db {

@@ -62,6 +67,31 @@ sub sqlite_db_tableidentifier {

 }

+sub get_csv_db {
+
+    my ($instance_name,$reconnect) = @_;
+    my $name = get_connectorinstancename($instance_name);
+    if (not defined $csv_dbs->{$name}) {
+        $csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name);
+        if (not defined $reconnect) {
+            $reconnect = 1;
+        }
+    }
+    if ($reconnect) {
+        $csv_dbs->{$name}->db_connect($csv_dir);
+    }
+    return $csv_dbs->{$name};
+
+}
+
+sub csv_db_tableidentifier {
+
+    my ($get_target_db,$tablename) = @_;
+    my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
+    return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir));
+
+}
+
 sub destroy_dbs {

     foreach my $name (keys %$sqlite_dbs) {
@@ -69,6 +99,12 @@ sub destroy_dbs {
         undef $sqlite_dbs->{$name};
         delete $sqlite_dbs->{$name};
     }
+
+    foreach my $name (keys %$csv_dbs) {
+        cleartableinfo($csv_dbs->{$name});
+        undef $csv_dbs->{$name};
+        delete $csv_dbs->{$name};
+    }

 }

@@ -77,4 +113,8 @@ sub destroy_all_dbs() {
     NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
 }

+sub ping_all_dbs() {
+    NGCP::BulkProcessor::ConnectorPool::ping_dbs();
+}
+
 1;
diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm
index 48d1dd4..87fe86e 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm
@@ -45,6 +45,7 @@ our @EXPORT_OK = qw(
     $output_path
     $input_path
+    $csv_dir

     $defaultsettings
     $defaultconfig
@@ -59,6 +60,8 @@ our @EXPORT_OK = qw(
     $skip_errors
     $force
+    get_export_filename
+    $registrations_export_filename_format
 );

 our $defaultconfig = 'config.cfg';
@@ -66,6 +69,7 @@ our $defaultsettings = 'settings.cfg';

 our $output_path = $working_path . 'output/';
 our $input_path = $working_path . 'input/';
+our $csv_dir = 'location';

 our $usernames_filename = undef;
 our $usernames_rownum_start = 0;
@@ -79,6 +83,8 @@ our $force = 0;

 our $sqlite_db_file = 'sqlite';

+our $registrations_export_filename_format = undef;
+
 sub update_settings {

     my ($data,$configfile) = @_;
@@ -92,6 +98,7 @@ sub update_settings {

         $result &= _prepare_working_paths(1);

+        $csv_dir = $data->{csv_dir} if exists $data->{csv_dir};
         $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};

         $load_registrations_multithreading = $data->{load_registrations_multithreading} if exists $data->{load_registrations_multithreading};
@@ -104,6 +111,9 @@ sub update_settings {
         $skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
         $force = $data->{force} if exists $data->{force};

+        $registrations_export_filename_format = $data->{registrations_export_filename} if exists $data->{registrations_export_filename};
+        get_export_filename($data->{registrations_export_filename},$configfile);
+
         return $result;
     }

@@ -144,4 +154,26 @@ sub _get_import_filename {
     return $import_filename;
 }

+sub get_export_filename {
+    my ($filename_format,$configfile) = @_;
+    my $export_filename;
+    my $export_format;
+    if ($filename_format) {
+        $export_filename = $output_path . sprintf($filename_format,timestampdigits(),threadid());
+        if (-e $export_filename and (unlink $export_filename) == 0) {
+            filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__));
+            $export_filename = undef;
+        }
+        my ($name,$path,$suffix) = fileparse($export_filename,".db",".csv");
+        if ($suffix eq '.db') {
+            $export_format = 'sqlite';
+        } elsif ($suffix eq '.csv') {
+            $export_format = 'csv';
+        } else {
+            configurationerror($configfile,"$filename_format: either .db or .csv export file format required");
+        }
+    }
+    return ($export_filename,$export_format);
+}
+
 1;
diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl
index c4adb00..930a653 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl
@@ -21,6 +21,9 @@ use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(

     $skip_errors
     $force
+
+    get_export_filename
+    $registrations_export_filename_format
 );

 use NGCP::BulkProcessor::Logging qw(
@@ -56,6 +59,8 @@ use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
 use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw();

 use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw(
+    get_sqlite_db
+    get_csv_db
     destroy_all_dbs
 );
 use NGCP::BulkProcessor::ConnectorPool qw(
@@ -63,7 +68,8 @@ use NGCP::BulkProcessor::ConnectorPool qw(
 );

 use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Process qw(
-    load_registrations
+    load_registrations_file
+    load_registrations_all
 );

 scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
@@ -188,7 +194,11 @@ sub load_registrations_task {
     my ($messages) = @_;
     my ($result,$warning_count) = (0,0);
     eval {
-        ($result,$warning_count) = load_registrations($file);
+        if ($file) {
+            ($result,$warning_count) = load_registrations_file($file);
+        } else {
+            ($result,$warning_count) = load_registrations_all();
+        }
     };
     #print $@;
     my $err = $@;
@@ -208,6 +218,15 @@ sub load_registrations_task {
             $NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::deleted_delta
         );
         $stats .= "\n removed: $deleted_count rows";
+        my ($export_filename,$export_format) = get_export_filename($registrations_export_filename_format);
+        if ('sqlite' eq $export_format) {
+            &get_sqlite_db()->copydbfile($export_filename);
+        } elsif ('csv' eq $export_format) {
+            NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::copy_table(\&get_csv_db);
+            &get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::gettablename(),$export_filename);
+        } else {
+            push(@$messages,"invalid extension for output filename $export_filename");
+        }
     };
     if ($err or !$result) {
         push(@$messages,"loading registrations INCOMPLETE$stats");
diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg
index 1496ca3..aa312db 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg
@@ -7,4 +7,6 @@ load_registrations_multithreading = 1
 usernames_filename =
 usernames_rownum_start = 1
 #ignore_location_unique = 0
-location_single_row_txn = 0
\ No newline at end of file
+location_single_row_txn = 0
+
+registrations_export_filename=registrations_%s.csv
\ No newline at end of file
diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg
index defe69a..cd2a08d 100644
--- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg
+++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg
@@ -7,4 +7,6 @@ load_registrations_multithreading = 1
 usernames_filename = /home/rkrenn/temp/massive/input/test.csv
 usernames_rownum_start = 1
 #ignore_location_unique = 0
-location_single_row_txn = 0
\ No newline at end of file
+location_single_row_txn = 0
+
+registrations_export_filename=registrations_%s.csv
\ No newline at end of file
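
Note (not part of the patch): a minimal standalone Perl sketch of the suffix-based dispatch that the new get_export_filename performs, so a registrations_export_filename value can be sanity-checked in isolation. The hard-coded timestamp is a stand-in for timestampdigits() from NGCP::BulkProcessor::Utils; file names and formats below are illustrative only.

use strict;
use warnings;
use File::Basename qw(fileparse);

my $ts = '20240101120000'; # stand-in for NGCP::BulkProcessor::Utils::timestampdigits()

foreach my $filename_format ('registrations_%s.csv','registrations_%s.db') {
    # resolve the configured format string to a concrete file name
    my $export_filename = sprintf($filename_format,$ts);
    # the suffix decides the export path: .db -> sqlite db file copy, .csv -> per-table csv copy
    my ($name,$path,$suffix) = fileparse($export_filename,'.db','.csv');
    my $export_format = ($suffix eq '.db')  ? 'sqlite'
                      : ($suffix eq '.csv') ? 'csv'
                      : die "either .db or .csv export file format required\n";
    print "$export_filename -> $export_format\n";
}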