You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bulk-processor/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm

540 lines
16 KiB

package NGCP::BulkProcessor::SqlConnectors::SQLServerDB;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw($LongReadLen_limit);
use NGCP::BulkProcessor::Logging qw(
getlogger
dbinfo
dbdebug
texttablecreated
temptablecreated
indexcreated
primarykeycreated
tabletruncated
tabledropped);
use NGCP::BulkProcessor::LogError qw(
dberror
fieldnamesdiffer);
use DBI;
use DBD::ODBC 1.50;
#https://blog.afoolishmanifesto.com/posts/install-and-configure-the-ms-odbc-driver-on-debian/
#http://community.spiceworks.com/how_to/show/78224-install-the-ms-sql-odbc-driver-on-debian-7
use NGCP::BulkProcessor::Array qw(arrayeq itemcount contains setcontains removeduplicates mergearrays);
use NGCP::BulkProcessor::SqlConnector;
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlConnector);
our @EXPORT_OK = qw(get_tableidentifier);
# Connection defaults; db_connect() falls back to these whenever the
# corresponding argument is a false value.
my $defaulthost = '127.0.0.1';
my $defaultport = '1433';
my $defaultusername = 'SA';
my $defaultpassword = '';
my $defaultdatabasename = 'master';
# Size of the VARCHAR columns create_texttable() declares for primary key and
# indexed fields (all other fields become VARCHAR(MAX)).
# NOTE(review): 900 presumably keeps keys within SQL Server's index key size
# limit - confirm before raising it.
my $varcharsize = 900; #8000;
# NOTE(review): $encoding, $lc_ctype and $client_encoding are not referenced by
# any sub visible in this file; presumably kept for parity with other connectors.
my $encoding = 'LATIN1';
# Collation appended to the CREATE DATABASE statement in _createdatabase().
my $collation_name = 'Latin1_General_CI_AS'; #OS locales only
my $lc_ctype = 'C';
my $client_encoding = 'LATIN1';
# DBI handle attributes applied to every new connection in db_connect().
my $LongReadLen = $LongReadLen_limit; #bytes
my $LongTruncOk = 0;
#my $logger = getlogger(__PACKAGE__);
#my $lock_do_chunk = 0;
#my $lock_get_chunk = 0;
# Flag reported by rowblock_transactional() and handed to the base class by
# db_do_begin(), db_get_begin() and db_finish().
my $rowblock_transactional = 1;
# When non-empty, db_connect() issues SET TRANSACTION ISOLATION LEVEL <value>.
my $transaction_isolation_level = ''; #'SERIALIZABLE'
# Constructor. Builds the generic connector through the base class, clears the
# connection attributes, installs the DBI ODBC driver handle and re-blesses
# the object into the requested class.
sub new {
    my ($class, @args) = @_;

    my $connector = NGCP::BulkProcessor::SqlConnector->new(@args);

    # connection parameters stay undefined until db_connect() fills them in
    foreach my $attribute (qw(host port databasename username password)) {
        $connector->{$attribute} = undef;
    }
    $connector->{drh} = DBI->install_driver('ODBC');

    bless $connector, $class;

    dbdebug($connector, __PACKAGE__ . ' connector created', getlogger(__PACKAGE__));

    return $connector;
}
# Returns a 'username@host:port.databasename' string describing the current
# connection target, or undef while no database name has been set.
sub _connectidentifier {
    my ($self) = @_;

    # without a database name there is nothing to describe
    return undef unless defined $self->{databasename};

    return join('',
        $self->{username}, '@',
        $self->{host}, ':',
        $self->{port}, '.',
        $self->{databasename},
    );
}
# Returns the identifier to use for a table in SQL statements; this connector
# passes the table name through unchanged.
sub tableidentifier {
    my ($self, $tablename) = @_;
    return $tablename;
}
# Returns the identifier to use for a column in SQL statements; this connector
# passes the column name through unchanged.
sub _columnidentifier {
    my ($self, $columnname) = @_;
    return $columnname;
}
# Exportable helper (not a method): builds 'databasename.tablename' when a
# database name is given, otherwise returns the bare table name.
sub get_tableidentifier {
    my ($tablename, $databasename) = @_;

    return defined $databasename
        ? $databasename . '.' . $tablename
        : $tablename;
}
# Makes a table name (identifier) string safe for use within create table
# statements of this rdbms connector: the base class result is lower-cased.
sub getsafetablename {
    my ($self, $tableidentifier) = @_;

    my $safe_name = $self->SUPER::getsafetablename($tableidentifier);

    return lc($safe_name);
}
# Decorates a SELECT statement with sorting and/or pagination.
#
# Parameters:
#   $statement             - SELECT statement ('SELECT <fields> FROM <rest>')
#   $offset, $limit        - page window; pagination is applied only when both
#                            are defined
#   $sortingconfigurations - handed to _orderby_columns(), which yields the
#                            ORDER BY column list (may be empty)
#
# Returns the resulting SQL statement:
#   - offset and limit given: the statement is wrapped in a ROW_NUMBER()
#     sub-select and restricted to rows offset+1 .. offset+limit. The wrapped
#     result carries an additional 'row' column.
#   - otherwise: the statement itself, with ' ORDER BY ...' appended when
#     sorting is configured.
#
# Previously the sub fell off the end (returning a false value instead of SQL)
# whenever offset, limit or the sort columns were missing.
sub paginate_sort_query {
    my ($self, $statement, $offset, $limit, $sortingconfigurations) = @_;

    my $orderby = $self->_orderby_columns($sortingconfigurations);
    my $sorted = (defined $orderby and length($orderby) > 0) ? 1 : 0;

    if (defined $offset and defined $limit) {
        my ($select_fields_part, $table_whereclause_part) = split /\s+from\s+/i, $statement, 2;
        $select_fields_part =~ s/^\s*select\s+//i;
        # ROW_NUMBER() requires an ORDER BY; '(SELECT NULL)' is the T-SQL idiom
        # for "no particular order" when no sorting was requested.
        my $window_order = $sorted ? $orderby : '(SELECT NULL)';
        return 'SELECT * FROM (SELECT ' . $select_fields_part
            . ', ROW_NUMBER() OVER (ORDER BY ' . $window_order . ') as row FROM '
            . $table_whereclause_part
            . ') AS p WHERE p.row > ' . $offset
            . ' AND p.row <= ' . ($offset + $limit);
    }

    # no pagination requested: only apply the sorting, if any
    return $sorted ? $statement . ' ORDER BY ' . $orderby : $statement;
}
# Wraps a column expression in TRY_CONVERT(NUMERIC, ...) so it is treated as a
# number by the server.
sub _force_numeric_column {
    my ($self, $column) = @_;
    return join('', 'TRY_CONVERT(NUMERIC, ', $column, ')');
}
# Opens a DBI/ODBC connection to the configured host and port and returns the
# database handle. The optional $databasename is appended to the connection
# string when non-empty. On failure the result of dberror() is returned.
#
# Alternative driver strings that were tried for non-Windows platforms:
#   dbi:ODBC:driver=SQL Server Native Client 11.0;server=tcp:<host>,<port>
#   dbi:ODBC:driver=ODBC Driver 17 for SQL Server;server=tcp:<host>,<port>
#   dbi:ODBC:driver={FreeTDS};server=<host>,<port>
sub _dbd_connect {
    my ($self, $databasename) = @_;

    # Windows ships the 'SQL Server' ODBC driver; elsewhere FreeTDS is used
    my $connection_string = ($^O eq 'MSWin32')
        ? 'DBI:ODBC:Driver={SQL Server};Server=' . $self->{host} . ',' . $self->{port}
        : 'dbi:ODBC:driver={FreeTDS};server=' . $self->{host} . ';encryption=off;port=' . $self->{port};

    $connection_string .= ';database=' . $databasename if length($databasename) > 0;

    # errors are reported through dberror() rather than DBI's own mechanisms
    my %connect_attributes = (
        PrintError => 0,
        RaiseError => 0,
        AutoCommit => 1,
    );

    my $dbh = DBI->connect($connection_string, $self->{username}, $self->{password}, \%connect_attributes);
    return $dbh if $dbh;

    return dberror($self, 'error connecting: ' . $self->{drh}->errstr(), getlogger(__PACKAGE__));
}
# Returns the database names listed in master..sysdatabases (as delivered by
# db_get_col()). When the connector has no open handle, a temporary connection
# without a database is opened for the query and closed again afterwards.
sub getdatabases {
    my ($self) = @_;

    my $temporary_connection = defined $self->{dbh} ? 0 : 1;
    if ($temporary_connection) {
        $self->{dbh} = $self->_dbd_connect();
    }

    my $dbs = $self->db_get_col('SELECT name FROM master..sysdatabases');

    if ($temporary_connection) {
        # leave the connector in the state it was found in
        $self->{dbh}->disconnect()
            or dberror($self, 'error disconnecting: ' . $self->{dbh}->errstr(), getlogger(__PACKAGE__));
        $self->{dbh} = undef;
    }

    return $dbs;
}
# Creates the given database with the module's collation. A connection
# without a database is opened for the statement and closed afterwards,
# leaving $self->{dbh} undefined.
sub _createdatabase {
    my ($self, $databasename) = @_;

    $self->{dbh} = $self->_dbd_connect();

    my $ddl = 'CREATE DATABASE ' . $databasename . ' COLLATE ' . $collation_name;
    $self->db_do($ddl);
    dbinfo($self, "database '" . $databasename . "' created", getlogger(__PACKAGE__));

    $self->{dbh}->disconnect()
        or dberror($self, 'error disconnecting: ' . $self->{dbh}->errstr(), getlogger(__PACKAGE__));
    $self->{dbh} = undef;
}
# Connects the connector to a database.
#
# Parameters (all optional, false values fall back to the module defaults):
#   $databasename, $username, $password, $host, $port
#   $create_database - when true, the database is created first if
#                      getdatabases() does not list it yet
#
# After connecting, the handle attributes (InactiveDestroy, LongReadLen,
# LongTruncOk) and the session options (isolation level if configured,
# ANSI_WARNINGS, ANSI_NULLS, ANSI_PADDING) are applied.
sub db_connect {
    my ($self, @params) = @_;
    my ($databasename, $username, $password, $host, $port, $create_database) = @params;

    # the base class sees the arguments exactly as passed by the caller
    $self->SUPER::db_connect($databasename, $username, $password, $host, $port);

    $host         ||= $defaulthost;
    $port         ||= $defaultport;
    $databasename ||= $defaultdatabasename;
    $username     ||= $defaultusername;
    $password     ||= $defaultpassword;

    @{$self}{qw(host port databasename username password)}
        = ($host, $port, $databasename, $username, $password);

    if ($create_database) {
        if (not contains($databasename, $self->getdatabases(), 0)) {
            $self->_createdatabase($databasename);
        }
    }

    dbdebug($self, 'connecting', getlogger(__PACKAGE__));

    my $handle = $self->_dbd_connect($databasename);
    $handle->{InactiveDestroy} = 1;
    $handle->{LongReadLen} = $LongReadLen;
    $handle->{LongTruncOk} = $LongTruncOk;
    $self->{dbh} = $handle;

    if (length($transaction_isolation_level) > 0) {
        $self->db_do('SET TRANSACTION ISOLATION LEVEL ' . $transaction_isolation_level);
    }

    foreach my $ansi_option (qw(ANSI_WARNINGS ANSI_NULLS ANSI_PADDING)) {
        $self->db_do('SET ' . $ansi_option . ' ON');
    }

    dbinfo($self, 'connected', getlogger(__PACKAGE__));
}
# Shrinks the current database (DBCC SHRINKDATABASE on database id 0).
# The $tablename argument is accepted but not used by this connector.
sub vacuum {
    my ($self, $tablename) = @_;
    return $self->db_do('DBCC SHRINKDATABASE (0) WITH NO_INFOMSGS');
}
# Disconnect hook; simply delegates to the base class implementation.
sub _db_disconnect {
    my ($self) = @_;
    return $self->SUPER::_db_disconnect();
}
# Returns the column names of the given table as found in
# information_schema.columns (result of db_get_col()).
# Alternative lookup via the catalog views:
#   SELECT name FROM sys.columns WHERE object_id = OBJECT_ID('dbo.<table>')
sub getfieldnames {
    my ($self, $tablename) = @_;

    my $query = 'SELECT column_name FROM information_schema.columns WHERE table_name = ?';

    return $self->db_get_col($query, $tablename);
}
# Returns the primary key column names of the given table by joining
# information_schema.table_constraints with constraint_column_usage
# (result of db_get_col()).
sub getprimarykeycols {
    my ($self, $tablename) = @_;

    my $query = join(' ',
        'SELECT c.column_name from',
        'information_schema.table_constraints t,',
        'information_schema.constraint_column_usage c',
        'WHERE',
        'c.constraint_name = t.constraint_name',
        'AND c.table_name = t.table_name',
        'AND t.constraint_type = ?',
        'AND c.table_name = ?',
    );

    return $self->db_get_col($query, 'PRIMARY KEY', $tablename);
}
# Materializes a SELECT statement into a '##'-prefixed temporary table via
# SELECT ... INTO and optionally creates indexes on it.
#
# Parameters:
#   $select_stmt - SELECT statement ('SELECT <fields> FROM <rest>')
#   $indexes     - optional hash ref: index name => column spec understood by
#                  _extract_indexcols()
#
# Returns the name of the temporary table (including the '##' prefix).
sub create_temptable {
    my ($self, $select_stmt, $indexes) = @_;

    my $index_tablename = $self->_gettemptablename();
    my $temp_tablename = '##' . $index_tablename;

    # split at the first FROM so that INTO can be inserted in front of it
    my ($select_fields_part, $table_whereclause_part) = split /\s+from\s+/i, $select_stmt, 2;
    $self->db_do($select_fields_part . ' INTO ' . $temp_tablename . ' FROM ' . $table_whereclause_part);
    temptablecreated($self, $index_tablename, getlogger(__PACKAGE__));

    if (defined $indexes and ref $indexes eq 'HASH' and scalar keys %$indexes > 0) {
        foreach my $index_key (keys %$indexes) {
            my $indexcols = $self->_extract_indexcols($indexes->{$index_key});
            # prefix with the table name to keep index names distinct per temp table
            my $indexname = lc($index_tablename) . '_' . $index_key;
            my $column_list = join(', ', map { scalar $self->columnidentifier($_) } @$indexcols);
            $self->db_do('CREATE INDEX ' . $indexname . ' ON ' . $temp_tablename . ' (' . $column_list . ')');
            indexcreated($self, $index_tablename, $indexname, getlogger(__PACKAGE__));
        }
    }

    return $temp_tablename;
}
# Adds a primary key over $keycols to an existing table.
#
# Parameters:
#   $tablename  - table to alter
#   $keycols    - array ref of key column names
#   $fieldnames - array ref of the table's field names; the key is only
#                 created when setcontains($keycols,$fieldnames,1) holds
#
# Returns 1 when the ALTER TABLE statement was issued, 0 otherwise.
sub create_primarykey {
    my ($self, $tablename, $keycols, $fieldnames) = @_;

    return 0 unless (length($tablename) > 0);
    return 0 unless (defined $fieldnames and ref $fieldnames eq 'ARRAY');
    return 0 unless (defined $keycols and ref $keycols eq 'ARRAY' and scalar @$keycols > 0);
    return 0 unless (setcontains($keycols, $fieldnames, 1));

    my $column_list = join(', ', map { scalar $self->columnidentifier($_) } @$keycols);
    my $statement = 'ALTER TABLE ' . $self->tableidentifier($tablename) . ' ADD PRIMARY KEY (' . $column_list . ')';
    $self->db_do($statement);
    primarykeycreated($self, $tablename, $keycols, getlogger(__PACKAGE__));

    return 1;
}
# Creates the given indexes on an existing table.
#
# Parameters:
#   $tablename - table to index; nothing happens for an empty name
#   $indexes   - hash ref: index name => column spec understood by
#                _extract_indexcols()
#   $keycols   - primary key columns; an index for which
#                arrayeq($indexcols,$keycols,1) holds is skipped
#
# Returns the number of indexes actually created. (The counter used to be
# returned without ever being incremented, so the result was always 0.)
sub create_indexes {
    my $self = shift;
    my ($tablename,$indexes,$keycols) = @_;

    my $index_count = 0;
    if (length($tablename) > 0
        and defined $indexes and ref $indexes eq 'HASH' and scalar keys %$indexes > 0) {
        foreach my $indexname (keys %$indexes) {
            my $indexcols = $self->_extract_indexcols($indexes->{$indexname});
            # skip indexes that arrayeq() matches against the primary key columns
            next if arrayeq($indexcols,$keycols,1);
            $self->db_do('CREATE INDEX ' . $indexname . ' ON ' . $self->tableidentifier($tablename) . ' (' . join(', ',map { local $_ = $_; $_ = $self->columnidentifier($_); $_; } @$indexcols) . ')');
            indexcreated($self,$tablename,$indexname,getlogger(__PACKAGE__));
            $index_count++;
        }
    }
    return $index_count;
}
# Creates a table consisting of character columns only, or checks an already
# existing table of that name against the requested field names.
#
# Parameters:
#   $tablename     - name of the table
#   $fieldnames    - array ref of column names
#   $keycols       - primary key column names
#   $indexes       - hash ref: index name => column spec understood by
#                    _extract_indexcols()
#   $truncate      - when true, an already existing table is truncated
#   $defer_indexes - when true, neither the primary key nor the indexes are
#                    created together with the table
#
# Returns 1 when the table was created or an existing table passed the field
# name check, 0 when the arguments are unusable or the setcontains() check
# against the existing table's field names fails.
sub create_texttable {
my $self = shift;
my ($tablename,$fieldnames,$keycols,$indexes,$truncate,$defer_indexes) = @_;
#my $tablename = $self->getsafetablename($tableidentifier);
#my ($tableidentifier,$fieldnames,$keycols,$indexes,$truncate) = @_;
#my $tablename = $self->getsafetablename($tableidentifier);
if (length($tablename) > 0 and defined $fieldnames and ref $fieldnames eq 'ARRAY') {
my $created = 0;
if ($self->table_exists($tablename) == 0) {
# table does not exist yet: assemble and run the CREATE TABLE statement
my $statement = 'CREATE TABLE ' . $self->tableidentifier($tablename) . ' (';
#$statement .= join(' TEXT, ',@$fieldnames) . ' TEXT';
# collect the columns of all indexes; they get a sized VARCHAR type below
my $allindexcols = [];
if (defined $indexes and ref $indexes eq 'HASH' and scalar keys %$indexes > 0) {
foreach my $indexname (keys %$indexes) {
$allindexcols = mergearrays($allindexcols,$self->_extract_indexcols($indexes->{$indexname}));
#push(@allindexcols, $self->_extract_indexcols($indexes->{$indexname}));
}
}
$allindexcols = removeduplicates($allindexcols,1);
# column types: key columns are sized and NOT NULL, indexed columns are
# sized, all remaining columns are VARCHAR(MAX)
my @fieldspecs = ();
foreach my $fieldname (@$fieldnames) {
if (contains($fieldname,$keycols,1)) {
push @fieldspecs,$self->columnidentifier($fieldname) . ' VARCHAR(' . $varcharsize . ') NOT NULL';
#$statement .= $fieldname . ' VARCHAR(256)';
} elsif (contains($fieldname,$allindexcols,1)) {
push @fieldspecs,$self->columnidentifier($fieldname) . ' VARCHAR(' . $varcharsize . ')';
#$statement .= $fieldname . ' VARCHAR(256)';
} else {
push @fieldspecs,$self->columnidentifier($fieldname) . ' VARCHAR(MAX)';
#$statement .= $fieldname . ' TEXT';
}
}
$statement .= join(', ',@fieldspecs);
#if (defined $keycols and ref $keycols eq 'ARRAY' and scalar @$keycols > 0 and setcontains($keycols,$fieldnames,1)) {
# the primary key is declared inline unless index creation is deferred
if (not $defer_indexes and defined $keycols and ref $keycols eq 'ARRAY' and scalar @$keycols > 0 and setcontains($keycols,$fieldnames,1)) {
$statement .= ', PRIMARY KEY (' . join(', ',map { local $_ = $_; $_ = $self->columnidentifier($_); $_; } @$keycols) . ')';
}
$statement .= ')'; # CHARACTER SET ' . $texttable_charset . ', COLLATE ' . $texttable_collation . ', ENGINE ' . $texttable_engine;
$self->db_do($statement);
texttablecreated($self,$tablename,getlogger(__PACKAGE__));
# secondary indexes follow as separate statements unless deferred
if (not $defer_indexes and defined $indexes and ref $indexes eq 'HASH' and scalar keys %$indexes > 0) {
foreach my $indexname (keys %$indexes) {
my $indexcols = $self->_extract_indexcols($indexes->{$indexname});
# skip indexes that arrayeq() matches against the primary key columns
if (not arrayeq($indexcols,$keycols,1)) {
#$statement .= ', INDEX ' . $indexname . ' (' . join(', ',@{$indexes->{$indexname}}) . ')';
$self->db_do('CREATE INDEX ' . $indexname . ' ON ' . $self->tableidentifier($tablename) . ' (' . join(', ',map { local $_ = $_; $_ = $self->columnidentifier($_); $_; } @$indexcols) . ')');
indexcreated($self,$tablename,$indexname,getlogger(__PACKAGE__));
}
}
}
$created = 1;
} else {
# table already exists: compare the requested fields with the existing ones
my $fieldnamesfound = $self->getfieldnames($tablename);
if (not setcontains($fieldnames,$fieldnamesfound,1)) {
fieldnamesdiffer($self,$tablename,$fieldnames,$fieldnamesfound,getlogger(__PACKAGE__));
return 0;
}
}
# only a pre-existing table is truncated; a freshly created one is empty
if (not $created and $truncate) {
$self->truncate_table($tablename);
}
return 1;
} else {
return 0;
}
#return $tablename;
}
# Capability flag: this connector always reports multithreading support (1).
sub multithreading_supported {
    my ($self) = @_;
    return 1;
}
# Returns the module-level $rowblock_transactional flag.
sub rowblock_transactional {
    my ($self) = @_;
    return $rowblock_transactional;
}
# Removes all rows from the given table and logs the truncation.
#
# Parameters:
#   $tablename - table to truncate
#
# T-SQL requires the TABLE keyword ('TRUNCATE TABLE <name>'); the statement
# previously issued here ('TRUNCATE <name>') is not valid on SQL Server.
sub truncate_table {
    my $self = shift;
    my $tablename = shift;

    $self->db_do('TRUNCATE TABLE ' . $self->tableidentifier($tablename));
    tabletruncated($self,$tablename,getlogger(__PACKAGE__));
}
# Returns the number of entries in information_schema.tables carrying the
# given table name (0 when the table does not exist).
sub table_exists {
    my ($self, $tablename) = @_;

    my $query = 'SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?';

    return $self->db_get_value($query, $tablename);
}
# Drops the given table if it exists.
# Returns 1 when the table was dropped, 0 when there was nothing to drop.
sub drop_table {
    my ($self, $tablename) = @_;

    # nothing to do for a table that is not there
    return 0 unless ($self->table_exists($tablename) > 0);

    $self->db_do('DROP TABLE ' . $self->tableidentifier($tablename));
    tabledropped($self, $tablename, getlogger(__PACKAGE__));

    return 1;
}
# Delegates to the base class db_do_begin(), inserting the module's
# $rowblock_transactional flag after the query; any further arguments are
# forwarded unchanged.
sub db_do_begin {
    my ($self, $query, @remaining) = @_;
    return $self->SUPER::db_do_begin($query, $rowblock_transactional, @remaining);
}
# Delegates to the base class db_get_begin(), inserting the module's
# $rowblock_transactional flag after the query; any further arguments are
# forwarded unchanged.
sub db_get_begin {
    my ($self, $query, @remaining) = @_;
    return $self->SUPER::db_get_begin($query, $rowblock_transactional, @remaining);
}
# Delegates to the base class db_finish(), passing the module's
# $rowblock_transactional flag followed by the caller's $rollback argument.
sub db_finish {
    my ($self, $rollback) = @_;
    return $self->SUPER::db_finish($rowblock_transactional, $rollback);
}
1;