TT#47531 process contracts/set profile package

tool to switch contracts with some billing profile
to a desired profile package, as specified in mappings.yml.

run like:

perl process.pl --task=cleanup --task=set_profile_package --skip-errors

Change-Id: Ie7c07642ed29d91166aad95dfb62a0e3e40a5e3f
(cherry picked from commit 2bd51c65c1)
changes/04/25204/1
Rene Krenn 7 years ago
parent c8f55bee6c
commit 54a9900adb

@ -0,0 +1,214 @@
package NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Api;
use strict;
## no critic
use threads::shared qw();
#use List::Util qw();
#use DateTime qw();
#use NGCP::BulkProcessor::Globals qw(
# $system_abbreviation
#);
use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Settings qw(
$dry
$skip_errors
$mappings
$set_profile_package_multithreading
$set_profile_package_numofthreads
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
);
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::Array qw(array_to_map);
use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles qw();
use NGCP::BulkProcessor::RestRequests::Trunk::ProfilePackages qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
set_profile_package
);
sub set_profile_package {
my $static_context = {};
my $result = _set_profile_package_init_context($static_context);
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::RestRequests::Trunk::Customers::process_items(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
foreach my $contract (@$records) {
next unless _set_profile_package_reset_context($context,$contract);
_update_contract($context);
}
return 1;
},
init_process_context_code => sub {
my ($context) = @_;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context) = @_;
#destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 1,
multithreading => $set_profile_package_multithreading,
numofthreads => $set_profile_package_numofthreads,
),$warning_count);
}
sub _set_profile_package_reset_context {
my ($context,$contract) = @_;
my $result = 0;
$context->{contract} = $contract;
$context->{package} = undef;
if (exists $context->{mappings}->{$contract->{billing_profile_id}}
and (
not defined $contract->{profile_package_id}
or $contract->{profile_package_id} != $context->{mappings}->{$contract->{billing_profile_id}}
)) {
$result = 1;
$context->{package} = $context->{profile_package_map}->{$context->{mappings}->{$contract->{billing_profile_id}}};
}
return $result;
}
sub _set_profile_package_init_context {
my ($context) = @_;
my $result = 1;
my @billing_profiles = ();
foreach my $handle (keys %$mappings) {
rowprocessingerror(threadid(),"no profile package for billing profile '$handle'",getlogger(__PACKAGE__)) unless defined $mappings->{$handle};
my $billing_profile;
eval {
$billing_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_handle(
$handle,#$reseller_id
)->[0];
};
if ($@ or not $billing_profile) {
rowprocessingerror(threadid(),"cannot find billing profile '$handle'",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
push(@billing_profiles,$billing_profile);
}
}
($context->{billing_profile_map},my $ids,my $vals) = array_to_map(
\@billing_profiles, sub { return shift->{id}; }, sub { return shift; }, 'first' );
my %profile_packages = ();
foreach my $name (values %$mappings) {
next if exists $profile_packages{$name};
my $profile_package;
eval {
$profile_package = NGCP::BulkProcessor::RestRequests::Trunk::ProfilePackages::findby_name(
$name,#$reseller_id
)->[0];
};
if ($@ or not $profile_package) {
rowprocessingerror(threadid(),"cannot find profile package '$name'",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
$profile_packages{$name} = $profile_package;
}
}
($context->{profile_package_map}, $ids, $vals) = array_to_map(
[ values %profile_packages ], sub { return shift->{id}; }, sub { return shift; }, 'first' );
$context->{mappings} = {
map { $_->{id} => $profile_packages{$mappings->{$_->{handle}}}->{id}; } @billing_profiles
};
return $result;
}
sub _update_contract {
my ($context) = @_;
my $result = 0;
my $contract_path = NGCP::BulkProcessor::RestRequests::Trunk::Customers::get_item_path(
$context->{contract}->{id});
eval {
my $customer;
if ($dry) {
$customer = NGCP::BulkProcessor::RestRequests::Trunk::Customers::get_item(
$context->{contract}->{id}
);
} else {
$customer = NGCP::BulkProcessor::RestRequests::Trunk::Customers::update_item(
$context->{contract}->{id},
{
billing_profile_definition => 'package',
profile_package_id => $context->{package}->{id},
},
);
}
$result = (defined $customer ? 1 : 0);
};
if ($@ or not $result) {
if ($skip_errors) {
_warn($context,'could not ' . ($dry ? 'fetch' : 'update') . ' ' . $contract_path);
} else {
_error($context,'could not ' . ($dry ? 'fetch' : 'update') . ' ' . $contract_path);
}
} else {
_info($context,$contract_path . ($dry ? ' fetched' : ' updated'));
}
return $result;
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -0,0 +1,147 @@
package NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Settings;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
$cpucount
);
#$working_path
#create_path
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt stringtobool);
#format_number check_ipnet
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
update_mappings
check_dry
$defaultsettings
$defaultconfig
$mappings_yml
$mappings
$dry
$skip_errors
$force
$set_profile_package_multithreading
$set_profile_package_numofthreads
);
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $mappings_yml = undef;
our $mappings = {};
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
our $set_profile_package_multithreading = $enablemultithreading;
our $set_profile_package_numofthreads = $cpucount;
sub update_settings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
#my $regexp_result;
#&$configurationinfocode("testinfomessage",$configlogger);
#$result &= _prepare_working_paths(1);
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$set_profile_package_multithreading = $data->{set_profile_package_multithreading} if exists $data->{set_profile_package_multithreading};
$set_profile_package_numofthreads = _get_numofthreads($cpucount,$data,'set_profile_package_numofthreads');
$mappings_yml = $data->{mappings_yml} if exists $data->{mappings_yml};
return $result;
}
return 0;
}
sub update_mappings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$mappings = $data->{'mappings'};
};
if ($@ or 'HASH' ne ref $mappings or (scalar keys %$mappings) == 0) {
$mappings //= {};
configurationerror($configfile,'no mappings found',getlogger(__PACKAGE__));
$result = 0;
}
return $result;
}
return 0;
}
sub check_dry {
if ($dry) {
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
return 1;
} else {
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
if (!$force) {
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
return 1;
} else {
return 0;
}
} else {
scriptinfo('force option applied',getlogger(__PACKAGE__));
return 1;
}
}
}
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $_numofthreads = $default_value;
$_numofthreads = $data->{$key} if exists $data->{$key};
$_numofthreads = $cpucount if $_numofthreads > $cpucount;
return $_numofthreads;
}
1;

@ -0,0 +1,62 @@
##general settings:
working_path = /var/sipwise
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = somehost
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = somehost
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = somehost
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = somehost
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = somehost
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://somehost:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -0,0 +1,62 @@
##general settings:
working_path = /home/rkrenn/temp/soco
cpucount = 4
enablemultithreading = 1
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.29
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.29
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.29
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.29
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.29
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = DEBUG
emailloglevel = OFF

@ -0,0 +1,2 @@
mappings:
test_default_1541733070: 'test profile package 1541733434'

@ -0,0 +1,213 @@
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Settings qw(
update_settings
update_mappings
check_dry
$defaultsettings
$defaultconfig
$mappings_yml
$dry
$skip_errors
$force
);
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances qw();
#use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Contracts qw(
# set_profile_package
#);
use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Api qw(
set_profile_package
);
use NGCP::BulkProcessor::ConnectorPool qw(
destroy_dbs
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $set_profile_package_task_opt = 'set_profile_package';
push(@TASK_OPTS,$set_profile_package_task_opt);
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
if (init()) {
main();
exit(0);
} else {
exit(1);
}
sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
$result &= load_config($mappings_yml,\&update_mappings,$YAML_CONFIG_TYPE);
return $result;
}
sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 1;
my $completion = 0;
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($set_profile_package_task_opt) eq lc($task)) {
if (taskinfo($set_profile_package_task_opt,$result)) {
next unless check_dry();
$result &= set_profile_package_task(\@messages);
$completion |= 1;
}
} elsif (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result);
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
last;
}
}
} else {
$result = 0;
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
}
push(@attachmentfiles,$attachmentlogfile);
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub cleanup_task {
my ($messages) = @_;
my $result = 0;
eval {
#cleanupcvsdirs() if $clean_generated;
#cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
#cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
#cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
if ($@ or !$result) {
push(@$messages,'cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'cleanup completed');
return 1;
}
}
sub set_profile_package_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = set_profile_package();
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
#eval {
#};
if ($err or !$result) {
push(@$messages,"set profile package INCOMPLETE$stats");
} else {
push(@$messages,"set profile package completed$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..
# destroy_all_dbs
#}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -0,0 +1,8 @@
#dry=0
#skip_errors=0
set_profile_package_numofthreads=2
set_profile_package_multithreading=1
mappings_yml = mappings.yml

@ -0,0 +1,8 @@
#dry=0
#skip_errors=0
set_profile_package_numofthreads=2
set_profile_package_multithreading=1
mappings_yml = mappings.yml

@ -620,7 +620,7 @@ sub get_defaultcollectionpagesize {
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub get_firscollectionpagenum {
sub get_firstcollectionpagenum {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}

@ -34,7 +34,7 @@ use NGCP::BulkProcessor::RestConnector qw(_add_headers convert_bools);
use NGCP::BulkProcessor::Calendar qw(get_fake_now_string);
use NGCP::BulkProcessor::Utils qw(makepath cleanupdir);
use NGCP::BulkProcessor::Utils qw(makepath cleanupdir booltostring);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::RestConnector);
@ -52,6 +52,7 @@ my $API_CERT_DIR = 'apicerts/';
my $API_CERT_FILENAME_FORMAT = '%s%s.pem';
my $default_collection_page_size = 10;
my $first_collection_page_num = 1;
my $first_page_num = 1;
my $contenttype = 'application/json';
@ -270,12 +271,16 @@ sub _add_delete_headers {
sub _get_page_num_query_param {
my $self = shift;
my ($page_num) = @_;
if (defined $page_num) {
$page_num += $first_page_num;
} else {
$page_num = $first_page_num;
if (defined $page_num and length($page_num) > 0) {
return 'page=' . $page_num;
}
return 'page=' . $page_num;
return undef;
#if (defined $page_num) {
# $page_num += $first_page_num;
#} else {
# $page_num = $first_page_num;
#}
#return (defined $page_num ? 'p=' . $page_num : undef);
}
sub _get_page_size_query_param {
@ -285,6 +290,12 @@ sub _get_page_size_query_param {
return 'size=' . $page_size;
}
sub _get_total_count_expected_query_param {
my $self = shift;
my ($total_count_expected) = @_;
return ($total_count_expected ? '' : 'no_count=' . booltostring(1));
}
sub extract_collection_items {
my $self = shift;
my ($data,$page_size,$page_num,$params) = @_;
@ -309,6 +320,11 @@ sub get_defaultcollectionpagesize {
return $default_collection_page_size;
}
sub get_firstcollectionpagenum {
my $self = shift;
return $first_collection_page_num;
}
sub _request_error {
my $self = shift;
my $msg = undef;

@ -227,8 +227,8 @@ sub process_collection {
while (1) {
fetching_items($restapi,$path_query,$i,$blocksize,getlogger(__PACKAGE__));
my $collection_page;
$collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$headers) unless $post_data;
$collection_page = $restapi->post($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$post_data,$headers) if $post_data;
$collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$headers) unless $post_data;
$collection_page = $restapi->post($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$post_data,$headers) if $post_data;
my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$extract_collection_items_params);
my $realblocksize = scalar @$rowblock;
if ($realblocksize > 0) {
@ -306,8 +306,8 @@ sub _reader {
fetching_items($restapi,$context->{path_query},$i,$blocksize,getlogger(__PACKAGE__));
my $collection_page;
$collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{headers}) unless $context->{post_data};
$collection_page = $restapi->post($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{post_data},$context->{headers}) if $context->{post_data};
$collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$context->{headers}) unless $context->{post_data};
$collection_page = $restapi->post($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$context->{post_data},$context->{headers}) if $context->{post_data};
my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$context->{extract_collection_items_params});
my $realblocksize = scalar @$rowblock;
my %packet :shared = ();

@ -25,6 +25,7 @@ our @EXPORT_OK = qw(
process_items
get_item_path
findby_resellerid
findby_handle
);
my $get_restapi = \&get_ngcp_restapi;
@ -42,6 +43,12 @@ my $findby_resellerid_path_query = sub {
$filters->{reseller_id} = $reseller_id if defined $reseller_id;
return 'api/' . $resource . '/' . get_query_string($filters);
};
my $findby_handle_path_query = sub {
my ($handle) = @_;
my $filters = {};
$filters->{handle} = $handle if defined $handle;
return 'api/' . $resource . '/' . get_query_string($filters);
};
my $fieldnames = [
'currency',
@ -93,6 +100,15 @@ sub findby_resellerid {
}
sub findby_handle {
my ($handle,$load_recursive,$headers) = @_;
my $restapi = &$get_restapi();
return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_handle_path_query($handle),$headers),undef,undef,
{ $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive);
}
sub create_item {
my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_;

@ -9,7 +9,9 @@ use NGCP::BulkProcessor::ConnectorPool qw(
);
use NGCP::BulkProcessor::RestProcessor qw(
process_collection
copy_row
get_query_string
);
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw();
@ -21,6 +23,7 @@ our @EXPORT_OK = qw(
get_item
create_item
set_item
process_items
update_item
get_item_path
);
@ -118,6 +121,45 @@ sub builditems_fromrows {
}
sub process_items {
my %params = @_;
my ($process_code,
$static_context,
$blocksize,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
blocksize
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
return process_collection(
get_restapi => $get_restapi,
path_query => $collection_path_query . '?not_status=terminated&order_by=id&order_by_direction=asc',
headers => undef, #faketime,..
extract_collection_items_params => { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation },
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,builditems_fromrows($rowblock,$load_recursive),$row_offset);
},
static_context => $static_context,
blocksize => $blocksize,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
multithreading => $multithreading,
collectionprocessing_threads => $numofthreads,
);
}
sub get_item_path {
my ($id) = @_;

@ -10,6 +10,8 @@ use NGCP::BulkProcessor::ConnectorPool qw(
use NGCP::BulkProcessor::RestProcessor qw(
copy_row
process_collection
get_query_string
);
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw();
@ -22,6 +24,7 @@ our @EXPORT_OK = qw(
create_item
set_item
update_item
process_items
get_item_path
$TERMINATED_STATE
@ -130,6 +133,45 @@ sub builditems_fromrows {
}
sub process_items {
my %params = @_;
my ($process_code,
$static_context,
$blocksize,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
blocksize
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
return process_collection(
get_restapi => $get_restapi,
path_query => $collection_path_query . '?not_status=terminated&order_by=id&order_by_direction=asc',
headers => undef, #faketime,..
extract_collection_items_params => { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation },
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,builditems_fromrows($rowblock,$load_recursive),$row_offset);
},
static_context => $static_context,
blocksize => $blocksize,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
multithreading => $multithreading,
collectionprocessing_threads => $numofthreads,
);
}
sub get_item_path {
my ($id) = @_;

@ -0,0 +1,157 @@
package NGCP::BulkProcessor::RestRequests::Trunk::ProfilePackages;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_ngcp_restapi
);
use NGCP::BulkProcessor::RestProcessor qw(
copy_row
get_query_string
);
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw();
use NGCP::BulkProcessor::RestItem qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::RestItem);
our @EXPORT_OK = qw(
get_item
create_item
get_item_path
findby_resellerid
findby_name
);
my $get_restapi = \&get_ngcp_restapi;
my $resource = 'profilepackages';
my $item_relation = 'ngcp:' . $resource;
my $get_item_path_query = sub {
my ($id) = @_;
return 'api/' . $resource . '/' . $id;
};
my $collection_path_query = 'api/' . $resource . '/';
my $findby_resellerid_path_query = sub {
my ($reseller_id) = @_;
my $filters = {};
$filters->{reseller_id} = $reseller_id if defined $reseller_id;
return 'api/' . $resource . '/' . get_query_string($filters);
};
my $findby_name_path_query = sub {
my ($name) = @_;
my $filters = {};
$filters->{name} = $name if defined $name;
return 'api/' . $resource . '/' . get_query_string($filters);
};
my $fieldnames = [
'balance_interval_start_mode',
'balance_interval_unit',
'balance_interval_value',
'carry_over_mode',
'description',
'initial_balance',
'initial_profiles',
'name',
'notopup_discard_intervals',
'reseller_id',
'service_charge',
'timely_duration_unit',
'timely_duration_value',
'topup_lock_level',
'topup_profiles',
'underrun_lock_level',
'underrun_lock_threshold',
'underrun_profile_threshold',
'underrun_profiles',
'id',
];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames);
copy_row($self,shift,$fieldnames);
return $self;
}
sub get_item {
my ($id,$load_recursive,$headers) = @_;
my $restapi = &$get_restapi();
return builditems_fromrows($restapi->get(&$get_item_path_query($id),$headers),$load_recursive);
}
sub findby_resellerid {
my ($reseller_id,$load_recursive,$headers) = @_;
my $restapi = &$get_restapi();
return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_resellerid_path_query($reseller_id),$headers),undef,undef,
{ $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive);
}
sub findby_name {
my ($name,$load_recursive,$headers) = @_;
my $restapi = &$get_restapi();
return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_name_path_query($name),$headers),undef,undef,
{ $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive);
}
sub create_item {
my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_;
my $restapi = &$get_restapi();
if ($load) {
return builditems_fromrows($restapi->post_get($collection_path_query,$data,$post_headers,$get_headers),$load_recursive);
} else {
my ($id) = $restapi->post($collection_path_query,$data,$post_headers);
return $id;
}
}
sub builditems_fromrows {
my ($rows,$load_recursive) = @_;
my $item;
if (defined $rows and ref $rows eq 'ARRAY') {
my @items = ();
foreach my $row (@$rows) {
$item = __PACKAGE__->new($row);
# transformations go here ...
push @items,$item;
}
return \@items;
} elsif (defined $rows and ref $rows eq 'HASH') {
$item = __PACKAGE__->new($rows);
return $item;
}
return undef;
}
sub get_item_path {
my ($id) = @_;
return &$get_item_path_query($id);
}
1;
Loading…
Cancel
Save