Change-Id: Ie3b6aef065d236c1d80589b3efc8c0545dea4d6amr10.4.1
parent
6fd5f751f9
commit
d812c82aa6
@ -0,0 +1,287 @@
|
|||||||
|
package NGCP::BulkProcessor::Dao::Trunk::accounting::events;
|
||||||
|
use strict;
|
||||||
|
|
||||||
|
#use Tie::IxHash;
|
||||||
|
## no critic
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Logging qw(
|
||||||
|
getlogger
|
||||||
|
rowsdeleted
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::ConnectorPool qw(
|
||||||
|
get_accounting_db
|
||||||
|
destroy_dbs
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::SqlProcessor qw(
|
||||||
|
checktableinfo
|
||||||
|
|
||||||
|
copy_row
|
||||||
|
|
||||||
|
process_table
|
||||||
|
);
|
||||||
|
use NGCP::BulkProcessor::SqlRecord qw();
|
||||||
|
|
||||||
|
require Exporter;
|
||||||
|
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
|
||||||
|
our @EXPORT_OK = qw(
|
||||||
|
gettablename
|
||||||
|
check_table
|
||||||
|
|
||||||
|
update_row
|
||||||
|
insert_row
|
||||||
|
|
||||||
|
process_subscribers
|
||||||
|
process_events
|
||||||
|
findby_subscriberid
|
||||||
|
);
|
||||||
|
|
||||||
|
my $tablename = 'events';
|
||||||
|
my $get_db = \&get_accounting_db;
|
||||||
|
|
||||||
|
my $expected_fieldnames = [
|
||||||
|
'id',
|
||||||
|
'type',
|
||||||
|
'subscriber_id',
|
||||||
|
'reseller_id',
|
||||||
|
'old_status',
|
||||||
|
'new_status',
|
||||||
|
'timestamp',
|
||||||
|
'export_status',
|
||||||
|
'exported_at',
|
||||||
|
];
|
||||||
|
|
||||||
|
my $indexes = {};
|
||||||
|
|
||||||
|
my $insert_unique_fields = [];
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
|
||||||
|
my $class = shift;
|
||||||
|
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
|
||||||
|
$tablename,$expected_fieldnames,$indexes);
|
||||||
|
|
||||||
|
copy_row($self,shift,$expected_fieldnames);
|
||||||
|
|
||||||
|
return $self;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub findby_subscriberid {
|
||||||
|
|
||||||
|
my ($xa_db,$subscriber_id,$joins,$conditions,$load_recursive) = @_;
|
||||||
|
|
||||||
|
check_table();
|
||||||
|
my $db = &$get_db();
|
||||||
|
$xa_db //= $db;
|
||||||
|
my $table = $db->tableidentifier($tablename);
|
||||||
|
|
||||||
|
my @conditions = @{$conditions // []};
|
||||||
|
push(@conditions,{ $table . '.subscriber_id' => { '=' => '?' } });
|
||||||
|
my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' .
|
||||||
|
_get_export_stmt($db,$joins,\@conditions) .
|
||||||
|
' ORDER BY ' . $table . '.id ASC';
|
||||||
|
my @params = ($subscriber_id);
|
||||||
|
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
|
||||||
|
|
||||||
|
return buildrecords_fromrows($rows,$load_recursive);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub process_subscribers {
|
||||||
|
|
||||||
|
my %params = @_;
|
||||||
|
my ($process_code,
|
||||||
|
$static_context,
|
||||||
|
$init_process_context_code,
|
||||||
|
$uninit_process_context_code,
|
||||||
|
$multithreading,
|
||||||
|
$numofthreads,
|
||||||
|
$blocksize,
|
||||||
|
$joins,
|
||||||
|
$conditions,
|
||||||
|
#$sort,
|
||||||
|
$limit) = @params{qw/
|
||||||
|
process_code
|
||||||
|
static_context
|
||||||
|
init_process_context_code
|
||||||
|
uninit_process_context_code
|
||||||
|
multithreading
|
||||||
|
numofthreads
|
||||||
|
blocksize
|
||||||
|
joins
|
||||||
|
conditions
|
||||||
|
limit
|
||||||
|
/};
|
||||||
|
#sort
|
||||||
|
|
||||||
|
check_table();
|
||||||
|
my $db = &$get_db();
|
||||||
|
my $table = $db->tableidentifier($tablename);
|
||||||
|
|
||||||
|
my $select_stmt;
|
||||||
|
my $count_stmt;
|
||||||
|
my $select_format = 'SELECT ' . $table . '.' . $db->columnidentifier('subscriber_id') . ' %s GROUP BY ' . $table . '.' . $db->columnidentifier('subscriber_id');
|
||||||
|
my $count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt';
|
||||||
|
|
||||||
|
$select_stmt = sprintf($select_format,_get_export_stmt_part($db,$joins,$conditions));
|
||||||
|
$count_stmt = sprintf($count_format,$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,$joins,$conditions),0,$limit,undef));
|
||||||
|
|
||||||
|
return process_table(
|
||||||
|
get_db => $get_db,
|
||||||
|
class => __PACKAGE__,
|
||||||
|
process_code => sub {
|
||||||
|
my ($context,$rowblock,$row_offset) = @_;
|
||||||
|
return &$process_code($context,$rowblock,$row_offset);
|
||||||
|
},
|
||||||
|
static_context => $static_context,
|
||||||
|
init_process_context_code => $init_process_context_code,
|
||||||
|
uninit_process_context_code => $uninit_process_context_code,
|
||||||
|
destroy_reader_dbs_code => \&destroy_dbs,
|
||||||
|
multithreading => $multithreading,
|
||||||
|
tableprocessing_threads => $numofthreads,
|
||||||
|
blocksize => $blocksize,
|
||||||
|
select => $select_stmt,
|
||||||
|
selectcount => $count_stmt,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub process_events {
|
||||||
|
|
||||||
|
my %params = @_;
|
||||||
|
my ($process_code,
|
||||||
|
$static_context,
|
||||||
|
$init_process_context_code,
|
||||||
|
$uninit_process_context_code,
|
||||||
|
$multithreading,
|
||||||
|
$numofthreads,
|
||||||
|
$blocksize,
|
||||||
|
$joins,
|
||||||
|
$conditions,
|
||||||
|
$load_recursive,
|
||||||
|
$limit) = @params{qw/
|
||||||
|
process_code
|
||||||
|
static_context
|
||||||
|
init_process_context_code
|
||||||
|
uninit_process_context_code
|
||||||
|
multithreading
|
||||||
|
numofthreads
|
||||||
|
blocksize
|
||||||
|
joins
|
||||||
|
conditions
|
||||||
|
load_recursive
|
||||||
|
limit
|
||||||
|
/};
|
||||||
|
#sort
|
||||||
|
|
||||||
|
check_table();
|
||||||
|
my $db = &$get_db();
|
||||||
|
my $table = $db->tableidentifier($tablename);
|
||||||
|
|
||||||
|
my $select_stmt;
|
||||||
|
my $count_stmt;
|
||||||
|
my $select_format = 'SELECT * %s';
|
||||||
|
my $count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt';
|
||||||
|
|
||||||
|
$select_stmt = $db->paginate_sort_query(sprintf($select_format,_get_export_stmt_part($db,$joins,$conditions)),undef,undef,[
|
||||||
|
{ numeric => 1,
|
||||||
|
dir => 1, #-1,
|
||||||
|
memberchain => [ 'id' ],
|
||||||
|
}
|
||||||
|
]);
|
||||||
|
$count_stmt = sprintf($count_format,$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,$joins,$conditions),0,$limit,undef));
|
||||||
|
|
||||||
|
return process_table(
|
||||||
|
get_db => $get_db,
|
||||||
|
class => __PACKAGE__,
|
||||||
|
process_code => sub {
|
||||||
|
my ($context,$rowblock,$row_offset) = @_;
|
||||||
|
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
|
||||||
|
},
|
||||||
|
static_context => $static_context,
|
||||||
|
init_process_context_code => $init_process_context_code,
|
||||||
|
uninit_process_context_code => $uninit_process_context_code,
|
||||||
|
destroy_reader_dbs_code => \&destroy_dbs,
|
||||||
|
multithreading => $multithreading,
|
||||||
|
tableprocessing_threads => $numofthreads,
|
||||||
|
blocksize => $blocksize,
|
||||||
|
select => $select_stmt,
|
||||||
|
selectcount => $count_stmt,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _get_export_stmt {
|
||||||
|
|
||||||
|
my ($db,$joins,$conditions) = @_;
|
||||||
|
return _get_export_stmt_part($db,undef,$joins,$conditions);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _get_export_stmt_part {
|
||||||
|
|
||||||
|
my ($db,$static_context,$joins,$conditions) = @_;
|
||||||
|
|
||||||
|
my $table = $db->tableidentifier($tablename);
|
||||||
|
|
||||||
|
my $stmt = "FROM " . $table;
|
||||||
|
my @intjoins = ();
|
||||||
|
if (defined $joins and (scalar @$joins) > 0) {
|
||||||
|
foreach my $f (@$joins) {
|
||||||
|
my ($table, $keys) = %{ $f };
|
||||||
|
my ($foreign_key, $own_key) = %{ $keys };
|
||||||
|
push @intjoins, "LEFT JOIN $table ON $foreign_key = $own_key";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
my @conds = ();
|
||||||
|
$stmt .= " " . join(" ", @intjoins) if (scalar @intjoins) > 0;
|
||||||
|
if (defined $conditions and (scalar @$conditions) > 0) {
|
||||||
|
foreach my $f (@$conditions) {
|
||||||
|
my ($field, $match) = %{ $f };
|
||||||
|
my ($op, $val) = %{ $match };
|
||||||
|
push @conds, "$field $op $val";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$stmt .= " WHERE " . join(" AND ", @conds) if (scalar @conds) > 0;
|
||||||
|
return $stmt;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub buildrecords_fromrows {
|
||||||
|
|
||||||
|
my ($rows,$load_recursive) = @_;
|
||||||
|
|
||||||
|
my @records = ();
|
||||||
|
my $record;
|
||||||
|
|
||||||
|
if (defined $rows and ref $rows eq 'ARRAY') {
|
||||||
|
foreach my $row (@$rows) {
|
||||||
|
$record = __PACKAGE__->new($row);
|
||||||
|
|
||||||
|
# transformations go here ...
|
||||||
|
|
||||||
|
push @records,$record;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return \@records;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub gettablename {
|
||||||
|
|
||||||
|
return $tablename;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub check_table {
|
||||||
|
|
||||||
|
return checktableinfo($get_db,
|
||||||
|
__PACKAGE__,$tablename,
|
||||||
|
$expected_fieldnames,
|
||||||
|
$indexes);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
@ -0,0 +1,157 @@
|
|||||||
|
package NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents;
|
||||||
|
use strict;
|
||||||
|
|
||||||
|
## no critic
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool qw(
|
||||||
|
get_sqlite_db
|
||||||
|
destroy_all_dbs
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::SqlProcessor qw(
|
||||||
|
registertableinfo
|
||||||
|
create_targettable
|
||||||
|
checktableinfo
|
||||||
|
copy_row
|
||||||
|
insert_stmt
|
||||||
|
transfer_table
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::SqlRecord qw();
|
||||||
|
|
||||||
|
require Exporter;
|
||||||
|
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
|
||||||
|
our @EXPORT_OK = qw(
|
||||||
|
create_table
|
||||||
|
gettablename
|
||||||
|
check_table
|
||||||
|
getinsertstatement
|
||||||
|
|
||||||
|
copy_table
|
||||||
|
);
|
||||||
|
|
||||||
|
my $tablename = 'period_events';
|
||||||
|
my $get_db = \&get_sqlite_db;
|
||||||
|
|
||||||
|
my $fieldnames;
|
||||||
|
my $expected_fieldnames = [
|
||||||
|
'subscriber_id',
|
||||||
|
'profile_id',
|
||||||
|
'start_profile',
|
||||||
|
'update_profile',
|
||||||
|
'stop_profile',
|
||||||
|
];
|
||||||
|
|
||||||
|
my $primarykey_fieldnames = [];
|
||||||
|
my $indexes = {
|
||||||
|
$tablename . '_suscriber_id' => [ 'subscriber_id(11)' ],
|
||||||
|
};
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
|
||||||
|
my $class = shift;
|
||||||
|
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
|
||||||
|
$tablename,$expected_fieldnames,$indexes);
|
||||||
|
|
||||||
|
copy_row($self,shift,$expected_fieldnames);
|
||||||
|
|
||||||
|
return $self;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub create_table {
|
||||||
|
|
||||||
|
my ($truncate) = @_;
|
||||||
|
|
||||||
|
my $db = &$get_db();
|
||||||
|
|
||||||
|
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
|
||||||
|
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,1,undef);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub findby_domainusername {
|
||||||
|
|
||||||
|
my ($domain,$username,$load_recursive) = @_;
|
||||||
|
|
||||||
|
check_table();
|
||||||
|
my $db = &$get_db();
|
||||||
|
my $table = $db->tableidentifier($tablename);
|
||||||
|
|
||||||
|
return [] unless (defined $domain and defined $username);
|
||||||
|
|
||||||
|
my $rows = $db->db_get_all_arrayref(
|
||||||
|
'SELECT * FROM ' . $table .
|
||||||
|
' WHERE ' . $db->columnidentifier('domain') . ' = ?' .
|
||||||
|
' AND ' . $db->columnidentifier('username') . ' = ?'
|
||||||
|
, $domain, $username);
|
||||||
|
|
||||||
|
return buildrecords_fromrows($rows,$load_recursive)->[0];
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub copy_table {
|
||||||
|
|
||||||
|
my ($get_target_db) = @_;
|
||||||
|
|
||||||
|
check_table();
|
||||||
|
#checktableinfo($get_target_db,
|
||||||
|
# __PACKAGE__,$tablename,
|
||||||
|
# get_fieldnames(1),
|
||||||
|
# $indexes);
|
||||||
|
|
||||||
|
return transfer_table(
|
||||||
|
get_db => $get_db,
|
||||||
|
class => __PACKAGE__,
|
||||||
|
get_target_db => $get_target_db,
|
||||||
|
targetclass => __PACKAGE__,
|
||||||
|
targettablename => $tablename,
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub buildrecords_fromrows {
|
||||||
|
|
||||||
|
my ($rows,$load_recursive) = @_;
|
||||||
|
|
||||||
|
my @records = ();
|
||||||
|
my $record;
|
||||||
|
|
||||||
|
if (defined $rows and ref $rows eq 'ARRAY') {
|
||||||
|
foreach my $row (@$rows) {
|
||||||
|
$record = __PACKAGE__->new($row);
|
||||||
|
|
||||||
|
# transformations go here ...
|
||||||
|
|
||||||
|
push @records,$record;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return \@records;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub getinsertstatement {
|
||||||
|
|
||||||
|
my ($insert_ignore) = @_;
|
||||||
|
check_table();
|
||||||
|
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub gettablename {
|
||||||
|
|
||||||
|
return $tablename;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub check_table {
|
||||||
|
|
||||||
|
return checktableinfo($get_db,
|
||||||
|
__PACKAGE__,$tablename,
|
||||||
|
$expected_fieldnames,
|
||||||
|
$indexes);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
@ -0,0 +1,271 @@
|
|||||||
|
package NGCP::BulkProcessor::Projects::ETL::EDR::ExportEvents;
|
||||||
|
use strict;
|
||||||
|
|
||||||
|
## no critic
|
||||||
|
|
||||||
|
use threads::shared qw();
|
||||||
|
|
||||||
|
use Tie::IxHash;
|
||||||
|
|
||||||
|
#use NGCP::BulkProcessor::Serialization qw();
|
||||||
|
#use Scalar::Util qw(blessed);
|
||||||
|
#use MIME::Base64 qw(encode_base64);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::Settings qw(
|
||||||
|
$dry
|
||||||
|
$skip_errors
|
||||||
|
|
||||||
|
$export_subscriber_profiles_multithreading
|
||||||
|
$export_subscriber_profiles_numofthreads
|
||||||
|
$export_subscriber_profiles_blocksize
|
||||||
|
$export_subscriber_profiles_joins
|
||||||
|
$export_subscriber_profiles_conditions
|
||||||
|
$export_subscriber_profiles_limit
|
||||||
|
|
||||||
|
$period_events_single_row_txn
|
||||||
|
$ignore_period_events_unique
|
||||||
|
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Logging qw (
|
||||||
|
getlogger
|
||||||
|
processing_info
|
||||||
|
processing_debug
|
||||||
|
);
|
||||||
|
use NGCP::BulkProcessor::LogError qw(
|
||||||
|
rowprocessingerror
|
||||||
|
rowprocessingwarn
|
||||||
|
fileerror
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Dao::Trunk::accounting::events qw();
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents qw();
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool qw(
|
||||||
|
get_sqlite_db
|
||||||
|
destroy_all_dbs
|
||||||
|
ping_all_dbs
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool trim); #check_ipnet
|
||||||
|
use NGCP::BulkProcessor::DSSorter qw(sort_by_configs);
|
||||||
|
use NGCP::BulkProcessor::Array qw(contains);
|
||||||
|
use NGCP::BulkProcessor::Calendar qw(from_epoch datetime_to_string);
|
||||||
|
#use NGCP::BulkProcessor::DSPath qw();
|
||||||
|
|
||||||
|
require Exporter;
|
||||||
|
our @ISA = qw(Exporter);
|
||||||
|
our @EXPORT_OK = qw(
|
||||||
|
export_subscriber_profiles
|
||||||
|
);
|
||||||
|
|
||||||
|
sub export_subscriber_profiles {
|
||||||
|
|
||||||
|
my $result = NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::create_table(1);
|
||||||
|
|
||||||
|
my $static_context = {};
|
||||||
|
|
||||||
|
destroy_all_dbs();
|
||||||
|
my $warning_count :shared = 0;
|
||||||
|
return ($result && NGCP::BulkProcessor::Dao::Trunk::accounting::events::process_subscribers(
|
||||||
|
static_context => $static_context,
|
||||||
|
process_code => sub {
|
||||||
|
my ($context,$records,$row_offset) = @_;
|
||||||
|
ping_all_dbs();
|
||||||
|
my @period_event_rows = ();
|
||||||
|
foreach my $subscriber_id (map { $_->[0]; } @$records) {
|
||||||
|
if ($subscriber_id == 202) {
|
||||||
|
my $x=1;
|
||||||
|
print "blah";
|
||||||
|
}
|
||||||
|
next unless _export_subscriber_profiles_init_context($context,$subscriber_id);
|
||||||
|
push(@period_event_rows, _get_period_event_rows($context));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if ($period_events_single_row_txn and (scalar @period_event_rows) > 0) {
|
||||||
|
while (defined (my $period_event_row = shift @period_event_rows)) {
|
||||||
|
if ($skip_errors) {
|
||||||
|
eval { _insert_period_events_rows($context,[$period_event_row]); };
|
||||||
|
_warn($context,$@) if $@;
|
||||||
|
} else {
|
||||||
|
_insert_period_events_rows($context,[$period_event_row]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (not $period_events_single_row_txn and (scalar @period_event_rows) > 0) {
|
||||||
|
if ($skip_errors) {
|
||||||
|
eval { insert_period_events_rows($context,\@period_event_rows); };
|
||||||
|
_warn($context,$@) if $@;
|
||||||
|
} else {
|
||||||
|
insert_period_events_rows($context,\@period_event_rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
},
|
||||||
|
init_process_context_code => sub {
|
||||||
|
my ($context)= @_;
|
||||||
|
$context->{db} = &get_sqlite_db();
|
||||||
|
$context->{error_count} = 0;
|
||||||
|
$context->{warning_count} = 0;
|
||||||
|
},
|
||||||
|
uninit_process_context_code => sub {
|
||||||
|
my ($context)= @_;
|
||||||
|
undef $context->{db};
|
||||||
|
destroy_all_dbs();
|
||||||
|
{
|
||||||
|
lock $warning_count;
|
||||||
|
$warning_count += $context->{warning_count};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
destroy_reader_dbs_code => \&destroy_all_dbs,
|
||||||
|
blocksize => $export_subscriber_profiles_blocksize,
|
||||||
|
multithreading => $export_subscriber_profiles_multithreading,
|
||||||
|
numofthreads => $export_subscriber_profiles_numofthreads,
|
||||||
|
joins => $export_subscriber_profiles_joins,
|
||||||
|
conditions => $export_subscriber_profiles_conditions,
|
||||||
|
#sort => [{ column => 'id', numeric => 1, dir => 1 }],
|
||||||
|
#limit => $export_subscriber_profiles_limit,
|
||||||
|
),$warning_count,);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _export_subscriber_profiles_init_context {
|
||||||
|
|
||||||
|
my ($context,$subscriber_id) = @_;
|
||||||
|
|
||||||
|
my $result = 1;
|
||||||
|
|
||||||
|
$context->{events} = NGCP::BulkProcessor::Dao::Trunk::accounting::events::findby_subscriberid(
|
||||||
|
undef,$subscriber_id,$export_subscriber_profiles_joins,$export_subscriber_profiles_conditions);
|
||||||
|
|
||||||
|
$context->{subscriber_id} = $subscriber_id;
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _get_period_event_rows {
|
||||||
|
|
||||||
|
my ($context) = @_;
|
||||||
|
|
||||||
|
my $profile_events = {
|
||||||
|
start => undef,
|
||||||
|
update => [],
|
||||||
|
stop => undef,
|
||||||
|
};
|
||||||
|
my $last_event;
|
||||||
|
|
||||||
|
my %subscriber_profiles = ();
|
||||||
|
tie(%subscriber_profiles, 'Tie::IxHash');
|
||||||
|
|
||||||
|
foreach my $event (@{sort_by_configs([ grep { contains($_->{type},[ qw(start_profile update_profile end_profile) ]); } @{$context->{events}} ],[
|
||||||
|
{ numeric => 1,
|
||||||
|
dir => 1, #-1,
|
||||||
|
memberchain => [ 'id' ],
|
||||||
|
}
|
||||||
|
])}) {
|
||||||
|
if ($event->{type} eq 'start_profile') {
|
||||||
|
if (not defined $last_event or $last_event->{type} eq 'end_profile') {
|
||||||
|
$profile_events->{start} = $event;
|
||||||
|
$last_event = $event;
|
||||||
|
$subscriber_profiles{$event->{new_status}} = $profile_events;
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
} elsif ($event->{type} eq 'update_profile') {
|
||||||
|
if (defined $last_event and contains($last_event->{type},[ qw(start_profile update_profile) ])) {
|
||||||
|
push(@{$profile_events->{update}},$event);
|
||||||
|
$last_event = $event;
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
} elsif ($event->{type} eq 'end_profile') {
|
||||||
|
if (defined $last_event and contains($last_event->{type},[ qw(start_profile update_profile) ])) {
|
||||||
|
$profile_events->{stop} = $event;
|
||||||
|
$last_event = $event;
|
||||||
|
$profile_events = {
|
||||||
|
start => undef,
|
||||||
|
update => [],
|
||||||
|
stop => undef,
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
my @period_event_rows = ();
|
||||||
|
foreach my $profile_id (keys %subscriber_profiles) {
|
||||||
|
$profile_events = $subscriber_profiles{$profile_id};
|
||||||
|
push(@period_event_rows,[
|
||||||
|
$context->{subscriber_id},
|
||||||
|
$profile_id,
|
||||||
|
datetime_to_string(from_epoch($profile_events->{start}->{timestamp})),
|
||||||
|
join(",",map { datetime_to_string(from_epoch($_)); } @{$profile_events->{update}}),
|
||||||
|
(defined $profile_events->{stop} ? datetime_to_string(from_epoch($profile_events->{stop}->{timestamp})) : undef),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return @period_event_rows;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _insert_period_events_rows {
|
||||||
|
my ($context,$subscriber_rows) = @_;
|
||||||
|
$context->{db}->db_do_begin(
|
||||||
|
NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::getinsertstatement($ignore_period_events_unique),
|
||||||
|
);
|
||||||
|
eval {
|
||||||
|
$context->{db}->db_do_rowblock($subscriber_rows);
|
||||||
|
$context->{db}->db_finish();
|
||||||
|
};
|
||||||
|
my $err = $@;
|
||||||
|
if ($err) {
|
||||||
|
eval {
|
||||||
|
$context->{db}->db_finish(1);
|
||||||
|
};
|
||||||
|
die($err);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
sub _error {
|
||||||
|
|
||||||
|
my ($context,$message) = @_;
|
||||||
|
$context->{error_count} = $context->{error_count} + 1;
|
||||||
|
rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _warn {
|
||||||
|
|
||||||
|
my ($context,$message) = @_;
|
||||||
|
$context->{warning_count} = $context->{warning_count} + 1;
|
||||||
|
rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _info {
|
||||||
|
|
||||||
|
my ($context,$message,$debug) = @_;
|
||||||
|
if ($debug) {
|
||||||
|
processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||||
|
} else {
|
||||||
|
processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _debug {
|
||||||
|
|
||||||
|
my ($context,$message,$debug) = @_;
|
||||||
|
processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
@ -0,0 +1,120 @@
|
|||||||
|
package NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool;
|
||||||
|
use strict;
|
||||||
|
|
||||||
|
## no critic
|
||||||
|
|
||||||
|
use File::Basename;
|
||||||
|
use Cwd;
|
||||||
|
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::Settings qw(
|
||||||
|
$csv_dir
|
||||||
|
$sqlite_db_file
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::ConnectorPool qw(
|
||||||
|
get_connectorinstancename
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw();
|
||||||
|
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
|
||||||
|
|
||||||
|
require Exporter;
|
||||||
|
our @ISA = qw(Exporter);
|
||||||
|
our @EXPORT_OK = qw(
|
||||||
|
|
||||||
|
get_sqlite_db
|
||||||
|
sqlite_db_tableidentifier
|
||||||
|
|
||||||
|
get_csv_db
|
||||||
|
csv_db_tableidentifier
|
||||||
|
|
||||||
|
destroy_dbs
|
||||||
|
destroy_all_dbs
|
||||||
|
ping_all_dbs
|
||||||
|
|
||||||
|
);
|
||||||
|
|
||||||
|
my $sqlite_dbs = {};
|
||||||
|
my $csv_dbs = {};
|
||||||
|
|
||||||
|
sub get_sqlite_db {
|
||||||
|
|
||||||
|
my ($instance_name,$reconnect) = @_;
|
||||||
|
my $name = get_connectorinstancename($instance_name);
|
||||||
|
|
||||||
|
if (not defined $sqlite_dbs->{$name}) {
|
||||||
|
$sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name);
|
||||||
|
if (not defined $reconnect) {
|
||||||
|
$reconnect = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($reconnect) {
|
||||||
|
$sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $sqlite_dbs->{$name};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub sqlite_db_tableidentifier {
|
||||||
|
|
||||||
|
my ($get_target_db,$tablename) = @_;
|
||||||
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
||||||
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub get_csv_db {
|
||||||
|
|
||||||
|
my ($instance_name,$reconnect) = @_;
|
||||||
|
my $name = get_connectorinstancename($instance_name);
|
||||||
|
if (not defined $csv_dbs->{$name}) {
|
||||||
|
$csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name);
|
||||||
|
if (not defined $reconnect) {
|
||||||
|
$reconnect = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($reconnect) {
|
||||||
|
$csv_dbs->{$name}->db_connect($csv_dir);
|
||||||
|
}
|
||||||
|
return $csv_dbs->{$name};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub csv_db_tableidentifier {
|
||||||
|
|
||||||
|
my ($get_target_db,$tablename) = @_;
|
||||||
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
||||||
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub destroy_dbs {
|
||||||
|
|
||||||
|
foreach my $name (keys %$sqlite_dbs) {
|
||||||
|
cleartableinfo($sqlite_dbs->{$name});
|
||||||
|
undef $sqlite_dbs->{$name};
|
||||||
|
delete $sqlite_dbs->{$name};
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach my $name (keys %$csv_dbs) {
|
||||||
|
cleartableinfo($csv_dbs->{$name});
|
||||||
|
undef $csv_dbs->{$name};
|
||||||
|
delete $csv_dbs->{$name};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub destroy_all_dbs() {
|
||||||
|
destroy_dbs();
|
||||||
|
NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
|
||||||
|
}
|
||||||
|
|
||||||
|
sub ping_all_dbs() {
|
||||||
|
NGCP::BulkProcessor::ConnectorPool::ping_dbs();
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
@ -0,0 +1,235 @@
|
|||||||
|
package NGCP::BulkProcessor::Projects::ETL::EDR::Settings;
|
||||||
|
use strict;
|
||||||
|
|
||||||
|
## no critic
|
||||||
|
|
||||||
|
use File::Basename qw(fileparse);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Globals qw(
|
||||||
|
$working_path
|
||||||
|
$enablemultithreading
|
||||||
|
$cpucount
|
||||||
|
create_path
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Logging qw(
|
||||||
|
getlogger
|
||||||
|
scriptinfo
|
||||||
|
configurationinfo
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::LogError qw(
|
||||||
|
fileerror
|
||||||
|
filewarn
|
||||||
|
configurationwarn
|
||||||
|
configurationerror
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::LoadConfig qw(
|
||||||
|
split_tuple
|
||||||
|
parse_regexp
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Array qw(contains);
|
||||||
|
|
||||||
|
require Exporter;
|
||||||
|
our @ISA = qw(Exporter);
|
||||||
|
our @EXPORT_OK = qw(
|
||||||
|
update_settings
|
||||||
|
|
||||||
|
get_export_filename
|
||||||
|
|
||||||
|
$ignore_period_events_unique
|
||||||
|
$period_events_single_row_txn
|
||||||
|
|
||||||
|
$sqlite_db_file
|
||||||
|
$csv_dir
|
||||||
|
|
||||||
|
check_dry
|
||||||
|
|
||||||
|
$output_path
|
||||||
|
$input_path
|
||||||
|
|
||||||
|
$subscriber_profiles_export_filename_format
|
||||||
|
|
||||||
|
$defaultsettings
|
||||||
|
$defaultconfig
|
||||||
|
|
||||||
|
$dry
|
||||||
|
$skip_errors
|
||||||
|
$force
|
||||||
|
|
||||||
|
$export_subscriber_profiles_multithreading
|
||||||
|
$export_subscriber_profiles_numofthreads
|
||||||
|
$export_subscriber_profiles_blocksize
|
||||||
|
|
||||||
|
$export_subscriber_profiles_joins
|
||||||
|
$export_subscriber_profiles_conditions
|
||||||
|
$export_subscriber_profiles_limit
|
||||||
|
|
||||||
|
);
|
||||||
|
|
||||||
|
our $defaultconfig = 'config.cfg';
|
||||||
|
our $defaultsettings = 'settings.cfg';
|
||||||
|
|
||||||
|
our $ignore_period_events_unique = 0;
|
||||||
|
our $period_events_single_row_txn = 1;
|
||||||
|
|
||||||
|
our $output_path = $working_path . 'output/';
|
||||||
|
our $input_path = $working_path . 'input/';
|
||||||
|
our $csv_dir = 'events';
|
||||||
|
|
||||||
|
our $subscriber_profiles_export_filename_format = undef;
|
||||||
|
|
||||||
|
our $force = 0;
|
||||||
|
our $dry = 0;
|
||||||
|
our $skip_errors = 0;
|
||||||
|
|
||||||
|
our $sqlite_db_file = 'sqlite';
|
||||||
|
|
||||||
|
our $export_subscriber_profiles_multithreading = $enablemultithreading;
|
||||||
|
our $export_subscriber_profiles_numofthreads = $cpucount;
|
||||||
|
our $export_subscriber_profiles_blocksize = 1000;
|
||||||
|
|
||||||
|
our $export_subscriber_profiles_joins = [];
|
||||||
|
our $export_subscriber_profiles_conditions = [];
|
||||||
|
our $export_subscriber_profiles_limit = undef;
|
||||||
|
|
||||||
|
sub update_settings {
|
||||||
|
|
||||||
|
my ($data,$configfile) = @_;
|
||||||
|
|
||||||
|
if (defined $data) {
|
||||||
|
|
||||||
|
my $result = 1;
|
||||||
|
my $regexp_result;
|
||||||
|
|
||||||
|
#&$configurationinfocode("testinfomessage",$configlogger);
|
||||||
|
|
||||||
|
$result &= _prepare_working_paths(1);
|
||||||
|
|
||||||
|
$subscriber_profiles_export_filename_format = $data->{subscriber_profiles_export_filename} if exists $data->{subscriber_profiles_export_filename};
|
||||||
|
get_export_filename($data->{subscriber_profiles_export_filename},$configfile);
|
||||||
|
|
||||||
|
$sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};
|
||||||
|
$csv_dir = $data->{csv_dir} if exists $data->{csv_dir};
|
||||||
|
|
||||||
|
$dry = $data->{dry} if exists $data->{dry};
|
||||||
|
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
|
||||||
|
|
||||||
|
my $parse_result;
|
||||||
|
($parse_result,$export_subscriber_profiles_joins) = _parse_export_joins($data->{export_subscriber_profiles_joins},$configfile);
|
||||||
|
$result &= $parse_result;
|
||||||
|
($parse_result,$export_subscriber_profiles_conditions) = _parse_export_conditions($data->{export_subscriber_profiles_conditions},$configfile);
|
||||||
|
$result &= $parse_result;
|
||||||
|
|
||||||
|
$export_subscriber_profiles_limit = $data->{export_subscriber_profiles_limit} if exists $data->{export_subscriber_profiles_limit};
|
||||||
|
|
||||||
|
$export_subscriber_profiles_multithreading = $data->{export_subscriber_profiles_multithreading} if exists $data->{export_subscriber_profiles_multithreading};
|
||||||
|
$export_subscriber_profiles_numofthreads = _get_numofthreads($cpucount,$data,'export_subscriber_profiles_numofthreads');
|
||||||
|
$export_subscriber_profiles_blocksize = $data->{export_subscriber_profiles_blocksize} if exists $data->{export_subscriber_profiles_blocksize};
|
||||||
|
|
||||||
|
$period_events_single_row_txn = $data->{period_events_single_row_txn} if exists $data->{period_events_single_row_txn};
|
||||||
|
$ignore_period_events_unique = $data->{ignore_period_events_unique} if exists $data->{ignore_period_events_unique};
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _prepare_working_paths {
|
||||||
|
|
||||||
|
my ($create) = @_;
|
||||||
|
my $result = 1;
|
||||||
|
my $path_result;
|
||||||
|
|
||||||
|
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
|
||||||
|
$result &= $path_result;
|
||||||
|
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
|
||||||
|
$result &= $path_result;
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _get_numofthreads {
|
||||||
|
my ($default_value,$data,$key) = @_;
|
||||||
|
my $numofthreads = $default_value;
|
||||||
|
$numofthreads = $data->{$key} if exists $data->{$key};
|
||||||
|
$numofthreads = $cpucount if $numofthreads > $cpucount;
|
||||||
|
return $numofthreads;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub get_export_filename {
|
||||||
|
my ($filename_format,$configfile) = @_;
|
||||||
|
my $export_filename;
|
||||||
|
my $export_format;
|
||||||
|
if ($filename_format) {
|
||||||
|
$export_filename = sprintf($filename_format,timestampdigits(),threadid());
|
||||||
|
unless ($export_filename =~ /^\//) {
|
||||||
|
$export_filename = $output_path . $export_filename;
|
||||||
|
}
|
||||||
|
if (-e $export_filename and (unlink $export_filename) == 0) {
|
||||||
|
filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__));
|
||||||
|
$export_filename = undef;
|
||||||
|
}
|
||||||
|
my ($name,$path,$suffix) = fileparse($export_filename,".csv");
|
||||||
|
if ($suffix eq '.csv') {
|
||||||
|
$export_format = 'csv';
|
||||||
|
} else {
|
||||||
|
configurationerror($configfile,"$filename_format: .csv export file format required");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ($export_filename,$export_format);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _parse_export_joins {
|
||||||
|
my ($token,$file) = @_;
|
||||||
|
my @joins = ();
|
||||||
|
if (defined $token and length($token) > 0) {
|
||||||
|
foreach my $f (_split(\$token)) {
|
||||||
|
next unless($f);
|
||||||
|
$f =~ s/^\s*\{?\s*//;
|
||||||
|
$f =~ s/\}\s*\}\s*$/}/;
|
||||||
|
my ($a, $b) = split(/\s*=>\s*{\s*/, $f);
|
||||||
|
$a =~ s/^\s*\'//;
|
||||||
|
$a =~ s/\'$//g;
|
||||||
|
$b =~ s/\s*\}\s*$//;
|
||||||
|
my ($c, $d) = split(/\s*=>\s*/, $b);
|
||||||
|
$c =~ s/^\s*\'//g;
|
||||||
|
$c =~ s/\'\s*//;
|
||||||
|
$d =~ s/^\s*\'//g;
|
||||||
|
$d =~ s/\'\s*//;
|
||||||
|
push @joins, { $a => { $c => $d } };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (1,\@joins);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub _parse_export_conditions {
|
||||||
|
my ($token,$file) = @_;
|
||||||
|
my @conditions = ();
|
||||||
|
if (defined $token and length($token) > 0) {
|
||||||
|
foreach my $f (_split(\$token)) {
|
||||||
|
next unless($f);
|
||||||
|
$f =~ s/^\s*\{?\s*//;
|
||||||
|
$f =~ s/\}\s*\}\s*$/}/;
|
||||||
|
my ($a, $b) = split(/\s*=>\s*{\s*/, $f);
|
||||||
|
$a =~ s/^\s*\'//;
|
||||||
|
$a =~ s/\'$//g;
|
||||||
|
$b =~ s/\s*\}\s*$//;
|
||||||
|
my ($c, $d) = split(/\s*=>\s*/, $b);
|
||||||
|
$c =~ s/^\s*\'//g;
|
||||||
|
$c =~ s/\'\s*//;
|
||||||
|
$d =~ s/^\s*\'//g;
|
||||||
|
$d =~ s/\'\s*//;
|
||||||
|
push @conditions, { $a => { $c => $d } };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (1,\@conditions);
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
||||||
@ -0,0 +1,61 @@
|
|||||||
|
##general settings:
|
||||||
|
working_path = /var/sipwise
|
||||||
|
cpucount = 4
|
||||||
|
enablemultithreading = 1
|
||||||
|
|
||||||
|
##gearman/service listener config:
|
||||||
|
jobservers = 127.0.0.1:4730
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "accounting" db:
|
||||||
|
accounting_host = db01
|
||||||
|
accounting_port = 3306
|
||||||
|
accounting_databasename = accounting
|
||||||
|
accounting_username = root
|
||||||
|
accounting_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "billing" db:
|
||||||
|
billing_host = db01
|
||||||
|
billing_port = 3306
|
||||||
|
billing_databasename = billing
|
||||||
|
billing_username = root
|
||||||
|
billing_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "provisioning" db:
|
||||||
|
provisioning_host = db01
|
||||||
|
provisioning_port = 3306
|
||||||
|
provisioning_databasename = provisioning
|
||||||
|
provisioning_username = root
|
||||||
|
provisioning_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "kamailio" db:
|
||||||
|
kamailio_host = db01
|
||||||
|
kamailio_port = 3306
|
||||||
|
kamailio_databasename = kamailio
|
||||||
|
kamailio_username = root
|
||||||
|
kamailio_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
|
||||||
|
xa_host = db01
|
||||||
|
xa_port = 3306
|
||||||
|
xa_databasename = ngcp
|
||||||
|
xa_username = root
|
||||||
|
xa_password =
|
||||||
|
|
||||||
|
##NGCP REST-API connectivity:
|
||||||
|
ngcprestapi_uri = https://127.0.0.1:1443
|
||||||
|
ngcprestapi_username = administrator
|
||||||
|
ngcprestapi_password = administrator
|
||||||
|
ngcprestapi_realm = api_admin_http
|
||||||
|
|
||||||
|
##sending email:
|
||||||
|
emailenable = 0
|
||||||
|
erroremailrecipient =
|
||||||
|
warnemailrecipient =
|
||||||
|
completionemailrecipient = rkrenn@sipwise.com
|
||||||
|
doneemailrecipient =
|
||||||
|
|
||||||
|
##logging:
|
||||||
|
fileloglevel = INFO
|
||||||
|
#DEBUG
|
||||||
|
screenloglevel = INFO
|
||||||
|
emailloglevel = OFF
|
||||||
@ -0,0 +1,61 @@
|
|||||||
|
##general settings:
|
||||||
|
working_path = /home/rkrenn/temp/customer_exporter
|
||||||
|
cpucount = 4
|
||||||
|
enablemultithreading = 1
|
||||||
|
|
||||||
|
##gearman/service listener config:
|
||||||
|
jobservers = 127.0.0.1:4730
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "accounting" db:
|
||||||
|
accounting_host = 192.168.0.96
|
||||||
|
accounting_port = 3306
|
||||||
|
accounting_databasename = accounting
|
||||||
|
accounting_username = root
|
||||||
|
accounting_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "billing" db:
|
||||||
|
billing_host = 192.168.0.96
|
||||||
|
billing_port = 3306
|
||||||
|
billing_databasename = billing
|
||||||
|
billing_username = root
|
||||||
|
billing_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "provisioning" db:
|
||||||
|
provisioning_host = 192.168.0.96
|
||||||
|
provisioning_port = 3306
|
||||||
|
provisioning_databasename = provisioning
|
||||||
|
provisioning_username = root
|
||||||
|
provisioning_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - "kamailio" db:
|
||||||
|
kamailio_host = 192.168.0.96
|
||||||
|
kamailio_port = 3306
|
||||||
|
kamailio_databasename = kamailio
|
||||||
|
kamailio_username = root
|
||||||
|
kamailio_password =
|
||||||
|
|
||||||
|
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
|
||||||
|
xa_host = 192.168.0.96
|
||||||
|
xa_port = 3306
|
||||||
|
xa_databasename = ngcp
|
||||||
|
xa_username = root
|
||||||
|
xa_password =
|
||||||
|
|
||||||
|
##NGCP REST-API connectivity:
|
||||||
|
ngcprestapi_uri = https://127.0.0.1:1443
|
||||||
|
ngcprestapi_username = administrator
|
||||||
|
ngcprestapi_password = administrator
|
||||||
|
ngcprestapi_realm = api_admin_http
|
||||||
|
|
||||||
|
##sending email:
|
||||||
|
emailenable = 0
|
||||||
|
erroremailrecipient =
|
||||||
|
warnemailrecipient =
|
||||||
|
completionemailrecipient = rkrenn@sipwise.com
|
||||||
|
doneemailrecipient =
|
||||||
|
|
||||||
|
##logging:
|
||||||
|
fileloglevel = INFO
|
||||||
|
#DEBUG
|
||||||
|
screenloglevel = INFO
|
||||||
|
emailloglevel = OFF
|
||||||
@ -0,0 +1,214 @@
|
|||||||
|
use strict;
|
||||||
|
|
||||||
|
## no critic
|
||||||
|
|
||||||
|
our $VERSION = "0.0";
|
||||||
|
|
||||||
|
use File::Basename;
|
||||||
|
use Cwd;
|
||||||
|
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
|
||||||
|
|
||||||
|
use Getopt::Long qw(GetOptions);
|
||||||
|
use Fcntl qw(LOCK_EX LOCK_NB);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Globals qw();
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::Settings qw(
|
||||||
|
update_settings
|
||||||
|
|
||||||
|
get_export_filename
|
||||||
|
$subscriber_profiles_export_filename_format
|
||||||
|
|
||||||
|
check_dry
|
||||||
|
$output_path
|
||||||
|
$defaultsettings
|
||||||
|
$defaultconfig
|
||||||
|
$dry
|
||||||
|
$skip_errors
|
||||||
|
$force
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Logging qw(
|
||||||
|
init_log
|
||||||
|
getlogger
|
||||||
|
$attachmentlogfile
|
||||||
|
scriptinfo
|
||||||
|
cleanuplogfiles
|
||||||
|
$currentlogfile
|
||||||
|
);
|
||||||
|
use NGCP::BulkProcessor::LogError qw (
|
||||||
|
completion
|
||||||
|
done
|
||||||
|
scriptwarn
|
||||||
|
scripterror
|
||||||
|
filewarn
|
||||||
|
fileerror
|
||||||
|
);
|
||||||
|
use NGCP::BulkProcessor::LoadConfig qw(
|
||||||
|
load_config
|
||||||
|
$SIMPLE_CONFIG_TYPE
|
||||||
|
$YAML_CONFIG_TYPE
|
||||||
|
$ANY_CONFIG_TYPE
|
||||||
|
);
|
||||||
|
use NGCP::BulkProcessor::Array qw(removeduplicates);
|
||||||
|
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
|
||||||
|
use NGCP::BulkProcessor::Mail qw(
|
||||||
|
cleanupmsgfiles
|
||||||
|
);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
|
||||||
|
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool qw(destroy_all_dbs get_csv_db get_sqlite_db);
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents qw();
|
||||||
|
|
||||||
|
use NGCP::BulkProcessor::Projects::ETL::EDR::ExportEvents qw(
|
||||||
|
export_subscriber_profiles
|
||||||
|
);
|
||||||
|
|
||||||
|
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB;
|
||||||
|
|
||||||
|
my @TASK_OPTS = ();
|
||||||
|
|
||||||
|
my $tasks = [];
|
||||||
|
|
||||||
|
my $cleanup_task_opt = 'cleanup';
|
||||||
|
push(@TASK_OPTS,$cleanup_task_opt);
|
||||||
|
|
||||||
|
my $cleanup_all_task_opt = 'cleanup_all';
|
||||||
|
push(@TASK_OPTS,$cleanup_all_task_opt);
|
||||||
|
|
||||||
|
my $export_subscriber_profiles_task_opt = 'export_subscriber_profiles';
|
||||||
|
push(@TASK_OPTS,$export_subscriber_profiles_task_opt);
|
||||||
|
|
||||||
|
if (init()) {
|
||||||
|
main();
|
||||||
|
exit(0);
|
||||||
|
} else {
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
sub init {
|
||||||
|
|
||||||
|
my $configfile = $defaultconfig;
|
||||||
|
my $settingsfile = $defaultsettings;
|
||||||
|
|
||||||
|
return 0 unless GetOptions(
|
||||||
|
"config=s" => \$configfile,
|
||||||
|
"settings=s" => \$settingsfile,
|
||||||
|
"task=s" => $tasks,
|
||||||
|
"skip-errors" => \$skip_errors,
|
||||||
|
"force" => \$force,
|
||||||
|
);
|
||||||
|
|
||||||
|
$tasks = removeduplicates($tasks,1);
|
||||||
|
|
||||||
|
my $result = load_config($configfile);
|
||||||
|
init_log();
|
||||||
|
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
|
||||||
|
return $result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
sub main() {
|
||||||
|
|
||||||
|
my @messages = ();
|
||||||
|
my @attachmentfiles = ();
|
||||||
|
my $result = 1;
|
||||||
|
my $completion = 0;
|
||||||
|
|
||||||
|
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
|
||||||
|
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
|
||||||
|
foreach my $task (@$tasks) {
|
||||||
|
|
||||||
|
if (lc($cleanup_task_opt) eq lc($task)) {
|
||||||
|
$result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result);
|
||||||
|
} elsif (lc($cleanup_all_task_opt) eq lc($task)) {
|
||||||
|
$result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
|
||||||
|
|
||||||
|
} elsif (lc($export_subscriber_profiles_task_opt) eq lc($task)) {
|
||||||
|
$result &= export_subscriber_profiles_task(\@messages) if taskinfo($export_subscriber_profiles_task_opt,$result);
|
||||||
|
$completion |= 1;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
$result = 0;
|
||||||
|
scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
|
||||||
|
last;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$result = 0;
|
||||||
|
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
push(@attachmentfiles,$attachmentlogfile);
|
||||||
|
if ($completion) {
|
||||||
|
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
|
||||||
|
} else {
|
||||||
|
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub taskinfo {
|
||||||
|
my ($task,$result) = @_;
|
||||||
|
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub cleanup_task {
|
||||||
|
my ($messages,$clean_generated) = @_;
|
||||||
|
my $result = 0;
|
||||||
|
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
|
||||||
|
eval {
|
||||||
|
cleanupcvsdirs();
|
||||||
|
cleanupdbfiles();
|
||||||
|
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
|
||||||
|
cleanupmsgfiles(\&fileerror,\&filewarn);
|
||||||
|
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
|
||||||
|
$result = 1;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if ($@ or !$result) {
|
||||||
|
push(@$messages,'working directory cleanup INCOMPLETE');
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
push(@$messages,'working directory folders cleaned up');
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub export_subscriber_profiles_task {
|
||||||
|
|
||||||
|
my ($messages) = @_;
|
||||||
|
my ($result,$warning_count) = (0,0);
|
||||||
|
eval {
|
||||||
|
($result,$warning_count) = export_subscriber_profiles();
|
||||||
|
};
|
||||||
|
my $err = $@;
|
||||||
|
my $stats = ": $warning_count warnings";
|
||||||
|
eval {
|
||||||
|
my ($export_filename,$export_format) = get_export_filename($subscriber_profiles_export_filename_format);
|
||||||
|
if ('sqlite' eq $export_format) {
|
||||||
|
&get_sqlite_db()->copydbfile($export_filename);
|
||||||
|
} elsif ('csv' eq $export_format) {
|
||||||
|
NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::copy_table(\&get_csv_db);
|
||||||
|
&get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::gettablename(),$export_filename);
|
||||||
|
} else {
|
||||||
|
push(@$messages,'invalid extension for output filename $export_filename');
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if ($err or !$result) {
|
||||||
|
push(@$messages,"exporting subscriber profiles INCOMPLETE$stats");
|
||||||
|
} else {
|
||||||
|
push(@$messages,"exporting subscriber profiles completed$stats");
|
||||||
|
}
|
||||||
|
destroy_all_dbs();
|
||||||
|
return $result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
__DATA__
|
||||||
|
This exists to allow the locking code at the beginning of the file to work.
|
||||||
|
DO NOT REMOVE THESE LINES!
|
||||||
@ -0,0 +1,21 @@
|
|||||||
|
#dry=0
|
||||||
|
#skip_errors=0
|
||||||
|
|
||||||
|
export_subscriber_profiles_multithreading = 1
|
||||||
|
export_subscriber_profiles_numofthreads = 2
|
||||||
|
export_subscriber_profiles_blocksize = 1000
|
||||||
|
export_subscriber_profiles_limit = 10000
|
||||||
|
|
||||||
|
#export_cdr_conditions = { 'accounting.cdr.destination_domain' => { 'IN' => '("80.110.2.164","ccs.upc.at")' } }
|
||||||
|
#export_cdr_conditions = { 'accounting.cdr.destination_domain' => { '=' => '"ccs.upc.at"' } }
|
||||||
|
#, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
|
||||||
|
#{ 'accounting.cdr.call_status' => { '=' => '"ok"' } }
|
||||||
|
#export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } }
|
||||||
|
export_cdr_conditions = { 'accounting.cdr.id' => { 'IN' => '(51,53, 87,89, 55, 79, 65,67,69, 81,83,85, 111, 113)' } }
|
||||||
|
|
||||||
|
subscriber_profiles_export_filename=subscriber_profiles_%s.csv
|
||||||
|
|
||||||
|
sqlite_db_file = sqlite
|
||||||
|
csv_dir = events
|
||||||
|
period_events_single_row_txn = 1
|
||||||
|
ignore_period_events_unique = 0
|
||||||
Loading…
Reference in new issue