You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
433 lines
13 KiB
433 lines
13 KiB
package NGCP::BulkProcessor::ConnectorPool;
|
|
use strict;
|
|
|
|
## no critic
|
|
|
|
use NGCP::BulkProcessor::Globals qw(
|
|
|
|
$rowblock_transactional
|
|
|
|
$accounting_databasename
|
|
$accounting_username
|
|
$accounting_password
|
|
$accounting_host
|
|
$accounting_port
|
|
|
|
$billing_databasename
|
|
$billing_username
|
|
$billing_password
|
|
$billing_host
|
|
$billing_port
|
|
|
|
$provisioning_databasename
|
|
$provisioning_username
|
|
$provisioning_password
|
|
$provisioning_host
|
|
$provisioning_port
|
|
|
|
$kamailio_databasename
|
|
$kamailio_username
|
|
$kamailio_password
|
|
$kamailio_host
|
|
$kamailio_port
|
|
|
|
$xa_databasename
|
|
$xa_username
|
|
$xa_password
|
|
$xa_host
|
|
$xa_port
|
|
|
|
$ngcprestapi_uri
|
|
$ngcprestapi_username
|
|
$ngcprestapi_password
|
|
$ngcprestapi_realm
|
|
|
|
$location_databaseindex
|
|
$location_password
|
|
$location_host
|
|
$location_port
|
|
$location_sock
|
|
|
|
);
|
|
|
|
|
|
use NGCP::BulkProcessor::Logging qw(getlogger);
|
|
use NGCP::BulkProcessor::LogError qw(dbclustererror dbclusterwarn); #nodumpdbset
|
|
|
|
use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
|
|
#use NGCP::BulkProcessor::SqlConnectors::OracleDB;
|
|
#use NGCP::BulkProcessor::SqlConnectors::PostgreSQLDB;
|
|
#use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode
|
|
# cleanupdbfiles);
|
|
#use NGCP::BulkProcessor::SqlConnectors::CSVDB;
|
|
#use NGCP::BulkProcessor::SqlConnectors::SQLServerDB;
|
|
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi;
|
|
use NGCP::BulkProcessor::NoSqlConnectors::Redis;
|
|
|
|
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
|
|
|
|
use NGCP::BulkProcessor::Utils qw(threadid);
|
|
|
|
use NGCP::BulkProcessor::Array qw(
|
|
filter
|
|
mergearrays
|
|
getroundrobinitem
|
|
getrandomitem
|
|
);
|
|
|
|
require Exporter;
|
|
our @ISA = qw(Exporter);
|
|
our @EXPORT_OK = qw(
|
|
get_accounting_db
|
|
accounting_db_tableidentifier
|
|
|
|
get_billing_db
|
|
billing_db_tableidentifier
|
|
|
|
get_provisioning_db
|
|
provisioning_db_tableidentifier
|
|
|
|
get_kamailio_db
|
|
kamailio_db_tableidentifier
|
|
|
|
get_xa_db
|
|
xa_db_tableidentifier
|
|
|
|
get_ngcp_restapi
|
|
|
|
get_location_store
|
|
|
|
destroy_dbs
|
|
destroy_stores
|
|
get_connectorinstancename
|
|
get_cluster_db
|
|
|
|
ping_dbs
|
|
ping_stores
|
|
ping
|
|
);
|
|
|
|
my $connectorinstancenameseparator = '_';
|
|
|
|
#my $logger = getlogger(__PACKAGE__);
|
|
|
|
# thread connector pools:
|
|
my $accounting_dbs = {};
|
|
my $billing_dbs = {};
|
|
my $provisioning_dbs = {};
|
|
my $kamailio_dbs = {};
|
|
my $xa_dbs = {};
|
|
|
|
my $ngcp_restapis = {};
|
|
|
|
my $location_stores = {};
|
|
|
|
sub get_accounting_db {
|
|
|
|
my ($instance_name,$reconnect) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $accounting_dbs->{$name}) {
|
|
$accounting_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
|
|
if (!defined $reconnect) {
|
|
$reconnect = 1;
|
|
}
|
|
}
|
|
if ($reconnect) {
|
|
$accounting_dbs->{$name}->db_connect($accounting_databasename,$accounting_username,$accounting_password,$accounting_host,$accounting_port);
|
|
}
|
|
return $accounting_dbs->{$name};
|
|
|
|
}
|
|
|
|
sub accounting_db_tableidentifier {
|
|
|
|
my ($get_target_db,$tablename) = @_;
|
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$accounting_databasename));
|
|
|
|
}
|
|
|
|
|
|
sub get_billing_db {
|
|
|
|
my ($instance_name,$reconnect) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $billing_dbs->{$name}) {
|
|
$billing_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
|
|
if (!defined $reconnect) {
|
|
$reconnect = 1;
|
|
}
|
|
}
|
|
if ($reconnect) {
|
|
$billing_dbs->{$name}->db_connect($billing_databasename,$billing_username,$billing_password,$billing_host,$billing_port);
|
|
}
|
|
return $billing_dbs->{$name};
|
|
|
|
}
|
|
|
|
sub billing_db_tableidentifier {
|
|
|
|
my ($get_target_db,$tablename) = @_;
|
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$billing_databasename));
|
|
|
|
}
|
|
|
|
sub get_provisioning_db {
|
|
|
|
my ($instance_name,$reconnect) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $provisioning_dbs->{$name}) {
|
|
$provisioning_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
|
|
if (!defined $reconnect) {
|
|
$reconnect = 1;
|
|
}
|
|
}
|
|
if ($reconnect) {
|
|
$provisioning_dbs->{$name}->db_connect($provisioning_databasename,$provisioning_username,$provisioning_password,$provisioning_host,$provisioning_port);
|
|
}
|
|
return $provisioning_dbs->{$name};
|
|
|
|
}
|
|
|
|
sub provisioning_db_tableidentifier {
|
|
|
|
my ($get_target_db,$tablename) = @_;
|
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$provisioning_databasename));
|
|
|
|
}
|
|
|
|
sub get_kamailio_db {
|
|
|
|
my ($instance_name,$reconnect) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $kamailio_dbs->{$name}) {
|
|
$kamailio_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
|
|
if (!defined $reconnect) {
|
|
$reconnect = 1;
|
|
}
|
|
}
|
|
if ($reconnect) {
|
|
$kamailio_dbs->{$name}->db_connect($kamailio_databasename,$kamailio_username,$kamailio_password,$kamailio_host,$kamailio_port);
|
|
}
|
|
return $kamailio_dbs->{$name};
|
|
|
|
}
|
|
|
|
sub kamailio_db_tableidentifier {
|
|
|
|
my ($get_target_db,$tablename) = @_;
|
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$kamailio_databasename));
|
|
|
|
}
|
|
|
|
|
|
sub get_xa_db {
|
|
|
|
my ($instance_name,$reconnect) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $xa_dbs->{$name}) {
|
|
$xa_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name);
|
|
if (!defined $reconnect) {
|
|
$reconnect = 1;
|
|
}
|
|
}
|
|
if ($reconnect) {
|
|
$xa_dbs->{$name}->db_connect($xa_databasename,$xa_username,$xa_password,$xa_host,$xa_port);
|
|
}
|
|
return $xa_dbs->{$name};
|
|
|
|
}
|
|
|
|
sub xa_db_tableidentifier {
|
|
|
|
my ($get_target_db,$tablename) = @_;
|
|
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
|
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$xa_databasename));
|
|
|
|
}
|
|
|
|
sub get_ngcp_restapi {
|
|
|
|
my ($instance_name) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $ngcp_restapis->{$name}) {
|
|
$ngcp_restapis->{$name} = NGCP::BulkProcessor::RestConnectors::NGCPRestApi->new($instance_name);
|
|
$ngcp_restapis->{$name}->setup($ngcprestapi_uri,$ngcprestapi_username,$ngcprestapi_password,$ngcprestapi_realm);
|
|
}
|
|
return $ngcp_restapis->{$name};
|
|
|
|
}
|
|
|
|
sub get_location_store {
|
|
|
|
my ($instance_name,$reconnect) = @_;
|
|
my $name = get_connectorinstancename($instance_name);
|
|
if (!defined $location_stores->{$name}) {
|
|
$location_stores->{$name} = NGCP::BulkProcessor::NoSqlConnectors::Redis->new($instance_name);
|
|
if (!defined $reconnect) {
|
|
$reconnect = 1;
|
|
}
|
|
}
|
|
if ($reconnect) {
|
|
$location_stores->{$name}->connect($location_databaseindex,$location_password,$location_host,$location_port,$location_sock);
|
|
}
|
|
return $location_stores->{$name};
|
|
|
|
}
|
|
|
|
|
|
sub get_connectorinstancename {
|
|
my ($name) = @_;
|
|
my $instance_name = threadid();
|
|
if (length($name) > 0) {
|
|
$instance_name .= $connectorinstancenameseparator . $name;
|
|
}
|
|
return $instance_name;
|
|
}
|
|
|
|
sub ping_dbs {
|
|
ping($accounting_dbs);
|
|
ping($billing_dbs);
|
|
ping($provisioning_dbs);
|
|
ping($kamailio_dbs);
|
|
ping($xa_dbs);
|
|
}
|
|
|
|
sub ping_stores {
|
|
ping($location_stores);
|
|
}
|
|
|
|
sub ping {
|
|
my $dbs = shift;
|
|
my $this_tid = threadid();
|
|
foreach my $instance_name (keys %$dbs) {
|
|
my ($tid,$name) = split(quotemeta($connectorinstancenameseparator),$instance_name,2);
|
|
next unless ($this_tid == $tid and defined $dbs->{$instance_name});
|
|
my $result = 0;
|
|
eval {
|
|
$result = $dbs->{$instance_name}->ping();
|
|
};
|
|
undef $dbs->{$instance_name} unless $result;
|
|
}
|
|
}
|
|
|
|
sub destroy_dbs {
|
|
|
|
|
|
foreach my $name (keys %$accounting_dbs) {
|
|
cleartableinfo($accounting_dbs->{$name});
|
|
undef $accounting_dbs->{$name};
|
|
delete $accounting_dbs->{$name};
|
|
}
|
|
|
|
foreach my $name (keys %$billing_dbs) {
|
|
cleartableinfo($billing_dbs->{$name});
|
|
undef $billing_dbs->{$name};
|
|
delete $billing_dbs->{$name};
|
|
}
|
|
|
|
foreach my $name (keys %$provisioning_dbs) {
|
|
cleartableinfo($provisioning_dbs->{$name});
|
|
undef $provisioning_dbs->{$name};
|
|
delete $provisioning_dbs->{$name};
|
|
}
|
|
|
|
foreach my $name (keys %$kamailio_dbs) {
|
|
cleartableinfo($kamailio_dbs->{$name});
|
|
undef $kamailio_dbs->{$name};
|
|
delete $kamailio_dbs->{$name};
|
|
}
|
|
|
|
foreach my $name (keys %$xa_dbs) {
|
|
cleartableinfo($xa_dbs->{$name});
|
|
undef $xa_dbs->{$name};
|
|
delete $xa_dbs->{$name};
|
|
}
|
|
|
|
}
|
|
|
|
sub destroy_stores {
|
|
|
|
foreach my $name (keys %$location_stores) {
|
|
undef $location_stores->{$name};
|
|
delete $location_stores->{$name};
|
|
}
|
|
|
|
}
|
|
|
|
sub get_cluster_db { # oracle RAC and the like ...
|
|
|
|
my ($cluster,$instance_name,$reconnect) = @_;
|
|
#if ((defined $cluster) and ref $cluster ne 'HASH') {
|
|
my $node = undef;
|
|
my $tid = threadid();
|
|
if ((!defined $cluster->{scheduling_vars}) or ref $cluster->{scheduling_vars} ne 'HASH') {
|
|
$cluster->{scheduling_vars} = {};
|
|
}
|
|
my $scheduling_vars = $cluster->{scheduling_vars};
|
|
if ((!defined $scheduling_vars->{$tid}) or ref $scheduling_vars->{$tid} ne 'HASH') {
|
|
$scheduling_vars->{$tid} = {};
|
|
}
|
|
$scheduling_vars = $scheduling_vars->{$tid};
|
|
my $nodes;
|
|
if (!defined $scheduling_vars->{nodes}) {
|
|
$nodes = {};
|
|
foreach my $node (@{$cluster->{nodes}}) {
|
|
if (defined $node and ref $node eq 'HASH') {
|
|
if ($node->{active}) {
|
|
$nodes->{$node->{label}} = $node;
|
|
}
|
|
} else {
|
|
dbclustererror($cluster->{name},'node configuration error',getlogger(__PACKAGE__));
|
|
}
|
|
}
|
|
$scheduling_vars->{nodes} = $nodes;
|
|
} else {
|
|
$nodes = $scheduling_vars->{nodes};
|
|
}
|
|
my @active_nodes = @{$nodes}{sort keys(%$nodes)}; #hash slice
|
|
if (defined $cluster->{scheduling_code} and ref $cluster->{scheduling_code} eq 'CODE') {
|
|
my $cluster_instance_name;
|
|
if (length($instance_name) > 0) {
|
|
$cluster_instance_name = $cluster->{name} . $connectorinstancenameseparator . $instance_name;
|
|
} else {
|
|
$cluster_instance_name = $cluster->{name};
|
|
}
|
|
($node,$scheduling_vars->{node_index}) = &{$cluster->{scheduling_code}}(\@active_nodes,$scheduling_vars->{node_index});
|
|
if (defined $node) {
|
|
my $get_db = $node->{get_db};
|
|
if (defined $get_db and ref $get_db eq 'CODE') {
|
|
my $db = undef;
|
|
eval {
|
|
$db = &{$get_db}($cluster_instance_name,$reconnect,$cluster);
|
|
};
|
|
if ($@) {
|
|
dbclusterwarn($cluster->{name},'node ' . $node->{label} . ' inactive',getlogger(__PACKAGE__));
|
|
delete $nodes->{$node->{label}};
|
|
return get_cluster_db($cluster,$instance_name,$reconnect);
|
|
} else {
|
|
#$db->cluster($cluster);
|
|
return $db;
|
|
}
|
|
} else {
|
|
dbclustererror($cluster->{name},'node ' . $node->{label} . ' configuration error',getlogger(__PACKAGE__));
|
|
delete $nodes->{$node->{label}};
|
|
return get_cluster_db($cluster,$instance_name,$reconnect);
|
|
}
|
|
}
|
|
} else {
|
|
dbclustererror($cluster->{name},'scheduling configuration error',getlogger(__PACKAGE__));
|
|
return undef;
|
|
}
|
|
|
|
#}
|
|
dbclustererror($cluster->{name},'cannot switch to next active node',getlogger(__PACKAGE__));
|
|
return undef;
|
|
|
|
}
|
|
|
|
1;
|