You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bulk-processor/lib/NGCP/BulkProcessor/SqlConnector.pm

1033 lines
27 KiB

package NGCP::BulkProcessor::SqlConnector;
use strict;
## no critic
use threads;
use threads::shared;
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
$is_perl_debug
);
use NGCP::BulkProcessor::Logging qw(
getlogger
dbdebug
dbinfo);
use NGCP::BulkProcessor::LogError qw(
dberror
dbwarn
notimplementederror
sortconfigerror);
use DBI;
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::Array qw(arrayeq);
use NGCP::BulkProcessor::RandomString qw(createtmpstring);
use NGCP::BulkProcessor::Calendar qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(get_tableidentifier $log_db_operations);
#my $logger = getlogger(__PACKAGE__);
our $log_db_operations = 0; #0;
my $temptable_randomstringlength = 4;
sub new {
my $class = shift;
my $self = bless {}, $class;
my $instanceid = shift;
my $cluster = shift;
$self->{drh} = undef;
$self->{dbh} = undef;
$self->{instanceid} = $instanceid;
$self->{tid} = threadid();
$self->{sth} = undef;
$self->{query} = undef;
$self->{params} = undef;
$self->{temp_tables} = [];
$self->{cluster} = $cluster;
return $self;
}
sub set_transaction_isolation {
my ($self,$level) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub _gettemptablename {
my $self = shift;
my $temp_tablename = 'TMP_TBL_' . $self->{tid} . '_';
if (length($self->{instanceid}) > 0) {
$temp_tablename .= $self->{instanceid} . '_';
}
$temp_tablename .= createtmpstring($temptable_randomstringlength); #$self->{temp_table_count};
return $temp_tablename;
}
sub instanceidentifier {
my $self = shift;
$self->{instanceid} = shift if @_;
return $self->{instanceid};
}
sub cluster {
my $self = shift;
$self->{cluster} = shift if @_;
return $self->{cluster};
}
sub _connectidentifier {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub connectidentifier {
my $self = shift;
my $cluster = $self->{cluster};
if (defined $cluster) {
return $cluster->{name};
} else {
$self->_connectidentifier();
}
}
sub tableidentifier {
my $self = shift;
my $tablename = shift;
my (@params) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub columnidentifier {
my $self = shift;
my $columnname = shift;
my (@params) = @_;
return join('.',map { $self->_columnidentifier($_,@params); } split(/\./,$columnname,-1));
}
sub _columnidentifier {
my $self = shift;
my $columnname = shift;
my (@params) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub get_tableidentifier {
my ($tablename,@params) = @_;
notimplementederror(__PACKAGE__ . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub getsafetablename {
# make a table name (identifier) string save for use within create table statements
# of this rdbms connector.
my $self = shift;
my ($tableidentifier) = @_; #shift;
$tableidentifier =~ s/[^0-9a-z_]/_/gi;
return $tableidentifier;
}
sub _extract_indexcols {
my $self = shift;
my $indexcols = shift;
if (defined $indexcols and ref $indexcols eq 'ARRAY') {
my @blankcols = map { local $_ = $_; s/\s*\(\d+\).*$//g; $_; } @$indexcols;
return \@blankcols;
} else {
return [];
}
}
sub vacuum {
my $self = shift;
my $tablename = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub paginate_sort_query {
my $self = shift;
my $statement = shift;
my $offset = shift;
my $limit = shift;
my $sortingconfigurations = shift;
if ($statement =~ /limit\s+\d+(,\s*\d+)?\s*$/i) {
$statement = "SELECT * FROM ($statement) AS _ps";
}
my $orderby = $self->_orderby_columns($sortingconfigurations);
if (length($orderby) > 0) {
$statement .= ' ORDER BY ' . $orderby;
}
if (defined $offset and defined $limit) {
$statement .= ' LIMIT ' . $offset . ', ' . $limit;
}
return $statement;
}
sub insert_ignore_phrase {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _force_numeric_column {
my $self = shift;
my $column = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _orderby_columns {
my $self = shift;
my $sortingconfigurations = shift;
my @orderby = ();
if (defined $sortingconfigurations) {
foreach my $sc (@$sortingconfigurations) {
if (defined $sc and ref $sc eq 'HASH') {
my $columnname = ((exists $sc->{memberchain}) ? $sc->{memberchain} : $sc->{column});
if (ref $columnname eq 'ARRAY') {
$columnname = $columnname->[0];
}
if (length($columnname) > 0) {
$columnname = $self->columnidentifier($columnname);
my $orderby_column;
if ($sc->{numeric}) {
$orderby_column = $self->_force_numeric_column($columnname);
} else {
$orderby_column = $columnname;
}
if (!defined $sc->{dir} or $sc->{dir} > 0) {
$orderby_column .= ' ASC';
} else {
$orderby_column .= ' DESC';
}
push(@orderby,$orderby_column);
} else {
sortconfigerror(undef,'sort column required',getlogger(__PACKAGE__));
}
} else {
sortconfigerror(undef,'invalid sorting configuration',getlogger(__PACKAGE__));
}
}
}
return join(', ',@orderby);
}
sub getdatabases {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return [];
}
sub _createdatabase {
my $self = shift;
my ($databasename) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return [];
}
sub db_connect {
my $self = shift;
my (@params) = @_;
if (defined $self->{dbh}) {
$self->_db_disconnect();
}
# child class will do the connect stuff...
}
sub db_disconnect {
my $self = shift;
#my $tid = threadid();
my $cluster = $self->{cluster};
if (defined $cluster) {
dbdebug($self,'disconnecting database cluster ' . $cluster->{name},getlogger(__PACKAGE__));
foreach my $node (@{$cluster->{nodes}}) {
if ($node->{active}) {
my $node_db = &{$node->{get_db}}($self->{instanceid},0);
$node_db->_db_disconnect();
}
}
#$cluster->{scheduling_vars} = {};
} else {
$self->_db_disconnect();
}
}
sub _db_disconnect {
my $self = shift;
# since this is also called from DESTROY, no die() here!
$self->db_finish();
if (defined $self->{dbh}) {
#cleartableinfo($self);
#dbdebug($self,'disconnecting' . ((defined $self->{cluster}) ? ' ' . $self->_connectidentifier() : ''),getlogger(__PACKAGE__));
dbdebug($self,'disconnecting',getlogger(__PACKAGE__));
foreach my $temp_tablename (@{$self->{temp_tables}}) {
#if ($self->table_exists($temp_tablename)) {
$self->drop_table($temp_tablename);
#}
}
$self->{temp_tables} = [];
$self->{dbh}->disconnect() or dbwarn($self,'error disconnecting: ' . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
$self->{dbh} = undef;
dbinfo($self,'disconnected',getlogger(__PACKAGE__));
#dbinfo($self,((defined $self->{cluster}) ? $self->_connectidentifier() . ' ' : '') . 'disconnected',getlogger(__PACKAGE__));
}
# further disconnect code follows in child classes....
}
sub is_connected {
my $self = shift;
return (defined $self->{dbh});
}
sub getfieldnames {
my $self = shift;
my $tablename = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return [];
}
sub getprimarykeycols {
my $self = shift;
my $tablename = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return [];
}
sub create_temptable {
my $self = shift;
my $select_stmt = shift;
my $indexes = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return '';
}
sub create_primarykey {
my $self = shift;
my ($tablename,$keycols,$fieldnames) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return 0;
}
sub create_indexes {
my $self = shift;
my ($tablename,$indexes,$keycols) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return 0;
}
sub create_texttable {
my $self = shift;
my ($tablename,$fieldnames,$keycols,$indexes,$truncate,$defer_indexes) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return 0;
}
sub truncate_table {
my $self = shift;
my $tablename = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub table_exists {
my $self = shift;
my $tablename = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return 0;
}
sub drop_table {
my $self = shift;
my $tablename = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub _prepare_error {
my $self = shift;
my $query = shift;
dberror($self,"failed to prepare:\n" . $query . "\nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
}
sub _execute_error {
my $self = shift;
my $query = shift;
my $sth = shift;
my $errstr;
if (defined $sth) {
$errstr = $sth->errstr();
} else {
$errstr = $self->{dbh}->errstr();
}
dberror($self,"failed to execute:\n" . $query . "\nparameters:\n". join(', ', @_) . "\nDBI error:\n" . $errstr,getlogger(__PACKAGE__));
}
sub _fetch_error {
my $self = shift;
my $query = shift;
my $sth = shift;
my $operation = shift;
my $index = shift;
my $errstr;
if (defined $sth) {
$errstr = $sth->errstr();
} else {
$errstr = $self->{dbh}->errstr();
}
dberror($self,'failed with ' . $operation . ":\n" . $query . "\n" . ((defined $index) ? 'column index: ' . $index . "\n" : '') . "parameters:\n". join(', ', @_) . "\nDBI error:\n" . $errstr,getlogger(__PACKAGE__));
}
sub datetime_to_string {
my $self = shift;
my ($dt) = @_;
return NGCP::BulkProcessor::Calendar::datetime_to_string($dt);
}
sub datetime_from_string {
my $self = shift;
my ($s,$tz) = @_;
return NGCP::BulkProcessor::Calendar::datetime_from_string($s,$tz);
}
# "The data type is 'sticky' in that bind values passed to execute() are bound with
# the data type specified by earlier bind_param() calls, if any."
sub _bind_params {
my $self = shift;
my $sth = shift;
my @params = ();
my $p_num = 1;
foreach my $param (@_) {
if (defined $param and 'HASH' eq $param) {
push(@params, delete $param->{value});
$sth->bind_param($p_num, undef, $param);
} else {
push(@params,$param);
}
$p_num++;
}
return @params;
}
#sub db_autocommit {
#
# my $self = shift;
# if (defined $self->{dbh}) {
# if (@_) {
# my ($autocommit) = @_;
# $autocommit = ($autocommit ? 1 : 0);
# dbdebug($self,'set AutoCommit ' . $self->{dbh}->{AutoCommit} . ' -> ' . $autocommit,getlogger(__PACKAGE__));
# $self->{dbh}->{AutoCommit} = $autocommit;
# }
# return $self->{dbh}->{AutoCommit};
# }
# return undef;
#
#}
# This method executes a SQL query that doesn't return any data. The
# query may contain placeholders, that will be replaced by the elements
# in @params during execute(). The method will die if any error occurs
# and return whatever DBI's execute() returned.
sub db_do {
my $self = shift;
my $query = shift;
my $result = 0;
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_do: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$result = $sth->execute(@params);
if (defined $result) {
if ('0E0' eq $result) {
return 0;
}
} else {
$self->_execute_error($query,$sth,@params);
}
}
return $result;
}
# get the first value of the first row of data that is returned from the
# database. Returns undef if no data is found.
sub db_get_value {
my $self = shift;
my $query = shift;
my $row = undef;
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_get_value: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$sth->execute(@params) or $self->_execute_error($query,$sth,@params);
$row = $sth->fetchrow_arrayref();
$self->_fetch_error($query,$sth,'fetchrow_arrayref',undef,@params) if !defined $row and $sth->err();
$sth->finish();
}
return ((defined $row) ? $$row[0] : undef);
}
# get a reference to the first row of data that is returned from the database.
# (I.e. whatever is returned by DBI's fetchrow_hashref().)
sub db_get_row {
my $self = shift;
my $query = shift;
my $row = [];
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_get_row: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$sth->execute(@params) or $self->_execute_error($query,$sth,@params);
$row = $sth->fetchrow_hashref();
$self->_fetch_error($query,$sth,'fetchrow_hashref',undef,@params) if !defined $row and $sth->err();
$sth->finish();
}
return $row;
}
# get a reference to an array containing the first value of every data row that
# is returned from the database like DBI's selectcol_arrayref() does.
sub db_get_col {
my $self = shift;
my $query = shift;
my $col = [];
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_get_col: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$col = $self->{dbh}->selectcol_arrayref($sth, undef, @params);
#die "Failed to selectcol_arrayref:\n$query\nDBI error:". $sth->errstr() if !defined $col and $sth->err();
$self->_fetch_error($query,$sth,'selectcol_arrayref',undef,@params) if !defined $col and $sth->err();
$sth->finish();
}
return $col;
}
# get all data that is returned from the database. (I.e. a reference to an
# array containing entries returned by DBI's fetchrow_hashref().)
sub db_get_all_arrayref {
my $self = shift;
my $query = shift;
my @rows = ();
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_get_all_arrayref: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$sth->execute(@params) or $self->_execute_error($query,$sth,@params);
while (my $row = $sth->fetchrow_hashref()) {
$self->_fetch_error($query,$sth,'fetchrow_hashref',undef,@params) if $sth->err();
push @rows, $row;
}
$sth->finish();
}
return \@rows;
}
# get a reference to a hash containing a hash reference for each row, like
# DBI's fetchall_hashref() does.
sub db_get_all_hashref {
my $self = shift;
my $query = shift;
my $index = shift;
my $result = {};
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_get_all_hashref: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$sth->execute(@params) or $self->_execute_error($query,$sth,@params);
$result = $sth->fetchall_hashref($index);
$self->_fetch_error($query,$sth,'fetchall_hashref',$index,@params) if $sth->err();
$sth->finish();
}
return $result;
}
# get a reference to a hash that is composed of the key_column as keys and the
# value_column as values.
sub db_get_mapref {
my $self = shift;
my $query = shift;
my $index = shift;
my $value = shift;
my $result = {};
if (defined $self->{dbh}) {
my $sth = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($sth,@_);
dbdebug($self,'db_get_mapref: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$sth->execute(@_) or $self->_execute_error($query,$sth,@params);
my $rows = $sth->fetchall_hashref($index);
#die "Failed to fetchall_hashref:\n$query\nDBI error:". $sth->errstr() if $sth->err();
$self->_fetch_error($query,$sth,'fetchall_hashref',$index,@params) if $sth->err();
foreach my $key (keys %$rows) {
$result->{$key} = $$rows{$key}{$value};
}
$sth->finish();
return $result;
}
return $result;
}
sub db_begin {
my $self = shift;
if (defined $self->{dbh}) {
dbdebug($self,'db_begin',getlogger(__PACKAGE__));
$self->{dbh}->begin_work() or dberror($self, "failed with begin_transaction \nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
if ($self->{dbh}->{AutoCommit}) {
dbwarn($self,'autocommit was not disabled',getlogger(__PACKAGE__));
}
}
}
sub db_commit {
my $self = shift;
if (defined $self->{dbh}) {
dbdebug($self,'db_commit',getlogger(__PACKAGE__));
if ($is_perl_debug) { #https://rt.cpan.org/Public/Bug/Display.html?id=102791
# no context:
$self->{dbh}->commit();
if ($DBI::err) {
dberror($self, "failed to commit changes\nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
}
} else {
#my @wa =
$self->{dbh}->commit() or dberror($self, "failed to commit changes\nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
}
}
}
sub db_rollback {
my $self = shift;
my ($log) = @_;
if (defined $self->{dbh}) {
dbdebug($self,'db_rollback',getlogger(__PACKAGE__));
if ($is_perl_debug) {
$self->{dbh}->rollback();
if ($DBI::err) {
dberror($self, "failed to rollback changes\nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
}
} else {
$self->{dbh}->rollback() or dberror($self, "failed to rollback changes\nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
}
dbinfo($self,'transaction rolled back',getlogger(__PACKAGE__)) if $log;
}
}
sub db_quote {
my $self = shift;
my $value = shift;
my $result = $value;
if (defined $self->{dbh}) {
$result = $self->{dbh}->quote($value) or dberror($self, "failed to quote value\nDBI error:\n" . $self->{dbh}->errstr(),getlogger(__PACKAGE__));
}
return $result;
}
sub db_last_insert_id {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub DESTROY {
my $self = shift;
# perl threads works like a fork, each thread owns a shalow? copy
# of the entire current context, at the moment it starts.
# due to this, if the thread is finished, perl gc will invoke destructors
# on the thread's scope elements, that potentially contains connectors from
# the main tread. it will actually attempt destroy them (disconnect, etc.)
# this is a problem with destructors that change object state like this one
#
# to avoid this, we perform destruction tasks only if the destructing tid
# is the same as the creating one:
if ($self->{tid} == threadid()) {
$self->_db_disconnect();
delete $self->{drh};
eval {
dbdebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__));
};
#} else {
# print "NOT destroyed\n";
}
}
sub lock_tables {
my $self = shift;
my $tablestolock = shift;
#$self->db_begin();
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub unlock_tables {
my $self = shift;
#$self->db_commit();
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub db_do_begin {
my $self = shift;
my $query = shift;
#my $tablename = shift;
my $transactional = shift;
#notimplementederror('db_do_begin',getlogger(__PACKAGE__));
if (defined $self->{dbh} and !defined $self->{sth}) { # and length($tablename) > 0) {
dbdebug($self,'db_do_begin: ' . $query,getlogger(__PACKAGE__));
if ($transactional) {
#$self->lock_tables({ $tablename => 'WRITE' });
$self->db_begin();
}
$self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
$self->{query} = $query;
$self->{params} = [];
}
}
sub db_do_rowblock {
my $self = shift;
my $rows = shift;
#notimplementederror('db_do_rowblock',getlogger(__PACKAGE__));
if (defined $self->{dbh} and defined $self->{sth} and defined $rows and ref $rows eq 'ARRAY') {
#dberror($self,'test error',getlogger(__PACKAGE__));
#mysqldbdebug($self,"db_do_rowblock\nrows:\n" . (scalar @$rows),getlogger(__PACKAGE__));
#mysqldbdebug($self,'db_do_rowblock: ' . $self->{query} . "\nparameters:\n" . join(', ', @_),getlogger(__PACKAGE__));
foreach my $row (@$rows) {
my @params = $self->_bind_params($self->{sth},@$row);
dbdebug($self,'db_do_rowblock: ' . $self->{query} . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__)) if $log_db_operations;
$self->{sth}->execute(@params) or $self->_execute_error($self->{query},$self->{sth},@params);
$self->{params} = \@params;
}
}
}
sub db_get_begin {
my $self = shift;
my $query = shift;
#my $tablename = shift;
my $transactional = shift;
if (defined $self->{dbh} and !defined $self->{sth}) { # and length($tablename) > 0) {
#eval { $self->lock_tables({ $tablename => 'WRITE' }); };
if ($transactional) {
#$self->lock_tables({ $tablename => 'WRITE' });
$self->db_begin();
} else {
my $offset = shift;
my $limit = shift;
$query = $self->paginate_sort_query($query,$offset,$limit,undef);
}
$self->{sth} = $self->{dbh}->prepare($query) or $self->_prepare_error($query);
my @params = $self->_bind_params($self->{sth},@_);
dbdebug($self,'db_get_begin: ' . $query . "\nparameters:\n" . join(', ', @params),getlogger(__PACKAGE__));
$self->{sth}->execute(@params) or $self->_execute_error($query,$self->{sth},@params);
$self->{query} = $query;
$self->{params} = \@params;
}
}
sub multithreading_supported {
my $self = shift;
return 0;
}
sub rowblock_transactional {
my $self = shift;
return 0;
}
sub db_get_rowblock {
my $self = shift;
my $max_rows = shift; #https://www.perlmonks.org/?node_id=273952
if ($enablemultithreading) {
#my $rows : shared = [];
my @rows :shared = ();
#my $rows = &share([]); # beware of '&' here!!!!
#my $rows = shared_clone({});
if (defined $self->{dbh} and defined $self->{sth}) {
dbdebug($self,'db_get_rowblock: ' . $self->{query} . "\nparameters:\n" . join(', ', @{$self->{params}}),getlogger(__PACKAGE__)) if $log_db_operations;
foreach (@{$self->{sth}->fetchall_arrayref(undef, $max_rows) // []}) {
my @row : shared = @{$_};
push @rows, \@row;
}
$self->_fetch_error($self->{query},$self->{sth},'db_get_rowblock',undef,@{$self->{params}}) if $self->{sth}->err();
}
#share(@rows);
return \@rows;
#return $rows;
#return \@rows;
} else {
my $rows = [];
if (defined $self->{dbh} and defined $self->{sth}) {
dbdebug($self,'db_get_rowblock: ' . $self->{query} . "\nparameters:\n" . join(', ', @{$self->{params}}),getlogger(__PACKAGE__)) if $log_db_operations;
$rows = $self->{sth}->fetchall_arrayref(undef, $max_rows);
$self->_fetch_error($self->{query},$self->{sth},'db_get_rowblock',undef,@{$self->{params}}) if $self->{sth}->err();
}
return $rows;
}
}
sub db_finish {
my $self = shift;
my $transactional = shift;
my $rollback = shift;
my $log = shift;
# since this is also called from DESTROY, no die() here!
if (defined $self->{dbh} and defined $self->{sth}) {
dbdebug($self,'db_finish',getlogger(__PACKAGE__));
$self->{sth}->finish();
$self->{sth} = undef;
if ($transactional) {
#$self->unlock_tables();
if ($rollback) {
$self->db_rollback($log);
} else {
$self->db_commit();
}
}
$self->{query} = undef;
$self->{params} = undef;
}
}
sub ping {
my $self = shift;
#notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return 1;
}
1;