MT#18663 MT#20893 wip16: subscriebr update, line endings, batch processing fixes

+tolerant line endings for the username+password import
 +same for batch files
+only process batch records with updated or added delta
 +convenience: no need to clear the batch import before
  loading a new batch file.
 +if records in a batch file overlap with ones from previous
  batch files, they are "updated" and hece processed again.
+subscriber password update instead of recreating subs
 (aka know as reprovisioning).
 +if there's a peer_auth_pass present, its also updated.

Change-Id: I22ea6485dd2d48e3633ebb262037c8d3fafc923c
changes/31/7631/4
Rene Krenn 10 years ago
parent 018a04fa18
commit ccbf657ac5

@ -15,6 +15,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
update_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
@ -25,6 +26,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
insert_row
update_row
findby_uuid
);
@ -87,6 +89,15 @@ sub findby_uuid {
}
sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
sub insert_row {
my $db = &$get_db();

@ -126,16 +126,18 @@ sub set_call_forwards_batch {
my $rownum = $row_offset;
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_call_forward_context($context,$imported_subscriber,$rownum);
_set_call_forward($context);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
if ($record->{delta} ne $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta) {
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_call_forward_context($context,$imported_subscriber,$rownum);
_set_call_forward($context);
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
}

@ -17,7 +17,8 @@ require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor);
our @EXPORT_OK = qw();
my $lineseparator = '\\n';
#my $lineseparator = '\\n';
my $lineseparator = '\\n\\r|\\r|\\n';
my $fieldseparator = " +";
my $encoding = 'UTF-8';

@ -17,7 +17,8 @@ require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor);
our @EXPORT_OK = qw();
my $lineseparator = '\\n'; #'\\r\\n';
#my $lineseparator = '\\n'; #'\\r\\n';
my $lineseparator = '\\n\\r|\\r|\\n';
my $fieldseparator = ','; #" +";
my $encoding = 'UTF-8';

@ -152,15 +152,17 @@ sub set_barring_profiles_batch {
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_barring_profile_context($context,$imported_subscriber,$rownum);
_set_barring_profile($context);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
if ($record->{delta} ne $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta) {
if (defined $imported_subscriber) {
next unless _reset_set_barring_profile_context($context,$imported_subscriber,$rownum);
_set_barring_profile($context);
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
}
@ -513,16 +515,18 @@ sub set_peer_auth_batch {
my $rownum = $row_offset;
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_peer_auth_context($context,$imported_subscriber,$rownum);
_set_peer_auth($context);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
if ($record->{delta} ne $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta) {
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_peer_auth_context($context,$imported_subscriber,$rownum);
_set_peer_auth($context);
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
}
@ -777,16 +781,18 @@ sub set_allowed_ips_batch {
my $rownum = $row_offset;
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_allowed_ips_context($context,$imported_subscriber,$rownum);
_set_allowed_ips($context);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
if ($record->{delta} ne $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta) {
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_allowed_ips_context($context,$imported_subscriber,$rownum);
_set_allowed_ips($context);
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
}
@ -950,16 +956,18 @@ sub set_preference_bulk_batch {
my $rownum = $row_offset;
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_preference_bulk_context($context,$imported_subscriber,$rownum);
_set_preference_bulk($context);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
if ($record->{delta} ne $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta) {
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _reset_set_preference_bulk_context($context,$imported_subscriber,$rownum);
_set_preference_bulk($context);
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
}

@ -22,6 +22,7 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$provision_subscriber_multithreading
$provision_subscriber_numofthreads
$reprovision_upon_password_change
);
use NGCP::BulkProcessor::Logging qw (
@ -90,6 +91,7 @@ sub provision_subscribers {
destroy_all_dbs();
my $warning_count :shared = 0;
my $updated_password_count :shared = 0;
return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records(
static_context => $static_context,
process_code => sub {
@ -108,6 +110,7 @@ sub provision_subscribers {
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
$context->{updated_password_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
@ -118,12 +121,13 @@ sub provision_subscribers {
{
lock $warning_count;
$warning_count += $context->{warning_count};
$updated_password_count += $context->{updated_password_count};
}
},
load_recursive => 0,
multithreading => $provision_subscriber_multithreading,
numofthreads => $provision_subscriber_numofthreads,
),$warning_count);
),$warning_count,$updated_password_count);
}
sub provision_subscribers_batch {
@ -133,6 +137,7 @@ sub provision_subscribers_batch {
destroy_all_dbs();
my $warning_count :shared = 0;
my $updated_password_count :shared = 0;
return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::process_records(
static_context => $static_context,
process_code => sub {
@ -140,15 +145,17 @@ sub provision_subscribers_batch {
my $rownum = $row_offset;
foreach my $record (@$records) {
$rownum++;
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _provision_susbcriber($context,$imported_subscriber,$rownum);
} else {
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
if ($record->{delta} ne $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta) {
my $imported_subscriber = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::findby_subscribernumber($record->{number});
if (defined $imported_subscriber) {
next unless _provision_susbcriber($context,$imported_subscriber,$rownum);
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
if ($skip_errors) {
_warn($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
next;
} else {
_error($context,'record ' . $rownum . ' - no subscriber record for batch number found: ' . $record->{number});
}
}
}
}
@ -161,6 +168,7 @@ sub provision_subscribers_batch {
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
$context->{updated_password_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
@ -171,12 +179,13 @@ sub provision_subscribers_batch {
{
lock $warning_count;
$warning_count += $context->{warning_count};
$updated_password_count += $context->{updated_password_count};
}
},
load_recursive => 0,
multithreading => $provision_subscriber_multithreading,
numofthreads => $provision_subscriber_numofthreads,
),$warning_count);
),$warning_count,$updated_password_count);
}
@ -243,30 +252,34 @@ sub _provision_susbcriber {
_info($context,"($context->{rownum}) " . 'existing billing subscriber with username ' . $context->{username} . ' and updated password found (re-provisioned)');
if (_terminate_subscriber($context,$existing_billing_voip_subscriber->{id})) {
if (_terminate_contract($context,$existing_billing_voip_subscriber->{contract_id})) {
if ($dry) {
_create_contact($context);
_create_contract($context);
eval {
_create_subscriber($context);
};
if ($@) {
_info($context,"($context->{rownum}) " . 'expected error ' . $@ . ' while re-provisioning subscriber ' . $context->{cli} . ' in dry-mode',1);
} else {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'expected error while re-provisioning subscriber ' . $context->{cli} . ' in dry-mode missing');
if ($reprovision_upon_password_change) {
if (_terminate_subscriber($context,$existing_billing_voip_subscriber->{id})) {
if (_terminate_contract($context,$existing_billing_voip_subscriber->{contract_id})) {
if ($dry) {
_create_contact($context);
_create_contract($context);
eval {
_create_subscriber($context);
};
if ($@) {
_info($context,"($context->{rownum}) " . 'expected error ' . $@ . ' while re-provisioning subscriber ' . $context->{cli} . ' in dry-mode',1);
} else {
_error($context,"($context->{rownum}) " . 'expected error while re-provisioning subscriber ' . $context->{cli} . ' in dry-mode missing');
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'expected error while re-provisioning subscriber ' . $context->{cli} . ' in dry-mode missing');
} else {
_error($context,"($context->{rownum}) " . 'expected error while re-provisioning subscriber ' . $context->{cli} . ' in dry-mode missing');
}
}
} else {
_create_contact($context);
_create_contract($context);
_create_subscriber($context);
_info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' successfully re-provisioned');
}
} else {
_create_contact($context);
_create_contract($context);
_create_subscriber($context);
_info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' successfully re-provisioned');
}
}
} else {
_update_passwords($context,$existing_billing_voip_subscriber->{uuid});
}
} else {
_info($context,"($context->{rownum}) " . 'existing billing subscriber with username ' . $context->{username} . ' and unchanged password found, skipping',1);
@ -422,6 +435,15 @@ sub _provision_subscribers_checks {
$result = 0; #even in skip-error mode..
}
eval {
$context->{peer_auth_pass_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS);
};
if ($@ or not defined $context->{peer_auth_pass_attribute}) {
rowprocessingerror(threadid(),'cannot find peer_auth_pass attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
return $result;
}
@ -464,6 +486,44 @@ sub _create_contract {
}
sub _update_passwords {
my ($context,$uuid) = @_;
my $provisioning_voip_subscriber = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(
$context->{db},$uuid);
if (defined $provisioning_voip_subscriber) {
if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::update_row($context->{db},{
id => $provisioning_voip_subscriber->{id},
password => $context->{password},
})) {
_info($context,"($context->{rownum}) " . 'subscriber password updated: ' . $context->{cli});
my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db},
$provisioning_voip_subscriber->{id},$context->{peer_auth_pass_attribute}->{id});
if ((scalar @$old_preferences) > 0) {
if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::update_row($context->{db},{
id => $old_preferences->[0]->{id},
value => $context->{password},
})) {
$context->{updated_password_count} = $context->{updated_password_count} + 1;
_info($context,"($context->{rownum}) " . 'peer_auth_pass password found and updated: ' . $context->{cli});
}
} else {
_info($context,"($context->{rownum}) " . 'no peer_auth_pass password to update: ' . $context->{cli});
}
}
} else {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli});
} else {
_error($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli});
}
}
}
sub _create_subscriber {
my ($context) = @_;

@ -89,6 +89,7 @@ our @EXPORT_OK = qw(
$provision_subscriber_multithreading
$provision_subscriber_numofthreads
$reprovision_upon_password_change
$set_barring_profiles_multithreading
$set_barring_profiles_numofthreads
@ -180,6 +181,7 @@ our $subsciber_username_prefix = undef;
our $provision_subscriber_multithreading = $enablemultithreading;
our $provision_subscriber_numofthreads = $cpucount;
our $reprovision_upon_password_change = 0;
our $set_barring_profiles_multithreading = $enablemultithreading;
our $set_barring_profiles_numofthreads = $cpucount;
@ -282,6 +284,7 @@ sub update_settings {
$provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading};
$provision_subscriber_numofthreads = _get_import_numofthreads($cpucount,$data,'provision_subscriber_numofthreads');
$reprovision_upon_password_change = $data->{reprovision_upon_password_change} if exists $data->{reprovision_upon_password_change};
$set_barring_profiles_multithreading = $data->{set_barring_profiles_multithreading} if exists $data->{set_barring_profiles_multithreading};
$set_barring_profiles_numofthreads = _get_import_numofthreads($cpucount,$data,'set_barring_profiles_numofthreads');

@ -33,6 +33,7 @@ use NGCP::BulkProcessor::Projects::Migration::IPGallery::Settings qw(
$barring_profiles
$allowed_ips
$concurrent_max_total
$reprovision_upon_password_change
);
use NGCP::BulkProcessor::Logging qw(
init_log
@ -782,16 +783,17 @@ sub import_truncate_batch_task {
sub provision_subscriber_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
my ($result,$warning_count,$updated_password_count) = (0,0,0);
eval {
if ($batch) {
($result,$warning_count) = provision_subscribers_batch();
($result,$warning_count,$updated_password_count) = provision_subscribers_batch();
} else {
($result,$warning_count) = provision_subscribers();
($result,$warning_count,$updated_password_count) = provision_subscribers();
}
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
my $updated_password_count = 0;
eval {
$stats .= "\n total contracts: " .
NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,$reseller_id) . ' rows';
@ -823,6 +825,9 @@ sub provision_subscriber_task {
push(@$messages,"provisioning subscribers INCOMPLETE$stats");
} else {
push(@$messages,"provisioning subscribers completed$stats");
if (not $dry and $reprovision_upon_password_change and $updated_password_count > 0) {
push(@$messages,"THERE WERE $updated_password_count UPDATED PASSWORDS. YOU MIGHT WANT TO RESTART SEMS NOW ...");
}
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;

@ -17,7 +17,9 @@ subscribernumer_exclude_exception_pattern = ^35627702770$
lnp_define_filename = /home/rkrenn/test/26-07-2016/LNP_Define.cfg
lnp_define_import_numofthreads = 2
user_password_filename = /home/rkrenn/test/26-07-2016/Usernames and passwords 26-07-2016.csv
#user_password_filename = /home/rkrenn/test/26-07-2016/Usernames and passwords 26-07-2016.csv
#user_password_filename = /home/rkrenn/Downloads/user_passwords_2016_07_29
user_password_filename = /home/rkrenn/Downloads/passwords.csv
user_password_import_numofthreads = 2
username_prefix = 356
min_password_length = 3
@ -34,6 +36,7 @@ webpassword_length = 8
provision_subscriber_multithreading = 1
#provision_subscriber_numofthreads = 6
subsciber_username_prefix = 356
reprovision_upon_password_change = 0
set_barring_profiles_multithreading = 1
#set_barring_profiles_numofthreads = 6

Loading…
Cancel
Save