diff --git a/lib/NGCP/BulkProcessor/FileProcessor.pm b/lib/NGCP/BulkProcessor/FileProcessor.pm index d9e4963..1c8bf19 100644 --- a/lib/NGCP/BulkProcessor/FileProcessor.pm +++ b/lib/NGCP/BulkProcessor/FileProcessor.pm @@ -154,7 +154,9 @@ sub process { } else { my $context = { instance => $self, - filename => $file,}; + filename => $file, + tid => $tid, + }; my $rowblock_result = 1; eval { @@ -237,10 +239,6 @@ sub process { } close(INPUTFILE); - if ('CODE' eq ref $uninit_process_context_code) { - &$uninit_process_context_code($context); - } - }; if ($@) { @@ -249,6 +247,11 @@ sub process { $errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED; } + eval { + if ('CODE' eq ref $uninit_process_context_code) { + &$uninit_process_context_code($context); + } + }; } if ($errorstate == $COMPLETED) { @@ -269,6 +272,7 @@ sub _reader { my $context = shift; my $tid = threadid(); + $context->{tid} = $tid; { lock $context->{errorstates}; $context->{errorstates}->{$tid} = $RUNNING; @@ -396,6 +400,7 @@ sub _process { my $rowblock_result = 1; my $tid = threadid(); + $context->{tid} = $tid; { lock $context->{errorstates}; $context->{errorstates}->{$tid} = $RUNNING; @@ -433,13 +438,16 @@ sub _process { sleep($thread_sleep_secs); #2015-01 } } + }; + my $err = $@; + filethreadingdebug($err ? '[' . $tid . '] processor thread error: ' . $err : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); + eval { if ('CODE' eq ref $context->{uninit_process_context_code}) { &{$context->{uninit_process_context_code}}($context); } }; - filethreadingdebug($@ ? '[' . $tid . '] processor thread error: ' . $@ : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); lock $context->{errorstates}; - if ($@) { + if ($err) { $context->{errorstates}->{$tid} = $ERROR; } else { $context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED; diff --git a/lib/NGCP/BulkProcessor/Globals.pm b/lib/NGCP/BulkProcessor/Globals.pm index e4a4f85..5dabdc5 100644 --- a/lib/NGCP/BulkProcessor/Globals.pm +++ b/lib/NGCP/BulkProcessor/Globals.pm @@ -39,7 +39,8 @@ our @EXPORT_OK = qw( $application_path $executable_path $working_path - update_working_path + + create_path $appstartsecs $enablemultithreading $root_threadid @@ -67,14 +68,13 @@ our @EXPORT_OK = qw( $ngcprestapi_realm $csv_path - $input_path $local_db_path $emailenable $erroremailrecipient $warnemailrecipient $completionemailrecipient - $successemailrecipient + $doneemailrecipient $mailfile_path $ismsexchangeserver @@ -94,6 +94,7 @@ our @EXPORT_OK = qw( $defaultconfig update_mainconfig + create_path $chmod_umask @@ -157,7 +158,7 @@ our $ngcprestapi_realm = 'api_admin_http'; our $working_path = tempdir(CLEANUP => 0) . '/'; #'/var/sipwise/'; -our $input_path = $working_path . 'input/'; +#our $input_path = $working_path . 'input/'; # csv our $csv_path = $working_path . 'csv/'; @@ -182,7 +183,7 @@ our $local_db_path = $working_path . 'db/'; #mkdir $local_db_path; - +#our $rollback_path = $working_path . 'rollback/'; # email setup @@ -196,7 +197,7 @@ our $writefiles = 0; # save emails our $erroremailrecipient = ''; #'rkrenn@sipwise.com'; our $warnemailrecipient = ''; #'rkrenn@sipwise.com'; our $completionemailrecipient = ''; -our $successemailrecipient = ''; +our $doneemailrecipient = ''; our $mailprog = "/usr/sbin/sendmail"; # linux only our $mailtype = 1; #0 .. mailprog, 1 .. socket, 2 .. Net::SMTP @@ -230,7 +231,8 @@ sub update_mainconfig { my ($data,$configfile, $split_tuplecode, - $format_number, + $format_numbercode, + $parse_regexpcode, $configurationinfocode, $configurationwarncode, $configurationerrorcode, @@ -239,6 +241,8 @@ sub update_mainconfig { if (defined $data) { + my $result = 1; + # databases - dsp $accounting_host = $data->{accounting_host} if exists $data->{accounting_host}; $accounting_port = $data->{accounting_port} if exists $data->{accounting_port}; @@ -269,7 +273,11 @@ sub update_mainconfig { @jobservers = ($data->{jobservers}) if exists $data->{jobservers}; } - if (defined $format_number and ref $format_number eq 'CODE') { + if (defined $format_numbercode and ref $format_numbercode eq 'CODE') { + + } + + if (defined $parse_regexpcode and ref $parse_regexpcode eq 'CODE') { } @@ -278,7 +286,7 @@ sub update_mainconfig { $erroremailrecipient = $data->{erroremailrecipient} if exists $data->{erroremailrecipient}; $warnemailrecipient = $data->{warnemailrecipient} if exists $data->{warnemailrecipient}; $completionemailrecipient = $data->{completionemailrecipient} if exists $data->{completionemailrecipient}; - $successemailrecipient = $data->{successemailrecipient} if exists $data->{successemailrecipient}; + $doneemailrecipient = $data->{doneemailrecipient} if exists $data->{doneemailrecipient}; $ismsexchangeserver = $data->{ismsexchangeserver} if exists $data->{ismsexchangeserver}; $smtp_server = $data->{smtp_server} if exists $data->{smtp_server}; @@ -289,143 +297,77 @@ sub update_mainconfig { $screenloglevel = $data->{screenloglevel} if exists $data->{screenloglevel}; $emailloglevel = $data->{emailloglevel} if exists $data->{emailloglevel}; - my $new_working_path = (exists $data->{working_path} ? $data->{working_path} : $working_path); + if (exists $data->{working_path}) { + $result &= _prepare_working_paths($data->{working_path},1,$fileerrorcode,$configlogger); + } else { + $result &= _prepare_working_paths($working_path,1,$fileerrorcode,$configlogger); + } - return update_working_path($new_working_path,1,$fileerrorcode,$configlogger); + return $result; } return 0; } -sub update_working_path { +sub _prepare_working_paths { my ($new_working_path,$create,$fileerrorcode,$logger) = @_; my $result = 1; - if (defined $new_working_path and length($new_working_path) > 0) { - $new_working_path = fixdirpath($new_working_path); - if (-d $new_working_path) { - $working_path = $new_working_path; - } else { - if ($create) { - if (makepath($new_working_path,$fileerrorcode,$logger)) { - $working_path = $new_working_path; - } else { - $result = 0; - } - } else { - $result = 0; - if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("working path '$new_working_path' does not exist",$logger); - } - } - } + my $path_result; + + ($path_result,$working_path) = create_path($new_working_path,$working_path,$create,$fileerrorcode,$logger); + $result &= $path_result; + ($path_result,$csv_path) = create_path($working_path . 'csv',$csv_path,$create,$fileerrorcode,$logger); + $result &= $path_result; + #($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,$fileerrorcode,$logger); + #$result &= $path_result; + ($path_result,$logfile_path) = create_path($working_path . 'log',$logfile_path,$create,$fileerrorcode,$logger); + $result &= $path_result; + ($path_result,$local_db_path) = create_path($working_path . 'db',$local_db_path,$create,$fileerrorcode,$logger); + $result &= $path_result; + ($path_result,$mailfile_path) = create_path($working_path . 'mails',$local_db_path,$create,$fileerrorcode,$logger); + $result &= $path_result; + #($path_result,$rollback_path) = create_path($working_path . 'rollback',$rollback_path,$create,$fileerrorcode,$logger); + #$result &= $path_result; - my $new_csv_path = $working_path . 'csv/'; - if (-d $new_csv_path) { - $csv_path = $new_csv_path; - } else { - if ($create) { - if (makepath($new_csv_path,$fileerrorcode,$logger)) { - $csv_path = $new_csv_path; - } else { - $result = 0; - } - } else { - $result = 0; - if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("csv path '$new_csv_path' does not exist",$logger); - } - } - } + return $result; - my $new_input_path = $working_path . 'input/'; - if (-d $new_input_path) { - $input_path = $new_input_path; - } else { - if ($create) { - if (makepath($new_input_path,$fileerrorcode,$logger)) { - $input_path = $new_input_path; - } else { - $result = 0; - } - } else { - $result = 0; - if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("input path '$new_input_path' does not exist",$logger); - } - } - } +} - my $new_logfile_path = $working_path . 'log/'; - if (-d $new_logfile_path) { - $logfile_path = $new_logfile_path; - } else { - if ($create) { - if (makepath($new_logfile_path,$fileerrorcode,$logger)) { - $logfile_path = $new_logfile_path; - } else { - $result = 0; - } - } else { - $result = 0; - if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("logfile path '$new_logfile_path' does not exist",$logger); - } - } - } +sub get_applicationpath { - my $new_local_db_path = $working_path . 'db/'; - if (-d $new_local_db_path) { - $local_db_path = $new_local_db_path; - } else { - if ($create) { - if (makepath($new_local_db_path,$fileerrorcode,$logger)) { - $local_db_path = $new_local_db_path; - } else { - $result = 0; - } - } else { - $result = 0; - if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("local db path '$new_local_db_path' does not exist",$logger); - } - } - } + return dirname(abs_path(__FILE__)) . '/'; - my $new_mailfile_path = $working_path . 'mails/'; - if (-d $new_mailfile_path) { - $mailfile_path = $new_mailfile_path; +} + +sub create_path { + my ($new_value,$old_value,$create,$fileerrorcode,$logger) = @_; + my $path = $old_value; + my $result = 0; + if (defined $new_value and length($new_value) > 0) { + $new_value = fixdirpath($new_value); + if (-d $new_value) { + $path = $new_value; + $result = 1; } else { if ($create) { - if (makepath($new_mailfile_path,$fileerrorcode,$logger)) { - $mailfile_path = $new_mailfile_path; - } else { - $result = 0; + if (makepath($new_value,$fileerrorcode,$logger)) { + $path = $new_value; + $result = 1; } } else { - $result = 0; if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("mailfile path '$new_mailfile_path' does not exist",$logger); + &$fileerrorcode("path '$new_value' does not exist",$logger); } } } - } else { - $result = 0; if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { - &$fileerrorcode("empty working path",$logger); + &$fileerrorcode("empty path",$logger); } } - #print "working path result: " . $result; - return $result; - -} - -sub get_applicationpath { - - return dirname(abs_path(__FILE__)) . '/'; - + return ($result,$path); } 1; diff --git a/lib/NGCP/BulkProcessor/LoadConfig.pm b/lib/NGCP/BulkProcessor/LoadConfig.pm index a7c15cf..ff37423 100644 --- a/lib/NGCP/BulkProcessor/LoadConfig.pm +++ b/lib/NGCP/BulkProcessor/LoadConfig.pm @@ -34,6 +34,8 @@ require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( load_config + parse_regexp + split_tuple $SIMPLE_CONFIG_TYPE $YAML_CONFIG_TYPE ); @@ -80,20 +82,14 @@ sub load_config { } if ($is_settings) { - my $result = &$process_code($data,$configfile, - \&split_tuple, - \&format_number, - \&configurationinfo, - \&configurationwarn, - \&configurationerror, - \&fileerror, - getlogger(__PACKAGE__)); + my $result = &$process_code($data,$configfile); configurationinfo('settings file ' . $configfile . ' loaded',getlogger(__PACKAGE__)); return $result; } else { my $result = update_mainconfig($data,$configfile, \&split_tuple, \&format_number, + \&parse_regexp, \&configurationinfo, \&configurationwarn, \&configurationerror, @@ -108,9 +104,8 @@ sub load_config { sub _splashinfo { configurationinfo($system_name . ' ' . $system_version . ' (' . $system_instance_label . ') [' . $local_fqdn . ']',getlogger(__PACKAGE__)); - configurationinfo('application path ' . $application_path,getlogger(__PACKAGE__)); - configurationinfo('working path ' . $working_path,getlogger(__PACKAGE__)); - #configurationinfo('executable path ' . $executable_path,getlogger(__PACKAGE__)); + configurationinfo('application path: ' . $application_path,getlogger(__PACKAGE__)); + configurationinfo('working path: ' . $working_path,getlogger(__PACKAGE__)); configurationinfo($cpucount . ' cpu(s), multithreading ' . ($enablemultithreading ? 'enabled' : 'disabled'),getlogger(__PACKAGE__)); } @@ -139,6 +134,24 @@ sub split_tuple { } +sub parse_regexp { + + my ($token,$file) = @_; + my $regexp = undef; + my $result = 1; + if (defined $token and length($token) > 0) { + eval { + $regexp = qr/$token/; + }; + if ($@ or !defined $regexp) { + configurationerror($file,'invalid pattern: ' . $@,getlogger(__PACKAGE__)); + $result = 0; + } + } + return ($result,$regexp); + +} + #sub parse_float { # # my ($value) = @_; diff --git a/lib/NGCP/BulkProcessor/LogError.pm b/lib/NGCP/BulkProcessor/LogError.pm index 3d5bbc1..a0ff4d1 100644 --- a/lib/NGCP/BulkProcessor/LogError.pm +++ b/lib/NGCP/BulkProcessor/LogError.pm @@ -7,7 +7,7 @@ use NGCP::BulkProcessor::Globals qw( $system_version $erroremailrecipient $warnemailrecipient - $successemailrecipient + $doneemailrecipient $completionemailrecipient $appstartsecs $root_threadid @@ -84,7 +84,7 @@ our @EXPORT_OK = qw( dbclustererror dbclusterwarn - success + done completion scripterror @@ -93,15 +93,15 @@ our @EXPORT_OK = qw( my $erroremailsubject = 'error: module '; my $warnemailsubject = 'warning: module '; -my $successmailsubject = 'success: module '; +my $donemailsubject = 'done: module '; my $completionmailsubject = 'completed: module '; -sub success { +sub done { my ($message,$attachments,$logger) = @_; if (length($message) == 0) { - $message = 'success'; + $message = 'done'; } my $appexitsecs = Time::HiRes::time(); @@ -113,16 +113,16 @@ sub success { } if (threadid() == $root_threadid) { - if (length($successemailrecipient) > 0 and defined $logger) { + if (length($doneemailrecipient) > 0 and defined $logger) { my $email = { - to => $successemailrecipient, + to => $doneemailrecipient, #cc => 'rkrenn@sipwise.com', #bcc => '', #return_path => undef, priority => $lowpriority, #sender_name => 'Rene K.', #from => 'rkrenn@sipwise.com', - subject => $successmailsubject . $logger->{category}, + subject => $donemailsubject . $logger->{category}, body => getscriptpath() . ":\n\n" . wrap_mailbody($message) . "\n\n" . $signature, guid => create_guid() }; @@ -290,7 +290,7 @@ sub dbwarn { } #die(); - warning($message, $logger, 1); + warning($message, $logger); } @@ -318,7 +318,7 @@ sub restwarn { } #die(); - warning($message, $logger, 1); + warning($message, $logger); } @@ -385,7 +385,7 @@ sub dbclusterwarn { } #die(); - warning($message, $logger, 1); + warning($message, $logger); } @@ -482,7 +482,7 @@ sub fileerror { sub processzerofilesize { my ($file,$logger) = @_; - my $message = basename($file) . ' has 0 bytes'; + my $message = basename($file) . ' ' . (-e $file ? 'has 0 bytes' : 'not found'); if (defined $logger) { $logger->error($message); } @@ -560,7 +560,7 @@ sub filewarn { } #die(); - warning($message, $logger, 1); + warning($message, $logger); } @@ -571,7 +571,7 @@ sub xls2csvwarn { $logger->warn($message); } - warning($message, $logger, 1); + warning($message, $logger); } sub webarchivexls2csvwarn { @@ -581,7 +581,7 @@ sub webarchivexls2csvwarn { $logger->warn($message); } - warning($message, $logger, 1); + warning($message, $logger); } #sub parameterdefinedtwice { @@ -590,7 +590,7 @@ sub webarchivexls2csvwarn { # if (defined $logger) { # $logger->warn($message); # } -# warning($message, $logger, 1); +# warning($message, $logger); #} sub emailwarn { @@ -669,7 +669,7 @@ sub servicewarn { } #die(); - warning($message, $logger, 1); + warning($message, $logger); } diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index d7d656c..39a3ba2 100644 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -79,6 +79,7 @@ our @EXPORT_OK = qw( fileprocessingdone fetching_lines processing_lines + processing_info tablefixed servicedebug @@ -624,6 +625,15 @@ sub processing_lines { } +sub processing_info { + + my ($tid, $message, $logger) = @_; + if (defined $logger) { + $logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . $message); + } + +} + #sub mainconfigurationloaded { # # my ($configfile,$logger) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOption.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOption.pm index 6e8f04a..7868a77 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOption.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOption.pm @@ -22,6 +22,8 @@ use NGCP::BulkProcessor::SqlRecord qw( insert_stmt ); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem qw(); + require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); our @EXPORT_OK = qw( @@ -30,10 +32,8 @@ our @EXPORT_OK = qw( check_table getinsertstatement - test_table_bycolumn1 - test_table_local_select - test_table_source_select - test_table_source_select_temptable + findby_subscribernumber + countby_subscribernumber ); my $tablename = 'feature_option'; @@ -76,6 +76,43 @@ sub create_table { } +sub findby_subscribernumber { + + my ($subscribernumber,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('subscribernumber') . ' = ?' + ,$subscribernumber); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub countby_subscribernumber { + + my ($subscribernumber) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $subscribernumber) { + $stmt .= ' WHERE ' . $db->columnidentifier('subscribernumber') . ' = ?'; + push(@params,$subscribernumber); + } + + return $db->db_get_value($stmt,@params); + +} sub buildrecords_fromrows { @@ -89,6 +126,13 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... + if ($load_recursive) { + $record->{_optionsetitems} = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::findby_subscribernumber_option( + $record->{subscribernumber}, + $record->{option}, + $load_recursive + ); + } push @records,$record; } @@ -100,8 +144,9 @@ sub buildrecords_fromrows { sub getinsertstatement { + my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename); + return insert_stmt($get_db,$tablename,$insert_ignore); } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOptionSet.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOptionSetItem.pm similarity index 62% rename from lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOptionSet.pm rename to lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOptionSetItem.pm index 8c87891..34385b0 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOptionSet.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/FeatureOptionSetItem.pm @@ -1,4 +1,4 @@ -package NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet; +package NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem; use strict; ## no critic @@ -30,13 +30,11 @@ our @EXPORT_OK = qw( check_table getinsertstatement - test_table_bycolumn1 - test_table_local_select - test_table_source_select - test_table_source_select_temptable + findby_subscribernumber_option + countby_subscribernumber_option ); -my $tablename = 'feature_option_set'; +my $tablename = 'feature_option_set_item'; my $get_db = \&get_import_db; #my $get_tablename = \&import_db_tableidentifier; @@ -77,6 +75,48 @@ sub create_table { } +sub findby_subscribernumber_option { + + my ($subscribernumber,$option,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('subscribernumber') . ' = ? ' . + ' AND ' . $db->columnidentifier('option') . ' = ?' + ,$subscribernumber,$option); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub countby_subscribernumber_option { + + my ($subscribernumber,$option) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $subscribernumber) { + $stmt .= ' WHERE ' . $db->columnidentifier('subscribernumber') . ' = ?'; + push(@params,$subscribernumber); + if (defined $option) { + $stmt .= ' AND ' . $db->columnidentifier('option') . ' = ?'; + push(@params,$option); + } + } + + return $db->db_get_value($stmt,@params); + +} sub buildrecords_fromrows { @@ -101,8 +141,9 @@ sub buildrecords_fromrows { sub getinsertstatement { + my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename); + return insert_stmt($get_db,$tablename,$insert_ignore); } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Lnp.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Lnp.pm new file mode 100644 index 0000000..ffdde5c --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Lnp.pm @@ -0,0 +1,179 @@ +package NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp; +use strict; + +## no critic + +#use File::Basename; +#use Cwd; +#use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../'); + +use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw( + get_import_db + +); +#import_db_tableidentifier + +use NGCP::BulkProcessor::SqlRecord qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt +); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + + findby_lrncode_portednumber + countby_lrncode_portednumber + count_lrncodes +); + +my $tablename = 'lnp'; +my $get_db = \&get_import_db; +#my $get_tablename = \&import_db_tableidentifier; + + +my $expected_fieldnames = [ + 'ported_number', + 'type', + 'lrn_code', +]; + +my $primarykey_fieldnames = [ 'lrn_code', 'ported_number' ]; + +my $indexes = {}; + +my $fixtable_statements = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($get_db, + $tablename, + $expected_fieldnames,$indexes); + + bless($self,$class); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef); + +} + +sub findby_lrncode_portednumber { + + my ($lrncode,$portednumber,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('lrn_code') . ' = ? ' . + ' AND ' . $db->columnidentifier('ported_number') . ' = ?' + ,$lrncode,$portednumber); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub countby_lrncode_portednumber { + + my ($lrncode,$portednumber) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $lrncode) { + $stmt .= ' WHERE ' . $db->columnidentifier('lrn_code') . ' = ?'; + push(@params,$lrncode); + if (defined $portednumber) { + $stmt .= ' AND ' . $db->columnidentifier('ported_number') . ' = ?'; + push(@params,$portednumber); + } + } + + return $db->db_get_value($stmt,@params); + +} + +sub count_lrncodes { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return $db->db_get_value('SELECT COUNT(DISTINCT ' . + $db->columnidentifier('lrn_code') . ') FROM ' . $table); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,$tablename,$insert_ignore); + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + $tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Subscriber.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Subscriber.pm index d979a8d..346905b 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Subscriber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Dao/Subscriber.pm @@ -30,10 +30,8 @@ our @EXPORT_OK = qw( check_table getinsertstatement - test_table_bycolumn1 - test_table_local_select - test_table_source_select - test_table_source_select_temptable + findby_subscribernumber + countby_subscribernumber ); my $tablename = 'subscriber'; @@ -86,6 +84,50 @@ sub create_table { } +sub findby_subscribernumber { + + my ($subscribernumber,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $subscribernumber; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('country_code') . ' = ?' . + ' AND ' . $db->columnidentifier('area_code') . ' = ?' . + ' AND ' . $db->columnidentifier('dial_number') . ' = ?' + ,split_subscribernumber($subscribernumber)); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub countby_subscribernumber { + + my ($subscribernumber) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $subscribernumber) { + $stmt .= ' WHERE ' . + $db->columnidentifier('country_code') . ' = ?' . + ' AND ' . $db->columnidentifier('area_code') . ' = ?' . + ' AND ' . $db->columnidentifier('dial_number') . ' = ?'; + push(@params,split_subscribernumber($subscribernumber)); + } + + return $db->db_get_value($stmt,@params); + +} sub buildrecords_fromrows { @@ -99,6 +141,12 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... + if ($load_recursive) { + $record->{_features} = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::findby_subscribernumber( + $record->subscribernumber(), + $load_recursive + ); + } push @records,$record; } @@ -110,8 +158,9 @@ sub buildrecords_fromrows { sub getinsertstatement { + my ($insert_ignore) = @_; check_table(); - return insert_stmt($get_db,$tablename); + return insert_stmt($get_db,$tablename,$insert_ignore); } @@ -130,4 +179,17 @@ sub check_table { } +sub subscribernumber { + my $self = shift; + return $self->{dial_number}; #$self->{country_code} . $self->{dial_number}; +} + +sub split_subscribernumber { + my ($subscribernumber) = @_; + my $country_code = substr($subscribernumber,0,3); + my $area_code = 'None'; + my $dial_number = $subscribernumber; #substr($subscribernumber,3); + return ($country_code,$area_code,$dial_number); +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/FeaturesDefineFile.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/FeaturesDefineFile.pm index 96b1f46..1addbb9 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/FeaturesDefineFile.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/FeaturesDefineFile.pm @@ -81,6 +81,7 @@ sub extractfields { my ($context,$line_ref) = @_; return undef if length($$line_ref) == 0; + #return undef if $$line_ref =~ /^#/; if ($context->{instance}->{parselines}) { my $row = undef; @@ -101,4 +102,16 @@ sub extractfields { } +sub stoponparseerrors { + my $self = shift; + $self->{stoponparseerrors} = shift if @_; + return $self->{stoponparseerrors}; +} + +sub parselines { + my $self = shift; + $self->{parselines} = shift if @_; + return $self->{parselines}; +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/LnpDefineFile.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/LnpDefineFile.pm new file mode 100644 index 0000000..812d69e --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/LnpDefineFile.pm @@ -0,0 +1,68 @@ +package NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::LnpDefineFile; +use strict; + +## no critic + +#use File::Basename; +#use Cwd; +#use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../'); + +use NGCP::BulkProcessor::Logging qw( + getlogger +); +use NGCP::BulkProcessor::LogError qw( + fileprocessingerror + fileprocessingwarn +); + +use NGCP::BulkProcessor::FileProcessor; + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor); +our @EXPORT_OK = qw(); + +my $lineseparator = '\\n'; +my $fieldseparator = " +"; +my $encoding = 'UTF-8'; + +my $buffersize = 100 * 1024; +my $threadqueuelength = 10; +my $default_numofthreads = 3; +#my $multithreading = 0; +my $blocksize = 500; + +sub new { + + my $class = shift; + + my $self = NGCP::BulkProcessor::FileProcessor->new(@_); + + $self->{numofthreads} = shift // $default_numofthreads; + $self->{line_separator} = $lineseparator; + $self->{field_separator} = $fieldseparator; + $self->{encoding} = $encoding; + $self->{buffersize} = $buffersize; + $self->{threadqueuelength} = $threadqueuelength; + #$self->{multithreading} = $multithreading; + $self->{blocksize} = $blocksize; + + bless($self,$class); + + #restdebug($self,__PACKAGE__ . ' file processor created',getlogger(__PACKAGE__)); + + return $self; + +} + +sub extractfields { + my ($context,$line_ref) = @_; + my $separator = $context->{instance}->{field_separator}; + $$line_ref =~ s/^ +//; + $$line_ref =~ s/ +$//; + return undef if length($$line_ref) == 0; + return undef if $$line_ref =~ /^#/; + my @fields = split(/$separator/,$$line_ref,-1); + return \@fields; +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/SubscriberDefineFile.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/SubscriberDefineFile.pm index 61323f1..7dc99af 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/SubscriberDefineFile.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/FileProcessors/SubscriberDefineFile.pm @@ -29,7 +29,7 @@ my $buffersize = 100 * 1024; my $threadqueuelength = 10; my $default_numofthreads = 3; #my $multithreading = 0; -my $blocksize = 100; +my $blocksize = 500; sub new { @@ -59,6 +59,8 @@ sub extractfields { my $separator = $context->{instance}->{field_separator}; $$line_ref =~ s/^ +//; $$line_ref =~ s/ +$//; + return undef if length($$line_ref) == 0; + return undef if $$line_ref =~ /^#/; my @fields = split(/$separator/,$$line_ref,-1); return \@fields; } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Import.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Import.pm index 80d4259..c16e8d4 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Import.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Import.pm @@ -14,11 +14,19 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw( $import_multithreading $features_define_import_numofthreads $skip_duplicate_setoptionitems + $ignore_options_unique + $ignore_setoptionitems_unique $subscriber_define_import_numofthreads + $subscribernumer_exclude_pattern + $subscribernumer_exclude_exception_pattern + $ignore_subscriber_unique + $lnp_define_import_numofthreads + $ignore_lnp_unique $dry ); use NGCP::BulkProcessor::Logging qw ( getlogger + processing_info ); use NGCP::BulkProcessor::LogError qw( fileprocessingwarn @@ -28,6 +36,7 @@ use NGCP::BulkProcessor::LogError qw( use NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::FeaturesDefineFile qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::FeaturesDefineParser qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::SubscriberDefineFile qw(); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::LnpDefineFile qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw( get_import_db @@ -35,8 +44,9 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw ); use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption qw(); -use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet qw(); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp qw(); use NGCP::BulkProcessor::Array qw(removeduplicates); @@ -45,43 +55,52 @@ our @ISA = qw(Exporter); our @EXPORT_OK = qw( import_features_define import_subscriber_define + import_lnp_define ); sub import_features_define { my ($file) = @_; + # create tables: my $result = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::create_table(1); - $result &= NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet::create_table(1); - destroy_dbs(); #close all db connections before forking.. + $result &= NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::create_table(1); + + # checks, e.g. other table must be present: + # ..none.. + + # prepare parse: my $importer = NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::FeaturesDefineFile->new($features_define_import_numofthreads); - $importer->{stoponparseerrors} = !$dry; + $importer->stoponparseerrors(!$dry); + + # launch: + destroy_dbs(); #close all db connections before forking.. return $result && $importer->process($file,sub { my ($context,$rows,$row_offset) = @_; my $rownum = $row_offset; my @featureoption_rows = (); - my @featureoptionset_rows = (); + my @featureoptionsetitem_rows = (); foreach my $line (@$rows) { + $rownum++; my $row = undef; - if (not $importer->{parselines}) { + if (not $importer->parselines()) { eval { $row = NGCP::BulkProcessor::Projects::Migration::IPGallery::FeaturesDefineParser::parse(\$line,$context->{grammar}); }; if ($@) { - if ($importer->{stoponparseerrors}) { - fileprocessingerror($context->{filename},'record ' . ($rownum + 1) . ' - ' . $@,getlogger(__PACKAGE__)); + if ($importer->stoponparseerrors()) { + fileprocessingerror($context->{filename},'record ' . $rownum . ' - ' . $@,getlogger(__PACKAGE__)); } else { - fileprocessingwarn($context->{filename},'record ' . ($rownum + 1) . ' - ' . $@,getlogger(__PACKAGE__)); + fileprocessingwarn($context->{filename},'record ' . $rownum . ' - ' . $@,getlogger(__PACKAGE__)); } } } next unless defined $row; - $rownum++; foreach my $subscriber_number (keys %$row) { foreach my $option (@{$row->{$subscriber_number}}) { if ('HASH' eq ref $option) { foreach my $setoption (keys %$option) { foreach my $setoptionitem (@{$skip_duplicate_setoptionitems ? removeduplicates($option->{$setoption}) : $option->{$setoption}}) { - push(@featureoptionset_rows,[ $subscriber_number, $setoption, $setoptionitem ]); + push(@featureoptionsetitem_rows,[ $subscriber_number, $setoption, $setoptionitem ]); } push(@featureoption_rows,[ $subscriber_number, $setoption ]); } @@ -93,27 +112,23 @@ sub import_features_define { } if ((scalar @featureoption_rows) > 0) { - $context->{db}->db_do_begin( - NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::getinsertstatement(), - NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::gettablename(), - #lock - $import_multithreading - ); - $context->{db}->db_do_rowblock(\@featureoption_rows); - $context->{db}->db_finish(); + if ($dry) { + eval { _insert_featureoption_rows($context,\@featureoption_rows); }; + } else { + _insert_featureoption_rows($context,\@featureoption_rows); + } } - if ((scalar @featureoptionset_rows) > 0) { - $context->{db}->db_do_begin( - NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet::getinsertstatement(), - NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet::gettablename(), - #lock - ); - $context->{db}->db_do_rowblock(\@featureoptionset_rows); - $context->{db}->db_finish(); + if ((scalar @featureoptionsetitem_rows) > 0) { + if ($dry) { + eval { _insert_featureoptionsetitem_rows($context,\@featureoptionsetitem_rows); }; + } else { + _insert_featureoptionsetitem_rows($context,\@featureoptionsetitem_rows); + } } return 1; }, sub { my ($context)= @_; - if (not $importer->{parselines}) { + if (not $importer->parselines()) { eval { $context->{grammar} = NGCP::BulkProcessor::Projects::Migration::IPGallery::FeaturesDefineParser::create_grammar(); }; @@ -130,23 +145,147 @@ sub import_features_define { } +sub _insert_featureoption_rows { + my ($context,$featureoption_rows) = @_; + $context->{db}->db_do_begin( + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::getinsertstatement($ignore_options_unique), + #NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::gettablename(), + #lock - $import_multithreading + ); + $context->{db}->db_do_rowblock($featureoption_rows); + $context->{db}->db_finish(); +} + +sub _insert_featureoptionsetitem_rows { + my ($context,$featureoptionsetitem_rows) = @_; + $context->{db}->db_do_begin( + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::getinsertstatement($ignore_setoptionitems_unique), + #NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::gettablename(), + #lock + ); + $context->{db}->db_do_rowblock($featureoptionsetitem_rows); + $context->{db}->db_finish(); +} + sub import_subscriber_define { my ($file) = @_; + my $result = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::create_table(1); + + $result &= _import_subscriber_define_checks($file); + my $importer = NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::SubscriberDefineFile->new($subscriber_define_import_numofthreads); + destroy_dbs(); #close all db connections before forking.. return $result && $importer->process($file,sub { my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @subscriber_rows = (); + foreach my $row (@$rows) { + $rownum++; + my $record = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber->new($row); + next if 'None' eq $record->{rgw_fqdn}; + if ($record->{dial_number} =~ $subscribernumer_exclude_pattern) { + if ($record->{dial_number} =~ $subscribernumer_exclude_exception_pattern) { + processing_info($context->{tid},'record ' . $rownum . ' - exclude exception pattern match: ' . $record->{dial_number},getlogger(__PACKAGE__)); + push(@subscriber_rows,$row) if _import_subscriber_define_referential_checks($context,$record,$rownum); + } else { + processing_info($context->{tid},'record ' . $rownum . ' - skipped, exclude pattern match: ' . $record->{dial_number},getlogger(__PACKAGE__)); + } + } else { + push(@subscriber_rows,$row) if _import_subscriber_define_referential_checks($context,$record,$rownum); + } + } if ((scalar @$rows) > 0) { - $context->{db}->db_do_begin( - NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::getinsertstatement(), - NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::gettablename(), - #lock - $import_multithreading - ); - $context->{db}->db_do_rowblock($rows); - $context->{db}->db_finish(); + if ($dry) { + eval { _insert_subscriber_rows($context,$rows); }; + } else { + _insert_subscriber_rows($context,$rows); + } + } + + return 1; + }, sub { + my ($context)= @_; + $context->{db} = &get_import_db(); # keep ref count low.. + }, sub { + my ($context)= @_; + undef $context->{db}; + destroy_dbs(); + }, $import_multithreading); + +} + +sub _import_subscriber_define_referential_checks { + my ($context,$record,$rownum) = @_; + my $result = 0; + if (NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::countby_subscribernumber($record->subscribernumber()) > 0) { + $result = 1; + } else { + if ($dry) { + fileprocessingwarn($context->{filename},'record ' . $rownum . ' - no features records for subscriber found: ' . $record->{dial_number},getlogger(__PACKAGE__)); + } else { + fileprocessingerror($context->{filename},'record ' . $rownum . ' - no features records for subscriber found: ' . $record->{dial_number},getlogger(__PACKAGE__)); + } + } + return $result; + +} + +sub _import_subscriber_define_checks { + my ($file) = @_; + my $result = 1; + my $optioncount = 0; + eval { + $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::countby_subscribernumber(); + }; + if ($@ or $optioncount == 0) { + fileprocessingerror($file,'please import subscriber features first',getlogger(__PACKAGE__)); + $result = 0; #even in dry mode.. + } + return $result; +} + +sub _insert_subscriber_rows { + my ($context,$subscriber_rows) = @_; + $context->{db}->db_do_begin( + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::getinsertstatement($ignore_subscriber_unique), + #NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::gettablename(), + #lock + ); + $context->{db}->db_do_rowblock($subscriber_rows); + $context->{db}->db_finish(); +} + +sub import_lnp_define { + + my ($file) = @_; + + my $result = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::create_table(1); + + my $importer = NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::LnpDefineFile->new($lnp_define_import_numofthreads); + + destroy_dbs(); #close all db connections before forking.. + return $result && $importer->process($file,sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @lnp_rows = (); + foreach my $row (@$rows) { + $rownum++; + next if $row->[2] eq 'In'; + $row->[3] = substr($row->[3],0,4); + shift @$row; #ignore first col + push(@lnp_rows,$row); + } + + if ((scalar @lnp_rows) > 0) { + if ($dry) { + eval { _insert_lnp_rows($context,\@lnp_rows); }; + } else { + _insert_lnp_rows($context,\@lnp_rows); + } } return 1; @@ -161,4 +300,15 @@ sub import_subscriber_define { } +sub _insert_lnp_rows { + my ($context,$lnp_rows) = @_; + $context->{db}->db_do_begin( + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::getinsertstatement($ignore_lnp_unique), + #NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::gettablename(), + #lock + ); + $context->{db}->db_do_rowblock($lnp_rows); + $context->{db}->db_finish(); +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm index 6b3e2a7..9cc1c52 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm @@ -8,105 +8,175 @@ use strict; #use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); use NGCP::BulkProcessor::Globals qw( - update_working_path - $input_path + $working_path $enablemultithreading $cpucount + create_path ); use NGCP::BulkProcessor::Logging qw( getlogger scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp ); +use NGCP::BulkProcessor::Utils qw(format_number); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( update_settings + check_dry + + $input_path + $output_path + $rollback_path + $defaultsettings $defaultconfig - $features_define_filename - $features_define_import_numofthreads - $skip_duplicate_setoptionitems - $subscriber_define_filename - $subscriber_define_import_numofthreads $import_multithreading $run_id $dry $force $import_db_file - check_dry + + $features_define_filename + $features_define_import_numofthreads + $skip_duplicate_setoptionitems + $ignore_options_unique + $ignore_setoptionitems_unique + + $subscriber_define_filename + $subscriber_define_import_numofthreads + $subscribernumer_exclude_pattern + $subscribernumer_exclude_exception_pattern + $ignore_subscriber_unique + + $lnp_define_filename + $lnp_define_import_numofthreads + $ignore_lnp_unique + ); our $defaultconfig = 'config.cfg'; our $defaultsettings = 'settings.cfg'; -our $features_define_filename = undef; -our $subscriber_define_filename = undef; +our $input_path = $working_path . 'input/'; +our $output_path = $working_path . 'output/'; +our $rollback_path = $working_path . 'rollback/'; +our $force = 0; +our $dry = 0; +our $run_id = ''; +our $import_db_file = _get_import_db_file($run_id,'import'); our $import_multithreading = $enablemultithreading; -our $features_define_import_numofthreads = $cpucount; - -our $subscriber_define_import_numofthreads = $cpucount; +our $features_define_filename = undef; +our $features_define_import_numofthreads = $cpucount; our $skip_duplicate_setoptionitems = 0; +our $ignore_options_unique = 0; +our $ignore_setoptionitems_unique = 0; -our $force = 0; -our $dry = 1; -our $run_id = ''; +our $subscriber_define_filename = undef; +our $subscriber_define_import_numofthreads = $cpucount; +our $subscribernumer_exclude_pattern = undef; +our $subscribernumer_exclude_exception_pattern = undef; +our $ignore_subscriber_unique = 0; -our $import_db_file = ((defined $run_id and length($run_id) > 0) ? '_' : '') . 'import'; +our $lnp_define_filename = undef; +our $lnp_define_import_numofthreads = $cpucount; +our $ignore_lnp_unique = 1; sub update_settings { - my ($data,$configfile, - $split_tuplecode, - $format_number, - $configurationinfocode, - $configurationwarncode, - $configurationerrorcode, - $fileerrorcode, - $configlogger) = @_; + my ($data,$configfile) = @_; if (defined $data) { - #print "$configlogger narf"; + my $result = 1; + #&$configurationinfocode("testinfomessage",$configlogger); - $features_define_filename = $data->{features_define_filename} if exists $data->{features_define_filename}; - if (defined $features_define_filename and length($features_define_filename) > 0) { - $features_define_filename = $input_path . $features_define_filename unless -e $features_define_filename; - } - - $subscriber_define_filename = $data->{subscriber_define_filename} if exists $data->{subscriber_define_filename}; - if (defined $subscriber_define_filename and length($subscriber_define_filename) > 0) { - $subscriber_define_filename = $input_path . $subscriber_define_filename unless -e $subscriber_define_filename; - } + $result &= _prepare_working_paths(1); + $dry = $data->{dry} if exists $data->{dry}; + $import_db_file = _get_import_db_file($run_id,'import'); $import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading}; - #my $new_working_path = (exists $data->{working_path} ? $data->{working_path} : $working_path); - $features_define_import_numofthreads = $cpucount; -$features_define_import_numofthreads = $data->{features_define_import_numofthreads} if exists $data->{features_define_import_numofthreads}; - $features_define_import_numofthreads = $cpucount if $features_define_import_numofthreads > $cpucount; + $features_define_filename = _get_import_filename($features_define_filename,$data,'features_define_filename'); + $features_define_import_numofthreads =_get_import_numofthreads($cpucount,$data,'features_define_import_numofthreads'); - $subscriber_define_import_numofthreads = $cpucount; -$subscriber_define_import_numofthreads = $data->{subscriber_define_import_numofthreads} if exists $data->{subscriber_define_import_numofthreads}; - $subscriber_define_import_numofthreads = $cpucount if $subscriber_define_import_numofthreads > $cpucount; - #return update_working_path($new_working_path,1,$fileerrorcode,$configlogger); + $subscriber_define_filename = _get_import_filename($subscriber_define_filename,$data,'subscriber_define_filename'); + $subscriber_define_import_numofthreads = _get_import_numofthreads($cpucount,$data,'subscriber_define_import_numofthreads'); - $import_db_file = ((defined $run_id and length($run_id) > 0) ? '_' : '') . 'import'; + $subscribernumer_exclude_pattern = $data->{subscribernumer_exclude_pattern} if exists $data->{subscribernumer_exclude_pattern}; + (my $regexp_result,$subscribernumer_exclude_pattern) = parse_regexp($subscribernumer_exclude_pattern,$configfile); + $result &= $regexp_result; + $subscribernumer_exclude_exception_pattern = $data->{subscribernumer_exclude_exception_pattern} if exists $data->{subscribernumer_exclude_exception_pattern}; + (my $regexp_result,$subscribernumer_exclude_exception_pattern) = parse_regexp($subscribernumer_exclude_exception_pattern,$configfile); + $result &= $regexp_result; - $dry = $data->{dry} if exists $data->{dry}; + $lnp_define_filename = _get_import_filename($lnp_define_filename,$data,'lnp_define_filename'); + $lnp_define_import_numofthreads= _get_import_numofthreads($cpucount,$data,'lnp_define_import_numofthreads'); - return 1; + return $result; } return 0; } +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$rollback_path) = create_path($working_path . 'rollback',$rollback_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_import_numofthreads { + my ($default_value,$data,$key) = @_; + my $import_numofthreads = $default_value; + $import_numofthreads = $data->{$key} if exists $data->{$key}; + $import_numofthreads = $cpucount if $import_numofthreads > $cpucount; + return $import_numofthreads; +} + +sub _get_import_db_file { + my ($run,$name) = @_; + return ((defined $run and length($run) > 0) ? '_' : '') . $name; +} + +sub _get_import_filename { + my ($old_value,$data,$key) = @_; + my $import_filename = $old_value; + $import_filename = $data->{$key} if exists $data->{$key}; + if (defined $import_filename and length($import_filename) > 0) { + $import_filename = $input_path . $import_filename unless -e $import_filename; + } + return $import_filename; +} + sub check_dry { if ($dry) { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/config.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/config.cfg index c2582e9..6fa9dd8 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/config.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/config.cfg @@ -31,7 +31,7 @@ emailenable = 0 erroremailrecipient = warnemailrecipient = completionemailrecipient = rkrenn@sipwise.com -successemailrecipient = +doneemailrecipient = ##logging: fileloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl index 6e598b4..137d317 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/process.pl @@ -11,27 +11,34 @@ use Fcntl qw(LOCK_EX LOCK_NB); use NGCP::BulkProcessor::Globals qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw( - $defaultsettings - $defaultconfig update_settings check_dry + $output_path + $rollback_path + $defaultsettings + $defaultconfig + $dry + $force $run_id $features_define_filename $subscriber_define_filename - $dry - $force + $lnp_define_filename ); use NGCP::BulkProcessor::Logging qw( init_log getlogger $attachmentlogfile scriptinfo + cleanuplogfiles + $currentlogfile ); use NGCP::BulkProcessor::LogError qw ( completion - success + done scriptwarn scripterror + filewarn + fileerror ); use NGCP::BulkProcessor::LoadConfig qw( load_config @@ -39,14 +46,17 @@ use NGCP::BulkProcessor::LoadConfig qw( $YAML_CONFIG_TYPE ); use NGCP::BulkProcessor::Array qw(removeduplicates); -use NGCP::BulkProcessor::Utils qw(getscriptpath prompt); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles wrap_mailbody $signature $normalpriority $lowpriority $highpriority ); +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); use NGCP::BulkProcessor::ConnectorPool qw(); use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw(); @@ -54,17 +64,30 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw use NGCP::BulkProcessor::Projects::Migration::IPGallery::Import qw( import_features_define import_subscriber_define + import_lnp_define ); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption qw(); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem qw(); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp qw(); + scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet my @TASK_OPTS = (); my $tasks = []; -my $import_features_define_task_opt = 'import_features_define'; +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); +my $import_features_define_task_opt = 'import_features'; push(@TASK_OPTS,$import_features_define_task_opt); -my $import_subscriber_define_task_opt = 'import_subscriber_define'; +my $import_subscriber_define_task_opt = 'import_subscriber'; push(@TASK_OPTS,$import_subscriber_define_task_opt); +my $import_lnp_define_task_opt = 'import_lnp'; +push(@TASK_OPTS,$import_lnp_define_task_opt); + if (init()) { main(); @@ -78,13 +101,14 @@ sub init { my $configfile = $defaultconfig; my $settingsfile = $defaultsettings; - GetOptions ("config=s" => \$configfile, - "settings=s" => \$settingsfile, - "task=s" => $tasks, - "run=s" => \$run_id, - "dry" => \$dry, - "force" => \$force, - ) or scripterror('error in command line arguments',getlogger(getscriptpath())); + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "run=s" => \$run_id, + "dry" => \$dry, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); $tasks = removeduplicates($tasks,1); @@ -100,22 +124,27 @@ sub main() { my @messages = (); my @attachmentfiles = (); - my $result = 0; + my $result = 1; my $completion = 0; if ('ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { foreach my $task (@$tasks) { - if (lc($import_features_define_task_opt) eq lc($task)) { - scriptinfo('task: ' . $import_features_define_task_opt,getlogger(getscriptpath())); - $result |= import_features_define_task(\@messages); + if (lc($cleanup_task_opt) eq lc($task)) { + $result = cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result = cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + } elsif (lc($import_features_define_task_opt) eq lc($task)) { + $result = import_features_define_task(\@messages) if taskinfo($import_features_define_task_opt,$result); } elsif (lc($import_subscriber_define_task_opt) eq lc($task)) { - scriptinfo('task: ' . $import_subscriber_define_task_opt,getlogger(getscriptpath())); - $result |= import_subscriber_define_task(\@messages); + $result = import_subscriber_define_task(\@messages) if taskinfo($import_subscriber_define_task_opt,$result); + } elsif (lc($import_lnp_define_task_opt) eq lc($task)) { + $result = import_lnp_define_task(\@messages) if taskinfo($import_lnp_define_task_opt,$result); } elsif (lc('blah') eq lc($task)) { - scriptinfo('task: ' . 'balh',getlogger(getscriptpath())); - next unless check_dry(); - $result |= import_features_define_task(\@messages); - $completion |= 1; + if (taskinfo($cleanup_task_opt,$result)) { + next unless check_dry(); + $result = import_features_define_task(\@messages); + $completion |= 1; + } } else { $result = 0; scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); @@ -128,60 +157,120 @@ sub main() { } push(@attachmentfiles,$attachmentlogfile); - if ($result) { - if ($completion) { - completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); - } else { - success(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); - } + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); } else { - success(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); } return $result; } -sub cleanup_task { +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + cleanupcvsdirs() if $clean_generated; + cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + push(@$messages,'working directory cleanup incomplete'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } } sub import_features_define_task { - my ($messages) = shift; - if (import_features_define( - $features_define_filename - )) { - push(@$messages,'sucessfully inserted x records...'); - return 1; + my ($messages) = @_; + my $result = 0; + eval { + $result = import_features_define($features_define_filename); + }; + my $err = $@; + my $stats = ' feature option: ' . + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::countby_subscribernumber() . ' rows'; + $stats .= "\n feature set option items: " . + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::countby_subscribernumber_option() . ' rows'; + if ($err or !$result) { + push(@$messages,"importing subscriber features incomplete\n$stats"); } else { - push(@$messages,'was not executed'); - return 0; + push(@$messages,"importing subscriber features completed\n$stats"); } + destroy_dbs(); #every task should leave with closed connections. + return $result; } sub import_subscriber_define_task { - my ($messages) = shift; - if (import_subscriber_define( - $subscriber_define_filename - )) { - push(@$messages,'sucessfully inserted x records...'); - return 1; + my ($messages) = @_; + my $result = 0; + eval { + $result = import_subscriber_define($subscriber_define_filename); + }; + my $err = $@; + my $stats = ' subscriber: ' . + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::countby_subscribernumber() . ' rows'; + if ($err or !$result) { + push(@$messages,"importing subscribers incomplete\n$stats"); } else { - push(@$messages,'was not executed'); - return 0; + push(@$messages,"importing subscribers completed\n$stats"); } + destroy_dbs(); #every task should leave with closed connections. + return $result; } -END { - # this should not be required explicitly, but prevents Log4Perl's - # "rootlogger not initialized error upon exit.. +sub import_lnp_define_task { + + my ($messages) = @_; + my $result = 0; + eval { + $result = import_lnp_define($lnp_define_filename); + }; + my $err = $@; + my $stats = ' lnp numbers: ' . + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::countby_lrncode_portednumber() . ' rows'; + $stats .= "\n lrn codes: " . + NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::count_lrncodes(); + if ($err or !$result) { + push(@$messages,"importing lnp numbers incomplete\n$stats"); + } else { + push(@$messages,"importing lnp numbers\n$stats"); + } + destroy_dbs(); #every task should leave with closed connections. + return $result; + +} + +sub destroy_dbs() { NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool::destroy_dbs(); NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); } +#END { +# # this should not be required explicitly, but prevents Log4Perl's +# # "rootlogger not initialized error upon exit.. +# NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool::destroy_dbs(); +# NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); +#} + __DATA__ This exists to allow the locking code at the beginning of the file to work. DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg index 10a66d6..33175b3 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/settings.cfg @@ -1,10 +1,14 @@ -features_define_filename = /home/rkrenn/test/Features_Define.cfg - +#dry=0 import_multithreading = 1 + +features_define_filename = /home/rkrenn/test/Features_Define.cfg features_define_import_numofthreads = 2 subscriber_define_filename = /home/rkrenn/test/Subscriber_Define.cfg subscriber_define_import_numofthreads = 2 +subscribernumer_exclude_pattern = ^3562770 +subscribernumer_exclude_exception_pattern = ^35627702770$ -#dry=0 +lnp_define_filename = /home/rkrenn/test/LNP_Define.cfg +lnp_define_import_numofthreads = 2 diff --git a/lib/NGCP/BulkProcessor/SqlConnector.pm b/lib/NGCP/BulkProcessor/SqlConnector.pm index 07db0bb..216f48d 100644 --- a/lib/NGCP/BulkProcessor/SqlConnector.pm +++ b/lib/NGCP/BulkProcessor/SqlConnector.pm @@ -177,6 +177,14 @@ sub paginate_sort_query { } +sub insert_ignore_phrase { + + my $self = shift; + + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + +} + sub _force_numeric_column { my $self = shift; my $column = shift; @@ -440,6 +448,22 @@ sub _fetch_error { } +#sub db_autocommit { +# +# my $self = shift; +# if (defined $self->{dbh}) { +# if (@_) { +# my ($autocommit) = @_; +# $autocommit = ($autocommit ? 1 : 0); +# dbdebug($self,'set AutoCommit ' . $self->{dbh}->{AutoCommit} . ' -> ' . $autocommit,getlogger(__PACKAGE__)); +# $self->{dbh}->{AutoCommit} = $autocommit; +# } +# return $self->{dbh}->{AutoCommit}; +# } +# return undef; +# +#} + # This method executes a SQL query that doesn't return any data. The # query may contain placeholders, that will be replaced by the elements # in @params during execute(). The method will die if any error occurs @@ -702,7 +726,8 @@ sub lock_tables { my $self = shift; my $tablestolock = shift; - $self->db_begin(); + #$self->db_begin(); + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } @@ -710,7 +735,8 @@ sub unlock_tables { my $self = shift; - $self->db_commit(); + #$self->db_commit(); + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } @@ -718,18 +744,19 @@ sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; - my $lock = shift; + #my $tablename = shift; + my $transactional = shift; #notimplementederror('db_do_begin',getlogger(__PACKAGE__)); - if (defined $self->{dbh} and !defined $self->{sth} and length($tablename) > 0) { + if (defined $self->{dbh} and !defined $self->{sth}) { # and length($tablename) > 0) { - if ($lock) { - $self->lock_tables({ $tablename => 'WRITE' }); + dbdebug($self,'db_do_begin: ' . $query,getlogger(__PACKAGE__)); + if ($transactional) { + #$self->lock_tables({ $tablename => 'WRITE' }); + $self->db_begin(); } - dbdebug($self,'db_do_begin: ' . $query,getlogger(__PACKAGE__)); $self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query); $self->{query} = $query; $self->{params} = []; @@ -765,17 +792,18 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; - my $lock = shift; + #my $tablename = shift; + my $transactional = shift; - if (defined $self->{dbh} and !defined $self->{sth} and length($tablename) > 0) { + if (defined $self->{dbh} and !defined $self->{sth}) { # and length($tablename) > 0) { + dbdebug($self,'db_get_begin: ' . $query . "\nparameters:\n" . join(', ', @_),getlogger(__PACKAGE__)); #eval { $self->lock_tables({ $tablename => 'WRITE' }); }; - if ($lock) { - $self->lock_tables({ $tablename => 'WRITE' }); + if ($transactional) { + #$self->lock_tables({ $tablename => 'WRITE' }); + $self->db_begin(); } - dbdebug($self,'db_get_begin: ' . $query . "\nparameters:\n" . join(', ', @_),getlogger(__PACKAGE__)); $self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query); $self->{sth}->execute(@_) or $self->_execute_error($query,$self->{sth},@_); $self->{query} = $query; @@ -844,7 +872,7 @@ sub db_get_rowblock { sub db_finish { my $self = shift; - my $unlock = shift; + my $transactional = shift; # since this is also called from DESTROY, no die() here! @@ -855,8 +883,9 @@ sub db_finish { $self->{sth}->finish(); $self->{sth} = undef; - if ($unlock) { - $self->unlock_tables(); + if ($transactional) { + #$self->unlock_tables(); + $self->db_commit(); } $self->{query} = undef; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm index 1d2da4b..8cbac82 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm @@ -50,7 +50,8 @@ use HTML::PullParser qw(); use HTML::Entities qw(decode_entities); use IO::Uncompress::Unzip qw(unzip $UnzipError); -use DateTime::Format::Excel; +# no debian package yet: +#use DateTime::Format::Excel; require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlConnector); @@ -60,10 +61,10 @@ our @EXPORT_OK = qw( xlsxbin2csv sanitize_column_name sanitize_spreadsheet_name - excel_to_timestamp - excel_to_date get_tableidentifier $csvextension); +#excel_to_timestamp +#excel_to_date our $csvextension = '.csv'; @@ -80,8 +81,9 @@ my $LongTruncOk = 0; #my $logger = getlogger(__PACKAGE__); -my $lock_do_chunk = 0; -my $lock_get_chunk = 0; +#my $lock_do_chunk = 0; +#my $lock_get_chunk = 0; +my $rowblock_transactional = 0; my $invalid_excel_spreadsheet_chars_pattern = '[' . quotemeta('[]:*?/\\') . ']'; @@ -97,23 +99,23 @@ sub sanitize_column_name { return $column_name; } -sub excel_to_date { - my $excel_date_value = shift; - if ($excel_date_value > 0) { - my $datetime = DateTime::Format::Excel->parse_datetime($excel_date_value); - return $datetime->ymd('-'); # prints 1992-02-28 - } - return undef; -} - -sub excel_to_timestamp { - my $excel_datetime_value = shift; - if ($excel_datetime_value > 0) { - my $datetime = DateTime::Format::Excel->parse_datetime($excel_datetime_value); - return $datetime->ymd('-') . ' ' . $datetime->hms(':'); - } - return undef; -} +#sub excel_to_date { +# my $excel_date_value = shift; +# if ($excel_date_value > 0) { +# my $datetime = DateTime::Format::Excel->parse_datetime($excel_date_value); +# return $datetime->ymd('-'); # prints 1992-02-28 +# } +# return undef; +#} + +#sub excel_to_timestamp { +# my $excel_datetime_value = shift; +# if ($excel_datetime_value > 0) { +# my $datetime = DateTime::Format::Excel->parse_datetime($excel_datetime_value); +# return $datetime->ymd('-') . ' ' . $datetime->hms(':'); +# } +# return undef; +#} sub new { @@ -363,13 +365,14 @@ sub cleanupcvsdirs { # filewarn('cannot remove ' . $dirpath . ': ' . $!,getlogger(__PACKAGE__)); #} remove_tree($dirpath, { - keep_root => 0, - error => \my $err }); + 'keep_root' => 0, + 'verbose' => 1, + 'error' => \my $err }); if (@$err) { for my $diag (@$err) { my ($file, $message) = %$diag; if ($file eq '') { - filewarn("general error: $message",getlogger(__PACKAGE__)); + filewarn("cleanup: $message",getlogger(__PACKAGE__)); } else { filewarn("problem unlinking $file: $message",getlogger(__PACKAGE__)); } @@ -557,9 +560,9 @@ sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_); + $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); } @@ -567,9 +570,9 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_); + $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); } @@ -577,7 +580,7 @@ sub db_finish { my $self = shift; - $self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk); + $self->SUPER::db_finish($rowblock_transactional); } diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm index ec813b6..e572040 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm @@ -50,8 +50,10 @@ my $LongTruncOk = 0; #my $logger = getlogger(__PACKAGE__); -my $lock_do_chunk = 1; -my $lock_get_chunk = 0; +#my $lock_do_chunk = 0; #1; +#my $lock_get_chunk = 0; + +my $rowblock_transactional = 1; my $serialization_level = ''; #'SERIALIZABLE' @@ -430,6 +432,14 @@ sub multithreading_supported { } +sub insert_ignore_phrase { + + my $self = shift; + + return 'IGNORE'; + +} + sub truncate_table { my $self = shift; @@ -465,41 +475,41 @@ sub drop_table { } - -sub lock_tables { - - my $self = shift; - my $tablestolock = shift; - - if (defined $self->{dbh} and defined $tablestolock and ref $tablestolock eq 'HASH') { - - my $locks = join(', ',map { local $_ = $_; $_ = $self->tableidentifier($_) . ' ' . $tablestolock->{$_}; $_; } keys %$tablestolock); - dbdebug($self,"lock_tables:\n" . $locks,getlogger(__PACKAGE__)); - $self->db_do('LOCK TABLES ' . $locks); - - } - -} - -sub unlock_tables { - - my $self = shift; - if (defined $self->{dbh}) { - - dbdebug($self,'unlock_tables',getlogger(__PACKAGE__)); - $self->db_do('UNLOCK TABLES'); - - } - -} +# too dangerous: +#sub lock_tables { +# +# my $self = shift; +# my $tablestolock = shift; +# +# if (defined $self->{dbh} and defined $tablestolock and ref $tablestolock eq 'HASH') { +# +# my $locks = join(', ',map { local $_ = $_; $_ = $self->tableidentifier($_) . ' ' . $tablestolock->{$_}; $_; } keys %$tablestolock); +# dbdebug($self,"lock_tables:\n" . $locks,getlogger(__PACKAGE__)); +# $self->db_do('LOCK TABLES ' . $locks); +# +# } +# +#} +# +#sub unlock_tables { +# +# my $self = shift; +# if (defined $self->{dbh}) { +# +# dbdebug($self,'unlock_tables',getlogger(__PACKAGE__)); +# $self->db_do('UNLOCK TABLES'); +# +# } +# +#} sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_); + $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); } @@ -507,10 +517,10 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; #my $lock = shift; - $self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_); + $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); } @@ -519,7 +529,7 @@ sub db_finish { my $self = shift; #my $unlock = shift; - $self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk); + $self->SUPER::db_finish($rowblock_transactional); } diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm index 2af3e54..73eca73 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm @@ -48,8 +48,10 @@ my $LongTruncOk = 0; #my $logger = getlogger(__PACKAGE__); -my $lock_do_chunk = 0; -my $lock_get_chunk = 0; +#my $lock_do_chunk = 0; +#my $lock_get_chunk = 0; + +my $rowblock_transactional = 1; my $isolation_level = ''; #'SERIALIZABLE' @@ -504,9 +506,9 @@ sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_do_begin($query,$tablename,0,@_); + $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); } @@ -514,9 +516,9 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_get_begin($query,$tablename,0,@_); + $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); } @@ -525,7 +527,7 @@ sub db_finish { my $self = shift; #my $unlock = shift; - $self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk); + $self->SUPER::db_finish($rowblock_transactional); } diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm index 57c41df..defabb2 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm @@ -50,8 +50,10 @@ my $client_encoding = 'LATIN1'; #my $logger = getlogger(__PACKAGE__); -my $lock_do_chunk = 0; -my $lock_get_chunk = 0; +#my $lock_do_chunk = 0; +#my $lock_get_chunk = 0; + +my $rowblock_transactional = 1; #my $to_number_pattern = '9.9999999999999'; #EEEE'; @@ -507,9 +509,9 @@ sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_); + $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); } @@ -517,10 +519,10 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; #my $lock = shift; - $self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_); + $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); } @@ -529,7 +531,7 @@ sub db_finish { my $self = shift; #my $unlock = shift; - $self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk); + $self->SUPER::db_finish($rowblock_transactional); } diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm index 2074280..d37b321 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm @@ -55,8 +55,10 @@ my $LongTruncOk = 0; #my $logger = getlogger(__PACKAGE__); -my $lock_do_chunk = 0; -my $lock_get_chunk = 0; +#my $lock_do_chunk = 0; +#my $lock_get_chunk = 0; + +my $rowblock_transactional = 1; my $transaction_isolation_level = ''; #'SERIALIZABLE' @@ -498,9 +500,9 @@ sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_); + $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); } @@ -508,10 +510,10 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; #my $lock = shift; - $self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_); + $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); } @@ -520,7 +522,7 @@ sub db_finish { my $self = shift; #my $unlock = shift; - $self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk); + $self->SUPER::db_finish($rowblock_transactional); } diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm index af1a8e9..f4971d9 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm @@ -76,8 +76,10 @@ my $LongTruncOk = 0; #my $logger = getlogger(__PACKAGE__); -my $lock_do_chunk = 1; -my $lock_get_chunk = 1; +#my $lock_do_chunk = 0; #1; +#my $lock_get_chunk = 0; #1; + +my $rowblock_transactional = 1; #SQLite transactions are always serializable. @@ -490,6 +492,14 @@ sub multithreading_supported { } +sub insert_ignore_phrase { + + my $self = shift; + + return 'OR IGNORE'; + +} + sub truncate_table { my $self = shift; @@ -583,9 +593,9 @@ sub db_do_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; - $self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_); + $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); } @@ -593,10 +603,10 @@ sub db_get_begin { my $self = shift; my $query = shift; - my $tablename = shift; + #my $tablename = shift; #my $lock = shift; - $self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_); + $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); } @@ -605,7 +615,7 @@ sub db_finish { my $self = shift; #my $unlock = shift; - $self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk); + $self->SUPER::db_finish($rowblock_transactional); } diff --git a/lib/NGCP/BulkProcessor/SqlRecord.pm b/lib/NGCP/BulkProcessor/SqlRecord.pm index 7726a50..cb50cdb 100644 --- a/lib/NGCP/BulkProcessor/SqlRecord.pm +++ b/lib/NGCP/BulkProcessor/SqlRecord.pm @@ -252,7 +252,7 @@ sub checktableinfo { my ($get_db,$tablename,$expected_fieldnames,$target_indexes) = @_; - my $success = 1; + my $result = 1; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; @@ -295,7 +295,7 @@ sub checktableinfo { # otherwise we log a failure (exit? - see Logging Module) #$table_fieldnames_cached->{$connectidentifier}->{$tablename} = {}; #$fieldnames; fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename},getlogger(__PACKAGE__)); - $success = 0; + $result = 0; } } @@ -328,7 +328,7 @@ sub checktableinfo { #$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($target_indexes); $table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = $target_indexes; - return $success; + return $result; } @@ -719,12 +719,12 @@ sub transfer_records { sub insert_stmt { - my ($get_db,$tablename) = @_; + my ($get_db,$tablename,$insert_ignore) = @_; my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db; my $connectidentifier = $db->connectidentifier(); my $tid = threadid(); my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename}; - return 'INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + return 'INSERT ' . ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '') . 'INTO ' . $db->tableidentifier($tablename) . ' (' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ') VALUES (' . substr(',?' x scalar @$expected_fieldnames,1) . ')'; @@ -869,7 +869,7 @@ sub transfer_table { #$target_db = &$get_target_db($writer_connection_name); eval { - $db->db_get_begin($selectstatement,$tablename,@values); + $db->db_get_begin($selectstatement,@values); #$tablename my $i = 0; while (1) { @@ -878,7 +878,7 @@ sub transfer_table { my $realblocksize = scalar @$rowblock; if ($realblocksize > 0) { writing_rows($target_db,$targettablename,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__)); - $target_db->db_do_begin($insertstatement,$targettablename); + $target_db->db_do_begin($insertstatement); #,$targettablename); $target_db->db_do_rowblock($rowblock); $target_db->db_finish(); $i += $realblocksize; @@ -1137,14 +1137,14 @@ sub process_table { #$db->db_disconnect(); #undef $db; #$db = &$get_db($reader_connection_name); - my $context = {}; + my $context = { tid => $tid }; my $rowblock_result = 1; eval { if ('CODE' eq ref $init_process_context_code) { &$init_process_context_code($context); } - $db->db_get_begin($selectstatement,$tablename,@values); + $db->db_get_begin($selectstatement,@values); #$tablename my $i = 0; while (1) { @@ -1156,7 +1156,7 @@ sub process_table { $rowblock_result = &$process_code($context,$rowblock,$i); - #$target_db->db_do_begin($insertstatement,$targettablename); + #$target_db->db_do_begin($insertstatement); #,$targettablename); #$target_db->db_do_rowblock($rowblock); #$target_db->db_finish(); $i += $realblocksize; @@ -1170,9 +1170,6 @@ sub process_table { } $db->db_finish(); - if ('CODE' eq ref $uninit_process_context_code) { - &$uninit_process_context_code($context); - } }; if ($@) { @@ -1181,6 +1178,11 @@ sub process_table { $errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED; } + eval { + if ('CODE' eq ref $uninit_process_context_code) { + &$uninit_process_context_code($context); + } + }; $db->db_disconnect(); #undef $db; @@ -1296,6 +1298,7 @@ sub _reader { my $reader_db; my $tid = threadid(); + $context->{tid} = $tid; { lock $context->{errorstates}; $context->{errorstates}->{$tid} = $RUNNING; @@ -1306,7 +1309,7 @@ sub _reader { my $blockcount = 0; eval { $reader_db = &{$context->{get_db}}(); #$reader_connection_name); - $reader_db->db_get_begin($context->{selectstatement},$context->{tablename},@{$context->{values_ref}}); + $reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}); #$context->{tablename} tablethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__)); while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up #yield(); @@ -1376,6 +1379,7 @@ sub _writer { #get_target_db my $writer_db; my $tid = threadid(); + $context->{tid} = $tid; { lock $context->{errorstates}; $context->{errorstates}->{$tid} = $RUNNING; @@ -1391,7 +1395,7 @@ sub _writer { if ($packet->{size} > 0) { writing_rows($writer_db,$context->{targettablename},$packet->{row_offset},$packet->{size},$context->{rowcount},getlogger(__PACKAGE__)); - $writer_db->db_do_begin($context->{insertstatement},$context->{targettablename}); + $writer_db->db_do_begin($context->{insertstatement}); #,$context->{targettablename}); $writer_db->db_do_rowblock($packet->{rows}); $writer_db->db_finish(); $blockcount++; @@ -1427,6 +1431,7 @@ sub _process { #my $writer_db; my $rowblock_result = 1; my $tid = threadid(); + $context->{tid} = $tid; { lock $context->{errorstates}; $context->{errorstates}->{$tid} = $RUNNING; @@ -1447,7 +1452,7 @@ sub _process { #writing_rows($writer_db,$context->{targettablename},$i,$realblocksize,$context->{rowcount},getlogger(__PACKAGE__)); - #$writer_db->db_do_begin($context->{insertstatement},$context->{targettablename}); + #$writer_db->db_do_begin($context->{insertstatement}); #,$context->{targettablename}); #$writer_db->db_do_rowblock($rowblock); #$writer_db->db_finish(); @@ -1475,16 +1480,17 @@ sub _process { sleep($thread_sleep_secs); #2015-01 } } + + }; + my $err = $@; + tablethreadingdebug($err ? '[' . $tid . '] processor thread error: ' . $err : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); + eval { if ('CODE' eq ref $context->{uninit_process_context_code}) { &{$context->{uninit_process_context_code}}($context); } }; - #if (defined $writer_db) { - # $writer_db->db_disconnect(); - #} - tablethreadingdebug($@ ? '[' . $tid . '] processor thread error: ' . $@ : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); lock $context->{errorstates}; - if ($@) { + if ($err) { $context->{errorstates}->{$tid} = $ERROR; } else { $context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED; diff --git a/lib/NGCP/BulkProcessor/Utils.pm b/lib/NGCP/BulkProcessor/Utils.pm index 0957d0b..d25c4e7 100644 --- a/lib/NGCP/BulkProcessor/Utils.pm +++ b/lib/NGCP/BulkProcessor/Utils.pm @@ -543,27 +543,29 @@ sub kbytes2gigs { sub cleanupdir { - my ($dirpath,$keeproot,$scriptinfocode,$filewarncode,$logger) = @_; + my ($dirpath,$keeproot,$filewarncode,$logger) = @_; if (-d $dirpath) { remove_tree($dirpath, { - keep_root => $keeproot, - error => \my $err }); + 'keep_root' => $keeproot, + 'verbose' => 1, + 'error' => \my $err }); if (@$err) { if (defined $filewarncode and ref $filewarncode eq 'CODE') { for my $diag (@$err) { my ($file, $message) = %$diag; if ($file eq '') { - &$filewarncode("general error: $message",$logger); + &$filewarncode("cleanup: $message",$logger); } else { &$filewarncode("problem unlinking $file: $message",$logger); } } } - } else { - if (defined $scriptinfocode and ref $scriptinfocode eq 'CODE') { - &$scriptinfocode($dirpath . ' removed',$logger); - } } + #else { + # if (!$keeproot and defined $scriptinfocode and ref $scriptinfocode eq 'CODE') { + # &$scriptinfocode($dirpath . ' removed',$logger); + # } + #} #if ($restoredir) { # makedir($dirpath); #} @@ -583,13 +585,14 @@ sub makepath { #changemod($dirpath); make_path($dirpath,{ 'chmod' => $chmod_umask, + 'verbose' => 1, 'error' => \my $err }); if (@$err) { if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') { for my $diag (@$err) { my ($file, $message) = %$diag; if ($file eq '') { - &$fileerrorcode("general error: $message",$logger); + &$fileerrorcode("creating path: $message",$logger); } else { &$fileerrorcode("problem creating $file: $message",$logger); } diff --git a/lib/NGCP/BulkProcessor/default.cfg b/lib/NGCP/BulkProcessor/default.cfg index 9db8e7f..7cf3336 100644 --- a/lib/NGCP/BulkProcessor/default.cfg +++ b/lib/NGCP/BulkProcessor/default.cfg @@ -31,7 +31,7 @@ emailenable = 0 erroremailrecipient = warnemailrecipient = completionemailrecipient = rkrenn@sipwise.com -successemailrecipient = +doneemailrecipient = ##logging: fileloglevel = OFF