MT#18663 row bulk processing framework WIP #6

+import LNP_Define.cfg
+workingdir folder for rollback logs
+cleanup task
+chained task execution
+dry mode for import
+refactoring txn demaraction for rowblock
 db operations
+take out "lock table" impl completely,
 too dangerous
+take out DateTime::Format::Excel dependency
 for now
+allow '#' comments in Subscriber_Define.cfg
 and Lnp_Define.cfg
+check for import sequence
+referential integrity checks for Subscriber_Define.cfg
+task summary messages / rowcounts
+exclude and exclude exception number patterns
 for Subscriber_Define.cfg
+ignore_unique options

Change-Id: If1f5094e15828633e212b1a8d651c97816c388b3
changes/28/6928/4
Rene Krenn 9 years ago
parent 52d66ca0de
commit 1c669a3a73

@ -154,7 +154,9 @@ sub process {
} else {
my $context = { instance => $self,
filename => $file,};
filename => $file,
tid => $tid,
};
my $rowblock_result = 1;
eval {
@ -237,10 +239,6 @@ sub process {
}
close(INPUTFILE);
if ('CODE' eq ref $uninit_process_context_code) {
&$uninit_process_context_code($context);
}
};
if ($@) {
@ -249,6 +247,11 @@ sub process {
$errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED;
}
eval {
if ('CODE' eq ref $uninit_process_context_code) {
&$uninit_process_context_code($context);
}
};
}
if ($errorstate == $COMPLETED) {
@ -269,6 +272,7 @@ sub _reader {
my $context = shift;
my $tid = threadid();
$context->{tid} = $tid;
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
@ -396,6 +400,7 @@ sub _process {
my $rowblock_result = 1;
my $tid = threadid();
$context->{tid} = $tid;
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
@ -433,13 +438,16 @@ sub _process {
sleep($thread_sleep_secs); #2015-01
}
}
};
my $err = $@;
filethreadingdebug($err ? '[' . $tid . '] processor thread error: ' . $err : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
eval {
if ('CODE' eq ref $context->{uninit_process_context_code}) {
&{$context->{uninit_process_context_code}}($context);
}
};
filethreadingdebug($@ ? '[' . $tid . '] processor thread error: ' . $@ : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
lock $context->{errorstates};
if ($@) {
if ($err) {
$context->{errorstates}->{$tid} = $ERROR;
} else {
$context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED;

@ -39,7 +39,8 @@ our @EXPORT_OK = qw(
$application_path
$executable_path
$working_path
update_working_path
create_path
$appstartsecs
$enablemultithreading
$root_threadid
@ -67,14 +68,13 @@ our @EXPORT_OK = qw(
$ngcprestapi_realm
$csv_path
$input_path
$local_db_path
$emailenable
$erroremailrecipient
$warnemailrecipient
$completionemailrecipient
$successemailrecipient
$doneemailrecipient
$mailfile_path
$ismsexchangeserver
@ -94,6 +94,7 @@ our @EXPORT_OK = qw(
$defaultconfig
update_mainconfig
create_path
$chmod_umask
@ -157,7 +158,7 @@ our $ngcprestapi_realm = 'api_admin_http';
our $working_path = tempdir(CLEANUP => 0) . '/'; #'/var/sipwise/';
our $input_path = $working_path . 'input/';
#our $input_path = $working_path . 'input/';
# csv
our $csv_path = $working_path . 'csv/';
@ -182,7 +183,7 @@ our $local_db_path = $working_path . 'db/';
#mkdir $local_db_path;
#our $rollback_path = $working_path . 'rollback/';
# email setup
@ -196,7 +197,7 @@ our $writefiles = 0; # save emails
our $erroremailrecipient = ''; #'rkrenn@sipwise.com';
our $warnemailrecipient = ''; #'rkrenn@sipwise.com';
our $completionemailrecipient = '';
our $successemailrecipient = '';
our $doneemailrecipient = '';
our $mailprog = "/usr/sbin/sendmail"; # linux only
our $mailtype = 1; #0 .. mailprog, 1 .. socket, 2 .. Net::SMTP
@ -230,7 +231,8 @@ sub update_mainconfig {
my ($data,$configfile,
$split_tuplecode,
$format_number,
$format_numbercode,
$parse_regexpcode,
$configurationinfocode,
$configurationwarncode,
$configurationerrorcode,
@ -239,6 +241,8 @@ sub update_mainconfig {
if (defined $data) {
my $result = 1;
# databases - dsp
$accounting_host = $data->{accounting_host} if exists $data->{accounting_host};
$accounting_port = $data->{accounting_port} if exists $data->{accounting_port};
@ -269,7 +273,11 @@ sub update_mainconfig {
@jobservers = ($data->{jobservers}) if exists $data->{jobservers};
}
if (defined $format_number and ref $format_number eq 'CODE') {
if (defined $format_numbercode and ref $format_numbercode eq 'CODE') {
}
if (defined $parse_regexpcode and ref $parse_regexpcode eq 'CODE') {
}
@ -278,7 +286,7 @@ sub update_mainconfig {
$erroremailrecipient = $data->{erroremailrecipient} if exists $data->{erroremailrecipient};
$warnemailrecipient = $data->{warnemailrecipient} if exists $data->{warnemailrecipient};
$completionemailrecipient = $data->{completionemailrecipient} if exists $data->{completionemailrecipient};
$successemailrecipient = $data->{successemailrecipient} if exists $data->{successemailrecipient};
$doneemailrecipient = $data->{doneemailrecipient} if exists $data->{doneemailrecipient};
$ismsexchangeserver = $data->{ismsexchangeserver} if exists $data->{ismsexchangeserver};
$smtp_server = $data->{smtp_server} if exists $data->{smtp_server};
@ -289,143 +297,77 @@ sub update_mainconfig {
$screenloglevel = $data->{screenloglevel} if exists $data->{screenloglevel};
$emailloglevel = $data->{emailloglevel} if exists $data->{emailloglevel};
my $new_working_path = (exists $data->{working_path} ? $data->{working_path} : $working_path);
if (exists $data->{working_path}) {
$result &= _prepare_working_paths($data->{working_path},1,$fileerrorcode,$configlogger);
} else {
$result &= _prepare_working_paths($working_path,1,$fileerrorcode,$configlogger);
}
return update_working_path($new_working_path,1,$fileerrorcode,$configlogger);
return $result;
}
return 0;
}
sub update_working_path {
sub _prepare_working_paths {
my ($new_working_path,$create,$fileerrorcode,$logger) = @_;
my $result = 1;
if (defined $new_working_path and length($new_working_path) > 0) {
$new_working_path = fixdirpath($new_working_path);
if (-d $new_working_path) {
$working_path = $new_working_path;
} else {
if ($create) {
if (makepath($new_working_path,$fileerrorcode,$logger)) {
$working_path = $new_working_path;
} else {
$result = 0;
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("working path '$new_working_path' does not exist",$logger);
}
}
}
my $path_result;
($path_result,$working_path) = create_path($new_working_path,$working_path,$create,$fileerrorcode,$logger);
$result &= $path_result;
($path_result,$csv_path) = create_path($working_path . 'csv',$csv_path,$create,$fileerrorcode,$logger);
$result &= $path_result;
#($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,$fileerrorcode,$logger);
#$result &= $path_result;
($path_result,$logfile_path) = create_path($working_path . 'log',$logfile_path,$create,$fileerrorcode,$logger);
$result &= $path_result;
($path_result,$local_db_path) = create_path($working_path . 'db',$local_db_path,$create,$fileerrorcode,$logger);
$result &= $path_result;
($path_result,$mailfile_path) = create_path($working_path . 'mails',$local_db_path,$create,$fileerrorcode,$logger);
$result &= $path_result;
#($path_result,$rollback_path) = create_path($working_path . 'rollback',$rollback_path,$create,$fileerrorcode,$logger);
#$result &= $path_result;
my $new_csv_path = $working_path . 'csv/';
if (-d $new_csv_path) {
$csv_path = $new_csv_path;
} else {
if ($create) {
if (makepath($new_csv_path,$fileerrorcode,$logger)) {
$csv_path = $new_csv_path;
} else {
$result = 0;
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("csv path '$new_csv_path' does not exist",$logger);
}
}
}
return $result;
my $new_input_path = $working_path . 'input/';
if (-d $new_input_path) {
$input_path = $new_input_path;
} else {
if ($create) {
if (makepath($new_input_path,$fileerrorcode,$logger)) {
$input_path = $new_input_path;
} else {
$result = 0;
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("input path '$new_input_path' does not exist",$logger);
}
}
}
}
my $new_logfile_path = $working_path . 'log/';
if (-d $new_logfile_path) {
$logfile_path = $new_logfile_path;
} else {
if ($create) {
if (makepath($new_logfile_path,$fileerrorcode,$logger)) {
$logfile_path = $new_logfile_path;
} else {
$result = 0;
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("logfile path '$new_logfile_path' does not exist",$logger);
}
}
}
sub get_applicationpath {
my $new_local_db_path = $working_path . 'db/';
if (-d $new_local_db_path) {
$local_db_path = $new_local_db_path;
} else {
if ($create) {
if (makepath($new_local_db_path,$fileerrorcode,$logger)) {
$local_db_path = $new_local_db_path;
} else {
$result = 0;
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("local db path '$new_local_db_path' does not exist",$logger);
}
}
}
return dirname(abs_path(__FILE__)) . '/';
my $new_mailfile_path = $working_path . 'mails/';
if (-d $new_mailfile_path) {
$mailfile_path = $new_mailfile_path;
}
sub create_path {
my ($new_value,$old_value,$create,$fileerrorcode,$logger) = @_;
my $path = $old_value;
my $result = 0;
if (defined $new_value and length($new_value) > 0) {
$new_value = fixdirpath($new_value);
if (-d $new_value) {
$path = $new_value;
$result = 1;
} else {
if ($create) {
if (makepath($new_mailfile_path,$fileerrorcode,$logger)) {
$mailfile_path = $new_mailfile_path;
} else {
$result = 0;
if (makepath($new_value,$fileerrorcode,$logger)) {
$path = $new_value;
$result = 1;
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("mailfile path '$new_mailfile_path' does not exist",$logger);
&$fileerrorcode("path '$new_value' does not exist",$logger);
}
}
}
} else {
$result = 0;
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
&$fileerrorcode("empty working path",$logger);
&$fileerrorcode("empty path",$logger);
}
}
#print "working path result: " . $result;
return $result;
}
sub get_applicationpath {
return dirname(abs_path(__FILE__)) . '/';
return ($result,$path);
}
1;

@ -34,6 +34,8 @@ require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
load_config
parse_regexp
split_tuple
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
);
@ -80,20 +82,14 @@ sub load_config {
}
if ($is_settings) {
my $result = &$process_code($data,$configfile,
\&split_tuple,
\&format_number,
\&configurationinfo,
\&configurationwarn,
\&configurationerror,
\&fileerror,
getlogger(__PACKAGE__));
my $result = &$process_code($data,$configfile);
configurationinfo('settings file ' . $configfile . ' loaded',getlogger(__PACKAGE__));
return $result;
} else {
my $result = update_mainconfig($data,$configfile,
\&split_tuple,
\&format_number,
\&parse_regexp,
\&configurationinfo,
\&configurationwarn,
\&configurationerror,
@ -108,9 +104,8 @@ sub load_config {
sub _splashinfo {
configurationinfo($system_name . ' ' . $system_version . ' (' . $system_instance_label . ') [' . $local_fqdn . ']',getlogger(__PACKAGE__));
configurationinfo('application path ' . $application_path,getlogger(__PACKAGE__));
configurationinfo('working path ' . $working_path,getlogger(__PACKAGE__));
#configurationinfo('executable path ' . $executable_path,getlogger(__PACKAGE__));
configurationinfo('application path: ' . $application_path,getlogger(__PACKAGE__));
configurationinfo('working path: ' . $working_path,getlogger(__PACKAGE__));
configurationinfo($cpucount . ' cpu(s), multithreading ' . ($enablemultithreading ? 'enabled' : 'disabled'),getlogger(__PACKAGE__));
}
@ -139,6 +134,24 @@ sub split_tuple {
}
sub parse_regexp {
my ($token,$file) = @_;
my $regexp = undef;
my $result = 1;
if (defined $token and length($token) > 0) {
eval {
$regexp = qr/$token/;
};
if ($@ or !defined $regexp) {
configurationerror($file,'invalid pattern: ' . $@,getlogger(__PACKAGE__));
$result = 0;
}
}
return ($result,$regexp);
}
#sub parse_float {
#
# my ($value) = @_;

@ -7,7 +7,7 @@ use NGCP::BulkProcessor::Globals qw(
$system_version
$erroremailrecipient
$warnemailrecipient
$successemailrecipient
$doneemailrecipient
$completionemailrecipient
$appstartsecs
$root_threadid
@ -84,7 +84,7 @@ our @EXPORT_OK = qw(
dbclustererror
dbclusterwarn
success
done
completion
scripterror
@ -93,15 +93,15 @@ our @EXPORT_OK = qw(
my $erroremailsubject = 'error: module ';
my $warnemailsubject = 'warning: module ';
my $successmailsubject = 'success: module ';
my $donemailsubject = 'done: module ';
my $completionmailsubject = 'completed: module ';
sub success {
sub done {
my ($message,$attachments,$logger) = @_;
if (length($message) == 0) {
$message = 'success';
$message = 'done';
}
my $appexitsecs = Time::HiRes::time();
@ -113,16 +113,16 @@ sub success {
}
if (threadid() == $root_threadid) {
if (length($successemailrecipient) > 0 and defined $logger) {
if (length($doneemailrecipient) > 0 and defined $logger) {
my $email = {
to => $successemailrecipient,
to => $doneemailrecipient,
#cc => 'rkrenn@sipwise.com',
#bcc => '',
#return_path => undef,
priority => $lowpriority,
#sender_name => 'Rene K.',
#from => 'rkrenn@sipwise.com',
subject => $successmailsubject . $logger->{category},
subject => $donemailsubject . $logger->{category},
body => getscriptpath() . ":\n\n" . wrap_mailbody($message) . "\n\n" . $signature,
guid => create_guid()
};
@ -290,7 +290,7 @@ sub dbwarn {
}
#die();
warning($message, $logger, 1);
warning($message, $logger);
}
@ -318,7 +318,7 @@ sub restwarn {
}
#die();
warning($message, $logger, 1);
warning($message, $logger);
}
@ -385,7 +385,7 @@ sub dbclusterwarn {
}
#die();
warning($message, $logger, 1);
warning($message, $logger);
}
@ -482,7 +482,7 @@ sub fileerror {
sub processzerofilesize {
my ($file,$logger) = @_;
my $message = basename($file) . ' has 0 bytes';
my $message = basename($file) . ' ' . (-e $file ? 'has 0 bytes' : 'not found');
if (defined $logger) {
$logger->error($message);
}
@ -560,7 +560,7 @@ sub filewarn {
}
#die();
warning($message, $logger, 1);
warning($message, $logger);
}
@ -571,7 +571,7 @@ sub xls2csvwarn {
$logger->warn($message);
}
warning($message, $logger, 1);
warning($message, $logger);
}
sub webarchivexls2csvwarn {
@ -581,7 +581,7 @@ sub webarchivexls2csvwarn {
$logger->warn($message);
}
warning($message, $logger, 1);
warning($message, $logger);
}
#sub parameterdefinedtwice {
@ -590,7 +590,7 @@ sub webarchivexls2csvwarn {
# if (defined $logger) {
# $logger->warn($message);
# }
# warning($message, $logger, 1);
# warning($message, $logger);
#}
sub emailwarn {
@ -669,7 +669,7 @@ sub servicewarn {
}
#die();
warning($message, $logger, 1);
warning($message, $logger);
}

@ -79,6 +79,7 @@ our @EXPORT_OK = qw(
fileprocessingdone
fetching_lines
processing_lines
processing_info
tablefixed
servicedebug
@ -624,6 +625,15 @@ sub processing_lines {
}
sub processing_info {
my ($tid, $message, $logger) = @_;
if (defined $logger) {
$logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . $message);
}
}
#sub mainconfigurationloaded {
#
# my ($configfile,$logger) = @_;

@ -22,6 +22,8 @@ use NGCP::BulkProcessor::SqlRecord qw(
insert_stmt
);
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
@ -30,10 +32,8 @@ our @EXPORT_OK = qw(
check_table
getinsertstatement
test_table_bycolumn1
test_table_local_select
test_table_source_select
test_table_source_select_temptable
findby_subscribernumber
countby_subscribernumber
);
my $tablename = 'feature_option';
@ -76,6 +76,43 @@ sub create_table {
}
sub findby_subscribernumber {
my ($subscribernumber,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('subscribernumber') . ' = ?'
,$subscribernumber);
return buildrecords_fromrows($rows,$load_recursive);
}
sub countby_subscribernumber {
my ($subscribernumber) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $subscribernumber) {
$stmt .= ' WHERE ' . $db->columnidentifier('subscribernumber') . ' = ?';
push(@params,$subscribernumber);
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
@ -89,6 +126,13 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
if ($load_recursive) {
$record->{_optionsetitems} = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::findby_subscribernumber_option(
$record->{subscribernumber},
$record->{option},
$load_recursive
);
}
push @records,$record;
}
@ -100,8 +144,9 @@ sub buildrecords_fromrows {
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename);
return insert_stmt($get_db,$tablename,$insert_ignore);
}

@ -1,4 +1,4 @@
package NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet;
package NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem;
use strict;
## no critic
@ -30,13 +30,11 @@ our @EXPORT_OK = qw(
check_table
getinsertstatement
test_table_bycolumn1
test_table_local_select
test_table_source_select
test_table_source_select_temptable
findby_subscribernumber_option
countby_subscribernumber_option
);
my $tablename = 'feature_option_set';
my $tablename = 'feature_option_set_item';
my $get_db = \&get_import_db;
#my $get_tablename = \&import_db_tableidentifier;
@ -77,6 +75,48 @@ sub create_table {
}
sub findby_subscribernumber_option {
my ($subscribernumber,$option,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('subscribernumber') . ' = ? ' .
' AND ' . $db->columnidentifier('option') . ' = ?'
,$subscribernumber,$option);
return buildrecords_fromrows($rows,$load_recursive);
}
sub countby_subscribernumber_option {
my ($subscribernumber,$option) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $subscribernumber) {
$stmt .= ' WHERE ' . $db->columnidentifier('subscribernumber') . ' = ?';
push(@params,$subscribernumber);
if (defined $option) {
$stmt .= ' AND ' . $db->columnidentifier('option') . ' = ?';
push(@params,$option);
}
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
@ -101,8 +141,9 @@ sub buildrecords_fromrows {
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename);
return insert_stmt($get_db,$tablename,$insert_ignore);
}

@ -0,0 +1,179 @@
package NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp;
use strict;
## no critic
#use File::Basename;
#use Cwd;
#use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../');
use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw(
get_import_db
);
#import_db_tableidentifier
use NGCP::BulkProcessor::SqlRecord qw(
registertableinfo
create_targettable
checktableinfo
copy_row
insert_stmt
);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
create_table
gettablename
check_table
getinsertstatement
findby_lrncode_portednumber
countby_lrncode_portednumber
count_lrncodes
);
my $tablename = 'lnp';
my $get_db = \&get_import_db;
#my $get_tablename = \&import_db_tableidentifier;
my $expected_fieldnames = [
'ported_number',
'type',
'lrn_code',
];
my $primarykey_fieldnames = [ 'lrn_code', 'ported_number' ];
my $indexes = {};
my $fixtable_statements = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($get_db,
$tablename,
$expected_fieldnames,$indexes);
bless($self,$class);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub create_table {
my ($truncate) = @_;
my $db = &$get_db();
registertableinfo($db,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,$tablename,$db,$tablename,$truncate,0,undef);
}
sub findby_lrncode_portednumber {
my ($lrncode,$portednumber,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('lrn_code') . ' = ? ' .
' AND ' . $db->columnidentifier('ported_number') . ' = ?'
,$lrncode,$portednumber);
return buildrecords_fromrows($rows,$load_recursive);
}
sub countby_lrncode_portednumber {
my ($lrncode,$portednumber) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $lrncode) {
$stmt .= ' WHERE ' . $db->columnidentifier('lrn_code') . ' = ?';
push(@params,$lrncode);
if (defined $portednumber) {
$stmt .= ' AND ' . $db->columnidentifier('ported_number') . ' = ?';
push(@params,$portednumber);
}
}
return $db->db_get_value($stmt,@params);
}
sub count_lrncodes {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return $db->db_get_value('SELECT COUNT(DISTINCT ' .
$db->columnidentifier('lrn_code') . ') FROM ' . $table);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename,$insert_ignore);
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -30,10 +30,8 @@ our @EXPORT_OK = qw(
check_table
getinsertstatement
test_table_bycolumn1
test_table_local_select
test_table_source_select
test_table_source_select_temptable
findby_subscribernumber
countby_subscribernumber
);
my $tablename = 'subscriber';
@ -86,6 +84,50 @@ sub create_table {
}
sub findby_subscribernumber {
my ($subscribernumber,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless defined $subscribernumber;
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('country_code') . ' = ?' .
' AND ' . $db->columnidentifier('area_code') . ' = ?' .
' AND ' . $db->columnidentifier('dial_number') . ' = ?'
,split_subscribernumber($subscribernumber));
return buildrecords_fromrows($rows,$load_recursive);
}
sub countby_subscribernumber {
my ($subscribernumber) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $subscribernumber) {
$stmt .= ' WHERE ' .
$db->columnidentifier('country_code') . ' = ?' .
' AND ' . $db->columnidentifier('area_code') . ' = ?' .
' AND ' . $db->columnidentifier('dial_number') . ' = ?';
push(@params,split_subscribernumber($subscribernumber));
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
@ -99,6 +141,12 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
if ($load_recursive) {
$record->{_features} = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::findby_subscribernumber(
$record->subscribernumber(),
$load_recursive
);
}
push @records,$record;
}
@ -110,8 +158,9 @@ sub buildrecords_fromrows {
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,$tablename);
return insert_stmt($get_db,$tablename,$insert_ignore);
}
@ -130,4 +179,17 @@ sub check_table {
}
sub subscribernumber {
my $self = shift;
return $self->{dial_number}; #$self->{country_code} . $self->{dial_number};
}
sub split_subscribernumber {
my ($subscribernumber) = @_;
my $country_code = substr($subscribernumber,0,3);
my $area_code = 'None';
my $dial_number = $subscribernumber; #substr($subscribernumber,3);
return ($country_code,$area_code,$dial_number);
}
1;

@ -81,6 +81,7 @@ sub extractfields {
my ($context,$line_ref) = @_;
return undef if length($$line_ref) == 0;
#return undef if $$line_ref =~ /^#/;
if ($context->{instance}->{parselines}) {
my $row = undef;
@ -101,4 +102,16 @@ sub extractfields {
}
sub stoponparseerrors {
my $self = shift;
$self->{stoponparseerrors} = shift if @_;
return $self->{stoponparseerrors};
}
sub parselines {
my $self = shift;
$self->{parselines} = shift if @_;
return $self->{parselines};
}
1;

@ -0,0 +1,68 @@
package NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::LnpDefineFile;
use strict;
## no critic
#use File::Basename;
#use Cwd;
#use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../');
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::LogError qw(
fileprocessingerror
fileprocessingwarn
);
use NGCP::BulkProcessor::FileProcessor;
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor);
our @EXPORT_OK = qw();
my $lineseparator = '\\n';
my $fieldseparator = " +";
my $encoding = 'UTF-8';
my $buffersize = 100 * 1024;
my $threadqueuelength = 10;
my $default_numofthreads = 3;
#my $multithreading = 0;
my $blocksize = 500;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::FileProcessor->new(@_);
$self->{numofthreads} = shift // $default_numofthreads;
$self->{line_separator} = $lineseparator;
$self->{field_separator} = $fieldseparator;
$self->{encoding} = $encoding;
$self->{buffersize} = $buffersize;
$self->{threadqueuelength} = $threadqueuelength;
#$self->{multithreading} = $multithreading;
$self->{blocksize} = $blocksize;
bless($self,$class);
#restdebug($self,__PACKAGE__ . ' file processor created',getlogger(__PACKAGE__));
return $self;
}
sub extractfields {
my ($context,$line_ref) = @_;
my $separator = $context->{instance}->{field_separator};
$$line_ref =~ s/^ +//;
$$line_ref =~ s/ +$//;
return undef if length($$line_ref) == 0;
return undef if $$line_ref =~ /^#/;
my @fields = split(/$separator/,$$line_ref,-1);
return \@fields;
}
1;

@ -29,7 +29,7 @@ my $buffersize = 100 * 1024;
my $threadqueuelength = 10;
my $default_numofthreads = 3;
#my $multithreading = 0;
my $blocksize = 100;
my $blocksize = 500;
sub new {
@ -59,6 +59,8 @@ sub extractfields {
my $separator = $context->{instance}->{field_separator};
$$line_ref =~ s/^ +//;
$$line_ref =~ s/ +$//;
return undef if length($$line_ref) == 0;
return undef if $$line_ref =~ /^#/;
my @fields = split(/$separator/,$$line_ref,-1);
return \@fields;
}

@ -14,11 +14,19 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$import_multithreading
$features_define_import_numofthreads
$skip_duplicate_setoptionitems
$ignore_options_unique
$ignore_setoptionitems_unique
$subscriber_define_import_numofthreads
$subscribernumer_exclude_pattern
$subscribernumer_exclude_exception_pattern
$ignore_subscriber_unique
$lnp_define_import_numofthreads
$ignore_lnp_unique
$dry
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
);
use NGCP::BulkProcessor::LogError qw(
fileprocessingwarn
@ -28,6 +36,7 @@ use NGCP::BulkProcessor::LogError qw(
use NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::FeaturesDefineFile qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::FeaturesDefineParser qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::SubscriberDefineFile qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::LnpDefineFile qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw(
get_import_db
@ -35,8 +44,9 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw
);
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp qw();
use NGCP::BulkProcessor::Array qw(removeduplicates);
@ -45,43 +55,52 @@ our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
import_features_define
import_subscriber_define
import_lnp_define
);
sub import_features_define {
my ($file) = @_;
# create tables:
my $result = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::create_table(1);
$result &= NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet::create_table(1);
destroy_dbs(); #close all db connections before forking..
$result &= NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::create_table(1);
# checks, e.g. other table must be present:
# ..none..
# prepare parse:
my $importer = NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::FeaturesDefineFile->new($features_define_import_numofthreads);
$importer->{stoponparseerrors} = !$dry;
$importer->stoponparseerrors(!$dry);
# launch:
destroy_dbs(); #close all db connections before forking..
return $result && $importer->process($file,sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @featureoption_rows = ();
my @featureoptionset_rows = ();
my @featureoptionsetitem_rows = ();
foreach my $line (@$rows) {
$rownum++;
my $row = undef;
if (not $importer->{parselines}) {
if (not $importer->parselines()) {
eval {
$row = NGCP::BulkProcessor::Projects::Migration::IPGallery::FeaturesDefineParser::parse(\$line,$context->{grammar});
};
if ($@) {
if ($importer->{stoponparseerrors}) {
fileprocessingerror($context->{filename},'record ' . ($rownum + 1) . ' - ' . $@,getlogger(__PACKAGE__));
if ($importer->stoponparseerrors()) {
fileprocessingerror($context->{filename},'record ' . $rownum . ' - ' . $@,getlogger(__PACKAGE__));
} else {
fileprocessingwarn($context->{filename},'record ' . ($rownum + 1) . ' - ' . $@,getlogger(__PACKAGE__));
fileprocessingwarn($context->{filename},'record ' . $rownum . ' - ' . $@,getlogger(__PACKAGE__));
}
}
}
next unless defined $row;
$rownum++;
foreach my $subscriber_number (keys %$row) {
foreach my $option (@{$row->{$subscriber_number}}) {
if ('HASH' eq ref $option) {
foreach my $setoption (keys %$option) {
foreach my $setoptionitem (@{$skip_duplicate_setoptionitems ? removeduplicates($option->{$setoption}) : $option->{$setoption}}) {
push(@featureoptionset_rows,[ $subscriber_number, $setoption, $setoptionitem ]);
push(@featureoptionsetitem_rows,[ $subscriber_number, $setoption, $setoptionitem ]);
}
push(@featureoption_rows,[ $subscriber_number, $setoption ]);
}
@ -93,27 +112,23 @@ sub import_features_define {
}
if ((scalar @featureoption_rows) > 0) {
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::getinsertstatement(),
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::gettablename(),
#lock - $import_multithreading
);
$context->{db}->db_do_rowblock(\@featureoption_rows);
$context->{db}->db_finish();
if ($dry) {
eval { _insert_featureoption_rows($context,\@featureoption_rows); };
} else {
_insert_featureoption_rows($context,\@featureoption_rows);
}
}
if ((scalar @featureoptionset_rows) > 0) {
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet::getinsertstatement(),
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSet::gettablename(),
#lock
);
$context->{db}->db_do_rowblock(\@featureoptionset_rows);
$context->{db}->db_finish();
if ((scalar @featureoptionsetitem_rows) > 0) {
if ($dry) {
eval { _insert_featureoptionsetitem_rows($context,\@featureoptionsetitem_rows); };
} else {
_insert_featureoptionsetitem_rows($context,\@featureoptionsetitem_rows);
}
}
return 1;
}, sub {
my ($context)= @_;
if (not $importer->{parselines}) {
if (not $importer->parselines()) {
eval {
$context->{grammar} = NGCP::BulkProcessor::Projects::Migration::IPGallery::FeaturesDefineParser::create_grammar();
};
@ -130,23 +145,147 @@ sub import_features_define {
}
sub _insert_featureoption_rows {
my ($context,$featureoption_rows) = @_;
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::getinsertstatement($ignore_options_unique),
#NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::gettablename(),
#lock - $import_multithreading
);
$context->{db}->db_do_rowblock($featureoption_rows);
$context->{db}->db_finish();
}
sub _insert_featureoptionsetitem_rows {
my ($context,$featureoptionsetitem_rows) = @_;
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::getinsertstatement($ignore_setoptionitems_unique),
#NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::gettablename(),
#lock
);
$context->{db}->db_do_rowblock($featureoptionsetitem_rows);
$context->{db}->db_finish();
}
sub import_subscriber_define {
my ($file) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::create_table(1);
$result &= _import_subscriber_define_checks($file);
my $importer = NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::SubscriberDefineFile->new($subscriber_define_import_numofthreads);
destroy_dbs(); #close all db connections before forking..
return $result && $importer->process($file,sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @subscriber_rows = ();
foreach my $row (@$rows) {
$rownum++;
my $record = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber->new($row);
next if 'None' eq $record->{rgw_fqdn};
if ($record->{dial_number} =~ $subscribernumer_exclude_pattern) {
if ($record->{dial_number} =~ $subscribernumer_exclude_exception_pattern) {
processing_info($context->{tid},'record ' . $rownum . ' - exclude exception pattern match: ' . $record->{dial_number},getlogger(__PACKAGE__));
push(@subscriber_rows,$row) if _import_subscriber_define_referential_checks($context,$record,$rownum);
} else {
processing_info($context->{tid},'record ' . $rownum . ' - skipped, exclude pattern match: ' . $record->{dial_number},getlogger(__PACKAGE__));
}
} else {
push(@subscriber_rows,$row) if _import_subscriber_define_referential_checks($context,$record,$rownum);
}
}
if ((scalar @$rows) > 0) {
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::getinsertstatement(),
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::gettablename(),
#lock - $import_multithreading
);
$context->{db}->db_do_rowblock($rows);
$context->{db}->db_finish();
if ($dry) {
eval { _insert_subscriber_rows($context,$rows); };
} else {
_insert_subscriber_rows($context,$rows);
}
}
return 1;
}, sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
}, sub {
my ($context)= @_;
undef $context->{db};
destroy_dbs();
}, $import_multithreading);
}
sub _import_subscriber_define_referential_checks {
my ($context,$record,$rownum) = @_;
my $result = 0;
if (NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::countby_subscribernumber($record->subscribernumber()) > 0) {
$result = 1;
} else {
if ($dry) {
fileprocessingwarn($context->{filename},'record ' . $rownum . ' - no features records for subscriber found: ' . $record->{dial_number},getlogger(__PACKAGE__));
} else {
fileprocessingerror($context->{filename},'record ' . $rownum . ' - no features records for subscriber found: ' . $record->{dial_number},getlogger(__PACKAGE__));
}
}
return $result;
}
sub _import_subscriber_define_checks {
my ($file) = @_;
my $result = 1;
my $optioncount = 0;
eval {
$optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::countby_subscribernumber();
};
if ($@ or $optioncount == 0) {
fileprocessingerror($file,'please import subscriber features first',getlogger(__PACKAGE__));
$result = 0; #even in dry mode..
}
return $result;
}
sub _insert_subscriber_rows {
my ($context,$subscriber_rows) = @_;
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::getinsertstatement($ignore_subscriber_unique),
#NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::gettablename(),
#lock
);
$context->{db}->db_do_rowblock($subscriber_rows);
$context->{db}->db_finish();
}
sub import_lnp_define {
my ($file) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::create_table(1);
my $importer = NGCP::BulkProcessor::Projects::Migration::IPGallery::FileProcessors::LnpDefineFile->new($lnp_define_import_numofthreads);
destroy_dbs(); #close all db connections before forking..
return $result && $importer->process($file,sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @lnp_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if $row->[2] eq 'In';
$row->[3] = substr($row->[3],0,4);
shift @$row; #ignore first col
push(@lnp_rows,$row);
}
if ((scalar @lnp_rows) > 0) {
if ($dry) {
eval { _insert_lnp_rows($context,\@lnp_rows); };
} else {
_insert_lnp_rows($context,\@lnp_rows);
}
}
return 1;
@ -161,4 +300,15 @@ sub import_subscriber_define {
}
sub _insert_lnp_rows {
my ($context,$lnp_rows) = @_;
$context->{db}->db_do_begin(
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::getinsertstatement($ignore_lnp_unique),
#NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::gettablename(),
#lock
);
$context->{db}->db_do_rowblock($lnp_rows);
$context->{db}->db_finish();
}
1;

@ -8,105 +8,175 @@ use strict;
#use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
use NGCP::BulkProcessor::Globals qw(
update_working_path
$input_path
$working_path
$enablemultithreading
$cpucount
create_path
);
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(format_number);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
check_dry
$input_path
$output_path
$rollback_path
$defaultsettings
$defaultconfig
$features_define_filename
$features_define_import_numofthreads
$skip_duplicate_setoptionitems
$subscriber_define_filename
$subscriber_define_import_numofthreads
$import_multithreading
$run_id
$dry
$force
$import_db_file
check_dry
$features_define_filename
$features_define_import_numofthreads
$skip_duplicate_setoptionitems
$ignore_options_unique
$ignore_setoptionitems_unique
$subscriber_define_filename
$subscriber_define_import_numofthreads
$subscribernumer_exclude_pattern
$subscribernumer_exclude_exception_pattern
$ignore_subscriber_unique
$lnp_define_filename
$lnp_define_import_numofthreads
$ignore_lnp_unique
);
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $features_define_filename = undef;
our $subscriber_define_filename = undef;
our $input_path = $working_path . 'input/';
our $output_path = $working_path . 'output/';
our $rollback_path = $working_path . 'rollback/';
our $force = 0;
our $dry = 0;
our $run_id = '';
our $import_db_file = _get_import_db_file($run_id,'import');
our $import_multithreading = $enablemultithreading;
our $features_define_import_numofthreads = $cpucount;
our $subscriber_define_import_numofthreads = $cpucount;
our $features_define_filename = undef;
our $features_define_import_numofthreads = $cpucount;
our $skip_duplicate_setoptionitems = 0;
our $ignore_options_unique = 0;
our $ignore_setoptionitems_unique = 0;
our $force = 0;
our $dry = 1;
our $run_id = '';
our $subscriber_define_filename = undef;
our $subscriber_define_import_numofthreads = $cpucount;
our $subscribernumer_exclude_pattern = undef;
our $subscribernumer_exclude_exception_pattern = undef;
our $ignore_subscriber_unique = 0;
our $import_db_file = ((defined $run_id and length($run_id) > 0) ? '_' : '') . 'import';
our $lnp_define_filename = undef;
our $lnp_define_import_numofthreads = $cpucount;
our $ignore_lnp_unique = 1;
sub update_settings {
my ($data,$configfile,
$split_tuplecode,
$format_number,
$configurationinfocode,
$configurationwarncode,
$configurationerrorcode,
$fileerrorcode,
$configlogger) = @_;
my ($data,$configfile) = @_;
if (defined $data) {
#print "$configlogger narf";
my $result = 1;
#&$configurationinfocode("testinfomessage",$configlogger);
$features_define_filename = $data->{features_define_filename} if exists $data->{features_define_filename};
if (defined $features_define_filename and length($features_define_filename) > 0) {
$features_define_filename = $input_path . $features_define_filename unless -e $features_define_filename;
}
$subscriber_define_filename = $data->{subscriber_define_filename} if exists $data->{subscriber_define_filename};
if (defined $subscriber_define_filename and length($subscriber_define_filename) > 0) {
$subscriber_define_filename = $input_path . $subscriber_define_filename unless -e $subscriber_define_filename;
}
$result &= _prepare_working_paths(1);
$dry = $data->{dry} if exists $data->{dry};
$import_db_file = _get_import_db_file($run_id,'import');
$import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading};
#my $new_working_path = (exists $data->{working_path} ? $data->{working_path} : $working_path);
$features_define_import_numofthreads = $cpucount;
$features_define_import_numofthreads = $data->{features_define_import_numofthreads} if exists $data->{features_define_import_numofthreads};
$features_define_import_numofthreads = $cpucount if $features_define_import_numofthreads > $cpucount;
$features_define_filename = _get_import_filename($features_define_filename,$data,'features_define_filename');
$features_define_import_numofthreads =_get_import_numofthreads($cpucount,$data,'features_define_import_numofthreads');
$subscriber_define_import_numofthreads = $cpucount;
$subscriber_define_import_numofthreads = $data->{subscriber_define_import_numofthreads} if exists $data->{subscriber_define_import_numofthreads};
$subscriber_define_import_numofthreads = $cpucount if $subscriber_define_import_numofthreads > $cpucount;
#return update_working_path($new_working_path,1,$fileerrorcode,$configlogger);
$subscriber_define_filename = _get_import_filename($subscriber_define_filename,$data,'subscriber_define_filename');
$subscriber_define_import_numofthreads = _get_import_numofthreads($cpucount,$data,'subscriber_define_import_numofthreads');
$import_db_file = ((defined $run_id and length($run_id) > 0) ? '_' : '') . 'import';
$subscribernumer_exclude_pattern = $data->{subscribernumer_exclude_pattern} if exists $data->{subscribernumer_exclude_pattern};
(my $regexp_result,$subscribernumer_exclude_pattern) = parse_regexp($subscribernumer_exclude_pattern,$configfile);
$result &= $regexp_result;
$subscribernumer_exclude_exception_pattern = $data->{subscribernumer_exclude_exception_pattern} if exists $data->{subscribernumer_exclude_exception_pattern};
(my $regexp_result,$subscribernumer_exclude_exception_pattern) = parse_regexp($subscribernumer_exclude_exception_pattern,$configfile);
$result &= $regexp_result;
$dry = $data->{dry} if exists $data->{dry};
$lnp_define_filename = _get_import_filename($lnp_define_filename,$data,'lnp_define_filename');
$lnp_define_import_numofthreads= _get_import_numofthreads($cpucount,$data,'lnp_define_import_numofthreads');
return 1;
return $result;
}
return 0;
}
sub _prepare_working_paths {
my ($create) = @_;
my $result = 1;
my $path_result;
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$rollback_path) = create_path($working_path . 'rollback',$rollback_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
return $result;
}
sub _get_import_numofthreads {
my ($default_value,$data,$key) = @_;
my $import_numofthreads = $default_value;
$import_numofthreads = $data->{$key} if exists $data->{$key};
$import_numofthreads = $cpucount if $import_numofthreads > $cpucount;
return $import_numofthreads;
}
sub _get_import_db_file {
my ($run,$name) = @_;
return ((defined $run and length($run) > 0) ? '_' : '') . $name;
}
sub _get_import_filename {
my ($old_value,$data,$key) = @_;
my $import_filename = $old_value;
$import_filename = $data->{$key} if exists $data->{$key};
if (defined $import_filename and length($import_filename) > 0) {
$import_filename = $input_path . $import_filename unless -e $import_filename;
}
return $import_filename;
}
sub check_dry {
if ($dry) {

@ -31,7 +31,7 @@ emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
successemailrecipient =
doneemailrecipient =
##logging:
fileloglevel = OFF

@ -11,27 +11,34 @@ use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$defaultsettings
$defaultconfig
update_settings
check_dry
$output_path
$rollback_path
$defaultsettings
$defaultconfig
$dry
$force
$run_id
$features_define_filename
$subscriber_define_filename
$dry
$force
$lnp_define_filename
);
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
success
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
@ -39,14 +46,17 @@ use NGCP::BulkProcessor::LoadConfig qw(
$YAML_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
wrap_mailbody
$signature
$normalpriority
$lowpriority
$highpriority
);
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
use NGCP::BulkProcessor::ConnectorPool qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw();
@ -54,17 +64,30 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool qw
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Import qw(
import_features_define
import_subscriber_define
import_lnp_define
);
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp qw();
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $import_features_define_task_opt = 'import_features_define';
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $cleanup_all_task_opt = 'cleanup_all';
push(@TASK_OPTS,$cleanup_all_task_opt);
my $import_features_define_task_opt = 'import_features';
push(@TASK_OPTS,$import_features_define_task_opt);
my $import_subscriber_define_task_opt = 'import_subscriber_define';
my $import_subscriber_define_task_opt = 'import_subscriber';
push(@TASK_OPTS,$import_subscriber_define_task_opt);
my $import_lnp_define_task_opt = 'import_lnp';
push(@TASK_OPTS,$import_lnp_define_task_opt);
if (init()) {
main();
@ -78,13 +101,14 @@ sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
GetOptions ("config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"run=s" => \$run_id,
"dry" => \$dry,
"force" => \$force,
) or scripterror('error in command line arguments',getlogger(getscriptpath()));
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"run=s" => \$run_id,
"dry" => \$dry,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
@ -100,22 +124,27 @@ sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 0;
my $result = 1;
my $completion = 0;
if ('ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
foreach my $task (@$tasks) {
if (lc($import_features_define_task_opt) eq lc($task)) {
scriptinfo('task: ' . $import_features_define_task_opt,getlogger(getscriptpath()));
$result |= import_features_define_task(\@messages);
if (lc($cleanup_task_opt) eq lc($task)) {
$result = cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result);
} elsif (lc($cleanup_all_task_opt) eq lc($task)) {
$result = cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
} elsif (lc($import_features_define_task_opt) eq lc($task)) {
$result = import_features_define_task(\@messages) if taskinfo($import_features_define_task_opt,$result);
} elsif (lc($import_subscriber_define_task_opt) eq lc($task)) {
scriptinfo('task: ' . $import_subscriber_define_task_opt,getlogger(getscriptpath()));
$result |= import_subscriber_define_task(\@messages);
$result = import_subscriber_define_task(\@messages) if taskinfo($import_subscriber_define_task_opt,$result);
} elsif (lc($import_lnp_define_task_opt) eq lc($task)) {
$result = import_lnp_define_task(\@messages) if taskinfo($import_lnp_define_task_opt,$result);
} elsif (lc('blah') eq lc($task)) {
scriptinfo('task: ' . 'balh',getlogger(getscriptpath()));
next unless check_dry();
$result |= import_features_define_task(\@messages);
$completion |= 1;
if (taskinfo($cleanup_task_opt,$result)) {
next unless check_dry();
$result = import_features_define_task(\@messages);
$completion |= 1;
}
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
@ -128,60 +157,120 @@ sub main() {
}
push(@attachmentfiles,$attachmentlogfile);
if ($result) {
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
success(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
success(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub cleanup_task {
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub cleanup_task {
my ($messages,$clean_generated) = @_;
my $result = 0;
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
eval {
cleanupcvsdirs() if $clean_generated;
cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
}
if ($@ or !$result) {
push(@$messages,'working directory cleanup incomplete');
return 0;
} else {
push(@$messages,'working directory folders cleaned up');
return 1;
}
}
sub import_features_define_task {
my ($messages) = shift;
if (import_features_define(
$features_define_filename
)) {
push(@$messages,'sucessfully inserted x records...');
return 1;
my ($messages) = @_;
my $result = 0;
eval {
$result = import_features_define($features_define_filename);
};
my $err = $@;
my $stats = ' feature option: ' .
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOption::countby_subscribernumber() . ' rows';
$stats .= "\n feature set option items: " .
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::FeatureOptionSetItem::countby_subscribernumber_option() . ' rows';
if ($err or !$result) {
push(@$messages,"importing subscriber features incomplete\n$stats");
} else {
push(@$messages,'was not executed');
return 0;
push(@$messages,"importing subscriber features completed\n$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
sub import_subscriber_define_task {
my ($messages) = shift;
if (import_subscriber_define(
$subscriber_define_filename
)) {
push(@$messages,'sucessfully inserted x records...');
return 1;
my ($messages) = @_;
my $result = 0;
eval {
$result = import_subscriber_define($subscriber_define_filename);
};
my $err = $@;
my $stats = ' subscriber: ' .
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Subscriber::countby_subscribernumber() . ' rows';
if ($err or !$result) {
push(@$messages,"importing subscribers incomplete\n$stats");
} else {
push(@$messages,'was not executed');
return 0;
push(@$messages,"importing subscribers completed\n$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
END {
# this should not be required explicitly, but prevents Log4Perl's
# "rootlogger not initialized error upon exit..
sub import_lnp_define_task {
my ($messages) = @_;
my $result = 0;
eval {
$result = import_lnp_define($lnp_define_filename);
};
my $err = $@;
my $stats = ' lnp numbers: ' .
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::countby_lrncode_portednumber() . ' rows';
$stats .= "\n lrn codes: " .
NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::Lnp::count_lrncodes();
if ($err or !$result) {
push(@$messages,"importing lnp numbers incomplete\n$stats");
} else {
push(@$messages,"importing lnp numbers\n$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
sub destroy_dbs() {
NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool::destroy_dbs();
NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..
# NGCP::BulkProcessor::Projects::Migration::IPGallery::ProjectConnectorPool::destroy_dbs();
# NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
#}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -1,10 +1,14 @@
features_define_filename = /home/rkrenn/test/Features_Define.cfg
#dry=0
import_multithreading = 1
features_define_filename = /home/rkrenn/test/Features_Define.cfg
features_define_import_numofthreads = 2
subscriber_define_filename = /home/rkrenn/test/Subscriber_Define.cfg
subscriber_define_import_numofthreads = 2
subscribernumer_exclude_pattern = ^3562770
subscribernumer_exclude_exception_pattern = ^35627702770$
#dry=0
lnp_define_filename = /home/rkrenn/test/LNP_Define.cfg
lnp_define_import_numofthreads = 2

@ -177,6 +177,14 @@ sub paginate_sort_query {
}
sub insert_ignore_phrase {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _force_numeric_column {
my $self = shift;
my $column = shift;
@ -440,6 +448,22 @@ sub _fetch_error {
}
#sub db_autocommit {
#
# my $self = shift;
# if (defined $self->{dbh}) {
# if (@_) {
# my ($autocommit) = @_;
# $autocommit = ($autocommit ? 1 : 0);
# dbdebug($self,'set AutoCommit ' . $self->{dbh}->{AutoCommit} . ' -> ' . $autocommit,getlogger(__PACKAGE__));
# $self->{dbh}->{AutoCommit} = $autocommit;
# }
# return $self->{dbh}->{AutoCommit};
# }
# return undef;
#
#}
# This method executes a SQL query that doesn't return any data. The
# query may contain placeholders, that will be replaced by the elements
# in @params during execute(). The method will die if any error occurs
@ -702,7 +726,8 @@ sub lock_tables {
my $self = shift;
my $tablestolock = shift;
$self->db_begin();
#$self->db_begin();
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
@ -710,7 +735,8 @@ sub unlock_tables {
my $self = shift;
$self->db_commit();
#$self->db_commit();
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
@ -718,18 +744,19 @@ sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
my $lock = shift;
#my $tablename = shift;
my $transactional = shift;
#notimplementederror('db_do_begin',getlogger(__PACKAGE__));
if (defined $self->{dbh} and !defined $self->{sth} and length($tablename) > 0) {
if (defined $self->{dbh} and !defined $self->{sth}) { # and length($tablename) > 0) {
if ($lock) {
$self->lock_tables({ $tablename => 'WRITE' });
dbdebug($self,'db_do_begin: ' . $query,getlogger(__PACKAGE__));
if ($transactional) {
#$self->lock_tables({ $tablename => 'WRITE' });
$self->db_begin();
}
dbdebug($self,'db_do_begin: ' . $query,getlogger(__PACKAGE__));
$self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
$self->{query} = $query;
$self->{params} = [];
@ -765,17 +792,18 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
my $lock = shift;
#my $tablename = shift;
my $transactional = shift;
if (defined $self->{dbh} and !defined $self->{sth} and length($tablename) > 0) {
if (defined $self->{dbh} and !defined $self->{sth}) { # and length($tablename) > 0) {
dbdebug($self,'db_get_begin: ' . $query . "\nparameters:\n" . join(', ', @_),getlogger(__PACKAGE__));
#eval { $self->lock_tables({ $tablename => 'WRITE' }); };
if ($lock) {
$self->lock_tables({ $tablename => 'WRITE' });
if ($transactional) {
#$self->lock_tables({ $tablename => 'WRITE' });
$self->db_begin();
}
dbdebug($self,'db_get_begin: ' . $query . "\nparameters:\n" . join(', ', @_),getlogger(__PACKAGE__));
$self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
$self->{sth}->execute(@_) or $self->_execute_error($query,$self->{sth},@_);
$self->{query} = $query;
@ -844,7 +872,7 @@ sub db_get_rowblock {
sub db_finish {
my $self = shift;
my $unlock = shift;
my $transactional = shift;
# since this is also called from DESTROY, no die() here!
@ -855,8 +883,9 @@ sub db_finish {
$self->{sth}->finish();
$self->{sth} = undef;
if ($unlock) {
$self->unlock_tables();
if ($transactional) {
#$self->unlock_tables();
$self->db_commit();
}
$self->{query} = undef;

@ -50,7 +50,8 @@ use HTML::PullParser qw();
use HTML::Entities qw(decode_entities);
use IO::Uncompress::Unzip qw(unzip $UnzipError);
use DateTime::Format::Excel;
# no debian package yet:
#use DateTime::Format::Excel;
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlConnector);
@ -60,10 +61,10 @@ our @EXPORT_OK = qw(
xlsxbin2csv
sanitize_column_name
sanitize_spreadsheet_name
excel_to_timestamp
excel_to_date
get_tableidentifier
$csvextension);
#excel_to_timestamp
#excel_to_date
our $csvextension = '.csv';
@ -80,8 +81,9 @@ my $LongTruncOk = 0;
#my $logger = getlogger(__PACKAGE__);
my $lock_do_chunk = 0;
my $lock_get_chunk = 0;
#my $lock_do_chunk = 0;
#my $lock_get_chunk = 0;
my $rowblock_transactional = 0;
my $invalid_excel_spreadsheet_chars_pattern = '[' . quotemeta('[]:*?/\\') . ']';
@ -97,23 +99,23 @@ sub sanitize_column_name {
return $column_name;
}
sub excel_to_date {
my $excel_date_value = shift;
if ($excel_date_value > 0) {
my $datetime = DateTime::Format::Excel->parse_datetime($excel_date_value);
return $datetime->ymd('-'); # prints 1992-02-28
}
return undef;
}
sub excel_to_timestamp {
my $excel_datetime_value = shift;
if ($excel_datetime_value > 0) {
my $datetime = DateTime::Format::Excel->parse_datetime($excel_datetime_value);
return $datetime->ymd('-') . ' ' . $datetime->hms(':');
}
return undef;
}
#sub excel_to_date {
# my $excel_date_value = shift;
# if ($excel_date_value > 0) {
# my $datetime = DateTime::Format::Excel->parse_datetime($excel_date_value);
# return $datetime->ymd('-'); # prints 1992-02-28
# }
# return undef;
#}
#sub excel_to_timestamp {
# my $excel_datetime_value = shift;
# if ($excel_datetime_value > 0) {
# my $datetime = DateTime::Format::Excel->parse_datetime($excel_datetime_value);
# return $datetime->ymd('-') . ' ' . $datetime->hms(':');
# }
# return undef;
#}
sub new {
@ -363,13 +365,14 @@ sub cleanupcvsdirs {
# filewarn('cannot remove ' . $dirpath . ': ' . $!,getlogger(__PACKAGE__));
#}
remove_tree($dirpath, {
keep_root => 0,
error => \my $err });
'keep_root' => 0,
'verbose' => 1,
'error' => \my $err });
if (@$err) {
for my $diag (@$err) {
my ($file, $message) = %$diag;
if ($file eq '') {
filewarn("general error: $message",getlogger(__PACKAGE__));
filewarn("cleanup: $message",getlogger(__PACKAGE__));
} else {
filewarn("problem unlinking $file: $message",getlogger(__PACKAGE__));
}
@ -557,9 +560,9 @@ sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_);
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
}
@ -567,9 +570,9 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_);
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
}
@ -577,7 +580,7 @@ sub db_finish {
my $self = shift;
$self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk);
$self->SUPER::db_finish($rowblock_transactional);
}

@ -50,8 +50,10 @@ my $LongTruncOk = 0;
#my $logger = getlogger(__PACKAGE__);
my $lock_do_chunk = 1;
my $lock_get_chunk = 0;
#my $lock_do_chunk = 0; #1;
#my $lock_get_chunk = 0;
my $rowblock_transactional = 1;
my $serialization_level = ''; #'SERIALIZABLE'
@ -430,6 +432,14 @@ sub multithreading_supported {
}
sub insert_ignore_phrase {
my $self = shift;
return 'IGNORE';
}
sub truncate_table {
my $self = shift;
@ -465,41 +475,41 @@ sub drop_table {
}
sub lock_tables {
my $self = shift;
my $tablestolock = shift;
if (defined $self->{dbh} and defined $tablestolock and ref $tablestolock eq 'HASH') {
my $locks = join(', ',map { local $_ = $_; $_ = $self->tableidentifier($_) . ' ' . $tablestolock->{$_}; $_; } keys %$tablestolock);
dbdebug($self,"lock_tables:\n" . $locks,getlogger(__PACKAGE__));
$self->db_do('LOCK TABLES ' . $locks);
}
}
sub unlock_tables {
my $self = shift;
if (defined $self->{dbh}) {
dbdebug($self,'unlock_tables',getlogger(__PACKAGE__));
$self->db_do('UNLOCK TABLES');
}
}
# too dangerous:
#sub lock_tables {
#
# my $self = shift;
# my $tablestolock = shift;
#
# if (defined $self->{dbh} and defined $tablestolock and ref $tablestolock eq 'HASH') {
#
# my $locks = join(', ',map { local $_ = $_; $_ = $self->tableidentifier($_) . ' ' . $tablestolock->{$_}; $_; } keys %$tablestolock);
# dbdebug($self,"lock_tables:\n" . $locks,getlogger(__PACKAGE__));
# $self->db_do('LOCK TABLES ' . $locks);
#
# }
#
#}
#
#sub unlock_tables {
#
# my $self = shift;
# if (defined $self->{dbh}) {
#
# dbdebug($self,'unlock_tables',getlogger(__PACKAGE__));
# $self->db_do('UNLOCK TABLES');
#
# }
#
#}
sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_);
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
}
@ -507,10 +517,10 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
#my $lock = shift;
$self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_);
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
}
@ -519,7 +529,7 @@ sub db_finish {
my $self = shift;
#my $unlock = shift;
$self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk);
$self->SUPER::db_finish($rowblock_transactional);
}

@ -48,8 +48,10 @@ my $LongTruncOk = 0;
#my $logger = getlogger(__PACKAGE__);
my $lock_do_chunk = 0;
my $lock_get_chunk = 0;
#my $lock_do_chunk = 0;
#my $lock_get_chunk = 0;
my $rowblock_transactional = 1;
my $isolation_level = ''; #'SERIALIZABLE'
@ -504,9 +506,9 @@ sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$tablename,0,@_);
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
}
@ -514,9 +516,9 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_get_begin($query,$tablename,0,@_);
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
}
@ -525,7 +527,7 @@ sub db_finish {
my $self = shift;
#my $unlock = shift;
$self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk);
$self->SUPER::db_finish($rowblock_transactional);
}

@ -50,8 +50,10 @@ my $client_encoding = 'LATIN1';
#my $logger = getlogger(__PACKAGE__);
my $lock_do_chunk = 0;
my $lock_get_chunk = 0;
#my $lock_do_chunk = 0;
#my $lock_get_chunk = 0;
my $rowblock_transactional = 1;
#my $to_number_pattern = '9.9999999999999'; #EEEE';
@ -507,9 +509,9 @@ sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_);
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
}
@ -517,10 +519,10 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
#my $lock = shift;
$self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_);
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
}
@ -529,7 +531,7 @@ sub db_finish {
my $self = shift;
#my $unlock = shift;
$self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk);
$self->SUPER::db_finish($rowblock_transactional);
}

@ -55,8 +55,10 @@ my $LongTruncOk = 0;
#my $logger = getlogger(__PACKAGE__);
my $lock_do_chunk = 0;
my $lock_get_chunk = 0;
#my $lock_do_chunk = 0;
#my $lock_get_chunk = 0;
my $rowblock_transactional = 1;
my $transaction_isolation_level = ''; #'SERIALIZABLE'
@ -498,9 +500,9 @@ sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_);
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
}
@ -508,10 +510,10 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
#my $lock = shift;
$self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_);
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
}
@ -520,7 +522,7 @@ sub db_finish {
my $self = shift;
#my $unlock = shift;
$self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk);
$self->SUPER::db_finish($rowblock_transactional);
}

@ -76,8 +76,10 @@ my $LongTruncOk = 0;
#my $logger = getlogger(__PACKAGE__);
my $lock_do_chunk = 1;
my $lock_get_chunk = 1;
#my $lock_do_chunk = 0; #1;
#my $lock_get_chunk = 0; #1;
my $rowblock_transactional = 1;
#SQLite transactions are always serializable.
@ -490,6 +492,14 @@ sub multithreading_supported {
}
sub insert_ignore_phrase {
my $self = shift;
return 'OR IGNORE';
}
sub truncate_table {
my $self = shift;
@ -583,9 +593,9 @@ sub db_do_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$tablename,$lock_do_chunk,@_);
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
}
@ -593,10 +603,10 @@ sub db_get_begin {
my $self = shift;
my $query = shift;
my $tablename = shift;
#my $tablename = shift;
#my $lock = shift;
$self->SUPER::db_get_begin($query,$tablename,$lock_get_chunk,@_);
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
}
@ -605,7 +615,7 @@ sub db_finish {
my $self = shift;
#my $unlock = shift;
$self->SUPER::db_finish($lock_do_chunk | $lock_get_chunk);
$self->SUPER::db_finish($rowblock_transactional);
}

@ -252,7 +252,7 @@ sub checktableinfo {
my ($get_db,$tablename,$expected_fieldnames,$target_indexes) = @_;
my $success = 1;
my $result = 1;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
@ -295,7 +295,7 @@ sub checktableinfo {
# otherwise we log a failure (exit? - see Logging Module)
#$table_fieldnames_cached->{$connectidentifier}->{$tablename} = {}; #$fieldnames;
fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename},$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$tablename},getlogger(__PACKAGE__));
$success = 0;
$result = 0;
}
}
@ -328,7 +328,7 @@ sub checktableinfo {
#$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = shared_clone($target_indexes);
$table_target_indexes->{$tid}->{$connectidentifier}->{$tablename} = $target_indexes;
return $success;
return $result;
}
@ -719,12 +719,12 @@ sub transfer_records {
sub insert_stmt {
my ($get_db,$tablename) = @_;
my ($get_db,$tablename,$insert_ignore) = @_;
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$tablename};
return 'INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
return 'INSERT ' . ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '') . 'INTO ' . $db->tableidentifier($tablename) . ' (' .
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) .
') VALUES (' . substr(',?' x scalar @$expected_fieldnames,1) . ')';
@ -869,7 +869,7 @@ sub transfer_table {
#$target_db = &$get_target_db($writer_connection_name);
eval {
$db->db_get_begin($selectstatement,$tablename,@values);
$db->db_get_begin($selectstatement,@values); #$tablename
my $i = 0;
while (1) {
@ -878,7 +878,7 @@ sub transfer_table {
my $realblocksize = scalar @$rowblock;
if ($realblocksize > 0) {
writing_rows($target_db,$targettablename,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
$target_db->db_do_begin($insertstatement,$targettablename);
$target_db->db_do_begin($insertstatement); #,$targettablename);
$target_db->db_do_rowblock($rowblock);
$target_db->db_finish();
$i += $realblocksize;
@ -1137,14 +1137,14 @@ sub process_table {
#$db->db_disconnect();
#undef $db;
#$db = &$get_db($reader_connection_name);
my $context = {};
my $context = { tid => $tid };
my $rowblock_result = 1;
eval {
if ('CODE' eq ref $init_process_context_code) {
&$init_process_context_code($context);
}
$db->db_get_begin($selectstatement,$tablename,@values);
$db->db_get_begin($selectstatement,@values); #$tablename
my $i = 0;
while (1) {
@ -1156,7 +1156,7 @@ sub process_table {
$rowblock_result = &$process_code($context,$rowblock,$i);
#$target_db->db_do_begin($insertstatement,$targettablename);
#$target_db->db_do_begin($insertstatement); #,$targettablename);
#$target_db->db_do_rowblock($rowblock);
#$target_db->db_finish();
$i += $realblocksize;
@ -1170,9 +1170,6 @@ sub process_table {
}
$db->db_finish();
if ('CODE' eq ref $uninit_process_context_code) {
&$uninit_process_context_code($context);
}
};
if ($@) {
@ -1181,6 +1178,11 @@ sub process_table {
$errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED;
}
eval {
if ('CODE' eq ref $uninit_process_context_code) {
&$uninit_process_context_code($context);
}
};
$db->db_disconnect();
#undef $db;
@ -1296,6 +1298,7 @@ sub _reader {
my $reader_db;
my $tid = threadid();
$context->{tid} = $tid;
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
@ -1306,7 +1309,7 @@ sub _reader {
my $blockcount = 0;
eval {
$reader_db = &{$context->{get_db}}(); #$reader_connection_name);
$reader_db->db_get_begin($context->{selectstatement},$context->{tablename},@{$context->{values_ref}});
$reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}); #$context->{tablename}
tablethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__));
while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up
#yield();
@ -1376,6 +1379,7 @@ sub _writer {
#get_target_db
my $writer_db;
my $tid = threadid();
$context->{tid} = $tid;
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
@ -1391,7 +1395,7 @@ sub _writer {
if ($packet->{size} > 0) {
writing_rows($writer_db,$context->{targettablename},$packet->{row_offset},$packet->{size},$context->{rowcount},getlogger(__PACKAGE__));
$writer_db->db_do_begin($context->{insertstatement},$context->{targettablename});
$writer_db->db_do_begin($context->{insertstatement}); #,$context->{targettablename});
$writer_db->db_do_rowblock($packet->{rows});
$writer_db->db_finish();
$blockcount++;
@ -1427,6 +1431,7 @@ sub _process {
#my $writer_db;
my $rowblock_result = 1;
my $tid = threadid();
$context->{tid} = $tid;
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
@ -1447,7 +1452,7 @@ sub _process {
#writing_rows($writer_db,$context->{targettablename},$i,$realblocksize,$context->{rowcount},getlogger(__PACKAGE__));
#$writer_db->db_do_begin($context->{insertstatement},$context->{targettablename});
#$writer_db->db_do_begin($context->{insertstatement}); #,$context->{targettablename});
#$writer_db->db_do_rowblock($rowblock);
#$writer_db->db_finish();
@ -1475,16 +1480,17 @@ sub _process {
sleep($thread_sleep_secs); #2015-01
}
}
};
my $err = $@;
tablethreadingdebug($err ? '[' . $tid . '] processor thread error: ' . $err : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
eval {
if ('CODE' eq ref $context->{uninit_process_context_code}) {
&{$context->{uninit_process_context_code}}($context);
}
};
#if (defined $writer_db) {
# $writer_db->db_disconnect();
#}
tablethreadingdebug($@ ? '[' . $tid . '] processor thread error: ' . $@ : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
lock $context->{errorstates};
if ($@) {
if ($err) {
$context->{errorstates}->{$tid} = $ERROR;
} else {
$context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED;

@ -543,27 +543,29 @@ sub kbytes2gigs {
sub cleanupdir {
my ($dirpath,$keeproot,$scriptinfocode,$filewarncode,$logger) = @_;
my ($dirpath,$keeproot,$filewarncode,$logger) = @_;
if (-d $dirpath) {
remove_tree($dirpath, {
keep_root => $keeproot,
error => \my $err });
'keep_root' => $keeproot,
'verbose' => 1,
'error' => \my $err });
if (@$err) {
if (defined $filewarncode and ref $filewarncode eq 'CODE') {
for my $diag (@$err) {
my ($file, $message) = %$diag;
if ($file eq '') {
&$filewarncode("general error: $message",$logger);
&$filewarncode("cleanup: $message",$logger);
} else {
&$filewarncode("problem unlinking $file: $message",$logger);
}
}
}
} else {
if (defined $scriptinfocode and ref $scriptinfocode eq 'CODE') {
&$scriptinfocode($dirpath . ' removed',$logger);
}
}
#else {
# if (!$keeproot and defined $scriptinfocode and ref $scriptinfocode eq 'CODE') {
# &$scriptinfocode($dirpath . ' removed',$logger);
# }
#}
#if ($restoredir) {
# makedir($dirpath);
#}
@ -583,13 +585,14 @@ sub makepath {
#changemod($dirpath);
make_path($dirpath,{
'chmod' => $chmod_umask,
'verbose' => 1,
'error' => \my $err });
if (@$err) {
if (defined $fileerrorcode and ref $fileerrorcode eq 'CODE') {
for my $diag (@$err) {
my ($file, $message) = %$diag;
if ($file eq '') {
&$fileerrorcode("general error: $message",$logger);
&$fileerrorcode("creating path: $message",$logger);
} else {
&$fileerrorcode("problem creating $file: $message",$logger);
}

@ -31,7 +31,7 @@ emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
successemailrecipient =
doneemailrecipient =
##logging:
fileloglevel = OFF

Loading…
Cancel
Save