From 2f9039f38b3483cc77c4bd1de0996aad594e0bf4 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Tue, 22 Jun 2021 16:09:50 +0200 Subject: [PATCH] TT#127150 contract/susbcriber exporter: sqlite tabular export Change-Id: I88ad999230421b7aa3c17a99c5783d047c621473 (cherry picked from commit 94e086773b6c1b13711411ab073753039b71b8b3) --- .../Dao/Trunk/billing/contracts.pm | 8 +- .../Dao/Trunk/billing/ncos_levels.pm | 18 + .../Dao/Trunk/billing/voip_numbers.pm | 17 + .../Dao/Trunk/billing/voip_subscribers.pm | 39 ++ .../Dao/Trunk/kamailio/voicemail_spool.pm | 18 + .../Dao/Trunk/kamailio/voicemail_users.pm | 19 + .../provisioning/voip_allowed_ip_groups.pm | 17 + .../provisioning/voip_cf_destinations.pm | 17 +- .../Trunk/provisioning/voip_cf_mappings.pm | 19 + .../Dao/Trunk/provisioning/voip_dbaliases.pm | 17 + .../provisioning/voip_fax_destinations.pm | 19 +- .../provisioning/voip_fax_preferences.pm | 19 + .../Trunk/provisioning/voip_preferences.pm | 24 +- .../Trunk/provisioning/voip_subscribers.pm | 9 +- .../provisioning/voip_usr_preferences.pm | 36 +- lib/NGCP/BulkProcessor/Globals.pm | 10 +- lib/NGCP/BulkProcessor/LoadConfig.pm | 8 +- lib/NGCP/BulkProcessor/LogError.pm | 3 +- lib/NGCP/BulkProcessor/Mail.pm | 3 +- .../Projects/ETL/Customer/Dao/Tabular.pm | 312 ++++++++++++ .../Projects/ETL/Customer/ExportCustomers.pm | 426 +++++++++++++++++ .../ETL/Customer/ProjectConnectorPool.pm | 84 ++++ .../Projects/ETL/Customer/Settings.pm | 444 ++++++++++++++++++ .../Projects/ETL/Customer/config.cfg | 61 +++ .../Projects/ETL/Customer/config.debug.cfg | 61 +++ .../Projects/ETL/Customer/load_recursive.yml | 41 ++ .../Projects/ETL/Customer/process.pl | 312 ++++++++++++ .../Projects/ETL/Customer/settings.cfg | 59 +++ .../Projects/ETL/Customer/tabular_fields.yml | 64 +++ lib/NGCP/BulkProcessor/SqlRecord.pm | 59 +++ lib/NGCP/BulkProcessor/Utils.pm | 15 + 31 files changed, 2237 insertions(+), 21 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm index 0c89fd82..0b7f1527 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm @@ -24,6 +24,7 @@ use NGCP::BulkProcessor::SqlRecord qw(); #use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); @@ -253,7 +254,8 @@ sub process_records { $uninit_process_context_code, $multithreading, $numofthreads, - $load_recursive) = @params{qw/ + $load_recursive, + $blocksize) = @params{qw/ process_code static_context init_process_context_code @@ -261,6 +263,7 @@ sub process_records { multithreading numofthreads load_recursive + blocksize /}; check_table(); @@ -278,6 +281,7 @@ sub process_records { destroy_reader_dbs_code => \&destroy_dbs, multithreading => $multithreading, tableprocessing_threads => $numofthreads, + blocksize => $blocksize, ); } @@ -337,6 +341,8 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... + $record->load_relation($load_recursive,'voip_subscribers','NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_contractid',$record->{id},$load_recursive); + $record->load_relation($load_recursive,'contact','NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id',$record->{contact_id},$load_recursive); push @records,$record; } diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm index d09fcfd9..316b3c41 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/ncos_levels.pm @@ -25,6 +25,7 @@ our @EXPORT_OK = qw( gettablename check_table + findby_id findby_resellerid findby_resellerid_level findby_resellername_level @@ -61,6 +62,23 @@ sub new { } +sub findby_id { + + my ($ncos_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($ncos_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub findby_resellerid_level { my ($reseller_id,$level,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm index 2e1314e3..4e9ad425 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm @@ -30,6 +30,7 @@ our @EXPORT_OK = qw( insert_row update_row + findby_id findby_subscriberid forupdate_cc_ac_sn_subscriberid release_subscriber_numbers @@ -73,6 +74,22 @@ sub new { } +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} sub findby_subscriberid { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm index 68419bba..8621d91b 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm @@ -34,6 +34,8 @@ our @EXPORT_OK = qw( update_row delete_row + findby_id + findby_contractid findby_domainid_username_states countby_status_resellerid process_records @@ -81,6 +83,23 @@ sub new { } +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub findby_domainid_usernames { my ($xa_db,$domain_id,$usernames,$load_recursive) = @_; @@ -153,6 +172,23 @@ sub findby_domainid_username_states { } +sub findby_contractid { + + my ($contract_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub findby_contractid_states { my ($xa_db,$contract_id,$states,$load_recursive) = @_; @@ -454,6 +490,9 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... + $record->load_relation($load_recursive,'domain','NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_id',$record->{domain_id},$load_recursive); + $record->load_relation($load_recursive,'primary_number','NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::findby_id',$record->{primary_number_id},$load_recursive); + $record->load_relation($load_recursive,'provisioning_voip_subscriber','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid',undef,$record->{uuid},$load_recursive); push @records,$record; } diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_spool.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_spool.pm index 5e1b9dd1..5445ab33 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_spool.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_spool.pm @@ -25,6 +25,7 @@ our @EXPORT_OK = qw( gettablename check_table + findby_mailboxuser insert_row ); @@ -64,6 +65,23 @@ sub new { } +sub findby_mailboxuser { + + my ($mailboxuser,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('mailboxuser') . ' = ?'; + my @params = ($mailboxuser); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub insert_row { my $db = &$get_db(); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm index f0027c5f..2be02c29 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/kamailio/voicemail_users.pm @@ -25,6 +25,7 @@ our @EXPORT_OK = qw( gettablename check_table + findby_customerid insert_row ); @@ -77,6 +78,23 @@ sub new { } +sub findby_customerid { + + my ($uuid,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('customer_id') . ' = ?'; + my @params = ($uuid); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub insert_row { my $db = &$get_db(); @@ -132,6 +150,7 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... + $record->load_relation($load_recursive,'voicemail_spool','NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_spool::findby_mailboxuser',$record->{mailbox},$load_recursive); push @records,$record; } diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm index 5f487ff9..f269c26b 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_allowed_ip_groups.pm @@ -58,6 +58,23 @@ sub new { } +sub findby_group_id { + + my ($group_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('group_id') . ' = ?'; + my @params = ($group_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub insert_rows { my ($xa_db,$group_id,$ipnets) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm index 7ae0cef3..65f2a5c7 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_destinations.pm @@ -27,7 +27,7 @@ our @EXPORT_OK = qw( gettablename check_table - + findby_destinationsetid insert_row ); @@ -59,7 +59,22 @@ sub new { } +sub findby_destinationsetid { + + my ($destination_set_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('destination_set_id') . ' = ?'; + my @params = ($destination_set_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} sub insert_row { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm index b75bd184..0b4c9975 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_cf_mappings.pm @@ -27,6 +27,7 @@ our @EXPORT_OK = qw( gettablename check_table + findby_id countby_subscriberid_type $CFB_TYPE $CFT_TYPE @@ -71,6 +72,23 @@ sub new { } +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub countby_subscriberid_type { my ($subscriber_id,$type,$load_recursive) = @_; @@ -194,6 +212,7 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... + $record->load_relation($load_recursive,'destinations','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations::findby_destinationsetid',$record->{destination_set_id},$load_recursive); push @records,$record; } diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm index 3b4a6a78..3d60266d 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm @@ -33,6 +33,7 @@ our @EXPORT_OK = qw( findby_domainid_username countby_subscriberidisprimary findby_subscriberidisprimary + findby_subscriberid ); my $tablename = 'voip_dbaliases'; @@ -64,6 +65,22 @@ sub new { } +sub findby_subscriberid { + + my ($subscriber_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('subscriber_id') . ' = ?'; + my @params = ($subscriber_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} sub findby_subscriberid_username { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_destinations.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_destinations.pm index 2b1e2738..510f2fcf 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_destinations.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_destinations.pm @@ -26,7 +26,7 @@ our @EXPORT_OK = qw( gettablename check_table - source_findby_subscriberid + findby_subscriberid ); my $tablename = 'voip_fax_destinations'; @@ -61,6 +61,23 @@ sub new { } +sub findby_subscriberid { + + my ($subscriber_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('subscriber_id') . ' = ?'; + my @params = ($subscriber_id); + + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} sub insert_row { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_preferences.pm index 15661da3..552d3109 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_fax_preferences.pm @@ -26,6 +26,7 @@ our @EXPORT_OK = qw( gettablename check_table + findby_subscriberid insert_row ); @@ -62,6 +63,24 @@ sub new { } +sub findby_subscriberid { + + my ($subscriber_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('subscriber_id') . ' = ?'; + my @params = ($subscriber_id); + + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub insert_row { my $db = &$get_db(); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm index e898e519..da4ecd6c 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm @@ -24,6 +24,7 @@ our @EXPORT_OK = qw( findby_attribute findall + findby_id $ALLOWED_CLIS_ATTRIBUTE $CLI_ATTRIBUTE @@ -33,7 +34,6 @@ our @EXPORT_OK = qw( $NCOS_ID_ATTRIBUTE $ADM_NCOS_ID_ATTRIBUTE - $ADM_CF_NCOS_ID_ATTRIBUTE $GPPx_ATTRIBUTE %DPID_ATTRIBUTES @@ -78,6 +78,7 @@ our @EXPORT_OK = qw( $BOOLEAN_DATA_TYPE ); #$FORCE_OUTBOUND_CALLS_TO_PEER +#$ADM_CF_NCOS_ID_ATTRIBUTE my $tablename = 'voip_preferences'; my $get_db = \&get_provisioning_db; @@ -113,7 +114,7 @@ our $ACCOUNT_ID_ATTRIBUTE = 'account_id'; our $NCOS_ID_ATTRIBUTE = 'ncos_id'; our $ADM_NCOS_ID_ATTRIBUTE = 'adm_ncos_id'; -our $ADM_CF_NCOS_ID_ATTRIBUTE = 'adm_ncos_id'; +#our $ADM_CF_NCOS_ID_ATTRIBUTE = 'adm_cf_ncos_id'; our $GPPx_ATTRIBUTE = 'gpp'; our %DPID_ATTRIBUTES = map { 'rewrite_' . $_ => $_; } @NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_rewrite_rule_sets::DPID_FIELDS; @@ -132,7 +133,7 @@ our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total'; our $CONCURRENT_MAX_PER_ACCOUNT_ATTRIBUTE = 'concurrent_max_per_account'; our $CLIR_ATTRIBUTE = 'clir'; -our @CF_ATTRIBUTES = qw(cfu cft cfna cfb); #skip sms for now +our @CF_ATTRIBUTES = qw(cfu cft cfna cfb cfo cfr cfs); our $RINGTIMEOUT_ATTRIBUTE = 'ringtimeout'; @@ -190,6 +191,23 @@ sub findby_attribute { } +sub findby_id { + + my ($attribute_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($attribute_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub findall { my ($load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm index d1d9f666..1d0471c9 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_subscribers.pm @@ -228,7 +228,14 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... - + $record->load_relation($load_recursive,'voip_dbaliases','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberid',$record->{id},$load_recursive); + $record->load_relation($load_recursive,'voip_usr_preferences','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid',$record->{id},$load_recursive); + + $record->load_relation($load_recursive,'voicemail_users','NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::findby_customerid',$record->{uuid},$load_recursive); + + $record->load_relation($load_recursive,'voip_fax_preferences','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_fax_preferences::findby_subscriberid',$record->{id},$load_recursive); + $record->load_relation($load_recursive,'voip_fax_destinations','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_fax_destinations::findby_subscriberid',$record->{id},$load_recursive); + push @records,$record; } } diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm index c16f78ea..9171cad2 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_usr_preferences.pm @@ -22,6 +22,9 @@ use NGCP::BulkProcessor::SqlProcessor qw( ); use NGCP::BulkProcessor::SqlRecord qw(); +# required to use the constants: +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); + require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); our @EXPORT_OK = qw( @@ -35,6 +38,7 @@ our @EXPORT_OK = qw( findby_subscriberid_attributeid countby_subscriberid_attributeid_value + findby_subscriberid $TRUE $FALSE @@ -70,6 +74,24 @@ sub new { } +sub findby_subscriberid { + + my ($subscriber_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT v.*,a.attribute FROM ' . $table . ' v JOIN ' . + $db->tableidentifier('voip_preferences') . ' a ON v.attribute_id = a.id WHERE ' . + 'v.subscriber_id = ?'; + my @params = ($subscriber_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub findby_subscriberid_attributeid { my ($xa_db,$subscriber_id,$attribute_id,$load_recursive) = @_; @@ -227,7 +249,19 @@ sub buildrecords_fromrows { $record = __PACKAGE__->new($row); # transformations go here ... - + $record->{_attribute} = $row->{attribute}; + $record->load_relation($load_recursive,'attribute','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_id',$record->{attribute_id},$load_recursive); + $record->{_attribute} //= $record->{attribute}->{attribute} if exists $record->{attribute}; + if ($record->{_attribute}) { + $record->load_relation($load_recursive,'allowed_ips','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::findby_group_id',$record->{value},$load_recursive) + if ($record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE + or $record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::MAN_ALLOWED_IPS_GRP_ATTRIBUTE); + $record->load_relation($load_recursive,'ncos','NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_id',$record->{value},$load_recursive) + if ($record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::NCOS_ID_ATTRIBUTE + or $record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE); + $record->load_relation($load_recursive,"cf_mapping",'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::findby_id',$record->{value},$load_recursive) + if (grep { $record->{_attribute} eq $_; } @NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CF_ATTRIBUTES); + } push @records,$record; } } diff --git a/lib/NGCP/BulkProcessor/Globals.pm b/lib/NGCP/BulkProcessor/Globals.pm index 18427069..be13fae0 100644 --- a/lib/NGCP/BulkProcessor/Globals.pm +++ b/lib/NGCP/BulkProcessor/Globals.pm @@ -141,11 +141,11 @@ our $VERSION = '1.0.1'; our $system_version = $VERSION; #keep this filename-save our $system_abbreviation = 'bulkprocessor'; #keep this filename-, dbname-save our $system_instance = 'ngcp'; #'test'; #'2014'; #dbname-save 0-9a-z_ -our $system_instance_label = 'some node'; +our $system_instance_label; our $local_ip = get_ipaddress(); our $local_fqdn = get_hostfqdn(); -our $application_version = $main::VERSION // $VERSION; +our $application_version = ($main::VERSION // $VERSION); our $application_path = get_applicationpath(); our $executable_path = $FindBin::Bin . '/'; #my $remotefilesystem = "MSWin32"; @@ -275,12 +275,12 @@ our $smtppasswd = 'xyz'; our $sender_address = 'donotreply@sipwise.com'; - #service layer: our @jobservers = ('127.0.0.1:4730'); #our $jobnamespace = $system_abbreviation . '-' . $system_version . '-' . $local_fqdn . '-' . $system_instance; -our $jobnamespace = $system_abbreviation . '-' . $system_version . '-' . $system_instance; - +our $jobnamespace = $system_abbreviation; +$jobnamespace .= '-' . $system_version if length($system_version); +$jobnamespace .= '-' . $system_instance if length($system_instance); # test directory diff --git a/lib/NGCP/BulkProcessor/LoadConfig.pm b/lib/NGCP/BulkProcessor/LoadConfig.pm index d1280e59..8f4bb4c2 100644 --- a/lib/NGCP/BulkProcessor/LoadConfig.pm +++ b/lib/NGCP/BulkProcessor/LoadConfig.pm @@ -5,7 +5,6 @@ use strict; use NGCP::BulkProcessor::Globals qw( $system_name - $system_version $system_instance_label $local_fqdn $application_version @@ -30,7 +29,8 @@ use NGCP::BulkProcessor::LogError qw( configurationerror ); -use YAML::XS qw(); +use YAML qw(); +$YAML::UseCode = 1; use Config::Any qw(); use NGCP::BulkProcessor::Utils qw(format_number trim); @@ -172,7 +172,7 @@ sub _search_path { sub _splashinfo { my ($configfile) = @_; - configurationinfo($system_name . ' ' . $system_version . ' (' . $system_instance_label . ') [' . $local_fqdn . ']',getlogger(__PACKAGE__)); + configurationinfo($system_name . (length($system_instance_label) ? ' (' . $system_instance_label . ')' : '') . ' [' . $local_fqdn . ']',getlogger(__PACKAGE__)); configurationinfo('application version: ' . $application_version,getlogger(__PACKAGE__)); configurationinfo('application path: ' . $application_path,getlogger(__PACKAGE__)); configurationinfo('working path: ' . $working_path,getlogger(__PACKAGE__)); @@ -304,7 +304,7 @@ sub _parse_yaml_config { my $config = undef; eval { - $config = YAML::XS::LoadFile($file); + $config = YAML::LoadFile($file); }; if ($@) { configurationerror($file,'parsing yaml format - error: ' . $@,getlogger(__PACKAGE__)); diff --git a/lib/NGCP/BulkProcessor/LogError.pm b/lib/NGCP/BulkProcessor/LogError.pm index 837f1c2c..661cacf8 100644 --- a/lib/NGCP/BulkProcessor/LogError.pm +++ b/lib/NGCP/BulkProcessor/LogError.pm @@ -4,7 +4,6 @@ use strict; ## no critic use NGCP::BulkProcessor::Globals qw( - $system_version $erroremailrecipient $warnemailrecipient $doneemailrecipient @@ -418,7 +417,7 @@ sub restresponseerror { sub fieldnamesdiffer { my ($db,$tablename,$expectedfieldnames,$fieldnamesfound,$logger) = @_; - my $message = _getsqlconnectorinstanceprefix($db) . 'wrong table fieldnames (v ' . $system_version . '): [' . $db->connectidentifier() . '].' . $tablename . ":\nexpected: " . ((defined $expectedfieldnames) ? join(', ',@$expectedfieldnames) : '') . "\nfound: " . ((defined $fieldnamesfound) ? join(', ',@$fieldnamesfound) : ''); + my $message = _getsqlconnectorinstanceprefix($db) . 'wrong table fieldnames: [' . $db->connectidentifier() . '].' . $tablename . ":\nexpected: " . ((defined $expectedfieldnames) ? join(', ',@$expectedfieldnames) : '') . "\nfound: " . ((defined $fieldnamesfound) ? join(', ',@$fieldnamesfound) : ''); if (defined $logger) { $logger->error($message); } diff --git a/lib/NGCP/BulkProcessor/Mail.pm b/lib/NGCP/BulkProcessor/Mail.pm index 5828d9fc..c1bbb919 100644 --- a/lib/NGCP/BulkProcessor/Mail.pm +++ b/lib/NGCP/BulkProcessor/Mail.pm @@ -12,7 +12,6 @@ use NGCP::BulkProcessor::Logging qw( use NGCP::BulkProcessor::Globals qw( $system_name $system_instance_label - $system_version $local_fqdn $mailfile_path $emailenable @@ -49,7 +48,7 @@ our @EXPORT_OK = qw( my $wordwrapcolumns = 72; #linebreak/wrap columns -our $signature = "--\n" . $system_name . ' ' . $system_version . ' (' . $system_instance_label . ")\n[" . $local_fqdn . ']'; # a nice email signature +our $signature = "--\n" . $system_name . (length($system_instance_label) ? ' (' . $system_instance_label . ')' : '') . "\n[" . $local_fqdn . ']'; # a nice email signature my $msgextension = '.msg'; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm new file mode 100644 index 00000000..483aa0d3 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm @@ -0,0 +1,312 @@ +package NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); +#import_db_tableidentifier + +use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( + $tabular_fields +); + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt + +); +#process_table +use NGCP::BulkProcessor::SqlRecord qw(); + +#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + get_fieldnames + + + update_delta + findby_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + +); +#@fieldnames +#findby_sipusername +#findby_ccacsn +#countby_ccacsn + +my $tablename = 'tabular'; +my $get_db = \&get_sqlite_db; +#my $get_tablename = \&import_db_tableidentifier; + +my $fieldnames; +my $expected_fieldnames; +sub get_fieldnames { + my $expected = shift; + unless (defined $fieldnames and defined $expected_fieldnames) { + $fieldnames = [ map { + local $_ = (ref $_ ? $_->{path} : $_); + $_ =~ s/\./_/g; + $_ =~ s/\[(\d+)\]/_$1/g; + $_; + } @$tabular_fields ]; + $expected_fieldnames = [ @$fieldnames ]; + push(@$expected_fieldnames,'domain') unless grep { 'domain' eq $_; } @$expected_fieldnames; + push(@$expected_fieldnames,'username') unless grep { 'username' eq $_; } @$expected_fieldnames; + push(@$expected_fieldnames,'delta'); + } + return $fieldnames unless $expected; + return $expected_fieldnames; +} + +# table creation: +my $primarykey_fieldnames = [ 'domain', 'username' ]; +my $indexes = { + #$tablename . '_number' => [ 'number(32)' ], + #$tablename . '_rownum' => [ 'rownum(11)' ], + $tablename . '_delta' => [ 'delta(7)' ], +}; +#my $fixtable_statements = []; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,get_fieldnames(1),$indexes); + + copy_row($self,shift,get_fieldnames(1)); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,get_fieldnames(1),$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub findby_delta { + + my ($delta,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $delta; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('delta') . ' = ?' + , $delta); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_domainusername { + + my ($domain,$username,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless (defined $domain and defined $username); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . $table . + ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . + ' AND ' . $db->columnidentifier('username') . ' = ?' + , $domain, $username); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub update_delta { + + my ($domain,$username,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $domain or defined $username) { + $stmt .= ' WHERE ' . + $db->columnidentifier('domain') . ' = ?' . + ' AND ' . $db->columnidentifier('username') . ' = ?'; + push(@params, $domain, $username); + } + + return $db->db_do($stmt,@params); + +} + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + + return $db->db_get_value($stmt,@params); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +#sub process_records { +# +# my %params = @_; +# my ($process_code, +# $static_context, +# $init_process_context_code, +# $uninit_process_context_code, +# $multithreading, +# $numofthreads) = @params{qw/ +# process_code +# static_context +# init_process_context_code +# uninit_process_context_code +# multithreading +# numofthreads +# /}; +# +# check_table(); +# my $db = &$get_db(); +# my $table = $db->tableidentifier($tablename); +# +# my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/; +# +# return process_table( +# get_db => $get_db, +# class => __PACKAGE__, +# process_code => sub { +# my ($context,$rowblock,$row_offset) = @_; +# return &$process_code($context,$rowblock,$row_offset); +# }, +# static_context => $static_context, +# init_process_context_code => $init_process_context_code, +# uninit_process_context_code => $uninit_process_context_code, +# destroy_reader_dbs_code => \&destroy_all_dbs, +# multithreading => $multithreading, +# tableprocessing_threads => $numofthreads, +# 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols), +# 'selectcount' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table, +# ); +#} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ', map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @{get_fieldnames(1)}) . ')'; + my @values = (); + foreach my $fieldname (@{get_fieldnames(1)}) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('domain') . ' = ?' . + ' AND ' . $db->columnidentifier('username') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + get_fieldnames(1), + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm new file mode 100644 index 00000000..60a7889f --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm @@ -0,0 +1,426 @@ +package NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers; +use strict; + +## no critic + +use threads::shared qw(); + +use NGCP::BulkProcessor::Serialization qw(); +use Scalar::Util 'blessed'; +use MIME::Base64 qw(encode_base64); + +use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( + $dry + $skip_errors + + $export_customers_multithreading + $export_customers_numofthreads + $export_customers_blocksize + + + run_dao_method + get_dao_var + get_export_filename + + write_export_file + $customer_export_filename_format + + $tabular_fields + $load_recursive + $tabular_single_row_txn + $ignore_tabular_unique +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn + fileerror +); + +use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); + +use NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular qw(); + +use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs + ping_all_dbs +); + +use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool trim); #check_ipnet +#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +#use NGCP::BulkProcessor::Table qw(get_rowhash); +use NGCP::BulkProcessor::Array qw(array_to_map); +use NGCP::BulkProcessor::DSPath qw(); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + export_customers_graph + export_customers_tabular +); + +sub export_customers_graph { + + my $static_context = { + + }; + my $result = 1; #_copy_customers_checks($static_context); + + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && run_dao_method('billing::contracts::process_records', + #source_dbs => $static_context->{source_dbs}, + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + my @data = (); + foreach my $record (@$records) { + next unless _export_customer_graph_init_context($context,$record); + push(@data,_get_contract_graph($context->{contract})); + } + write_export_file(\@data,$context->{export_filename},$context->{export_format}); + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + #$context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + ($context->{export_filename},$context->{export_format}) = get_export_filename($customer_export_filename_format); + }, + uninit_process_context_code => sub { + my ($context)= @_; + #undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + destroy_reader_dbs_code => \&destroy_all_dbs, + blocksize => $export_customers_blocksize, + multithreading => $export_customers_multithreading, + numofthreads => $export_customers_numofthreads, + ),$warning_count,); + +} + +sub export_customers_tabular { + + my $result = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::create_table(0); + + my $static_context = { + upsert => _tabular_rows_reset_delta(), + }; + + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && run_dao_method('billing::contracts::process_records', + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + my @subscriber_rows = (); + foreach my $record (@$records) { + next unless _export_customer_tabular_init_context($context,$record); + push(@subscriber_rows, _get_subscriber_rows($context)); + + if ($tabular_single_row_txn and (scalar @subscriber_rows) > 0) { + while (defined (my $subscriber_row = shift @subscriber_rows)) { + if ($skip_errors) { + eval { _insert_tabular_rows($context,[$subscriber_row]); }; + _warn($context,$@) if $@; + } else { + _insert_tabular_rows($context,[$subscriber_row]); + } + } + } + } + + if (not $tabular_single_row_txn and (scalar @subscriber_rows) > 0) { + if ($skip_errors) { + eval { insert_tabular_rows($context,\@subscriber_rows); }; + _warn($context,$@) if $@; + } else { + insert_tabular_rows($context,\@subscriber_rows); + } + } + + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_sqlite_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + destroy_reader_dbs_code => \&destroy_all_dbs, + blocksize => $export_customers_blocksize, + multithreading => $export_customers_multithreading, + numofthreads => $export_customers_numofthreads, + ),$warning_count,); + +} + +sub _tabular_rows_reset_delta { + my $upsert = 0; + if (NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta() > 0) { + processing_info(threadid(),'resetting delta of ' . + NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::update_delta(undef, + $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::deleted_delta) . + ' records',getlogger(__PACKAGE__)); + $upsert |= 1; + } + return $upsert; +} + +sub _insert_tabular_rows { + my ($context,$subscriber_rows) = @_; + $context->{db}->db_do_begin( + ($context->{upsert} ? + NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::getupsertstatement() + : NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::getinsertstatement($ignore_tabular_unique)), + ); + eval { + $context->{db}->db_do_rowblock($subscriber_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } + +} + +sub _export_customer_graph_init_context { + + my ($context,$record) = @_; + + my $result = 1; + + return 0 unless _load_contract($context,$record); + + return $result; + +} + +sub _get_contract_graph { + my ($context) = @_; + + #sub unshare { + # + # my ($obj,) = @_; + # my $ref = ref $obj; + # if ("ARRAY" eq $ref) { + # my @array = (); + # my $i = 0; + # foreach my $value (@$obj) { + # push(@array, unshare($value)) if xx; + # $i++; + # } + # return \@array; + # } elsif ($ref eq "HASH") { + # my %hash = (); + # foreach my $key (keys %$obj) { + # $hash{$key} = unshare($obj->{$key}) if xx; + # } + # return \%hash; + # } + # + #} + + foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) { + ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = + array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, + sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' ); + if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { + foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { + foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { + $voicemail->{recording} = encode_base64($voicemail->{recording},''); + } + } + } + my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, { + retrieve_key_from_non_hash => sub {}, + key_does_not_exist => sub {}, + index_does_not_exist => sub {}, + }); + #foreach my $graph_field (@$graph_fields) { + # my $a; + # my $sep = ','; + # if ('HASH' eq ref $tabular_field) { + # $a = $tabular_field->{path}; + # $sep = $tabular_field->{sep}; + # } else { + # $a = $tabular_field; + # } + # #eval {'' . ($dp->get('.' . $a) // '');}; if($@){ + # # my $x=5; + # #} + # my $v = $dp->get('.' . $a); + # if ('ARRAY' eq ref $v) { + # if ('HASH' eq ref $v->[0]) { + # $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v); + # } else { + # $v = join($sep, sort @$v); + # } + # } else { + # $v = '' . ($v // ''); + # } + # push(@row,$v); + #} + } +} + +sub _export_customer_tabular_init_context { + + my ($context,$record) = @_; + + my $result = 1; + + return 0 unless _load_contract($context,$record); + + if (not scalar @{$context->{contract}->{voip_subscribers}}) { + _info($context,"contract ID $record->{id} has no subscribers, skipping",1); + $result = 0; + } + + return $result; + +} + +sub _get_subscriber_rows { + + my ($context) = @_; + + my @rows = (); + foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) { + my @row = (); + $bill_subs->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts->new($context->{contract}); #no circular ref + ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = + array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, + sub { return shift->{_attribute}; }, sub { my $p = shift; }, 'group' ); + if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { + foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { + foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { + $voicemail->{recording} = encode_base64($voicemail->{recording},''); + } + } + } + my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, { + retrieve_key_from_non_hash => sub {}, + key_does_not_exist => sub {}, + index_does_not_exist => sub {}, + }); + foreach my $tabular_field (@$tabular_fields) { + my $a; + my $sep = ','; + if ('HASH' eq ref $tabular_field) { + $a = $tabular_field->{path}; + $sep = $tabular_field->{sep}; + } else { + $a = $tabular_field; + } + #eval {'' . ($dp->get('.' . $a) // '');}; if($@){ + # my $x=5; + #} + my $v = $dp->get('.' . $a); + if ('ARRAY' eq ref $v) { + if ('HASH' eq ref $v->[0] + or (blessed($v->[0]) and $v->[0]->isa('NGCP::BulkProcessor::SqlRecord'))) { + $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v); + } else { + $v = join($sep, sort @$v); + } + } else { + $v = '' . ($v // ''); + } + push(@row,$v); + } + push(@row,$bill_subs->{domain}->{domain}) unless grep { 'domain' eq $_; } @{NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::get_fieldnames()}; + push(@row,$bill_subs->{username}) unless grep { 'username' eq $_; } @{NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::get_fieldnames()}; + if ($context->{upsert}) { + push(@row,$bill_subs->{domain}->{domain},$bill_subs->{username}); + } else { + push(@row,$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::added_delta); + } + + push(@rows,\@row); + } + + return @rows; + +} + +sub _load_contract { + + my ($context,$record) = @_; + $context->{contract} = run_dao_method('billing::contracts::findby_id', $record->{id}, { %$load_recursive, + 'contracts.voip_subscribers.domain' => 1, + _context => { + _info => \&_info, + _error => \&_error, + _debug => \&_debug, + _warn => \&_warn, + context => $context, + }, + }); + + return 1 if $context->{contract}; + return 0; + +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } +} + +sub _debug { + + my ($context,$message,$debug) = @_; + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm new file mode 100644 index 00000000..b5fed1a8 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm @@ -0,0 +1,84 @@ +package NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool; +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); + +use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( + + $sqlite_db_file +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_connectorinstancename +); + +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); + +use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + + get_sqlite_db + sqlite_db_tableidentifier + + destroy_dbs + destroy_all_dbs + ping_all_dbs + +); + +my $sqlite_dbs = {}; + +sub get_sqlite_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); #threadid(); #shift; + + if (not defined $sqlite_dbs->{$name}) { + $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); + } + + return $sqlite_dbs->{$name}; + +} + +sub sqlite_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); + +} + +sub destroy_dbs { + + foreach my $name (keys %$sqlite_dbs) { + cleartableinfo($sqlite_dbs->{$name}); + undef $sqlite_dbs->{$name}; + delete $sqlite_dbs->{$name}; + } + +} + +sub destroy_all_dbs() { + destroy_dbs(); + NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); +} + +sub ping_all_dbs() { + NGCP::BulkProcessor::ConnectorPool::ping_dbs(); +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm new file mode 100644 index 00000000..801cc7df --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm @@ -0,0 +1,444 @@ +package NGCP::BulkProcessor::Projects::ETL::Customer::Settings; +use strict; + +## no critic + +use threads::shared qw(); + +use File::Basename qw(fileparse); +use NGCP::BulkProcessor::Serialization qw(); +use DateTime::TimeZone qw(); + +use JSON -support_by_pp, -no_export; +*NGCP::BulkProcessor::Serialization::serialize_json = sub { + my $input_ref = shift; + #return JSON::XS::encode_json($input_ref); + return JSON::to_json($input_ref, { allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, as_nonblessed => 1 }); +}; + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module); +#format_number +use NGCP::BulkProcessor::Array qw(contains); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + run_dao_method + get_dao_var + get_export_filename + write_export_file + write_sql_file + + update_load_recursive + $load_recursive_yml + $load_recursive + + update_tabular_fields + $tabular_fields_yml + $tabular_fields + $ignore_tabular_unique + $tabular_single_row_txn + + $sqlite_db_file + + check_dry + + $output_path + $input_path + + $customer_export_filename_format + $customer_import_filename + $split_customers + + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + + + $export_customers_multithreading + $export_customers_numofthreads + $export_customers_blocksize + + + $cf_default_priority + $cf_default_timeout + $cft_default_ringtimeout + + $rollback_sql_export_filename_format + $rollback_sql_stmt_format +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $tabular_fields_yml = 'tabular_fields.yml'; +our $tabular_fields = []; +our $ignore_tabular_unique = 0; +our $tabular_single_row_txn = 1; + +our $load_recursive_yml = 'load_recursive_yml.yml'; +our $load_recursive; + +our $output_path = $working_path . 'output/'; +our $input_path = $working_path . 'input/'; + +our $customer_export_filename_format = undef; + +our $customer_import_filename = undef; +our $customer_import_numofthreads = $cpucount; +our $customer_import_multithreading = 1; +our $customer_reseller_name = 'default'; +our $customer_billing_profile_name = 'Default Billing Profile'; +our $customer_domain = undef; +our $customer_contact_email_format = '%s@example.org'; +our $subscriber_contact_email_format = '%s@example.org'; +our $split_customers = 0; + +our $subscriber_timezone = undef; +our $contract_timezone = undef; + +our $subscriber_profile_set_name = undef; +our $subscriber_profile_name = undef; +our $webusername_format = '%1$s'; +our $subscriber_externalid_format = undef; + + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +my $mr = 'Trunk'; +my @supported_mr = ('Trunk'); + +our $sqlite_db_file = 'sqlite'; + + +our $export_customers_multithreading = $enablemultithreading; +our $export_customers_numofthreads = $cpucount; +our $export_customers_blocksize = 1000; + + +our $cf_default_priority = 1; +our $cf_default_timeout = 300; +our $cft_default_ringtimeout = 20; + +our $rollback_sql_export_filename_format = undef; +our $rollback_sql_stmt_format = undef; + +my $file_lock :shared = undef; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + + $customer_export_filename_format = $data->{customer_export_filename} if exists $data->{customer_export_filename}; + get_export_filename($data->{customer_export_filename},$configfile); + + + $rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format}; + get_export_filename($data->{rollback_sql_export_filename_format},$configfile); + $rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format}; + + $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; + + $customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename'); + $customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading}; + $customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads'); + $customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name}; + $customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name}; + $customer_domain = $data->{customer_domain} if exists $data->{customer_domain}; + $customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format}; + $subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format}; + $split_customers = $data->{split_customers} if exists $data->{split_customers}; + + $contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone}; + if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) { + configurationerror($configfile,"invalid customer_timezone '$contract_timezone'"); + $result = 0; + } + + $subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone}; + if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) { + configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'"); + $result = 0; + } + + $subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name}; + $subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name}; + if ($subscriber_profile_set_name and not $subscriber_profile_name + or not $subscriber_profile_set_name and $subscriber_profile_name) { + configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required"); + $result = 0; + } + $webusername_format = $data->{webusername_format} if exists $data->{webusername_format}; + $subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format}; + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + + $export_customers_multithreading = $data->{export_customers_multithreading} if exists $data->{export_customers_multithreading}; + $export_customers_numofthreads = _get_numofthreads($cpucount,$data,'export_customers_numofthreads'); + $export_customers_blocksize = $data->{export_customers_blocksize} if exists $data->{export_customers_blocksize}; + + $tabular_fields_yml = $data->{tabular_fields_yml} if exists $data->{tabular_fields_yml}; + $load_recursive_yml = $data->{load_recursive_yml} if exists $data->{load_recursive_yml}; + $tabular_single_row_txn = $data->{tabular_single_row_txn} if exists $data->{tabular_single_row_txn}; + $ignore_tabular_unique = $data->{ignore_tabular_unique} if exists $data->{ignore_tabular_unique}; + + $cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority}; + $cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout}; + $cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout}; + + $mr = $data->{schema_version}; + if (not defined $mr or not contains($mr,\@supported_mr)) { + configurationerror($configfile,'version must be one of ' . join(', ', @supported_mr)); + $result = 0; + } + + + return $result; + + } + return 0; + +} + +sub run_dao_method { + my $method_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift; + load_module($method_name); + no strict 'refs'; + return $method_name->(@_); +} + +sub get_dao_var { + my $var_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift; + load_module($var_name); + no strict 'refs'; + return @{$var_name} if wantarray; + return ${$var_name}; +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; +} + +sub get_export_filename { + my ($filename_format,$configfile) = @_; + my $export_filename; + my $export_format; + if ($filename_format) { + $export_filename = $output_path . sprintf($filename_format,timestampdigits(),threadid()); + if (-e $export_filename and (unlink $export_filename) == 0) { + filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__)); + $export_filename = undef; + } + my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".sql"); + if ($suffix eq '.json') { + $export_format = $NGCP::BulkProcessor::Serialization::format_json; + } elsif ($suffix eq '.yml' or $suffix eq '.yaml') { + $export_format = $NGCP::BulkProcessor::Serialization::format_yaml; + } elsif ($suffix eq '.sql') { + + } else { + configurationerror($configfile,"$filename_format: either .json or .yaml export file format required"); + } + } + return ($export_filename,$export_format); +} + +sub write_export_file { + + my ($data,$export_filename,$export_format) = @_; + if (defined $export_filename) { + # "concatenated json" https://en.wikipedia.org/wiki/JSON_streaming + my $str = ''; + if (ref $data eq 'ARRAY') { + foreach my $obj (@$data) { + #$str .= "\n" if length($str); + $str .= NGCP::BulkProcessor::Serialization::serialize($obj,$export_format); + } + } else { + $str = NGCP::BulkProcessor::Serialization::serialize($data,$export_format); + } + _write_file($str,$export_filename); + } + +} + +sub write_sql_file { + + my ($data,$export_filename,$stmt_format) = @_; + if (defined $export_filename and $stmt_format) { + my $str = ''; + if (ref $data eq 'ARRAY') { + foreach my $obj (@$data) { + $str .= "\n" if length($str); + if (ref $obj eq 'ARRAY') { + $str .= sprintf($stmt_format,@$obj); + } else { + $str .= sprintf($stmt_format,$str); + } + } + } else { + $str = sprintf($stmt_format,$data); + } + $str .= "\n"; + _write_file($str,$export_filename); + } + +} + +sub _write_file { + + my ($str,$export_filename) = @_; + if (defined $export_filename) { + lock $file_lock; + open(my $fh, '>>', $export_filename) or fileerror('cannot open file ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__)); + binmode($fh); + print $fh $str; + close $fh; + } + +} + +sub update_tabular_fields { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $tabular_fields = $data; + }; + if ($@ or 'ARRAY' ne ref $tabular_fields) { + $tabular_fields //= []; + configurationerror($configfile,'invalid tabular fields',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub update_load_recursive { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $load_recursive = $data; + }; + if ($@ or 'HASH' ne ref $load_recursive) { + undef $load_recursive; + configurationerror($configfile,'invalid load recursive def',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub _get_sqlite_db_file { + my ($run,$name) = @_; + return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name; +} + +sub _get_import_filename { + my ($old_value,$data,$key) = @_; + my $import_filename = $old_value; + $import_filename = $data->{$key} if exists $data->{$key}; + if (defined $import_filename and length($import_filename) > 0) { + $import_filename = $input_path . $import_filename unless -e $import_filename; + } + return $import_filename; +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg new file mode 100644 index 00000000..442b428c --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg @@ -0,0 +1,61 @@ +##general settings: +working_path = /var/sipwise +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = db01 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = db01 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = db01 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = db01 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = db01 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg new file mode 100644 index 00000000..775204ab --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg @@ -0,0 +1,61 @@ +##general settings: +working_path = /home/rkrenn/temp/customer_exporter +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.133 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.133 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.133 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.133 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.133 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml new file mode 100644 index 00000000..218b9a0c --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml @@ -0,0 +1,41 @@ +contracts.voip_subscribers: + include: !!perl/code | + { + my ($contract) = @_; + #return 0 if $contract->{status} eq 'terminated'; + return 1; + } + + filter: !!perl/code | + { + my ($bill_subs,$context) = @_; + #_debug($context,"skipping terminated subscriber $bill_subs->{username}") if $bill_subs->{status} eq 'terminated'; + #return 0 if $bill_subs->{status} eq 'terminated'; + return 1; + } + + transform: !!perl/code | + { + my ($bill_subs,$context) = @_; + return $bill_subs; + } + +#contracts.contact: 1 +contracts.voip_subscribers.primary_number: 1 +contracts.voip_subscribers.provisioning_voip_subscriber: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_dbaliases: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences: 1 +#contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.allowed_ips: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.ncos: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping.destinations: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users: 1 +#contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users.voicemail_spool: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_preferences: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_destinations: + transform: !!perl/code | + { + my ($fax_destinations,$context) = @_; + return [ map { $_->{destination} . ' (' . $_->{filetype} . ')'; } @$fax_destinations ]; + } diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl new file mode 100644 index 00000000..59e96fe4 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl @@ -0,0 +1,312 @@ +use strict; + +## no critic + +our $VERSION = "0.0"; + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( + update_settings + update_tabular_fields + $tabular_fields_yml + + update_load_recursive + $load_recursive_yml + + check_dry + $output_path + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force + +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); +#use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); +#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles); + +use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs); + +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); + +use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw( + export_customers_graph + export_customers_tabular +); +#use NGCP::BulkProcessor::Projects::ETL::Customer::ImportCustomers qw( +# import_customers_json +#); + +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); + +#my $export_customers_graph_task_opt = 'export_customers_graph'; +#push(@TASK_OPTS,$export_customers_graph_task_opt); + +my $export_customers_tabular_task_opt = 'export_customers_tabular'; +push(@TASK_OPTS,$export_customers_tabular_task_opt); + +#my $import_customers_json_task_opt = 'import_customers_json'; +#push(@TASK_OPTS,$import_customers_json_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + $result &= load_config($tabular_fields_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE); + $result &= load_config($load_recursive_yml,\&update_load_recursive,$YAML_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + + #} elsif (lc($export_customers_graph_task_opt) eq lc($task)) { + # $result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result); + # $completion |= 1; + } elsif (lc($export_customers_tabular_task_opt) eq lc($task)) { + $result &= export_customers_tabular_task(\@messages) if taskinfo($export_customers_tabular_task_opt,$result); + $completion |= 1; + #} elsif (lc($import_customers_json_task_opt) eq lc($task)) { + # if (taskinfo($import_customers_json_task_opt,$result,1)) { + # next unless check_dry(); + # $result &= import_customers_json_task(\@messages); + # $completion |= 1; + # } + + } else { + $result = 0; + scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + #cleanupcvsdirs() if $clean_generated; + cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + #cleanupcertfiles(); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + #print $@; + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub export_customers_graph_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = export_customers_graph(); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + #$stats .= "\n total mta subscriber records: " . + # NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows'; + #my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta + #); + #$stats .= "\n new: $added_count rows"; + #my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta + #); + #$stats .= "\n existing: $existing_count rows"; + #my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta + #); + #$stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"exporting customers (graph) INCOMPLETE$stats"); + } else { + push(@$messages,"exporting customers (graph) completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + +sub export_customers_tabular_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = export_customers_tabular(); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + #$stats .= "\n total mta subscriber records: " . + # NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows'; + #my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta + #); + #$stats .= "\n new: $added_count rows"; + #my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta + #); + #$stats .= "\n existing: $existing_count rows"; + #my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta + #); + #$stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"exporting customers (tabular) INCOMPLETE$stats"); + } else { + push(@$messages,"exporting customers (tabular) completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + +#sub import_customers_json_task { +# +# my ($messages) = @_; +# my ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = (0,0,0,0,0,0,0,0); +# eval { +# ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = import_customers_json(); +# }; +# my $err = $@; +# my $stats = ": $warning_count warnings"; +# eval { +# $stats .= "\n contracts read: " . $contract_read_count; +# $stats .= "\n contracts created: " . $contract_created_count; +# $stats .= "\n contracts failed: " . $contract_failed_count; +# $stats .= "\n subscribers read: " . $subscriber_read_count; +# $stats .= "\n subscribers created: " . $subscriber_created_count; +# $stats .= "\n subscribers failed: " . $subscriber_failed_count; +# }; +# if ($err or !$result) { +# push(@$messages,"importing customers (json) INCOMPLETE$stats"); +# } else { +# push(@$messages,"importing customers (json) completed$stats"); +# } +# destroy_all_dbs(); #every task should leave with closed connections. +# return $result; +# +#} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg new file mode 100644 index 00000000..0c95b848 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg @@ -0,0 +1,59 @@ +#dry=0 +#skip_errors=0 + +schema_version = Trunk + +export_customers_multithreading = 1 +export_customers_numofthreads = 4 +export_customers_blocksize = 1000 + +customer_export_filename=customer_%s.json +#customer_import_filename=customer_20210216173615.json + +#split_customers = 1 + +#customer_import_multithreading = 1 +#customer_import_numofthreads = 4 + +#customer_reseller_name = default +#customer_billing_profile_name = Default Billing Profile +#customer_domain = test1610072315.example.org + +#customer_contact_email_format = DN0%2$s%3$s@example.org +#customer_timezone = Europe/Vienna + +#subscriber_profile_set_name = subscriber_profile_1_set_65261 +#subscriber_profile_name = subscriber_profile_1_65261 + +# sip username as webusername: +#webusername_format = %1$s +# webusername = cc+ac+sn: +#webusername_format = %2$s%3$s%4$s +# webusername = 0+ac+sn: +webusername_format = 0%3$s%4$s + +# sip username as external_id: +#subscriber_externalid_format = %1$s +# external_id = cc+ac+sn: +#subscriber_externalid_format = %2$s%3$s%4$s +# external_id = 0+ac+sn: +subscriber_externalid_format = 0%3$s%4$s + +# subscriber contact will be created, only if one of below is set. +subscriber_contact_email_format = DN0%2$s%3$s@domain.org +subscriber_timezone = Europe/Vienna + +tabular_single_row_txn = 1 +ignore_tabular_unique = 0 +tabular_fields_yml = tabular_fields.yml +load_recursive_yml = load_recursive.yml +sqlite_db_file = sqlite + +cf_default_priority: 1 +cf_default_timeout: 300 +cft_default_ringtimeout: 20 + +#write sql files for legacy db to set/unset the is_external pref of migrated subscribers: + +rollback_sql_export_filename_format = delete_subscribers_%s.sql +rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml new file mode 100644 index 00000000..0c94704f --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml @@ -0,0 +1,64 @@ +- path: contract.id +- path: id +- path: username +- path: primary_number.cc +- path: primary_number.ac +- path: primary_number.sn +- path: provisioning_voip_subscriber.voicemail_users[0].attach +- path: provisioning_voip_subscriber.voicemail_users[0].delete +- path: provisioning_voip_subscriber.voicemail_users[0].email +- path: provisioning_voip_subscriber.voicemail_users[0].password +- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_clis + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_ips_grp[0].allowed_ips + sep: ',' + field: 'ipnet' +- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.ncos_id[0].ncos.level +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_ncos_id[0].ncos.level +- path: provisioning_voip_subscriber.voip_usr_preferences.cfb[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfna[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfo[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfr[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfs[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cft[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfu[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_fax_preferences.active +- path: provisioning_voip_subscriber.voip_fax_preferences.ecm +- path: provisioning_voip_subscriber.voip_fax_preferences.name +- path: provisioning_voip_subscriber.voip_fax_preferences.t38 +- path: provisioning_voip_subscriber.voip_fax_destinations + sep: ',' +- path: provisioning_voip_subscriber.voip_usr_preferences.force_inbound_calls_to_peer[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.lnp_for_local_sub[0].value + diff --git a/lib/NGCP/BulkProcessor/SqlRecord.pm b/lib/NGCP/BulkProcessor/SqlRecord.pm index 7c22732b..ac133490 100644 --- a/lib/NGCP/BulkProcessor/SqlRecord.pm +++ b/lib/NGCP/BulkProcessor/SqlRecord.pm @@ -9,6 +9,8 @@ use NGCP::BulkProcessor::Table qw(get_rowhash); use NGCP::BulkProcessor::SqlProcessor qw(init_record); +use NGCP::BulkProcessor::Utils qw(load_module); + require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw(); @@ -44,4 +46,61 @@ sub gethash { return get_rowhash(\@fieldvalues); } +sub load_relation { + my $self = shift; + my ($load_recursive,$relation,$findby,@findby_args) = @_; + if ($load_recursive and 'HASH' eq ref $load_recursive and length($relation)) { + my $relation_path; + my $relation_path_backup = $load_recursive->{_relation_path}; + if (length($relation_path_backup)) { + $relation_path = $relation_path_backup . '.' . $relation; + } else { + no strict "refs"; ## no critic (ProhibitNoStrict) + $relation_path = ((ref $self) . '::gettablename')->() . '.' . $relation; + } + my $include = $load_recursive->{$relation_path}; + my $filter; + my $transfrom; + if ('HASH' eq ref $include) { + $filter = $include->{filter}; + $transfrom = $include->{transform}; + if (exists $include->{include}) { + $include = $include->{include}; + } elsif ($transfrom or $filter) { + $include = 1; + } + } + if (('CODE' eq ref $include and $include->($self)) + or (not ref $include and $include)) { + load_module($findby); + no strict "refs"; ## no critic (ProhibitNoStrict) + $load_recursive->{_relation_path} = $relation_path; + $self->{$relation} = $findby->(@findby_args); + if ('ARRAY' eq ref $self->{$relation} + and 'CODE' eq ref $filter) { + my $closure = _closure($filter,$load_recursive->{_context}); + $self->{$relation} = [ grep { $closure->($_); } @{$self->{$relation}}]; + } + if ('CODE' eq ref $transfrom) { + my $closure = _closure($transfrom,$load_recursive->{_context}); + $self->{$relation} = $closure->($self->{$relation}); + } + $load_recursive->{_relation_path} = $relation_path_backup; + return 1; + } + } + return 0; +} + +sub _closure { + my ($sub,$context) = @_; + return sub { + foreach my $key (keys %$context) { + no strict "refs"; ## no critic (ProhibitNoStrict) + *{"main::$key"} = $context->{$key} if 'CODE' eq ref $context->{$key}; + } + return $sub->(@_,$context); + }; +} + 1; diff --git a/lib/NGCP/BulkProcessor/Utils.pm b/lib/NGCP/BulkProcessor/Utils.pm index 0fd8237e..2d01b25e 100644 --- a/lib/NGCP/BulkProcessor/Utils.pm +++ b/lib/NGCP/BulkProcessor/Utils.pm @@ -114,6 +114,8 @@ our @EXPORT_OK = qw( unshare run + + load_module ); our $chmod_umask = 0777; @@ -1083,4 +1085,17 @@ sub run { } +sub load_module { + my $package_element = shift; + eval { + (my $module = $package_element) =~ s/::[a-zA-Z_0-9]+$//g; + (my $file = $module) =~ s|::|/|g; + require $file . '.pm'; + #$module->import(); + 1; + } or do { + die($@); + }; +} + 1;