TT#110203 redis registration monitoring to SQLite PoC

Change-Id: I3c7bcb14f3bff3de04258938d5e9a1bed2c493ab
(cherry picked from commit c2a2b7c64d)
mr8.1.1
Rene Krenn 5 years ago
parent 0a561f88f6
commit fbdccef6e9

@ -155,7 +155,7 @@ sub process_entries {
} else {
my $store = &$get_store(); #$reader_connection_name);
my $store = &$get_store(undef,1); #$reader_connection_name);
$blocksize //= $store->get_defaultblockcount();
my $context = _create_process_context($static_context,{ tid => $tid });

@ -0,0 +1,330 @@
package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location;
use strict;
## no critic
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw(
get_sqlite_db
destroy_all_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
registertableinfo
create_targettable
checktableinfo
copy_row
insert_stmt
);
#process_table
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
create_table
gettablename
check_table
getinsertstatement
getupsertstatement
findby_domainusername
findby_ruid
update_delta
findby_delta
countby_delta
$deleted_delta
$updated_delta
$added_delta
@fieldnames
);
# --- table definition --------------------------------------------------------
# SQLite table the redis registration snapshot is written to.
my $tablename = 'location';
# connector factory; resolved lazily so each thread obtains its own handle
my $get_db = \&get_sqlite_db;
# columns as taken from the redis "location" store entries; the order matters,
# rows are assembled as hash slices over this list (see Process.pm)
our @fieldnames = (
'instance',
'domain',
'cseq',
'partition',
'ruid',
'connection_id',
'username',
'keepalive',
'path',
'reg_id',
'contact',
'flags',
'received',
'callid',
'socket',
'cflags',
'expires',
'methods',
'user_agent',
'q',
'last_modified',
'server_id',
);
# full physical layout = redis fields + the bookkeeping 'delta' column
my $expected_fieldnames = [
@fieldnames,
'delta',
];
# table creation:
my $primarykey_fieldnames = [ 'ruid' ];
my $indexes = {
#$tablename . '_number' => [ 'number(32)' ],
#$tablename . '_rownum' => [ 'rownum(11)' ],
$tablename . '_domain_username' => [ 'domain', 'username' ],
$tablename . '_delta' => [ 'delta(7)' ],
};
#my $fixtable_statements = [];
# delta markers used to track row state between synchronization runs:
our $deleted_delta = 'DELETED';
our $updated_delta = 'UPDATED';
our $added_delta = 'ADDED';
sub new {

    # Construct a Location record object and populate its fields from the
    # given row (as understood by copy_row()).
    my ($class, $row) = @_;
    my $self = NGCP::BulkProcessor::SqlRecord->new($class, $get_db,
        $tablename, $expected_fieldnames, $indexes);
    copy_row($self, $row, $expected_fieldnames);
    return $self;
}
sub create_table {
# (Re-)create the sqlite 'location' table, truncating existing data when
# $truncate is set. Source and target db/package are intentionally the same
# here, since the table is generated locally rather than copied.
my ($truncate) = @_;
my $db = &$get_db();
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
sub findby_delta {

    # Fetch all location records carrying the given delta marker.
    # Returns an arrayref of Location objects (empty when $delta is undef).
    my ($delta, $load_recursive) = @_;
    check_table();
    my $db = &$get_db();
    my $table = $db->tableidentifier($tablename);
    return [] unless defined $delta;
    my $stmt = sprintf('SELECT * FROM %s WHERE %s = ?',
        $table, $db->columnidentifier('delta'));
    my $rows = $db->db_get_all_arrayref($stmt, $delta);
    return buildrecords_fromrows($rows, $load_recursive);
}
sub findby_domainusername {

    # Look up the location record for a (domain, username) pair; returns the
    # first match, or an empty arrayref when a parameter is missing
    # (original contract preserved).
    my ($domain, $username, $load_recursive) = @_;
    check_table();
    my $db = &$get_db();
    my $table = $db->tableidentifier($tablename);
    return [] unless (defined $domain and defined $username);
    my $stmt = 'SELECT * FROM ' . $table
        . ' WHERE ' . $db->columnidentifier('domain') . ' = ?'
        . ' AND ' . $db->columnidentifier('username') . ' = ?';
    my $rows = $db->db_get_all_arrayref($stmt, $domain, $username);
    return buildrecords_fromrows($rows, $load_recursive)->[0];
}
sub findby_ruid {

    # Look up a single location record by its unique ruid (primary key).
    my ($ruid, $load_recursive) = @_;
    check_table();
    my $db = &$get_db();
    my $table = $db->tableidentifier($tablename);
    return undef unless defined $ruid;
    my $stmt = 'SELECT * FROM ' . $table
        . ' WHERE ' . $db->columnidentifier('ruid') . ' = ?';
    my $rows = $db->db_get_all_arrayref($stmt, $ruid);
    return buildrecords_fromrows($rows, $load_recursive)->[0];
}
sub update_delta {

    # Set the delta marker, either for one row (when $ruid is given) or for
    # the whole table (used to flag everything DELETED before a re-sync).
    # Returns the db_do() result (affected row count).
    my ($ruid, $delta) = @_;
    check_table();
    my $db = &$get_db();
    my $table = $db->tableidentifier($tablename);
    my @params = ($delta);
    my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
    if (defined $ruid) {
        $stmt .= ' WHERE ' . $db->columnidentifier('ruid') . ' = ?';
        push(@params, $ruid);
    }
    return $db->db_do($stmt, @params);
}
sub countby_delta {
# Count location rows filtered by delta marker(s). $deltas may be:
#  - undef or '' : count all rows
#  - a scalar    : count rows with that exact delta value
#  - a hashref   : { operator => value-or-arrayref }, e.g.
#                  { 'IN' => ['ADDED','UPDATED'] }, expanded into a
#                  placeholder list per operator
my ($deltas) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1';
my @params = ();
if (defined $deltas and 'HASH' eq ref $deltas) {
foreach my $in (keys %$deltas) {
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
# substr(',?' x n, 1) yields "?,?,...,?" - one placeholder per value
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $deltas and length($deltas) > 0) {
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?';
push(@params,$deltas);
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {

    # Wrap raw result rows into Location record objects.
    # Always returns an arrayref (empty when $rows is not an arrayref).
    my ($rows, $load_recursive) = @_;
    my @records = ();
    if (defined $rows and ref $rows eq 'ARRAY') {
        # transformations go here ...
        @records = map { __PACKAGE__->new($_) } @$rows;
    }
    return \@records;
}
#sub process_records {
#
# my %params = @_;
# my ($process_code,
# $static_context,
# $init_process_context_code,
# $uninit_process_context_code,
# $multithreading,
# $numofthreads) = @params{qw/
# process_code
# static_context
# init_process_context_code
# uninit_process_context_code
# multithreading
# numofthreads
# /};
#
# check_table();
# my $db = &$get_db();
# my $table = $db->tableidentifier($tablename);
#
# my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/;
#
# return process_table(
# get_db => $get_db,
# class => __PACKAGE__,
# process_code => sub {
# my ($context,$rowblock,$row_offset) = @_;
# return &$process_code($context,$rowblock,$row_offset);
# },
# static_context => $static_context,
# init_process_context_code => $init_process_context_code,
# uninit_process_context_code => $uninit_process_context_code,
# destroy_reader_dbs_code => \&destroy_all_dbs,
# multithreading => $multithreading,
# tableprocessing_threads => $numofthreads,
# 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
# 'selectcount' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table,
# );
#}
sub getinsertstatement {
# Return the plain INSERT statement for bulk loading; with $insert_ignore
# set, rows conflicting on the primary key are skipped instead of failing.
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
sub getupsertstatement {

    # Build an "INSERT OR REPLACE" statement whose 'delta' column is derived
    # inside SQL: ruids already present get $updated_delta (via the
    # correlated subselect), new ones fall back to $added_delta through
    # COALESCE. Callers therefore bind the ruid once more as last parameter.
    check_table();
    my $db = &$get_db();
    my $table = $db->tableidentifier($tablename);
    my @columns = map { $db->columnidentifier($_) } @$expected_fieldnames;
    my @placeholders = ();
    foreach my $fieldname (@$expected_fieldnames) {
        if ('delta' eq $fieldname) {
            my $delta_select = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
                $db->columnidentifier('ruid') . ' = ?';
            push(@placeholders, 'COALESCE((' . $delta_select . '), \'' . $added_delta . '\')');
        } else {
            push(@placeholders, '?');
        }
    }
    return 'INSERT OR REPLACE INTO ' . $table . ' (' . join(', ', @columns) . ')'
        . ' VALUES (' . join(',', @placeholders) . ')';
}
sub gettablename {
# Accessor for the (unqualified) sqlite table name.
return $tablename;
}
sub check_table {
# Verify the physical table matches the expected layout (columns/indexes),
# registering the table info for this package on first use.
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,214 @@
package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Process;
use strict;
## no critic
use threads::shared qw();
#use Encode qw();
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(
$usernames_filename
$usernames_rownum_start
$load_registrations_multithreading
$load_registrations_numofthreads
$ignore_location_unique
$location_single_row_txn
$skip_errors
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
fileprocessingwarn
fileprocessingerror
);
use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw();
#use NGCP::BulkProcessor::FileProcessors::XslxFileSimple qw();
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw(
get_sqlite_db
destroy_all_dbs
);
use NGCP::BulkProcessor::ConnectorPool qw(
destroy_stores
);
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw();
use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw();
use NGCP::BulkProcessor::Utils qw(threadid trim);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
load_registrations
);
sub load_registrations {
# Import registrations for all (username;domain) pairs listed in the given
# CSV file into the local sqlite 'location' table. Returns the overall
# result flag plus the warning count accumulated over all worker threads.
my ($file) = @_;
$file //= $usernames_filename;
_error({ filename => '<none>', },'no file specified') unless $file;
my $result = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::create_table(0);
# CSV with ';' separator, processed by $load_registrations_numofthreads workers
my $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($load_registrations_numofthreads,undef,';');
# rows from a previous run get flagged DELETED and switch us to upsert mode:
my $upsert = _locations_reset_delta();
destroy_all_dbs(); #close all db connections before forking..
destroy_stores();
my $warning_count :shared = 0;
return ($result && $importer->process(
file => $file,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my @data = (); # NOTE(review): appears unused in this closure
my @location_rows = ();
foreach my $record (@$records) {
# record layout: [ username, domain ]
if (_load_registrations_init_context($context,$record->[0],$record->[1])) {
foreach my $registration (@{$context->{registrations}}) {
# slice the redis entry into Dao::Location column order:
my %r = %{$registration->getvalue}; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::fieldnames};
if ($context->{upsert}) {
# the upsert statement binds the ruid once more as last parameter
push(@row_ext,$registration->getvalue()->{ruid});
} else {
push(@row_ext,$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::added_delta);
}
push(@location_rows,\@row_ext);
}
# per-row transactions: slower, but an error affects a single row only
if ($location_single_row_txn and (scalar @location_rows) > 0) {
while (defined (my $location_row = shift @location_rows)) {
if ($skip_errors) {
eval { _insert_location_rows($context,[$location_row]); };
_warn($context,$@) if $@;
} else {
_insert_location_rows($context,[$location_row]);
}
}
}
}
}
# block transaction: flush everything collected for this record block at once
if (not $location_single_row_txn and (scalar @location_rows) > 0) {
if ($skip_errors) {
eval { _insert_location_rows($context,\@location_rows); };
_warn($context,$@) if $@;
} else {
_insert_location_rows($context,\@location_rows);
}
}
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_sqlite_db();
#$context->{redis} = &get_location_store();
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
#undef $context->{redis};
destroy_all_dbs();
destroy_stores();
# fold this worker's warnings into the shared total:
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $load_registrations_multithreading,
numofthreads => $load_registrations_numofthreads,
),$warning_count);
}
sub _locations_reset_delta {
# If the location table already contains rows (from a previous run), mark
# them all DELETED and request upsert mode; rows seen again in redis will
# then flip back to UPDATED/ADDED. Returns 1 when upsert mode is needed,
# 0 for a fresh table.
my $upsert = 0;
if (NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta() > 0) {
processing_info(threadid(),'resetting delta of ' .
NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::update_delta(undef,
$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::deleted_delta) .
' location records',getlogger(__PACKAGE__));
$upsert |= 1;
}
return $upsert;
}
sub _insert_location_rows {
# Write a block of rows in one transaction, using the upsert or the plain
# insert statement depending on the run mode. On failure the transaction
# is finished with rollback (db_finish(1)) and the original error rethrown.
my ($context,$location_rows) = @_;
$context->{db}->db_do_begin(
($context->{upsert} ?
NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::getupsertstatement()
: NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::getinsertstatement($ignore_location_unique)),
);
eval {
$context->{db}->db_do_rowblock($location_rows);
$context->{db}->db_finish();
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_finish(1); # rollback is best-effort; its own errors are ignored
};
die($err);
}
}
sub _load_registrations_init_context {

    # Prepare the per-record context for one CSV line (username;domain):
    # look up the subscriber's registrations in the redis "usrdom" store and
    # stash them in $context->{registrations}.
    # Returns 1 when at least one registration was found, 0 otherwise.
    # NOTE(review): the original declared this sub with an empty ()
    # prototype although it is always called with three arguments; the
    # prototype was dropped (prototypes are parser hints, not arg checks).
    my ($context, $username, $domain) = @_;
    $context->{username} = $username;
    $context->{domain} = $domain;
    my @registrations = ();
    my $result = 1;
    $context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain($username,$domain,{ _entries => 1, });
    if ($context->{usrdom}) {
        foreach my $entry (@{$context->{usrdom}->{_entries}}) {
            push(@registrations, $entry); # if expiry > now
        }
    }
    $result = 0 unless scalar @registrations;
    $context->{registrations} = \@registrations;
    return $result;
}
sub _error {

    # Count the error in the context and raise a file-processing error.
    # NOTE(review): the original read "$context->(unknown)" here, which is a
    # syntax error (code-deref of a hashref) - apparently paste garbling.
    # The context hashes in this file carry a 'filename' key (see the
    # _error call in load_registrations), so that is restored; confirm
    # against the pre-diff source.
    my ($context, $message) = @_;
    $context->{error_count} = $context->{error_count} + 1;
    fileprocessingerror($context->{filename}, $message, getlogger(__PACKAGE__));
}
sub _warn {

    # Count the warning in the context and emit a file-processing warning.
    # NOTE(review): the original read "$context->(unknown)" here - a syntax
    # error from paste garbling - restored to $context->{filename} in line
    # with the context hash used by load_registrations; confirm against the
    # pre-diff source.
    my ($context, $message) = @_;
    $context->{warning_count} = $context->{warning_count} + 1;
    fileprocessingwarn($context->{filename}, $message, getlogger(__PACKAGE__));
}
sub _info {

    # Log an informational (or, with $debug set, a debug-level) message
    # tagged with the context's thread id.
    my ($context, $message, $debug) = @_;
    my $logger = getlogger(__PACKAGE__);
    if ($debug) {
        processing_debug($context->{tid}, $message, $logger);
    } else {
        processing_info($context->{tid}, $message, $logger);
    }
}
1;

@ -0,0 +1,80 @@
package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool;
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(
$sqlite_db_file
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_connectorinstancename
);
#use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
get_sqlite_db
destroy_dbs
destroy_all_dbs
);
my $sqlite_dbs = {};
sub get_sqlite_db {
# Return the per-instance sqlite connector, creating it on first use.
# A newly created connector always connects; for an existing one the
# caller may force a reconnect by passing a true $reconnect.
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name); #threadid(); #shift;
if (not defined $sqlite_dbs->{$name}) {
$sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name);
if (not defined $reconnect) {
$reconnect = 1; # fresh connectors must connect
}
}
if ($reconnect) {
$sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file);
}
return $sqlite_dbs->{$name};
}
sub sqlite_db_tableidentifier {
# Map a logical table name to the identifier valid in the target db;
# accepts either a connector object or a factory coderef returning one.
# NOTE(review): not listed in @EXPORT_OK - callers must fully qualify.
my ($get_target_db,$tablename) = @_;
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file));
}
sub destroy_dbs {

    # Drop every cached sqlite connector of this pool, clearing the table
    # info registered for it before releasing the handle.
    foreach my $connection_name (keys %$sqlite_dbs) {
        cleartableinfo($sqlite_dbs->{$connection_name});
        undef $sqlite_dbs->{$connection_name};
        delete $sqlite_dbs->{$connection_name};
    }
}
sub destroy_all_dbs {

    # Close the project-local sqlite connectors first, then those of the
    # shared BulkProcessor connector pool.
    # NOTE(review): dropped the empty () prototype of the original
    # "sub destroy_all_dbs()" - it adds no argument checking and prototypes
    # on exported subs are best avoided.
    destroy_dbs();
    NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
}
1;

@ -80,8 +80,13 @@ SKIP:
process_code => sub {
my ($context,$records,$row_offset) = @_;
#die();
print @$records . " done\n";
return 0;
print join("\n", map {
my $key = $_->getkey();
$key =~ s/location\:usrdom\:\://;
$key =~ s/\:/;/;
$key;
} @$records);
return 1;
},
static_context => $static_context,
blocksize => 10000,

@ -0,0 +1,147 @@
package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings;
use strict;
## no critic
use threads::shared qw();
use File::Basename qw(fileparse);
use DateTime::TimeZone qw();
use NGCP::BulkProcessor::Globals qw(
$working_path
$enablemultithreading
$cpucount
create_path
);
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
filewarn
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid);
#format_number check_ipnet
use NGCP::BulkProcessor::Array qw(contains);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
$sqlite_db_file
$output_path
$input_path
$defaultsettings
$defaultconfig
$usernames_filename
$usernames_rownum_start
$load_registrations_numofthreads
$load_registrations_multithreading
$ignore_location_unique
$location_single_row_txn
$skip_errors
$force
);
# --- defaults (overridable via update_settings / settings file) --------------
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
# working-directory layout (created/validated by _prepare_working_paths):
our $output_path = $working_path . 'output/';
our $input_path = $working_path . 'input/';
# input CSV with "username;domain" rows:
our $usernames_filename = undef;
our $usernames_rownum_start = 0;
# worker setup; defaults follow the global multithreading/cpu settings:
our $load_registrations_multithreading = $enablemultithreading;
our $load_registrations_numofthreads = $cpucount;
# loader behaviour toggles (see settings.cfg):
our $ignore_location_unique = 0;
our $location_single_row_txn = 0;
our $skip_errors = 0;
our $force = 0;
# sqlite database file name used by the project connector pool:
our $sqlite_db_file = 'sqlite';
sub update_settings {

    # Apply key/value pairs loaded from the settings file to this package's
    # globals. Returns 1 (possibly degraded by path-creation failures) when
    # $data is present, 0 otherwise. $configfile is accepted to satisfy the
    # load_config() callback signature but is not used here.
    # NOTE(review): dropped the unused local "$regexp_result" of the
    # original; nothing in this sub parses regexp settings.
    my ($data, $configfile) = @_;
    if (defined $data) {
        my $result = 1;
        #&$configurationinfocode("testinfomessage",$configlogger);
        $result &= _prepare_working_paths(1);
        $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};
        $load_registrations_multithreading = $data->{load_registrations_multithreading} if exists $data->{load_registrations_multithreading};
        $usernames_filename = _get_import_filename($usernames_filename,$data,'usernames_filename');
        $usernames_rownum_start = $data->{usernames_rownum_start} if exists $data->{usernames_rownum_start};
        $load_registrations_numofthreads = _get_numofthreads($cpucount,$data,'load_registrations_numofthreads');
        $ignore_location_unique = $data->{ignore_location_unique} if exists $data->{ignore_location_unique};
        $location_single_row_txn = $data->{location_single_row_txn} if exists $data->{location_single_row_txn};
        $skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
        $force = $data->{force} if exists $data->{force};
        return $result;
    }
    return 0;
}
sub _prepare_working_paths {
# Ensure input/ and output/ exist below $working_path (creating them when
# $create is set), updating the path globals to their canonical values.
# Returns 1 only if both paths are usable.
my ($create) = @_;
my $result = 1;
my $path_result;
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
return $result;
}
sub _get_numofthreads {

    # Resolve a thread-count setting: take the configured value when
    # present, otherwise the default, capped at the machine's cpu count.
    my ($default_value, $data, $key) = @_;
    my $threads = exists $data->{$key} ? $data->{$key} : $default_value;
    $threads = $cpucount if $threads > $cpucount;
    return $threads;
}
sub _get_import_filename {

    # Resolve an input-file setting: take the configured value when present,
    # otherwise keep the old one; a non-existing relative name is prefixed
    # with the input path.
    my ($old_value, $data, $key) = @_;
    my $filename = exists $data->{$key} ? $data->{$key} : $old_value;
    if (defined $filename and length($filename) > 0) {
        $filename = $input_path . $filename unless -e $filename;
    }
    return $filename;
}
1;

@ -0,0 +1,69 @@
##general settings:
working_path = /var/sipwise
#cpucount = 4
enablemultithreading = 1
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = localhost
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = localhost
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = localhost
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = localhost
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = localhost
xa_port = 3306
xa_databasename = billing
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://10.0.2.15:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##NGCP Redis connectivity - "location" store:
location_databaseindex = 20
#location_password =
location_host = 127.0.0.1
location_port = 6379
#location_sock =
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = INFO
#DEBUG
screenloglevel = INFO
#INFO
emailloglevel = OFF

@ -0,0 +1,69 @@
##general settings:
working_path = /home/rkrenn/temp/massive
#cpucount = 4
enablemultithreading = 1
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.146
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.146
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.146
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.146
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.146
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##NGCP Redis connectivity - "location" store:
location_databaseindex = 20
#location_password =
location_host = 192.168.0.146
location_port = 6379
#location_sock =
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = INFO
#DEBUG
screenloglevel = INFO
#INFO
emailloglevel = OFF

@ -1,3 +1,223 @@
use strict;
## no critic

# PoC driver script: load kamailio registrations from the redis "location"
# store into a local sqlite table for monitoring/diffing (TT#110203).
# NOTE(review): the original contained a stray bareword line
# "get_location_store" right after "## no critic" - residue of a pasted
# diff that would not compile - it has been removed; confirm against the
# pre-diff source which import list it belonged to.
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');

use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);

use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw(
    update_settings
    $output_path
    $defaultsettings
    $defaultconfig
    $skip_errors
    $force
);
use NGCP::BulkProcessor::Logging qw(
    init_log
    getlogger
    $attachmentlogfile
    scriptinfo
    cleanuplogfiles
    $currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
    completion
    done
    scriptwarn
    scripterror
    filewarn
    fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
    load_config
    $SIMPLE_CONFIG_TYPE
    $YAML_CONFIG_TYPE
    $ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
    cleanupmsgfiles
);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw();
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw(
    destroy_all_dbs
);
use NGCP::BulkProcessor::ConnectorPool qw(
    destroy_stores
);
use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Process qw(
    load_registrations
);

# single-instance guard: lock the script's own DATA handle
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet

# supported --task options:
my @TASK_OPTS = ();
my $tasks = [];
my $file = undef;
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $cleanup_all_task_opt = 'cleanup_all';
push(@TASK_OPTS,$cleanup_all_task_opt);
my $load_registrations_task_opt = 'load_registrations';
push(@TASK_OPTS,$load_registrations_task_opt);

if (init()) {
    main();
    exit(0);
} else {
    exit(1);
}
sub init {
# Parse command-line options, then load the config and settings files.
# Returns a truthy result only when option parsing and both loads succeed.
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks, # may be given multiple times
#"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
"file=s" => \$file,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
return $result;
}
sub main {

    # Dispatch the tasks requested on the command line (in given order),
    # collect their summary messages and send a notification mail at the end.
    # NOTE(review): dropped the empty () prototype of the original
    # "sub main()" - prototypes do not validate arguments and are best
    # avoided on ordinary subs.
    my @messages = ();
    my @attachmentfiles = ();
    my $result = 1;
    # NOTE(review): $completion is never set, so the done() branch below is
    # always taken - presumably intentional for this PoC; confirm.
    my $completion = 0;
    if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
        scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
        foreach my $task (@$tasks) {
            if (lc($cleanup_task_opt) eq lc($task)) {
                $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result);
            } elsif (lc($cleanup_all_task_opt) eq lc($task)) {
                $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
            } elsif (lc($load_registrations_task_opt) eq lc($task)) {
                $result &= load_registrations_task(\@messages) if taskinfo($load_registrations_task_opt,$result);
            } else {
                $result = 0;
                scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
                last;
            }
        }
    } else {
        $result = 0;
        scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
    }
    push(@attachmentfiles,$attachmentlogfile);
    if ($completion) {
        completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
    } else {
        done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
    }
    return $result;
}
sub taskinfo {

    # Announce whether a task starts or is skipped (after earlier failures);
    # returns $result so it can gate the task call itself.
    my ($task, $result) = @_;
    my $message = $result
        ? "starting task: '$task'"
        : "skipping task '$task' due to previous problems";
    scriptinfo($message, getlogger(getscriptpath()));
    return $result;
}
sub cleanup_task {
# Remove logs and mail spool files; with $clean_generated also the sqlite
# db files and the output folder. Destructive cleanup prompts for
# confirmation unless --force was given.
my ($messages,$clean_generated) = @_;
my $result = 0;
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
eval {
#cleanupcvsdirs() if $clean_generated;
cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
#cleanupcertfiles();
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
}
# $result stays 0 when the user declined the prompt or the eval died:
if ($@ or !$result) {
#print $@;
push(@$messages,'working directory cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'working directory folders cleaned up');
return 1;
}
}
sub load_registrations_task {
# Run the load_registrations import and append a human-readable summary
# (warning count plus total/new/existing/removed row stats) to @$messages.
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = load_registrations($file);
};
#print $@;
my $err = $@;
my $stats = ": $warning_count warnings";
# stats gathering is best-effort; failures here must not mask $err:
eval {
$stats .= "\n total loaded registrations: " .
NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta() . ' rows';
my $added_count = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta(
$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::added_delta
);
$stats .= "\n new: $added_count rows";
my $existing_count = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta(
$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::updated_delta
);
$stats .= "\n existing: $existing_count rows";
my $deleted_count = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta(
$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::deleted_delta
);
$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"loading registrations INCOMPLETE$stats");
} else {
push(@$messages,"loading registrations completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
destroy_stores();
return $result;
}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -0,0 +1,10 @@
#force=0
#skip_errors=0
load_registrations_multithreading = 1
#load_registrations_numofthreads = 4
usernames_filename =
usernames_rownum_start = 1
#ignore_location_unique = 0
location_single_row_txn = 0

@ -0,0 +1,10 @@
#force=0
#skip_errors=0
load_registrations_multithreading = 1
#load_registrations_numofthreads = 4
usernames_filename = /home/rkrenn/temp/massive/input/test.csv
usernames_rownum_start = 1
#ignore_location_unique = 0
location_single_row_txn = 0

@ -111,6 +111,8 @@ our @EXPORT_OK = qw(
prompt
check_int
check_ipnet
unshare
);
our $chmod_umask = 0777;
@ -1025,4 +1027,37 @@ sub check_int {
return 0;
}
sub unshare {

    # Pure-Perl recursive deep copy that drops threads::shared magic, so a
    # shared data structure can be manipulated afterwards without errors.
    my ($data) = @_;
    # Deliberately "return undef" (not bare return): in list context callers
    # such as push() must still receive a one-element list.
    return undef if not defined $data;
    my $reftype = ref $data;
    return $data if not $reftype;                 # plain scalar: copy by value
    if ('SCALAR' eq $reftype) {                   # scalar ref
        my $copy = $$data;
        return \$copy;
    }
    if ('ARRAY' eq $reftype) {                    # array: recurse per element
        return [ map { unshare($_) } @$data ];
    }
    if ('HASH' eq $reftype) {                     # hash: recurse per value
        return { map { $_ => unshare($data->{$_}) } keys %$data };
    }
    if ('REF' eq $reftype) {                      # ref-of-ref: recurse inward
        my $inner = unshare($$data);
        return \$inner;
    }
    die("unsharing $reftype not supported\n");    # e.g. CODE, GLOB
}
1;

Loading…
Cancel
Save