TT#129852 export registration to .csv or sqlite

Change-Id: If7561752ece1c5daff9e479646592ae59e5539d4
(cherry picked from commit 51c91d368d)
mr7.5.4
Rene Krenn 4 years ago
parent fc332acb7a
commit dd3a9a28fa

@ -29,6 +29,7 @@ our @EXPORT_OK = qw(
insert_row
findby_domain
findby_id
);
my $tablename = 'voip_domains';
@ -72,6 +73,23 @@ sub findby_domain {
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub insert_row {
my $db = &$get_db();

@ -15,7 +15,7 @@ use NGCP::BulkProcessor::SqlProcessor qw(
copy_row
insert_stmt
transfer_table
);
#process_table
use NGCP::BulkProcessor::SqlRecord qw();
@ -35,6 +35,7 @@ our @EXPORT_OK = qw(
update_delta
findby_delta
countby_delta
copy_table
$deleted_delta
$updated_delta
@ -241,6 +242,26 @@ sub buildrecords_fromrows {
}
sub copy_table {
my ($get_target_db) = @_;
check_table();
#checktableinfo($get_target_db,
# __PACKAGE__,$tablename,
# get_fieldnames(1),
# $indexes);
return transfer_table(
get_db => $get_db,
class => __PACKAGE__,
get_target_db => $get_target_db,
targetclass => __PACKAGE__,
targettablename => $tablename,
);
}
#sub process_records {
#
# my %params = @_;

@ -41,6 +41,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
);
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw();
@ -49,10 +50,11 @@ use NGCP::BulkProcessor::Utils qw(threadid trim);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
load_registrations
load_registrations_file
load_registrations_all
);
sub load_registrations {
sub load_registrations_file {
my ($file) = @_;
$file //= $usernames_filename;
@ -131,6 +133,78 @@ sub load_registrations {
}
sub load_registrations_all {
my $result = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::create_table(0);
my $upsert = _locations_reset_delta();
destroy_all_dbs(); #close all db connections before forking..
destroy_stores();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::process_records(
process_code => sub {
my ($context,$records,$row_offset) = @_;
my @data = ();
my @location_rows = ();
foreach my $record (@$records) {
if (_load_registrations_all_init_context($context,$record)) {
foreach my $registration (@{$context->{registrations}}) {
my %r = %{$registration->getvalue}; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::fieldnames};
if ($context->{upsert}) {
push(@row_ext,$registration->getvalue()->{ruid});
} else {
push(@row_ext,$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::added_delta);
}
push(@location_rows,\@row_ext);
}
if ($location_single_row_txn and (scalar @location_rows) > 0) {
while (defined (my $location_row = shift @location_rows)) {
if ($skip_errors) {
eval { _insert_location_rows($context,[$location_row]); };
_warn($context,$@) if $@;
} else {
_insert_location_rows($context,[$location_row]);
}
}
}
}
}
if (not $location_single_row_txn and (scalar @location_rows) > 0) {
if ($skip_errors) {
eval { _insert_location_rows($context,\@location_rows); };
_warn($context,$@) if $@;
} else {
_insert_location_rows($context,\@location_rows);
}
}
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_sqlite_db();
#$context->{redis} = &get_location_store();
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
#undef $context->{redis};
destroy_all_dbs();
destroy_stores();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $load_registrations_multithreading,
numofthreads => $load_registrations_numofthreads,
),$warning_count);
}
sub _locations_reset_delta {
my $upsert = 0;
if (NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta() > 0) {
@ -163,14 +237,14 @@ sub _insert_location_rows {
}
}
sub _load_registrations_init_context() {
sub _load_registrations_file_init_context() {
my ($context,$username,$domain) = @_;
$context->{username} = $username;
$context->{domain} = $domain;
my @registrations = ();
my $result = 1;
$context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain($username,$domain,{ _entries => 1, });
$context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain($context->{username},$context->{domain},{ _entries => 1, });
if ($context->{usrdom}) {
foreach my $entry (@{$context->{usrdom}->{_entries}}) {
push(@registrations,$entry); # if expiry > now
@ -182,7 +256,24 @@ sub _load_registrations_init_context() {
}
sub _load_registrations_all_init_context() {
my ($context,$prov_subscriber) = @_;
$context->{username} = $prov_subscriber->{username};
$context->{domain} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_id($prov_subscriber->{domain_id})->{domain};
my @registrations = ();
my $result = 1;
$context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain(lc($context->{username}),$context->{domain},{ _entries => 1, });
if ($context->{usrdom}) {
foreach my $entry (@{$context->{usrdom}->{_entries}}) {
push(@registrations,$entry); # if expiry > now
}
}
$result = 0 unless scalar @registrations;
$context->{registrations} = \@registrations;
return $result;
}
sub _error {

@ -8,7 +8,7 @@ use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(
$csv_dir
$sqlite_db_file
);
@ -17,7 +17,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
);
#use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw();
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
@ -27,6 +27,10 @@ our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
get_sqlite_db
sqlite_db_tableidentifier
get_csv_db
csv_db_tableidentifier
destroy_dbs
destroy_all_dbs
@ -34,6 +38,7 @@ our @EXPORT_OK = qw(
);
my $sqlite_dbs = {};
my $csv_dbs = {};
sub get_sqlite_db {
@ -62,6 +67,31 @@ sub sqlite_db_tableidentifier {
}
sub get_csv_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (not defined $csv_dbs->{$name}) {
$csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name);
if (not defined $reconnect) {
$reconnect = 1;
}
}
if ($reconnect) {
$csv_dbs->{$name}->db_connect($csv_dir);
}
return $csv_dbs->{$name};
}
sub csv_db_tableidentifier {
my ($get_target_db,$tablename) = @_;
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir));
}
sub destroy_dbs {
foreach my $name (keys %$sqlite_dbs) {
@ -69,6 +99,12 @@ sub destroy_dbs {
undef $sqlite_dbs->{$name};
delete $sqlite_dbs->{$name};
}
foreach my $name (keys %$csv_dbs) {
cleartableinfo($csv_dbs->{$name});
undef $csv_dbs->{$name};
delete $csv_dbs->{$name};
}
}
@ -77,4 +113,8 @@ sub destroy_all_dbs() {
NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
}
sub ping_all_dbs() {
NGCP::BulkProcessor::ConnectorPool::ping_dbs();
}
1;

@ -45,6 +45,7 @@ our @EXPORT_OK = qw(
$output_path
$input_path
$csv_dir
$defaultsettings
$defaultconfig
@ -59,6 +60,8 @@ our @EXPORT_OK = qw(
$skip_errors
$force
get_export_filename
$registrations_export_filename_format
);
our $defaultconfig = 'config.cfg';
@ -66,6 +69,7 @@ our $defaultsettings = 'settings.cfg';
our $output_path = $working_path . 'output/';
our $input_path = $working_path . 'input/';
our $csv_dir = 'location';
our $usernames_filename = undef;
our $usernames_rownum_start = 0;
@ -79,6 +83,8 @@ our $force = 0;
our $sqlite_db_file = 'sqlite';
our $registrations_export_filename_format = undef;
sub update_settings {
my ($data,$configfile) = @_;
@ -92,6 +98,7 @@ sub update_settings {
$result &= _prepare_working_paths(1);
$csv_dir = $data->{csv_dir} if exists $data->{csv_dir};
$sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};
$load_registrations_multithreading = $data->{load_registrations_multithreading} if exists $data->{load_registrations_multithreading};
@ -104,6 +111,9 @@ sub update_settings {
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$force = $data->{force} if exists $data->{force};
$registrations_export_filename_format = $data->{registrations_export_filename} if exists $data->{registrations_export_filename};
get_export_filename($data->{registrations_export_filename},$configfile);
return $result;
}
@ -144,4 +154,26 @@ sub _get_import_filename {
return $import_filename;
}
sub get_export_filename {
my ($filename_format,$configfile) = @_;
my $export_filename;
my $export_format;
if ($filename_format) {
$export_filename = $output_path . sprintf($filename_format,timestampdigits(),threadid());
if (-e $export_filename and (unlink $export_filename) == 0) {
filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__));
$export_filename = undef;
}
my ($name,$path,$suffix) = fileparse($export_filename,".db",".csv");
if ($suffix eq '.db') {
$export_format = 'sqlite';
} elsif ($suffix eq '.csv') {
$export_format = 'csv';
} else {
configurationerror($configfile,"$filename_format: either .db or .csv export file format required");
}
}
return ($export_filename,$export_format);
}
1;

@ -21,6 +21,9 @@ use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(
$skip_errors
$force
get_export_filename
$registrations_export_filename_format
);
use NGCP::BulkProcessor::Logging qw(
@ -56,6 +59,8 @@ use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw();
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw(
get_sqlite_db
get_csv_db
destroy_all_dbs
);
use NGCP::BulkProcessor::ConnectorPool qw(
@ -63,7 +68,8 @@ use NGCP::BulkProcessor::ConnectorPool qw(
);
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Process qw(
load_registrations
load_registrations_file
load_registrations_all
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
@ -188,7 +194,11 @@ sub load_registrations_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = load_registrations($file);
if ($file) {
($result,$warning_count) = load_registrations_file($file);
} else {
($result,$warning_count) = load_registrations_all();
}
};
#print $@;
my $err = $@;
@ -208,6 +218,15 @@ sub load_registrations_task {
$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::deleted_delta
);
$stats .= "\n removed: $deleted_count rows";
my ($export_filename,$export_format) = get_export_filename($registrations_export_filename_format);
if ('sqlite' eq $export_format) {
&get_sqlite_db()->copydbfile($export_filename);
} elsif ('csv' eq $export_format) {
NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::copy_table(\&get_csv_db);
&get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::gettablename(),$export_filename);
} else {
push(@$messages,'invalid extension for output filename $export_filename');
}
};
if ($err or !$result) {
push(@$messages,"loading registrations INCOMPLETE$stats");

@ -7,4 +7,6 @@ load_registrations_multithreading = 1
usernames_filename =
usernames_rownum_start = 1
#ignore_location_unique = 0
location_single_row_txn = 0
location_single_row_txn = 0
registrations_export_filename=registrations_%s.csv

@ -7,4 +7,6 @@ load_registrations_multithreading = 1
usernames_filename = /home/rkrenn/temp/massive/input/test.csv
usernames_rownum_start = 1
#ignore_location_unique = 0
location_single_row_txn = 0
location_single_row_txn = 0
registrations_export_filename=registrations_%s.csv
Loading…
Cancel
Save