TT#54612 cashback tool WIP

Change-Id: I3ce428de311471be1d401dd8f51f50a41ad94032
changes/35/27935/1
Rene Krenn 6 years ago
parent 154ec4cb12
commit 60b86558a6

@ -40,6 +40,7 @@ our @EXPORT_OK = qw(
findby_callidprefix findby_callidprefix
process_unexported process_unexported
process_fromto
get_callidprefix get_callidprefix
@ -332,6 +333,69 @@ sub process_unexported {
); );
} }
sub process_fromto {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$blocksize,
$from,
$to) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
blocksize
from
to
/};
#sort
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = '';
if ($from or $to) {
$stmt .= ' WHERE ';
my @terms = ();
if ($from) {
push(@terms,$db->columnidentifier('start_time') . ' >= UNIX_TIMESTAMP("' . $from . '")');
}
if ($to) {
push(@terms,$db->columnidentifier('start_time') . ' < UNIX_TIMESTAMP("' . $to . '")');
}
$stmt .= join(' AND ',@terms);
}
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,$rowblock,$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
blocksize => $blocksize,
select => 'SELECT * FROM ' . $table . $stmt . ' ORDER BY ' . $db->columnidentifier('id'),
selectcount => 'SELECT COUNT(1) FROM ' . $table . $stmt,
);
}
sub _get_export_stmt { sub _get_export_stmt {
my ($db,$static_context,$joins,$conditions) = @_; my ($db,$static_context,$joins,$conditions) = @_;

@ -0,0 +1,181 @@
package NGCP::BulkProcessor::Projects::Disaster::Cashback::CDR;
use strict;
## no critic
use threads::shared qw();
use NGCP::BulkProcessor::Projects::Disaster::Cashback::Settings qw(
$skip_errors
$cashback_multithreading
$cashback_numofthreads
$cashback_blocksize
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
fileerror
);
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
destroy_dbs
ping_dbs
);
use NGCP::BulkProcessor::Utils qw(threadid);
#use NGCP::BulkProcessor::Calendar qw(from_epoch);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
cashback
);
sub cashback {
my ($from,$to) = @_;
my $static_context = {};
my $result = _cashback_create_context($static_context);
destroy_dbs();
my $warning_count :shared = 0;
$result &= NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_fromto(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
foreach my $record (@$records) {
if (defined $export_cdr_limit) {
lock $rowcount;
if ($rowcount >= $export_cdr_limit) {
_info($context,"exceeding export limit $export_cdr_limit");
return 0;
}
}
if ($context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) {
_info($context,"exceeding file sequence number " . $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn);
return 0;
}
my ($id,$call_id) = @$record;
next unless _export_cdrs_init_context($context,$id,$call_id);
eval {
$context->{file}->write_record(
get_transfer_in => \&_get_transfer_in,
get_record => \&_get_record,
get_transfer_out => \&_get_transfer_out,
commit_cb => \&_commit_export_status,
context => $context,
);
};
if ($@) {
if ($skip_errors) {
_warn($context,"problem while exporting call id $call_id (cdr id $id): " . $@);
} else {
_error($context,"problem while exporting call id $call_id (cdr id $id): " . $@);
}
}
}
ping_dbs();
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
blocksize => $cashback_blocksize,
multithreading => cashback_multithreading,
numofthreads => $cashback_numofthreads,
from => $from,
to => $to
);
return ($result,$warning_count);
}
sub _cashback_init_context {
my ($context,$cdr_id,$call_id) = @_;
my $result = 0;
return $result;
}
sub _cashback_create_context {
my ($context) = @_;
my $result = 1;
$context->{tid} = threadid();
return $result;
}
sub _error {
my ($context,$message) = @_;
if ($skip_errors) {
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
} else {
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -0,0 +1,118 @@
package NGCP::BulkProcessor::Projects::Disaster::Cashback::Settings;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
$cpucount
);
#$working_path
#create_path
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt stringtobool);
#format_number check_ipnet
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
check_dry
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
$cashback_multithreading
$cashback_numofthreads
$cashback_blocksize
);
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
our $cashback_multithreading = $enablemultithreading;
our $cashback_numofthreads = $cpucount;
our $cashback_blocksize = undef;
sub update_settings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
#my $regexp_result;
#&$configurationinfocode("testinfomessage",$configlogger);
#$result &= _prepare_working_paths(1);
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$cashback_multithreading = $data->{cashback_multithreading} if exists $data->{cashback_multithreading};
$cashback_numofthreads = _get_numofthreads($cpucount,$data,'cashback_numofthreads');
$cashback_blocksize = $data->{cashback_blocksize} if exists $data->{cashback_blocksize};
return $result;
}
return 0;
}
sub check_dry {
if ($dry) {
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
return 1;
} else {
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
if (!$force) {
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
return 1;
} else {
return 0;
}
} else {
scriptinfo('force option applied',getlogger(__PACKAGE__));
return 1;
}
}
}
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $_numofthreads = $default_value;
$_numofthreads = $data->{$key} if exists $data->{$key};
$_numofthreads = $cpucount if $_numofthreads > $cpucount;
return $_numofthreads;
}
1;

@ -0,0 +1,62 @@
##general settings:
working_path = /var/sipwise
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = somehost
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = somehost
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = somehost
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = somehost
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.74
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://somehost:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -0,0 +1,62 @@
##general settings:
working_path = /home/rkrenn/temp/soco
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.84
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.84
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.84
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.84
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.84
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -0,0 +1,210 @@
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Disaster::Cashback::Settings qw(
update_settings
check_dry
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
);
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
use NGCP::BulkProcessor::Projects::Disaster::Cashback::CDR qw(
fix_xx
);
use NGCP::BulkProcessor::ConnectorPool qw(
destroy_dbs
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $from = undef,
my $to = undef;
my $cashback_task_opt = 'cashback';
push(@TASK_OPTS,$cashback_task_opt);
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
if (init()) {
main();
exit(0);
} else {
exit(1);
}
sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
"from=s" => \$from,
"to=s" => \$to,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
return $result;
}
sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 1;
my $completion = 0;
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result);
} elsif (lc($cashback_task_opt) eq lc($task)) {
if (taskinfo($cashback_task_opt,$result)) {
next unless check_dry();
$result &= cashback_task(\@messages);
$completion |= 1;
}
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
last;
}
}
} else {
$result = 0;
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
}
push(@attachmentfiles,$attachmentlogfile);
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub cleanup_task {
my ($messages) = @_;
my $result = 0;
eval {
#cleanupcvsdirs() if $clean_generated;
#cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
#cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
#cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
if ($@ or !$result) {
push(@$messages,'cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'cleanup completed');
return 1;
}
}
sub cashback_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = cashback($from,$to);
};
my $err = $@;
my $fromto = 'from ' . ($from ? $from : '-') . ' to ' . ($to ? $to : '-');
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
eval {
};
if ($err or !$result) {
push(@$messages,"cashback $fromto INCOMPLETE$stats");
} else {
push(@$messages,"cashback $fromto completed$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..
# destroy_all_dbs
#}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -0,0 +1,7 @@
#dry=0
#skip_errors=0
cashback_numofthreads=2
cashback_multithreading=1
cashback_blocksize=1000

@ -0,0 +1,7 @@
#dry=0
#skip_errors=0
cashback_numofthreads=2
cashback_multithreading=1
cashback_blocksize=1000

@ -14,4 +14,4 @@ providers_yml = providers.yml
generate_cdr_multithreading = 1 generate_cdr_multithreading = 1
#generate_cdr_numofthreads = 2 #generate_cdr_numofthreads = 2
generate_cdr_count = 5000 generate_cdr_count = 10

Loading…
Cancel
Save