You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bulk-processor/lib/NGCP/BulkProcessor/SqlProcessor.pm

1707 lines
68 KiB

package NGCP::BulkProcessor::SqlProcessor;
use strict;
## no critic
use threads qw(yield);
use threads::shared;
use Thread::Queue;
#use Thread::Semaphore;
use Time::HiRes qw(sleep);
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
$cpucount
get_threadqueuelength
$cells_transfer_memory_limit
$transfer_defer_indexes
);
use NGCP::BulkProcessor::Logging qw(
getlogger
fieldnamesacquired
primarykeycolsacquired
tableinfoscleared
tablefixed
tabletransferstarted
tableprocessingstarted
rowtransferstarted
rowtransferred
rowinserted
rowupdated
rowsdeleted
totalrowsdeleted
rowinsertskipped
rowupdateskipped
tabletransferdone
tableprocessingdone
rowtransferdone
fetching_rows
writing_rows
processing_rows
tablethreadingdebug
enable_threading_info
);
use NGCP::BulkProcessor::LogError qw(
fieldnamesdiffer
transferzerorowcount
processzerorowcount
deleterowserror
tabletransferfailed
tableprocessingfailed
);
use NGCP::BulkProcessor::Table qw();
use NGCP::BulkProcessor::Array qw(setcontains contains);
use NGCP::BulkProcessor::Utils qw(round threadid);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
init_record
copy_row
transfer_table
process_table
cleartableinfo
checktableinfo
registertableinfo
create_targettable
insert_record
update_record
delete_record
delete_records
insert_stmt
is_shutdown
);
#transfer_record
#transfer_records
# Per-thread metadata caches. Each one is a nested hash addressed as
#   ->{thread id}->{connect identifier of the db}->{class}
# and is filled by registertableinfo()/checktableinfo() and emptied by cleartableinfo().
# class => table name:
my $table_names = {};
# class => array ref of field names the (derived) class expects:
my $table_expected_fieldnames = {};
# class => { fieldnames => <as reported by the db>, ok => 0|1 }:
my $table_fieldnames_cached = {};
# class => primary key column names:
my $table_primarykeys = {};
# class => index definitions to create on target tables:
my $table_target_indexes = {};
#my $logger = getlogger(__PACKAGE__);
# passed to the reader thread and to _calc_blocksize() by transfer_table():
my $tabletransfer_threadqueuelength = 5; #100; #30; #5; # ... >= 1
# NOTE(review): the following block size/chunk limits are not referenced in the
# part of the file visible here - presumably consumed by _calc_blocksize(); confirm.
my $minblocksize = 100;
my $maxblocksize = 100000;
my $minnumberofchunks = 10;
# NOTE(review): presumably the queue length used by process_table(); confirm.
my $tableprocessing_threadqueuelength = 10;
#my $tableprocessing_threads = $cpucount; #3;
# connection names handed to the get_db/get_target_db callbacks by transfer_table():
my $reader_name = 'reader';
my $writer_name = 'writer';
# NOTE(review): sleep intervals in seconds, presumably for the worker thread loops; confirm.
my $thread_sleep_secs = 0.1;
my $loop_sleep_secs = 1;
# Thread/transfer states. The values are distinct powers of two, since
# transfer_table() ORs the state of the other threads into its own state.
my $RUNNING = 1;
my $COMPLETED = 2;
my $ERROR = 4;
my $STOP = 8;
# Initializes a record for a table-backed class.
# $get_db may be a connection object or a code ref returning one.
# Runs checktableinfo() for the class/table and afterwards creates one member
# (set to undef) on $record for every expected field name registered for the class.
# Returns $record.
sub init_record {
    my ($record,$class,$get_db,$tablename,$expected_fieldnames,$target_indexes) = @_;
    my $db = $get_db;
    $db = $get_db->() if ref $get_db eq 'CODE';
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    checktableinfo($db,$class,$tablename,$expected_fieldnames,$target_indexes);
    my $registered_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
    if (defined $registered_fieldnames) {
        # one (still undefined) member per known field:
        $record->{$_} = undef for @$registered_fieldnames;
    }
    return $record;
}
# Copies the values of $row into $record, for the fields listed in $expected_fieldnames.
#  - array ref row: values are taken by position, in the order of $expected_fieldnames
#  - hash ref row (or a row of the same ref type as $record): values are taken by
#    field name, falling back to the upper-cased field name, else undef
#  - any other row: nothing is copied
# Returns $record (unchanged if $record or $row is undefined).
sub copy_row {
    my ($record,$row,$expected_fieldnames) = @_;
    return $record unless (defined $record and defined $row);
    my $rowtype = ref $row;
    if ($rowtype eq 'ARRAY') {
        my $pos = 0;
        foreach my $fieldname (@$expected_fieldnames) {
            $record->{$fieldname} = $row->[$pos];
            $pos++;
        }
    } elsif ($rowtype eq 'HASH' or $rowtype eq ref $record) {
        foreach my $fieldname (@$expected_fieldnames) {
            my $uc_fieldname = uc($fieldname);
            $record->{$fieldname} = (exists $row->{$fieldname}) ? $row->{$fieldname}
                : (exists $row->{$uc_fieldname}) ? $row->{$uc_fieldname}
                : undef;
        }
    }
    return $record;
}
# Drops all cached table metadata (table names, expected field names, cached
# field names, primary keys, target indexes) of the current thread for the
# given connection.
# $get_db may be a connection object or a code ref returning one.
# Per-thread buckets that are left empty are removed as well.
# tableinfoscleared() is logged only if something was actually removed.
sub cleartableinfo {
    my $get_db = shift;
    my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
    my $tid = threadid();
    my $connectidentifier = $db->connectidentifier();
    my $found = 0;
    foreach my $cache ($table_names,
                       $table_expected_fieldnames,
                       $table_fieldnames_cached,
                       $table_primarykeys,
                       $table_target_indexes) {
        # Guard with exists: 'keys %{$cache->{$tid}}' on a missing bucket would
        # autovivify it, and removing that phantom bucket again used to be
        # reported as "cleared" although nothing had been cached.
        next unless exists $cache->{$tid};
        my $thread_bucket = $cache->{$tid};
        if (exists $thread_bucket->{$connectidentifier}) {
            delete $thread_bucket->{$connectidentifier};
            $found = 1;
        }
        if ((scalar keys %$thread_bucket) == 0) {
            # no other connection left for this thread:
            delete $cache->{$tid};
            $found = 1;
        }
    }
    if ($found) {
        tableinfoscleared($db,getlogger(__PACKAGE__));
    }
}
# Registers table metadata for a class without querying the database
# (to prepare creation of non-existent tables..).
# $get_db may be a connection object or a code ref returning one.
# Stores table name, expected field names (default []), primary key columns
# (default []) and target indexes (default {}) for the current thread and
# connection. The cache of field names reported by the db only gets an (empty)
# bucket for the connection.
sub registertableinfo {
    my ($get_db,$class,$tablename,$fieldnames,$indexes,$keycols) = @_;
    my $db = (ref $get_db eq 'CODE') ? $get_db->() : $get_db;
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    # make sure every cache has a bucket for this thread and connection:
    foreach my $cache ($table_names,
                       $table_expected_fieldnames,
                       $table_fieldnames_cached,
                       $table_primarykeys,
                       $table_target_indexes) {
        $cache->{$tid} = {} unless exists $cache->{$tid};
        $cache->{$tid}->{$connectidentifier} = {} unless exists $cache->{$tid}->{$connectidentifier};
    }
    $table_names->{$tid}->{$connectidentifier}->{$class} = $tablename;
    # we prefer to always update the expected fieldnames (that come from a derived class)
    $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $fieldnames // [];
    $table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $keycols // [];
    # we prefer to always update the target table indexes (that come from a derived class)
    $table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $indexes // {};
}
# Registers table metadata for a class and verifies the expected field names
# against the field names reported by the database.
# $get_db may be a connection object or a code ref returning one.
# Table name, expected field names (default []) and target indexes (default {})
# are always overwritten; the field names reported by the db and the primary
# key columns are queried only once per thread/connection/class and cached.
# Returns 1 if the field names are accepted, 0 if they differ (which is also
# reported through fieldnamesdiffer()).
sub checktableinfo {
    my ($get_db,$class,$tablename,$expected_fieldnames,$target_indexes) = @_;
    my $result = 1;
    my $db = (ref $get_db eq 'CODE') ? $get_db->() : $get_db;
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    # creates the thread and connection buckets of a cache, if missing:
    my $ensure_bucket = sub {
        my ($cache) = @_;
        $cache->{$tid} = {} unless exists $cache->{$tid};
        $cache->{$tid}->{$connectidentifier} = {} unless exists $cache->{$tid}->{$connectidentifier};
    };

    $ensure_bucket->($table_names);
    $table_names->{$tid}->{$connectidentifier}->{$class} = $tablename;

    $ensure_bucket->($table_expected_fieldnames);
    # we prefer to always update the expected fieldnames (that come from a derived class)
    $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class} = $expected_fieldnames // [];
    my $expected = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};

    $ensure_bucket->($table_fieldnames_cached);
    if (exists $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}) {
        # cache hit - repeat the complaint if the earlier check failed:
        unless ($table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{ok}) {
            fieldnamesdiffer($db,$tablename,$expected,
                $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class}->{fieldnames},getlogger(__PACKAGE__));
            $result = 0;
        }
    } else {
        # query the database for fieldnames of the table if we don't have a cache entry yet:
        my $fieldnames = $db->getfieldnames($tablename);
        # fieldnames are case insensitive in general
        my $accepted = (!defined $fieldnames
            or (scalar @{$fieldnames}) == 0
            or setcontains($expected,$fieldnames,1));
        if ($accepted) {
            $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 1, };
            fieldnamesacquired($db,$tablename,getlogger(__PACKAGE__));
        } else {
            # otherwise we log a failure (exit? - see Logging Module)
            $table_fieldnames_cached->{$tid}->{$connectidentifier}->{$class} = { fieldnames => $fieldnames, ok => 0, };
            fieldnamesdiffer($db,$tablename,$expected,$fieldnames,getlogger(__PACKAGE__));
            $result = 0;
        }
    }

    $ensure_bucket->($table_primarykeys);
    unless (exists $table_primarykeys->{$tid}->{$connectidentifier}->{$class}) {
        # query the database for primary keys of the table if we don't have them cached yet:
        $table_primarykeys->{$tid}->{$connectidentifier}->{$class} = $db->getprimarykeycols($tablename);
        primarykeycolsacquired($db,$tablename,$table_primarykeys->{$tid}->{$connectidentifier}->{$class},getlogger(__PACKAGE__));
    }

    $ensure_bucket->($table_target_indexes);
    # we prefer to always update the target table indexes (that come from a derived class)
    $table_target_indexes->{$tid}->{$connectidentifier}->{$class} = $target_indexes // {};

    return $result;
}
# Creates the target table for a transfer, from the metadata registered for
# the source class (expected field names, primary keys, target indexes).
# $get_db/$get_target_db may be connection objects or code refs returning one.
# If both $truncate and $defer_indexes are set, the target table is dropped first.
# Afterwards the target class is registered through checktableinfo() - without
# indexes when index creation is deferred.
# Returns the result of create_texttable() of the target connection.
sub create_targettable {
    my ($get_db,$class,$get_target_db,$targetclass,$targettablename,$truncate,$defer_indexes,$texttable_engine) = @_;
    my $db = (ref $get_db eq 'CODE') ? $get_db->() : $get_db;
    my $target_db = (ref $get_target_db eq 'CODE') ? $get_target_db->() : $get_target_db;
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    # looks up the source class entry of one of the metadata caches:
    my $source_info = sub {
        my ($cache) = @_;
        return $cache->{$tid}->{$connectidentifier}->{$class};
    };
    $target_db->drop_table($targettablename) if ($truncate and $defer_indexes);
    my $result = $target_db->create_texttable(
        $targettablename,
        $source_info->($table_expected_fieldnames),
        $source_info->($table_primarykeys),
        $source_info->($table_target_indexes),
        # 'ifnotexists' is always true
        $truncate,
        $defer_indexes,
        $texttable_engine,
    );
    my $registered_indexes = undef;
    $registered_indexes = $source_info->($table_target_indexes) unless $defer_indexes;
    checktableinfo($target_db,$targetclass,$targettablename,$source_info->($table_expected_fieldnames),$registered_indexes);
    return $result;
}
# Deletes rows of the table registered for $class.
#   $get_db     - connection (or code ref returning one) used for identifiers/logging
#   $get_xa_db  - optional connection (or code ref) executing the statements;
#                 defaults to the $get_db connection
#   $keyfields  - array ref of identifier column names
#   $equal      - true: delete the rows matching each row of $vals_table
#                 false: delete the rows whose (single) identifier column value
#                 is NOT contained in the first column of $vals_table
#   $vals_table - NGCP::BulkProcessor::Table holding the identifier values
# With an empty $keyfields list all rows are deleted.
# Returns the total number of rows counted for deletion. Nothing is done
# unless the class is registered and $keyfields/$vals_table have the proper types.
sub delete_records {
    my ($get_db,$get_xa_db,$class,$keyfields,$equal,$vals_table) = @_;
    my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
    my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
    my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
    if (defined $expected_fieldnames and
        (defined $keyfields and
         ref $keyfields eq 'ARRAY') and
        (defined $vals_table and
         ref $vals_table eq 'NGCP::BulkProcessor::Table')) {
        my @fields = @$keyfields;
        my $field_cnt = scalar @fields;
        my $total_rowcount = 0;
        my $initial_rowcount = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename));
        if ($field_cnt > 0) {
            my $where_clause;
            if ($equal) {
                $where_clause = ' WHERE ' . join(' = ? AND ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fields) . ' = ?';
                my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . $where_clause;
                my $delete_stmt = 'DELETE FROM ' . $db->tableidentifier($tablename) . $where_clause;
                for (my $i = 0; $i < $vals_table->rowcount(); $i++) {
                    my @vals = $vals_table->getrow($i);
                    # the table shrinks with every iteration, so count again for logging:
                    my $new_initial_rowcount = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename));
                    my $rowcount = $xa_db->db_get_value($count_stmt,@vals);
                    $xa_db->db_do($delete_stmt,@vals);
                    rowsdeleted($db,$tablename,$rowcount,$new_initial_rowcount,getlogger(__PACKAGE__));
                    $total_rowcount += $rowcount;
                }
            } elsif ($field_cnt == 1) {
                my @ne_vals = $vals_table->getcol(0);
                if ((scalar @ne_vals) > 0) {
                    $where_clause = ' WHERE ' . $db->columnidentifier($fields[0]) . ' NOT IN (' . join(',',('?') x scalar @ne_vals) . ')';
                } else {
                    # The complement of an empty value set is every row. An empty
                    # 'NOT IN ()' list is not portable SQL, so the restriction is
                    # omitted instead of sending a malformed statement.
                    $where_clause = '';
                }
                my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . $where_clause;
                my $delete_stmt = 'DELETE FROM ' . $db->tableidentifier($tablename) . $where_clause;
                my $rowcount = $xa_db->db_get_value($count_stmt,@ne_vals);
                $xa_db->db_do($delete_stmt,@ne_vals);
                rowsdeleted($db,$tablename,$rowcount,$initial_rowcount,getlogger(__PACKAGE__));
                $total_rowcount += $rowcount;
            } else {
                deleterowserror($db,$tablename,'deletings rows by complementary identifier values works with a single identifier column only',getlogger(__PACKAGE__));
                return;
            }
        } else {
            # no identifier columns given - delete everything:
            my $delete_stmt = 'DELETE FROM ' . $db->tableidentifier($tablename);
            my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
            my $rowcount = $xa_db->db_get_value($count_stmt);
            $xa_db->db_do($delete_stmt);
            rowsdeleted($db,$tablename,$rowcount,$initial_rowcount,getlogger(__PACKAGE__));
            $total_rowcount += $rowcount;
        }
        $xa_db->vacuum($tablename);
        #if ($total_rowcount > 0) {
        totalrowsdeleted($db,$tablename,$total_rowcount,$initial_rowcount,getlogger(__PACKAGE__));
        #}
        return $total_rowcount;
    }
}
# Inserts a single row into the table registered for $class.
#   $get_db              - connection (or code ref returning one) used for identifiers/logging
#   $get_xa_db           - optional connection (or code ref) executing the statements;
#                          defaults to the $get_db connection
#   $row                 - hash(-based record) with the values; only expected
#                          fields that exist in $row are inserted
#   $insert_ignore       - true to add the insert-ignore phrase of the db
#   $unique_count_fields - optional array ref of field names; if given, the row is
#                          only inserted when no row with the same values exists yet
# Returns 1 if the row was inserted, 0 if the insert was skipped.
sub insert_record {
    my ($get_db,$get_xa_db,$class,$row,$insert_ignore,$unique_count_fields) = @_;
    my $db = (ref $get_db eq 'CODE') ? $get_db->() : $get_db;
    my $xa_db = $db;
    if (defined $get_xa_db) {
        $xa_db = (ref $get_xa_db eq 'CODE') ? $get_xa_db->() : $get_xa_db;
    }
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
    my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
    if (defined $expected_fieldnames and defined $row) {
        # columns to insert: the expected fields the row provides
        my @fields = grep { exists $row->{$_} } @$expected_fieldnames;
        my @vals = map { $row->{$_} } @fields;
        # columns/values for the optional duplicate check:
        my @unique_fields = ();
        my @unique_vals = ();
        if (defined $unique_count_fields and 'ARRAY' eq ref $unique_count_fields) {
            @unique_fields = @$unique_count_fields;
            @unique_vals = map { (exists $row->{$_}) ? $row->{$_} : undef } @unique_fields;
        }
        my $ignore_phrase = ($insert_ignore ? $db->insert_ignore_phrase() . ' ' : '');
        my $stmt = 'INSERT ' . $ignore_phrase . 'INTO ' . $db->tableidentifier($tablename)
            . ' (' . join(', ',map { my $name = $_; scalar $db->columnidentifier($name); } @fields)
            . ') VALUES (' . substr(',?' x scalar @fields,1) . ')';
        my $is_duplicate = 0;
        if ((scalar @unique_fields) > 0) {
            my $count_stmt = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . ' WHERE '
                . join(' = ? AND ',map { my $name = $_; scalar $db->columnidentifier($name); } @unique_fields) . ' = ?';
            $is_duplicate = 1 unless ($xa_db->db_get_value($count_stmt,@unique_vals) == 0);
        }
        if (not $is_duplicate and $xa_db->db_do($stmt,@vals)) {
            rowinserted($db,$tablename,getlogger(__PACKAGE__));
            return 1;
        }
        # either a duplicate was found or the insert did not succeed:
        rowinsertskipped($db,$tablename,getlogger(__PACKAGE__));
        return 0;
    }
}
# Deletes the single row identified by $row from the table registered for $class.
#   $get_db    - connection (or code ref returning one) used for identifiers/logging
#   $get_xa_db - optional connection (or code ref) executing the statements;
#                defaults to the $get_db connection
#   $row       - hash(-based record) holding the identifying values
# The row is identified by the primary key columns found in $row. If $row
# contains none of them, all expected fields present in $row are used instead.
# Unless the full primary key is given, the delete is only executed if exactly
# one row matches.
# $row is not modified.
# Returns 1 if the delete statement succeeded, 0 otherwise.
sub delete_record {
    my ($get_db,$get_xa_db,$class,$row) = @_;
    my $db = (ref $get_db eq 'CODE') ? &$get_db() : $get_db;
    my $xa_db = (defined $get_xa_db ? (ref $get_xa_db eq 'CODE') ? &$get_xa_db() : $get_xa_db : $db);
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
    my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
    my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
    if (defined $expected_fieldnames and defined $row) {
        my @pk_fields = ();
        my @pk_vals = ();
        my $full_pk = 0;
        if (defined $primarykeys) {
            $full_pk = ((scalar @$primarykeys) > 0 ? 1 : 0);
            foreach my $fieldname (@$primarykeys) {
                if (exists $row->{$fieldname}) {
                    push(@pk_fields,$fieldname);
                    # Only read the value. Deleting it here would strip the key
                    # fields from the caller's record (update_record works on
                    # a copy for that reason).
                    push(@pk_vals,$row->{$fieldname});
                } else {
                    # at least one primary key column is missing:
                    $full_pk = 0;
                }
            }
        }
        if ((scalar @pk_fields) == 0) {
            # no primary key values at all - identify the row by all its fields:
            foreach my $fieldname (@$expected_fieldnames) {
                if (exists $row->{$fieldname}) {
                    push @pk_fields,$fieldname;
                    push @pk_vals,$row->{$fieldname};
                }
            }
        }
        my $selectpk_fieldnames = join(' = ? AND ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @pk_fields);
        my $stmt = 'DELETE FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . $selectpk_fieldnames . ' = ?';
        if ($full_pk) {
            if ($xa_db->db_do($stmt,@pk_vals)) {
                rowsdeleted($db,$tablename,1,1,getlogger(__PACKAGE__));
                return 1;
            } else {
                rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__));
                return 0;
            }
        } else {
            # ambiguous identification - make sure exactly one row is affected:
            my $count = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . $selectpk_fieldnames . ' = ?',@pk_vals);
            if ($count == 1) {
                my $affected;
                if ($affected = $xa_db->db_do($stmt,@pk_vals)) {
                    rowsdeleted($db,$tablename,$affected,$count,getlogger(__PACKAGE__));
                    return 1;
                } else {
                    rowsdeleted($db,$tablename,0,$count,getlogger(__PACKAGE__));
                    return 0;
                }
            } else {
                rowsdeleted($db,$tablename,0,$count,getlogger(__PACKAGE__));
                return 0;
            }
        }
    }
}
# Updates the single row identified by $row in the table registered for $class.
#   $get_db    - connection (or code ref returning one) used for identifiers/logging
#   $get_xa_db - optional connection (or code ref) executing the statements;
#                defaults to the $get_db connection
#   $row       - hash(-based record) holding key and new values; it is copied,
#                not modified
# Primary key columns found in $row go to the WHERE clause, the remaining
# expected fields present in $row go to the SET clause. If $row contains no
# primary key column, the SET fields are used for the WHERE clause as well.
# Unless the full primary key is given, the update is only executed if exactly
# one row matches.
# Returns 1 if the update statement succeeded, 0 if it was skipped.
sub update_record {
    my ($get_db,$get_xa_db,$class,$row) = @_;
    my $db = (ref $get_db eq 'CODE') ? $get_db->() : $get_db;
    my $xa_db = $db;
    if (defined $get_xa_db) {
        $xa_db = (ref $get_xa_db eq 'CODE') ? $get_xa_db->() : $get_xa_db;
    }
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
    my $primarykeys = $table_primarykeys->{$tid}->{$connectidentifier}->{$class};
    my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
    if (defined $expected_fieldnames and defined $row) {
        # work on a copy, since the key values are moved out of it below:
        my %data = %$row;
        my @pk_fields = ();
        my @pk_vals = ();
        my $full_pk = 0;
        if (defined $primarykeys) {
            $full_pk = 1 if (scalar @$primarykeys) > 0;
            foreach my $keycol (@$primarykeys) {
                unless (exists $data{$keycol}) {
                    # at least one primary key column is missing:
                    $full_pk = 0;
                    next;
                }
                push(@pk_fields,$keycol);
                push(@pk_vals,delete $data{$keycol});
            }
        }
        # whatever is left of the expected fields gets updated:
        my @fields = grep { exists $data{$_} } @$expected_fieldnames;
        my @vals = map { $data{$_} } @fields;
        if ((scalar @pk_fields) == 0) {
            @pk_fields = @fields;
            @pk_vals = @vals;
        }
        my $selectpk_fieldnames = join(' = ? AND ',map { my $name = $_; scalar $db->columnidentifier($name); } @pk_fields);
        my $stmt = 'UPDATE ' . $db->tableidentifier($tablename)
            . ' SET ' . join(' = ?, ',map { my $name = $_; scalar $db->columnidentifier($name); } @fields)
            . ' = ? WHERE ' . $selectpk_fieldnames . ' = ?';
        unless ($full_pk) {
            # ambiguous identification - make sure exactly one row is affected:
            my $count = $xa_db->db_get_value('SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename) . ' WHERE ' . $selectpk_fieldnames . ' = ?',@pk_vals);
            if ($count != 1) {
                rowupdateskipped($db,$tablename,$count,getlogger(__PACKAGE__));
                return 0;
            }
        }
        if ($xa_db->db_do($stmt,@vals,@pk_vals)) {
            rowupdated($db,$tablename,getlogger(__PACKAGE__));
            return 1;
        }
        rowupdateskipped($db,$tablename,0,getlogger(__PACKAGE__));
        return 0;
    }
}
# Builds a parameterized INSERT statement for the table registered for $class.
#   $get_db                      - connection (or code ref returning one)
#   $insert_ignore               - true to add the insert-ignore phrase of the db
#   $exclude_identity_fieldnames - optional list of field names to leave out
#   $row_count                   - number of value tuples to generate (default 1)
# Returns the statement string, with one '?' placeholder per field and row.
sub insert_stmt {
    my ($get_db,$class,$insert_ignore,$exclude_identity_fieldnames,$row_count) = @_;
    my $db = (ref $get_db eq 'CODE') ? $get_db->() : $get_db;
    my $connectidentifier = $db->connectidentifier();
    my $tid = threadid();
    my $fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
    if ($exclude_identity_fieldnames) {
        $fieldnames = [ grep { not contains($_,$exclude_identity_fieldnames); } @$fieldnames ];
    }
    my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
    $row_count = 1 unless defined $row_count;
    # one "(?,?,...)" tuple per row:
    my $tuple = '(' . join(',',('?') x scalar @$fieldnames) . ')';
    my $values = join(", ",($tuple) x $row_count);
    my $stmt = 'INSERT ';
    $stmt .= $db->insert_ignore_phrase() . ' ' if $insert_ignore;
    $stmt .= 'INTO ' . $db->tableidentifier($tablename) . ' (';
    $stmt .= join(', ',map { my $name = $_; scalar $db->columnidentifier($name); } @$fieldnames);
    $stmt .= ') VALUES ' . $values;
    return $stmt;
}
# Copies the rows of the table registered for a source class into a target table.
# Takes named parameters:
#   get_db, get_target_db   - code refs returning the source/target connection;
#                             the transfer is only attempted if both are code refs
#   class, targetclass      - source/target class names (keys of the metadata caches)
#   targettablename         - name of the table to create and fill
#   truncate_targettable    - passed on to create_targettable()
#   create_indexes          - if true, index creation is deferred: primary key and
#                             indexes are created after the rows were transferred;
#                             defaults to $transfer_defer_indexes
#   texttable_engine        - passed on to create_targettable()
#   fixtable_statements     - array ref of statements (strings, or code refs that get
#                             the target table identifier and return a statement)
#                             executed on the target db after a completed transfer
#   multithreading          - request the reader/writer thread mode
#   blocksize               - rows per block; calculated by _calc_blocksize() if undefined
#   destroy_source_dbs_code,
#   destroy_target_dbs_code - handed to the reader/writer thread as destroy_dbs_code
#   selectcount, select     - optional custom count/select statements
#   values                  - array ref of bind values for the count/select statements
#   loop                    - if true, no row count is taken up front and fetching
#                             does not stop at an empty or short block
# Returns 1 if the transfer completed (or if there were no rows to transfer), 0 otherwise.
sub transfer_table {
my %params = @_;
my ($get_db,
$class,
$get_target_db,
$targetclass,
$targettablename,
$truncate_targettable,
$create_indexes,
$texttable_engine,
$fixtable_statements,
$multithreading,
$blocksize,
$destroy_source_dbs_code,
$destroy_target_dbs_code,
$selectcount,
$select,
$loop,
$values) = @params{qw/
get_db
class
get_target_db
targetclass
targettablename
truncate_targettable
create_indexes
texttable_engine
fixtable_statements
multithreading
blocksize
destroy_source_dbs_code
destroy_target_dbs_code
selectcount
select
loop
values
/};
#my ($get_db,$tablename,$get_target_db,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine,$fixtable_statements,$selectcount,$select,@values) = @_;
if (ref $get_db eq 'CODE' and ref $get_target_db eq 'CODE') {
my $db = &$get_db($reader_name,1); # $reader_name
my $target_db = &$get_target_db($writer_name); #$writer_name);
my $connectidentifier = $db->connectidentifier();
my $tid = threadid();
my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
my $countstatement;
if (defined $selectcount) {
$countstatement = $selectcount;
} else {
$countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
}
my $rowcount;
# in loop mode the row count stays undefined:
unless ($loop) {
$rowcount = $db->db_get_value($countstatement,@$values);
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
if ($rowcount > 0) {
tabletransferstarted($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
} else {
# nothing to transfer - reported, but treated as success:
transferzerorowcount($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
return 1;
}
} else {
tabletransferstarted($db,$tablename,$target_db,$targettablename,undef,getlogger(__PACKAGE__));
}
my $errorstate = $RUNNING; # 1;
$create_indexes = ((defined $create_indexes) ? $create_indexes : $transfer_defer_indexes);
# $create_indexes is passed as the $defer_indexes argument of create_targettable():
if (create_targettable($db,$class,$target_db,$targetclass,$targettablename,$truncate_targettable,$create_indexes,$texttable_engine)) {
my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};
my @fieldnames = @$expected_fieldnames;
#my $setfieldnames = join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames);
my $valueplaceholders = substr(',?' x scalar @fieldnames,1);
my $selectstatement;
if (length($select) > 0) {
$selectstatement = $select;
} else {
$selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename);
}
my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')';
# threaded mode needs the global switch, the caller's request, support by both
# connections and more than one cpu:
if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $target_db->multithreading_supported() and $cpucount > 1) { # and $multithreaded) { # definitely no multithreading when CSVDB is involved
$blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,1,$tabletransfer_threadqueuelength);
my $reader;
my $writer;
# state per thread id, shared between this thread, the reader and the writer:
my %errorstates :shared = ();
#$errorstates{$tid} = $errorstate;
#my $readererrorstate :shared = 1;
#my $writererrorstate :shared = 1;
my $queue = Thread::Queue->new();
# all connections are disconnected before the threads are created:
tablethreadingdebug('shutting down db connections ...',getlogger(__PACKAGE__));
$db->db_disconnect();
#undef $db;
$target_db->db_disconnect();
#undef $target_db;
my $default_connection = &$get_db(undef,0);
# remember whether the default connection has to be restored afterwards:
my $default_connection_reconnect = $default_connection->is_connected();
$default_connection->db_disconnect();
tablethreadingdebug('starting reader thread',getlogger(__PACKAGE__));
$reader = threads->create(\&_reader,
{ name => $reader_name,
queue => $queue,
errorstates => \%errorstates,
#readererrorstate_ref => \$readererrorstate,
#writererrorstate_ref => \$writererrorstate,
threadqueuelength => $tabletransfer_threadqueuelength,
get_db => $get_db,
tablename => $tablename,
class => $class,
selectstatement => $selectstatement,
blocksize => $blocksize,
rowcount => $rowcount,
loop => $loop,
#logger => $logger,
values_ref => $values,
destroy_dbs_code => $destroy_source_dbs_code,
});
tablethreadingdebug('starting writer thread',getlogger(__PACKAGE__));
$writer = threads->create(\&_writer,
{ #name => $writer_name,
queue => $queue,
errorstates => \%errorstates,
readertid => $reader->tid(),
#readererrorstate_ref => \$readererrorstate,
#writererrorstate_ref => \$writererrorstate,
get_target_db => $get_target_db,
targettablename => $targettablename,
targetclass => $targetclass,
insertstatement => $insertstatement,
blocksize => $blocksize,
rowcount => $rowcount,
loop => $loop,
#logger => $logger,
destroy_dbs_code => $destroy_target_dbs_code,
});
# on TERM/INT/QUIT/HUP: mark this thread as stopped, locally and in the shared states
my $signal_handler = sub {
my $tid = threadid();
$errorstate = $STOP;
enable_threading_info(1);
tablethreadingdebug("[$tid] interrupt signal received",getlogger(__PACKAGE__));
#print("[$tid] interrupt signal received");
#_info($context,"interrupt signal received");
#$result = 0;
my $errorstates = \%errorstates;
lock $errorstates;
$errorstates->{$tid} = $STOP;
};
local $SIG{TERM} = $signal_handler;
local $SIG{INT} = $signal_handler;
local $SIG{QUIT} = $signal_handler;
local $SIG{HUP} = $signal_handler;
$reader->join();
tablethreadingdebug('reader thread joined',getlogger(__PACKAGE__));
$writer->join();
tablethreadingdebug('writer thread joined',getlogger(__PACKAGE__));
# completed, unless a signal switched the state to $STOP in the meantime:
$errorstate = $COMPLETED if $errorstate == $RUNNING;
#$errorstate = $readererrorstate | $writererrorstate;
# merge in the states of the other threads; the result only equals
# $COMPLETED if no other bit got set
$errorstate |= _get_other_threads_state(\%errorstates,$tid);
tablethreadingdebug('restoring db connections ...',getlogger(__PACKAGE__));
#$db = &$get_db($reader_name,1);
$target_db = &$get_target_db(undef,1);
if ($default_connection_reconnect) {
$default_connection = &$get_db(undef,1);
}
} else {
# single-threaded mode: fetch and write block by block in this thread
$blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,0,undef);
#$db->db_disconnect();
#undef $db;
#$db = &$get_db($reader_name);
#$target_db->db_disconnect();
#undef $target_db;
#$target_db = &$get_target_db($writer_name);
eval {
# rowblock_transactional connections open the select once, before the loop ...
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
my $last_i;
my $i = 0;
while (1) {
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
# ... all others open it per block, passing the current offset and the block size
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
my $rowblock = $db->db_get_rowblock($blocksize);
$db->db_finish() unless $db->rowblock_transactional;
my $realblocksize = scalar @$rowblock;
if ($realblocksize > 0) {
writing_rows($target_db,$targettablename,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
$target_db->db_do_begin($insertstatement); #,$targettablename);
$target_db->db_do_rowblock($rowblock);
$target_db->db_finish();
$i += $realblocksize;
#foreach my $row (@$rowblock) {
# undef $row;
#}
#undef $rowblock;
# a short block is the last one, unless in loop mode:
if ($realblocksize < $blocksize) {
last unless $loop;
}
} else {
last unless $loop;
}
}
$db->db_finish() if $db->rowblock_transactional;
};
# any exception raised while transferring marks the transfer as failed:
if ($@) {
$errorstate = $ERROR;
} else {
$errorstate = $COMPLETED;
}
$db->db_disconnect();
#undef $db;
#$target_db->db_disconnect();
#undef $target_db;
}
#$db = &$get_db($controller_name,1);
#$target_db = &$get_target_db($controller_name,1);
# post-processing of the target table, only after a completed transfer:
if ($errorstate == $COMPLETED and ref $fixtable_statements eq 'ARRAY' and (scalar @$fixtable_statements) > 0) {
eval {
foreach my $fixtable_statement (@$fixtable_statements) {
if (ref $fixtable_statement eq '') {
# plain statement string:
$target_db->db_do($fixtable_statement);
tablefixed($target_db,$targettablename,$fixtable_statement,getlogger(__PACKAGE__));
} else {
# otherwise called as code, with the target table identifier, to get the statement:
$fixtable_statement = &$fixtable_statement($target_db->tableidentifier($targettablename));
$target_db->db_do($fixtable_statement);
tablefixed($target_db,$targettablename,$fixtable_statement,getlogger(__PACKAGE__));
}
}
};
if ($@) {
$errorstate = $ERROR;
#} else {
# $errorstate = $COMPLETED;
}
}
# deferred creation of primary key and indexes on the target table:
if ($errorstate == $COMPLETED and $create_indexes) {
eval {
$target_db->create_primarykey($targettablename,
$table_primarykeys->{$tid}->{$connectidentifier}->{$class},
$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class});
$target_db->create_indexes($targettablename,
$table_target_indexes->{$tid}->{$connectidentifier}->{$class},
$table_primarykeys->{$tid}->{$connectidentifier}->{$class});
# drop the cached primary key columns of the target class, so that
# checktableinfo() queries them again
delete $table_primarykeys->{$tid}->{$target_db->connectidentifier()}->{$targetclass};
checktableinfo($target_db,$targetclass,$targettablename,$table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class},$table_target_indexes->{$tid}->{$connectidentifier}->{$class});
$target_db->vacuum($targettablename);
};
if ($@) {
$errorstate = $ERROR;
#} else {
# $errorstate = $COMPLETED;
}
}
}
# if the target table could not be created, the state is still $RUNNING here
# and the transfer is reported as failed
if ($errorstate == $COMPLETED) {
tabletransferdone($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
#$db->db_disconnect();
#$target_db->db_disconnect();
return 1;
} else {
tabletransferfailed($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
#$db->db_disconnect();
#$target_db->db_disconnect();
}
}
return 0;
}
# Reads the rows of a registered table block by block and passes every block
# to a caller supplied callback. Depending on the configuration the work is
# done either inline or by one reader thread feeding several processor threads
# through a Thread::Queue.
#
# Named parameters:
#   get_db                      - code ref returning a db connection object;
#                                 if it is not a code ref nothing is done and
#                                 0 is returned
#   name                        - connection name handed to get_db; also
#                                 stored in the process context
#   class                       - key used to look up the table name and the
#                                 expected field names registered for the
#                                 current thread and connection
#   process_code                - code ref called as
#                                 ($context,$rowblock,$row_offset); a false
#                                 return value ends the processing
#   read_code                   - optional code ref; its return value replaces
#                                 each fetched row block
#   static_context              - optional hash ref merged into the process
#                                 context
#   init_process_context_code   - optional code ref called with the context
#                                 before the first block
#   uninit_process_context_code - optional code ref called with the context
#                                 after the last block (errors are ignored)
#   multithreading              - request threaded processing
#   blocksize                   - rows per block; calculated if undefined
#   tableprocessing_threads     - number of processor threads; defaults to
#                                 $cpucount
#   destroy_reader_dbs_code     - handed to the reader thread as
#                                 destroy_dbs_code
#   selectcount                 - optional replacement for the COUNT(*)
#                                 statement
#   select                      - optional replacement for the generated
#                                 SELECT statement
#   loop                        - if true the row count is not checked and
#                                 reading continues after short/empty blocks
#   values                      - array ref with bind values for the
#                                 statements
#
# Returns 1 if the table was processed (or contains no rows), 0 otherwise.
sub process_table {

    my %params = @_;
    my ($get_db,
        $name,
        $class,
        $process_code,
        $read_code,
        $static_context,
        $init_process_context_code,
        $uninit_process_context_code,
        $multithreading,
        $blocksize,
        $tableprocessing_threads,
        $destroy_reader_dbs_code,
        $selectcount,
        $select,
        $loop,
        $values) = @params{qw/
            get_db
            name
            class
            process_code
            read_code
            static_context
            init_process_context_code
            uninit_process_context_code
            multithreading
            blocksize
            tableprocessing_threads
            destroy_reader_dbs_code
            selectcount
            select
            loop
            values
        /};

    if (ref $get_db eq 'CODE') {

        my $db = &$get_db($name,1);
        my $connectidentifier = $db->connectidentifier();
        my $tid = threadid();
        my $tablename = $table_names->{$tid}->{$connectidentifier}->{$class};
        my $expected_fieldnames = $table_expected_fieldnames->{$tid}->{$connectidentifier}->{$class};

        my $countstatement;
        if (defined $selectcount) {
            $countstatement = $selectcount;
        } else {
            $countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
        }

        # in loop mode the number of rows is not known in advance
        my $rowcount;
        unless ($loop) {
            $rowcount = $db->db_get_value($countstatement,@$values);
            if ($rowcount > 0) {
                tableprocessingstarted($db,$tablename,$rowcount,getlogger(__PACKAGE__));
            } else {
                processzerorowcount($db,$tablename,$rowcount,getlogger(__PACKAGE__));
                return 1;
            }
        } else {
            tableprocessingstarted($db,$tablename,undef,getlogger(__PACKAGE__));
        }

        my @fieldnames = @$expected_fieldnames;

        my $selectstatement;
        if (length($select) > 0) {
            $selectstatement = $select;
        } else {
            $selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename);
        }

        my $errorstate = $RUNNING;

        if ($enablemultithreading and $multithreading and $db->multithreading_supported() and $cpucount > 1) { # definitely no multithreading when CSVDB is involved

            $tableprocessing_threads //= $cpucount;
            $blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,1,$tableprocessing_threadqueuelength);

            my $reader;
            my %processors = ();
            # state of every participating thread, keyed by thread id
            my %errorstates :shared = ();
            my $queue = Thread::Queue->new();

            # connections are closed before the threads are created and
            # restored after they were joined
            tablethreadingdebug('shutting down db connections ...',getlogger(__PACKAGE__));
            $db->db_disconnect();
            my $default_connection = &$get_db(undef,0);
            my $default_connection_reconnect = $default_connection->is_connected();
            $default_connection->db_disconnect();

            tablethreadingdebug('starting reader thread',getlogger(__PACKAGE__));
            $reader = threads->create(\&_reader,
                { name => $name,
                  queue => $queue,
                  errorstates => \%errorstates,
                  read_code => $read_code,
                  threadqueuelength => $tableprocessing_threadqueuelength,
                  get_db => $get_db,
                  tablename => $tablename,
                  class => $class,
                  selectstatement => $selectstatement,
                  blocksize => $blocksize,
                  rowcount => $rowcount,
                  loop => $loop,
                  values_ref => $values,
                  destroy_dbs_code => $destroy_reader_dbs_code,
                });

            for (my $i = 0; $i < $tableprocessing_threads; $i++) {
                tablethreadingdebug('starting processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads,getlogger(__PACKAGE__));
                my $processor = threads->create(\&_process,
                    _create_process_context($static_context,
                        { name => $name,
                          queue => $queue,
                          errorstates => \%errorstates,
                          readertid => $reader->tid(),
                          process_code => $process_code,
                          init_process_context_code => $init_process_context_code,
                          uninit_process_context_code => $uninit_process_context_code,
                          blocksize => $blocksize,
                          rowcount => $rowcount,
                          loop => $loop,
                        }));
                if (!defined $processor) {
                    tablethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $tableprocessing_threads . ' NOT started',getlogger(__PACKAGE__));
                    # there is no thread object to register or join, so
                    # carry on with the processors that did start
                    next;
                }
                $processors{$processor->tid()} = $processor;
            }

            if ((scalar keys %processors) == 0) {
                # Without any consumer the reader would wait for a RUNNING
                # state of another thread that never shows up. Publishing
                # RUNNING together with ERROR for this thread releases the
                # reader from its start-up wait and makes it stop as soon as
                # it re-reads the thread states.
                # NOTE(review): this relies on the state constants being bit
                # flags, as the masking below and in _reader suggests.
                $errorstate = $ERROR;
                lock %errorstates;
                $errorstates{$tid} = $RUNNING | $ERROR;
            }

            # an interrupt marks the thread that handles the signal as
            # stopping; the worker threads poll the shared states
            my $signal_handler = sub {
                my $tid = threadid();
                $errorstate = $STOP;
                enable_threading_info(1);
                tablethreadingdebug("[$tid] interrupt signal received",getlogger(__PACKAGE__));
                my $errorstates = \%errorstates;
                lock $errorstates;
                $errorstates->{$tid} = $STOP;
            };
            local $SIG{TERM} = $signal_handler;
            local $SIG{INT} = $signal_handler;
            local $SIG{QUIT} = $signal_handler;
            local $SIG{HUP} = $signal_handler;

            $reader->join();
            tablethreadingdebug('reader thread joined',getlogger(__PACKAGE__));

            # poll instead of blocking in join() for each processor
            while ((scalar keys %processors) > 0) {
                foreach my $processor (values %processors) {
                    if (defined $processor and $processor->is_joinable()) {
                        $processor->join();
                        delete $processors{$processor->tid()};
                        tablethreadingdebug('processor thread tid ' . $processor->tid() . ' joined',getlogger(__PACKAGE__));
                    }
                }
                sleep($thread_sleep_secs);
            }

            $errorstate = $COMPLETED if $errorstate == $RUNNING;
            # merge the final states of the worker threads, RUNNING excluded
            $errorstate |= (_get_other_threads_state(\%errorstates,$tid) & ~$RUNNING);

            tablethreadingdebug('restoring db connections ...',getlogger(__PACKAGE__));
            if ($default_connection_reconnect) {
                $default_connection = &$get_db(undef,1);
            }

        } else {

            $blocksize //= _calc_blocksize($rowcount,scalar @fieldnames,0,undef);

            my $context = _create_process_context($static_context,{ tid => $tid, name => $name, });
            my $rowblock_result = 1;
            eval {
                if (defined $init_process_context_code and 'CODE' eq ref $init_process_context_code) {
                    &$init_process_context_code($context);
                }
                # depending on the connection either one statement is kept
                # open for all blocks or one statement is issued per block
                $db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional;

                my $last_i;
                my $i = 0;
                while (1) {
                    # log the fetch only once per row offset
                    fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
                    $db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
                    my $rowblock = $db->db_get_rowblock($blocksize);
                    if ('CODE' eq ref $read_code) {
                        $rowblock = &$read_code($rowblock);
                    }
                    $db->db_finish() unless $db->rowblock_transactional;
                    my $realblocksize = scalar @$rowblock;
                    if ($realblocksize > 0) {
                        processing_rows($context,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));

                        $rowblock_result = &$process_code($context,$rowblock,$i);

                        $i += $realblocksize;

                        if (not $rowblock_result) {
                            last;
                        } elsif ($realblocksize < $blocksize) {
                            # short block: end of data unless looping
                            last unless $loop;
                        }
                    } else {
                        last unless $loop;
                    }
                }
                $db->db_finish() if $db->rowblock_transactional;

            };

            if ($@) {
                $errorstate = $ERROR;
            } else {
                # a false result of process_code only ends the loop, it is
                # not treated as an error
                $errorstate = $COMPLETED;
            }

            # errors while tearing down the context are ignored
            eval {
                if (defined $uninit_process_context_code and 'CODE' eq ref $uninit_process_context_code) {
                    &$uninit_process_context_code($context);
                }
            };

            $db->db_disconnect();

        }

        if ($errorstate == $COMPLETED) {
            tableprocessingdone($db,$tablename,$rowcount,getlogger(__PACKAGE__));
            return 1;
        } else {
            tableprocessingfailed($db,$tablename,$rowcount,getlogger(__PACKAGE__));
        }

    }

    return 0;

}
# Calculates the number of rows to fetch per block.
#
# Parameters:
#   $rowcount          - total number of rows, may be undefined
#   $columncount       - number of columns per row
#   $multithreaded     - true if blocks are queued between threads
#   $threadqueuelength - queue length, passed to get_threadqueuelength()
#                        when $multithreaded is set
#
# Starting with the largest power of ten not exceeding $rowcount, the block
# size is divided by ten until the cells held in memory fit into
# $cells_transfer_memory_limit and the table splits into at least
# $minnumberofchunks blocks. The result is limited to the range
# $minblocksize .. $maxblocksize. $minblocksize is returned if $rowcount is
# undefined or not greater than $minblocksize.
sub _calc_blocksize {

    my ($rowcount,$columncount,$multithreaded,$threadqueuelength) = @_;

    return $minblocksize unless (defined $rowcount and $rowcount > $minblocksize);

    # in threaded mode up to a queue full of blocks is held in memory
    my $blocks_in_memory = 1;
    if ($multithreaded) {
        $blocks_in_memory = get_threadqueuelength($threadqueuelength);
    }

    my $exp = int ( log ($rowcount) / log(10.0) );
    my $blocksize = int ( 10 ** $exp );

    # Shrinking below $minblocksize cannot change the result because of the
    # lower limit applied below. Stopping there also keeps $blocksize from
    # reaching 0, which made the division die for very small memory limits.
    while ($blocksize > $minblocksize
           and ($columncount * $blocksize * $blocks_in_memory > $cells_transfer_memory_limit
                or $rowcount / $blocksize < $minnumberofchunks)) {
        $exp -= 1.0;
        $blocksize = int ( 10 ** $exp );
    }

    if ($blocksize < $minblocksize) {
        return $minblocksize;
    } elsif ($blocksize > $maxblocksize) {
        return $maxblocksize;
    } else {
        return $blocksize;
    }

}
# Combines (bitwise or) the states of all threads registered in the given
# state hash, leaving out the entry of thread $tid. If $tid is undefined the
# id of the calling thread is used. Returns 0 if $errorstates is not a hash
# reference.
sub _get_other_threads_state {

    my ($errorstates,$tid) = @_;

    $tid = threadid() unless defined $tid;

    my $combined = 0;
    return $combined unless (defined $errorstates and ref $errorstates eq 'HASH');

    lock $errorstates;
    for my $other_tid (grep { $_ != $tid } keys %$errorstates) {
        $combined |= $errorstates->{$other_tid};
    }
    return $combined;

}
# Tells whether any thread registered in $context->{errorstates} has its
# $ERROR or $STOP flag set. Returns 1 in that case and 0 otherwise, also if
# no context or no state hash is given.
sub is_shutdown {

    my ($context) = @_;

    my $errorstates;
    $errorstates = $context->{errorstates} if $context;

    my $combined = 0;
    if (defined $errorstates and ref $errorstates eq 'HASH') {
        lock $errorstates;
        for my $registered_tid (keys %$errorstates) {
            $combined |= $errorstates->{$registered_tid};
        }
    }

    if ($combined & $ERROR) {
        return 1;
    } elsif ($combined & $STOP) {
        return 1;
    }
    return 0;

}
# Decides whether the consumer thread $tid has to leave its working loop.
# The consumer keeps going (0 is returned) as long as no other thread has
# its $ERROR or $STOP flag set and there are still blocks queued or the
# reader thread is in state $RUNNING. Otherwise the reason is logged and 1
# is returned.
sub _get_stop_consumer_thread {

    my ($context,$tid) = @_;

    my $others;
    my $reader_state;
    {
        # read both values under the same lock
        my $errorstates = $context->{errorstates};
        lock $errorstates;
        $others = _get_other_threads_state($errorstates,$tid);
        $reader_state = $errorstates->{$context->{readertid}};
    }
    my $pending = $context->{queue}->pending();

    my $no_defunct = (($others & $ERROR) == 0);
    my $no_stopping = (($others & $STOP) == 0);
    my $reader_running = ($reader_state == $RUNNING);

    return 0 if ($no_defunct and $no_stopping and ($pending > 0 or $reader_running));

    tablethreadingdebug('[' . $tid . '] consumer thread is shutting down (' .
        ($no_defunct ? 'no defunct thread(s)' : uc('defunct thread(s)')) . ', ' .
        ($no_stopping ? 'no thread(s) stopping by signal' : uc('thread(s) stopping by signal')) . ', ' .
        ($pending > 0 ? 'blocks pending' : uc('no blocks pending')) . ', ' .
        ($reader_running ? 'reader thread running' : uc('reader thread not running')) . ') ...'
        ,getlogger(__PACKAGE__));

    return 1;

}
# Thread entry point of the reader: fetches row blocks and enqueues them for
# the consumer threads (writer or processors).
#
# $context is a hash ref with the entries
#   name, get_db, selectstatement, values_ref, tablename, blocksize,
#   rowcount, loop, read_code, queue, threadqueuelength, errorstates and
#   destroy_dbs_code.
#
# The reader registers its own state in the shared errorstates hash, waits
# until a consumer is running and then reads until the end of the data is
# reached or until the other threads are no longer running without error
# and stop request. Returns the final state of the reader thread.
sub _reader {

    my $context = shift;

    my $reader_db;
    my $tid = threadid();
    $context->{tid} = $tid;
    {
        lock $context->{errorstates};
        $context->{errorstates}->{$tid} = $RUNNING;
    }
    tablethreadingdebug('[' . $tid . '] reader thread tid ' . $tid . ' started',getlogger(__PACKAGE__));

    my $blockcount = 0;
    eval {
        $reader_db = &{$context->{get_db}}($context->{name});
        $reader_db->db_get_begin($context->{selectstatement},@{$context->{values_ref}}) if $reader_db->rowblock_transactional;
        tablethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__));
        # Wait for the consumers to come up. The wait also ends if another
        # thread already reports an error or a stop request: a consumer that
        # failed before the reader got here never reports RUNNING again and
        # would leave the reader waiting forever.
        my $state = _get_other_threads_state($context->{errorstates},$tid);
        while (($state & ($RUNNING | $ERROR | $STOP)) == 0) {
            sleep($thread_sleep_secs);
            $state = _get_other_threads_state($context->{errorstates},$tid);
        }
        my $last_i;
        my $i = 0;
        # as long as there is one running consumer and no defunct or stopping thread
        while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0 and ($state & $STOP) == 0) {
            # log the fetch only once per row offset
            fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
            $reader_db->db_get_begin($context->{selectstatement},$i,$context->{blocksize},@{$context->{values_ref}}) unless $reader_db->rowblock_transactional;
            my $rowblock = $reader_db->db_get_rowblock($context->{blocksize});
            $reader_db->db_finish() unless $reader_db->rowblock_transactional;
            # the packet size is the number of rows fetched, taken before
            # read_code gets the chance to replace the block
            my $realblocksize = scalar @$rowblock;
            my %packet :shared = ();
            if ('CODE' eq ref $context->{read_code}) {
                $rowblock = &{$context->{read_code}}($rowblock);
            }
            $packet{rows} = $rowblock;
            $packet{size} = $realblocksize;
            $packet{row_offset} = $i;
            if ($realblocksize > 0) {
                $context->{queue}->enqueue(\%packet);
                $blockcount++;
                # wait if the queue is full and there is one running consumer
                while (((($state = _get_other_threads_state($context->{errorstates},$tid)) & $RUNNING) == $RUNNING) and $context->{queue}->pending() >= get_threadqueuelength($context->{threadqueuelength})) {
                    sleep($thread_sleep_secs);
                }
                $i += $realblocksize;
                if ($realblocksize < $context->{blocksize} and not $context->{loop}) {
                    tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__));
                    last;
                }
            } elsif ($context->{loop}) {
                sleep($loop_sleep_secs);
                # refresh the state while idling as well, otherwise errors
                # and stop requests go unnoticed as long as no rows arrive
                $state = _get_other_threads_state($context->{errorstates},$tid);
            } else {
                # the empty packet tells a consumer that the data ended
                $context->{queue}->enqueue(\%packet);
                tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__));
                last;
            }
        }
        if (not (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0 and ($state & $STOP) == 0)) {
            tablethreadingdebug('[' . $tid . '] reader thread is shutting down (' .
                (($state & $RUNNING) == $RUNNING ? 'still running consumer threads' : uc('no running consumer threads')) . ', ' .
                (($state & $ERROR) == 0 ? 'no defunct thread(s)' : uc('defunct thread(s)')) . ', ' .
                (($state & $STOP) == 0 ? 'no thread(s) stopping by signal' : uc('thread(s) stopping by signal')) . ') ...'
                ,getlogger(__PACKAGE__));
        }
        $reader_db->db_finish() if $reader_db->rowblock_transactional;
    };
    # keep the error: logging and cleanup below may run an eval of their own
    # and reset $@
    my $err = $@;
    tablethreadingdebug($err ? '[' . $tid . '] reader thread error: ' . $err : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
    if (defined $reader_db) {
        # if thread cleanup has a problem...
        $reader_db->db_disconnect();
    }
    if (defined $context->{destroy_dbs_code} and 'CODE' eq ref $context->{destroy_dbs_code}) {
        &{$context->{destroy_dbs_code}}();
    }
    lock $context->{errorstates};
    if ($err) {
        $context->{errorstates}->{$tid} = $ERROR;
    } elsif ($context->{errorstates}->{$tid} != $STOP) {
        $context->{errorstates}->{$tid} = $COMPLETED;
    }
    return $context->{errorstates}->{$tid};

}
# Thread entry point of the writer: takes row blocks from the queue and
# inserts them into the target table.
#
# $context is a hash ref with the entries
#   get_target_db, queue, errorstates, targettablename, insertstatement,
#   rowcount, loop and destroy_dbs_code, plus whatever
#   _get_stop_consumer_thread() reads.
#
# The writer registers its own state in the shared errorstates hash and
# works until _get_stop_consumer_thread() tells it to stop or an empty
# packet arrives outside of loop mode. Returns the final state of the writer
# thread.
sub _writer {

    my $context = shift;

    my $writer_db;
    my $tid = threadid();
    $context->{tid} = $tid;
    {
        lock $context->{errorstates};
        $context->{errorstates}->{$tid} = $RUNNING;
    }
    tablethreadingdebug('[' . $tid . '] writer thread tid ' . $tid . ' started',getlogger(__PACKAGE__));

    my $blockcount = 0;
    eval {
        $writer_db = &{$context->{get_target_db}}($writer_name);
        while (not _get_stop_consumer_thread($context,$tid)) {
            # non-blocking, so the stop condition is checked regularly
            my $packet = $context->{queue}->dequeue_nb();
            if (defined $packet) {
                if ($packet->{size} > 0) {
                    writing_rows($writer_db,$context->{targettablename},$packet->{row_offset},$packet->{size},$context->{rowcount},getlogger(__PACKAGE__));

                    $writer_db->db_do_begin($context->{insertstatement});
                    $writer_db->db_do_rowblock($packet->{rows});
                    $writer_db->db_finish();
                    $blockcount++;
                } elsif ($context->{loop}) {
                    sleep($loop_sleep_secs);
                } else { # empty packet received
                    tablethreadingdebug('[' . $tid . '] shutting down writer thread (end of data - empty block) ...',getlogger(__PACKAGE__));
                    last;
                }
            } else {
                # nothing queued at the moment
                sleep($thread_sleep_secs);
            }
        }
    };
    # keep the error: logging and cleanup below may run an eval of their own
    # and reset $@
    my $err = $@;
    tablethreadingdebug($err ? '[' . $tid . '] writer thread error: ' . $err : '[' . $tid . '] writer thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
    if (defined $writer_db) {
        # if thread cleanup has a problem...
        $writer_db->db_disconnect();
    }
    if (defined $context->{destroy_dbs_code} and 'CODE' eq ref $context->{destroy_dbs_code}) {
        &{$context->{destroy_dbs_code}}();
    }
    lock $context->{errorstates};
    if ($err) {
        $context->{errorstates}->{$tid} = $ERROR;
    } elsif ($context->{errorstates}->{$tid} != $STOP) {
        $context->{errorstates}->{$tid} = $COMPLETED;
    }
    return $context->{errorstates}->{$tid};

}
# Thread entry point of a processor: takes row blocks from the queue and
# passes them to $context->{process_code}.
#
# The optional init_process_context_code is called with the context before
# the first block, the optional uninit_process_context_code after the last
# one (its errors are ignored). The thread registers its own state in the
# shared errorstates hash and works until _get_stop_consumer_thread() tells
# it to stop, process_code returns a false value or an empty packet arrives
# outside of loop mode. Returns the final state of the processor thread.
sub _process {

    my $context = shift;

    my $rowblock_result = 1;
    my $tid = threadid();
    $context->{tid} = $tid;
    {
        lock $context->{errorstates};
        $context->{errorstates}->{$tid} = $RUNNING;
    }
    tablethreadingdebug('[' . $tid . '] processor thread tid ' . $tid . ' started',getlogger(__PACKAGE__));

    my $blockcount = 0;
    eval {
        my $init_code = $context->{init_process_context_code};
        if (defined $init_code and 'CODE' eq ref $init_code) {
            $init_code->($context);
        }
        PACKET: until (_get_stop_consumer_thread($context,$tid)) {
            # non-blocking, so the stop condition is checked regularly
            my $packet = $context->{queue}->dequeue_nb();
            unless (defined $packet) {
                # nothing queued at the moment
                sleep($thread_sleep_secs);
                next PACKET;
            }
            if ($packet->{size} > 0) {
                processing_rows($context,$packet->{row_offset},$packet->{size},$context->{rowcount},getlogger(__PACKAGE__));

                $rowblock_result = $context->{process_code}->($context,$packet->{rows},$packet->{row_offset});

                $blockcount++;

                next PACKET if $rowblock_result;
                tablethreadingdebug('[' . $tid . '] shutting down processor thread (processing block NOK) ...',getlogger(__PACKAGE__));
                last PACKET;
            }
            if ($context->{loop}) {
                sleep($loop_sleep_secs);
                next PACKET;
            }
            # empty packet received
            tablethreadingdebug('[' . $tid . '] shutting down processor thread (end of data - empty block) ...',getlogger(__PACKAGE__));
            last PACKET;
        }
    };
    # keep the error before the teardown below runs another eval
    my $err = $@;
    if ($err) {
        tablethreadingdebug('[' . $tid . '] processor thread error: ' . $err,getlogger(__PACKAGE__));
    } else {
        tablethreadingdebug('[' . $tid . '] processor thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
    }
    eval {
        my $uninit_code = $context->{uninit_process_context_code};
        if (defined $uninit_code and 'CODE' eq ref $uninit_code) {
            $uninit_code->($context);
        }
    };
    lock $context->{errorstates};
    if ($err) {
        $context->{errorstates}->{$tid} = $ERROR;
    } elsif ($context->{errorstates}->{$tid} != $STOP) {
        # a false result of process_code is not treated as an error
        $context->{errorstates}->{$tid} = $COMPLETED;
    }
    return $context->{errorstates}->{$tid};

}
# Builds a new context hash out of the given hash references. The entries
# are copied in argument order, so later arguments overrule earlier ones.
# Arguments that are undefined or no plain hash references are skipped.
# Returns a reference to the new hash.
sub _create_process_context {

    my %merged;
    for my $source (grep { defined $_ and 'HASH' eq ref $_ } @_) {
        # keys and values of an unmodified hash are listed in the same order
        @merged{keys %$source} = values %$source;
    }
    return \%merged;

}
1;