You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1707 lines
68 KiB
1707 lines
68 KiB
package NGCP::BulkProcessor::SqlProcessor;
|
|
use strict;
|
|
|
|
## no critic
|
|
|
|
use threads qw(yield);
|
|
use threads::shared;
|
|
use Thread::Queue;
|
|
#use Thread::Semaphore;
|
|
|
|
use Time::HiRes qw(sleep);
|
|
|
|
use NGCP::BulkProcessor::Globals qw(
|
|
$enablemultithreading
|
|
$cpucount
|
|
get_threadqueuelength
|
|
$cells_transfer_memory_limit
|
|
$transfer_defer_indexes
|
|
);
|
|
|
|
use NGCP::BulkProcessor::Logging qw(
|
|
getlogger
|
|
fieldnamesacquired
|
|
primarykeycolsacquired
|
|
tableinfoscleared
|
|
|
|
tablefixed
|
|
|
|
tabletransferstarted
|
|
tableprocessingstarted
|
|
|
|
rowtransferstarted
|
|
rowtransferred
|
|
rowinserted
|
|
rowupdated
|
|
rowsdeleted
|
|
totalrowsdeleted
|
|
rowinsertskipped
|
|
rowupdateskipped
|
|
tabletransferdone
|
|
tableprocessingdone
|
|
rowtransferdone
|
|
|
|
fetching_rows
|
|
writing_rows
|
|
processing_rows
|
|
|
|
tablethreadingdebug
|
|
enable_threading_info
|
|
);
|
|
|
|
use NGCP::BulkProcessor::LogError qw(
|
|
fieldnamesdiffer
|
|
transferzerorowcount
|
|
processzerorowcount
|
|
deleterowserror
|
|
tabletransferfailed
|
|
tableprocessingfailed
|
|
);
|
|
|
|
use NGCP::BulkProcessor::Table qw();
|
|
use NGCP::BulkProcessor::Array qw(setcontains contains);
|
|
use NGCP::BulkProcessor::Utils qw(round threadid);
|
|
|
|
require Exporter;
|
|
our @ISA = qw(Exporter);
|
|
our @EXPORT_OK = qw(
|
|
init_record
|
|
copy_row
|
|
transfer_table
|
|
process_table
|
|
cleartableinfo
|
|
checktableinfo
|
|
registertableinfo
|
|
create_targettable
|
|
insert_record
|
|
update_record
|
|
delete_record
|
|
delete_records
|
|
|
|
insert_stmt
|
|
is_shutdown
|
|
);
|
|
#transfer_record
|
|
#transfer_records
|
|
|
|
my $table_names = {};
|
|
my $table_expected_fieldnames = {};
|
|
my $table_fieldnames_cached = {};
|
|
my $table_primarykeys = {};
|
|
my $table_target_indexes = {};
|
|
|
|
#my $logger = getlogger(__PACKAGE__);
|
|
|
|
my $tabletransfer_threadqueuelength = 5; #100; #30; #5; # ... >= 1
|
|
my $minblocksize = 100;
|
|
my $maxblocksize = 100000;
|
|
my $minnumberofchunks = 10;
|
|
|
|
my $tableprocessing_threadqueuelength = 10;
|
|
#my $tableprocessing_threads = $cpucount; #3;
|
|
|
|
my $reader_name = 'reader';
|
|
my $writer_name = 'writer';
|
|
|
|
my $thread_sleep_secs = 0.1;
|
|
my $loop_sleep_secs = 1;
|
|
|
|
my $RUNNING = 1;
|
|
my $COMPLETED = 2;
|
|
my $ERROR = 4;
|
|
my $STOP = 8;
|
|
|
|
sub init_record {
|
|
|
|
my ($record,$class,$get_db,$tablename,$expected_fieldnames,$target_indexes) = @_;
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
checktableinfo($db,$class,$tablename,$expected_fieldnames,$target_indexes);
|
|
|
|
if (defined $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}) { # and ref $table_fieldnames_cached->{$connectidentifier}->{$tablename} eq 'ARRAY') {
|
|
# if there are fieldnames defined, we make a member variable for each and set it to undef
|
|
foreach my $fieldname (@{$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class}}) {
|
|
$record->{$fieldname} = undef;
|
|
}
|
|
}
|
|
|
|
return $record;
|
|
|
|
}
|
|
|
|
sub copy_row {
|
|
my ($record,$row,$expected_fieldnames) = @_;
|
|
if (defined $record and defined $row) {
|
|
my $i;
|
|
if (ref $row eq 'ARRAY') {
|
|
$i = 0;
|
|
} elsif (ref $row eq 'HASH') {
|
|
$i = -1;
|
|
} elsif (ref $row eq ref $record) {
|
|
$i = -2;
|
|
} else {
|
|
$i = -3;
|
|
}
|
|
foreach my $fieldname (@$expected_fieldnames) {
|
|
if ($i >= 0) {
|
|
$record->{$fieldname} = $row->[$i];
|
|
$i++;
|
|
} elsif ($i == -1 or $i == -2) {
|
|
if (exists $row->{$fieldname}) {
|
|
$record->{$fieldname} = $row->{$fieldname};
|
|
} elsif (exists $row->{uc($fieldname)}) {
|
|
$record->{$fieldname} = $row->{uc($fieldname)};
|
|
} else {
|
|
$record->{$fieldname} = undef;
|
|
}
|
|
} else {
|
|
last;
|
|
}
|
|
}
|
|
}
|
|
return $record;
|
|
}
|
|
|
|
sub cleartableinfo {
|
|
|
|
my $get_db = shift;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $tid = threadid();
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
|
|
my $found = 0;
|
|
|
|
if (exists $table_names->{$tid}) {
|
|
if (exists $table_names->{$tid}->{$connectidentifier}) {
|
|
delete $table_names->{$tid}->{$connectidentifier};
|
|
$found = 1;
|
|
}
|
|
}
|
|
if (exists $table_expected_fieldnames->{$tid}) {
|
|
if (exists $table_expected_fieldnames->{$tid}->{$connectidentifier}) {
|
|
delete $table_expected_fieldnames->{$tid}->{$connectidentifier};
|
|
$found = 1;
|
|
}
|
|
}
|
|
if (exists $table_fieldnames_cached->{$tid}){
|
|
if (exists $table_fieldnames_cached->{$tid}->{$connectidentifier}) {
|
|
delete $table_fieldnames_cached->{$tid}->{$connectidentifier};
|
|
$found = 1;
|
|
}
|
|
}
|
|
if (exists $table_primarykeys->{$tid}) {
|
|
if (exists $table_primarykeys->{$tid}->{$connectidentifier}) {
|
|
delete $table_primarykeys->{$tid}->{$connectidentifier};
|
|
$found = 1;
|
|
}
|
|
}
|
|
if (exists $table_target_indexes->{$tid}) {
|
|
if (exists $table_target_indexes->{$tid}->{$connectidentifier}) {
|
|
delete $table_target_indexes->{$tid}->{$connectidentifier};
|
|
$found = 1;
|
|
}
|
|
}
|
|
|
|
if ((scalar keys %{$table_names->{$tid}}) == 0) {
|
|
delete $table_names->{$tid};
|
|
$found = 1;
|
|
}
|
|
if ((scalar keys %{$table_expected_fieldnames->{$tid}}) == 0) {
|
|
delete $table_expected_fieldnames->{$tid};
|
|
$found = 1;
|
|
}
|
|
if ((scalar keys %{$table_fieldnames_cached->{$tid}}) == 0) {
|
|
delete $table_fieldnames_cached->{$tid};
|
|
$found = 1;
|
|
}
|
|
if ((scalar keys %{$table_primarykeys->{$tid}}) == 0) {
|
|
delete $table_primarykeys->{$tid};
|
|
$found = 1;
|
|
}
|
|
if ((scalar keys %{$table_target_indexes->{$tid}}) == 0) {
|
|
delete $table_target_indexes->{$tid};
|
|
$found = 1;
|
|
}
|
|
|
|
if ($found) {
|
|
tableinfoscleared($db,getlogger(__PACKAGE__));
|
|
}
|
|
|
|
}
|
|
|
|
sub registertableinfo { # to prepare creation of non-existent tables..
|
|
|
|
my ($get_db,$class,$tablename,$fieldnames,$indexes,$keycols) = @_;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
if (not exists $table_names->{$tid}) {
|
|
$table_names->{$tid} = {};
|
|
}
|
|
if (not exists $table_names->{$tid}->{$connectidentifier}) {
|
|
$table_names->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
$table_names->{$tid}->{$connectidentifier}->{$class} = $tablename;
|
|
if (not exists $table_expected_fieldnames->{$tid}) {
|
|
$table_expected_fieldnames->{$tid} = {};
|
|
}
|
|
if (not exists $table_expected_fieldnames->{$tid}->{$connectidentifier}) {
|
|
# create an empty category for the connection if none exists yet:
|
|
$table_expected_fieldnames->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
|
|
# we prefer to always update the expected fieldnames (that come from a derived class)
|
|
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $fieldnames // [];
|
|
|
|
if (not exists $table_fieldnames_cached->{$tid}) {
|
|
$table_fieldnames_cached->{$tid} = {};
|
|
}
|
|
if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}) {
|
|
# create an empty fieldname cache for the connection if none exists yet:
|
|
$table_fieldnames_cached->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
|
|
if (not exists $table_primarykeys->{$tid}) {
|
|
$table_primarykeys->{$tid} = {};
|
|
}
|
|
if (not exists $table_primarykeys->{$tid}->{$connectidentifier}) {
|
|
# create an empty primary key column name cache for the connection if none exists yet:
|
|
$table_primarykeys->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $keycols // [];
|
|
|
|
if (not exists $table_target_indexes->{$tid}) {
|
|
$table_target_indexes->{$tid} = {};
|
|
}
|
|
if (not exists $table_target_indexes->{$tid}->{$connectidentifier}) {
|
|
# create an empty index set list for target tables for the connection if none exists yet:
|
|
$table_target_indexes->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
# we prefer to always update the target table indexes (that come from a derived class)
|
|
$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $indexes // {};
|
|
|
|
}
|
|
|
|
sub checktableinfo {
|
|
|
|
my ($get_db,$class,$tablename,$expected_fieldnames,$target_indexes) = @_;
|
|
|
|
my $result = 1;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
if (not exists $table_names->{$tid}) {
|
|
$table_names->{$tid} = {};
|
|
}
|
|
if (not exists $table_names->{$tid}->{$connectidentifier}) {
|
|
$table_names->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
$table_names->{$tid}->{$connectidentifier}->{$class} = $tablename;
|
|
if (not exists $table_expected_fieldnames->{$tid}) {
|
|
#$table_expected_fieldnames->{$tid} = shared_clone({});
|
|
$table_expected_fieldnames->{$tid} = {};
|
|
}
|
|
if (not exists $table_expected_fieldnames->{$tid}->{$connectidentifier}) {
|
|
# create an empty category for the connection if none exists yet:
|
|
#$table_expected_fieldnames->{$tid}->{$connectidentifier} = shared_clone({});
|
|
$table_expected_fieldnames->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
# we prefer to always update the expected fieldnames (that come from a derived class)
|
|
#$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = shared_clone($expected_fieldnames);
|
|
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $expected_fieldnames // [];
|
|
|
|
if (not exists $table_fieldnames_cached->{$tid}) {
|
|
#$table_fieldnames_cached->{$tid} = shared_clone({});
|
|
$table_fieldnames_cached->{$tid} = {};
|
|
}
|
|
if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}) {
|
|
# create an empty fieldname cache for the connection if none exists yet:
|
|
#$table_fieldnames_cached->{$tid}->{$connectidentifier} = shared_clone({});
|
|
$table_fieldnames_cached->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
|
|
if (not exists $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}) {
|
|
# query the database for fieldnames of the table if we don't have a cache entry yet:
|
|
my $fieldnames = $db->getfieldnames($tablename);
|
|
if (!defined $fieldnames
|
|
or (scalar @{$fieldnames}) == 0
|
|
or setcontains($table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$fieldnames,1)) {
|
|
#fieldnames are case insensitive in general
|
|
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 1, };
|
|
fieldnamesacquired($db,$tablename,getlogger(__PACKAGE__));
|
|
} else {
|
|
# otherwise we log a failure (exit? - see Logging Module)
|
|
#$table_fieldnames_cached->{$connectidentifier}->{$tablename} = {}; #$fieldnames;
|
|
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 0, };
|
|
fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$fieldnames,getlogger(__PACKAGE__));
|
|
$result = 0;
|
|
}
|
|
} elsif (not $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{ok}) {
|
|
fieldnamesdiffer($db,$tablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},
|
|
$table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{fieldnames},getlogger(__PACKAGE__));
|
|
$result = 0;
|
|
}
|
|
|
|
if (not exists $table_primarykeys->{$tid}) {
|
|
#$table_primarykeys->{$tid} = shared_clone({});
|
|
$table_primarykeys->{$tid} = {};
|
|
}
|
|
if (not exists $table_primarykeys->{$tid}->{$connectidentifier}) {
|
|
# create an empty primary key column name cache for the connection if none exists yet:
|
|
#$table_primarykeys->{$tid}->{$connectidentifier} = shared_clone({});
|
|
$table_primarykeys->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
if (not exists $table_primarykeys->{$tid}->{$connectidentifier}->{$class}) {
|
|
# query the database for primary keys of the table if we don't have them cached yet:
|
|
#$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = shared_clone($db->getprimarykeycols($class));
|
|
$table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $db->getprimarykeycols($tablename);
|
|
primarykeycolsacquired($db,$tablename,$table_primarykeys->{$tid}->{$connectidentifier}->{$class},getlogger(__PACKAGE__));
|
|
}
|
|
|
|
if (not exists $table_target_indexes->{$tid}) {
|
|
#$table_target_indexes->{$tid} = shared_clone({});
|
|
$table_target_indexes->{$tid} = {};
|
|
}
|
|
if (not exists $table_target_indexes->{$tid}->{$connectidentifier}) {
|
|
# create an empty index set list for target tables for the connection if none exists yet:
|
|
#$table_target_indexes->{$tid}->{$connectidentifier} = shared_clone({});
|
|
$table_target_indexes->{$tid}->{$connectidentifier} = {};
|
|
}
|
|
# we prefer to always update the target table indexes (that come from a derived class)
|
|
#$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = shared_clone($target_indexes);
|
|
$table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $target_indexes // {};
|
|
|
|
return $result;
|
|
|
|
}
|
|
|
|
sub create_targettable {
|
|
|
|
my ($get_db,$class,$get_target_db,$targetclass,$targettablename,$truncate,$defer_indexes,$texttable_engine) = @_;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
|
|
|
#my $targettablename = _gettargettablename($db,$tablename,$target_db);
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
if ($truncate and $defer_indexes) {
|
|
$target_db->drop_table($targettablename);
|
|
}
|
|
|
|
my $result = $target_db->create_texttable($targettablename,
|
|
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},
|
|
$table_primarykeys->{$tid}->{$connectidentifier}->{$class},
|
|
$table_target_indexes->{$tid}->{$connectidentifier}->{$class},
|
|
# 'ifnotexists' is always true
|
|
$truncate,
|
|
$defer_indexes,
|
|
$texttable_engine);
|
|
|
|
checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$defer_indexes ? undef : $table_target_indexes->{$tid}->{$connectidentifier}->{$class});
|
|
return $result;
|
|
|
|
}
|
|
|
|
sub delete_records {
|
|
|
|
my ($get_db,$get_xa_db,$class,$keyfields,$equal,$vals_table) = @_;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
if (defined $expected_fieldnames and
|
|
(defined $keyfields and
|
|
ref $keyfields eq 'ARRAY') and
|
|
(defined $vals_table and
|
|
ref $vals_table eq 'NGCP::BulkProcessor::Table')) {
|
|
|
|
my @fields = @$keyfields;
|
|
my $field_cnt = scalar @fields;
|
|
|
|
my $total_rowcount = 0;
|
|
|
|
my $initial_rowcount = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename));
|
|
|
|
if ($field_cnt > 0) {
|
|
my $where_clause;
|
|
if ($equal) {
|
|
$where_clause = ' WHERE ' . join(' = ? AND ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fields) . ' = ?';
|
|
|
|
my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . $where_clause;
|
|
my $delete_stmt = 'DELETE FROM ' . $db->tableidentifier($tablename) . $where_clause;
|
|
|
|
for (my $i = 0; $i < $vals_table->rowcount(); $i++) {
|
|
my @vals = $vals_table->getrow($i);
|
|
my $new_initial_rowcount = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename));
|
|
my $rowcount = $xa_db->db_get_value($count_stmt,@vals);
|
|
$xa_db->db_do($delete_stmt,@vals);
|
|
rowsdeleted($db,$tablename,$rowcount,$new_initial_rowcount,getlogger(__PACKAGE__));
|
|
$total_rowcount += $rowcount;
|
|
}
|
|
|
|
} elsif ($field_cnt == 1) {
|
|
my @ne_vals = $vals_table->getcol(0);
|
|
$where_clause = ' WHERE ' . $db->columnidentifier($fields[0]) . ' NOT IN (' . substr(',?' x scalar @ne_vals,1) . ')';
|
|
my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . $where_clause;
|
|
my $delete_stmt = 'DELETE FROM ' . $db->tableidentifier($tablename) . $where_clause;
|
|
my $rowcount = $xa_db->db_get_value($count_stmt,@ne_vals);
|
|
$xa_db->db_do($delete_stmt,@ne_vals);
|
|
rowsdeleted($db,$tablename,$rowcount,$initial_rowcount,getlogger(__PACKAGE__));
|
|
$total_rowcount += $rowcount;
|
|
} else {
|
|
|
|
deleterowserror($db,$tablename,'deletings rows by complementary identifier values works with a single identifier column only',getlogger(__PACKAGE__));
|
|
return;
|
|
|
|
}
|
|
} else {
|
|
my $delete_stmt = 'DELETE FROM ' . $db->tableidentifier($tablename);
|
|
my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
|
|
my $rowcount = $xa_db->db_get_value($count_stmt);
|
|
$xa_db->db_do($delete_stmt);
|
|
rowsdeleted($db,$tablename,$rowcount,$initial_rowcount,getlogger(__PACKAGE__));
|
|
$total_rowcount += $rowcount;
|
|
}
|
|
|
|
$xa_db->vacuum($tablename);
|
|
|
|
#if ($total_rowcount > 0) {
|
|
totalrowsdeleted($db,$tablename,$total_rowcount,$initial_rowcount,getlogger(__PACKAGE__));
|
|
#}
|
|
|
|
return $total_rowcount;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sub insert_record {
|
|
|
|
my ($get_db,$get_xa_db,$class,$row,$insert_ignore,$unique_count_fields) = @_;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
|
|
|
|
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
#my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
if (defined $expected_fieldnames and defined $row) {
|
|
|
|
my @fields = ();
|
|
my @vals = ();
|
|
|
|
foreach my $fieldname (@$expected_fieldnames) {
|
|
if (exists $row->{$fieldname}) {
|
|
push @fields,$fieldname;
|
|
push @vals,$row->{$fieldname};
|
|
}
|
|
}
|
|
|
|
my @unique_fields = ();
|
|
my @unique_vals = ();
|
|
if (defined $unique_count_fields and 'ARRAY' eq ref $unique_count_fields) {
|
|
foreach my $fieldname (@$unique_count_fields) {
|
|
push(@unique_fields,$fieldname);
|
|
if (exists $row->{$fieldname}) {
|
|
push @unique_vals,$row->{$fieldname};
|
|
} else {
|
|
push @unique_vals,undef;
|
|
}
|
|
}
|
|
}
|
|
|
|
my $stmt = 'INSERT ' . ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '') . 'INTO ' . $db->tableidentifier($tablename) . ' (' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fields) . ') VALUES (' . substr(',?' x scalar @fields,1) . ')';
|
|
if ((scalar @unique_fields) > 0) {
|
|
if ($xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . join(' = ? AND ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @unique_fields) . ' = ?',@unique_vals) == 0) {
|
|
if ($xa_db->db_do($stmt,@vals)) {
|
|
rowinserted($db,$tablename,getlogger(__PACKAGE__));
|
|
return 1;
|
|
} else {
|
|
rowinsertskipped($db,$tablename,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
} else {
|
|
rowinsertskipped($db,$tablename,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
} else {
|
|
if ($xa_db->db_do($stmt,@vals)) {
|
|
rowinserted($db,$tablename,getlogger(__PACKAGE__));
|
|
return 1;
|
|
} else {
|
|
rowinsertskipped($db,$tablename,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sub delete_record {
|
|
|
|
my ($get_db,$get_xa_db,$class,$row) = @_;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
|
|
|
|
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
if (defined $expected_fieldnames and defined $row) {
|
|
|
|
my @pk_fields = ();
|
|
my @pk_vals = ();
|
|
|
|
my $full_pk = 0;
|
|
if (defined $primarykeys) {
|
|
$full_pk = ((scalar @$primarykeys) > 0 ? 1 : 0);
|
|
foreach my $fieldname (@$primarykeys) {
|
|
if (exists $row->{$fieldname}) {
|
|
push(@pk_fields,$fieldname);
|
|
push(@pk_vals,delete $row->{$fieldname});
|
|
} else {
|
|
$full_pk &= 0;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if ((scalar @pk_fields) == 0) {
|
|
foreach my $fieldname (@$expected_fieldnames) {
|
|
if (exists $row->{$fieldname}) {
|
|
push @pk_fields,$fieldname;
|
|
push @pk_vals,$row->{$fieldname};
|
|
}
|
|
}
|
|
}
|
|
|
|
my $selectpk_fieldnames = join(' = ? AND ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @pk_fields);
|
|
my $stmt = 'DELETE FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . $selectpk_fieldnames . ' = ?';
|
|
if ($full_pk) {
|
|
if ($xa_db->db_do($stmt,@pk_vals)) {
|
|
rowsdeleted($db,$tablename,1,1,getlogger(__PACKAGE__));
|
|
return 1;
|
|
} else {
|
|
rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
} else {
|
|
my $count = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . $selectpk_fieldnames . ' = ?',@pk_vals);
|
|
if ($count == 1) {
|
|
my $affected;
|
|
if ($affected = $xa_db->db_do($stmt,@pk_vals)) {
|
|
rowsdeleted($db,$tablename,$affected,$count,getlogger(__PACKAGE__));
|
|
return 1;
|
|
} else {
|
|
rowsdeleted($db,$tablename,0,$count,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
} else {
|
|
rowsdeleted($db,$tablename,0,$count,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sub update_record {
|
|
|
|
my ($get_db,$get_xa_db,$class,$row) = @_;
|
|
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
|
|
|
|
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
if (defined $expected_fieldnames and defined $row) {
|
|
|
|
my %data = %$row;
|
|
|
|
my @pk_fields = ();
|
|
my @pk_vals = ();
|
|
|
|
my $full_pk = 0;
|
|
if (defined $primarykeys) {
|
|
$full_pk = ((scalar @$primarykeys) > 0 ? 1 : 0);
|
|
foreach my $fieldname (@$primarykeys) {
|
|
if (exists $data{$fieldname}) {
|
|
push(@pk_fields,$fieldname);
|
|
push(@pk_vals,delete $data{$fieldname});
|
|
} else {
|
|
$full_pk &= 0;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
my @fields = ();
|
|
my @vals = ();
|
|
|
|
foreach my $fieldname (@$expected_fieldnames) {
|
|
if (exists $data{$fieldname}) {
|
|
push @fields,$fieldname;
|
|
push @vals,$data{$fieldname};
|
|
}
|
|
}
|
|
|
|
if ((scalar @pk_fields) == 0) {
|
|
@pk_fields = @fields;
|
|
@pk_vals = @vals;
|
|
}
|
|
|
|
my $selectpk_fieldnames = join(' = ? AND ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @pk_fields);
|
|
my $stmt = 'UPDATE ' . $db->tableidentifier($tablename) . ' SET ' . join(' = ?, ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fields) . ' = ? WHERE ' . $selectpk_fieldnames . ' = ?';
|
|
if ($full_pk) {
|
|
if ($xa_db->db_do($stmt,@vals,@pk_vals)) {
|
|
rowupdated($db,$tablename,getlogger(__PACKAGE__));
|
|
return 1;
|
|
} else {
|
|
rowupdateskipped($db,$tablename,0,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
} else {
|
|
my $count = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . $selectpk_fieldnames . ' = ?',@pk_vals);
|
|
if ($count == 1) {
|
|
if ($xa_db->db_do($stmt,@vals,@pk_vals)) {
|
|
rowupdated($db,$tablename,getlogger(__PACKAGE__));
|
|
return 1;
|
|
} else {
|
|
rowupdateskipped($db,$tablename,0,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
} else {
|
|
rowupdateskipped($db,$tablename,$count,getlogger(__PACKAGE__));
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sub insert_stmt {
|
|
|
|
my ($get_db,$class,$insert_ignore,$exclude_identity_fieldnames,$row_count) = @_;
|
|
my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
if ($exclude_identity_fieldnames) {
|
|
$expected_fieldnames = [ grep { not contains($_,$exclude_identity_fieldnames); } @$expected_fieldnames ];
|
|
}
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
$row_count //= 1;
|
|
my $values = join ", ", ('(' . substr(',?' x scalar @$expected_fieldnames,1) . ')') x $row_count;
|
|
return 'INSERT ' . ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '') . 'INTO ' . $db->tableidentifier($tablename) . ' (' .
|
|
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) .
|
|
') VALUES ' . $values;
|
|
|
|
}
|
|
|
|
sub transfer_table {
|
|
|
|
my %params = @_;
|
|
my ($get_db,
|
|
$class,
|
|
$get_target_db,
|
|
$targetclass,
|
|
$targettablename,
|
|
$truncate_targettable,
|
|
$create_indexes,
|
|
$texttable_engine,
|
|
$fixtable_statements,
|
|
$multithreading,
|
|
$blocksize,
|
|
$destroy_source_dbs_code,
|
|
$destroy_target_dbs_code,
|
|
$selectcount,
|
|
$select,
|
|
$loop,
|
|
$values) = @params{qw/
|
|
get_db
|
|
class
|
|
get_target_db
|
|
targetclass
|
|
targettablename
|
|
truncate_targettable
|
|
create_indexes
|
|
texttable_engine
|
|
fixtable_statements
|
|
multithreading
|
|
blocksize
|
|
destroy_source_dbs_code
|
|
destroy_target_dbs_code
|
|
selectcount
|
|
select
|
|
loop
|
|
values
|
|
/};
|
|
|
|
#my ($get_db,$tablename,$get_target_db,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine,$fixtable_statements,$selectcount,$select,@values) = @_;
|
|
|
|
if (ref $get_db eq 'CODE' and ref $get_target_db eq 'CODE') {
|
|
|
|
my $db = &$get_db($reader_name,1); # $reader_name
|
|
my $target_db = &$get_target_db($writer_name); #$writer_name);
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
my $countstatement;
|
|
if (defined $selectcount) {
|
|
$countstatement = $selectcount;
|
|
} else {
|
|
$countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
|
|
}
|
|
|
|
my $rowcount;
|
|
unless ($loop) {
|
|
$rowcount = $db->db_get_value($countstatement,@$values);
|
|
|
|
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
|
|
|
|
if ($rowcount > 0) {
|
|
tabletransferstarted($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
|
|
} else {
|
|
transferzerorowcount($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
|
|
return 1;
|
|
}
|
|
} else {
|
|
tabletransferstarted($db,$tablename,$target_db,$targettablename,undef,getlogger(__PACKAGE__));
|
|
}
|
|
|
|
my $errorstate = $RUNNING; # 1;
|
|
|
|
$create_indexes = ((defined $create_indexes) ? $create_indexes : $transfer_defer_indexes);
|
|
|
|
if (create_targettable($db,$class,$target_db,$targetclass,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine)) {
|
|
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
my @fieldnames = @$expected_fieldnames;
|
|
|
|
#my $setfieldnames = join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames);
|
|
my $valueplaceholders = substr(',?' x scalar @fieldnames,1);
|
|
|
|
my $selectstatement;
|
|
if (length($select) > 0) {
|
|
$selectstatement = $select;
|
|
} else {
|
|
$selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename);
|
|
}
|
|
|
|
my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')';
|
|
|
|
if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $target_db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved
|
|
|
|
$blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,1,$tabletransfer_threadqueuelength);
|
|
|
|
my $reader;
|
|
my $writer;
|
|
|
|
my %errorstates :shared = ();
|
|
#$errorstates{$tid} = $errorstate;
|
|
|
|
#my $readererrorstate :shared = 1;
|
|
#my $writererrorstate :shared = 1;
|
|
|
|
my $queue = Thread::Queue->new();
|
|
|
|
tablethreadingdebug('shutting down db connections ...',getlogger(__PACKAGE__));
|
|
|
|
$db->db_disconnect();
|
|
#undef $db;
|
|
$target_db->db_disconnect();
|
|
#undef $target_db;
|
|
my $default_connection = &$get_db(undef,0);
|
|
my $default_connection_reconnect = $default_connection->is_connected();
|
|
$default_connection->db_disconnect();
|
|
|
|
tablethreadingdebug('starting reader thread',getlogger(__PACKAGE__));
|
|
|
|
$reader = threads->create(\&_reader,
|
|
{ name => $reader_name,
|
|
queue => $queue,
|
|
errorstates => \%errorstates,
|
|
#readererrorstate_ref => \$readererrorstate,
|
|
#writererrorstate_ref => \$writererrorstate,
|
|
threadqueuelength => $tabletransfer_threadqueuelength,
|
|
get_db => $get_db,
|
|
tablename => $tablename,
|
|
class => $class,
|
|
selectstatement => $selectstatement,
|
|
blocksize => $blocksize,
|
|
rowcount => $rowcount,
|
|
loop => $loop,
|
|
#logger => $logger,
|
|
values_ref => $values,
|
|
destroy_dbs_code => $destroy_source_dbs_code,
|
|
});
|
|
|
|
tablethreadingdebug('starting writer thread',getlogger(__PACKAGE__));
|
|
|
|
$writer = threads->create(\&_writer,
|
|
{ #name => $writer_name,
|
|
queue => $queue,
|
|
errorstates => \%errorstates,
|
|
readertid => $reader->tid(),
|
|
#readererrorstate_ref => \$readererrorstate,
|
|
#writererrorstate_ref => \$writererrorstate,
|
|
get_target_db => $get_target_db,
|
|
targettablename => $targettablename,
|
|
targetclass => $targetclass,
|
|
insertstatement => $insertstatement,
|
|
blocksize => $blocksize,
|
|
rowcount => $rowcount,
|
|
loop => $loop,
|
|
#logger => $logger,
|
|
destroy_dbs_code => $destroy_target_dbs_code,
|
|
});
|
|
|
|
my $signal_handler = sub {
|
|
my $tid = threadid();
|
|
$errorstate = $STOP;
|
|
enable_threading_info(1);
|
|
tablethreadingdebug("[$tid] interrupt signal received",getlogger(__PACKAGE__));
|
|
#print("[$tid] interrupt signal received");
|
|
#_info($context,"interrupt signal received");
|
|
#$result = 0;
|
|
my $errorstates = \%errorstates;
|
|
lock $errorstates;
|
|
$errorstates->{$tid} = $STOP;
|
|
};
|
|
local $SIG{TERM} = $signal_handler;
|
|
local $SIG{INT} = $signal_handler;
|
|
local $SIG{QUIT} = $signal_handler;
|
|
local $SIG{HUP} = $signal_handler;
|
|
|
|
$reader->join();
|
|
tablethreadingdebug('reader thread joined',getlogger(__PACKAGE__));
|
|
$writer->join();
|
|
tablethreadingdebug('writer thread joined',getlogger(__PACKAGE__));
|
|
|
|
$errorstate = $COMPLETED if $errorstate == $RUNNING;
|
|
#$errorstate = $readererrorstate | $writererrorstate;
|
|
$errorstate |= _get_other_threads_state(\%errorstates,$tid);
|
|
|
|
tablethreadingdebug('restoring db connections ...',getlogger(__PACKAGE__));
|
|
|
|
#$db = &$get_db($reader_name,1);
|
|
$target_db = &$get_target_db(undef,1);
|
|
if ($default_connection_reconnect) {
|
|
$default_connection = &$get_db(undef,1);
|
|
}
|
|
|
|
} else {
|
|
|
|
$blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,0,undef);
|
|
|
|
#$db->db_disconnect();
|
|
#undef $db;
|
|
#$db = &$get_db($reader_name);
|
|
#$target_db->db_disconnect();
|
|
#undef $target_db;
|
|
#$target_db = &$get_target_db($writer_name);
|
|
|
|
eval {
|
|
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
|
|
|
|
my $last_i;
|
|
my $i = 0;
|
|
while (1) {
|
|
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
|
|
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
|
|
my $rowblock = $db->db_get_rowblock($blocksize);
|
|
$db->db_finish() unless $db->rowblock_transactional;
|
|
my $realblocksize = scalar @$rowblock;
|
|
if ($realblocksize > 0) {
|
|
writing_rows($target_db,$targettablename,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
|
|
$target_db->db_do_begin($insertstatement); #,$targettablename);
|
|
$target_db->db_do_rowblock($rowblock);
|
|
$target_db->db_finish();
|
|
$i += $realblocksize;
|
|
|
|
#foreach my $row (@$rowblock) {
|
|
# undef $row;
|
|
#}
|
|
#undef $rowblock;
|
|
|
|
if ($realblocksize < $blocksize) {
|
|
last unless $loop;
|
|
}
|
|
} else {
|
|
last unless $loop;
|
|
}
|
|
}
|
|
$db->db_finish() if $db->rowblock_transactional;
|
|
|
|
};
|
|
|
|
if ($@) {
|
|
$errorstate = $ERROR;
|
|
} else {
|
|
$errorstate = $COMPLETED;
|
|
}
|
|
|
|
$db->db_disconnect();
|
|
#undef $db;
|
|
#$target_db->db_disconnect();
|
|
#undef $target_db;
|
|
|
|
}
|
|
|
|
#$db = &$get_db($controller_name,1);
|
|
#$target_db = &$get_target_db($controller_name,1);
|
|
|
|
if ($errorstate == $COMPLETED and ref $fixtable_statements eq 'ARRAY' and (scalar @$fixtable_statements) > 0) {
|
|
eval {
|
|
foreach my $fixtable_statement (@$fixtable_statements) {
|
|
if (ref $fixtable_statement eq '') {
|
|
$target_db->db_do($fixtable_statement);
|
|
tablefixed($target_db,$targettablename,$fixtable_statement,getlogger(__PACKAGE__));
|
|
} else {
|
|
$fixtable_statement = &$fixtable_statement($target_db->tableidentifier($targettablename));
|
|
$target_db->db_do($fixtable_statement);
|
|
tablefixed($target_db,$targettablename,$fixtable_statement,getlogger(__PACKAGE__));
|
|
}
|
|
|
|
}
|
|
};
|
|
if ($@) {
|
|
$errorstate = $ERROR;
|
|
#} else {
|
|
# $errorstate = $COMPLETED;
|
|
}
|
|
}
|
|
|
|
if ($errorstate == $COMPLETED and $create_indexes) {
|
|
|
|
eval {
|
|
$target_db->create_primarykey($targettablename,
|
|
$table_primarykeys->{$tid}->{$connectidentifier}->{$class},
|
|
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class});
|
|
|
|
$target_db->create_indexes($targettablename,
|
|
$table_target_indexes->{$tid}->{$connectidentifier}->{$class},
|
|
$table_primarykeys->{$tid}->{$connectidentifier}->{$class});
|
|
|
|
|
|
delete $table_primarykeys->{$tid}->{$target_db->connectidentifier()}->{$targetclass};
|
|
checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$table_target_indexes->{$tid}->{$connectidentifier}->{$class});
|
|
|
|
$target_db->vacuum($targettablename);
|
|
|
|
};
|
|
|
|
if ($@) {
|
|
$errorstate = $ERROR;
|
|
#} else {
|
|
# $errorstate = $COMPLETED;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if ($errorstate == $COMPLETED) {
|
|
tabletransferdone($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
|
|
#$db->db_disconnect();
|
|
#$target_db->db_disconnect();
|
|
return 1;
|
|
} else {
|
|
tabletransferfailed($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
|
|
#$db->db_disconnect();
|
|
#$target_db->db_disconnect();
|
|
}
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
sub process_table {
|
|
|
|
my %params = @_;
|
|
my ($get_db,
|
|
$name,
|
|
$class,
|
|
$process_code,
|
|
$read_code,
|
|
$static_context,
|
|
$init_process_context_code,
|
|
$uninit_process_context_code,
|
|
$multithreading,
|
|
$blocksize,
|
|
$tableprocessing_threads,
|
|
$destroy_reader_dbs_code,
|
|
$selectcount,
|
|
$select,
|
|
$loop,
|
|
$values) = @params{qw/
|
|
get_db
|
|
name
|
|
class
|
|
process_code
|
|
read_code
|
|
static_context
|
|
init_process_context_code
|
|
uninit_process_context_code
|
|
multithreading
|
|
blocksize
|
|
tableprocessing_threads
|
|
destroy_reader_dbs_code
|
|
selectcount
|
|
select
|
|
loop
|
|
values
|
|
/};
|
|
|
|
#my ($get_db,$tablename,$process_code,$init_process_context_code,$uninit_process_context_code,$multithreading,$selectcount,$select,@values) = @_;
|
|
|
|
if (ref $get_db eq 'CODE') {
|
|
|
|
my $db = &$get_db($name,1); #$reader_name
|
|
|
|
my $connectidentifier = $db->connectidentifier();
|
|
my $tid = threadid();
|
|
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
|
|
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
|
|
|
|
my $countstatement;
|
|
if (defined $selectcount) {
|
|
$countstatement = $selectcount;
|
|
} else {
|
|
$countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
|
|
}
|
|
|
|
my $rowcount;
|
|
unless ($loop) {
|
|
$rowcount = $db->db_get_value($countstatement,@$values);
|
|
|
|
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
|
|
|
|
if ($rowcount > 0) {
|
|
tableprocessingstarted($db,$tablename,$rowcount,getlogger(__PACKAGE__));
|
|
} else {
|
|
processzerorowcount($db,$tablename,$rowcount,getlogger(__PACKAGE__));
|
|
return 1;
|
|
}
|
|
} else {
|
|
tableprocessingstarted($db,$tablename,undef,getlogger(__PACKAGE__));
|
|
}
|
|
|
|
my @fieldnames = @$expected_fieldnames;
|
|
my $selectstatement;
|
|
if (length($select) > 0) {
|
|
$selectstatement = $select;
|
|
} else {
|
|
$selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename);
|
|
}
|
|
|
|
my $errorstate = $RUNNING;
|
|
|
|
if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved
|
|
|
|
$tableprocessing_threads //= $cpucount;
|
|
$blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,1,$tableprocessing_threadqueuelength);
|
|
|
|
my $reader;
|
|
#my $processor;
|
|
my %processors = ();
|
|
|
|
my %errorstates :shared = ();
|
|
#$errorstates{$tid} = $errorstate;
|
|
|
|
#my $readererrorstate :shared = 1;
|
|
#my $processorerrorstate :shared = 1;
|
|
|
|
my $queue = Thread::Queue->new();
|
|
|
|
tablethreadingdebug('shutting down db connections ...',getlogger(__PACKAGE__));
|
|
|
|
$db->db_disconnect();
|
|
#undef $db;
|
|
my $default_connection = &$get_db(undef,0);
|
|
my $default_connection_reconnect = $default_connection->is_connected();
|
|
$default_connection->db_disconnect();
|
|
|
|
tablethreadingdebug('starting reader thread',getlogger(__PACKAGE__));
|
|
|
|
$reader = threads->create(\&_reader,
|
|
{ name => $name,
|
|
queue => $queue,
|
|
errorstates => \%errorstates,
|
|
#readererrorstate_ref => \$readererrorstate,
|
|
#writererrorstate_ref => \$processorerrorstate,
|
|
read_code => $read_code,
|
|
threadqueuelength => $tableprocessing_threadqueuelength,
|
|
get_db => $get_db,
|
|
tablename => $tablename,
|
|
class => $class,
|
|
selectstatement => $selectstatement,
|
|
blocksize => $blocksize,
|
|
rowcount => $rowcount,
|
|
loop => $loop,
|
|
#logger => $logger,
|
|
values_ref => $values,
|
|
destroy_dbs_code => $destroy_reader_dbs_code,
|
|
});
|
|
|
|
for (my $i = 0; $i < $tableprocessing_threads; $i++) {
|
|
tablethreadingdebug('starting processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads,getlogger(__PACKAGE__));
|
|
my $processor = threads->create(\&_process,
|
|
_create_process_context($static_context,
|
|
{ name => $name,
|
|
queue => $queue,
|
|
errorstates => \%errorstates,
|
|
readertid => $reader->tid(),
|
|
#readererrorstate_ref => \$readererrorstate,
|
|
#processorerrorstate_ref => \$processorerrorstate,
|
|
process_code => $process_code,
|
|
init_process_context_code => $init_process_context_code,
|
|
uninit_process_context_code => $uninit_process_context_code,
|
|
blocksize => $blocksize,
|
|
rowcount => $rowcount,
|
|
loop => $loop,
|
|
#logger => $logger,
|
|
}));
|
|
if (!defined $processor) {
|
|
tablethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads . ' NOT started',getlogger(__PACKAGE__));
|
|
}
|
|
$processors{$processor->tid()} = $processor;
|
|
#push (@processors,$processor);
|
|
}
|
|
|
|
#$reader->join();
|
|
#tablethreadingdebug('reader thread joined',getlogger(__PACKAGE__));
|
|
#for (my $i = 0; $i < $tableprocessing_threads; $i++) {
|
|
# my $processor = $processors[$i];
|
|
# if (defined $processor) {
|
|
# $processor->join();
|
|
# tablethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads . ' joinded',getlogger(__PACKAGE__));
|
|
# } else {
|
|
# tablethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads . ' NOT joinded',getlogger(__PACKAGE__));
|
|
# }
|
|
#}
|
|
|
|
my $signal_handler = sub {
|
|
my $tid = threadid();
|
|
$errorstate = $STOP;
|
|
enable_threading_info(1);
|
|
tablethreadingdebug("[$tid] interrupt signal received",getlogger(__PACKAGE__));
|
|
#print("[$tid] interrupt signal received");
|
|
#_info($context,"interrupt signal received");
|
|
#$result = 0;
|
|
my $errorstates = \%errorstates;
|
|
lock $errorstates;
|
|
$errorstates->{$tid} = $STOP;
|
|
};
|
|
local $SIG{TERM} = $signal_handler;
|
|
local $SIG{INT} = $signal_handler;
|
|
local $SIG{QUIT} = $signal_handler;
|
|
local $SIG{HUP} = $signal_handler;
|
|
|
|
$reader->join();
|
|
tablethreadingdebug('reader thread joined',getlogger(__PACKAGE__));
|
|
#print 'threads running: ' . (scalar threads->list(threads::running));
|
|
#while ((scalar threads->list(threads::running)) > 1 or (scalar threads->list(threads::joinable)) > 0) {
|
|
while ((scalar keys %processors) > 0) {
|
|
#for (my $i = 0; $i < $tableprocessing_threads; $i++) {
|
|
foreach my $processor (values %processors) {
|
|
#my $processor = $processors[$i];
|
|
if (defined $processor and $processor->is_joinable()) {
|
|
$processor->join();
|
|
delete $processors{$processor->tid()};
|
|
#tablethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads . ' joinded',getlogger(__PACKAGE__));
|
|
tablethreadingdebug('processor thread tid ' . $processor->tid() . ' joined',getlogger(__PACKAGE__));
|
|
}
|
|
#} else {
|
|
# tablethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads . ' NOT joinded',getlogger(__PACKAGE__));
|
|
#}
|
|
}
|
|
sleep($thread_sleep_secs);
|
|
}
|
|
|
|
$errorstate = $COMPLETED if $errorstate == $RUNNING;
|
|
#$errorstate = $readererrorstate | $processorerrorstate;
|
|
$errorstate |= (_get_other_threads_state(\%errorstates,$tid) & ~$RUNNING);
|
|
|
|
tablethreadingdebug('restoring db connections ...',getlogger(__PACKAGE__));
|
|
|
|
#$db = &$get_db($reader_name,1);
|
|
if ($default_connection_reconnect) {
|
|
$default_connection = &$get_db(undef,1);
|
|
}
|
|
|
|
} else {
|
|
|
|
$blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,0,undef);
|
|
#$db->db_disconnect();
|
|
#undef $db;
|
|
#$db = &$get_db($reader_name);
|
|
my $context = _create_process_context($static_context,{ tid => $tid, name => $name, });
|
|
my $rowblock_result = 1;
|
|
eval {
|
|
if (defined $init_process_context_code and 'CODE' eq ref $init_process_context_code) {
|
|
&$init_process_context_code($context);
|
|
}
|
|
|
|
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
|
|
|
|
my $last_i;
|
|
my $i = 0;
|
|
while (1) {
|
|
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
|
|
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
|
|
my $rowblock = $db->db_get_rowblock($blocksize);
|
|
if ('CODE' eq ref $read_code) {
|
|
$rowblock = &$read_code($rowblock);
|
|
}
|
|
$db->db_finish() unless $db->rowblock_transactional;
|
|
my $realblocksize = scalar @$rowblock;
|
|
if ($realblocksize > 0) {
|
|
processing_rows($context,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
|
|
|
|
$rowblock_result = &$process_code($context,$rowblock,$i);
|
|
|
|
#$target_db->db_do_begin($insertstatement); #,$targettablename);
|
|
#$target_db->db_do_rowblock($rowblock);
|
|
#$target_db->db_finish();
|
|
$i += $realblocksize;
|
|
|
|
if (not $rowblock_result) {
|
|
last;
|
|
} elsif ($realblocksize < $blocksize) {
|
|
last unless $loop;
|
|
}
|
|
} else {
|
|
last unless $loop;
|
|
}
|
|
}
|
|
$db->db_finish() if $db->rowblock_transactional;
|
|
|
|
};
|
|
|
|
#print $@;
|
|
if ($@) {
|
|
$errorstate = $ERROR;
|
|
} else {
|
|
$errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
|
|
}
|
|
|
|
eval {
|
|
if (defined $uninit_process_context_code and 'CODE' eq ref $uninit_process_context_code) {
|
|
&$uninit_process_context_code($context);
|
|
}
|
|
};
|
|
$db->db_disconnect();
|
|
#undef $db;
|
|
|
|
}
|
|
|
|
#$db = &$get_db($controller_name,1);
|
|
|
|
if ($errorstate == $COMPLETED) {
|
|
tableprocessingdone($db,$tablename,$rowcount,getlogger(__PACKAGE__));
|
|
#$db->db_disconnect();
|
|
return 1;
|
|
} else {
|
|
#print "errorstate: $errorstate \n";
|
|
tableprocessingfailed($db,$tablename,$rowcount,getlogger(__PACKAGE__));
|
|
#$db->db_disconnect();
|
|
}
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
sub _calc_blocksize {
|
|
|
|
my ($rowcount,$columncount,$multithreaded,$threadqueuelength) = @_;
|
|
|
|
if (defined $rowcount and $rowcount > $minblocksize) {
|
|
|
|
my $exp = int ( log ($rowcount) / log(10.0) );
|
|
my $blocksize = int ( 10 ** $exp );
|
|
my $cellcount_in_memory = $columncount * $blocksize;
|
|
if ($multithreaded) {
|
|
$cellcount_in_memory *= get_threadqueuelength($threadqueuelength);
|
|
}
|
|
|
|
while ( $cellcount_in_memory > $cells_transfer_memory_limit or
|
|
$rowcount / $blocksize < $minnumberofchunks) {
|
|
$exp -= 1.0;
|
|
$blocksize = int ( 10 ** $exp );
|
|
$cellcount_in_memory = $columncount * $blocksize;
|
|
if ($multithreaded) {
|
|
$cellcount_in_memory *= get_threadqueuelength($threadqueuelength);
|
|
}
|
|
}
|
|
|
|
if ($blocksize < $minblocksize) {
|
|
return $minblocksize;
|
|
} elsif ($blocksize > $maxblocksize) {
|
|
return $maxblocksize;
|
|
} else {
|
|
return $blocksize;
|
|
}
|
|
|
|
} else {
|
|
|
|
return $minblocksize;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sub _get_other_threads_state {
|
|
my ($errorstates,$tid) = @_;
|
|
my $result = 0;
|
|
if (!defined $tid) {
|
|
$tid = threadid();
|
|
}
|
|
if (defined $errorstates and ref $errorstates eq 'HASH') {
|
|
lock $errorstates;
|
|
foreach my $threadid (keys %$errorstates) {
|
|
if ($threadid != $tid) {
|
|
$result |= $errorstates->{$threadid};
|
|
}
|
|
}
|
|
}
|
|
return $result;
|
|
}
|
|
|
|
sub is_shutdown {
|
|
my ($context) = @_;
|
|
my $errorstates;
|
|
my $state = 0;
|
|
$errorstates = $context->{errorstates} if $context;
|
|
if (defined $errorstates and ref $errorstates eq 'HASH') {
|
|
lock $errorstates;
|
|
foreach my $threadid (keys %$errorstates) {
|
|
$state |= $errorstates->{$threadid};
|
|
}
|
|
}
|
|
return 1 if $state & $ERROR;
|
|
return 1 if $state & $STOP;
|
|
return 0;
|
|
}
|
|
|
|
sub _get_stop_consumer_thread {
|
|
my ($context,$tid) = @_;
|
|
my $result = 1;
|
|
my $other_threads_state;
|
|
my $reader_state;
|
|
my $queuesize;
|
|
{
|
|
my $errorstates = $context->{errorstates};
|
|
lock $errorstates;
|
|
$other_threads_state = _get_other_threads_state($errorstates,$tid);
|
|
$reader_state = $errorstates->{$context->{readertid}};
|
|
}
|
|
$queuesize = $context->{queue}->pending();
|
|
if (($other_threads_state & $ERROR) == 0
|
|
and ($other_threads_state & $STOP) == 0
|
|
and ($queuesize > 0 or $reader_state == $RUNNING)) {
|
|
$result = 0;
|
|
#keep the consumer thread running if there is no defunct thread and queue is not empty or reader is still running
|
|
}
|
|
|
|
if ($result) {
|
|
tablethreadingdebug('[' . $tid . '] consumer thread is shutting down (' .
|
|
(($other_threads_state & $ERROR) == 0 ? 'no defunct thread(s)' : uc('defunct thread(s)')) . ', ' .
|
|
(($other_threads_state & $STOP) == 0 ? 'no thread(s) stopping by signal' : uc('thread(s) stopping by signal')) . ', ' .
|
|
($queuesize > 0 ? 'blocks pending' : uc('no blocks pending')) . ', ' .
|
|
($reader_state == $RUNNING ? 'reader thread running' : uc('reader thread not running')) . ') ...'
|
|
,getlogger(__PACKAGE__));
|
|
}
|
|
|
|
return $result;
|
|
|
|
}
|
|
|
|
sub _reader {
|
|
|
|
#my ($queue,$readererrorstate_ref,$writererrorstate_ref,$get_db,$tablename,$selectstatement,$blocksize,$rowcount,$logger,@values) = @_;
|
|
my $context = shift;
|
|
|
|
my $reader_db;
|
|
my $tid = threadid();
|
|
$context->{tid} = $tid;
|
|
{
|
|
lock $context->{errorstates};
|
|
$context->{errorstates}->{$tid} = $RUNNING;
|
|
}
|
|
|
|
tablethreadingdebug('[' . $tid . '] reader thread tid ' . $tid . ' started',getlogger(__PACKAGE__));
|
|
|
|
my $blockcount = 0;
|
|
eval {
|
|
$reader_db = &{$context->{get_db}}($context->{name}); #$reader_name);
|
|
$reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}) if $reader_db->rowblock_transactional; #$context->{tablename}
|
|
tablethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__));
|
|
while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up
|
|
#yield();
|
|
sleep($thread_sleep_secs);
|
|
}
|
|
my $last_i;
|
|
my $i = 0;
|
|
my $state = $RUNNING; #start at first
|
|
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0 and ($state & $STOP) == 0) { #as long there is one running consumer and no defunct consumer
|
|
fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
|
|
$reader_db->db_get_begin($context->{selectstatement},$i,$context->{blocksize},@{$context->{values_ref}}) unless $reader_db->rowblock_transactional;
|
|
my $rowblock = $reader_db->db_get_rowblock($context->{blocksize});
|
|
$reader_db->db_finish() unless $reader_db->rowblock_transactional;
|
|
my $realblocksize = scalar @$rowblock;
|
|
#my $packet = {rows => $rowblock,
|
|
# size => $realblocksize,
|
|
# #block => $i,
|
|
# row_offset => $i};
|
|
my %packet :shared = ();
|
|
if ('CODE' eq ref $context->{read_code}) {
|
|
$rowblock = &{$context->{read_code}}($rowblock);
|
|
}
|
|
$packet{rows} = $rowblock;
|
|
$packet{size} = $realblocksize;
|
|
$packet{row_offset} = $i;
|
|
if ($realblocksize > 0) {
|
|
$context->{queue}->enqueue(\%packet); #$packet);
|
|
$blockcount++;
|
|
#wait if thequeue is full and there there is one running consumer
|
|
while (((($state = _get_other_threads_state($context->{errorstates},$tid)) & $RUNNING) == $RUNNING) and $context->{queue}->pending() >= get_threadqueuelength($context->{threadqueuelength})) {
|
|
#yield();
|
|
sleep($thread_sleep_secs);
|
|
}
|
|
$i += $realblocksize;
|
|
if ($realblocksize < $context->{blocksize} and not $context->{loop}) {
|
|
tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__));
|
|
last;
|
|
}
|
|
} elsif ($context->{loop}) {
|
|
sleep($loop_sleep_secs);
|
|
} else {
|
|
$context->{queue}->enqueue(\%packet); #$packet);
|
|
tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__));
|
|
last;
|
|
}
|
|
}
|
|
if (not (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0 and ($state & $STOP) == 0)) {
|
|
tablethreadingdebug('[' . $tid . '] reader thread is shutting down (' .
|
|
(($state & $RUNNING) == $RUNNING ? 'still running consumer threads' : uc('no running consumer threads')) . ', ' .
|
|
(($state & $ERROR) == 0 ? 'no defunct thread(s)' : uc('defunct thread(s)')) . ', ' .
|
|
(($state & $STOP) == 0 ? 'no thread(s) stopping by signal' : uc('thread(s) stopping by signal')) . ') ...'
|
|
,getlogger(__PACKAGE__));
|
|
}
|
|
$reader_db->db_finish() if $reader_db->rowblock_transactional;
|
|
};
|
|
tablethreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
|
|
# stop the consumer:
|
|
# $context->{queue}->enqueue(undef);
|
|
if (defined $reader_db) {
|
|
# if thread cleanup has a problem...
|
|
$reader_db->db_disconnect();
|
|
}
|
|
if (defined $context->{destroy_dbs_code} and 'CODE' eq ref $context->{destroy_dbs_code}) {
|
|
&{$context->{destroy_dbs_code}}();
|
|
}
|
|
lock $context->{errorstates};
|
|
if ($@) {
|
|
$context->{errorstates}->{$tid} = $ERROR;
|
|
} elsif ($context->{errorstates}->{$tid} != $STOP) {
|
|
$context->{errorstates}->{$tid} = $COMPLETED;
|
|
}
|
|
return $context->{errorstates}->{$tid};
|
|
}
|
|
|
|
sub _writer {
|
|
|
|
my $context = shift;
|
|
|
|
#get_target_db
|
|
my $writer_db;
|
|
my $tid = threadid();
|
|
$context->{tid} = $tid;
|
|
{
|
|
lock $context->{errorstates};
|
|
$context->{errorstates}->{$tid} = $RUNNING;
|
|
}
|
|
tablethreadingdebug('[' . $tid . '] writer thread tid ' . $tid . ' started',getlogger(__PACKAGE__));
|
|
|
|
my $blockcount = 0;
|
|
eval {
|
|
$writer_db = &{$context->{get_target_db}}($writer_name); #$writer_name);
|
|
while (not _get_stop_consumer_thread($context,$tid)) {
|
|
my $packet = $context->{queue}->dequeue_nb();
|
|
if (defined $packet) {
|
|
if ($packet->{size} > 0) {
|
|
writing_rows($writer_db,$context->{targettablename},$packet->{row_offset},$packet->{size},$context->{rowcount},getlogger(__PACKAGE__));
|
|
|
|
$writer_db->db_do_begin($context->{insertstatement}); #,$context->{targettablename});
|
|
$writer_db->db_do_rowblock($packet->{rows});
|
|
$writer_db->db_finish();
|
|
$blockcount++;
|
|
|
|
} elsif ($context->{loop}) {
|
|
sleep($loop_sleep_secs);
|
|
} else { #empty packet received
|
|
tablethreadingdebug('[' . $tid . '] shutting down writer thread (end of data - empty block) ...',getlogger(__PACKAGE__));
|
|
last;
|
|
}
|
|
} else {
|
|
#yield();
|
|
sleep($thread_sleep_secs); #2015-01
|
|
}
|
|
}
|
|
};
|
|
tablethreadingdebug($@ ? '[' . $tid . '] writer thread error: ' . $@ : '[' . $tid . '] writer thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
|
|
if (defined $writer_db) {
|
|
# if thread cleanup has a problem...
|
|
$writer_db->db_disconnect();
|
|
}
|
|
if (defined $context->{destroy_dbs_code} and 'CODE' eq ref $context->{destroy_dbs_code}) {
|
|
&{$context->{destroy_dbs_code}}();
|
|
}
|
|
lock $context->{errorstates};
|
|
if ($@) {
|
|
$context->{errorstates}->{$tid} = $ERROR;
|
|
} elsif ($context->{errorstates}->{$tid} != $STOP) {
|
|
$context->{errorstates}->{$tid} = $COMPLETED;
|
|
}
|
|
return $context->{errorstates}->{$tid};
|
|
}
|
|
|
|
sub _process {
|
|
|
|
my $context = shift;
|
|
|
|
#my $writer_db;
|
|
my $rowblock_result = 1;
|
|
my $tid = threadid();
|
|
$context->{tid} = $tid;
|
|
{
|
|
lock $context->{errorstates};
|
|
$context->{errorstates}->{$tid} = $RUNNING;
|
|
}
|
|
|
|
tablethreadingdebug('[' . $tid . '] processor thread tid ' . $tid . ' started',getlogger(__PACKAGE__));
|
|
|
|
my $blockcount = 0;
|
|
eval {
|
|
if (defined $context->{init_process_context_code} and 'CODE' eq ref $context->{init_process_context_code}) {
|
|
&{$context->{init_process_context_code}}($context);
|
|
}
|
|
#$writer_db = &{$context->{get_target_db}}($writer_name);
|
|
while (not _get_stop_consumer_thread($context,$tid)) {
|
|
my $packet = $context->{queue}->dequeue_nb();
|
|
if (defined $packet) {
|
|
if ($packet->{size} > 0) {
|
|
|
|
#writing_rows($writer_db,$context->{targettablename},$i,$realblocksize,$context->{rowcount},getlogger(__PACKAGE__));
|
|
|
|
#$writer_db->db_do_begin($context->{insertstatement}); #,$context->{targettablename});
|
|
#$writer_db->db_do_rowblock($rowblock);
|
|
#$writer_db->db_finish();
|
|
|
|
#$i += $realblocksize;
|
|
|
|
processing_rows($context,$packet->{row_offset},$packet->{size},$context->{rowcount},getlogger(__PACKAGE__));
|
|
|
|
$rowblock_result = &{$context->{process_code}}($context,$packet->{rows},$packet->{row_offset});
|
|
|
|
$blockcount++;
|
|
|
|
#$i += $realblocksize;
|
|
|
|
if (not $rowblock_result) {
|
|
tablethreadingdebug('[' . $tid . '] shutting down processor thread (processing block NOK) ...',getlogger(__PACKAGE__));
|
|
last;
|
|
}
|
|
|
|
} elsif ($context->{loop}) {
|
|
sleep($loop_sleep_secs);
|
|
} else {
|
|
tablethreadingdebug('[' . $tid . '] shutting down processor thread (end of data - empty block) ...',getlogger(__PACKAGE__));
|
|
last;
|
|
}
|
|
} else {
|
|
#yield();
|
|
sleep($thread_sleep_secs); #2015-01
|
|
}
|
|
}
|
|
|
|
};
|
|
my $err = $@;
|
|
tablethreadingdebug($err ? '[' . $tid . '] processor thread error: ' . $err : '[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
|
|
eval {
|
|
if (defined $context->{uninit_process_context_code} and 'CODE' eq ref $context->{uninit_process_context_code}) {
|
|
&{$context->{uninit_process_context_code}}($context);
|
|
}
|
|
};
|
|
lock $context->{errorstates};
|
|
if ($err) {
|
|
$context->{errorstates}->{$tid} = $ERROR;
|
|
} elsif ($context->{errorstates}->{$tid} != $STOP) {
|
|
$context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
|
|
}
|
|
return $context->{errorstates}->{$tid};
|
|
}
|
|
|
|
sub _create_process_context {
|
|
|
|
my $context = {};
|
|
foreach my $ctx (@_) {
|
|
if (defined $ctx and 'HASH' eq ref $ctx) {
|
|
foreach my $key (keys %$ctx) {
|
|
$context->{$key} = $ctx->{$key};
|
|
#delete $ctx->{$key};
|
|
}
|
|
}
|
|
}
|
|
return $context;
|
|
|
|
}
|
|
|
|
1;
|