TT#18883 implement teletek importer #8

+ prepare/merge/clean callforward+voicemail Y/N data
+ add voip_cf_destinations dao
+ add voip_cf_destination_sets dao
+ voip_cf_mappings, destination_sets and destinations
  insert dao method
+ write callforwards
+ write callforward related preference values
+ not well-formed enough: fixed issues popped up when
  trying to view/edit created callfowards in panel

+ fix "database locked" error - properly handle SQLite's
  serialized transactions (single transaction at a time).
+ write report file for debugging/review
  - json graph of merged data of a subscriber
  - including fields info/warning/error messages
    for a subscriber
  - write it snychronized now
+ get rid of JSON::XS

+ extend NGCPRestApi connector to support file transfers

+ heuristic for missing "channels" (by subscriber number
  count)

+ prevent mysql deadlocks when writing to ngcp

+ strictly consider record order from imports

+ task end result stats polished

+ cleanup code a bit

+ add kmailio.location dao
+ add kmailio.location insert dao method
+ writing "permanent registrations" to kamailio.location
+ generate location "ruid" and "partition" according to kamailio

Change-Id: Ief9a7634b4930e51d79ac5e963ba48769d3708ea
changes/46/15646/15
Rene Krenn 8 years ago
parent 90d93a9990
commit bfa6575581

@ -37,7 +37,6 @@ my $builder = Module::Build->new(
'MIME::Base64' => 0,
'MIME::Lite' => 0,
'Net::SMTP' => 0,
'JSON::XS' => 0,
'Data::Dump' => 0,
'YAML::XS' => 0,
'XML::Dumper' => '0.81',

1
debian/control vendored

@ -38,7 +38,6 @@ Depends:
libintl-perl,
libio-compress-perl,
libio-socket-ssl-perl,
libjson-xs-perl,
liblog-log4perl-perl,
libmail-imapclient-perl,
libmarpa-r2-perl,

@ -109,6 +109,23 @@ sub forupdate_cc_ac_sn_subscriberid {
return buildrecords_fromrows($rows,$load_recursive)->[0];
#my $stmt = $db->paginate_sort_query('SELECT ' . $db->columnidentifier('id') . ' FROM ' . $table . ' WHERE ' .
# $db->columnidentifier('cc') . ' = ?' .
# ' AND ' . $db->columnidentifier('ac') . ' = ?' .
# ' AND ' . $db->columnidentifier('sn') . ' = ?' .
# ' AND (' . $db->columnidentifier('subscriber_id') . ' = ? OR ' . $db->columnidentifier('subscriber_id') . ' IS NULL)',undef,undef,[{
# column => 'id',
# numeric => 1,
# dir => 1,
# }]);
#my @params = ($cc,$ac,$sn,$subscriber_id);
#foreach my $id (@{$xa_db->db_get_col($stmt,@params)}) {
# return buildrecords_fromrows([
# $xa_db->db_get_row('SELECT * FROM ' . $table . ' WHERE ' . $db->columnidentifier('id') . ' = ? FOR UPDATE',$id)
# ],$load_recursive)->[0];
#}
#return undef;
}
sub release_subscriber_numbers {

@ -0,0 +1,257 @@
package NGCP::BulkProcessor::Dao::Trunk::kamailio::location;
use strict;
## no critic
#use threads::shared qw();
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_kamailio_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Utils qw(threadid);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
countby_usernamedomain
next_ruid
);
my $tablename = 'location';
my $get_db = \&get_kamailio_db;
my $expected_fieldnames = [
'id',
'username',
'domain',
'contact',
'received',
'path',
'expires',
'q',
'callid',
'cseq',
'last_modified',
'flags',
'cflags',
'user_agent',
'socket',
'methods',
'ruid',
'reg_id',
'instance',
'server_id',
'connection_id',
'keepalive',
'partition',
];
my $indexes = {};
my $insert_unique_fields = [];
#/*! call-id used for ul_add and ul_rm_contact */
#static str mi_ul_cid = str_init("dfjrewr12386fd6-343@Kamailio.mi");
#/*! user agent used for ul_add */
#static str mi_ul_ua = str_init("Kamailio MI Server");
my $default_expires = 0; #4294967295
my $default_path = '<sip:127.0.0.1:5060;lr>';
my $default_q = 1.0;
my $default_cseq = 1;
my $default_callid = 'dfjrewr12386fd6-343@Kamailio.mi';
my $default_useragent = 'SIP Router MI Server'; #'Kamailio MI Server';
#\kamailio-master\src\lib\srutils\sruid.c
my $ruid_time = time();
my $ruid_counter = 0;
my $ruid_format = 'ulcx-%x-%x-%x';
my $partition_counter = 0;
my $max_partitions = undef; #>30...;
sub next_ruid {
return sprintf($ruid_format,$ruid_time,threadid(),$ruid_counter++);
}
sub _get_partition {
my $partition = $partition_counter + threadid();
$partition_counter++;
if (defined $max_partitions and $max_partitions > 0) {
return $partition % $max_partitions;
}
return $partition;
}
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub countby_usernamedomain {
my ($username,$domain) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
my @terms = ();
if (defined $username) {
push(@terms,'username = ?');
push(@params,$username);
}
if (defined $domain) {
push(@terms,'domain = ?');
push(@params,$domain);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($username,
$domain,
$contact,
$q,
$expires,
$ruid) = @params{qw/
username
domain
contact
q
expires
ruid
/};
$expires //= $default_expires;
$q //= $default_q;
$ruid //= next_ruid();
my $partition = _get_partition();
my $path = $default_path;
my $cseq = $default_cseq;
my $callid = $default_callid;
my $useragent = $default_useragent;
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('username') . ', ' .
$db->columnidentifier('domain') . ', ' .
$db->columnidentifier('contact') . ', ' .
$db->columnidentifier('path') . ', ' .
$db->columnidentifier('q') . ', ' .
$db->columnidentifier('last_modified') . ', ' .
$db->columnidentifier('expires') . ', ' .
$db->columnidentifier('cseq') . ', ' .
$db->columnidentifier('callid') . ', ' .
$db->columnidentifier('user_agent') . ', ' .
$db->columnidentifier('partition') . ', ' .
$db->columnidentifier('ruid') . ') VALUES (' .
'?, ' .
'?, ' .
'?, ' .
'?, ' .
'?, ' .
'FROM_UNIXTIME(0), ' .
'FROM_UNIXTIME(?), ' .
'?, ' .
'?, ' .
'?, ' .
'?, ' .
'?)',
$username,
$domain,
$contact,
$path,
$q,
$expires,
$cseq,
$callid,
$useragent,
$partition,
$ruid,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,148 @@
package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
countby_subscriberid_type
insert_row
);
my $tablename = 'voip_cf_destination_sets';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'subscriber_id',
'name',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub countby_subscriberid_type {
my ($subscriber_id,$type,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
my @terms = ();
if ($subscriber_id) {
push(@terms,$db->columnidentifier('subscriber_id') . ' = ?');
push(@params,$subscriber_id);
}
if ($type) {
push(@terms,$db->columnidentifier('type') . ' = ?');
push(@params,$type);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($subscriber_id,
$name) = @params{qw/
subscriber_id
name
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('subscriber_id') . ', ' .
$db->columnidentifier('name') .') VALUES (' .
'?, ' .
'?)'
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,163 @@
package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
countby_subscriberid_type
insert_row
);
my $tablename = 'voip_cf_destinations';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'destination_set_id',
'destination',
'priority',
'timeout',
'announcement_id',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub countby_subscriberid_type {
my ($subscriber_id,$type,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
my @terms = ();
if ($subscriber_id) {
push(@terms,$db->columnidentifier('subscriber_id') . ' = ?');
push(@params,$subscriber_id);
}
if ($type) {
push(@terms,$db->columnidentifier('type') . ' = ?');
push(@params,$type);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($destination_set_id,
$destination,
$priority,
$timeout,
$announcement_id) = @params{qw/
destination_set_id
destination
priority
timeout
announcement_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('destination_set_id') . ', ' .
$db->columnidentifier('destination') . ', ' .
$db->columnidentifier('priority') . ', ' .
$db->columnidentifier('timeout') . ', ' .
$db->columnidentifier('announcement_id') .') VALUES (' .
'?, ' .
'?, ' .
'?, ' .
'?, ' .
'NULL)'
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -11,6 +11,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
@ -25,6 +26,8 @@ our @EXPORT_OK = qw(
$CFT_TYPE
$CFU_TYPE
$CFNA_TYPE
insert_row
);
my $tablename = 'voip_cf_mappings';
@ -40,6 +43,8 @@ my $expected_fieldnames = [
my $indexes = {};
my $insert_unique_fields = [];
our $CFB_TYPE = 'cfb';
our $CFT_TYPE = 'cft';
our $CFU_TYPE = 'cfu';
@ -84,6 +89,50 @@ sub countby_subscriberid_type {
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($subscriber_id,
$type,
$destination_set_id,
$time_set_id) = @params{qw/
subscriber_id
type
destination_set_id
time_set_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('subscriber_id') . ', ' .
$db->columnidentifier('type') . ', ' .
$db->columnidentifier('destination_set_id') . ', ' .
$db->columnidentifier('time_set_id') . ') VALUES (' .
'?, ' .
'?, ' .
'?, ' .
'?)',
$subscriber_id,
$type,
$destination_set_id,
$time_set_id
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -31,6 +31,7 @@ our @EXPORT_OK = qw(
findby_subscriberid_username
findby_domainid_username
countby_subscriberidisprimary
);
my $tablename = 'voip_dbaliases';
@ -102,6 +103,33 @@ sub findby_domainid_username {
}
sub countby_subscriberidisprimary {
my ($subscriber_id,$is_primary) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
my @terms = ();
if (defined $subscriber_id) {
push(@terms,$db->columnidentifier('subscriber_id') . ' = ?');
push(@params,$subscriber_id);
}
if (defined $is_primary) {
push(@terms,$db->columnidentifier('is_primary') . ' = ?');
push(@params,$is_primary);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub delete_dbaliases {
my ($xa_db,$subscriber_id,$usernames) = @_;

@ -38,6 +38,9 @@ our @EXPORT_OK = qw(
$ALLOWED_IPS_GRP_ATTRIBUTE
$CONCURRENT_MAX_TOTAL_ATTRIBUTE
$CONCURRENT_MAX_PER_ACCOUNT
@CF_ATTRIBUTES
$RINGTIMEOUT_ATTRIBUTE
);
#$FORCE_OUTBOUND_CALLS_TO_PEER
@ -88,6 +91,10 @@ our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total';
our $CONCURRENT_MAX_PER_ACCOUNT_ATTRIBUTE = 'concurrent_max_per_account';
our $CLIR_ATTRIBUTE = 'clir';
our @CF_ATTRIBUTES = qw(cfu cft cfna cfb); #skip sms for now
our $RINGTIMEOUT_ATTRIBUTE = 'ringtimeout';
sub new {
my $class = shift;

@ -341,7 +341,7 @@ sub restwarn {
sub restrequesterror {
my ($restapi, $message, $request, $logger) = @_;
my ($restapi, $message, $request, $data, $logger) = @_;
$message = _getrestconnectorinstanceprefix($restapi) . _getrestconnectidentifiermessage($restapi,$message);
if (defined $logger) {
$logger->error($message);

@ -1,433 +0,0 @@
package NGCP::BulkProcessor::Projects::Migration::Teletek::Api;
use strict;
## no critic
use threads::shared qw();
#use List::Util qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$dry
$skip_errors
$batch
$domain_name
$reseller_id
$subsciber_username_prefix
$set_call_forwards_multithreading
$set_call_forwards_numofthreads
$cfb_priorities
$cfb_timeouts
$cfu_priorities
$cfu_timeouts
$cft_priorities
$cft_timeouts
$cfna_priorities
$cfna_timeouts
$cfnumber_exclude_pattern
$cfnumber_trim_pattern
$ringtimeout
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
);
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::CallForwards qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
);
use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(
destroy_all_dbs
);
use NGCP::BulkProcessor::Utils qw(threadid);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
set_call_forwards
);
sub set_call_forwards {
my $static_context = {};
my $result = _set_call_forwards_checks($static_context);
destroy_all_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::process_records(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
foreach my $imported_subscriber (@$records) {
$rownum++;
next unless _reset_set_call_forward_context($context,$imported_subscriber,$rownum);
_set_call_forward($context);
}
#return 0;
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
multithreading => $set_call_forwards_multithreading,
numofthreads => $set_call_forwards_numofthreads,
),$warning_count);
}
sub _check_insert_tables {
}
sub _invoke_api {
my ($context,$api_code) = @_;
eval {
$context->{db}->db_begin();
#rowprocessingwarn($context->{tid},'AutoCommit is on' ,getlogger(__PACKAGE__)) if $context->{db}->{drh}->{AutoCommit};
my $existing_billing_voip_subscribers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_domainid_username_states($context->{db},
$context->{billing_domain}->{id},$context->{username},{ 'NOT IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE});
if ((scalar @$existing_billing_voip_subscribers) == 0) {
#if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::deleted_delta) {
# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, and no active subscriber found',1);
#} else {
# _warn($context,"($context->{rownum}) no active subscriber found for susbcriber " . $context->{cli});
#}
} elsif ((scalar @$existing_billing_voip_subscribers) == 1) {
$context->{billing_voip_subscriber} = $existing_billing_voip_subscribers->[0];
$context->{provisioning_voip_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(
$context->{db},$context->{billing_voip_subscriber}->{uuid});
if (defined $context->{provisioning_voip_subscriber}) {
#if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::deleted_delta) {
#
# _warn($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, but active subscriber found');
#} else {
if (defined $api_code and 'CODE' eq ref $api_code) {
&$api_code($context);
}
#}
} else {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli});
} else {
_error($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli});
}
}
} else {
rowprocessingwarn($context->{tid},"($context->{rownum}) " . 'multiple (' . (scalar @$existing_billing_voip_subscribers) . ') existing billing subscribers with username ' . $context->{username} . ' found, skipping' ,getlogger(__PACKAGE__));
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err);
} else {
_error($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err);
}
}
}
sub _set_call_forward {
my ($context) = @_;
_invoke_api($context,\&_set_cf_simple);
}
sub _checks {
my ($context) = @_;
my $result = 1;
#my $userpasswordcount = 0;
#eval {
# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::countby_fqdn();
#};
#if ($@ or $userpasswordcount == 0) {
# rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#my $subscribercount = 0;
#my $subscriber_barring_profiles = [];
#eval {
# $subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_subscribernumber();
# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::list_barringprofiles();
#};
#if ($@ or $subscribercount == 0) {
# rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#eval {
# $context->{billing_domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($domain_name);
# if (defined $context->{billing_domain}
# and NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers::countby_domainid_resellerid($context->{billing_domain}->{id},$reseller_id) == 0) {
# undef $context->{billing_domain};
# }
#};
#if ($@ or not defined $context->{billing_domain}) {
# rowprocessingerror(threadid(),'cannot find billing domain',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
return $result;
}
sub _set_call_forwards_checks {
my ($context) = @_;
my $result = _checks($context);
#my $optioncount = 0;
#eval {
# $optioncount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOption::countby_subscribernumber_option();
#};
#if ($@ or $optioncount == 0) {
# rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
return $result;
}
sub _set_cf_simple {
my ($context) = @_;
my $result = 0;
#my $cf_path = NGCP::BulkProcessor::RestRequests::Trunk::CallForwards::get_item_path($context->{billing_voip_subscriber}->{id});
#eval {
# my $callforwards;
# if ($dry) {
# $callforwards = NGCP::BulkProcessor::RestRequests::Trunk::CallForwards::get_item($context->{billing_voip_subscriber}->{id});
# } else {
# $callforwards = NGCP::BulkProcessor::RestRequests::Trunk::CallForwards::set_item(
# $context->{billing_voip_subscriber}->{id},$context->{call_forwards});
# }
# $result = (defined $callforwards ? 1 : 0);
#};
#if ($@ or not $result) {
# if ($skip_errors) {
# _warn($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': could not ' . ($dry ? 'fetch' : 'set') . ' call forwards ' . $cf_path);
# } else {
# _error($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': could not ' . ($dry ? 'fetch' : 'set') . ' call forwards ' . $cf_path);
# }
#} else {
# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': call forwards ' . $cf_path . ($dry ? ' fetched' : ' set'));
#}
return $result;
}
sub _reset_context {
my ($context,$imported_subscriber,$rownum) = @_;
my $result = 1;
$context->{rownum} = $rownum;
#$context->{cli} = $imported_subscriber->subscribernumber();
#$context->{e164} = {};
#$context->{e164}->{cc} = substr($context->{cli},0,3);
#$context->{e164}->{ac} = '';
#$context->{e164}->{sn} = substr($context->{cli},3);
#$context->{subscriberdelta} = $imported_subscriber->{delta};
#my $userpassword = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::findby_fqdn($context->{cli});
#if (defined $userpassword) {
# $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username};
# $context->{password} = $userpassword->{password};
# $context->{userpassworddelta} = $userpassword->{delta};
#} else {
# # once full username+passwords is available:
# delete $context->{username};
# delete $context->{password};
# delete $context->{userpassworddelta};
# if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::deleted_delta) {
#
# } else {
# $result &= 0;
#
# if ($skip_errors) {
# # for now, as username+passwords are incomplete:
# _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# } else {
# _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# }
# }
#}
#delete $context->{billing_voip_subscriber};
#delete $context->{provisioning_voip_subscriber};
return $result;
}
sub _reset_set_call_forward_context {
my ($context,$imported_subscriber,$rownum) = @_;
my $result = _reset_context($context,$imported_subscriber,$rownum);
#my $call_forwards = {};
#if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::countby_subscribernumber_option_optionsetitem(
# $context->{cli}, { 'IN' => [
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_BUSY_OPTION_SET,
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ALL_CALLS_OPTION_SET,
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_NO_ANSWER_OPTION_SET,
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_UNAVAILABLE_OPTION_SET,
# ]}) > 0) {
# $call_forwards->{cfb} = _prepare_callforward($context,$cfb_priorities,$cfb_timeouts,
# NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem(
# $context->{cli},
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_BUSY_OPTION_SET,
# ));
# $call_forwards->{cfu} = _prepare_callforward($context,$cfu_priorities,$cfu_timeouts,
# NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem(
# $context->{cli},
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ALL_CALLS_OPTION_SET,
# ));
# $call_forwards->{cft} = _prepare_callforward($context,$cft_priorities,$cft_timeouts,
# NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem(
# $context->{cli},
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_ON_NO_ANSWER_OPTION_SET,
# ));
# $call_forwards->{cft}->{ringtimeout} = $ringtimeout if defined $call_forwards->{cft};
# $call_forwards->{cfna} = _prepare_callforward($context,$cfna_priorities,$cfna_timeouts,
# NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::findby_subscribernumber_option_optionsetitem(
# $context->{cli},
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::FORWARD_UNAVAILABLE_OPTION_SET,
# ));
#} else {
# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' never had call forwards, skipping',1);
# $call_forwards->{cfb} = undef;
# $call_forwards->{cfu} = undef;
# $call_forwards->{cft} = undef;
# $call_forwards->{cfna} = undef;
# $result = 0;
#}
#$context->{call_forwards} = $call_forwards;
return $result;
}
sub _prepare_callforward {
my ($context,$priorities,$timeouts,$cf_option_set_items) = @_;
my @destinations = ();
#my $i = 0;
#foreach my $cf_option_set_item (@$cf_option_set_items) {
# if (defined $cf_option_set_item and $cf_option_set_item->{delta} ne
# $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOptionSetItem::deleted_delta) {
# if (defined $cfnumber_exclude_pattern and $cf_option_set_item->{optionsetitem} =~ $cfnumber_exclude_pattern) {
# _warn($context,"($context->{rownum}) " . $cf_option_set_item->{option} . " '" . $cf_option_set_item->{optionsetitem} . "' of subscriber " . $context->{cli} . ': exclude pattern match');
# } else {
# my $destination = $cf_option_set_item->{optionsetitem};
# if (defined $cfnumber_trim_pattern) {
# $destination =~ s/$cfnumber_trim_pattern//;
# if ($cf_option_set_item->{optionsetitem} ne $destination) {
# _info($context,"($context->{rownum}) " . $cf_option_set_item->{option} . " '" . $cf_option_set_item->{optionsetitem} . "' of subscriber " . $context->{cli} . ": trim pattern match, changed to to '$destination'");
# }
# }
# push(@destinations, {
# destination => $destination,
# priority => (defined $priorities->[$i] ? $priorities->[$i] : $priorities->[-1]),
# timeout => (defined $timeouts->[$i] ? $timeouts->[$i] : $timeouts->[-1]),
# });
# $i++;
# }
# }
#}
if ((scalar @destinations) > 0) {
return { destinations => \@destinations , times => [], };
} else {
return undef;
}
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -5,14 +5,6 @@ use strict;
no strict 'refs';
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
# $reseller_id
# $domain_name
# $billing_profile_id
#);
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
@ -23,10 +15,6 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::products qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw();
#use NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
@ -36,8 +24,11 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
@ -48,8 +39,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration
use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw();
use NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles qw();
#use NGCP::BulkProcessor::RestRequests::Trunk::NcosLevels qw();
#use NGCP::BulkProcessor::RestRequests::Trunk::LnpCarriers qw();
require Exporter;
our @ISA = qw(Exporter);
@ -107,15 +96,6 @@ sub check_billing_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers');
#if (not $check_result) {
# ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers');
#}
#$result &= $check_result; push(@$messages,$message);
return $result;
}
@ -183,6 +163,12 @@ sub check_provisioning_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations');
$result &= $check_result; push(@$messages,$message);
return $result;
}
@ -200,6 +186,8 @@ sub check_kamailio_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::location');
$result &= $check_result; push(@$messages,$message);
return $result;
@ -231,38 +219,7 @@ sub check_rest_get_items {
my $message_prefix = 'NGCP id\'s/constants - ';
#($check_result,$message, my $reseller) = _check_rest_get_item($message_prefix,
# 'NGCP::BulkProcessor::RestRequests::Trunk::Resellers',
# $reseller_id,
# 'name');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message, my $domain) = _check_rest_get_item($message_prefix,
# 'NGCP::BulkProcessor::RestRequests::Trunk::Domains',
# { 'domain' => $domain_name, 'reseller_id' => $reseller_id },
# 'domain',
# 'get_item_filtered',
# 'get_item_filter_path');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message, my $domain) = _check_rest_get_item($message_prefix,
# 'NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles',
# $billing_profile_id,
# 'handle');
#$result &= $check_result; push(@$messages,$message);
#foreach my $level (values %$barring_profiles) {
# if (defined $level and length($level) > 0) {
# ($check_result,$message, my $ncos_level) = _check_rest_get_item($message_prefix,
# 'NGCP::BulkProcessor::RestRequests::Trunk::NcosLevels',
# { 'level' => $level, 'reseller_id' => $reseller_id },
# 'level',
# 'get_item_filtered',
# 'get_item_filter_path');
# $result &= $check_result; push(@$messages,$message);
# }
#}
return $result;
}

@ -67,6 +67,7 @@ our @fieldnames = (
#calculated fields at the end!
'rownum',
'filenum',
'filename',
);
@ -170,10 +171,19 @@ sub findby_sipusername {
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$db->paginate_sort_query('SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?'
$db->columnidentifier('sip_username') . ' = ?',
undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}])
,$sip_username);
return buildrecords_fromrows($rows,$load_recursive);

@ -68,6 +68,7 @@ our @fieldnames = (
#calculated fields at the end!
"sip_username",
'rownum',
'filenum',
'filename',
);

@ -64,6 +64,7 @@ our @fieldnames = (
#calculated fields at the end!
'rownum',
'filenum',
'filename',
);

@ -59,6 +59,7 @@ our @fieldnames = (
#calculated fields at the end!
'rownum',
'filenum',
'filename',
);
@ -68,7 +69,7 @@ my $expected_fieldnames = [
];
# table creation:
my $primarykey_fieldnames = [ 'sip_username' ];
my $primarykey_fieldnames = [ 'sip_username' ]; #, 'domain', 'contact' ];
my $indexes = {
$tablename . '_rownum' => [ 'rownum(11)' ],
$tablename . '_delta' => [ 'delta(7)' ],

@ -93,6 +93,7 @@ our @fieldnames = (
'rownum',
'range',
'contact_hash',
'filenum',
'filename',
);
my $expected_fieldnames = [
@ -196,11 +197,20 @@ sub findby_domain_sipusername {
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$db->paginate_sort_query('SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('sip_username') . ' = ?'
' AND ' . $db->columnidentifier('sip_username') . ' = ?',
undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}])
,$domain,$sip_username);
return buildrecords_fromrows($rows,$load_recursive);
@ -238,11 +248,20 @@ sub findby_domain_webusername {
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$db->paginate_sort_query('SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('web_username') . ' = ?'
' AND ' . $db->columnidentifier('web_username') . ' = ?',
undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}])
,$domain,$web_username);
return buildrecords_fromrows($rows,$load_recursive);
@ -404,7 +423,16 @@ sub process_records {
destroy_reader_dbs_code => \&destroy_all_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
#'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
'select' => $db->paginate_sort_query('SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}]),
'select_count' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table,
);
}

@ -82,7 +82,9 @@ sub import_subscriber {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
my $filenum = 0;
foreach my $file (@files) {
$filenum++;
$result &= $importer->process(
file => $file,
process_code => sub {
@ -98,6 +100,7 @@ sub import_subscriber {
$record->{ac} //= '';
$record->{sn} //= '';
$record->{rownum} = $rownum;
$record->{filenum} = $filenum;
$record->{filename} = $file;
my %r = %$record;
$record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
@ -229,7 +232,9 @@ sub import_allowedcli {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
my $filenum = 0;
foreach my $file (@files) {
$filenum++;
$result &= $importer->process(
file => $file,
process_code => sub {
@ -245,6 +250,7 @@ sub import_allowedcli {
$record->{ac} //= '';
$record->{sn} //= '';
$record->{rownum} = $rownum;
$record->{filenum} = $filenum;
$record->{filename} = $file;
if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) {
@ -470,7 +476,9 @@ sub import_clir {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
my $filenum = 0;
foreach my $file (@files) {
$filenum++;
$result &= $importer->process(
file => $file,
process_code => sub {
@ -483,6 +491,7 @@ sub import_clir {
$row = [ map { local $_ = $_; trim($_); } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir->new($row);
$record->{rownum} = $rownum;
$record->{filenum} = $filenum;
$record->{filename} = $file;
if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) {
@ -623,7 +632,9 @@ sub import_callforward {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
my $filenum = 0;
foreach my $file (@files) {
$filenum++;
$result &= $importer->process(
file => $file,
process_code => sub {
@ -639,6 +650,7 @@ sub import_callforward {
$record->{ac} //= '';
$record->{sn} //= '';
$record->{rownum} = $rownum;
$record->{filenum} = $filenum;
$record->{filename} = $file;
if (my $subscriber = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
@ -767,15 +779,6 @@ sub _insert_callforward_rows {
}
}
sub import_registration {
my (@files) = @_;
@ -792,7 +795,9 @@ sub import_registration {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
my $filenum = 0;
foreach my $file (@files) {
$filenum++;
$result &= $importer->process(
file => $file,
process_code => sub {
@ -805,6 +810,7 @@ sub import_registration {
$row = [ map { local $_ = $_; trim($_); } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration->new($row);
$record->{rownum} = $rownum;
$record->{filenum} = $filenum;
$record->{filename} = $file;
if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) {
@ -930,9 +936,6 @@ sub _insert_registration_rows {
}
sub _error {
my ($context,$message) = @_;

@ -12,8 +12,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$dry
$skip_errors
);
use NGCP::BulkProcessor::Logging qw (
@ -63,6 +61,7 @@ our @EXPORT_OK = qw(
);
sub cleanup_aig_sequence_ids {
my ($context) = @_;
eval {

@ -7,10 +7,13 @@ use threads::shared qw();
use String::MkPasswd qw();
#use List::Util qw();
use JSON qw();
use Tie::IxHash;
use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$dry
$skip_errors
$report_filename
@ -18,10 +21,15 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$provision_subscriber_numofthreads
$webpassword_length
$webusername_length
$default_channels_map
$reseller_mapping
$barring_profiles
$cf_default_priority
$cf_default_timeout
$cft_default_ringtimeout
);
use NGCP::BulkProcessor::Logging qw (
@ -32,11 +40,14 @@ use NGCP::BulkProcessor::Logging qw (
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
fileerror
);
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::products qw();
@ -56,8 +67,12 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Subscribers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw();
@ -95,6 +110,13 @@ my $split_ipnets_pattern = join('|',(
quotemeta(';'),
#quotemeta('/')
));
my $cf_types_pattern = '^' . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFB_TYPE . '|'
. $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE . '|'
. $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFU_TYPE . '|'
. $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE . '$';
my $db_lock :shared = undef;
my $file_lock :shared = undef;
sub provision_subscribers {
@ -103,18 +125,27 @@ sub provision_subscribers {
destroy_all_dbs();
my $warning_count :shared = 0;
#my $updated_password_count :shared = 0;
my %nonunique_contacts :shared = ();
return ($result && NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::process_records(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
$context->{rowcount} = $row_offset;
my @report_data = ();
foreach my $domain_sipusername (@$records) {
$context->{rowcount} += 1;
next unless _provision_susbcriber($context,
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_sipusername(@$domain_sipusername));
push(@report_data,_get_report_obj($context));
}
cleanup_aig_sequence_ids($context);
if (defined $report_filename) {
lock $file_lock;
open(my $fh, '>>', $report_filename) or fileerror('cannot open file ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__));
binmode($fh);
print $fh JSON::to_json(\@report_data,{ allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, });
close $fh;
}
return 1;
},
init_process_context_code => sub {
@ -122,42 +153,63 @@ sub provision_subscribers {
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
#$context->{updated_password_count} = 0;
$context->{nonunique_contacts} = {};
# below is not mandatory..
_check_insert_tables();
#_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
_warn($context,'non-unique contacts: ' . join("\n",keys %{$context->{nonunique_contacts}}))
if (scalar keys %{$context->{nonunique_contacts}}) > 0;
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
#$updated_password_count += $context->{updated_password_count};
}
{
lock %nonunique_contacts;
foreach my $sip_username (keys %{$context->{nonunique_contacts}}) {
$nonunique_contacts{$sip_username} = $context->{nonunique_contacts}->{$sip_username};
}
}
},
load_recursive => 0,
multithreading => $provision_subscriber_multithreading,
numofthreads => $provision_subscriber_numofthreads,
),$warning_count);
),$warning_count,\%nonunique_contacts);
}
sub _check_insert_tables {
NGCP::BulkProcessor::Dao::Trunk::billing::contacts::check_table();
NGCP::BulkProcessor::Dao::Trunk::billing::contracts::check_table();
NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::check_table();
NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances::check_table();
NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::check_table();
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::check_table();
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::check_table();
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::check_table();
NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::check_table();
sub _get_report_obj {
my ($context) = @_;
my %dump = ();
tie(%dump, 'Tie::IxHash');
foreach my $key (sort keys %$context) {
$dump{$key} = $context->{$key} if 'CODE' ne ref $context->{$key};
}
foreach my $key (qw/
sip_account_product
reseller
billing_profile
reseller_map
domain_map
domain
now
error_count
warning_count
attributes
ncos_level_map
ncos_level
nonunique_contacts
tid
db
blocksize
errorstates
queue
readertid
/) {
delete $dump{$key};
}
return \%dump;
}
sub _provision_susbcriber {
@ -166,6 +218,7 @@ sub _provision_susbcriber {
return 0 unless _provision_susbcriber_init_context($context,$subscriber_group);
eval {
lock $db_lock;
$context->{db}->db_begin();
#_warn($context,'AutoCommit is on') if $context->{db}->{drh}->{AutoCommit};
@ -180,9 +233,14 @@ sub _provision_susbcriber {
_update_contact($context);
_update_contract($context);
_update_subscriber($context);
_create_aliases($context);
#{
# lock $db_lock; #concurrent writes to voip_numbers causes deadlocks
_update_subscriber($context);
_create_aliases($context);
#}
_update_preferences($context);
_set_registrations($context);
_set_callforwards($context);
#todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it
#in this main provisioning loop, or align it in separate run-modes, according to the files given.
@ -477,6 +535,29 @@ sub _provision_subscribers_checks {
processing_info(threadid(),"adm_ncos_id attribute found",getlogger(__PACKAGE__));
}
foreach my $cf_attribute (@NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CF_ATTRIBUTES) {
eval {
$context->{attributes}->{$cf_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($cf_attribute);
};
if ($@ or not defined $context->{attributes}->{$cf_attribute}) {
rowprocessingerror(threadid(),"cannot find $cf_attribute attribute",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"$cf_attribute attribute found",getlogger(__PACKAGE__));
}
}
eval {
$context->{attributes}->{ringtimeout} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::RINGTIMEOUT_ATTRIBUTE);
};
if ($@ or not defined $context->{attributes}->{ringtimeout}) {
rowprocessingerror(threadid(),'cannot find ringtimeout attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"ringtimeout attribute found",getlogger(__PACKAGE__));
}
return $result;
}
@ -828,12 +909,80 @@ sub _create_aliases {
return $result;
}
sub _set_registrations {
my ($context) = @_;
my $result = 1;
foreach my $registration (@{$context->{registrations}}) {
#print "blah";
$registration->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::location::insert_row($context->{db},
%$registration);
_info($context,"permanent registration $registration->{contact} added",1);
}
return $result;
}
sub _set_callforwards {
my ($context) = @_;
my $result = 1;
foreach my $type (keys %{$context->{callforwards}}) {
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw();
my $destination_set_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets::insert_row($context->{db},{
subscriber_id => $context->{prov_subscriber}->{id},
name => "quickset_$type",
});
foreach my $callforward (@{$context->{callforwards}->{$type}}) {
$callforward->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations::insert_row($context->{db},{
%$callforward,
destination_set_id => $destination_set_id,
});
}
my $cf_mapping_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::insert_row($context->{db},{
subscriber_id => $context->{prov_subscriber}->{id},
type => $type,
destination_set_id => $destination_set_id,
#time_set_id
});
$context->{preferences}->{$type} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{$type},
$cf_mapping_id), value => $cf_mapping_id };
if (defined $context->{ringtimeout}) {
$context->{preferences}->{ringtimeout} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{ringtimeout},
$context->{ringtimeout}), value => $context->{ringtimeout} };
}
_info($context,"$type created (destination(s) " . join(', ',(map { $_->{destination}; } @{$context->{callforwards}->{$type}})) . ")",1);
$context->{callforwards}->{$type} = {
destination_set => {
destinations => $context->{callforwards}->{$type},
id => $destination_set_id,
},
id => $cf_mapping_id,
};
}
return $result;
}
sub _provision_susbcriber_init_context {
my ($context,$subscriber_group) = @_;
my $result = 1;
$context->{log_info} = [];
$context->{log_warning} = [];
$context->{log_error} = [];
my $first = $subscriber_group->[0];
unless (defined $first->{sip_username} and length($first->{sip_username}) > 0) {
@ -869,6 +1018,7 @@ sub _provision_susbcriber_init_context {
my %contact_dupes = ();
my %allowed_ips = ();
my %barrings = ();
my $voicemail = 0;
foreach my $subscriber (@$subscriber_group) {
my $number = $subscriber->{cc} . $subscriber->{ac} . $subscriber->{sn};
if (not exists $number_dupes{$number}) {
@ -877,12 +1027,13 @@ sub _provision_susbcriber_init_context {
ac => $subscriber->{ac},
sn => $subscriber->{sn},
number => $number,
delta => $subscriber->{delta},
#delta => $subscriber->{delta},
additional => 0,
filename => $subscriber->{filename},
});
$number_dupes{$number} = 1;
} else {
_warn($context,'duplicate number $number (subscriber table) ignored');
_warn($context,"duplicate number $number ($subscriber->{filename}) ignored");
}
if (not exists $contact_dupes{$subscriber->{contact_hash}}) {
@ -908,7 +1059,8 @@ sub _provision_susbcriber_init_context {
};
$contact_dupes{$subscriber->{contact_hash}} = 1;
} else {
_warn($context,'non-unique contact hash, skipped');
_warn($context,'non-unique contact data, skipped');
$context->{nonunique_contacts}->{$context->{prov_subscriber}->{username}} += 1;
}
}
@ -953,6 +1105,18 @@ sub _provision_susbcriber_init_context {
$barrings{$subscriber->{barrings}} = 1;
}
$voicemail = stringtobool($subscriber->{voicemail}) unless $voicemail;
}
unless (defined $context->{channels}) {
my $default_channels = 1;
foreach my $numbers (sort { $a <=> $b } keys %$default_channels_map) {
if ((scalar @numbers) > $numbers) {
$default_channels = $default_channels_map->{$numbers};
}
}
_info($context,"using $default_channels channels by default for " . (scalar @numbers) . ' numbers',1);
$context->{channels} = $default_channels;
}
unless (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0) {
@ -1006,12 +1170,13 @@ sub _provision_susbcriber_init_context {
ac => $allowed_cli->{ac},
sn => $allowed_cli->{sn},
number => $number,
delta => $allowed_cli->{delta},
#delta => $allowed_cli->{delta},
additional => 1,
filename => $allowed_cli->{filename},
});
$number_dupes{$number} = 1;
} else {
_warn($context,'duplicate number $number (allowed_cli table) ignored');
_warn($context,"duplicate number $number ($allowed_cli->{filename}) ignored");
}
}
@ -1055,6 +1220,72 @@ sub _provision_susbcriber_init_context {
$context->{clir} = stringtobool($clir->{clir});
}
$context->{ringtimeout} = undef;
my %cfsimple = ();
my $callforwards = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward::findby_sipusername($first->{sip_username});
if ((scalar @$callforwards) > 0 or $voicemail) {
my %vmcf = ();
my %maxpriority = ();
foreach my $callforward (@$callforwards) {
my $type = lc($callforward->{type});
if ($type =~ /$cf_types_pattern/) {
unless (defined $callforward->{destination} and length($callforward->{destination}) > 0) {
_warn($context,"empty callforward destination, ignoring");
next;
}
if ($callforward->{destination} =~ /voicemail/i) {
$callforward->{destination} = 'sip:vm' . ('cfb' eq $type ? 'b' : 'u') . $context->{numbers}->{primary}->{number} . '@voicebox.local';
$vmcf{$type} = 1 unless $vmcf{$type};
} elsif ($callforward->{destination} !~ /^\d+$/i) {
_warn($context,"invalid callforward destination '$callforward->{destination}', ignoring");
next;
} else { #todo: allow sip uri destinations
$callforward->{destination} .= '@' . $context->{domain}->{domain};
}
$callforward->{priority} //= $cf_default_priority;
$callforward->{timeout} //= $cf_default_timeout;
$callforward->{ringtimeout} //= $cft_default_ringtimeout if 'cft' eq $type;
$context->{ringtimeout} = $callforward->{ringtimeout} if ('cft' eq $type and (not defined $context->{ringtimeout} or $callforward->{ringtimeout} > $context->{ringtimeout}));
$cfsimple{$type} = [] unless exists $cfsimple{$type};
push(@{$cfsimple{$type}},{
destination => $callforward->{destination},
priority => $callforward->{priority},
timeout => $callforward->{timeout},
});
#$vmcf{$type} = ($callforward->{destination} =~ /voicemail/i) unless $vmcf{$type};
$maxpriority{$type} = $callforward->{priority} if (not defined $maxpriority{$type} or $callforward->{priority} > $maxpriority{$type});
} else {
_warn($context,"invalid callforward type '$type', ignoring");
}
}
if ($voicemail) {
foreach my $type (($NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE,
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE)) {
next if $vmcf{$type};
$cfsimple{$type} = [] unless exists $cfsimple{$type};
push(@{$cfsimple{$type}},{
destination => 'sip:vmu' . $context->{numbers}->{primary}->{number} . '@voicebox.local',
priority => (defined $maxpriority{$type} ? $maxpriority{$type} + 1 : $cf_default_priority),
timeout => $cf_default_timeout,
});
$context->{ringtimeout} = $cft_default_ringtimeout if ('cft' eq $type and not defined $context->{ringtimeout}); # or $cft_default_ringtimeout > $context->{ringtimeout}));
}
}
}
$context->{callforwards} = \%cfsimple;
my @registrations = ();
if (my $registration = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::findby_sipusername($first->{sip_username})) {
#todo: check/transform, multiple contacts
push(@registrations,{
username => $registration->{sip_username},
domain => $registration->{domain},
contact => $registration->{sip_contact},
ruid => NGCP::BulkProcessor::Dao::Trunk::kamailio::location::next_ruid(),
});
}
$context->{registrations} = \@registrations;
#$context->{counts} = {} unless defined $context->{counts};
@ -1062,6 +1293,7 @@ sub _provision_susbcriber_init_context {
}
sub _generate_webpassword {
return String::MkPasswd::mkpasswd(
-length => $webpassword_length,
@ -1086,6 +1318,7 @@ sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
push(@{$context->{log_error}},$message) if exists $context->{log_error};
if ($context->{prov_subscriber}) {
$message = ($context->{prov_subscriber}->{username} ? $context->{prov_subscriber}->{username} : '<empty sip_username>') . ': ' . $message;
}
@ -1097,6 +1330,7 @@ sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
push(@{$context->{log_warning}},$message) if exists $context->{log_warning};
if ($context->{prov_subscriber}) {
$message = ($context->{prov_subscriber}->{username} ? $context->{prov_subscriber}->{username} : '<empty sip_username>') . ': ' . $message;
}
@ -1107,6 +1341,7 @@ sub _warn {
sub _info {
my ($context,$message,$debug) = @_;
push(@{$context->{log_info}},$message) if exists $context->{log_info};
if ($context->{prov_subscriber}) {
$message = ($context->{prov_subscriber}->{username} ? $context->{prov_subscriber}->{username} : '<empty sip_username>') . ': ' . $message;
}

@ -18,6 +18,7 @@ use NGCP::BulkProcessor::Logging qw(
use NGCP::BulkProcessor::LogError qw(
fileerror
filewarn
configurationwarn
configurationerror
);
@ -26,7 +27,7 @@ use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt);
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits);
#format_number check_ipnet
require Exporter;
@ -39,6 +40,7 @@ our @EXPORT_OK = qw(
$input_path
$output_path
$report_filename
$defaultsettings
$defaultconfig
@ -85,43 +87,27 @@ our @EXPORT_OK = qw(
$provision_subscriber_numofthreads
$webpassword_length
$webusername_length
$default_channels_map
$set_call_forwards_multithreading
$set_call_forwards_numofthreads
$cfb_priorities
$cfb_timeouts
$cfu_priorities
$cfu_timeouts
$cft_priorities
$cft_timeouts
$cfna_priorities
$cfna_timeouts
$cfnumber_exclude_pattern
$cfnumber_trim_pattern
$ringtimeout
$set_preference_bulk_multithreading
$set_preference_bulk_numofthreads
$cf_default_priority
$cf_default_timeout
$cft_default_ringtimeout
);
#$concurrent_max_total
# $set_allowed_ips_multithreading
# $set_allowed_ips_numofthreads
# $allowed_ips
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $input_path = $working_path . 'input/';
our $output_path = $working_path . 'output/';
our $report_filename = undef;
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
our $run_id = '';
our $import_db_file = _get_import_db_file($run_id,'import');
our $import_multithreading = $enablemultithreading;
our $import_multithreading = 0; #$enablemultithreading;
our @subscriber_filenames = ();
our $subscriber_import_numofthreads = $cpucount;
@ -158,27 +144,15 @@ our $provision_subscriber_multithreading = $enablemultithreading;
our $provision_subscriber_numofthreads = $cpucount;
our $webpassword_length = 8;
our $webusername_length = 8;
#our $set_allowed_ips_multithreading = $enablemultithreading;
#our $set_allowed_ips_numofthreads = $cpucount;
#our $allowed_ips = [];
our $set_call_forwards_multithreading = $enablemultithreading;
our $set_call_forwards_numofthreads = $cpucount;
our $cfb_priorities = [];
our $cfb_timeouts = [];
our $cfu_priorities = [];
our $cfu_timeouts = [];
our $cft_priorities = [];
our $cft_timeouts = [];
our $cfna_priorities = [];
our $cfna_timeouts = [];
our $cfnumber_exclude_pattern = undef;
our $cfnumber_trim_pattern = undef;
our $ringtimeout = undef;
#our $set_preference_bulk_multithreading = $enablemultithreading;
#our $set_preference_bulk_numofthreads = $cpucount;
#our $concurrent_max_total = undef;
our $default_channels_map = {
0 => 1,
4 => 10,
8 => 25, # "more than 10 numbers" => concurrent_max = 25
};
our $cf_default_priority = 1;
our $cf_default_timeout = 300;
our $cft_default_ringtimeout = 20;
sub update_settings {
@ -192,11 +166,23 @@ sub update_settings {
#&$configurationinfocode("testinfomessage",$configlogger);
$result &= _prepare_working_paths(1);
if ($data->{report_filename}) {
$report_filename = $output_path . sprintf('/' . $data->{report_filename},timestampdigits());
if (-e $report_filename and (unlink $report_filename) == 0) {
filewarn('cannot remove ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__));
$report_filename = undef;
}
} else {
$report_filename = undef;
}
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$import_db_file = _get_import_db_file($run_id,'import');
$import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading};
#if ($import_multithreading) {
# configurationerror($configfile,"import_multithreading must be disabled to preserve record order",getlogger(__PACKAGE__));
#}
@subscriber_filenames = _get_import_filenames(\@subscriber_filenames,$data,'subscriber_filenames');
$subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_import_numofthreads');
@ -239,45 +225,11 @@ sub update_settings {
configurationerror($configfile,'webusername_length greater than 7 required',getlogger(__PACKAGE__));
$result = 0;
}
#$set_allowed_ips_multithreading = $data->{set_allowed_ips_multithreading} if exists $data->{set_allowed_ips_multithreading};
#$set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads');
#$allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips};
#foreach my $ipnet (@$allowed_ips) {
# if (not check_ipnet($ipnet)) {
# configurationerror($configfile,"invalid allowed_ip '$ipnet'",getlogger(__PACKAGE__));
# $result = 0;
# }
#}
$set_call_forwards_multithreading = $data->{set_call_forwards_multithreading} if exists $data->{set_call_forwards_multithreading};
$set_call_forwards_numofthreads = _get_numofthreads($cpucount,$data,'set_call_forwards_numofthreads');
$cfb_priorities = [ split_tuple($data->{cfb_priorities}) ] if exists $data->{cfb_priorities};
$cfb_timeouts = [ split_tuple($data->{cfb_timeouts}) ] if exists $data->{cfb_timeouts};
$cfu_priorities = [ split_tuple($data->{cfu_priorities}) ] if exists $data->{cfu_priorities};
$cfu_timeouts = [ split_tuple($data->{cfu_timeouts}) ] if exists $data->{cfu_timeouts};
$cft_priorities = [ split_tuple($data->{cft_priorities}) ] if exists $data->{cft_priorities};
$cft_timeouts = [ split_tuple($data->{cft_timeouts}) ] if exists $data->{cft_timeouts};
$cfna_priorities = [ split_tuple($data->{cfna_priorities}) ] if exists $data->{cfna_priorities};
$cfna_timeouts = [ split_tuple($data->{cfna_timeouts}) ] if exists $data->{cfna_timeouts};
$cfnumber_exclude_pattern = $data->{cfnumber_exclude_pattern} if exists $data->{cfnumber_exclude_pattern};
($regexp_result,$cfnumber_exclude_pattern) = parse_regexp($cfnumber_exclude_pattern,$configfile);
$result &= $regexp_result;
$cfnumber_trim_pattern = $data->{cfnumber_trim_pattern} if exists $data->{cfnumber_trim_pattern};
($regexp_result,$cfnumber_trim_pattern) = parse_regexp($cfnumber_trim_pattern,$configfile);
$result &= $regexp_result;
$ringtimeout = $data->{ringtimeout} if exists $data->{ringtimeout};
if (not defined $ringtimeout or $ringtimeout <= 0) {
configurationerror($configfile,'ringtimeout greater than 0 required',getlogger(__PACKAGE__));
$result = 0;
}
#$default_channels = $data->{default_channels} if exists $data->{default_channels};
#$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading};
#$set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads');
#$concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total};
#if (defined $concurrent_max_total and $concurrent_max_total <= 0) {
# configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__));
# $result = 0;
#}
$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority};
$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout};
$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout};
return $result;
@ -303,10 +255,10 @@ sub _prepare_working_paths {
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $_numofthreads = $default_value;
$_numofthreads = $data->{$key} if exists $data->{$key};
$_numofthreads = $cpucount if $_numofthreads > $cpucount;
return $_numofthreads;
my $numofthreads = $default_value;
$numofthreads = $data->{$key} if exists $data->{$key};
$numofthreads = $cpucount if $numofthreads > $cpucount;
return $numofthreads;
}
sub _get_import_db_file {

@ -6,8 +6,6 @@ enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.84
accounting_port = 3306

@ -77,13 +77,11 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Check qw(
check_billing_db_tables
check_provisioning_db_tables
@ -104,16 +102,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Provisioning qw(
provision_subscribers
);
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Preferences qw(
# set_allowed_ips
#
# set_preference_bulk
#);
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Api qw(
# set_call_forwards
#);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
@ -409,9 +397,9 @@ sub import_allowedcli_task {
$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"importing allowed clis INCOMPLETE$stats");
push(@$messages,"importing allowed clis (additional numbers) INCOMPLETE$stats");
} else {
push(@$messages,"importing allowed clis completed$stats");
push(@$messages,"importing allowed clis (additional numbers) completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
@ -428,13 +416,13 @@ sub import_truncate_allowedcli_task {
my $err = $@;
my $stats = '';
eval {
$stats .= "\n total allowed cli records: " .
$stats .= "\n total allowed cli (additional numbers) records: " .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::countby_ccacsn() . ' rows';
};
if ($err or !$result) {
push(@$messages,"truncating imported allowed clis INCOMPLETE$stats");
push(@$messages,"truncating imported allowed clis (additional numbers) INCOMPLETE$stats");
} else {
push(@$messages,"truncating imported allowed clis completed$stats");
push(@$messages,"truncating imported allowed clis (additional numbers) completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
@ -502,7 +490,6 @@ sub import_truncate_clir_task {
}
sub import_callforward_task {
my ($messages) = @_;
@ -563,9 +550,6 @@ sub import_truncate_callforward_task {
}
sub import_registration_task {
my ($messages) = @_;
@ -625,99 +609,48 @@ sub import_truncate_registration_task {
}
sub create_subscriber_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
my ($result,$warning_count,$nonunique_contacts) = (0,0,{});
eval {
($result,$warning_count) = provision_subscribers();
($result,$warning_count,$nonunique_contacts) = provision_subscribers();
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
#$stats .= "\n total contracts: " .
# NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,$reseller_id) . ' rows';
#my $active_count = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(
# $NGCP::BulkProcessor::Dao::Trunk::billing::contracts::ACTIVE_STATE,
# $reseller_id
#);
#$stats .= "\n active: $active_count rows";
#my $terminated_count = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(
# $NGCP::BulkProcessor::Dao::Trunk::billing::contracts::TERMINATED_STATE,
# $reseller_id
#);
#$stats .= "\n terminated: $terminated_count rows";
#$stats .= "\n total subscribers: " .
# NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,$reseller_id) . ' rows';
#$active_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(
# $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE,
# $reseller_id
#);
#$stats .= "\n active: $active_count rows";
#$terminated_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(
# $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE,
# $reseller_id
#);
#$stats .= "\n terminated: $terminated_count rows";
$stats .= "\n total contracts: " .
NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,undef) . ' rows';
$stats .= "\n total subscribers: " .
NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,undef) . ' rows';
$stats .= "\n total aliases: " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,undef) . ' rows';
$stats .= "\n primary aliases: " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,1) . ' rows';
$stats .= "\n call forwards: " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,undef) . ' rows';
$stats .= "\n registrations: " .
NGCP::BulkProcessor::Dao::Trunk::kamailio::location::countby_usernamedomain(undef,undef) . ' rows';
$stats .= "\n non-unique contacts skipped:\n " . join("\n ",keys %$nonunique_contacts)
if (scalar keys %$nonunique_contacts) > 0;
};
if ($err or !$result) {
push(@$messages,"create subscribers INCOMPLETE$stats");
} else {
push(@$messages,"create subscribers completed$stats");
#if (not $dry and $reprovision_upon_password_change and $updated_password_count > 0) {
# push(@$messages,"THERE WERE $updated_password_count UPDATED PASSWORDS. YOU MIGHT WANT TO RESTART SEMS NOW ...");
#}
if (not $dry) {
push(@$messages,"YOU MIGHT WANT TO RESTART KAMAILIO FOR PERMANENT REGISTRATIONS TO COME INTO EFFECT");
}
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
#sub set_call_forwards_task {
#
# my ($messages,$mode) = @_;
# my ($result,$warning_count) = (0,0);
# eval {
# if ($batch) {
# ($result,$warning_count) = set_call_forwards_batch($mode);
# } else {
# ($result,$warning_count) = set_call_forwards($mode);
# }
# };
# my $err = $@;
# my $stats = ($skip_errors ? ": $warning_count warnings" : '');
# eval {
# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFU_TYPE . "': " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFU_TYPE) . ' rows';
#
# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFB_TYPE . "': " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFB_TYPE) . ' rows';
#
# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE . "': " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFT_TYPE) . ' rows';
#
# $stats .= "\n '" . $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE . "': " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::CFNA_TYPE) . ' rows';
# };
# if ($err or !$result) {
# push(@$messages,"set subscribers\' call forwards INCOMPLETE$stats");
# } else {
# push(@$messages,"set subscribers\' call forwards completed$stats");
# }
# destroy_all_dbs(); #every task should leave with closed connections.
# return $result;
#
#}
#END {
# # this should not be required explicitly, but prevents Log4Perl's

@ -40,22 +40,13 @@ ignore_registration_unique = 0
registration_import_single_row_txn = 1
provision_subscriber_multithreading = 1
#provision_subscriber_numofthreads = 6
provision_subscriber_numofthreads = 2
webpassword_length = 8
webusername_length = 8
report_filename = provision.txt
#report_filename = provision_%s.json
#default_channels = 1
set_call_forwards_multithreading = 1
#set_call_forwards_numofthreads = 6
cfb_priorities =
cfb_timeouts = 300
cfu_priorities = 1
cfu_timeouts = 300
cft_priorities = 1
cft_timeouts = 300
cfna_priorities = 1
cfna_timeouts = 300
ringtimeout = 20
#cfnumber_exclude_pattern =
cfnumber_trim_pattern = ^05\d{2}
cf_default_priority: 1
cf_default_timeout: 300
cft_default_ringtimeout: 20

@ -25,6 +25,7 @@ require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
_add_headers
convert_bools
);
#my $logger = getlogger(__PACKAGE__);
@ -88,6 +89,11 @@ sub baseuri {
}
sub path {
my $self = shift;
return $self->{path};
}
sub _clearrequestdata {
my $self = shift;
@ -188,11 +194,15 @@ sub _get_request_uri {
if (defined $path_query) {
if (blessed($path_query) and $path_query->isa('URI')) {
$path_query = $path_query->path_query();
if (defined $self->{path} and length($self->{path}) > 0) {
$path_query =~ s!^$self->{path}!!;
}
}
} else {
$path_query = '';
}
if (defined $self->{path} and length($self->{path}) > 0) {
#$path_query =~ s!^$self->{path}!!;
$path_query =~ s!^/!!;
$path_query = $self->{path} . $path_query;
}
@ -244,6 +254,33 @@ sub _post {
}
sub _post_raw {
my $self = shift;
my ($path_query_request,$data,$headers) = @_;
$self->_clearrequestdata();
$self->{requestdata} = $data;
if (blessed($path_query_request) and $path_query_request->isa('HTTP::Request')) {
$self->{req} = $path_query_request;
$self->_log_request($self->{req});
} else {
$self->{req} = HTTP::Request->new('POST',$self->_get_request_uri($path_query_request));
_add_headers($self->{req},$headers);
$self->_log_request($self->{req});
$self->{req}->content($data);
}
$self->{res} = $self->_ua_request($self->{req});
$self->_log_response($self->{res});
eval {
$self->{responsedata} = $self->_decode_post_response($self->{res}->decoded_content());
};
if ($@) {
restresponseerror($self,'error decoding POST response content: ' . $@,$self->{res},getlogger(__PACKAGE__));
}
return $self->{res};
}
sub post {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
@ -281,11 +318,62 @@ sub _get {
}
sub _get_raw {
my $self = shift;
my ($path_query,$headers) = @_;
$self->_clearrequestdata();
$self->{req} = HTTP::Request->new('GET',$self->_get_request_uri($path_query));
_add_headers($self->{req},$headers);
$self->_log_request($self->{req});
$self->{res} = $self->_ua_request($self->{req});
$self->_log_response($self->{res});
return $self->{res};
}
sub get {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _add_head_headers {
my $self = shift;
my ($req,$headers) = @_;
_add_headers($req,$headers);
}
sub _decode_head_response {
my $self = shift;
my ($data) = @_;
return $self->_decode_response_content($data);
}
sub _head {
my $self = shift;
my ($path_query,$headers) = @_;
$self->_clearrequestdata();
$self->{req} = HTTP::Request->new('HEAD',$self->_get_request_uri($path_query));
$self->_add_head_headers($self->{req},$headers);
$self->_log_request($self->{req});
$self->{res} = $self->_ua_request($self->{req});
$self->_log_response($self->{res});
eval {
$self->{responsedata} = $self->_decode_head_response($self->{res}->decoded_content());
};
if ($@) {
restresponseerror($self,'error decoding HEAD response content: ' . $@,$self->{res},getlogger(__PACKAGE__));
}
return $self->{res};
}
sub head {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _add_patch_headers {
my $self = shift;
my ($req,$headers) = @_;
@ -381,6 +469,7 @@ sub _put {
}
sub put {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
@ -435,9 +524,37 @@ sub _get_page_size_query_param {
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _get_total_count_expected_query_param {
my $self = shift;
my ($total_count_expected) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _get_sf_query_param {
my $self = shift;
my ($sf) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub get_collection_page_query_uri {
my $self = shift;
my ($collection_path_query,$page_size,$page_num) = @_;
my $collection_path_query = shift;
my $page_size;
my $page_num;
my $total_count_expected;
my $sf;
if (ref $_[0]) {
my $p = shift;
$page_size = $p->{page_size};
$page_num = $p->{page_num};
$total_count_expected = 1;
$sf = shift;
} else {
($page_size,$page_num) = @_;
$total_count_expected = 0;
$sf = undef;
}
#my ($collection_path_query,$page_size,$page_num) = @_;
#if ($page_size <= 0) {
# resterror($self,"positive collection page size required",getlogger(__PACKAGE__));
#}
@ -447,10 +564,15 @@ sub get_collection_page_query_uri {
my $page_uri = $self->_get_request_uri($collection_path_query);
my $page_size_query_param = $self->_get_page_size_query_param($page_size);
my $page_num_query_param = $self->_get_page_num_query_param($page_num);
my $total_count_expected_query_param = $self->_get_total_count_expected_query_param($total_count_expected);
my $sf_query_param;
$sf_query_param = $self->_get_sf_query_param($sf) if defined $sf;
my @query_params = ();
push(@query_params,$page_uri->query()) if $page_uri->query();
push(@query_params,$page_size_query_param) if defined $page_size_query_param && length($page_size_query_param) > 0;
push(@query_params,$page_num_query_param) if defined $page_num_query_param && length($page_num_query_param) > 0;
push(@query_params,$total_count_expected_query_param) if defined $total_count_expected_query_param && length($total_count_expected_query_param) > 0;
push(@query_params,$sf_query_param) if defined $sf_query_param && length($sf_query_param) > 0;
$page_uri->query(join('&',@query_params));
@ -498,4 +620,40 @@ sub get_defaultcollectionpagesize {
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub get_firscollectionpagenum {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub convert_bools {
my %unrecognized;
local *_convert_bools = sub {
my $ref_type = ref($_[0]);
if (!$ref_type) {
# Nothing.
}
elsif ($ref_type eq 'HASH') {
_convert_bools($_) for values(%{ $_[0] });
}
elsif ($ref_type eq 'ARRAY') {
_convert_bools($_) for @{ $_[0] };
}
elsif (
$ref_type eq 'JSON::PP::Boolean' # JSON::PP
|| $ref_type eq 'Types::Serialiser::Boolean' # JSON::XS
) {
$_[0] = $_[0] ? 1 : 0;
}
else {
++$unrecognized{$ref_type};
}
};
&_convert_bools;
carp("Encountered an object of unrecognized type $_")
for sort values(%unrecognized);
}
1;

@ -27,7 +27,7 @@ use NGCP::BulkProcessor::LogError qw(
restprocessingfailed
);
use NGCP::BulkProcessor::Utils qw(threadid urlencode urldecode);
use NGCP::BulkProcessor::Utils qw(threadid);
require Exporter;
our @ISA = qw(Exporter);
@ -36,6 +36,7 @@ our @EXPORT_OK = qw(
copy_row
process_collection
get_query_string
override_fields
);
my $collectionprocessing_threadqueuelength = 10;
@ -55,12 +56,18 @@ sub get_query_string {
} else {
$query .= '&';
}
#$query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape($filters->{$param});
$query .= urlencode($param) . '=' . urlencode($filters->{$param});
$query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape_utf8($filters->{$param});
}
return $query;
};
sub override_fields {
my ($item,$load_recursive) = @_;
foreach my $override (keys %{$load_recursive->{_overrides}}) {
$item->{$override} = $load_recursive->{_overrides}->{$override};
}
}
sub init_item {
my ($item,$fieldnames) = @_;
@ -102,6 +109,7 @@ sub copy_row {
$item->{$fieldname} = undef;
}
} else {
$item->{$fieldname} = $row; #scalar
last;
}
}
@ -114,6 +122,7 @@ sub process_collection {
my %params = @_;
my ($get_restapi,
$path_query,
$post_data,
$headers,
$extract_collection_items_params,
$process_code,
@ -125,6 +134,7 @@ sub process_collection {
$collectionprocessing_threads) = @params{qw/
get_restapi
path_query
post_data
headers
extract_collection_items_params
process_code
@ -163,6 +173,7 @@ sub process_collection {
headers => $headers,
blocksize => $blocksize,
extract_collection_items_params => $extract_collection_items_params,
post_data => $post_data,
});
for (my $i = 0; $i < $collectionprocessing_threads; $i++) {
@ -215,7 +226,9 @@ sub process_collection {
my $i = 0;
while (1) {
fetching_items($restapi,$path_query,$i,$blocksize,getlogger(__PACKAGE__));
my $collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount),$headers);
my $collection_page;
$collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$headers) unless $post_data;
$collection_page = $restapi->post($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$post_data,$headers) if $post_data;
my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$extract_collection_items_params);
my $realblocksize = scalar @$rowblock;
if ($realblocksize > 0) {
@ -292,7 +305,9 @@ sub _reader {
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer
fetching_items($restapi,$context->{path_query},$i,$blocksize,getlogger(__PACKAGE__));
my $collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount),$context->{headers});
my $collection_page;
$collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{headers}) unless $context->{post_data};
$collection_page = $restapi->post($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{post_data},$context->{headers}) if $context->{post_data};
my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$context->{extract_collection_items_params});
my $realblocksize = scalar @$rowblock;
my %packet :shared = ();

@ -45,7 +45,8 @@ use MIME::Base64 qw(encode_base64 decode_base64);
#http://blogs.perl.org/users/steven_haryanto/2010/09/comparison-of-perl-serialization-modules.html
use Storable; # qw( nfreeze thaw );
use JSON::XS; # qw(encode_json decode_json);
use JSON qw();
#use JSON::XS; # qw(encode_json decode_json);
use Data::Dump; # qw(dump);
$Data::Dump::INDENT = ' ';
@ -166,12 +167,14 @@ sub deserialize_xml {
sub serialize_json {
my $input_ref = shift;
return JSON::XS::encode_json($input_ref);
#return JSON::XS::encode_json($input_ref);
return JSON::to_json($input_ref,{ allow_blessed => 1, convert_blessed => 1, pretty => 0 });
}
sub deserialize_json {
my $input_ref = shift;
return JSON::XS::decode_json($input_ref);
#return JSON::XS::decode_json($input_ref);
return JSON::from_json($$input_ref);
}
sub serialize_yaml {

@ -882,6 +882,10 @@ sub db_get_begin {
if ($transactional) {
#$self->lock_tables({ $tablename => 'WRITE' });
$self->db_begin();
} else {
my $offset = shift;
my $limit = shift;
$query = $self->paginate_sort_query($query,$offset,$limit,undef);
}
$self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
@ -902,6 +906,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return 0;
}
sub db_get_rowblock {
my $self = shift;

@ -486,6 +486,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
}
sub truncate_table {
my $self = shift;

@ -438,6 +438,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
}
sub insert_ignore_phrase {
my $self = shift;

@ -465,6 +465,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
}
sub truncate_table {
my $self = shift;

@ -469,6 +469,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
}
sub truncate_table {
my $self = shift;

@ -459,6 +459,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
}
sub truncate_table {
my $self = shift;

@ -75,7 +75,7 @@ my $LongTruncOk = 0;
#my $lock_do_chunk = 0; #1;
#my $lock_get_chunk = 0; #1;
my $rowblock_transactional = 1;
my $rowblock_transactional = 0;
#SQLite transactions are always serializable.
@ -341,7 +341,11 @@ sub getfieldnames {
my $self = shift;
my $tablename = shift;
my @fieldnames = keys %{$self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name')};
#my @fieldnames = keys %{$self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name')};
my @fieldnames = ();
foreach my $field (@{$self->db_get_all_arrayref('PRAGMA table_info(' . $tablename . ')')}) {
push(@fieldnames,$field->{name});
}
return \@fieldnames;
}
@ -351,13 +355,21 @@ sub getprimarykeycols {
my $self = shift;
my $tablename = shift;
#return $self->db_get_col('SHOW FIELDS FROM ' . $tablename);
my $fieldinfo = $self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name');
#my $fieldinfo = $self->db_get_all_hashref('PRAGMA table_info(' . $tablename . ')','name');
#my @keycols = ();
#foreach my $fieldname (keys %$fieldinfo) {
# if ($fieldinfo->{$fieldname}->{'pk'}) {
# push @keycols,$fieldname;
# }
#}
my @keycols = ();
foreach my $fieldname (keys %$fieldinfo) {
if ($fieldinfo->{$fieldname}->{'pk'}) {
push @keycols,$fieldname;
foreach my $field (@{$self->db_get_all_arrayref('PRAGMA table_info(' . $tablename . ')')}) {
if ($field->{'pk'}) {
push(@keycols,$field->{name});
}
}
return \@keycols;
}
@ -488,6 +500,13 @@ sub multithreading_supported {
}
sub rowblock_transactional {
my $self = shift;
return $rowblock_transactional;
}
sub insert_ignore_phrase {
my $self = shift;

@ -905,12 +905,14 @@ sub transfer_table {
#$target_db = &$get_target_db($writer_connection_name);
eval {
$db->db_get_begin($selectstatement,@$values); #$tablename
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
my $i = 0;
while (1) {
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__));
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
my $rowblock = $db->db_get_rowblock($blocksize);
$db->db_finish() unless $db->rowblock_transactional;
my $realblocksize = scalar @$rowblock;
if ($realblocksize > 0) {
writing_rows($target_db,$targettablename,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
@ -931,7 +933,7 @@ sub transfer_table {
last;
}
}
$db->db_finish();
$db->db_finish() if $db->rowblock_transactional;
};
@ -1084,7 +1086,6 @@ sub process_table {
}
my $errorstate = $RUNNING;
#my $blocksize;
if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved
@ -1209,12 +1210,14 @@ sub process_table {
&$init_process_context_code($context);
}
$db->db_get_begin($selectstatement,@$values); #$tablename
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
my $i = 0;
while (1) {
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__));
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
my $rowblock = $db->db_get_rowblock($blocksize);
$db->db_finish() unless $db->rowblock_transactional;
my $realblocksize = scalar @$rowblock;
if ($realblocksize > 0) {
processing_rows($tid,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
@ -1233,7 +1236,7 @@ sub process_table {
last;
}
}
$db->db_finish();
$db->db_finish() if $db->rowblock_transactional;
};
@ -1374,7 +1377,7 @@ sub _reader {
my $blockcount = 0;
eval {
$reader_db = &{$context->{get_db}}(); #$reader_connection_name);
$reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}); #$context->{tablename}
$reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}) if $reader_db->rowblock_transactional; #$context->{tablename}
tablethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__));
while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up
#yield();
@ -1384,7 +1387,9 @@ sub _reader {
my $state = $RUNNING; #start at first
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer
fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__));
$reader_db->db_get_begin($context->{selectstatement},$i,$context->{blocksize},@{$context->{values_ref}}) unless $reader_db->rowblock_transactional;
my $rowblock = $reader_db->db_get_rowblock($context->{blocksize});
$reader_db->db_finish() unless $reader_db->rowblock_transactional;
my $realblocksize = scalar @$rowblock;
#my $packet = {rows => $rowblock,
# size => $realblocksize,
@ -1419,7 +1424,7 @@ sub _reader {
(($state & $ERROR) == 0 ? 'no defunct thread(s)' : 'defunct thread(s)') . ') ...'
,getlogger(__PACKAGE__));
}
$reader_db->db_finish();
$reader_db->db_finish() if $reader_db->rowblock_transactional;
};
tablethreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
# stop the consumer:

Loading…
Cancel
Save