diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/Api.pm b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/Api.pm new file mode 100644 index 0000000..af07f85 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/Api.pm @@ -0,0 +1,214 @@ +package NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Api; +use strict; + +## no critic + +use threads::shared qw(); +#use List::Util qw(); +#use DateTime qw(); + +#use NGCP::BulkProcessor::Globals qw( +# $system_abbreviation +#); + +use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Settings qw( + $dry + $skip_errors + + $mappings + + $set_profile_package_multithreading + $set_profile_package_numofthreads + +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn +); + +use NGCP::BulkProcessor::Utils qw(threadid); +use NGCP::BulkProcessor::Array qw(array_to_map); + +use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::ProfilePackages qw(); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + set_profile_package +); + +sub set_profile_package { + + my $static_context = {}; + my $result = _set_profile_package_init_context($static_context); + + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::RestRequests::Trunk::Customers::process_items( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + foreach my $contract (@$records) { + next unless _set_profile_package_reset_context($context,$contract); + _update_contract($context); + } + return 1; + }, + init_process_context_code => sub { + my ($context) = @_; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context) = @_; + #destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 1, + multithreading => $set_profile_package_multithreading, + numofthreads => $set_profile_package_numofthreads, + ),$warning_count); +} + +sub _set_profile_package_reset_context { + + my ($context,$contract) = @_; + + my $result = 0; + + $context->{contract} = $contract; + + $context->{package} = undef; + if (exists $context->{mappings}->{$contract->{billing_profile_id}} + and ( + not defined $contract->{profile_package_id} + or $contract->{profile_package_id} != $context->{mappings}->{$contract->{billing_profile_id}} + )) { + $result = 1; + $context->{package} = $context->{profile_package_map}->{$context->{mappings}->{$contract->{billing_profile_id}}}; + } + + return $result; + +} + +sub _set_profile_package_init_context { + my ($context) = @_; + + my $result = 1; + my @billing_profiles = (); + foreach my $handle (keys %$mappings) { + rowprocessingerror(threadid(),"no profile package for billing profile '$handle'",getlogger(__PACKAGE__)) unless defined $mappings->{$handle}; + my $billing_profile; + eval { + $billing_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_handle( + $handle,#$reseller_id + )->[0]; + }; + if ($@ or not $billing_profile) { + rowprocessingerror(threadid(),"cannot find billing profile '$handle'",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + push(@billing_profiles,$billing_profile); + } + } + ($context->{billing_profile_map},my $ids,my $vals) = array_to_map( + \@billing_profiles, sub { return shift->{id}; }, sub { return shift; }, 'first' ); + my %profile_packages = (); + foreach my $name (values %$mappings) { + next if exists $profile_packages{$name}; + my $profile_package; + eval { + $profile_package = NGCP::BulkProcessor::RestRequests::Trunk::ProfilePackages::findby_name( + $name,#$reseller_id + )->[0]; + }; + if ($@ or not $profile_package) { + rowprocessingerror(threadid(),"cannot find profile package '$name'",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + $profile_packages{$name} = $profile_package; + } + } + ($context->{profile_package_map}, $ids, $vals) = array_to_map( + [ values %profile_packages ], sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $context->{mappings} = { + map { $_->{id} => $profile_packages{$mappings->{$_->{handle}}}->{id}; } @billing_profiles + }; + + return $result; +} + +sub _update_contract { + my ($context) = @_; + + my $result = 0; + my $contract_path = NGCP::BulkProcessor::RestRequests::Trunk::Customers::get_item_path( + $context->{contract}->{id}); + eval { + my $customer; + if ($dry) { + $customer = NGCP::BulkProcessor::RestRequests::Trunk::Customers::get_item( + $context->{contract}->{id} + ); + } else { + $customer = NGCP::BulkProcessor::RestRequests::Trunk::Customers::update_item( + $context->{contract}->{id}, + { + billing_profile_definition => 'package', + profile_package_id => $context->{package}->{id}, + }, + ); + } + $result = (defined $customer ? 1 : 0); + }; + if ($@ or not $result) { + if ($skip_errors) { + _warn($context,'could not ' . ($dry ? 'fetch' : 'update') . ' ' . $contract_path); + } else { + _error($context,'could not ' . ($dry ? 'fetch' : 'update') . ' ' . $contract_path); + } + } else { + _info($context,$contract_path . ($dry ? ' fetched' : ' updated')); + } + return $result; + +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/Settings.pm new file mode 100755 index 0000000..d53eaad --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/Settings.pm @@ -0,0 +1,147 @@ +package NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Settings; +use strict; + +## no critic + +use NGCP::BulkProcessor::Globals qw( + $enablemultithreading + $cpucount +); +#$working_path +#create_path + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt stringtobool); +#format_number check_ipnet + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + update_mappings + check_dry + + $defaultsettings + $defaultconfig + + $mappings_yml + $mappings + + $dry + $skip_errors + $force + + $set_profile_package_multithreading + $set_profile_package_numofthreads + +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $mappings_yml = undef; +our $mappings = {}; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +our $set_profile_package_multithreading = $enablemultithreading; +our $set_profile_package_numofthreads = $cpucount; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + #my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + #$result &= _prepare_working_paths(1); + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + $set_profile_package_multithreading = $data->{set_profile_package_multithreading} if exists $data->{set_profile_package_multithreading}; + $set_profile_package_numofthreads = _get_numofthreads($cpucount,$data,'set_profile_package_numofthreads'); + + $mappings_yml = $data->{mappings_yml} if exists $data->{mappings_yml}; + + return $result; + + } + return 0; + +} + +sub update_mappings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $mappings = $data->{'mappings'}; + }; + if ($@ or 'HASH' ne ref $mappings or (scalar keys %$mappings) == 0) { + $mappings //= {}; + configurationerror($configfile,'no mappings found',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $_numofthreads = $default_value; + $_numofthreads = $data->{$key} if exists $data->{$key}; + $_numofthreads = $cpucount if $_numofthreads > $cpucount; + return $_numofthreads; +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/config.cfg new file mode 100755 index 0000000..daa4492 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/config.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /var/sipwise +cpucount = 4 +enablemultithreading = 0 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +#provisioning_conf = /etc/ngcp-panel/provisioning.conf + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = somehost +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = somehost +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = somehost +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = somehost +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = somehost +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://somehost:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/config.debug.cfg new file mode 100755 index 0000000..d4bd2f9 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/config.debug.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /home/rkrenn/temp/soco +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +#provisioning_conf = /etc/ngcp-panel/provisioning.conf + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.29 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.29 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.29 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.29 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.29 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = DEBUG +screenloglevel = DEBUG +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/mappings.yml b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/mappings.yml new file mode 100755 index 0000000..61305fb --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/mappings.yml @@ -0,0 +1,2 @@ +mappings: + test_default_1541733070: 'test profile package 1541733434' diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/process.pl new file mode 100755 index 0000000..40eb4b0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/process.pl @@ -0,0 +1,213 @@ +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Settings qw( + update_settings + update_mappings + check_dry + + $defaultsettings + $defaultconfig + $mappings_yml + $dry + $skip_errors + $force + +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); + +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances qw(); + +#use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Contracts qw( +# set_profile_package +#); + +use NGCP::BulkProcessor::Projects::ETL::SetProfilePackage::Api qw( + set_profile_package +); + +use NGCP::BulkProcessor::ConnectorPool qw( + destroy_dbs +); + +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; + +my $set_profile_package_task_opt = 'set_profile_package'; +push(@TASK_OPTS,$set_profile_package_task_opt); + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + $result &= load_config($mappings_yml,\&update_mappings,$YAML_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($set_profile_package_task_opt) eq lc($task)) { + if (taskinfo($set_profile_package_task_opt,$result)) { + next unless check_dry(); + $result &= set_profile_package_task(\@messages); + $completion |= 1; + } + + } elsif (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result); + + } else { + $result = 0; + scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages) = @_; + my $result = 0; + eval { + #cleanupcvsdirs() if $clean_generated; + #cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + #cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + #cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + + if ($@ or !$result) { + push(@$messages,'cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'cleanup completed'); + return 1; + } +} + +sub set_profile_package_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = set_profile_package(); + }; + my $err = $@; + my $stats = ($skip_errors ? ": $warning_count warnings" : ''); + #eval { + + #}; + if ($err or !$result) { + push(@$messages,"set profile package INCOMPLETE$stats"); + } else { + push(@$messages,"set profile package completed$stats"); + } + destroy_dbs(); #every task should leave with closed connections. + return $result; + +} + +#END { +# # this should not be required explicitly, but prevents Log4Perl's +# # "rootlogger not initialized error upon exit.. +# destroy_all_dbs +#} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/settings.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/settings.cfg new file mode 100755 index 0000000..f7c42bf --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/settings.cfg @@ -0,0 +1,8 @@ + +#dry=0 +#skip_errors=0 + +set_profile_package_numofthreads=2 +set_profile_package_multithreading=1 + +mappings_yml = mappings.yml diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/settings.debug.cfg new file mode 100755 index 0000000..f7c42bf --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/SetProfilePackage/settings.debug.cfg @@ -0,0 +1,8 @@ + +#dry=0 +#skip_errors=0 + +set_profile_package_numofthreads=2 +set_profile_package_multithreading=1 + +mappings_yml = mappings.yml diff --git a/lib/NGCP/BulkProcessor/RestConnector.pm b/lib/NGCP/BulkProcessor/RestConnector.pm index 376bcb5..a2a27df 100644 --- a/lib/NGCP/BulkProcessor/RestConnector.pm +++ b/lib/NGCP/BulkProcessor/RestConnector.pm @@ -620,7 +620,7 @@ sub get_defaultcollectionpagesize { notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } -sub get_firscollectionpagenum { +sub get_firstcollectionpagenum { my $self = shift; notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); } diff --git a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm index 1429599..1045a39 100644 --- a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm +++ b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm @@ -34,7 +34,7 @@ use NGCP::BulkProcessor::RestConnector qw(_add_headers convert_bools); use NGCP::BulkProcessor::Calendar qw(get_fake_now_string); -use NGCP::BulkProcessor::Utils qw(makepath cleanupdir); +use NGCP::BulkProcessor::Utils qw(makepath cleanupdir booltostring); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::RestConnector); @@ -52,6 +52,7 @@ my $API_CERT_DIR = 'apicerts/'; my $API_CERT_FILENAME_FORMAT = '%s%s.pem'; my $default_collection_page_size = 10; +my $first_collection_page_num = 1; my $first_page_num = 1; my $contenttype = 'application/json'; @@ -270,12 +271,16 @@ sub _add_delete_headers { sub _get_page_num_query_param { my $self = shift; my ($page_num) = @_; - if (defined $page_num) { - $page_num += $first_page_num; - } else { - $page_num = $first_page_num; + if (defined $page_num and length($page_num) > 0) { + return 'page=' . $page_num; } - return 'page=' . $page_num; + return undef; + #if (defined $page_num) { + # $page_num += $first_page_num; + #} else { + # $page_num = $first_page_num; + #} + #return (defined $page_num ? 'p=' . $page_num : undef); } sub _get_page_size_query_param { @@ -285,6 +290,12 @@ sub _get_page_size_query_param { return 'size=' . $page_size; } +sub _get_total_count_expected_query_param { + my $self = shift; + my ($total_count_expected) = @_; + return ($total_count_expected ? '' : 'no_count=' . booltostring(1)); +} + sub extract_collection_items { my $self = shift; my ($data,$page_size,$page_num,$params) = @_; @@ -309,6 +320,11 @@ sub get_defaultcollectionpagesize { return $default_collection_page_size; } +sub get_firstcollectionpagenum { + my $self = shift; + return $first_collection_page_num; +} + sub _request_error { my $self = shift; my $msg = undef; diff --git a/lib/NGCP/BulkProcessor/RestProcessor.pm b/lib/NGCP/BulkProcessor/RestProcessor.pm index 68812ac..8bb60e9 100644 --- a/lib/NGCP/BulkProcessor/RestProcessor.pm +++ b/lib/NGCP/BulkProcessor/RestProcessor.pm @@ -227,8 +227,8 @@ sub process_collection { while (1) { fetching_items($restapi,$path_query,$i,$blocksize,getlogger(__PACKAGE__)); my $collection_page; - $collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$headers) unless $post_data; - $collection_page = $restapi->post($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$post_data,$headers) if $post_data; + $collection_page = $restapi->get($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$headers) unless $post_data; + $collection_page = $restapi->post($restapi->get_collection_page_query_uri($path_query,$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$post_data,$headers) if $post_data; my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$extract_collection_items_params); my $realblocksize = scalar @$rowblock; if ($realblocksize > 0) { @@ -306,8 +306,8 @@ sub _reader { fetching_items($restapi,$context->{path_query},$i,$blocksize,getlogger(__PACKAGE__)); my $collection_page; - $collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{headers}) unless $context->{post_data}; - $collection_page = $restapi->post($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firscollectionpagenum),$context->{post_data},$context->{headers}) if $context->{post_data}; + $collection_page = $restapi->get($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$context->{headers}) unless $context->{post_data}; + $collection_page = $restapi->post($restapi->get_collection_page_query_uri($context->{path_query},$blocksize,$blockcount + $restapi->get_firstcollectionpagenum),$context->{post_data},$context->{headers}) if $context->{post_data}; my $rowblock = $restapi->extract_collection_items($collection_page,$blocksize,$blockcount,$context->{extract_collection_items_params}); my $realblocksize = scalar @$rowblock; my %packet :shared = (); diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm index 53dbd72..5827c6a 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm @@ -25,6 +25,7 @@ our @EXPORT_OK = qw( process_items get_item_path findby_resellerid + findby_handle ); my $get_restapi = \&get_ngcp_restapi; @@ -42,6 +43,12 @@ my $findby_resellerid_path_query = sub { $filters->{reseller_id} = $reseller_id if defined $reseller_id; return 'api/' . $resource . '/' . get_query_string($filters); }; +my $findby_handle_path_query = sub { + my ($handle) = @_; + my $filters = {}; + $filters->{handle} = $handle if defined $handle; + return 'api/' . $resource . '/' . get_query_string($filters); +}; my $fieldnames = [ 'currency', @@ -93,6 +100,15 @@ sub findby_resellerid { } +sub findby_handle { + + my ($handle,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_handle_path_query($handle),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive); + +} + sub create_item { my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm index bc3f3a2..ad51448 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm @@ -9,7 +9,9 @@ use NGCP::BulkProcessor::ConnectorPool qw( ); use NGCP::BulkProcessor::RestProcessor qw( + process_collection copy_row + get_query_string ); use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); @@ -21,6 +23,7 @@ our @EXPORT_OK = qw( get_item create_item set_item + process_items update_item get_item_path ); @@ -118,6 +121,45 @@ sub builditems_fromrows { } +sub process_items { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_collection( + get_restapi => $get_restapi, + path_query => $collection_path_query . '?not_status=terminated&order_by=id&order_by_direction=asc', + headers => undef, #faketime,.. + extract_collection_items_params => { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + multithreading => $multithreading, + collectionprocessing_threads => $numofthreads, + ); +} + sub get_item_path { my ($id) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm index 8aae001..0a812e6 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm @@ -10,6 +10,8 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::RestProcessor qw( copy_row + process_collection + get_query_string ); use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); @@ -22,6 +24,7 @@ our @EXPORT_OK = qw( create_item set_item update_item + process_items get_item_path $TERMINATED_STATE @@ -130,6 +133,45 @@ sub builditems_fromrows { } +sub process_items { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_collection( + get_restapi => $get_restapi, + path_query => $collection_path_query . '?not_status=terminated&order_by=id&order_by_direction=asc', + headers => undef, #faketime,.. + extract_collection_items_params => { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + multithreading => $multithreading, + collectionprocessing_threads => $numofthreads, + ); +} + sub get_item_path { my ($id) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/ProfilePackages.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/ProfilePackages.pm new file mode 100644 index 0000000..259ffdc --- /dev/null +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/ProfilePackages.pm @@ -0,0 +1,157 @@ +package NGCP::BulkProcessor::RestRequests::Trunk::ProfilePackages; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_ngcp_restapi + +); + +use NGCP::BulkProcessor::RestProcessor qw( + copy_row + get_query_string +); + +use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); +use NGCP::BulkProcessor::RestItem qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::RestItem); +our @EXPORT_OK = qw( + get_item + create_item + get_item_path + findby_resellerid + findby_name +); + +my $get_restapi = \&get_ngcp_restapi; +my $resource = 'profilepackages'; +my $item_relation = 'ngcp:' . $resource; +my $get_item_path_query = sub { + my ($id) = @_; + return 'api/' . $resource . '/' . $id; +}; +my $collection_path_query = 'api/' . $resource . '/'; + +my $findby_resellerid_path_query = sub { + my ($reseller_id) = @_; + my $filters = {}; + $filters->{reseller_id} = $reseller_id if defined $reseller_id; + return 'api/' . $resource . '/' . get_query_string($filters); +}; +my $findby_name_path_query = sub { + my ($name) = @_; + my $filters = {}; + $filters->{name} = $name if defined $name; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + +my $fieldnames = [ + + 'balance_interval_start_mode', + 'balance_interval_unit', + 'balance_interval_value', + 'carry_over_mode', + 'description', + 'initial_balance', + 'initial_profiles', + 'name', + 'notopup_discard_intervals', + 'reseller_id', + 'service_charge', + 'timely_duration_unit', + 'timely_duration_value', + 'topup_lock_level', + 'topup_profiles', + 'underrun_lock_level', + 'underrun_lock_threshold', + 'underrun_profile_threshold', + 'underrun_profiles', + + 'id', +]; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::RestItem->new($class,$fieldnames); + + copy_row($self,shift,$fieldnames); + + return $self; + +} + +sub get_item { + + my ($id,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->get(&$get_item_path_query($id),$headers),$load_recursive); + +} + +sub findby_resellerid { + + my ($reseller_id,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_resellerid_path_query($reseller_id),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive); + +} + +sub findby_name { + + my ($name,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_name_path_query($name),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive); + +} + +sub create_item { + + my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_; + my $restapi = &$get_restapi(); + if ($load) { + return builditems_fromrows($restapi->post_get($collection_path_query,$data,$post_headers,$get_headers),$load_recursive); + } else { + my ($id) = $restapi->post($collection_path_query,$data,$post_headers); + return $id; + } + +} + +sub builditems_fromrows { + + my ($rows,$load_recursive) = @_; + + my $item; + + if (defined $rows and ref $rows eq 'ARRAY') { + my @items = (); + foreach my $row (@$rows) { + $item = __PACKAGE__->new($row); + + # transformations go here ... + + push @items,$item; + } + return \@items; + } elsif (defined $rows and ref $rows eq 'HASH') { + $item = __PACKAGE__->new($rows); + return $item; + } + return undef; + +} + +sub get_item_path { + + my ($id) = @_; + return &$get_item_path_query($id); + +} + +1;