package NGCP::BulkProcessor::ConnectorPool; use strict; ## no critic use NGCP::BulkProcessor::Globals qw( $rowblock_transactional $accounting_databasename $accounting_username $accounting_password $accounting_host $accounting_port $billing_databasename $billing_username $billing_password $billing_host $billing_port $provisioning_databasename $provisioning_username $provisioning_password $provisioning_host $provisioning_port $kamailio_databasename $kamailio_username $kamailio_password $kamailio_host $kamailio_port $xa_databasename $xa_username $xa_password $xa_host $xa_port $ngcprestapi_uri $ngcprestapi_username $ngcprestapi_password $ngcprestapi_realm $location_databaseindex $location_password $location_host $location_port $location_sock ); use NGCP::BulkProcessor::Logging qw(getlogger); use NGCP::BulkProcessor::LogError qw(dbclustererror dbclusterwarn); #nodumpdbset use NGCP::BulkProcessor::SqlConnectors::MySQLDB; #use NGCP::BulkProcessor::SqlConnectors::OracleDB; #use NGCP::BulkProcessor::SqlConnectors::PostgreSQLDB; #use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode # cleanupdbfiles); #use NGCP::BulkProcessor::SqlConnectors::CSVDB; #use NGCP::BulkProcessor::SqlConnectors::SQLServerDB; use NGCP::BulkProcessor::RestConnectors::NGCPRestApi; use NGCP::BulkProcessor::NoSqlConnectors::Redis; use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); use NGCP::BulkProcessor::Utils qw(threadid); use NGCP::BulkProcessor::Array qw( filter mergearrays getroundrobinitem getrandomitem ); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( get_accounting_db accounting_db_tableidentifier get_billing_db billing_db_tableidentifier get_provisioning_db provisioning_db_tableidentifier get_kamailio_db kamailio_db_tableidentifier get_xa_db xa_db_tableidentifier get_ngcp_restapi get_location_store destroy_dbs destroy_stores get_connectorinstancename get_cluster_db ping_dbs ping_stores ping ); my $connectorinstancenameseparator = '_'; #my $logger = getlogger(__PACKAGE__); # thread connector pools: my $accounting_dbs = {}; my $billing_dbs = {}; my $provisioning_dbs = {}; my $kamailio_dbs = {}; my $xa_dbs = {}; my $ngcp_restapis = {}; my $location_stores = {}; sub get_accounting_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $accounting_dbs->{$name}) { $accounting_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } } if ($reconnect) { $accounting_dbs->{$name}->db_connect($accounting_databasename,$accounting_username,$accounting_password,$accounting_host,$accounting_port); } return $accounting_dbs->{$name}; } sub accounting_db_tableidentifier { my ($get_target_db,$tablename) = @_; my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$accounting_databasename)); } sub get_billing_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $billing_dbs->{$name}) { $billing_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } } if ($reconnect) { $billing_dbs->{$name}->db_connect($billing_databasename,$billing_username,$billing_password,$billing_host,$billing_port); } return $billing_dbs->{$name}; } sub billing_db_tableidentifier { my ($get_target_db,$tablename) = @_; my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$billing_databasename)); } sub get_provisioning_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $provisioning_dbs->{$name}) { $provisioning_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } } if ($reconnect) { $provisioning_dbs->{$name}->db_connect($provisioning_databasename,$provisioning_username,$provisioning_password,$provisioning_host,$provisioning_port); } return $provisioning_dbs->{$name}; } sub provisioning_db_tableidentifier { my ($get_target_db,$tablename) = @_; my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$provisioning_databasename)); } sub get_kamailio_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $kamailio_dbs->{$name}) { $kamailio_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } } if ($reconnect) { $kamailio_dbs->{$name}->db_connect($kamailio_databasename,$kamailio_username,$kamailio_password,$kamailio_host,$kamailio_port); } return $kamailio_dbs->{$name}; } sub kamailio_db_tableidentifier { my ($get_target_db,$tablename) = @_; my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$kamailio_databasename)); } sub get_xa_db { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $xa_dbs->{$name}) { $xa_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::MySQLDB->new($rowblock_transactional,$instance_name); if (!defined $reconnect) { $reconnect = 1; } } if ($reconnect) { $xa_dbs->{$name}->db_connect($xa_databasename,$xa_username,$xa_password,$xa_host,$xa_port); } return $xa_dbs->{$name}; } sub xa_db_tableidentifier { my ($get_target_db,$tablename) = @_; my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::MySQLDB::get_tableidentifier($tablename,$xa_databasename)); } sub get_ngcp_restapi { my ($instance_name) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $ngcp_restapis->{$name}) { $ngcp_restapis->{$name} = NGCP::BulkProcessor::RestConnectors::NGCPRestApi->new($instance_name); $ngcp_restapis->{$name}->setup($ngcprestapi_uri,$ngcprestapi_username,$ngcprestapi_password,$ngcprestapi_realm); } return $ngcp_restapis->{$name}; } sub get_location_store { my ($instance_name,$reconnect) = @_; my $name = get_connectorinstancename($instance_name); if (!defined $location_stores->{$name}) { $location_stores->{$name} = NGCP::BulkProcessor::NoSqlConnectors::Redis->new($instance_name); if (!defined $reconnect) { $reconnect = 1; } } if ($reconnect) { $location_stores->{$name}->connect($location_databaseindex,$location_password,$location_host,$location_port,$location_sock); } return $location_stores->{$name}; } sub get_connectorinstancename { my ($name) = @_; my $instance_name = threadid(); if (length($name) > 0) { $instance_name .= $connectorinstancenameseparator . $name; } return $instance_name; } sub ping_dbs { ping($accounting_dbs); ping($billing_dbs); ping($provisioning_dbs); ping($kamailio_dbs); ping($xa_dbs); } sub ping_stores { ping($location_stores); } sub ping { my $dbs = shift; my $this_tid = threadid(); foreach my $instance_name (keys %$dbs) { my ($tid,$name) = split(quotemeta($connectorinstancenameseparator),$instance_name,2); next unless ($this_tid == $tid and defined $dbs->{$instance_name}); my $result = 0; eval { $result = $dbs->{$instance_name}->ping(); }; undef $dbs->{$instance_name} unless $result; } } sub destroy_dbs { foreach my $name (keys %$accounting_dbs) { cleartableinfo($accounting_dbs->{$name}); undef $accounting_dbs->{$name}; delete $accounting_dbs->{$name}; } foreach my $name (keys %$billing_dbs) { cleartableinfo($billing_dbs->{$name}); undef $billing_dbs->{$name}; delete $billing_dbs->{$name}; } foreach my $name (keys %$provisioning_dbs) { cleartableinfo($provisioning_dbs->{$name}); undef $provisioning_dbs->{$name}; delete $provisioning_dbs->{$name}; } foreach my $name (keys %$kamailio_dbs) { cleartableinfo($kamailio_dbs->{$name}); undef $kamailio_dbs->{$name}; delete $kamailio_dbs->{$name}; } foreach my $name (keys %$xa_dbs) { cleartableinfo($xa_dbs->{$name}); undef $xa_dbs->{$name}; delete $xa_dbs->{$name}; } } sub destroy_stores { foreach my $name (keys %$location_stores) { undef $location_stores->{$name}; delete $location_stores->{$name}; } } sub get_cluster_db { # oracle RAC and the like ... my ($cluster,$instance_name,$reconnect) = @_; #if ((defined $cluster) and ref $cluster ne 'HASH') { my $node = undef; my $tid = threadid(); if ((!defined $cluster->{scheduling_vars}) or ref $cluster->{scheduling_vars} ne 'HASH') { $cluster->{scheduling_vars} = {}; } my $scheduling_vars = $cluster->{scheduling_vars}; if ((!defined $scheduling_vars->{$tid}) or ref $scheduling_vars->{$tid} ne 'HASH') { $scheduling_vars->{$tid} = {}; } $scheduling_vars = $scheduling_vars->{$tid}; my $nodes; if (!defined $scheduling_vars->{nodes}) { $nodes = {}; foreach my $node (@{$cluster->{nodes}}) { if (defined $node and ref $node eq 'HASH') { if ($node->{active}) { $nodes->{$node->{label}} = $node; } } else { dbclustererror($cluster->{name},'node configuration error',getlogger(__PACKAGE__)); } } $scheduling_vars->{nodes} = $nodes; } else { $nodes = $scheduling_vars->{nodes}; } my @active_nodes = @{$nodes}{sort keys(%$nodes)}; #hash slice if (defined $cluster->{scheduling_code} and ref $cluster->{scheduling_code} eq 'CODE') { my $cluster_instance_name; if (length($instance_name) > 0) { $cluster_instance_name = $cluster->{name} . $connectorinstancenameseparator . $instance_name; } else { $cluster_instance_name = $cluster->{name}; } ($node,$scheduling_vars->{node_index}) = &{$cluster->{scheduling_code}}(\@active_nodes,$scheduling_vars->{node_index}); if (defined $node) { my $get_db = $node->{get_db}; if (defined $get_db and ref $get_db eq 'CODE') { my $db = undef; eval { $db = &{$get_db}($cluster_instance_name,$reconnect,$cluster); }; if ($@) { dbclusterwarn($cluster->{name},'node ' . $node->{label} . ' inactive',getlogger(__PACKAGE__)); delete $nodes->{$node->{label}}; return get_cluster_db($cluster,$instance_name,$reconnect); } else { #$db->cluster($cluster); return $db; } } else { dbclustererror($cluster->{name},'node ' . $node->{label} . ' configuration error',getlogger(__PACKAGE__)); delete $nodes->{$node->{label}}; return get_cluster_db($cluster,$instance_name,$reconnect); } } } else { dbclustererror($cluster->{name},'scheduling configuration error',getlogger(__PACKAGE__)); return undef; } #} dbclustererror($cluster->{name},'cannot switch to next active node',getlogger(__PACKAGE__)); return undef; } 1;