TT#69125 mr1.0.3 source db schema

Change-Id: I35b67c66236696af6c041e34dc4a732fdca1069d
(cherry picked from commit f763ad109c)
changes/81/35581/1
Rene Krenn 6 years ago
parent 023c0c3218
commit 85da933c0c

@ -0,0 +1,104 @@
package NGCP::BulkProcessor::DSPath;
use strict;
#use 5.006001;
use warnings;
use Scalar::Util qw/reftype blessed/;
#use Carp;
#our $VERSION = '1.4.1';
# this is a reformatted variant of https://metacpan.org/pod/release/ZAPHAR/Data-Path-1.4.1/lib/Data/Path.pm
# for local control. the only functional difference so far is that path expressions use . instead of /
sub new {
my ($class, $data, $callbacks) = @_;
$callbacks //= {};
my $self = {
data => $data,
# set call backs to default if not given
callbacks => {
key_does_not_exist => $callbacks->{key_does_not_exist} // sub {
my ($data, $key, $index, $value, $rest) = @_;
die "key $key does not exists\n";
},
index_does_not_exist => $callbacks->{index_does_not_exist} // sub {
my ($data, $key, $index, $value, $rest) = @_;
die "index $key\[$index\] does not exists\n";
},
retrieve_index_from_non_array => $callbacks->{retrieve_index_from_non_array} // sub {
my ($data, $key, $index, $value, $rest) = @_;
die "tried to retrieve an index $index from a no array value (in key $key)\n";
},
retrieve_key_from_non_hash => $callbacks->{retrieve_key_from_non_hash} // sub {
my ($data, $key, $index, $value, $rest) = @_;
die "tried to retrieve a key from a no hash value (in key $key)\n";
},
not_a_coderef_or_method => $callbacks->{not_a_coderef_or_method} // sub {
my ($data, $key, $index, $value, $rest) = @_;
die "tried to retrieve from a non-existant coderef or method: $key in $data";
}
},
};
return bless $self,$class;
}
sub get {
my ($self,$rkey,$data) = @_;
# set data to
$data //= $self->{data};
# get key till . or [
my $key;
$key = $1 if ( $rkey =~ s/^\.([^\.|\[]+)// );
die 'malformed path expression' unless $key;
die 'malformed array index request' if $rkey =~ /^\[([^\d]*)\]/;
# check index for index
my $index;
$index = $1 if ( $rkey =~ s/^\[(\d+)\]// );
# set rest
my $rest = $rkey;
# get key from data
my $value;
if ($key =~ s/(\(\))$//) {
$self->{callbacks}->{not_a_coderef_or_method}->($data, $key, $index, $value, $rest)
unless exists $data->{$key} or (blessed $data and $data->can($key));
$value = $data->{$key}->() if exists $data->{$key};
$value = $data->$key() if (blessed $data and $data->can($key));
} else {
$value = $data->{$key} if exists $data->{$key};
}
# croak if key does not exists and something after that is requested
$self->{callbacks}->{key_does_not_exist}->($data, $key, $index, $value, $rest)
if (not exists $data->{$key} and length($rest) > 0);
# check index
if (defined $index) {
# croak if index does not exists and something after that is requested
$self->{callbacks}->{index_does_not_exist}->($data, $key, $index, $value, $rest)
if (not exists $value->[$index] and length($rest) > 0);
if (reftype $value eq 'ARRAY') {
$value = $value->[$index];
} else {
$self->{callbacks}->{retrieve_index_from_non_array}->($data, $key, $index, $value, $rest);
}
}
# check if last element is reached
if ($rest) {
if (defined $value and (reftype $value eq 'HASH' or blessed $value)) {
$value = $self->get($rest,$value);
} else {
$self->{callbacks}->{retrieve_key_from_non_hash}->($data, $key, $index, $value, $rest);
}
}
return $value;
}
1;

@ -25,6 +25,9 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_resellerid_level
findby_resellername_level
insert_row
);
@ -75,6 +78,22 @@ sub findby_resellerid_level {
}
sub findby_resellername_level {
my ($reseller_name,$level,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT n.* FROM ' . $table . ' n join billing.resellers r ON n.reseller_id = r.id WHERE r.name = ? AND n.level = ?';
my @params = ($reseller_name,$level);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub insert_row {
my $db = &$get_db();

@ -41,6 +41,7 @@ our @EXPORT_OK = qw(
find_random
findby_contractid_states
findby_domainid_usernames
findby_domain_usernames
$TERMINATED_STATE
$ACTIVE_STATE
@ -102,6 +103,27 @@ sub findby_domainid_usernames {
}
sub findby_domain_usernames {
my ($xa_db,$domain,$usernames,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT s.* FROM ' . $table . ' s join billing.domains d on s.domain_id = d.id WHERE d.domain = ?';
my @params = ($domain);
if (defined $usernames and 'ARRAY' eq ref $usernames) {
$stmt .= ' AND ' . $db->columnidentifier('username') . ' IN (' . substr(',?' x scalar @$usernames,1) . ')';
push(@params,@$usernames);
}
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_domainid_username_states {
my ($xa_db,$domain_id,$username,$states,$load_recursive) = @_;

@ -0,0 +1,131 @@
package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_fax_destinations;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
rowsdeleted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
);
my $tablename = 'voip_fax_destinations';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'subscriber_id',
'destination',
'filetype',
'cc',
'incoming',
'outgoing',
'status',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($subscriber_id) = @params{qw/
subscriber_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('subscriber_id') . ') VALUES (' .
'?)',
$subscriber_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,130 @@
package NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_fax_preferences;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
rowsdeleted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
);
my $tablename = 'voip_fax_preferences';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'subscriber_id',
'password',
'name',
'active',
'send_status',
'send_copy',
't38',
'ecm',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($subscriber_id) = @params{qw/
subscriber_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('subscriber_id') . ') VALUES (' .
'?)',
$subscriber_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -23,6 +23,7 @@ our @EXPORT_OK = qw(
check_table
findby_attribute
findall
$ALLOWED_CLIS_ATTRIBUTE
$CLI_ATTRIBUTE
@ -172,6 +173,21 @@ sub findby_attribute {
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -0,0 +1,239 @@
package NGCP::BulkProcessor::Dao::mr103::billing::contracts;
use strict;
use threads::shared;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
process_table
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr103::billing::voip_subscribers qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
source_process_records
source_findby_id
$ACTIVE_STATE
$TERMINATED_STATE
);
my $tablename = 'contracts';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'customer_id',
'reseller_id',
'contact_id',
'order_id',
'status',
'modify_timestamp',
'create_timestamp',
'activate_timestamp',
'terminate_timestamp',
];
my $indexes = {};
my $insert_unique_fields = [];
our $ACTIVE_STATE = 'active';
our $TERMINATED_STATE = 'terminated';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_process_records {
my %params = @_;
my ($source_dbs,
$process_code,
$read_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$destroy_reader_dbs_code,
$multithreading,
$blocksize,
$numofthreads) = @params{qw/
source_dbs
process_code
read_code
static_context
init_process_context_code
uninit_process_context_code
destroy_reader_dbs_code
multithreading
blocksize
numofthreads
/};
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
return process_table(
get_db => $source_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
#return &$process_code($context,buildrecords_fromrows_source($rowblock,$source_db,$load_recursive),$row_offset);
return &$process_code($context,$rowblock,$row_offset);
},
read_code => sub {
my ($rowblock) = @_;
return source_buildrecords_fromrows($rowblock,$source_dbs);
},
static_context => $static_context,
blocksize => $blocksize,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => $destroy_reader_dbs_code,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
'select' => 'SELECT c.*,r.name as reseller_name FROM ' . $table . ' c left join billing.resellers r on c.reseller_id = r.id WHERE c.status != "' . $TERMINATED_STATE . '"', # and id = 7185',
'selectcount' => 'SELECT COUNT(c.id) FROM ' . $table . ' c WHERE c.status != "' . $TERMINATED_STATE . '"', # and id = 7185',
);
}
sub source_findby_id {
my ($source_dbs,$id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT c.*,r.name as reseller_name FROM ' . $table . ' c join billing.resellers r on c.reseller_id = r.id WHERE ' .
'c.id = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs)->[0];
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
if ('ARRAY' eq ref $row) {
$record->{reseller_name} = pop @$row;
} else {
$record->{reseller_name} = $row->{reseller_name};
}
#$record->{billing_mappings} = NGCP::BulkProcessor::Dao::mr103::billing::billing_mappings::source_findby_contractid($source_dbs,$record->{id});
#$record->{contact} = NGCP::BulkProcessor::Dao::mr103::billing::contacts::source_findby_id($source_dbs,$record->{contact_id});
#$record->{contract_balances} = NGCP::BulkProcessor::Dao::mr103::billing::contract_balances::source_findby_contractid($source_dbs,$record->{id});
#if ($record->{reseller_id}) {
$record->{voip_subscribers} = NGCP::BulkProcessor::Dao::mr103::billing::voip_subscribers::source_findby_contractid($source_dbs,$record->{id});
#}
#delete $record->{reseller_id};
#
#delete $record->{contact_id};
#delete $record->{customer_id};
#delete $record->{order_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,119 @@
package NGCP::BulkProcessor::Dao::mr103::billing::domain_resellers;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr103::billing::domains qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_resellerid
);
my $tablename = 'domain_resellers';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'domain_id',
'reseller_id',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_resellerid {
my ($source_dbs,$id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('reseller_id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records = (); # : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
$record->{domain} = NGCP::BulkProcessor::Dao::mr103::billing::domains::source_findby_id($source_dbs,$record->{domain_id});
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,116 @@
package NGCP::BulkProcessor::Dao::mr103::billing::domains;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
);
my $tablename = 'domains';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'domain',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_id {
my ($source_dbs,$id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs)->[0];
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records = (); # : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
#$record->{voip_dom_preferences} = NGCP::BulkProcessor::Dao::mr103::provisioning::voip_dom_preferences::source_findby_domain($source_dbs,$record->{domain});
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,174 @@
package NGCP::BulkProcessor::Dao::mr103::billing::ncos_levels;
use strict;
## no critic
#use threads::shared;
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr103::billing::ncos_pattern_list qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_resellerid
source_getuniquename
);
my $tablename = 'ncos_levels';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'reseller_id',
'level',
'mode',
'local_ac',
'description',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_resellerid {
my ($source_dbs,$reseller_id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('reseller_id') . ' = ?';
my @params = ($reseller_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_getuniquename {
my ($source_dbs,$id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT r.name as reseller_name,n.level as level FROM ' . $table . ' n join billing.resellers r on n.reseller_id = r.id WHERE ' .
'n.id = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return $rows->[0];
#return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records = (); # : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
# transformations go here ...
$record->{ncos_pattern_list} = NGCP::BulkProcessor::Dao::mr103::billing::ncos_pattern_list::source_findby_ncoslevelid($source_dbs,$record->{id});
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,142 @@
package NGCP::BulkProcessor::Dao::mr103::billing::ncos_pattern_list;
use strict;
## no critic
#use threads::shared;
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_ncoslevelid
);
my $tablename = 'ncos_pattern_list';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'ncos_level_id',
'pattern',
'description',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_ncoslevelid {
my ($source_dbs,$ncos_level_id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('ncos_level_id') . ' = ?';
my @params = ($ncos_level_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records = (); # : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,131 @@
package NGCP::BulkProcessor::Dao::mr103::billing::resellers;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Dao::mr103::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr103::billing::domain_resellers qw();
use NGCP::BulkProcessor::Dao::mr103::billing::ncos_levels qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findall
$TERMINATED_STATE
);
my $tablename = 'resellers';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'contract_id',
'name',
'status',
];
my $indexes = {};
our $TERMINATED_STATE = 'terminated';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findall {
my ($source_dbs) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records = (); # : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
# transformations go here ...
#$record->{contract} = NGCP::BulkProcessor::Dao::mr103::billing::contracts::source_findby_id($source_dbs,$record->{contract_id});
my @domains = ();
foreach my $domain_reseller (@{NGCP::BulkProcessor::Dao::mr103::billing::domain_resellers::source_findby_resellerid($source_dbs,$record->{id})}) {
push(@domains,$domain_reseller->{domain});
}
$record->{domains} = \@domains;
$record->{ncos_levels} = NGCP::BulkProcessor::Dao::mr103::billing::ncos_levels::source_findby_resellerid($source_dbs,$record->{id});
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,177 @@
package NGCP::BulkProcessor::Dao::mr103::billing::voip_numbers;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
source_findby_id
);
my $tablename = 'voip_numbers';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'cc',
'ac',
'sn',
'reseller_id',
'subscriber_id',
'status',
'ported',
'list_timestamp',
];
my $indexes = {};
our $ACTIVE_STATE = 'active';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_subscriberid {
my ($source_dbs,$subscriber_id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT vn.*,r.name as reseller_name FROM ' . $table . ' vn left join billing.resellers r on vn.reseller_id = r.id WHERE ' .
'vn.subscriber_id = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_findby_id {
my ($source_dbs,$id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT vn.*,r.name as reseller_name FROM ' . $table . ' vn left join billing.resellers r on vn.reseller_id = r.id WHERE ' .
'vn.id = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs)->[0];
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
# transformations go here ...
$record->{reseller_name} = $row->{reseller_name};
#delete $record->{id};
#delete $record->{reseller_id};
#delete $record->{subscriber_id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,165 @@
package NGCP::BulkProcessor::Dao::mr103::billing::voip_subscribers;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::mr103::billing::voip_numbers qw();
use NGCP::BulkProcessor::Dao::mr103::billing::domains qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_contractid
);
my $tablename = 'voip_subscribers';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'contract_id',
'uuid',
'username',
'domain_id',
'status',
'primary_number_id',
];
my $indexes = {};
my $insert_unique_fields = [];
our $TERMINATED_STATE = 'terminated';
our $ACTIVE_STATE = 'active';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_contractid {
my ($source_dbs,$contract_id) = @_;
my $source_db = $source_dbs->{billing_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT s.*,d.domain as domain FROM ' . $table . ' s JOIN billing.domains d ON s.domain_id = d.id WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{billing_db},$row);
$record->{domain} = $row->{domain};
$record->{provisioning_voip_subscriber} = NGCP::BulkProcessor::Dao::mr103::provisioning::voip_subscribers::source_findby_uuid($source_dbs,$record->{uuid});
$record->{voip_numbers} = NGCP::BulkProcessor::Dao::mr103::billing::voip_numbers::source_findby_subscriberid($source_dbs,$record->{id});
$record->{primary_number} = NGCP::BulkProcessor::Dao::mr103::billing::voip_numbers::source_findby_id($source_dbs,$record->{primary_number_id});
#delete $record->{domain_id};
#delete $record->{primary_number_id};
#delete $record->{contract_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,174 @@
package NGCP::BulkProcessor::Dao::mr103::openser::voicemail_users;
use strict;
## no critic
use threads::shared;
use Locale::Recode qw();
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_kamailio_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_customerid
);
my $tablename = 'voicemail_users';
my $get_db = \&get_kamailio_db;
my $expected_fieldnames = [
'uniqueid',
'customer_id',
'context',
'mailbox',
'password',
'fullname',
'email',
'pager',
'tz',
'attach',
'saycid',
'dialout',
'callback',
'review',
'operator',
'envelope',
'sayduration',
'saydurationm',
'sendvoicemail',
'delete',
'nextaftercmd',
'forcename',
'forcegreetings',
'hidefromdir',
'stamp',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_customerid {
my ($source_dbs,$uuid) = @_;
my $source_db = $source_dbs->{openser_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('customer_id') . ' = ?';
my @params = ($uuid);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
my $recoder = Locale::Recode->new( from => 'ISO-8859-1', to => 'UTF-8' );
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{openser_db},$row);
# transformations go here ...
foreach my $field (keys %$record) {
$record->{$field} = $recoder->recode($record->{$field}) if $record->{field};
}
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,119 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_allowed_ip_groups;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_group_id
);
my $tablename = 'voip_allowed_ip_groups';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'group_id',
'ipnet',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_group_id {
my ($source_dbs,$group_id) = @_;
my $source_db = $source_dbs->{provisioning_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('group_id') . ' = ?';
my @params = ($group_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{provisioning_db},$row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,128 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_dbaliases;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
);
my $tablename = 'voip_dbaliases';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'username',
'domain_id',
'subscriber_id',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_subscriberid {
my ($source_dbs,$subscriber_id) = @_;
my $source_db = $source_dbs->{provisioning_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{provisioning_db},$row);
# transformations go here ...
#delete $record->{domain_id};
#delete $record->{subscriber_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,132 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_fax_destinations;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
);
my $tablename = 'voip_fax_destinations';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'subscriber_id',
'destination',
'filetype',
'cc',
'incoming',
'outgoing',
'status',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_subscriberid {
my ($source_dbs,$subscriber_id) = @_;
my $source_db = $source_dbs->{provisioning_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{provisioning_db},$row);
# transformations go here ...
#delete $record->{subscriber_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,131 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_fax_preferences;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
);
my $tablename = 'voip_fax_preferences';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'subscriber_id',
'password',
'name',
'active',
'send_status',
'send_copy',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_subscriberid {
my ($source_dbs,$subscriber_id) = @_;
my $source_db = $source_dbs->{provisioning_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs)->[0];
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{provisioning_db},$row);
# transformations go here ...
#delete $record->{subscriber_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,73 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_preferences;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
$ALLOWED_IPS_GRP_ATTRIBUTE
$MAN_ALLOWED_IPS_GRP_ATTRIBUTE
$NCOS_ID_ATTRIBUTE
);
my $tablename = 'voip_preferences';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'attribute',
'type',
'max_occur',
'modify_timestamp',
'internal',
];
my $indexes = {};
our $ALLOWED_IPS_GRP_ATTRIBUTE = 'allowed_ips_grp';
our $MAN_ALLOWED_IPS_GRP_ATTRIBUTE = 'man_allowed_ips_grp';
our $NCOS_ID_ATTRIBUTE = 'ncos_id';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,157 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_subscribers;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::mr103::openser::voicemail_users qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_fax_preferences qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_fax_destinations qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_uuid
);
my $tablename = 'voip_subscribers';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'username',
'domain_id',
'uuid',
'password',
'timezone',
'admin',
'account_id',
'webusername',
'webpassword',
'autoconf_displayname',
'autoconf_group_id',
'modify_timestamp',
'create_timestamp',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_uuid {
my ($source_dbs,$uuid) = @_;
my $source_db = $source_dbs->{provisioning_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('uuid') . ' = ?';
my @params = ($uuid);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs)->[0];
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{provisioning_db},$row);
# transformations go here ...
$record->{voip_dbaliases} = NGCP::BulkProcessor::Dao::mr103::provisioning::voip_dbaliases::source_findby_subscriberid($source_dbs,$record->{id});
$record->{voip_usr_preferences} = NGCP::BulkProcessor::Dao::mr103::provisioning::voip_usr_preferences::source_findby_subscriberid($source_dbs,$record->{id});
$record->{voicemail_users} = NGCP::BulkProcessor::Dao::mr103::openser::voicemail_users::source_findby_customerid($source_dbs,$record->{uuid});
$record->{voip_fax_preferences} = NGCP::BulkProcessor::Dao::mr103::provisioning::voip_fax_preferences::source_findby_subscriberid($source_dbs,$record->{id});
$record->{voip_fax_destinations} = NGCP::BulkProcessor::Dao::mr103::provisioning::voip_fax_destinations::source_findby_subscriberid($source_dbs,$record->{id});
#delete $record->{account_id};
#delete $record->{autoconf_displayname};
#delete $record->{autoconf_group_id};
#delete $record->{domain_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -0,0 +1,160 @@
package NGCP::BulkProcessor::Dao::mr103::provisioning::voip_usr_preferences;
use strict;
## no critic
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_provisioning_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::Dao::mr103::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::mr103::billing::ncos_levels qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
$TRUE
$FALSE
);
#source_findby_attributesused
my $tablename = 'voip_usr_preferences';
my $get_db = \&get_provisioning_db;
my $expected_fieldnames = [
'id',
'subscriber_id',
'attribute_id',
'value',
'modify_timestamp',
];
my $indexes = {};
my $insert_unique_fields = [];
our $TRUE = 1;
our $FALSE = undef;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo(shift // $get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
sub source_new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new_shared($class,shift,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub source_findby_subscriberid {
my ($source_dbs,$subscriber_id) = @_;
my $source_db = $source_dbs->{provisioning_db};
check_table($source_db);
my $db = &$source_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT v.*,a.attribute FROM ' . $table . ' v JOIN ' .
$db->tableidentifier('voip_preferences') . ' a ON v.attribute_id = a.id WHERE ' .
'v.subscriber_id = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return source_buildrecords_fromrows($rows,$source_dbs);
}
sub source_buildrecords_fromrows {
my ($rows,$source_dbs) = @_;
my @records : shared = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->source_new($source_dbs->{provisioning_db},$row);
# transformations go here ...
$record->{attribute} = $row->{attribute};
if ($record->{attribute} eq $NGCP::BulkProcessor::Dao::mr103::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE) {
my @allowed_ip_groups : shared = map { $_->{ipnet}; } @{NGCP::BulkProcessor::Dao::mr103::provisioning::voip_allowed_ip_groups::source_findby_group_id(
$source_dbs,$record->{value})};
$record->{allowed_ip_groups} = \@allowed_ip_groups;
#delete $record->{value};
}
if ($record->{attribute} eq $NGCP::BulkProcessor::Dao::mr103::provisioning::voip_preferences::MAN_ALLOWED_IPS_GRP_ATTRIBUTE) {
my @allowed_ip_groups : shared = map { $_->{ipnet}; } @{NGCP::BulkProcessor::Dao::mr103::provisioning::voip_allowed_ip_groups::source_findby_group_id(
$source_dbs,$record->{value})};
$record->{man_allowed_ip_groups} = \@allowed_ip_groups;
#delete $record->{value};
}
if ($record->{attribute} eq $NGCP::BulkProcessor::Dao::mr103::provisioning::voip_preferences::NCOS_ID_ATTRIBUTE) {
my %ncos : shared = ( %{NGCP::BulkProcessor::Dao::mr103::billing::ncos_levels::source_getuniquename($source_dbs,$record->{value})} );
$record->{ncos} = \%ncos;
#delete $record->{value};
}
#delete $record->{attribute_id};
#delete $record->{subscriber_id};
#delete $record->{id};
push @records,$record;
}
}
return \@records;
}
1;

@ -163,7 +163,6 @@ sub source_buildrecords_fromrows {
#$record->{provisioning_voip_subscriber} = NGCP::BulkProcessor::Dao::mr341::provisioning::voip_subscribers::source_findby_uuid($source_dbs,$record->{uuid});
push @records,$record;
}
}

@ -76,6 +76,7 @@ sub _extractlines {
foreach my $line (split(/$separator/,$$buffer_ref,-1)) {
$last_line = $line;
push(@$lines,$line);
#print $$buffer_ref;
}
#$count--;
$$buffer_ref = $last_line;

Loading…
Cancel
Save