diff --git a/lib/NGCP/BulkProcessor/ConnectorPool.pm b/lib/NGCP/BulkProcessor/ConnectorPool.pm index 0defac6..c708bfa 100755 --- a/lib/NGCP/BulkProcessor/ConnectorPool.pm +++ b/lib/NGCP/BulkProcessor/ConnectorPool.pm @@ -5,6 +5,8 @@ use strict; use NGCP::BulkProcessor::Globals qw( + $rowblock_transactional + $accounting_databasename $accounting_username $accounting_password @@ -112,7 +114,7 @@ sub get_accounting_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $accounting_dbs->{$name}) { - $accounting_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name); + $accounting_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } @@ -138,7 +140,7 @@ sub get_billing_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $billing_dbs->{$name}) { - $billing_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name); + $billing_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } @@ -163,7 +165,7 @@ sub get_provisioning_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $provisioning_dbs->{$name}) { - $provisioning_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name); + $provisioning_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } @@ -188,7 +190,7 @@ sub get_kamailio_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $kamailio_dbs->{$name}) { - $kamailio_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name); + $kamailio_dbs->{$name} = 
NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } @@ -214,7 +216,7 @@ sub get_xa_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $xa_dbs->{$name}) { - $xa_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($instance_name); + $xa_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index 899209e..0c82438 100755 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -1,6 +1,7 @@ package NGCP::BulkProcessor::Dao::Trunk::accounting::cdr; use strict; +#use Tie::IxHash; ## no critic use NGCP::BulkProcessor::Logging qw( @@ -10,6 +11,7 @@ use NGCP::BulkProcessor::Logging qw( use NGCP::BulkProcessor::ConnectorPool qw( get_accounting_db + destroy_dbs ); use NGCP::BulkProcessor::SqlProcessor qw( @@ -17,9 +19,13 @@ use NGCP::BulkProcessor::SqlProcessor qw( insert_record update_record copy_row + + process_table ); use NGCP::BulkProcessor::SqlRecord qw(); +use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data qw(); + require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); our @EXPORT_OK = qw( @@ -32,6 +38,9 @@ our @EXPORT_OK = qw( delete_callids countby_ratingstatus + findby_callidprefix + process_unexported + ); #process_records #delete_ids @@ -128,6 +137,12 @@ my $expected_fieldnames = [ "export_status", ]; +my @callid_suffixes = (); +our $PBXSUFFIX = '_pbx-1'; +push(@callid_suffixes,$PBXSUFFIX); +our $XFERSUFFIX = '_xfer-1'; +push(@callid_suffixes,$XFERSUFFIX); + my $indexes = {}; my $insert_unique_fields = []; @@ -192,6 +207,31 @@ sub countby_ratingstatus { } +sub findby_callidprefix { + + my 
($xa_db,$call_id,$joins,$conditions,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $suffixre = '(' . join('|', map { quotemeta($_); } @callid_suffixes) . ')+$'; + $call_id =~ s/$suffixre//g; + $call_id =~ s/%/\\%/g; + + my @conditions = @{$conditions // []}; + push(@conditions,{ $table . '.call_id' => { 'LIKE' => '?' } }); + my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' . + _get_export_stmt($db,undef,$joins,\@conditions) . + ' ORDER BY LENGTH(' . $table . '.call_id' . ') ASC, ' . $table . '.start_time ASC'; + my @params = ($call_id . '%'); + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub update_row { my ($xa_db,$data) = @_; @@ -205,54 +245,111 @@ sub insert_row { my $db = &$get_db(); my $xa_db = shift // $db; - if ('HASH' eq ref $_[0]) { - my ($data,$insert_ignore) = @_; - check_table(); - if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { - return $xa_db->db_last_insert_id(); - } - } else { - #my %params = @_; - #my ($contract_id, - # $domain_id, - # $username, - # $uuid) = @params{qw/ - # contract_id - # domain_id - # username - # uuid - # /}; - # - #if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . - # $db->columnidentifier('contact_id') . ', ' . - # $db->columnidentifier('contract_id') . ', ' . - # $db->columnidentifier('domain_id') . ', ' . - # $db->columnidentifier('external_id') . ', ' . - # $db->columnidentifier('primary_number_id') . ', ' . - # $db->columnidentifier('status') . ', ' . - # $db->columnidentifier('username') . ', ' . - # $db->columnidentifier('uuid') . ') VALUES (' . - # 'NULL, ' . - # '?, ' . - # '?, ' . - # 'NULL, ' . - # 'NULL, ' . - # '\'' . $ACTIVE_STATE . '\', ' . - # '?, ' . 
- # '?)', - # $contract_id, - # $domain_id, - # $username, - # $uuid, - # )) { - # rowinserted($db,$tablename,getlogger(__PACKAGE__)); - # return $xa_db->db_last_insert_id(); - #} + + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); } return undef; } +sub process_unexported { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $blocksize, + $joins, + $conditions, + #$sort, + $limit) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + blocksize + joins + conditions + limit + /}; + #sort + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = _get_export_stmt($db,$static_context,$joins,$conditions); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + #my %cdr_id_map = (); + #tie(%cdr_id_map, 'Tie::IxHash'); + #if ($rowblock) { + # foreach my $record (@$rowblock) { + # $cdr_id_map{$record->[0]} = $record->[1]; + # } + #} + #return 0 if $row_offset >= $limit; + return &$process_code($context,$rowblock,$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + blocksize => $blocksize, + #select => $db->paginate_sort_query('SELECT ' . $table . '.* ' . $stmt,undef,undef,$sort), + select => 'SELECT ' . $table . '.' . $db->columnidentifier('id') . ', ' . $table . '.' . $db->columnidentifier('call_id') . + ' ' . $stmt . ' ORDER BY ' . $table . '.' . 
$db->columnidentifier('id'), + selectcount => 'SELECT COUNT(1) FROM (' . $db->paginate_sort_query('SELECT 1 ' . $stmt,0,$limit,undef) . ') AS __cnt', + ); +} + +sub _get_export_stmt { + + my ($db,$static_context,$joins,$conditions) = @_; + + my $table = $db->tableidentifier($tablename); + + my $stmt = "FROM " . $table; + my @intjoins = (); + if (defined $joins and (scalar @$joins) > 0) { + foreach my $f (@$joins) { + my ($table, $keys) = %{ $f }; + my ($foreign_key, $own_key) = %{ $keys }; + push @intjoins, "LEFT JOIN $table ON $foreign_key = $own_key"; + } + } + my @conds = (); + if (defined $static_context and $static_context->{export_status_id}) { + push @intjoins, 'LEFT JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::gettablename()) . ' AS __cesd ON __cesd.cdr_id = ' . $table . '.id AND __cesd.status_id = ' . $static_context->{export_status_id}; + push @conds, '(__cesd.export_status = "' . $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED . '" OR __cesd.export_status IS NULL)'; + } + $stmt .= " " . join(" ", @intjoins) if (scalar @intjoins) > 0; + if (defined $conditions and (scalar @$conditions) > 0) { + foreach my $f (@$conditions) { + my ($field, $match) = %{ $f }; + my ($op, $val) = %{ $match }; + push @conds, "$field $op $val"; + } + } + $stmt .= " WHERE " . 
join(" AND ", @conds) if (scalar @conds) > 0; + return $stmt; + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status.pm new file mode 100644 index 0000000..aafe4b6 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status.pm @@ -0,0 +1,124 @@ +package NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_accounting_db + destroy_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row + +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + findall + findby_type +); + +my $tablename = 'cdr_export_status'; +my $get_db = \&get_accounting_db; + +my $expected_fieldnames = [ +"id", +"type", +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_type { + + my ($type,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('type') . 
' = ?'; + my @params = ($type); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm new file mode 100644 index 0000000..517433a --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm @@ -0,0 +1,204 @@ +package NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowsdeleted + rowinserted + rowupserted + rowupdated +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_accounting_db + destroy_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + update_record + copy_row + +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + update_row + insert_row + upsert_row + + $UNEXPORTED + $EXPORTED +); + +our $UNEXPORTED = 'unexported'; +our $EXPORTED = 'exported'; + +my $tablename = 'cdr_export_status_data'; +my $get_db = \&get_accounting_db; + +my $expected_fieldnames = [ +"cdr_id", +"status_id", +"exported_at", +"export_status", +"cdr_start_time", +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = 
NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub update_row { + + my ($xa_db,$data) = @_; + + check_table(); + return update_record($get_db,$xa_db,__PACKAGE__,$data); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($cdr_id, + $status_id, + $export_status, + $cdr_start_time) = @params{qw/ + cdr_id + status_id + export_status + cdr_start_time + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('cdr_id') . ', ' . + $db->columnidentifier('status_id') . ', ' . + $db->columnidentifier('export_status') . ', ' . + $db->columnidentifier('exported_at') . ', ' . + $db->columnidentifier('cdr_start_time') . ') VALUES (' . + '?, ' . + '?, ' . + '?, ' . + 'NOW(), ' . + '?)', + $cdr_id, + $status_id, + $export_status, + $cdr_start_time, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return 1; + } + } + return undef; + +} + +sub upsert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + + my %params = @_; + my ($cdr_id, + $status_id, + $export_status, + $cdr_start_time) = @params{qw/ + cdr_id + status_id + export_status + cdr_start_time + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('cdr_id') . ', ' . + $db->columnidentifier('status_id') . ', ' . + $db->columnidentifier('export_status') . ', ' . + $db->columnidentifier('exported_at') . ', ' . + $db->columnidentifier('cdr_start_time') . ') VALUES (' . + '?, ' . + '?, ' . + '?, ' . + 'NOW(), ' . + '?) 
ON DUPLICATE KEY UPDATE export_status = ?, exported_at = NOW()', + $cdr_id, + $status_id, + $export_status, + $cdr_start_time, + $export_status, + )) { + rowupserted($db,$tablename,getlogger(__PACKAGE__)); + return 1; + } + + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Globals.pm b/lib/NGCP/BulkProcessor/Globals.pm index 872611e..734e66b 100755 --- a/lib/NGCP/BulkProcessor/Globals.pm +++ b/lib/NGCP/BulkProcessor/Globals.pm @@ -51,6 +51,8 @@ our @EXPORT_OK = qw( $LongReadLen_limit $transfer_defer_indexes + $rowblock_transactional + $accounting_databasename $accounting_username $accounting_password @@ -164,7 +166,7 @@ our $LongReadLen_limit = 128*1024; #longest LOB field size in bytes our $appstartsecs = Time::HiRes::time(); - +our $rowblock_transactional = undef; #connector default our $accounting_databasename = 'accounting'; our $accounting_username = 'root'; @@ -324,7 +326,7 @@ sub update_masterconfig { } $cells_transfer_memory_limit = $data->{cells_transfer_memory_limit} if exists $data->{cells_transfer_memory_limit}; $transfer_defer_indexes = $data->{transfer_defer_indexes} if exists $data->{transfer_defer_indexes}; - + $rowblock_transactional = $data->{rowblock_transactional} if exists $data->{rowblock_transactional}; if (defined $split_tuplecode and ref $split_tuplecode eq 'CODE') { @jobservers = &$split_tuplecode($data->{jobservers}) if exists $data->{jobservers}; diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index 
02c3481..6f05a3f 100755 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -52,6 +52,7 @@ our @EXPORT_OK = qw( rowtransferred rowinserted + rowupserted rowupdated rowsdeleted totalrowsdeleted @@ -479,6 +480,15 @@ sub rowinserted { } +sub rowupserted { + + my ($db,$tablename,$logger) = @_; + if (defined $logger) { + $logger->debug(_getsqlconnectorinstanceprefix($db) . 'row upserted'); + } + +} + sub rowupdated { my ($db,$tablename,$logger) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm new file mode 100644 index 0000000..893e48d --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm @@ -0,0 +1,216 @@ +package NGCP::BulkProcessor::Projects::Export::Ama::CDR; +use strict; + +## no critic + +use threads::shared qw(); +#use Time::HiRes qw(sleep); +#use String::MkPasswd qw(); +#use List::Util qw(); +#use Data::Rmap qw(); + +#use Tie::IxHash; + +#use NGCP::BulkProcessor::Globals qw( +# $enablemultithreading +#); + +use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( + $dry + $skip_errors + + $export_cdr_multithreading + $export_cdr_blocksize + $export_cdr_joins + $export_cdr_conditions + $export_cdr_limit + $export_cdr_stream +); +#$deadlock_retries +#@providers +#$generate_cdr_numofthreads +#$generate_cdr_count + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn + fileerror +); + +use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); +use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status qw(); +use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); +#use 
NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_xa_db + destroy_dbs +); +#ping_dbs + +#use NGCP::BulkProcessor::Utils qw(threadid timestamp); # stringtobool check_ipnet trim); +##use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +##use NGCP::BulkProcessor::RandomString qw(createtmpstring); +#use NGCP::BulkProcessor::Array qw(array_to_map); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + export_cdrs + +); + +sub export_cdrs { + + my $static_context = {}; + my $result = _export_cdrs_create_context($static_context); + + destroy_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_unexported( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + my $rownum = $row_offset; + my %cdr_id_map = map { $_->[0] => $_->[1]; } @$records; + foreach my $record (@$records) { + return 0 if (defined $export_cdr_limit and $rownum >= $export_cdr_limit); + my ($id,$call_id) = @$record; + next unless exists $cdr_id_map{$id}; + + my %dropped = (); + eval { + $context->{db}->db_begin(); + my $cdrs = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::findby_callidprefix($context->{db}, + $call_id,$export_cdr_joins,$export_cdr_conditions); + + #todo: write. 
+ + foreach my $cdr (@$cdrs) { + #mark exported + NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::upsert_row($context->{db}, + cdr_id => $cdr->{id}, + status_id => $context->{export_status_id}, + export_status => $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::EXPORTED, + cdr_start_time => $cdr->{start_time}, + ); + _info($context,"export_status set for cdr id $cdr->{id}",1); + $dropped{$cdr->{id}} = delete $cdr_id_map{$cdr->{id}}; + $rownum++; + } + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + foreach (keys %dropped) { + $cdr_id_map{$_} = $dropped{$_}; + } + }; + if ($skip_errors) { + _warn($context,"problem while exporting call id $call_id (cdr id $id): " . $err); + } else { + _error($context,"problem while exporting call id $call_id (cdr id $id): " . $err); + } + } + + } + + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + # below is not mandatory.. 
#_check_insert_tables(); + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 0, + blocksize => $export_cdr_blocksize, + multithreading => $export_cdr_multithreading, + numofthreads => 1, + joins => $export_cdr_joins, + conditions => $export_cdr_conditions, + #sort => [{ column => 'id', numeric => 1, dir => 1 }], + limit => $export_cdr_limit, + ),$warning_count); +} + +sub _export_cdrs_create_context { + + my ($context) = @_; + + my $result = 1; + + my $export_status; + eval { + if ($export_cdr_stream) { + $export_status = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status::findby_type($export_cdr_stream); + } + $context->{export_status_id} = $export_status->{id} if $export_status; + }; + if ($@ or ($export_cdr_stream and not $export_status)) { + _error($context,"cannot find export stream '$export_cdr_stream'"); + $result = 0; + } elsif ($export_status) { + _info($context,"export stream '$export_cdr_stream' set"); + } + + return $result; +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } + +} + +1; \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm new file mode 100644 index 0000000..b73fdea --- /dev/null +++ 
b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm @@ -0,0 +1,206 @@ +package NGCP::BulkProcessor::Projects::Export::Ama::Settings; +use strict; + +## no critic + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits); +#format_number check_ipnet + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + + check_dry + + $input_path + $output_path + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + + $export_cdr_multithreading + $export_cdr_blocksize + $export_cdr_joins + $export_cdr_conditions + $export_cdr_limit + $export_cdr_stream +); +#update_provider_config +#$deadlock_retries +#$generate_cdr_count + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $input_path = $working_path . 'input/'; +our $output_path = $working_path . 'output/'; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +our $export_cdr_multithreading = $enablemultithreading; +our $export_cdr_blocksize = undef; +our $export_cdr_joins = []; +our $export_cdr_conditions = []; +our $export_cdr_limit = undef; +our $export_cdr_stream = undef; + + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + #if ($data->{report_filename}) { + # $report_filename = $output_path . sprintf('/' . $data->{report_filename},timestampdigits()); + # if (-e $report_filename and (unlink $report_filename) == 0) { + # filewarn('cannot remove ' . $report_filename . ': ' . 
$!,getlogger(__PACKAGE__)); + # $report_filename = undef; + # } + #} else { + # $report_filename = undef; + #} + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + $export_cdr_multithreading = $data->{export_cdr_multithreading} if exists $data->{export_cdr_multithreading}; + $export_cdr_blocksize = $data->{export_cdr_blocksize} if exists $data->{export_cdr_blocksize}; + + my $parse_result; + ($parse_result,$export_cdr_joins) = _parse_export_joins($data->{export_cdr_joins},$configfile); + ($parse_result,$export_cdr_conditions) = _parse_export_conditions($data->{export_cdr_conditions},$configfile); + + $export_cdr_limit = $data->{export_cdr_limit} if exists $data->{export_cdr_limit}; + $export_cdr_stream = $data->{export_cdr_stream} if exists $data->{export_cdr_stream}; + + #if ((confval("MAINTENANCE") // 'no') eq 'yes') { + # exit(0); + #} + + return $result; + + } + return 0; + +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 
'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _parse_export_joins { + my ($token,$file) = @_; + my @joins = (); + if (defined $token and length($token) > 0) { + foreach my $f (split_tuple($token)) { + next unless($f); + $f =~ s/^\s*\{?\s*//; + $f =~ s/\}\s*\}\s*$/}/; + my ($a, $b) = split(/\s*=>\s*{\s*/, $f); + $a =~ s/^\s*\'//; + $a =~ s/\'$//g; + $b =~ s/\s*\}\s*$//; + my ($c, $d) = split(/\s*=>\s*/, $b); + $c =~ s/^\s*\'//g; + $c =~ s/\'\s*//; + $d =~ s/^\s*\'//g; + $d =~ s/\'\s*//; + push @joins, { $a => { $c => $d } }; + } + } + return (1,\@joins); +} + +sub _parse_export_conditions { + my ($token,$file) = @_; + my @conditions = (); + if (defined $token and length($token) > 0) { + foreach my $f (split_tuple($token)) { + next unless($f); + $f =~ s/^\s*\{?\s*//; + $f =~ s/\}\s*\}\s*$/}/; + my ($a, $b) = split(/\s*=>\s*{\s*/, $f); + $a =~ s/^\s*\'//; + $a =~ s/\'$//g; + $b =~ s/\s*\}\s*$//; + my ($c, $d) = split(/\s*=>\s*/, $b); + $c =~ s/^\s*\'//g; + $c =~ s/\'\s*//; + $d =~ s/^\s*\'//g; + $d =~ s/\'\s*//; + push @conditions, { $a => { $c => $d } }; + } + } + return (1,\@conditions); +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg new file mode 100755 index 0000000..c255d38 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg @@ -0,0 +1,64 @@ +##general settings: +working_path = /var/sipwise +#cpucount = 4 +enablemultithreading = 1 + 
+rowblock_transactional = 0 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = localhost +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = localhost +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = localhost +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = localhost +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = localhost +xa_port = 3306 +xa_databasename = billing +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://10.0.2.15:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg new file mode 100755 index 0000000..b245a72 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg @@ -0,0 +1,64 @@ +##general settings: +working_path = /home/rkrenn/temp/export +#cpucount = 4 +enablemultithreading = 1 + +rowblock_transactional = 0 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 
192.168.0.29 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.29 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.29 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.29 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.29 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl b/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl new file mode 100644 index 0000000..b80b0ae --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl @@ -0,0 +1,232 @@ +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . 
'/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( + update_settings + check_dry + $output_path + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force + +); +#@provider_config +#@providers +#$providers_yml + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); +use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles); + +use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs); + +#use NGCP::BulkProcessor::Projects::Massive::Generator::Dao::Blah qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); + +use NGCP::BulkProcessor::Projects::Export::Ama::CDR qw( + export_cdrs +); + +scripterror(getscriptpath() . 
' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); + +my $export_cdr_task_opt = 'export_cdr'; +push(@TASK_OPTS,$export_cdr_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + #"run=s" => \$run_id, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + #$result &= load_config($providers_yml,\&update_provider_config,$YAML_CONFIG_TYPE); + + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + + } elsif (lc($export_cdr_task_opt) eq lc($task)) { + if (taskinfo($export_cdr_task_opt,$result,1)) { + next unless check_dry(); + $result &= export_cdr_task(\@messages); + $completion |= 1; + } + + + #} elsif (lc($provision_subscriber_task_opt) eq lc($task)) { + # if 
(taskinfo($provision_subscriber_task_opt,$result,1)) { + # next unless check_dry(); + # $result &= provision_subscriber_task(\@messages); + # $completion |= 1; + # } + + #} elsif (lc($generate_cdr_task_opt) eq lc($task)) { + # if (taskinfo($generate_cdr_task_opt,$result,1)) { + # next unless check_dry(); + # $result &= generate_cdr_task(\@messages); + # $completion |= 1; + # } + + } else { + $result = 0; + scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? 
"starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + cleanupcvsdirs() if $clean_generated; + cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + cleanupcertfiles(); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub export_cdr_task { + + my ($messages) = @_; + my ($result) = (0); + eval { + ($result) = export_cdrs(); + }; + my $err = $@; + my $stats = ":"; + eval { + #stats .= "\n total CDRs: " . + # NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"export cdrs INCOMPLETE$stats"); + } else { + push(@$messages,"export cdrs completed$stats"); + } + destroy_dbs(); + return $result; + +} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! 
diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg new file mode 100755 index 0000000..9a37ddd --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg @@ -0,0 +1,11 @@ + +#dry=0 +#skip_errors=0 + +providers_yml = providers.yml + +export_cdr_multithreading = 1 +export_cdr_blocksize = 1000 +export_cdr_joins = { 'billing.voip_subscribers source_voip_subscribers' => { 'source_voip_subscribers.uuid' => 'accounting.cdr.source_user_id' } }, { 'billing.voip_subscribers destination_voip_subscribers' => { 'destination_voip_subscribers.uuid' => 'accounting.cdr.destination_user_id' } }, { 'billing.billing_zones_history source_carrier_bbz' => { 'source_carrier_bbz.id' => 'accounting.cdr.source_carrier_billing_zone_id' } }, { 'billing.billing_zones_history source_reseller_bbz' => { 'source_reseller_bbz.id' => 'accounting.cdr.source_reseller_billing_zone_id' } }, { 'billing.billing_zones_history source_customer_bbz' => { 'source_customer_bbz.id' => 'accounting.cdr.source_customer_billing_zone_id' } }, { 'billing.billing_zones_history destination_carrier_bbz' => { 'destination_carrier_bbz.id' => 'accounting.cdr.destination_carrier_billing_zone_id' } }, { 'billing.billing_zones_history destination_reseller_bbz' => { 'destination_reseller_bbz.id' => 'accounting.cdr.destination_reseller_billing_zone_id' } }, { 'billing.billing_zones_history destination_customer_bbz' => { 'destination_customer_bbz.id' => 'accounting.cdr.destination_customer_billing_zone_id' } } +export_cdr_conditions = { 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } +export_cdr_limit = 300000 diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg new file mode 100755 index 0000000..7d4cb03 --- /dev/null +++ 
b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg @@ -0,0 +1,15 @@ + +#dry=0 +#skip_errors=0 + +providers_yml = providers.yml + +export_cdr_multithreading = 1 +export_cdr_blocksize = 1000 +#export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } } +#export_cdr_conditions = { 'es.type' => { '=' => '"default"' } }, { 'esd.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } } +#, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } +export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } } +export_cdr_stream = default +#export_cdr_limit = 3500 +#300000 diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm index 7b255ec..8253fbe 100755 --- a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm @@ -51,7 +51,7 @@ my $net_read_timeout = 300; #my $lock_do_chunk = 0; #1; #my $lock_get_chunk = 0; -my $rowblock_transactional = 1; +my $default_rowblock_transactional = 1; my $serialization_level = ''; #'SERIALIZABLE' @@ -60,9 +60,11 @@ our $READ_COMMITTED = 'READ COMMITTED'; sub new { my $class = shift; - + my $rowblock_transactional = shift; my $self = NGCP::BulkProcessor::SqlConnector->new(@_); + $self->{rowblock_transactional} = $rowblock_transactional // $default_rowblock_transactional; + $self->{host} = undef; $self->{port} = undef; $self->{databasename} = undef; @@ -455,7 +457,7 @@ sub multithreading_supported { sub rowblock_transactional { my $self = shift; - return $rowblock_transactional; + return $self->{rowblock_transactional}; } @@ -536,7 +538,7 @@ sub db_do_begin { my $query = shift; #my $tablename = shift; - $self->SUPER::db_do_begin($query,$rowblock_transactional,@_); + $self->SUPER::db_do_begin($query,$self->{rowblock_transactional},@_); } @@ -547,7 +549,7 
@@ sub db_get_begin { #my $tablename = shift; #my $lock = shift; - $self->SUPER::db_get_begin($query,$rowblock_transactional,@_); + $self->SUPER::db_get_begin($query,$self->{rowblock_transactional},@_); } @@ -557,7 +559,7 @@ sub db_finish { #my $unlock = shift; my $rollback = shift; - $self->SUPER::db_finish($rowblock_transactional,$rollback); + $self->SUPER::db_finish($self->{rowblock_transactional},$rollback); }