diff --git a/debian/control b/debian/control index 8873262..29db2c5 100644 --- a/debian/control +++ b/debian/control @@ -60,6 +60,8 @@ Depends: libxml-libxml-perl, libyaml-libyaml-perl, libio-compress-perl, + libsys-cpuaffinity-perl, + libdata-rmap-perl, perl, ${misc:Depends}, ${perl:Depends}, diff --git a/lib/NGCP/BulkProcessor/Globals.pm b/lib/NGCP/BulkProcessor/Globals.pm index ba2afb0..5c3ded1 100644 --- a/lib/NGCP/BulkProcessor/Globals.pm +++ b/lib/NGCP/BulkProcessor/Globals.pm @@ -119,6 +119,8 @@ our @EXPORT_OK = qw( @jobservers $jobnamespace + + @config_search_paths ); #set process umask for open and mkdir calls: @@ -126,11 +128,11 @@ umask 0000; # general constants our $system_name = 'Sipwise Bulk Processing Framework'; -our $VERSION = '0.0.1'; +our $VERSION = '1.0.1'; our $system_version = $VERSION; #keep this filename-save -our $system_abbreviation = 'sbpf'; #keep this filename-, dbname-save -our $system_instance = 'initial'; #'test'; #'2014'; #dbname-save 0-9a-z_ -our $system_instance_label = 'test'; +our $system_abbreviation = 'bulkprocessor'; #keep this filename-, dbname-save +our $system_instance = 'ngcp'; #'test'; #'2014'; #dbname-save 0-9a-z_ +our $system_instance_label = 'some node'; our $local_ip = get_ipaddress(); our $local_fqdn = get_hostfqdn(); @@ -224,7 +226,7 @@ our $emailloglevel = 'OFF'; #'INFO'; - +our @config_search_paths = ('/var/sipwise/'); # local db setup our $local_db_path = $working_path . 'db/'; diff --git a/lib/NGCP/BulkProcessor/LoadConfig.pm b/lib/NGCP/BulkProcessor/LoadConfig.pm index 9fd90c8..9d1830a 100644 --- a/lib/NGCP/BulkProcessor/LoadConfig.pm +++ b/lib/NGCP/BulkProcessor/LoadConfig.pm @@ -15,6 +15,7 @@ use NGCP::BulkProcessor::Globals qw( $enablemultithreading $is_perl_debug update_masterconfig + @config_search_paths ); use NGCP::BulkProcessor::Logging qw( @@ -54,32 +55,47 @@ our $YAML_CONFIG_TYPE = 2; our $ANY_CONFIG_TYPE = 3; #my $logger = getlogger(__PACKAGE__); +my $debug_config_ext_prefix = 'debug'; + sub load_config { my ($configfile,$process_code,$configtype,$configparser_args) = @_; my $is_master = 'CODE' ne ref $process_code; my $data; + my $variant = $configfile; if (defined $configfile) { - if (-e $configfile) { - $data = _parse_config($configfile,$configtype,$configparser_args); - } else { - my $relative_configfile = $executable_path . $configfile; - if (-e $relative_configfile) { - $configfile = $relative_configfile; - $data = _parse_config($configfile,$configtype,$configparser_args); + my @variants = (); + if ($is_perl_debug) { + push(@variants,_prefix_ext($configfile,$debug_config_ext_prefix)); + } + push(@variants,$configfile); + my %dupes = (); + while (not defined $data and ($variant = shift @variants)) { + next if exists $dupes{$variant}; + $dupes{$variant} = 1; + + if (-e $variant) { + $data = _parse_config($variant,$configtype,$configparser_args); } else { - configurationwarn($configfile,'no ' . ($is_master ? 'master config' : 'config') . ' file ' . $relative_configfile,getlogger(__PACKAGE__)); - $relative_configfile = $application_path . $configfile; - if (-e $relative_configfile) { - $configfile = $relative_configfile; - $data = _parse_config($configfile,$configtype,$configparser_args); - } else { - configurationerror($configfile,'no ' . ($is_master ? 'master config' : 'config') . ' file ' . $relative_configfile,getlogger(__PACKAGE__)); - return 0; + my @paths = (); + my %path_dupes = (); + my @search_paths = (@config_search_paths,$executable_path,$application_path); #todo: add /etc/bulkprocessor or similar here once + ($variant,$data) = _search_path($variant,$configtype,$configparser_args,\@search_paths,\@paths,\%path_dupes); + @search_paths = (); + if (not defined $data) { + if (index($executable_path,$application_path) > -1) { + my $module_path = 'NGCP/BulkProcessor/' . substr($executable_path,length($application_path)); + push(@search_paths,map { eval{ Cwd::abs_path($_ . '/') . '/' . $module_path; }; } @INC); + } + push(@search_paths,map { eval{ Cwd::abs_path($_ . '/') . '/'; }; } @INC); + ($variant,$data) = _search_path($variant,$configtype,$configparser_args,\@search_paths,\@paths,\%path_dupes); } } } + if (not defined $data) { + configurationerror($configfile,'no ' . ($is_master ? 'master config' : 'config') . ' variant found',getlogger(__PACKAGE__)); + } } else { fileerror('no ' . ($is_master ? 'master config' : 'config') . ' file specified',getlogger(__PACKAGE__)); return 0; @@ -88,7 +104,7 @@ sub load_config { if ($is_master) { my %context = ( data => $data, - configfile => $configfile, + configfile => $variant, split_tuplecode => \&split_tuple, format_numbercode => \&format_number, parse_regexpcode => \&parse_regexp, @@ -102,7 +118,7 @@ sub load_config { configlogger => getlogger(__PACKAGE__), ); my ($result,$loadconfig_args,$postprocesscode) = update_masterconfig(%context); - _splashinfo($configfile); + _splashinfo($variant); if (defined $loadconfig_args and 'ARRAY' eq ref $loadconfig_args) { foreach my $loadconfig_arg (@$loadconfig_args) { $result &= load_config(@$loadconfig_arg); @@ -113,13 +129,45 @@ sub load_config { } return $result; } else { - my $result = &$process_code($data,$configfile); - configurationinfo('config file ' . $configfile . ' loaded',getlogger(__PACKAGE__)); + my $result = &$process_code($data,$variant); + configurationinfo('config file ' . $variant . ' loaded',getlogger(__PACKAGE__)); return $result; } } +sub _prefix_ext { + my ($configfile,$ext_suffix) = @_; + return $configfile unless $ext_suffix; + if ($configfile =~ /\.([^\.]+)$/) { + $configfile =~ s/\.([^\.]+)$/.$ext_suffix.$1/; + } else { + $configfile .= '.' . $ext_suffix; + } + return $configfile; +} + +sub _search_path { + + my ($configfile,$configtype,$configparser_args,$search_paths,$paths,$dupes) = @_; + my $data = undef; + $dupes //= {}; + while (not defined $data and (my $path = shift @$search_paths)) { + next if exists $dupes->{$path}; + push(@$paths,$path); + $dupes->{$path} = 1; + my $relative_configfile = $path . $configfile; + if (-e $relative_configfile) { + $configfile = $relative_configfile; + $data = _parse_config($configfile,$configtype,$configparser_args); + #} else { + # configurationwarn($configfile,'no ' . ($is_master ? 'master config' : 'config') . ' file ' . $relative_configfile,getlogger(__PACKAGE__)); + } + } + return ($configfile,$data); + +} + sub _splashinfo { my ($configfile) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm new file mode 100644 index 0000000..2570af5 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm @@ -0,0 +1,378 @@ +package NGCP::BulkProcessor::Projects::Massive::Generator::Api; +use strict; + +## no critic + +use threads::shared qw(); +#use List::Util qw(); +use Data::Rmap qw(); + +use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw( + $dry + $skip_errors +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn +); + +use NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::Contracts qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::BillingZones qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::BillingFees qw(); + +use NGCP::BulkProcessor::Utils qw(threadid); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + setup_provider +); + +my $t = time; +my %entity_maps = (); + +sub setup_provider { + my %params = @_; + my ( + $domain_name, + $reseller_name, + $subscriber_rates, + $provider_rate, + $type + ) = @params{qw/ + domain + reseller + subscriber_rates + provider_rate + type + /}; + my $provider = {}; + if (not _load_provider($provider,$reseller_name,$domain_name) and not $dry) { + $provider->{contact} = _create_systemcontact(); + _info("contact ID $provider->{contact}->{id} created"); + $provider->{contract} = _create_contract( + contact_id => $provider->{contact}->{id}, + billing_profile_id => 1, #default profile id + type => $type // 'reseller', + ); + _info("contract ID $provider->{contract}->{id} created"); + $provider->{reseller} = _create_reseller( + contract_id => $provider->{contract}->{id}, + name => $reseller_name, #"test ", + ); + _info("reseller '$reseller_name' created"); + if (defined $provider_rate) { + my $profile_fee = {}; + ($profile_fee->{profile}, + $profile_fee->{zone}, + $profile_fee->{fee}, + $profile_fee->{fees}) = _setup_fees($provider->{reseller}, + %$provider_rate + ); + $provider->{profile} = $profile_fee->{profile}; + $provider->{provider_fee} = $profile_fee; + $provider->{contract} = _update_contract( + id => $provider->{contract}->{id}, + billing_profile_id => $provider->{profile}->{id}, + ); + _info("contract ID $provider->{contract}->{id} updated"); + } + + $provider->{domain} = _create_domain( + reseller_id => $provider->{reseller}->{id}, + #domain => $domain_name.'.', + domain => $domain_name, + ); + _info("domain '$domain_name' created"); + $provider->{subscriber_fees} = []; + foreach my $rate (@$subscriber_rates) { + my $profile_fee = {}; + ($profile_fee->{profile}, + $profile_fee->{zone}, + $profile_fee->{fee}, + $profile_fee->{fees}) = _setup_fees($provider->{reseller}, + %$rate + ); + push(@{$provider->{subscriber_fees}},$profile_fee); + } + } + return $provider; +} + +sub _load_provider { + my ($provider,$reseller_name,$domain_name) = @_; + + $provider->{reseller} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Resellers', + name => $reseller_name, + ); + if (defined $provider->{reseller}) { + _info("reseller '$reseller_name' found"); + } else { + return 0; + } + + $provider->{contract} = NGCP::BulkProcessor::RestRequests::Trunk::Contracts::get_item($provider->{reseller}->{contract_id}); + if (defined $provider->{contract}) { + _info("contract ID $provider->{reseller}->{contract_id} found"); + } else { + return 0; + } + + $provider->{contact} = NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts::get_item($provider->{contract}->{contact_id}); + if (defined $provider->{contact}) { + _info("contact ID $provider->{contract}->{contact_id} found"); + } else { + return 0; + } + + $provider->{domain} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Domains', + domain => $domain_name, + ); + if (defined $provider->{domain}) { + _info("domain '$domain_name' found"); + } else { + return 0; + } + + my $provider_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::get_item($provider->{contract}->{billing_profile_id}); + if (defined $provider_profile) { + _info("provider billing profile ID $provider_profile->{id} found"); + } + if (defined $provider_profile) { + my $profile_fee = {}; + ($profile_fee->{profile}, + $profile_fee->{zone}, + $profile_fee->{fee}, + $profile_fee->{fees}) = _load_fees($provider_profile); + $provider->{profile} = $profile_fee->{profile}; + $provider->{provider_fee} = $profile_fee; + } + + $provider->{subscriber_fees} = []; + foreach my $subscriber_profile (@{NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_resellerid($provider->{reseller}->{id})}) { + next if (defined $provider_profile and $provider_profile->{id} == $subscriber_profile->{id}); + _info("subscriber billing profile ID $subscriber_profile->{id} found"); + my $profile_fee = {}; + ($profile_fee->{profile}, + $profile_fee->{zone}, + $profile_fee->{fee}, + $profile_fee->{fees}) = _load_fees($subscriber_profile); + push(@{$provider->{subscriber_fees}},$profile_fee); + } + + return 1; + +} + +sub _load_fees { + + my ($profile) = @_; + my $result = 1; + my $zone = NGCP::BulkProcessor::RestRequests::Trunk::BillingZones::findby_billingprofileid($profile->{id})->[0]; + $result &= defined $profile; + _info("billing zone ID $zone->{id} found") if $result; + my $fees = []; + $fees = NGCP::BulkProcessor::RestRequests::Trunk::BillingFees::findby_billingprofileid($profile->{id}) if $result; + foreach my $fee (@$fees) { + _info("billing fee ID $fee->{id} found"); + } + return ($profile,$zone,$fees->[0],$fees); + +} + +sub _setup_fees { + my ($reseller,%params) = @_; + my $prepaid = delete $params{prepaid}; + my $peaktime_weekdays = delete $params{peaktime_weekdays}; + my $peaktime_specials = delete $params{peaktime_special}; + my $interval_free_time = delete $params{interval_free_time}; + #my $interval_free_cash = delete $params{interval_free_cash}; + my $profile = _create_billing_profile( + reseller_id => $reseller->{id}, + (defined $prepaid ? (prepaid => $prepaid) : ()), + (defined $peaktime_weekdays ? (peaktime_weekdays => $peaktime_weekdays) : ()), + (defined $peaktime_specials ? (peaktime_special => $peaktime_specials) : ()), + (defined $interval_free_time ? (interval_free_time => $interval_free_time) : ()), + #(defined $interval_free_cash ? (interval_free_cash => $interval_free_cash) : ()), + ); + _info("billing profile ID $profile->{id} created"); + my $zone = _create_billing_zone( + billing_profile_id => $profile->{id}, + ); + _info("billing zone ID $profile->{id} created"); + my @fees = (); + if (exists $params{fees}) { + foreach my $fee (@{ $params{fees} }) { + push(@fees,_create_billing_fee( + billing_profile_id => $profile->{id}, + billing_zone_id => $zone->{id}, + %$fee, + )); + } + } else { + push(@fees,_create_billing_fee( + billing_profile_id => $profile->{id}, + billing_zone_id => $zone->{id}, + direction => "out", + destination => ".", + %params, + )); + } + return ($profile,$zone,$fees[0],\@fees); +} + +sub _create_systemcontact { + + return _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts', + firstname => "syst_contact__first", + lastname => "syst_contact__last", + email => "syst_contact\@custcontact.invalid", + @_, + ); + +} + +sub _create_contract { + + return _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::Contracts', + status => "active", + type => "reseller", + @_, + ); + +} + +sub _update_contract { + + return _update_entity('NGCP::BulkProcessor::RestRequests::Trunk::Contracts', + id => undef, + @_, + ); + +} + +sub _create_reseller { + + return _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::Resellers', + status => "active", + name => "test ", + @_, + ); + +} + +sub _create_domain { + + return _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::Domains', + domain => 'test__.example.org', + #reseller_id => $default_reseller_id, + @_, + ); + +} + +sub _create_billing_profile { + + return _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles', + name => "test ", + handle => "test__", + #reseller_id => $default_reseller_id, + @_, + ); + +} + +sub _create_billing_zone { + + return _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::BillingZones', + zone => 'test', + detail => 'test ', + @_, + ); + +} + +sub _create_billing_fee { + + my $fee = _create_entity('NGCP::BulkProcessor::RestRequests::Trunk::BillingFees', + @_, + ); + _info("billing fee ID $fee->{id} created"); + return $fee; + +} + +sub _create_entity { + + my $class = shift; + my (@params) = @_; + my $map = _get_entity_map($class); + my $n = 1 + scalar keys %$map; + Data::Rmap::rmap { $_ =~ s//$n/ if defined $_; $_ =~ s//$n/ if defined $_; $_ =~ s//$t/ if defined $_; } @params; + no strict 'refs'; + my $entity = &{$class . '::create_item'}({@params},1); + $map->{$entity->{id}} = $entity; + return $entity; + +} + +sub _update_entity { + + my $class = shift; + my (@params) = @_; + my $map = _get_entity_map($class); + #my $n = 1 + scalar keys %$map; + Data::Rmap::rmap { $_ =~ s//$t/ if defined $_; } @params; + my $data = {@params}; + my $id = delete $data->{id}; + no strict 'refs'; + my $entity = &{$class . '::update_item'}($id,$data); + $entity->{id} = $id; + $map->{$id} = $entity; + return $entity; + +} + +sub _find_entity { + + my $class = shift; + my (@params) = @_; + foreach my $param (@params) { + return undef if $param =~ /||/; + } + no strict 'refs'; + return &{$class . '::get_item_filtered'}({@params}); + +} + +sub _get_entity_map { + my $class = shift; + if (!exists $entity_maps{$class}) { + $entity_maps{$class} = {}; + } + return $entity_maps{$class}; +} + + +sub _info { + + my ($message,$debug) = @_; + if ($debug) { + processing_debug(threadid(),$message,getlogger(__PACKAGE__)); + } else { + processing_info(threadid(),$message,getlogger(__PACKAGE__)); + } + +} + +1; \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Preferences.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Preferences.pm new file mode 100644 index 0000000..9f0b240 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Preferences.pm @@ -0,0 +1,255 @@ +package NGCP::BulkProcessor::Projects::Massive::Generator::Preferences; +use strict; + +## no critic + +no strict 'refs'; + +use threads::shared qw(); +#use List::Util qw(); + +use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( + $dry + $skip_errors + +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn +); + +use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); + +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); + +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw(); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_xa_db +); + +use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw( + destroy_all_dbs +); + +use NGCP::BulkProcessor::Utils qw(threadid); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + + clear_subscriber_preferences + delete_subscriber_preference + set_subscriber_preference + get_subscriber_preference + + set_allowed_ips_preferences + cleanup_aig_sequence_ids + +); + + +sub cleanup_aig_sequence_ids { + my ($context) = @_; + eval { + $context->{db}->db_begin(); + if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::cleanup_ids($context->{db})) { + _info($context,'voip_aig_sequence cleaned up'); + } + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($skip_errors) { + _warn($context,"database problem with voip_aig_sequence clean up: " . $err); + } else { + _error($context,"database problem with voip_aig_sequence clean up: " . $err); + } + } +} + + +sub set_allowed_ips_preferences { + + my ($context,$subscriber_id,$sip_username,$attribute,$allowed_ips) = @_; + + #my $subscriber_id = $context->{prov_subscriber}->{id} ; + #my $attribute = $context->{attributes}->{allowed_ips_grp}; + + my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid( + $context->{db},$subscriber_id,$attribute->{id})->[0]; + + my ($allowed_ip_group_id,$allowed_ip_group_preferrence_id); + + if (defined $allowed_ips_grp_attribute_preference) { + $allowed_ip_group_id = $allowed_ips_grp_attribute_preference->{value}; + $allowed_ip_group_preferrence_id = $allowed_ips_grp_attribute_preference->{id}; + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$allowed_ip_group_id); + _info($context,"allowed ips group for subscriber $sip_username exists, ipnets deleted",1); + } else { + $allowed_ip_group_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::increment($context->{db}); + _info($context,"new allowed ips group id for subscriber $sip_username aquired",1); + } + + my $allowed_ips_grp_ipnet_ids = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$allowed_ip_group_id,$allowed_ips); + _info($context,"ipnets for allowed ips group for subscriber $sip_username created",1); + + if (not defined $allowed_ips_grp_attribute_preference) { + $allowed_ip_group_preferrence_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db}, + attribute_id => $attribute->{id}, + subscriber_id => $subscriber_id, + value => $allowed_ip_group_id, + ); + _info($context,"new allowed ips group preference value for subscriber $sip_username added",1); + } + + return ($allowed_ip_group_id,$allowed_ip_group_preferrence_id); + + #$context->{preferences}->{allowed_ips_grp} = { id => $allowed_ip_group_preferrence_id, $allowed_ip_group_id }; + +} + +my %get_preference_sub_names = ( + voip_usr_preferences => 'findby_subscriberid_attributeid', +); +my %preference_id_cols = ( + voip_usr_preferences => 'subscriber_id', +); + +sub clear_subscriber_preferences { + my ($context,$subscriber_id,$attribute,$except_value) = @_; + return _clear_preferences($context,'voip_usr_preferences',$subscriber_id,$attribute,$except_value); +} +sub delete_subscriber_preference { + my ($context,$subscriber_id,$attribute,$value) = @_; + return _delete_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value); +} +sub set_subscriber_preference { + my ($context,$subscriber_id,$attribute,$value) = @_; + return _set_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value); +} +sub get_subscriber_preference { + my ($context,$subscriber_id,$attribute) = @_; + return _get_preference($context,'voip_usr_preferences',$subscriber_id,$attribute); +} + +sub _clear_preferences { + my ($context,$pref_type,$id,$attribute,$except_value) = @_; + + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id, $attribute->{id}, defined $except_value ? { 'NOT IN' => $except_value } : undef); + +} + +sub _delete_preference { + my ($context,$pref_type,$id,$attribute,$value) = @_; + + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id, $attribute->{id}, { 'IN' => $value } ); + +} + +sub _set_preference { + my ($context,$pref_type,$id,$attribute,$value) = @_; + + if ($attribute->{max_occur} == 1) { + my $old_preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db}, + $id,$attribute->{id}); + if (defined $value) { + if ((scalar @$old_preferences) == 1) { + &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::update_row'}($context->{db},{ + id => $old_preferences->[0]->{id}, + value => $value, + }); + return $old_preferences->[0]->{id}; + } else { + if ((scalar @$old_preferences) > 1) { + &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id,$attribute->{id}); + } + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db}, + attribute_id => $attribute->{id}, + $preference_id_cols{$pref_type} => $id, + value => $value, + ); + } + } else { + &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id,$attribute->{id}); + return undef; + } + } else { + if (defined $value) { + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db}, + attribute_id => $attribute->{id}, + $preference_id_cols{$pref_type} => $id, + value => $value, + ); + } else { + return undef; + } + } + +} + +sub _get_preference { + my ($context,$pref_type,$id,$attribute) = @_; + + my $preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db}, + $id,$attribute->{id}); + + if ($attribute->{max_occur} == 1) { + return $preferences->[0]; + } else { + return $preferences; + } + +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm new file mode 100644 index 0000000..93fa4db --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm @@ -0,0 +1,1068 @@ +package NGCP::BulkProcessor::Projects::Massive::Generator::Provisioning; +use strict; + +## no critic + +use threads::shared qw(); +use Time::HiRes qw(sleep); +use String::MkPasswd qw(); +#use List::Util qw(); + +use Tie::IxHash; + +use NGCP::BulkProcessor::Globals qw( + $enablemultithreading + $cpucount +); + +use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw( + $dry + $skip_errors + $deadlock_retries + + $provision_subscriber_multithreading + $provision_subscriber_numofthreads + $provision_subscriber_count + $webpassword_length + $webusername_length + $sipusername_length + $sippassword_length + + @providers +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn + fileerror +); + +use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::products qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw(); + +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); + +use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw(); +use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); + +use NGCP::BulkProcessor::RestRequests::Trunk::Subscribers qw(); +use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw(); + +use NGCP::BulkProcessor::Projects::Massive::Generator::Preferences qw( + set_subscriber_preference + get_subscriber_preference + clear_subscriber_preferences + delete_subscriber_preference + set_allowed_ips_preferences + cleanup_aig_sequence_ids +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_xa_db + ping_dbs + destroy_dbs +); + +use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp zerofill); # stringtobool check_ipnet trim); +#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +use NGCP::BulkProcessor::RandomString qw(createtmpstring); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + provision_subscribers + +); + +my $thread_sleep_secs = 0.1; + +my $RUNNING = 1; +my $COMPLETED = 2; +my $ERROR = 4; +my $STOP = 8; + +my $total_count :shared = 0; +my $db_lock :shared = undef; + +sub provision_subscribers { + + my $context = {}; + my $result = _provision_subscribers_create_context($context); + + destroy_dbs(); + if ($result) { + if ($enablemultithreading and $provision_subscriber_multithreading and $cpucount > 1) { + $context->{subscriber_count} = int($provision_subscriber_count / $provision_subscriber_numofthreads); + $context->{sn_increment} = $provision_subscriber_numofthreads; + my %processors = (); + for (my $i = 0; $i < $provision_subscriber_numofthreads; $i++) { + $context->{subscriber_count} += ($provision_subscriber_count - $context->{subscriber_count} * $provision_subscriber_numofthreads) if $i == 0; + _info($context,'starting generator thread ' . ($i + 1) . ' of ' . $provision_subscriber_numofthreads); + $context->{sn_offset} = $i; + my $processor = threads->create(\&_provision_subscriber,$context); + if (!defined $processor) { + _info($context,'generator thread ' . ($i + 1) . ' of ' . $provision_subscriber_numofthreads . ' NOT started'); + } + $processors{$processor->tid()} = $processor; + } + local $SIG{'INT'} = sub { + _info($context,"interrupt signal received"); + $result = 0; + lock $context->{errorstates}; + $context->{errorstates}->{$context->{tid}} = $STOP; + }; + while ((scalar keys %processors) > 0) { + foreach my $processor (values %processors) { + if (defined $processor and $processor->is_joinable()) { + $processor->join(); + delete $processors{$processor->tid()}; + _info($context,'generator thread tid ' . $processor->tid() . ' joined'); + } + } + sleep($thread_sleep_secs); + } + + $result &= (_get_threads_state($context->{errorstates},$context->{tid}) & $COMPLETED) == $COMPLETED; + + } else { + + $context->{subscriber_count} = $provision_subscriber_count; + $context->{sn_increment} = 1; + $context->{sn_offset} = 0; + local $SIG{'INT'} = sub { + _info($context,"interrupt signal received"); + $context->{errorstates}->{$context->{tid}} = $STOP; + }; + $result = _provision_subscriber($context); + + } + } + + return $result; +} + +sub _provision_subscriber { + + my $context = shift; + my $tid = threadid(); + { + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $RUNNING; + } + $context->{tid} = $tid; + $context->{db} = &get_xa_db(); + + my $subscriber_count = 0; + my $broadcast_state; + while (($broadcast_state = _get_threads_state($context->{errorstates})) == 0 + or + (($broadcast_state & $ERROR) == 0 + and ($broadcast_state & $STOP) == 0)) { + + last if $subscriber_count >= $context->{subscriber_count}; + $subscriber_count += 1; + + next unless _provision_susbcriber_init_context($context); + + my $retry = 1; + while ($retry > 0) { + eval { + $context->{db}->db_begin(); + #_info($context,"test" . $subscriber_count); + #die() if (($tid == 1 or $tid == 0) and $subscriber_count == 500); + _create_contact($context); + _create_contract($context); + { + #lock $db_lock; #concurrent writes to voip_numbers causes deadlocks + lock $total_count; + _create_subscriber($context); + _create_aliases($context); + $total_count += 1; + _info($context,"$total_count subscribers created",($total_count % 10) > 0); + } + # _update_preferences($context); + # _set_registrations($context); + # _set_callforwards($context); + # #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it + # #in this main provisioning loop, or align it in separate run-modes, according to the files given. + # + # } else { + # _warn($context,(scalar @$existing_billing_voip_subscribers) . ' existing billing subscribers found, skipping'); + # } + + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($err =~ /deadlock/gi and $retry < $deadlock_retries) { + my $sleep = 0.01 * 2**$retry; + _info($context,"retrying in $sleep secs"); + sleep($sleep); + $retry += 1; + } elsif (not $skip_errors) { + undef $context->{db}; + destroy_dbs(); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $ERROR; + return 0; + } + } else { + $retry = 0; + } + } + } + undef $context->{db}; + destroy_dbs(); + if (($broadcast_state & $ERROR) == $ERROR) { + _info($context,"shutting down (error broadcast)"); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $STOP; + return 0; + } elsif (($broadcast_state & $STOP) == $STOP) { + _info($context,"shutting down (stop broadcast)"); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $STOP; + return 0; + } else { + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $COMPLETED; + return 1; + } + +} + +sub _provision_subscribers_create_context { + my ($context) = @_; + + my $result = 1; + + my %errorstates :shared = (); + my $tid = threadid(); + $context->{tid} = $tid; + $context->{now} = timestamp(); + $context->{errorstates} = \%errorstates; + { + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $RUNNING; + } + $context->{error_count} = 0; + $context->{warning_count} = 0; + + my $result = 1; + + #$context->{providers} + foreach my $provider (@providers) { + eval { + $provider->{domain}->{prov_domain} = + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_domain($provider->{domain}->{domain}); + }; + if ($@ or not $provider->{domain}->{prov_domain}) { + rowprocessingerror(threadid(),"cannot find provisioning domain '$provider->{domain}->{domain}'",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"provisioning domain '$provider->{domain}->{domain}' found",getlogger(__PACKAGE__)); + } + + my ($sn_min,$sn_max) = split(/[: -]+/,$provider->{sn},2); + my $sn_length = length($sn_min); + $sn_length = length($sn_max) if length($sn_max) > $sn_length; + if ($sn_length > 0 and $sn_max > $sn_min and $sn_min >= 0) { + my @sn_block = map { zerofill($_,$sn_length); } ($sn_min..$sn_max); + + $provider->{sn_block} = \@sn_block; + #$provider->{sn_block_size} = scalar @sn_block; + } else { + rowprocessingerror(threadid(),"invalid sn block definition for provider '$provider->{sn}'",getlogger(__PACKAGE__)); + } + } + + eval { + $context->{sip_account_product} = NGCP::BulkProcessor::Dao::Trunk::billing::products::findby_resellerid_handle(undef, + $NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE)->[0]; + }; + if ($@ or not defined $context->{sip_account_product}) { + _error($context,"cannot find $NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product found"); + } + + $context->{attributes} = {}; + + eval { + $context->{attributes}->{allowed_clis} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_CLIS_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{allowed_clis}) { + _error($context,'cannot find allowed_clis attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"allowed_clis attribute found"); + } + + eval { + $context->{attributes}->{cli} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CLI_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{cli}) { + _error($context,'cannot find cli attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"cli attribute found"); + } + + eval { + $context->{attributes}->{ac} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::AC_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{ac}) { + _error($context,'cannot find ac attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"ac attribute found"); + } + + eval { + $context->{attributes}->{cc} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CC_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{cc}) { + _error($context,'cannot find cc attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"cc attribute found"); + } + + eval { + $context->{attributes}->{account_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ACCOUNT_ID_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{account_id}) { + _error($context,'cannot find account_id attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"account_id attribute found"); + } + + eval { + $context->{attributes}->{concurrent_max_total} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{concurrent_max_total}) { + _error($context,'cannot find concurrent_max_total attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"concurrent_max_total attribute found"); + } + + eval { + $context->{attributes}->{clir} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CLIR_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{clir}) { + _error($context,'cannot find clir attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"clir attribute found"); + } + + eval { + $context->{attributes}->{allowed_ips_grp} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{allowed_ips_grp}) { + _error($context,'cannot find allowed_ips_grp attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"allowed_ips_grp attribute found"); + } + + eval { + $context->{attributes}->{adm_ncos_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{adm_ncos_id}) { + _error($context,'cannot find adm_ncos_id attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"adm_ncos_id attribute found"); + } + + foreach my $cf_attribute (@NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CF_ATTRIBUTES) { + eval { + $context->{attributes}->{$cf_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($cf_attribute); + }; + if ($@ or not defined $context->{attributes}->{$cf_attribute}) { + _error($context,"cannot find $cf_attribute attribute"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$cf_attribute attribute found"); + } + } + + eval { + $context->{attributes}->{ringtimeout} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::RINGTIMEOUT_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{ringtimeout}) { + _error($context,'cannot find ringtimeout attribute'); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"ringtimeout attribute found"); + } + + return $result; + +} + +#sub _check_ncos_level { +# my ($context,$resellername,$barring) = @_; +# my $result = 1; +# if ($barring ne $default_barring and not exists $barring_profiles->{$resellername}) { +# _error($context,"barring mappings for reseller $resellername missing"); +# $result = 0; #even in skip-error mode.. +# } elsif ($barring ne $default_barring and not exists $barring_profiles->{$resellername}->{$barring}) { +# _error($context,"mappings for barring '" . $barring . "' of reseller $resellername missing"); +# $result = 0; #even in skip-error mode.. +# } else { +# my $reseller_id = $context->{reseller_map}->{$resellername}->{id}; +# $context->{ncos_level_map}->{$reseller_id} = {} unless exists $context->{ncos_level_map}->{$reseller_id}; +# my $level = $barring_profiles->{$resellername}->{$barring}; +# unless (exists $context->{ncos_level_map}->{$reseller_id}->{$barring}) { +# if (not defined $level or length($level) == 0) { +# $context->{ncos_level_map}->{$reseller_id}->{$barring} = undef; +# } else { +# eval { +# $context->{ncos_level_map}->{$reseller_id}->{$barring} = NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_resellerid_level( +# $reseller_id,$level); +# }; +# if ($@ or not defined $context->{ncos_level_map}->{$reseller_id}->{$barring}) { +# my $err = "cannot find ncos level '$level' of reseller $resellername"; +# if (not defined $context->{_rowcount}) { +# if ($barring ne $default_barring) { +# _error($context,$err); +# $result = 0; #even in skip-error mode.. +# } else { +# rowprocessingwarn(threadid(),$err); +# } +# } elsif ($skip_errors) { +# _warn($context, $err); +# } else { +# _error($context, $err); +# $result = 0; #even in skip-error mode.. +# } +# } else { +# _info($context,"ncos level '$level' of reseller $resellername found"); +# } +# } +# } +# } +# return $result; +#} + +sub _create_contact { + + my ($context) = @_; + + $context->{contract}->{contact}->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::insert_row($context->{db}, + $context->{contract}->{contact}, + ); + $context->{contract}->{contact_id} = $context->{contract}->{contact}->{id}; + _info($context,"contact id $context->{contract}->{contact}->{id} created",1); + $context->{contract}->{contact_id} = $context->{contract}->{contact}->{id}; + + return 1; + +} + +sub _create_contract { + + my ($context) = @_; +# +# if ($context->{bill_subscriber}->{contract_id}) { +# #todo: the update case +# } else { +# #the insert case + $context->{contract}->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::insert_row($context->{db}, + $context->{contract} + ); + $context->{bill_subscriber}->{contract_id} = $context->{contract}->{id}; + $context->{prov_subscriber}->{account_id} = $context->{contract}->{id}; + + $context->{contract}->{billing_mapping_id} = NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::insert_row($context->{db}, + billing_profile_id => $context->{billing_profile}->{id}, + contract_id => $context->{contract}->{id}, + product_id => $context->{sip_account_product}->{id}, + ); + + $context->{contract}->{contract_balance_id} = NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances::insert_row($context->{db}, + contract_id => $context->{contract}->{id}, + ); + + _info($context,"contract id $context->{contract}->{id} created",1); + #} + return 1; + +} + +sub _create_subscriber { + + my ($context) = @_; + +# my $result = 1; +# +# if ($context->{bill_subscriber}->{id}) { +# #todo: the update case +# } else { + #the insert case + $context->{bill_subscriber}->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::insert_row($context->{db}, + $context->{bill_subscriber}, + ); + + $context->{prov_subscriber}->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::insert_row($context->{db}, + $context->{prov_subscriber}, + ); + + my $number = $context->{numbers}->{primary}; + $context->{voip_numbers}->{primary} = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::forupdate_cc_ac_sn_subscriberid($context->{db}, + $number->{cc}, + $number->{ac}, + $number->{sn}, + $context->{bill_subscriber}->{id}); + + if (defined $context->{voip_numbers}->{primary}) { + NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::update_row($context->{db},{ + id => $context->{voip_numbers}->{primary}->{id}, + reseller_id => $context->{reseller}->{id}, + subscriber_id => $context->{bill_subscriber}->{id}, + status => 'active', + }); + } else { + $context->{voip_numbers}->{primary}->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::insert_row($context->{db}, + cc => $number->{cc}, + ac => $number->{ac}, + sn => $number->{sn}, + reseller_id => $context->{reseller}->{id}, + subscriber_id => $context->{bill_subscriber}->{id}, + ); + } + + $context->{preferences}->{cli} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{cli}, + $number->{number}), value => $number->{number} }; + + NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::update_row($context->{db},{ + id => $context->{bill_subscriber}->{id}, + primary_number_id => $context->{voip_numbers}->{primary}->{id}, + }); + + _info($context,"subscriber uuid $context->{prov_subscriber}->{uuid} created",1); + + #primary alias + $context->{aliases}->{primary}->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::insert_row($context->{db}, + domain_id => $context->{prov_subscriber}->{domain_id}, + subscriber_id => $context->{prov_subscriber}->{id}, + username => $number->{number}, + ); + + my @allowed_clis = (); + push(@allowed_clis,{ id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{allowed_clis}, + $number->{number}), value => $number->{number}}); + $context->{preferences}->{allowed_clis} = \@allowed_clis; + + NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::release_subscriber_numbers($context->{db}, + $context->{bill_subscriber}->{id},{ 'NOT IN' => $context->{voip_numbers}->{primary}->{id} }); + + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::delete_dbaliases($context->{db}, + $context->{prov_subscriber}->{id},{ 'NOT IN' => $number->{number} }); + + clear_subscriber_preferences($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{allowed_clis}, + $number->{number}); + + _info($context,"primary alias $number->{number} created",1); + + $context->{voicemail_user}->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::insert_row($context->{db}, + $context->{voicemail_user}, + ); + + $context->{preferences}->{account_id} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{account_id}, + $context->{contract}->{id}), value => $context->{contract}->{id} }; + + if (length($number->{ac}) > 0) { + $context->{preferences}->{ac} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{ac}, + $number->{ac}), value => $number->{ac} }; + } + if (length($number->{cc}) > 0) { + $context->{preferences}->{cc} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{cc}, + $number->{cc}), value => $number->{cc} }; + } + +# } + + #return $result; + return 1; + +} + +#sub _update_preferences { +# +# my ($context) = @_; +# +# my $result = 1; +# +# if (defined $context->{channels}) { +# $context->{preferences}->{concurrent_max_total} = { id => set_subscriber_preference($context, +# $context->{prov_subscriber}->{id}, +# $context->{attributes}->{concurrent_max_total}, +# $context->{channels}), value => $context->{channels} }; +# _info($context,"concurrent_max_total preference set to $context->{channels}",1); +# } +# +# if ($context->{clir}) { +# $context->{preferences}->{clir} = { id => set_subscriber_preference($context, +# $context->{prov_subscriber}->{id}, +# $context->{attributes}->{clir}, +# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE), value => $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE }; +# _info($context,"clir preference set to $context->{clir}",1); +# } +# +# if ((scalar @{$context->{allowed_ips}}) > 0) { +# my ($allowed_ip_group_preferrence_id, $allowed_ip_group_id) = set_allowed_ips_preferences($context, +# $context->{prov_subscriber}->{id}, +# $context->{prov_subscriber}->{username}, +# $context->{attributes}->{allowed_ips_grp}, +# $context->{allowed_ips}, +# ); +# $context->{preferences}->{allowed_ips_grp} = { id => $allowed_ip_group_preferrence_id, value => $allowed_ip_group_id }; +# _info($context,"allowed_ips_grp preference set to $allowed_ip_group_id - " . join(',',@{$context->{allowed_ips}}),1); +# } +# +# if (defined $context->{ncos_level}) { +# $context->{preferences}->{adm_ncos_id} = { id => set_subscriber_preference($context, +# $context->{prov_subscriber}->{id}, +# $context->{attributes}->{adm_ncos_id}, +# $context->{ncos_level}->{id}), value => $context->{ncos_level}->{id} }; +# _info($context,"adm_ncos_id preference set to $context->{ncos_level}->{id} - $context->{ncos_level}->{level}",1); +# } +# +# +# +# return $result; +# +#} + +sub _create_aliases { + + my ($context) = @_; +# my $result = 1; + + if ((scalar @{$context->{numbers}->{other}}) > 0) { + + my @voip_number_ids = (); + my @usernames = (); + + foreach my $number (@{$context->{numbers}->{other}}) { + + my $voip_number = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::forupdate_cc_ac_sn_subscriberid($context->{db}, + $number->{cc}, + $number->{ac}, + $number->{sn}, + $context->{bill_subscriber}->{id}); + + if (defined $voip_number) { + NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::update_row($context->{db},{ + id => $voip_number->{id}, + reseller_id => $context->{reseller}->{id}, + subscriber_id => $context->{bill_subscriber}->{id}, + status => 'active', + }); + } else { + $voip_number->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::insert_row($context->{db}, + cc => $number->{cc}, + ac => $number->{ac}, + sn => $number->{sn}, + reseller_id => $context->{reseller}->{id}, + subscriber_id => $context->{bill_subscriber}->{id}, + ); + } + + push(@{$context->{voip_numbers}->{other}}, $voip_number); + push(@voip_number_ids, $voip_number->{id}); + + my $alias; + if ($alias = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberid_username($context->{db}, + $context->{prov_subscriber}->{id}, + $number->{number}, + )->[0]) { + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::update_row($context->{db},{ + id => $alias->{id}, + is_primary => '0', + }); + $alias->{is_primary} = '0'; + } else { + $alias->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::insert_row($context->{db},{ + domain_id => $context->{prov_subscriber}->{domain_id}, + subscriber_id => $context->{prov_subscriber}->{id}, + is_primary => '0', + username => $number->{number}, + }); + } + + push(@{$context->{aliases}->{other}},$alias); + push(@usernames,$number->{number}); + + delete_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{allowed_clis}, + $number->{number}); + push(@{$context->{preferences}->{allowed_clis}},{ id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{allowed_clis}, + $number->{number}), value => $number->{number}}); + + _info($context,"alias $number->{number} created",1); + } + + push(@voip_number_ids,$context->{voip_numbers}->{primary}->{id}); + push(@usernames,$context->{numbers}->{primary}->{number}); + + NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::release_subscriber_numbers($context->{db}, + $context->{bill_subscriber}->{id},{ 'NOT IN' => \@voip_number_ids }); + + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::delete_dbaliases($context->{db},$context->{prov_subscriber}->{id}, + { 'NOT IN' => \@usernames }); + + clear_subscriber_preferences($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{allowed_clis}, + \@usernames ); + + #test: + #my $allowed_clis = get_subscriber_preference($context, + # $context->{prov_subscriber}->{id}, + # $context->{attributes}->{allowed_clis}); + + #my $voip_numbers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::findby_subscriberid($context->{db}, + # $context->{bill_subscriber}->{id}); + + #my $aliases = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberid_username($context->{db}, + # $context->{prov_subscriber}->{id},undef); + + #_info($context,(scalar @{$context->{numbers}->{other}}) . " aliases created: " . join(',',(map { $_->{number}; } @{$context->{numbers}->{other}}))); + } + return 1; +} + +#sub _set_registrations { +# +# my ($context) = @_; +# my $result = 1; +# foreach my $registration (@{$context->{registrations}}) { +# #print "blah"; +# $registration->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::location::insert_row($context->{db}, +# %$registration); +# _info($context,"permanent registration $registration->{contact} added",1); +# } +# foreach my $trusted_source (@{$context->{trusted_sources}}) { +# #print "blah"; +# $trusted_source->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::insert_row($context->{db},{ +# %$trusted_source, +# subscriber_id => $context->{prov_subscriber}->{id}, +# uuid => $context->{prov_subscriber}->{uuid}, +# }); +# _info($context,"trusted source $trusted_source->{protocol} $trusted_source->{src_ip} from $trusted_source->{from_pattern} added",1); +# } +# return $result; +# +#} + +#sub _set_callforwards { +# +# my ($context) = @_; +# my $result = 1; +# foreach my $type (keys %{$context->{callforwards}}) { +# #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); +# #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); +# +# my $destination_set_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets::insert_row($context->{db},{ +# subscriber_id => $context->{prov_subscriber}->{id}, +# name => "quickset_$type", +# }); +# foreach my $callforward (@{$context->{callforwards}->{$type}}) { +# $callforward->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations::insert_row($context->{db},{ +# %$callforward, +# destination_set_id => $destination_set_id, +# }); +# } +# my $cf_mapping_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::insert_row($context->{db},{ +# subscriber_id => $context->{prov_subscriber}->{id}, +# type => $type, +# destination_set_id => $destination_set_id, +# #time_set_id +# }); +# +# $context->{preferences}->{$type} = { id => set_subscriber_preference($context, +# $context->{prov_subscriber}->{id}, +# $context->{attributes}->{$type}, +# $cf_mapping_id), value => $cf_mapping_id }; +# +# if (defined $context->{ringtimeout}) { +# $context->{preferences}->{ringtimeout} = { id => set_subscriber_preference($context, +# $context->{prov_subscriber}->{id}, +# $context->{attributes}->{ringtimeout}, +# $context->{ringtimeout}), value => $context->{ringtimeout} }; +# } +# _info($context,"$type created (destination(s) " . join(', ',(map { $_->{destination}; } @{$context->{callforwards}->{$type}})) . ")",1); +# +# $context->{callforwards}->{$type} = { +# destination_set => { +# destinations => $context->{callforwards}->{$type}, +# id => $destination_set_id, +# }, +# id => $cf_mapping_id, +# }; +# } +# return $result; +# +#} + +sub _provision_susbcriber_init_context { + + my ($context) = @_; + + my $result = 1; + + my $provider = $providers[rand @providers]; + + $context->{domain} = $provider->{domain}; + $context->{reseller} = $provider->{reseller}; + + { + my @fees = @{$provider->{subscriber_fees}}; + $context->{billing_profile} = $fees[rand @fees]->{profile}; + } + + $context->{prov_subscriber} = {}; + $context->{prov_subscriber}->{username} = _generate_sipusername(); + $context->{prov_subscriber}->{password} = _generate_sippassword(); + $context->{prov_subscriber}->{webusername} = _generate_webusername(); + $context->{prov_subscriber}->{webpassword} = _generate_webpassword(); + + $context->{prov_subscriber}->{uuid} = create_uuid(); + $context->{prov_subscriber}->{domain_id} = $context->{domain}->{prov_domain}->{id}; + + $context->{bill_subscriber} = {}; + $context->{bill_subscriber}->{username} = $context->{prov_subscriber}->{username}; + $context->{bill_subscriber}->{domain_id} = $context->{domain}->{id}; + $context->{bill_subscriber}->{uuid} = $context->{prov_subscriber}->{uuid}; + + $context->{contract} = { + external_id => undef, #xxx$subscriber->{customer_id}, + create_timestamp => $context->{now}, + contact => { + reseller_id => $context->{reseller}->{id}, + +# firstname => xx$subscriber->{first_name}, +# lastname => $subscriber->{last_name}, +# compregnum => $subscriber->{company_registration_number}, +# company => $subscriber->{company}, +# street => $subscriber->{street}, +# postcode => $subscriber->{postal_code}, +# city => $subscriber->{city_name}, +# #country => $context->{contract}->{contact}->{country}, +# phonenumber => $subscriber->{phone_number}, +# email => $subscriber->{email}, +# vatnum => $subscriber->{vat_number}, +# #$contact_hash_field => $subscriber->{contact_hash}, + }, + }; + + $context->{channels} = undef; #$default_channels; + $context->{allowed_ips} = []; #[ keys %allowed_ips ]; + $context->{ncos_level} = undef; + + my @numbers = (); + foreach (1..($provider->{numbers_per_subscriber} // 1)) { + my $number = {}; + my @cc = @{$provider->{cc}}; + $number->{cc} = $cc[rand @cc]; + my @ac = @{$provider->{ac}}; + $number->{ac} = $ac[rand @ac]; + $number->{sn} = $provider->{sn_block}->[$context->{sn_offset}]; + $context->{sn_offset} += $context->{sn_increment}; + $number->{number} = ($number->{cc} // '') . ($number->{ac} // '') . ($number->{sn} // ''); + push(@numbers,$number); + } + + $context->{numbers} = {}; + $context->{numbers}->{other} = \@numbers; #sort_by_configs(\@numbers,[ +# { numeric => 1, +# dir => 1, #-1, +# memberchain => [ 'additional' ], +# }, +# { numeric => 0, +# dir => 1, #-1, +# memberchain => [ 'cc' ], +# }, +# { numeric => 0, +# dir => 1, #-1, +# memberchain => [ 'ac' ], +# }, +# { numeric => 0, +# dir => 1, #-1, +# memberchain => [ 'sn' ], +# }, +# ]); + $context->{numbers}->{primary} = shift(@{$context->{numbers}->{other}}); + #return 0 unless scalar @{$context->{numbers}->{other}}; + + $context->{voip_numbers} = {}; + $context->{voip_numbers}->{primary} = undef; + $context->{voip_numbers}->{other} = []; + $context->{aliases} = {}; + $context->{aliases}->{primary} = undef; + $context->{aliases}->{other} = []; + + $context->{voicemail_user} = {}; + $context->{voicemail_user}->{customer_id} = $context->{prov_subscriber}->{uuid}; + $context->{voicemail_user}->{mailbox} = $context->{numbers}->{primary}->{number}; + $context->{voicemail_user}->{password} = sprintf("%04d", int(rand 10000)); + + $context->{preferences} = {}; + $context->{clir} = 0; + + $context->{ringtimeout} = undef; + my %cfsimple = (); +# push(@{$cfsimple{$type}},{ +# destination => $callforward->{destination}, +# priority => $callforward->{priority}, +# timeout => $callforward->{timeout}, +# }); +# + $context->{callforwards} = \%cfsimple; + + my @registrations = (); + my @trusted_sources = (); +# push(@registrations,{ +# username => $registration->{sip_username}, +# domain => $registration->{domain}, +# contact => 'sip:' . $registration->{sip_contact}, +# ruid => NGCP::BulkProcessor::Dao::Trunk::kamailio::location::next_ruid(), +# }); +# if ($registration->{sip_contact} =~ /(\d{0,3}\.\d{0,3}\.\d{0,3}\.\d{0,3})/) { +# if (check_ipnet($1)) { +# push(@trusted_sources,{ +# src_ip => $1, +# protocol => $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::PROTOCOL_UDP, +# from_pattern => 'sip:.+' . quotemeta($context->{domain}->{domain}), +# }); +# + $context->{registrations} = \@registrations; + $context->{trusted_sources} = \@trusted_sources; + + return $result; + +} + +sub _generate_webpassword { + return String::MkPasswd::mkpasswd( + -length => $webpassword_length, + -minnum => 1, -minlower => 1, -minupper => 1, -minspecial => 1, + -distribute => 1, -fatal => 1, + ); +} + +sub _generate_sippassword { + return createtmpstring($sippassword_length); +} + +sub _generate_webusername { + return createtmpstring($webusername_length); +} + +sub _generate_sipusername { + return createtmpstring($sipusername_length); +} + +sub _get_threads_state { + my ($errorstates,$tid) = @_; + my $result = 0; + if (defined $errorstates and ref $errorstates eq 'HASH') { + lock $errorstates; + foreach my $threadid (keys %$errorstates) { + if (not defined $tid or $threadid != $tid) { + $result |= $errorstates->{$threadid}; + } + } + } + return $result; +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm new file mode 100644 index 0000000..0cbd961 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm @@ -0,0 +1,198 @@ +package NGCP::BulkProcessor::Projects::Massive::Generator::Settings; +use strict; + +## no critic + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits); +#format_number check_ipnet + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + update_provider_config + check_dry + + $input_path + $output_path + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + $deadlock_retries + + $provision_subscriber_multithreading + $provision_subscriber_numofthreads + $provision_subscriber_count + $webpassword_length + $webusername_length + $sippassword_length + $sipusername_length + + @provider_config + @providers + $providers_yml + +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $input_path = $working_path . 'input/'; +our $output_path = $working_path . 'output/'; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +our $deadlock_retries = 8; + +our $provision_subscriber_multithreading = $enablemultithreading; +our $provision_subscriber_numofthreads = $cpucount; +our $webpassword_length = 8; +our $webusername_length = 8; +our $sippassword_length = 16; +our $sipusername_length = 8; +our $provision_subscriber_count = 0; + +our @provider_config = (); +our @providers = (); +our $providers_yml = undef; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + #if ($data->{report_filename}) { + # $report_filename = $output_path . sprintf('/' . $data->{report_filename},timestampdigits()); + # if (-e $report_filename and (unlink $report_filename) == 0) { + # filewarn('cannot remove ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__)); + # $report_filename = undef; + # } + #} else { + # $report_filename = undef; + #} + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + $provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading}; + $provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads'); + $webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length}; + $webusername_length = $data->{webusername_length} if exists $data->{webusername_length}; + $sippassword_length = $data->{sippassword_length} if exists $data->{sippassword_length}; + $sipusername_length = $data->{sipusername_length} if exists $data->{sipusername_length}; + $provision_subscriber_count = $data->{provision_subscriber_count} if exists $data->{provision_subscriber_count}; + + + $providers_yml = $data->{providers_yml} if exists $data->{providers_yml}; + + return $result; + + } + return 0; + +} + +sub update_provider_config { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + @provider_config = @$data; + }; + if ($@) { # or 'HASH' ne ref $barring_profiles or (scalar keys %$barring_profiles) == 0) { + @provider_config = () unless scalar @provider_config; + configurationerror($configfile,'cannot load reseller config',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/config.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/config.cfg new file mode 100644 index 0000000..f756b4b --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/config.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /var/sipwise +#cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = localhost +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = localhost +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = localhost +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = localhost +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = localhost +xa_port = 3306 +xa_databasename = billing +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://10.0.2.15:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/config.debug.cfg new file mode 100644 index 0000000..9d8d4d4 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/config.debug.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /home/rkrenn/temp/massive +#cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.84 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.84 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.84 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.84 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.84 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl new file mode 100644 index 0000000..38aebb5 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl @@ -0,0 +1,284 @@ +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw( + update_settings + update_provider_config + check_dry + $output_path + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force + + @provider_config + @providers + $providers_yml + +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); +use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles); + +use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs); + +#use NGCP::BulkProcessor::Projects::Massive::Generator::Dao::Blah qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); + +use NGCP::BulkProcessor::Projects::Massive::Generator::Provisioning qw( + provision_subscribers +); +use NGCP::BulkProcessor::Projects::Massive::Generator::Api qw( + setup_provider +); + +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); + +my $setup_provider_task_opt = 'setup_provider'; +push(@TASK_OPTS,$setup_provider_task_opt); + +my $provision_subscriber_task_opt = 'provision_subscriber'; +push(@TASK_OPTS,$provision_subscriber_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + #"run=s" => \$run_id, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + $result &= load_config($providers_yml,\&update_provider_config,$YAML_CONFIG_TYPE); + + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + + } elsif (lc($setup_provider_task_opt) eq lc($task)) { + if (taskinfo($setup_provider_task_opt,$result,1)) { + next unless check_dry(); + $result &= setup_provider_task(\@messages); + $completion |= 1; + } + + + } elsif (lc($provision_subscriber_task_opt) eq lc($task)) { + if (taskinfo($provision_subscriber_task_opt,$result,1)) { + next unless check_dry(); + $result &= provision_subscriber_task(\@messages); + $completion |= 1; + } + + } else { + $result = 0; + scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + cleanupcvsdirs() if $clean_generated; + cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + cleanupcertfiles(); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub setup_provider_task { + my ($messages) = @_; + my $result = 1; + foreach my $params (@provider_config) { + my $provider = eval { setup_provider(%$params); }; + if ($@ or not defined $provider) { + $result = 0; + last unless $skip_errors; + } else { + my %pp = (%$params,%$provider); + push(@providers,\%pp); + } + } + my $stats = ": " . (scalar @providers) . ' resellers'; + #eval { + # + #}; + unless ($result) { + push(@$messages,"setup providers INCOMPLETE$stats"); + } else { + push(@$messages,"setup providers completed$stats"); + } + #destroy_dbs(); + return $result; +} + +sub provision_subscriber_task { + + my ($messages) = @_; + my ($result) = (0); + eval { + ($result) = provision_subscribers(); + }; + my $err = $@; + my $stats = ":"; + eval { + $stats .= "\n total contracts: " . + NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,undef) . ' rows'; + $stats .= "\n total subscribers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,undef) . ' rows'; + + $stats .= "\n total aliases: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,undef) . ' rows'; + $stats .= "\n primary aliases: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,1) . ' rows'; + + #$stats .= "\n call forwards: " . + # NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,undef) . ' rows'; + # + #$stats .= "\n registrations: " . + # NGCP::BulkProcessor::Dao::Trunk::kamailio::location::countby_usernamedomain(undef,undef) . ' rows'; + # + #$stats .= "\n trusted sources: " . + # NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::countby_subscriberid(undef) . ' rows'; + # + #$stats .= "\n non-unique contacts skipped:\n " . join("\n ",keys %$nonunique_contacts) + # if (scalar keys %$nonunique_contacts) > 0; + }; + if ($err or !$result) { + push(@$messages,"provision subscribers INCOMPLETE$stats"); + } else { + push(@$messages,"provision subscribers completed$stats"); + } + destroy_dbs(); + return $result; + +} + +#END { +# # this should not be required explicitly, but prevents Log4Perl's +# # "rootlogger not initialized error upon exit.. +# destroy_all_dbs +#} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml new file mode 100644 index 0000000..e4e7b69 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml @@ -0,0 +1,110 @@ +- + domain: narf1.com + reseller: narf1 + numbers_per_subscriber: 3 + cc: + - 888 + - 999 + ac: + - 123 + - 456 + sn: '000001-100000' +- + domain: narf2.com + reseller: narf2 + cc: + - 888 + - 999 + ac: + - 123 + - 456 + sn: '000001-100000' +- + domain: narf3.com + reseller: narf3 + cc: + - 888 + - 999 + ac: + - 123 + - 456 + sn: '000001-100000' + provider_rate: + prepaid: 0 + fees: + - + destination: ^888.+ + direction: out + offpeak_follow_interval: 5 + offpeak_follow_rate: 1 + offpeak_init_interval: 5 + offpeak_init_rate: 2 + onpeak_follow_interval: 5 + onpeak_follow_rate: 1 + onpeak_init_interval: 5 + onpeak_init_rate: 2 + - + destination: . + direction: in + offpeak_follow_interval: 5 + offpeak_follow_rate: 1 + offpeak_init_interval: 5 + offpeak_init_rate: 1 + onpeak_follow_interval: 5 + onpeak_follow_rate: 1 + onpeak_init_interval: 5 + onpeak_init_rate: 1 + source: ^888.+ + subscriber_rates: + - + prepaid: 0 + fees: + - + destination: ^8882.+ + direction: out + offpeak_follow_interval: 5 + offpeak_follow_rate: 1 + offpeak_init_interval: 5 + offpeak_init_rate: 6 + onpeak_follow_interval: 5 + onpeak_follow_rate: 1 + onpeak_init_interval: 5 + onpeak_init_rate: 6 + - + destination: . + direction: in + offpeak_follow_interval: 5 + offpeak_follow_rate: 1 + offpeak_init_interval: 5 + offpeak_init_rate: 5 + onpeak_follow_interval: 5 + onpeak_follow_rate: 1 + onpeak_init_interval: 5 + onpeak_init_rate: 5 + source: ^8881.+ + - + prepaid: 1 + fees: + - + destination: ^8882.+ + direction: out + offpeak_follow_interval: 5 + offpeak_follow_rate: 1 + offpeak_init_interval: 5 + offpeak_init_rate: 4 + onpeak_follow_interval: 5 + onpeak_follow_rate: 1 + onpeak_init_interval: 5 + onpeak_init_rate: 4 + - + destination: . + direction: in + offpeak_follow_interval: 5 + offpeak_follow_rate: 1 + offpeak_init_interval: 5 + offpeak_init_rate: 3 + onpeak_follow_interval: 5 + onpeak_follow_rate: 1 + onpeak_init_interval: 5 + onpeak_init_rate: 3 + source: ^8881.+ \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg new file mode 100644 index 0000000..91a1030 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg @@ -0,0 +1,14 @@ + +#dry=0 +#skip_errors=0 + +provision_subscriber_multithreading = 1 +#provision_subscriber_numofthreads = 2 +webpassword_length = 8 +webusername_length = 8 +sippassword_length = 16 +sipusername_length = 8 +provision_subscriber_count = 2000 + +providers_yml = providers.yml + diff --git a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm index 60ab61a..ddc8f50 100644 --- a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm +++ b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm @@ -348,7 +348,10 @@ sub post_get { $self->_request_error(); return undef; } else { - return $self->get($self->response()->header('Location'),$get_headers); + my @ids = $self->_extract_ids_from_response_location(); + my $item = $self->get($self->response()->header('Location'),$get_headers); + $item->{id} = $ids[0] if (scalar @ids) > 0; + return $item; } } diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm index 90ce912..6475cc1 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingFees.pm @@ -10,6 +10,8 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::RestProcessor qw( copy_row + + get_query_string ); use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); @@ -22,6 +24,7 @@ our @EXPORT_OK = qw( create_item delete_item get_item_path + findby_billingprofileid ); my $get_restapi = \&get_ngcp_restapi; @@ -33,6 +36,13 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; +my $findby_billingprofileid_path_query = sub { + my ($billing_profile_id) = @_; + my $filters = {}; + $filters->{billing_profile_id} = $billing_profile_id if defined $billing_profile_id; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + my $fieldnames = [ 'billing_zone_id', 'destination', @@ -48,6 +58,8 @@ my $fieldnames = [ 'purge_existing', 'source', 'use_free_time', + + 'id', ]; sub new { @@ -69,6 +81,15 @@ sub get_item { } +sub findby_billingprofileid { + + my ($billing_profile_id,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_billingprofileid_path_query($billing_profile_id),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive); + +} + sub create_item { my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm index 86401c8..53dbd72 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingProfiles.pm @@ -11,6 +11,7 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::RestProcessor qw( process_collection copy_row + get_query_string ); use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); @@ -23,6 +24,7 @@ our @EXPORT_OK = qw( create_item process_items get_item_path + findby_resellerid ); my $get_restapi = \&get_ngcp_restapi; @@ -34,6 +36,13 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; +my $findby_resellerid_path_query = sub { + my ($reseller_id) = @_; + my $filters = {}; + $filters->{reseller_id} = $reseller_id if defined $reseller_id; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + my $fieldnames = [ 'currency', 'fraud_daily_limit', @@ -52,6 +61,8 @@ my $fieldnames = [ 'peaktime_weekdays', 'prepaid', 'reseller_id', + + 'id', ]; sub new { @@ -73,6 +84,15 @@ sub get_item { } +sub findby_resellerid { + + my ($reseller_id,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_resellerid_path_query($reseller_id),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive); + +} + sub create_item { my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm index 1d7d769..f481b5c 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/BillingZones.pm @@ -10,6 +10,8 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::RestProcessor qw( copy_row + + get_query_string ); use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); @@ -21,6 +23,7 @@ our @EXPORT_OK = qw( get_item create_item get_item_path + findby_billingprofileid ); my $get_restapi = \&get_ngcp_restapi; @@ -32,10 +35,19 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; +my $findby_billingprofileid_path_query = sub { + my ($billing_profile_id) = @_; + my $filters = {}; + $filters->{billing_profile_id} = $billing_profile_id if defined $billing_profile_id; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + my $fieldnames = [ 'billing_profile_id', 'detail', 'zone', + + 'id', ]; sub new { @@ -57,6 +69,15 @@ sub get_item { } +sub findby_billingprofileid { + + my ($billing_profile_id,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$findby_billingprofileid_path_query($billing_profile_id),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive); + +} + sub create_item { my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm index 642af73..bc3f3a2 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Contracts.pm @@ -42,6 +42,8 @@ my $fieldnames = [ 'external_id', 'status', 'type', + + 'id', ]; sub new { diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm index f8615f8..e446a7a 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/CustomerContacts.pm @@ -60,6 +60,8 @@ my $fieldnames = [ 'reseller_id', 'street', 'vatnum', + + 'id', ]; sub new { diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm index 4a9f1d7..8aae001 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Customers.pm @@ -52,6 +52,8 @@ my $fieldnames = [ 'subscriber_email_template', 'type', 'vat_rate', + + 'id', ]; our $TERMINATED_STATE = 'terminated'; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm index 1e93323..5bb9790 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Domains.pm @@ -37,16 +37,18 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; -my $fieldnames = [ - 'domain', - 'reseller_id', -]; - my $get_item_filter_path_query = sub { my ($filters) = @_; return 'api/' . $resource . '/' . get_query_string($filters); }; +my $fieldnames = [ + 'domain', + 'reseller_id', + + 'id', +]; + sub new { my $class = shift; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm index c0b0624..6a2495c 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/LnpCarriers.pm @@ -37,17 +37,20 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; +my $get_item_filter_path_query = sub { + my ($filters) = @_; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + + my $fieldnames = [ 'authorative', 'name', 'prefix', 'skip_rewrite', -]; -my $get_item_filter_path_query = sub { - my ($filters) = @_; - return 'api/' . $resource . '/' . get_query_string($filters); -}; + 'id', +]; sub new { diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm index ae856ee..5edba04 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/NcosLevels.pm @@ -37,18 +37,20 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; +my $get_item_filter_path_query = sub { + my ($filters) = @_; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + my $fieldnames = [ 'description', 'level', 'local_ac', 'mode', 'reseller_id', -]; -my $get_item_filter_path_query = sub { - my ($filters) = @_; - return 'api/' . $resource . '/' . get_query_string($filters); -}; + 'id', +]; sub new { diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm index 624c97e..342a5a6 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Resellers.pm @@ -10,6 +10,8 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::RestProcessor qw( copy_row + + get_query_string ); use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(); @@ -22,6 +24,7 @@ our @EXPORT_OK = qw( create_item get_item_path + get_item_filtered ); my $get_restapi = \&get_ngcp_restapi; @@ -33,12 +36,19 @@ my $get_item_path_query = sub { }; my $collection_path_query = 'api/' . $resource . '/'; +my $get_item_filter_path_query = sub { + my ($filters) = @_; + return 'api/' . $resource . '/' . get_query_string($filters); +}; + my $fieldnames = [ 'contract_id', 'enable_rtc', 'name', 'rtc_networks', 'status', + + 'id', ]; sub new { @@ -60,6 +70,15 @@ sub get_item { } +sub get_item_filtered { + + my ($filters,$load_recursive,$headers) = @_; + my $restapi = &$get_restapi(); + return builditems_fromrows($restapi->extract_collection_items($restapi->get(&$get_item_filter_path_query($filters),$headers),undef,undef, + { $NGCP::BulkProcessor::RestConnectors::NGCPRestApi::ITEM_REL_PARAM => $item_relation }),$load_recursive)->[0]; + +} + sub create_item { my ($data,$load,$load_recursive,$post_headers,$get_headers) = @_; diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm index 96b806b..1a5bc9e 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/Subscribers.pm @@ -58,6 +58,8 @@ my $fieldnames = [ 'username', 'webpassword', 'webusername', + + 'id', ]; sub new { diff --git a/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm b/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm index 096e86e..17c73b0 100644 --- a/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm +++ b/lib/NGCP/BulkProcessor/RestRequests/Trunk/SystemContacts.pm @@ -59,6 +59,8 @@ my $fieldnames = [ 'postcode', 'street', 'vatnum', + + 'id', ]; sub new { diff --git a/lib/NGCP/BulkProcessor/Utils.pm b/lib/NGCP/BulkProcessor/Utils.pm index ceb97e5..37252b0 100644 --- a/lib/NGCP/BulkProcessor/Utils.pm +++ b/lib/NGCP/BulkProcessor/Utils.pm @@ -39,7 +39,7 @@ use File::Path qw(remove_tree make_path); #use Sys::Info::Constants qw( :device_cpu ); # after all, the only reliable way to get the true vCPU count: -#use Sys::CpuAffinity; # qw(getNumCpus); not exported? +use Sys::CpuAffinity; # qw(getNumCpus); not exported? #disabling for now, no debian package yet. require Exporter; @@ -829,7 +829,7 @@ sub secs_to_years { } sub get_cpucount { - my $cpucount = 0; #Sys::CpuAffinity::getNumCpus() + 0; + my $cpucount = Sys::CpuAffinity::getNumCpus() + 0; return ($cpucount > 0) ? $cpucount : 1; #my $info = Sys::Info->new(); #my $cpu = $info->device('CPU'); # => %options );