TT#45958 ama exporter: iterate unexported cdrs

Change-Id: Ie3b741641e91487b0e26859110a43ce756e1e310
changes/83/24283/2
Rene Krenn 7 years ago
parent 40542fc967
commit 4066bcdf19

@ -5,6 +5,8 @@ use strict;
use NGCP::BulkProcessor::Globals qw(
$rowblock_transactional
$accounting_databasename
$accounting_username
$accounting_password
@ -112,7 +114,7 @@ sub get_accounting_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (!defined $accounting_dbs->{$name}) {
$accounting_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name);
$accounting_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
if (!defined $reconnect) {
$reconnect = 1;
}
@ -138,7 +140,7 @@ sub get_billing_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (!defined $billing_dbs->{$name}) {
$billing_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name);
$billing_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
if (!defined $reconnect) {
$reconnect = 1;
}
@ -163,7 +165,7 @@ sub get_provisioning_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (!defined $provisioning_dbs->{$name}) {
$provisioning_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name);
$provisioning_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
if (!defined $reconnect) {
$reconnect = 1;
}
@ -188,7 +190,7 @@ sub get_kamailio_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (!defined $kamailio_dbs->{$name}) {
$kamailio_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name);
$kamailio_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
if (!defined $reconnect) {
$reconnect = 1;
}
@ -214,7 +216,7 @@ sub get_xa_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (!defined $xa_dbs->{$name}) {
$xa_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name);
$xa_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
if (!defined $reconnect) {
$reconnect = 1;
}

@ -1,6 +1,7 @@
package NGCP::BulkProcessor::Dao::Trunk::accounting::cdr;
use strict;
#use Tie::IxHash;
## no critic
use NGCP::BulkProcessor::Logging qw(
@ -10,6 +11,7 @@ use NGCP::BulkProcessor::Logging qw(
use NGCP::BulkProcessor::ConnectorPool qw(
get_accounting_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
@ -17,9 +19,13 @@ use NGCP::BulkProcessor::SqlProcessor qw(
insert_record
update_record
copy_row
process_table
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
@ -32,6 +38,9 @@ our @EXPORT_OK = qw(
delete_callids
countby_ratingstatus
findby_callidprefix
process_unexported
);
#process_records
#delete_ids
@ -128,6 +137,12 @@ my $expected_fieldnames = [
"export_status",
];
my @callid_suffixes = ();
our $PBXSUFFIX = '_pbx-1';
push(@callid_suffixes,$PBXSUFFIX);
our $XFERSUFFIX = '_xfer-1';
push(@callid_suffixes,$XFERSUFFIX);
my $indexes = {};
my $insert_unique_fields = [];
@ -192,6 +207,31 @@ sub countby_ratingstatus {
}
sub findby_callidprefix {
my ($xa_db,$call_id,$joins,$conditions,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $suffixre = '(' . join('|', map { quotemeta($_); } @callid_suffixes) . ')+$';
$call_id =~ s/$suffixre//g;
$call_id =~ s/%/\\%/g;
my @conditions = @{$conditions // []};
push(@conditions,{ $table . '.call_id' => { 'LIKE' => '?' } });
my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' .
_get_export_stmt($db,undef,$joins,\@conditions) .
' ORDER BY LENGTH(' . $table . '.call_id' . ') ASC, ' . $table . '.start_time ASC';
my @params = ($call_id . '%');
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub update_row {
my ($xa_db,$data) = @_;
@ -205,54 +245,111 @@ sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
#my %params = @_;
#my ($contract_id,
# $domain_id,
# $username,
# $uuid) = @params{qw/
# contract_id
# domain_id
# username
# uuid
# /};
#
#if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
# $db->columnidentifier('contact_id') . ', ' .
# $db->columnidentifier('contract_id') . ', ' .
# $db->columnidentifier('domain_id') . ', ' .
# $db->columnidentifier('external_id') . ', ' .
# $db->columnidentifier('primary_number_id') . ', ' .
# $db->columnidentifier('status') . ', ' .
# $db->columnidentifier('username') . ', ' .
# $db->columnidentifier('uuid') . ') VALUES (' .
# 'NULL, ' .
# '?, ' .
# '?, ' .
# 'NULL, ' .
# 'NULL, ' .
# '\'' . $ACTIVE_STATE . '\', ' .
# '?, ' .
# '?)',
# $contract_id,
# $domain_id,
# $username,
# $uuid,
# )) {
# rowinserted($db,$tablename,getlogger(__PACKAGE__));
# return $xa_db->db_last_insert_id();
#}
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
return undef;
}
sub process_unexported {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$blocksize,
$joins,
$conditions,
#$sort,
$limit) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
blocksize
joins
conditions
limit
/};
#sort
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = _get_export_stmt($db,$static_context,$joins,$conditions);
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
#my %cdr_id_map = ();
#tie(%cdr_id_map, 'Tie::IxHash');
#if ($rowblock) {
# foreach my $record (@$rowblock) {
# $cdr_id_map{$record->[0]} = $record->[1];
# }
#}
#return 0 if $row_offset >= $limit;
return &$process_code($context,$rowblock,$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
blocksize => $blocksize,
#select => $db->paginate_sort_query('SELECT ' . $table . '.* ' . $stmt,undef,undef,$sort),
select => 'SELECT ' . $table . '.' . $db->columnidentifier('id') . ', ' . $table . '.' . $db->columnidentifier('call_id') .
' ' . $stmt . ' ORDER BY ' . $table . '.' . $db->columnidentifier('id'),
selectcount => 'SELECT COUNT(1) FROM (' . $db->paginate_sort_query('SELECT 1 ' . $stmt,0,$limit,undef) . ') AS __cnt',
);
}
sub _get_export_stmt {
my ($db,$static_context,$joins,$conditions) = @_;
my $table = $db->tableidentifier($tablename);
my $stmt = "FROM " . $table;
my @intjoins = ();
if (defined $joins and (scalar @$joins) > 0) {
foreach my $f (@$joins) {
my ($table, $keys) = %{ $f };
my ($foreign_key, $own_key) = %{ $keys };
push @intjoins, "LEFT JOIN $table ON $foreign_key = $own_key";
}
}
my @conds = ();
if (defined $static_context and $static_context->{export_status_id}) {
push @intjoins, 'LEFT JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::gettablename()) . ' AS __cesd ON __cesd.cdr_id = ' . $table . '.id AND __cesd.status_id = ' . $static_context->{export_status_id};
push @conds, '(__cesd.export_status = "' . $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED . '" OR __cesd.export_status IS NULL)';
}
$stmt .= " " . join(" ", @intjoins) if (scalar @intjoins) > 0;
if (defined $conditions and (scalar @$conditions) > 0) {
foreach my $f (@$conditions) {
my ($field, $match) = %{ $f };
my ($op, $val) = %{ $match };
push @conds, "$field $op $val";
}
}
$stmt .= " WHERE " . join(" AND ", @conds) if (scalar @conds) > 0;
return $stmt;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -0,0 +1,124 @@
package NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_accounting_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
findall
findby_type
);
my $tablename = 'cdr_export_status';
my $get_db = \&get_accounting_db;
my $expected_fieldnames = [
"id",
"type",
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_type {
my ($type,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('type') . ' = ?';
my @params = ($type);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,204 @@
package NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowsdeleted
rowinserted
rowupserted
rowupdated
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_accounting_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
update_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
update_row
insert_row
upsert_row
$UNEXPORTED
$EXPORTED
);
our $UNEXPORTED = 'unexported';
our $EXPORTED = 'exported';
my $tablename = 'cdr_export_status_data';
my $get_db = \&get_accounting_db;
my $expected_fieldnames = [
"cdr_id",
"status_id",
"exported_at",
"export_status",
"cdr_start_time",
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($cdr_id,
$status_id,
$export_status,
$cdr_start_time) = @params{qw/
cdr_id
status_id
export_status
cdr_start_time
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('cdr_id') . ', ' .
$db->columnidentifier('status_id') . ', ' .
$db->columnidentifier('export_status') . ', ' .
$db->columnidentifier('exported_at') . ', ' .
$db->columnidentifier('cdr_start_time') . ') VALUES (' .
'?, ' .
'?, ' .
'?, ' .
'NOW(), ' .
'?)',
$cdr_id,
$status_id,
$export_status,
$cdr_start_time,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return 1;
}
}
return undef;
}
sub upsert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
my %params = @_;
my ($cdr_id,
$status_id,
$export_status,
$cdr_start_time) = @params{qw/
cdr_id
status_id
export_status
cdr_start_time
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('cdr_id') . ', ' .
$db->columnidentifier('status_id') . ', ' .
$db->columnidentifier('export_status') . ', ' .
$db->columnidentifier('exported_at') . ', ' .
$db->columnidentifier('cdr_start_time') . ') VALUES (' .
'?, ' .
'?, ' .
'?, ' .
'NOW(), ' .
'?) ON DUPLICATE KEY UPDATE export_status = ?, exported_at = NOW()',
$cdr_id,
$status_id,
$export_status,
$cdr_start_time,
$export_status,
)) {
rowupserted($db,$tablename,getlogger(__PACKAGE__));
return 1;
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -51,6 +51,8 @@ our @EXPORT_OK = qw(
$LongReadLen_limit
$transfer_defer_indexes
$rowblock_transactional
$accounting_databasename
$accounting_username
$accounting_password
@ -164,7 +166,7 @@ our $LongReadLen_limit = 128*1024; #longest LOB field size in bytes
our $appstartsecs = Time::HiRes::time();
our $rowblock_transactional = undef; #connector default
our $accounting_databasename = 'accounting';
our $accounting_username = 'root';
@ -324,7 +326,7 @@ sub update_masterconfig {
}
$cells_transfer_memory_limit = $data->{cells_transfer_memory_limit} if exists $data->{cells_transfer_memory_limit};
$transfer_defer_indexes = $data->{transfer_defer_indexes} if exists $data->{transfer_defer_indexes};
$rowblock_transactional = $data->{rowblock_transactional} if exists $data->{rowblock_transactional};
if (defined $split_tuplecode and ref $split_tuplecode eq 'CODE') {
@jobservers = &$split_tuplecode($data->{jobservers}) if exists $data->{jobservers};

@ -52,6 +52,7 @@ our @EXPORT_OK = qw(
rowtransferred
rowinserted
rowupserted
rowupdated
rowsdeleted
totalrowsdeleted
@ -479,6 +480,15 @@ sub rowinserted {
}
sub rowupserted {
my ($db,$tablename,$logger) = @_;
if (defined $logger) {
$logger->debug(_getsqlconnectorinstanceprefix($db) . 'row upserted');
}
}
sub rowupdated {
my ($db,$tablename,$logger) = @_;

@ -0,0 +1,216 @@
package NGCP::BulkProcessor::Projects::Export::Ama::CDR;
use strict;
## no critic
use threads::shared qw();
#use Time::HiRes qw(sleep);
#use String::MkPasswd qw();
#use List::Util qw();
#use Data::Rmap qw();
#use Tie::IxHash;
#use NGCP::BulkProcessor::Globals qw(
# $enablemultithreading
#);
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
$dry
$skip_errors
$export_cdr_multithreading
$export_cdr_blocksize
$export_cdr_joins
$export_cdr_conditions
$export_cdr_limit
$export_cdr_stream
);
#$deadlock_retries
#@providers
#$generate_cdr_numofthreads
#$generate_cdr_count
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
fileerror
);
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
destroy_dbs
);
#ping_dbs
#use NGCP::BulkProcessor::Utils qw(threadid timestamp); # stringtobool check_ipnet trim);
##use NGCP::BulkProcessor::DSSorter qw(sort_by_configs);
##use NGCP::BulkProcessor::RandomString qw(createtmpstring);
#use NGCP::BulkProcessor::Array qw(array_to_map);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
export_cdrs
);
sub export_cdrs {
my $static_context = {};
my $result = _export_cdrs_create_context($static_context);
destroy_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_unexported(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
my %cdr_id_map = map { $_->[0] => $_->[1]; } @$records;
foreach my $record (@$records) {
return 0 if (defined $export_cdr_limit and $rownum >= $export_cdr_limit);
my ($id,$call_id) = @$record;
next unless exists $cdr_id_map{$id};
my %dropped = ();
eval {
$context->{db}->db_begin();
my $cdrs = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::findby_callidprefix($context->{db},
$call_id,$export_cdr_joins,$export_cdr_conditions);
#todo: write.
foreach my $cdr (@$cdrs) {
#mark exported
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::upsert_row($context->{db},
cdr_id => $cdr->{id},
status_id => $context->{export_status_id},
export_status => $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::EXPORTED,
cdr_start_time => $cdr->{start_time},
);
_info($context,"export_status set for cdr id $cdr->{id}",1);
$dropped{$cdr->{id}} = delete $cdr_id_map{$cdr->{id}};
$rownum++;
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
foreach (keys %dropped) {
$cdr_id_map{$_} = $dropped{$_};
}
};
if ($skip_errors) {
_warn($context,"problem while exporting call id $call_id (cdr id $id): " . $err);
} else {
_error($context,"problem while exporting call id $call_id (cdr id $id): " . $err);
}
}
}
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
#_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
blocksize => $export_cdr_blocksize,
multithreading => $export_cdr_multithreading,
numofthreads => 1,
joins => $export_cdr_joins,
conditions => $export_cdr_conditions,
#sort => [{ column => 'id', numeric => 1, dir => 1 }],
limit => $export_cdr_limit,
),$warning_count);
}
sub _export_cdrs_create_context {
my ($context) = @_;
my $result = 1;
my $export_status;
eval {
if ($export_cdr_stream) {
$export_status = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status::findby_type($export_cdr_stream);
}
$context->{export_status_id} = $export_status->{id} if $export_status;
};
if ($@ or ($export_cdr_stream and not $export_status)) {
_error($context,"cannot find export stream '$export_cdr_stream'");
$result = 0;
} elsif ($export_status) {
_info($context,"export stream '$export_cdr_stream' set");
}
return $result;
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -0,0 +1,206 @@
package NGCP::BulkProcessor::Projects::Export::Ama::Settings;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$working_path
$enablemultithreading
$cpucount
create_path
);
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
filewarn
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits);
#format_number check_ipnet
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
check_dry
$input_path
$output_path
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
$export_cdr_multithreading
$export_cdr_blocksize
$export_cdr_joins
$export_cdr_conditions
$export_cdr_limit
$export_cdr_stream
);
#update_provider_config
#$deadlock_retries
#$generate_cdr_count
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $input_path = $working_path . 'input/';
our $output_path = $working_path . 'output/';
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
our $export_cdr_multithreading = $enablemultithreading;
our $export_cdr_blocksize = undef;
our $export_cdr_joins = [];
our $export_cdr_conditions = [];
our $export_cdr_limit = undef;
our $export_cdr_stream = undef;
sub update_settings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
#&$configurationinfocode("testinfomessage",$configlogger);
$result &= _prepare_working_paths(1);
#if ($data->{report_filename}) {
# $report_filename = $output_path . sprintf('/' . $data->{report_filename},timestampdigits());
# if (-e $report_filename and (unlink $report_filename) == 0) {
# filewarn('cannot remove ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__));
# $report_filename = undef;
# }
#} else {
# $report_filename = undef;
#}
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$export_cdr_multithreading = $data->{export_cdr_multithreading} if exists $data->{export_cdr_multithreading};
$export_cdr_blocksize = $data->{export_cdr_blocksize} if exists $data->{export_cdr_blocksize};
my $parse_result;
($parse_result,$export_cdr_joins) = _parse_export_joins($data->{export_cdr_joins},$configfile);
($parse_result,$export_cdr_conditions) = _parse_export_joins($data->{export_cdr_conditions},$configfile);
$export_cdr_limit = $data->{export_cdr_limit} if exists $data->{export_cdr_limit};
$export_cdr_stream = $data->{export_cdr_stream} if exists $data->{export_cdr_stream};
#if ((confval("MAINTENANCE") // 'no') eq 'yes') {
# exit(0);
#}
return $result;
}
return 0;
}
sub _prepare_working_paths {
my ($create) = @_;
my $result = 1;
my $path_result;
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
return $result;
}
sub _parse_export_joins {
my ($token,$file) = @_;
my @joins = ();
if (defined $token and length($token) > 0) {
foreach my $f (split_tuple($token)) {
next unless($f);
$f =~ s/^\s*\{?\s*//;
$f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split(/\s*=>\s*{\s*/, $f);
$a =~ s/^\s*\'//;
$a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split(/\s*=>\s*/, $b);
$c =~ s/^\s*\'//g;
$c =~ s/\'\s*//;
$d =~ s/^\s*\'//g;
$d =~ s/\'\s*//;
push @joins, { $a => { $c => $d } };
}
}
return (1,\@joins);
}
sub _parse_export_conditions {
my ($token,$file) = @_;
my @conditions = ();
if (defined $token and length($token) > 0) {
foreach my $f (split_tuple($token)) {
next unless($f);
$f =~ s/^\s*\{?\s*//;
$f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split(/\s*=>\s*{\s*/, $f);
$a =~ s/^\s*\'//;
$a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split(/\s*=>\s*/, $b);
$c =~ s/^\s*\'//g;
$c =~ s/\'\s*//;
$d =~ s/^\s*\'//g;
$d =~ s/\'\s*//;
push @conditions, { $a => { $c => $d } };
}
}
return (1,\@conditions);
}
sub check_dry {
if ($dry) {
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
return 1;
} else {
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
if (!$force) {
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
return 1;
} else {
return 0;
}
} else {
scriptinfo('force option applied',getlogger(__PACKAGE__));
return 1;
}
}
}
1;

@ -0,0 +1,64 @@
##general settings:
working_path = /var/sipwise
#cpucount = 4
enablemultithreading = 1
rowblock_transactional = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = localhost
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = localhost
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = localhost
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = localhost
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = localhost
xa_port = 3306
xa_databasename = billing
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://10.0.2.15:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = INFO
#DEBUG
screenloglevel = INFO
#INFO
emailloglevel = OFF

@ -0,0 +1,64 @@
##general settings:
working_path = /home/rkrenn/temp/export
#cpucount = 4
enablemultithreading = 1
rowblock_transactional = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.29
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.29
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.29
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.29
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.29
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = INFO
#DEBUG
screenloglevel = INFO
#INFO
emailloglevel = OFF

@ -0,0 +1,232 @@
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
update_settings
check_dry
$output_path
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
);
#@provider_config
#@providers
#$providers_yml
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles);
use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs);
#use NGCP::BulkProcessor::Projects::Massive::Generator::Dao::Blah qw();
#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Projects::Export::Ama::CDR qw(
export_cdrs
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $cleanup_all_task_opt = 'cleanup_all';
push(@TASK_OPTS,$cleanup_all_task_opt);
my $export_cdr_task_opt = 'export_cdr';
push(@TASK_OPTS,$export_cdr_task_opt);
if (init()) {
main();
exit(0);
} else {
exit(1);
}
sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
#"run=s" => \$run_id,
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
#$result &= load_config($providers_yml,\&update_provider_config,$YAML_CONFIG_TYPE);
return $result;
}
sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 1;
my $completion = 0;
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result);
} elsif (lc($cleanup_all_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
} elsif (lc($export_cdr_task_opt) eq lc($task)) {
if (taskinfo($export_cdr_task_opt,$result,1)) {
next unless check_dry();
$result &= export_cdr_task(\@messages);
$completion |= 1;
}
#} elsif (lc($provision_subscriber_task_opt) eq lc($task)) {
# if (taskinfo($provision_subscriber_task_opt,$result,1)) {
# next unless check_dry();
# $result &= provision_subscriber_task(\@messages);
# $completion |= 1;
# }
#} elsif (lc($generate_cdr_task_opt) eq lc($task)) {
# if (taskinfo($generate_cdr_task_opt,$result,1)) {
# next unless check_dry();
# $result &= generate_cdr_task(\@messages);
# $completion |= 1;
# }
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
last;
}
}
} else {
$result = 0;
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
}
push(@attachmentfiles,$attachmentlogfile);
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub cleanup_task {
my ($messages,$clean_generated) = @_;
my $result = 0;
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
eval {
cleanupcvsdirs() if $clean_generated;
cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
cleanupcertfiles();
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
}
if ($@ or !$result) {
push(@$messages,'working directory cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'working directory folders cleaned up');
return 1;
}
}
sub export_cdr_task {
my ($messages) = @_;
my ($result) = (0);
eval {
($result) = export_cdrs();
};
my $err = $@;
my $stats = ":";
eval {
#stats .= "\n total CDRs: " .
# NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows';
};
if ($err or !$result) {
push(@$messages,"export cdrs INCOMPLETE$stats");
} else {
push(@$messages,"export cdrs completed$stats");
}
destroy_dbs();
return $result;
}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -0,0 +1,11 @@
#dry=0
#skip_errors=0
providers_yml = providers.yml
export_cdr_multithreading = 1
export_cdr_blocksize = 1000
export_cdr_joins = { 'billing.voip_subscribers source_voip_subscribers' => { 'source_voip_subscribers.uuid' => 'accounting.cdr.source_user_id' } }, { 'billing.voip_subscribers destination_voip_subscribers' => { 'destination_voip_subscribers.uuid' => 'accounting.cdr.destination_user_id' } }, { 'billing.billing_zones_history source_carrier_bbz' => { 'source_carrier_bbz.id' => 'accounting.cdr.source_carrier_billing_zone_id' } }, { 'billing.billing_zones_history source_reseller_bbz' => { 'source_reseller_bbz.id' => 'accounting.cdr.source_reseller_billing_zone_id' } }, { 'billing.billing_zones_history source_customer_bbz' => { 'source_customer_bbz.id' => 'accounting.cdr.source_customer_billing_zone_id' } }, { 'billing.billing_zones_history destination_carrier_bbz' => { 'destination_carrier_bbz.id' => 'accounting.cdr.destination_carrier_billing_zone_id' } }, { 'billing.billing_zones_history destination_reseller_bbz' => { 'destination_reseller_bbz.id' => 'accounting.cdr.destination_reseller_billing_zone_id' } }, { 'billing.billing_zones_history destination_customer_bbz' => { 'destination_customer_bbz.id' => 'accounting.cdr.destination_customer_billing_zone_id' } }
export_cdr_conditions = { 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
export_cdr_limit = 300000

@ -0,0 +1,15 @@
#dry=0
#skip_errors=0
providers_yml = providers.yml
export_cdr_multithreading = 1
export_cdr_blocksize = 1000
#export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } }
#export_cdr_conditions = { 'es.type' => { '=' => '"default"' } }, { 'esd.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }
#, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } }
export_cdr_stream = default
#export_cdr_limit = 3500
#300000

@ -51,7 +51,7 @@ my $net_read_timeout = 300;
#my $lock_do_chunk = 0; #1;
#my $lock_get_chunk = 0;
my $rowblock_transactional = 1;
my $default_rowblock_transactional = 1;
my $serialization_level = ''; #'SERIALIZABLE'
@ -60,9 +60,11 @@ our $READ_COMMITTED = 'READ COMMITTED';
sub new {
my $class = shift;
my $rowblock_transactional = shift;
my $self = NGCP::BulkProcessor::SqlConnector->new(@_);
$self->{rowblock_transactional} = $rowblock_transactional // $default_rowblock_transactional;
$self->{host} = undef;
$self->{port} = undef;
$self->{databasename} = undef;
@ -455,7 +457,7 @@ sub multithreading_supported {
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
return $self->{rowblock_transactional};
}
@ -536,7 +538,7 @@ sub db_do_begin {
my $query = shift;
#my $tablename = shift;
$self->SUPER::db_do_begin($query,$rowblock_transactional,@_);
$self->SUPER::db_do_begin($query,$self->{rowblock_transactional},@_);
}
@ -547,7 +549,7 @@ sub db_get_begin {
#my $tablename = shift;
#my $lock = shift;
$self->SUPER::db_get_begin($query,$rowblock_transactional,@_);
$self->SUPER::db_get_begin($query,$self->{rowblock_transactional},@_);
}
@ -557,7 +559,7 @@ sub db_finish {
#my $unlock = shift;
my $rollback = shift;
$self->SUPER::db_finish($rowblock_transactional,$rollback);
$self->SUPER::db_finish($self->{rowblock_transactional},$rollback);
}

Loading…
Cancel
Save