You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rate-o-mat/rate-o-mat.pl

3903 lines
145 KiB

#!/usr/bin/perl -w
use lib '/usr/share/ngcp-rate-o-mat';
use strict;
use warnings;
use DBI;
use POSIX qw(setsid mktime ceil);
use Fcntl qw(LOCK_EX LOCK_NB SEEK_SET);
use IO::Handle;
use IO::Socket::UNIX;
use NetAddr::IP;
use Data::Dumper;
use Time::HiRes qw(); #for debugging info only
use List::Util qw(shuffle);
use Storable qw(dclone);
use JSON::XS qw(encode_json decode_json);
# constants: ###########################################################
$0 = 'ngcp-rate-o-mat'; ## no critic (Variables::RequireLocalizedPunctuationVars)
my $fork = $ENV{RATEOMAT_DAEMONIZE} // 0;
my $pidfile = $ENV{RATEOMAT_PIDFILE} // '/run/ngcp-rate-o-mat.pid';
my $type = 'call';
my $loop_interval = ((defined $ENV{RATEOMAT_LOOP_INTERVAL} && $ENV{RATEOMAT_LOOP_INTERVAL}) ? int $ENV{RATEOMAT_LOOP_INTERVAL} : 10);
my $debug = ((defined $ENV{RATEOMAT_DEBUG} && $ENV{RATEOMAT_DEBUG}) ? int $ENV{RATEOMAT_DEBUG} : 0);
# number of unrated cdrs to fetch at once:
my $batch_size = ((defined $ENV{RATEOMAT_BATCH_SIZE} && $ENV{RATEOMAT_BATCH_SIZE} > 0) ? int $ENV{RATEOMAT_BATCH_SIZE} : 100);
# if rate-o-mat processes are working on the same accounting.cdr table:
# set to 1 to minimize collisions (and thus rollbacks)
my $shuffle_batch = ((defined $ENV{RATEOMAT_SHUFFLE_BATCH} && $ENV{RATEOMAT_SHUFFLE_BATCH}) ? int $ENV{RATEOMAT_SHUFFLE_BATCH} : 0);
# preload the whole prepaid_costs table, if number of records
# is below this limit:
my $prepaid_costs_cache_limit = ((defined $ENV{RATEOMAT_PREPAID_COSTS_CACHE} && $ENV{RATEOMAT_PREPAID_COSTS_CACHE} > 0) ? int $ENV{RATEOMAT_PREPAID_COSTS_CACHE} : 10000);
# if split_peak_parts is set to true, rate-o-mat will create a separate
# CDR every time a peak time border is crossed for either the customer,
# the reseller or the carrier billing profile.
my $split_peak_parts = ((defined $ENV{RATEOMAT_SPLIT_PEAK_PARTS} && $ENV{RATEOMAT_SPLIT_PEAK_PARTS}) ? int $ENV{RATEOMAT_SPLIT_PEAK_PARTS} : 0);
# set to 1 to write real call costs to CDRs for postpaid, even if balance was consumed:
my $use_customer_real_cost = 0;
my $use_provider_real_cost = 0;
# don't update balance of prepaid contracts, if no prepaid_costs record is found (re-rating):
my $prepaid_update_balance = ((defined $ENV{RATEOMAT_PREPAID_UPDATE_BALANCE} && $ENV{RATEOMAT_PREPAID_UPDATE_BALANCE}) ? int $ENV{RATEOMAT_PREPAID_UPDATE_BALANCE} : 0);
# control writing cdr relation data:
# disable it for now until this will be limited to prepaid contracts,
# as it produces massive amounts of zeroed or unneeded data.
my $write_cash_balance_before_after = $ENV{RATEOMAT_WRITE_CDR_RELATION_DATA} // 0;
my $write_free_time_balance_before_after = $ENV{RATEOMAT_WRITE_CDR_RELATION_DATA} // 0;
my $write_profile_package_id = $ENV{RATEOMAT_WRITE_CDR_RELATION_DATA} // 0;
my $write_contract_balance_id = $ENV{RATEOMAT_WRITE_CDR_RELATION_DATA} // 0;
# terminate if the same cdr fails $failed_cdr_max_retries + 1 times:
my $failed_cdr_max_retries = ((defined $ENV{RATEOMAT_MAX_RETRIES} && $ENV{RATEOMAT_MAX_RETRIES} >= 0) ? int $ENV{RATEOMAT_MAX_RETRIES} : 2);
my $failed_cdr_retry_delay = ((defined $ENV{RATEOMAT_RETRY_DELAY} && $ENV{RATEOMAT_RETRY_DELAY} >= 0) ? int $ENV{RATEOMAT_RETRY_DELAY} : 30);
# with 2 retries and 30sec delay, rato-o-mat tolerates a replication
# lag of around 60secs until it terminates.
# use source_user if number and source_cli =~ /anonymous/i:
my $offnet_anonymous_source_cli_fallback = 1;
# pause between db connect attempts:
my $connect_interval = 3;
my $maintenance_mode = $ENV{RATEOMAT_MAINTENANCE} // 'no';
my $lock_timeout = 5;
my $hostname_filepath = '/etc/ngcp_hostname';
$hostname_filepath = $ENV{RATEOMAT_HOSTNAME_FILEPATH} if exists $ENV{RATEOMAT_HOSTNAME_FILEPATH};
my $multi_master = ((defined $ENV{RATEOMAT_MUTLI_MASTER} && $ENV{RATEOMAT_MUTLI_MASTER}) ? int $ENV{RATEOMAT_MUTLI_MASTER} : 0);
my $multi_master_stall = 1; # idle if other node repl is stopped
#execute contract subscriber locks if fraud limits are exceeded after a call:
my $apply_fraud_lock = ((defined $ENV{RATEOMAT_FRAUD_LOCK} && $ENV{RATEOMAT_FRAUD_LOCK}) ? int $ENV{RATEOMAT_FRAUD_LOCK} : 0);
# test may execute rate-o-mat on another host with different
# timezone. the connection timezone can therefore be forced to
# eg. the UTC default on ngcp.
my $connection_timezone = $ENV{RATEOMAT_CONNECTION_TIMEZONE};
# $ENV{TZ} has to be adjusted in the root thread.
# option to transform onpeak/offpeak times to a subscriber contract's timezone:
my $subscriber_offpeak_tz = 0; #0;
# billing database
my $BillDB_Name = $ENV{RATEOMAT_BILLING_DB_NAME} || 'billing';
my $BillDB_Host = $ENV{RATEOMAT_BILLING_DB_HOST} || 'localhost';
my $BillDB_Port = $ENV{RATEOMAT_BILLING_DB_PORT} ? int $ENV{RATEOMAT_BILLING_DB_PORT} : 3306;
my $BillDB_User = $ENV{RATEOMAT_BILLING_DB_USER} || die "Missing billing DB user setting.";
my $BillDB_Pass = $ENV{RATEOMAT_BILLING_DB_PASS}; # || die "Missing billing DB password setting.";
# accounting database
my $AcctDB_Name = $ENV{RATEOMAT_ACCOUNTING_DB_NAME} || 'accounting';
my $AcctDB_Host = $ENV{RATEOMAT_ACCOUNTING_DB_HOST} || 'localhost';
my $AcctDB_Port = $ENV{RATEOMAT_ACCOUNTING_DB_PORT} ? int $ENV{RATEOMAT_ACCOUNTING_DB_PORT} : 3306;
my $AcctDB_User = $ENV{RATEOMAT_ACCOUNTING_DB_USER} || die "Missing accounting DB user setting.";
my $AcctDB_Pass = $ENV{RATEOMAT_ACCOUNTING_DB_PASS}; # || die "Missing accounting DB password setting.";
# provisioning database
my $ProvDB_Name = $ENV{RATEOMAT_PROVISIONING_DB_NAME} || 'provisioning';
my $ProvDB_Host = $ENV{RATEOMAT_PROVISIONING_DB_HOST} || 'localhost';
my $ProvDB_Port = $ENV{RATEOMAT_PROVISIONING_DB_PORT} ? int $ENV{RATEOMAT_PROVISIONING_DB_PORT} : 3306;
my $ProvDB_User = $ENV{RATEOMAT_PROVISIONING_DB_USER};
my $ProvDB_Pass = $ENV{RATEOMAT_PROVISIONING_DB_PASS};
# duplication database
my $DupDB_Name = $ENV{RATEOMAT_DUPLICATE_DB_NAME} || 'accounting';
my $DupDB_Host = $ENV{RATEOMAT_DUPLICATE_DB_HOST} || 'localhost';
my $DupDB_Port = $ENV{RATEOMAT_DUPLICATE_DB_PORT} ? int $ENV{RATEOMAT_DUPLICATE_DB_PORT} : 3306;
my $DupDB_User = $ENV{RATEOMAT_DUPLICATE_DB_USER};
my $DupDB_Pass = $ENV{RATEOMAT_DUPLICATE_DB_PASS};
my @cdr_fields = qw(source_user_id source_provider_id source_external_subscriber_id source_external_contract_id source_account_id source_user source_domain source_cli source_clir source_ip source_lnp_prefix source_user_out destination_user_id destination_provider_id destination_external_subscriber_id destination_external_contract_id destination_account_id destination_user destination_domain destination_user_dialed destination_user_in destination_domain_in destination_lnp_prefix destination_user_out peer_auth_user peer_auth_realm call_type call_status call_code init_time start_time duration call_id source_carrier_cost source_reseller_cost source_customer_cost source_carrier_free_time source_reseller_free_time source_customer_free_time source_carrier_billing_fee_id source_reseller_billing_fee_id source_customer_billing_fee_id source_carrier_billing_zone_id source_reseller_billing_zone_id source_customer_billing_zone_id destination_carrier_cost destination_reseller_cost destination_customer_cost destination_carrier_free_time destination_reseller_free_time destination_customer_free_time destination_carrier_billing_fee_id destination_reseller_billing_fee_id destination_customer_billing_fee_id destination_carrier_billing_zone_id destination_reseller_billing_zone_id destination_customer_billing_zone_id frag_carrier_onpeak frag_reseller_onpeak frag_customer_onpeak is_fragmented split rated_at rating_status exported_at export_status source_lnp_type destination_lnp_type);
foreach my $gpp_idx(0 .. 9) {
push @cdr_fields, ("source_gpp$gpp_idx", "destination_gpp$gpp_idx");
}
my @mos_data_fields = qw(mos_average mos_average_packetloss mos_average_jitter mos_average_roundtrip);
my $acc_cash_balance_col_model_key = 1;
my $acc_time_balance_col_model_key = 2;
my $acc_relation_col_model_key = 3;
my $acc_tag_col_model_key = 4;
my $dup_cash_balance_col_model_key = 5;
my $dup_time_balance_col_model_key = 6;
my $dup_relation_col_model_key = 7;
my $dup_tag_col_model_key = 8;
# globals: #############################################################
my $shutdown = 0;
my $prepaid_costs_cache;
my %cdr_col_models = ();
my $rollback;
my $log_fatal = 1;
# load equalization using first or second order low pass filter:
my $cps_info = {
rated => 0,
rated_old => 0,
d_rated => 0,
d_rated_old => 0,
dd_rated => 0,
t => 0.0,
t_old => 0.0,
dt => 0.0,
delay => $loop_interval,
cps => 0.0,
speedup => 0.02,
speeddown => 0.01,
};
# stmt handlers: #######################################################
my $billdbh;
my $acctdbh;
my $provdbh;
my $dupdbh;
my $sth_get_contract_info;
my $sth_get_subscriber_contract_id;
my $sth_billing_info_network;
my $sth_billing_info;
my $sth_profile_info;
my $sth_profile_fraud_info;
my $sth_contract_fraud_info;
my $sth_upsert_cdr_period_costs;
my $sth_get_cdr_period_costs;
my $sth_duplicate_upsert_cdr_period_costs;
my $sth_duplicate_get_cdr_period_costs;
my $sth_offpeak;
my $sth_offpeak_subscriber;
my $sth_unrated_cdrs;
my $sth_get_cdr;
my $sth_lock_cdr;
my $sth_update_cdr;
my $sth_create_cdr_fragment;
my $sth_mos_data;
my $sth_get_cbalances;
my $sth_update_cbalance_w_underrun_profiles_lock;
my $sth_update_cbalance_w_underrun_lock;
my $sth_update_cbalance_w_underrun_profiles;
my $sth_update_cbalance;
my $sth_new_cbalance;
my $sth_new_cbalance_infinite_future;
my $sth_get_last_cbalance;
my $sth_get_cbalance;
my $sth_get_first_cbalance;
my $sth_get_last_topup_cbalance,
my $sth_lnp_number;
my $sth_lnp_profile_info;
my $sth_prepaid_costs_cache;
my $sth_prepaid_costs_count;
my $sth_prepaid_cost;
my $sth_delete_prepaid_cost;
my $sth_delete_old_prepaid;
my $sth_get_billing_voip_subscribers;
my $sth_lock_billing_subscribers;
my $sth_unlock_billing_subscribers;
my $sth_get_provisioning_voip_subscribers;
my $sth_get_usr_preference_attribute;
my $sth_get_usr_preference_value;
my $sth_create_usr_preference_value;
my $sth_update_usr_preference_value;
my $sth_delete_usr_preference_value;
my $sth_duplicate_cdr;
my $sth_duplicate_mos_data;
# run the main loop: ##################################################
main();
exit 0;
# implementation: ######################################################
sub FATAL {
my $msg = shift;
chomp $msg;
die "FATAL: $msg\n";
}
sub DEBUG {
return unless $debug;
my $msg = shift;
$msg = &$msg() if 'CODE' eq ref $msg;
chomp $msg;
$msg =~ s/#012 +/ /g;
print "DEBUG: $msg\n";
}
sub INFO {
my $msg = shift;
chomp $msg;
print "INFO: $msg\n";
}
sub WARNING {
my $msg = shift;
chomp $msg;
warn "WARNING: $msg\n";
}
sub _sql_offpeak_convert_tz {
my ($date_col) = @_;
return 'unix_timestamp(convert_tz('.$date_col.
',@@session.time_zone,(select coalesce((select tz.name FROM billing.v_contract_timezone tz WHERE tz.contract_id = ? LIMIT 1),@@session.time_zone))))';
}
sub sql_time {
my ($time) = @_;
my ($y, $m, $d, $H, $M, $S) = (localtime($time))[5,4,3,2,1,0];
$y += 1900;
$m += 1;
return sprintf('%04d-%02d-%02d %02d:%02d:%02d', $y, $m, $d, $H, $M, $S);
}
sub set_start_strtime {
my $start = shift;
my $r_str = shift;
$$r_str = sql_time($start);
return 0;
}
sub connect_billdbh {
do {
INFO "Trying to connect to billing db...";
$billdbh = DBI->connect("dbi:mysql:database=$BillDB_Name;host=$BillDB_Host;port=$BillDB_Port", $BillDB_User, $BillDB_Pass, {AutoCommit => 1, mysql_auto_reconnect => 0, mysql_no_autocommit_cmd => 0, PrintError => 1, PrintWarn => 0});
} while(!defined $billdbh && ($DBI::err == 2002 || $DBI::err == 2003) && !$shutdown && sleep $connect_interval);
FATAL "Error connecting to db: ".$DBI::errstr
unless defined($billdbh);
if ($multi_master) {
$billdbh->do("SET SESSION binlog_format = 'STATEMENT'") or WARNING 'error setting session binlog_format';
}
$billdbh->do('SET time_zone = ?',undef,$connection_timezone) or FATAL 'error setting connection timezone' if $connection_timezone;
INFO "Successfully connected to billing db...";
}
sub connect_acctdbh {
do {
INFO "Trying to connect to accounting db...";
$acctdbh = DBI->connect("dbi:mysql:database=$AcctDB_Name;host=$AcctDB_Host;port=$AcctDB_Port", $AcctDB_User, $AcctDB_Pass, {AutoCommit => 1, mysql_auto_reconnect => 0, mysql_no_autocommit_cmd => 0, PrintError => 1, PrintWarn => 0});
} while(!defined $acctdbh && ($DBI::err == 2002 || $DBI::err == 2003) && !$shutdown && sleep $connect_interval);
FATAL "Error connecting to db: ".$DBI::errstr
unless defined($acctdbh);
if ($multi_master) {
$acctdbh->do("SET SESSION binlog_format = 'STATEMENT'") or WARNING 'error setting session binlog_format';
}
$acctdbh->do('SET time_zone = ?',undef,$connection_timezone) or FATAL 'error setting connection timezone' if $connection_timezone;
INFO "Successfully connected to accounting db...";
}
sub connect_provdbh {
unless ($ProvDB_User) {
undef $dupdbh;
WARNING "No provisioning db credentials, disabled.";
return;
}
do {
INFO "Trying to connect to provisioning db...";
$provdbh = DBI->connect("dbi:mysql:database=$ProvDB_Name;host=$ProvDB_Host;port=$ProvDB_Port", $ProvDB_User, $ProvDB_Pass, {AutoCommit => 1, mysql_auto_reconnect => 0, mysql_no_autocommit_cmd => 0, PrintError => 1, PrintWarn => 0});
} while(!defined $provdbh && ($DBI::err == 2002 || $DBI::err == 2003) && !$shutdown && sleep $connect_interval);
FATAL "Error connecting to db: ".$DBI::errstr
unless defined($provdbh);
$provdbh->do('SET time_zone = ?',undef,$connection_timezone) or FATAL 'error setting connection timezone' if $connection_timezone;
INFO "Successfully connected to provisioning db...";
}
sub connect_dupdbh {
unless ($DupDB_User) {
undef $dupdbh;
INFO "No duplication db credentials, disabled.";
return;
}
do {
INFO "Trying to connect to duplication db...";
$dupdbh = DBI->connect("dbi:mysql:database=$DupDB_Name;host=$DupDB_Host;port=$DupDB_Port", $DupDB_User, $DupDB_Pass, {AutoCommit => 1, mysql_auto_reconnect => 0, mysql_no_autocommit_cmd => 0, PrintError => 1, PrintWarn => 0});
} while(!defined $dupdbh && ($DBI::err == 2002 || $DBI::err == 2003) && !$shutdown && sleep $connect_interval);
FATAL "Error connecting to db: ".$DBI::errstr
unless defined($dupdbh);
$dupdbh->do("SET SESSION binlog_format = 'STATEMENT'") or WARNING 'error setting session binlog_format';
$dupdbh->do('SET time_zone = ?',undef,$connection_timezone) or FATAL 'error setting connection timezone' if $connection_timezone;
INFO "Successfully connected to duplication db...";
}
sub begin_transaction {
my ($dbh,$isolation_level) = @_;
if ($dbh) {
if ($isolation_level) {
$dbh->do('SET TRANSACTION ISOLATION LEVEL '.$isolation_level) or FATAL "Error setting transaction isolation level: ".$dbh->errstr;
}
$dbh->begin_work or FATAL "Error starting transaction: ".$dbh->errstr;
}
}
sub commit_transaction {
my $dbh = shift;
if ($dbh) {
#capture result to force list context and prevent legacy komodo perl5db.pl bug:
my @wa = $dbh->commit or FATAL "Error committing: ".$dbh->errstr;
}
}
sub rollback_transaction {
my $dbh = shift;
if ($dbh) {
my @wa = $dbh->rollback or FATAL "Error rolling back: ".$dbh->errstr;
}
}
sub rollback_all {
eval { rollback_transaction($billdbh); };
eval { rollback_transaction($provdbh); };
eval { rollback_transaction($acctdbh); };
eval { rollback_transaction($dupdbh); };
}
sub bigint_to_bytes {
my ($bigint,$size) = @_;
return pack('C' x $size, map { hex($_) } (sprintf('%0' . 2 * $size . 's',substr($bigint->as_hex(),2)) =~ /(..)/g));
}
sub is_infinite_unix {
my $unix_ts = shift;
return 1 unless defined $unix_ts; #internally, we use undef for infinite future
return $unix_ts == 0 ? 1 : 0; #If you pass an out-of-range date to UNIX_TIMESTAMP(), it returns 0
}
sub last_day_of_month {
my $t = shift;
my ($month,$year) = (localtime($t))[4,5];
$month++;
$year += 1900;
if (1 == $month || 3 == $month || 5 == $month || 7 == $month || 8 == $month || 10 == $month || 12 == $month) {
return 31;
} elsif (2 == $month) {
my $is_leap_year = 0;
if ($year % 4 == 0) {
$is_leap_year = 1;
}
if ($year % 100 == 0) {
$is_leap_year = 0;
}
if ($year % 400 == 0) {
$is_leap_year = 1;
}
if ($is_leap_year) {
return 29;
} else {
return 28;
}
} else {
return 30;
}
}
sub init_db {
connect_billdbh;
connect_provdbh;
connect_acctdbh;
connect_dupdbh;
$sth_get_contract_info = $billdbh->prepare(
"SELECT UNIX_TIMESTAMP(c.create_timestamp),".
" UNIX_TIMESTAMP(c.modify_timestamp),".
" co.reseller_id,".
" p.id,".
" p.balance_interval_unit,".
" p.balance_interval_value,".
" p.balance_interval_start_mode,".
" p.carry_over_mode,".
" p.notopup_discard_intervals,".
" p.underrun_profile_threshold,".
" p.underrun_lock_threshold,".
" p.underrun_lock_level, ".
" (SELECT COUNT(*) FROM billing.package_profile_sets WHERE package_id = p.id AND discriminator = 'underrun') as underrun_profiles_count, ".
" product.class ".
"FROM billing.contracts c ".
"JOIN billing.products product ON c.product_id = product.id ".
"LEFT JOIN billing.profile_packages p ON c.profile_package_id = p.id ".
"LEFT JOIN billing.contacts co ON c.contact_id = co.id ".
"WHERE c.id = ?"
) or FATAL "Error preparing subscriber contract info statement: ".$billdbh->errstr;
$sth_get_last_cbalance = $billdbh->prepare(
"SELECT id, UNIX_TIMESTAMP(start), UNIX_TIMESTAMP(end), cash_balance, cash_balance_interval, ".
"free_time_balance, free_time_balance_interval, topup_count, timely_topup_count ".
"FROM billing.contract_balances ".
"WHERE contract_id = ? ".
"ORDER BY end DESC LIMIT 1"
) or FATAL "Error preparing get last contract balance statement: ".$billdbh->errstr;
$sth_get_cbalance = $billdbh->prepare(
"SELECT id, UNIX_TIMESTAMP(start), UNIX_TIMESTAMP(end), cash_balance, cash_balance_interval, ".
"free_time_balance, free_time_balance_interval, topup_count, timely_topup_count ".
"FROM billing.contract_balances ".
"WHERE id = ?"
) or FATAL "Error preparing get last contract balance statement: ".$billdbh->errstr;
$sth_get_first_cbalance = $billdbh->prepare(
"SELECT UNIX_TIMESTAMP(start),UNIX_TIMESTAMP(end) ".
"FROM billing.contract_balances ".
"WHERE contract_id = ? ".
"ORDER BY start ASC LIMIT 1"
) or FATAL "Error preparing get first contract balance statement: ".$billdbh->errstr;
$sth_get_last_topup_cbalance = $billdbh->prepare(
"SELECT UNIX_TIMESTAMP(start),UNIX_TIMESTAMP(end) ".
"FROM billing.contract_balances ".
"WHERE contract_id = ? AND ".
"topup_count > 0 ".
"ORDER BY end DESC LIMIT 1"
) or FATAL "Error preparing get last topup contract balance statement: ".$billdbh->errstr;
$sth_get_subscriber_contract_id = $billdbh->prepare(
"SELECT contract_id FROM billing.voip_subscribers WHERE uuid = ?"
) or FATAL "Error preparing subscriber contract id statement: ".$billdbh->errstr;
$sth_billing_info_network = $billdbh->prepare(<<EOS
SELECT bp.id, bp.prepaid,
bp.interval_charge, bp.interval_free_time, bp.interval_free_cash,
bp.interval_unit, bp.interval_count, bp.ignore_domain
FROM billing.billing_profiles bp
WHERE bp.id = billing.get_billing_profile_by_contract_id_network(?,?,?)
EOS
) or FATAL "Error preparing network billing info statement: ".$billdbh->errstr;
$sth_billing_info = $billdbh->prepare(<<EOS
SELECT bp.id, bp.prepaid,
bp.interval_charge, bp.interval_free_time, bp.interval_free_cash,
bp.interval_unit, bp.interval_count, bp.ignore_domain
FROM billing.billing_profiles bp
WHERE bp.id = billing.get_billing_profile_by_contract_id(?,?)
EOS
) or FATAL "Error preparing billing info statement: ".$billdbh->errstr;
$sth_lnp_number = $billdbh->prepare(
"SELECT lnp_provider_id,type FROM billing.lnp_numbers WHERE id = billing.get_lnp_number_id(?,?)"
) or FATAL "Error preparing LNP number statement: ".$billdbh->errstr;
$sth_profile_info = $billdbh->prepare(
"SELECT id, source, destination, ".
"onpeak_init_rate, onpeak_init_interval, ".
"onpeak_follow_rate, onpeak_follow_interval, ".
"offpeak_init_rate, offpeak_init_interval, ".
"offpeak_follow_rate, offpeak_follow_interval, ".
"billing_zones_history_id, offpeak_use_free_time, onpeak_use_free_time, ".
"onpeak_extra_second, onpeak_extra_rate, ".
"offpeak_extra_second, offpeak_extra_rate ".
"FROM billing.billing_fees_history WHERE id = billing.get_billing_fee_id(?,?,?,?,?,null)"
) or FATAL "Error preparing profile info statement: ".$billdbh->errstr;
$sth_profile_fraud_info = $billdbh->prepare(
"SELECT bp.fraud_interval_limit, bp.fraud_daily_limit, " .
"bp.fraud_interval_lock, bp.fraud_daily_lock, bp.fraud_use_reseller_rates " .
"FROM billing.billing_profiles bp WHERE bp.id = ?"
) or FATAL "Error preparing profile fraud info statement: ".$billdbh->errstr;
$sth_contract_fraud_info = $billdbh->prepare(
"SELECT cfp.fraud_interval_limit, cfp.fraud_daily_limit, " .
"cfp.fraud_interval_lock, cfp.fraud_daily_lock " .
"FROM billing.contract_fraud_preferences cfp WHERE cfp.contract_id = ?"
) or FATAL "Error preparing contract fraud info statement: ".$billdbh->errstr;
$sth_lnp_profile_info = $billdbh->prepare(
"SELECT id, source, destination, ".
"onpeak_init_rate, onpeak_init_interval, ".
"onpeak_follow_rate, onpeak_follow_interval, ".
"offpeak_init_rate, offpeak_init_interval, ".
"offpeak_follow_rate, offpeak_follow_interval, ".
"billing_zones_history_id, offpeak_use_free_time, onpeak_use_free_time ".
"FROM billing.billing_fees_history WHERE id = billing.get_billing_fee_id(?,?,?,null,?,\"exact_destination\")"
) or FATAL "Error preparing LNP profile info statement: ".$billdbh->errstr;
$sth_offpeak = $billdbh->prepare("select ".
"unix_timestamp(concat(date_enum.d,' ',pw.start)),unix_timestamp(concat(date_enum.d,' ',pw.end))".
" from ngcp.date_range_helper as date_enum ".
"join billing.billing_peaktime_weekdays pw on pw.weekday=weekday(date_enum.d) ".
"where date_enum.d >= date(from_unixtime(?)) ".
"and date_enum.d <= date(from_unixtime(? + ?)) ".
"and pw.billing_profile_id = ?".
" union ".
"select ".
"unix_timestamp(ps.start),unix_timestamp(ps.end)" .
" from billing.billing_peaktime_special as ps ".
"where ps.billing_profile_id = ? ".
"and (ps.start <= from_unixtime(? + ?) and ps.end >= from_unixtime(?))"
) or FATAL "Error preparing offpeak statement: ".$billdbh->errstr;
$sth_offpeak_subscriber = $billdbh->prepare("select ".
_sql_offpeak_convert_tz("concat(date_enum.d,' ',pw.start)") .','. _sql_offpeak_convert_tz("concat(date_enum.d,' ',pw.end)") .
" from ngcp.date_range_helper as date_enum ".
"join billing.billing_peaktime_weekdays pw on pw.weekday=weekday(date_enum.d) ".
"where date_enum.d >= date(from_unixtime(?)) ".
"and date_enum.d <= date(from_unixtime(? + ?)) ".
"and pw.billing_profile_id = ?".
" union ".
"select ".
_sql_offpeak_convert_tz("ps.start") .','. _sql_offpeak_convert_tz("ps.end") .
" from billing.billing_peaktime_special as ps ".
"where ps.billing_profile_id = ? ".
"and (ps.start <= from_unixtime(? + ?) and ps.end >= from_unixtime(?))"
) or FATAL "Error preparing offpeak subscriber statement: ".$billdbh->errstr;
$sth_unrated_cdrs = $acctdbh->prepare(
"SELECT * ".
"FROM accounting.cdr WHERE rating_status = 'unrated' ".
"AND id > ? ORDER BY start_time ASC LIMIT " . $batch_size
) or FATAL "Error preparing unrated cdr statement: ".$acctdbh->errstr;
$sth_get_cdr = $acctdbh->prepare(
"SELECT * ".
"FROM accounting.cdr WHERE id = ?"
) or FATAL "Error preparing get cdr statement: ".$acctdbh->errstr;
$sth_lock_cdr = $acctdbh->prepare(
"SELECT id, rating_status ".
"FROM accounting.cdr WHERE id = ? FOR UPDATE"
) or FATAL "Error preparing lock cdr statement: ".$acctdbh->errstr;
$sth_update_cdr = $acctdbh->prepare(
"UPDATE LOW_PRIORITY accounting.cdr SET ".
"source_carrier_cost = ?, source_reseller_cost = ?, source_customer_cost = ?, ".
"source_carrier_free_time = ?, source_reseller_free_time = ?, source_customer_free_time = ?, ".
"rated_at = ?, rating_status = ?, ".
"source_carrier_billing_fee_id = ?, source_reseller_billing_fee_id = ?, source_customer_billing_fee_id = ?, ".
"source_carrier_billing_zone_id = ?, source_reseller_billing_zone_id = ?, source_customer_billing_zone_id = ?, ".
"destination_carrier_cost = ?, destination_reseller_cost = ?, destination_customer_cost = ?, ".
"destination_carrier_free_time = ?, destination_reseller_free_time = ?, destination_customer_free_time = ?, ".
"destination_carrier_billing_fee_id = ?, destination_reseller_billing_fee_id = ?, destination_customer_billing_fee_id = ?, ".
"destination_carrier_billing_zone_id = ?, destination_reseller_billing_zone_id = ?, destination_customer_billing_zone_id = ?, ".
"frag_carrier_onpeak = ?, frag_reseller_onpeak = ?, frag_customer_onpeak = ?, is_fragmented = ?, ".
"duration = ? ".
"WHERE id = ?"
) or FATAL "Error preparing update cdr statement: ".$acctdbh->errstr;
my $upsert_cdr_period_costs_stmt = "INSERT INTO accounting.cdr_period_costs (" .
" id," .
" contract_id," .
" period," .
" period_date," .
" direction," .
#billing_profile_id,
" customer_cost," .
" reseller_cost," .
" cdr_count," .
" fraud_limit_exceeded," .
" fraud_limit_type," .
" first_cdr_start_time," .
" first_cdr_id," .
" last_cdr_start_time," .
" last_cdr_id" .
") VALUES (" .
" NULL," .
" ?," . #_contract_id," .
" ?," . #'month'," .
" ?," . #_month_period_date," .
" ?," . #_direction," .
#_billing_profile_id,
" ?," . #_customer_cost," .
" ?," . #_reseller_cost," .
" 1," .
" if(? > 0," . #_fraud_use_reseller_rates
" if(coalesce(? + 0.0 >= ? + 0.0,0),1,0)," . #_reseller_cost _fraud_interval_limit
" if(coalesce(? + 0.0 >= ? + 0.0,0),1,0))," . #_customer_cost _fraud_interval_limit
" ?," . #_fraud_limit_type," .
" ?," . #_cdr_start_time," .
" ?," . #_cdr_id," .
" ?," . #_cdr_start_time" .
" ?" . #_cdr_id," .
") ON DUPLICATE KEY UPDATE " .
#billing_profile_id = _billing_profile_id,
" id = LAST_INSERT_ID(id)," . #_customer_cost," .
" fraud_limit_exceeded = if(? > 0," . #_fraud_use_reseller_rates
" if(coalesce(? + reseller_cost >= ? + 0.0,0),1,0)," . #_reseller_cost _fraud_interval_limit
" if(coalesce(? + customer_cost >= ? + 0.0,0),1,0))," . #_customer_cost _fraud_interval_limit
" customer_cost = ? + customer_cost," . #_customer_cost," .
" reseller_cost = ? + reseller_cost," . #_reseller_cost," .
" cdr_count = cdr_count + 1," .
" fraud_limit_type = ?," . #_fraud_limit_type
" first_cdr_start_time = if(? + 0.0 < first_cdr_start_time," . #_cdr_start_time
" ?," . #_cdr_start_time
" first_cdr_start_time)," .
" first_cdr_id = if(? + 0 < first_cdr_id," . #_cdr_id
" ?," . #_cdr_id
" first_cdr_id)," .
" last_cdr_start_time = if(? + 0.0 > last_cdr_start_time," . #_cdr_start_time
" ?," . #_cdr_start_time
" last_cdr_start_time)," .
" last_cdr_id = if(? + 0 > last_cdr_id," . #_cdr_id
" ?," . #_cdr_id
" last_cdr_id)";
my $get_cdr_period_costs_stmt = "SELECT " .
"cpc.fraud_limit_exceeded, cpc.customer_cost, cpc.reseller_cost, cpc.cdr_count " .
"FROM accounting.cdr_period_costs as cpc WHERE " .
"cpc.id = LAST_INSERT_ID()";
$sth_upsert_cdr_period_costs = $acctdbh->prepare(
$upsert_cdr_period_costs_stmt
) or FATAL "Error preparing upsert cdr period costs statement: ".$acctdbh->errstr;
$sth_get_cdr_period_costs = $acctdbh->prepare(
$get_cdr_period_costs_stmt
) or FATAL "Error preparing get cdr period costs statement: ".$acctdbh->errstr;
$sth_mos_data = $acctdbh->prepare(
"SELECT * ".
"FROM accounting.cdr_mos_data WHERE cdr_id = ?"
) or FATAL "Error preparing mos data statement: ".$acctdbh->errstr;
if ($split_peak_parts) {
my @exclude_fragment_fields = qw(start_time duration is_fragmented);
my %exclude_fragment_fields = map { $_ => 1 } @exclude_fragment_fields;
my @fragment_fields = grep {!$exclude_fragment_fields{$_}} @cdr_fields;
$sth_create_cdr_fragment = $acctdbh->prepare(
"INSERT INTO accounting.cdr (".
join(',', @fragment_fields, @exclude_fragment_fields).
") SELECT ".
join(',', @fragment_fields). ", " .
"start_time + ?,duration - ?,1 " .
"FROM accounting.cdr " .
"WHERE id = ? AND rating_status = 'unrated'"
) or FATAL "Error preparing create cdr fragment statement: ".$acctdbh->errstr;
}
$sth_get_cbalances = $billdbh->prepare(
"SELECT id, cash_balance, cash_balance_interval, ".
"free_time_balance, free_time_balance_interval, start, ".
"unix_timestamp(start) start_unix, ".
"unix_timestamp(end) end_unix ".
"FROM billing.contract_balances ".
"WHERE contract_id = ? AND ".
"end >= FROM_UNIXTIME(?) ORDER BY start ASC"
) or FATAL "Error preparing get contract balance statement: ".$billdbh->errstr;
$sth_new_cbalance = $billdbh->prepare(
"INSERT IGNORE INTO billing.contract_balances (".
" contract_id, cash_balance, initial_cash_balance, cash_balance_interval, free_time_balance, initial_free_time_balance, free_time_balance_interval, underrun_profiles, underrun_lock, start, end".
") VALUES (?, ?, ?, ?, ?, ?, ?, IF(? = 0, NULL, FROM_UNIXTIME(?)), IF(? = 0, NULL, FROM_UNIXTIME(?)), FROM_UNIXTIME(?), FROM_UNIXTIME(?))"
) or FATAL "Error preparing create contract balance statement: ".$billdbh->errstr;
$sth_new_cbalance_infinite_future = $billdbh->prepare(
"INSERT IGNORE INTO billing.contract_balances (".
" contract_id, cash_balance, initial_cash_balance, cash_balance_interval, free_time_balance, initial_free_time_balance, free_time_balance_interval, underrun_profiles, underrun_lock, start, end".
") VALUES (?, ?, ?, ?, ?, ?, ?, IF(? = 0, NULL, FROM_UNIXTIME(?)), IF(? = 0, NULL, FROM_UNIXTIME(?)), FROM_UNIXTIME(?), '9999-12-31 23:59:59')"
) or FATAL "Error preparing create contract balance statement: ".$billdbh->errstr;
$sth_update_cbalance_w_underrun_profiles_lock = $billdbh->prepare(
"UPDATE billing.contract_balances SET ".
"cash_balance = cash_balance + ?, cash_balance_interval = cash_balance_interval + ?, ".
"free_time_balance = free_time_balance + ?, free_time_balance_interval = free_time_balance_interval + ?, underrun_profiles = FROM_UNIXTIME(?), underrun_lock = FROM_UNIXTIME(?) ".
"WHERE id = ?"
) or FATAL "Error preparing update contract balance statement: ".$billdbh->errstr;
$sth_update_cbalance_w_underrun_lock = $billdbh->prepare(
"UPDATE billing.contract_balances SET ".
"cash_balance = cash_balance + ?, cash_balance_interval = cash_balance_interval + ?, ".
"free_time_balance = free_time_balance + ?, free_time_balance_interval = free_time_balance_interval + ?, underrun_lock = FROM_UNIXTIME(?) ".
"WHERE id = ?"
) or FATAL "Error preparing update contract balance statement: ".$billdbh->errstr;
$sth_update_cbalance_w_underrun_profiles = $billdbh->prepare(
"UPDATE billing.contract_balances SET ".
"cash_balance = cash_balance + ?, cash_balance_interval = cash_balance_interval + ?, ".
"free_time_balance = free_time_balance + ?, free_time_balance_interval = free_time_balance_interval + ?, underrun_profiles = FROM_UNIXTIME(?) ".
"WHERE id = ?"
) or FATAL "Error preparing update contract balance statement: ".$billdbh->errstr;
$sth_update_cbalance = $billdbh->prepare(
"UPDATE billing.contract_balances SET ".
"cash_balance = cash_balance + ?, cash_balance_interval = cash_balance_interval + ?, ".
"free_time_balance = free_time_balance + ?, free_time_balance_interval = free_time_balance_interval + ? ".
"WHERE id = ?"
) or FATAL "Error preparing update contract balance statement: ".$billdbh->errstr;
$sth_prepaid_costs_cache = $acctdbh->prepare(
"SELECT * FROM accounting.prepaid_costs order by timestamp asc" # newer entries overwrite older ones
) or FATAL "Error preparing prepaid costs cache statement: ".$acctdbh->errstr;
$sth_prepaid_costs_count = $acctdbh->prepare(
"SELECT count(cnt.id) FROM (SELECT id FROM accounting.prepaid_costs LIMIT " . ($prepaid_costs_cache_limit + 1) . ") AS cnt"
) or FATAL "Error preparing prepaid costs count statement: ".$acctdbh->errstr;
$sth_prepaid_cost = $acctdbh->prepare( #call_id index required
'SELECT * FROM accounting.prepaid_costs WHERE call_id = ? ' .
'AND source_user_id = ? AND destination_user_id = ?' .
'ORDER BY timestamp ASC' # newer entries overwrite older ones
) or FATAL "Error preparing prepaid cost statement: ".$acctdbh->errstr;
$sth_delete_prepaid_cost = $acctdbh->prepare( #call_id index required
'DELETE FROM accounting.prepaid_costs WHERE call_id = ? ' .
'AND source_user_id = ? AND destination_user_id = ?'
) or FATAL "Error preparing delete prepaid costs statement: ".$acctdbh->errstr;
$sth_delete_old_prepaid = $acctdbh->prepare(
"DELETE FROM accounting.prepaid_costs WHERE timestamp < DATE_SUB(NOW(), INTERVAL 7 DAY) LIMIT 10000"
) or FATAL "Error preparing delete old prepaid statement: ".$acctdbh->errstr;
$sth_get_billing_voip_subscribers = $billdbh->prepare(
"SELECT uuid FROM billing.voip_subscribers WHERE contract_id = ? AND status != 'terminated'"
) or FATAL "Error preparing get billing voip subscribers statement: ".$billdbh->errstr;
$sth_lock_billing_subscribers = $billdbh->prepare(
"UPDATE billing.voip_subscribers SET status = 'locked' WHERE contract_id = ? AND status = 'active'"
) or FATAL "Error preparing lock billing subscribers statement: ".$billdbh->errstr;
$sth_unlock_billing_subscribers = $billdbh->prepare(
"UPDATE billing.voip_subscribers SET status = 'active' WHERE contract_id = ? AND status = 'locked'"
) or FATAL "Error preparing lock billing subscribers statement: ".$billdbh->errstr;
if ($provdbh) {
$sth_get_provisioning_voip_subscribers = $provdbh->prepare(
"SELECT id FROM provisioning.voip_subscribers WHERE uuid = ?"
) or FATAL "Error preparing get provisioning voip subscribers statement: ".$provdbh->errstr;
$sth_get_usr_preference_attribute = $provdbh->prepare(
"SELECT id FROM provisioning.voip_preferences WHERE attribute = ? AND usr_pref = 1"
) or FATAL "Error preparing get usr preference attribute statement: ".$provdbh->errstr;
$sth_get_usr_preference_value = $provdbh->prepare(
"SELECT id,value FROM provisioning.voip_usr_preferences WHERE attribute_id = ? AND subscriber_id = ?"
) or FATAL "Error preparing get usr preference value statement: ".$provdbh->errstr;
$sth_create_usr_preference_value = $provdbh->prepare(
"INSERT INTO provisioning.voip_usr_preferences (subscriber_id, attribute_id, value) VALUES (?, ?, ?)"
) or FATAL "Error preparing create usr preference value statement: ".$provdbh->errstr;
$sth_update_usr_preference_value = $provdbh->prepare(
"UPDATE provisioning.voip_usr_preferences SET value = ? WHERE id = ?"
) or FATAL "Error preparing update usr preference value statement: ".$provdbh->errstr;
$sth_delete_usr_preference_value = $provdbh->prepare(
"DELETE FROM provisioning.voip_usr_preferences WHERE id = ?"
) or FATAL "Error preparing delete usr preference value statement: ".$provdbh->errstr;
}
prepare_cdr_col_models($acctdbh,
$acc_cash_balance_col_model_key,
$acc_time_balance_col_model_key,
$acc_relation_col_model_key,
$acc_tag_col_model_key,
'local');
if ($dupdbh) {
$sth_duplicate_cdr = $dupdbh->prepare(
'insert into cdr ('.
join(',', @cdr_fields).
') values ('.
join(',', (map {'?'} @cdr_fields)).
')'
) or FATAL "Error preparing duplicate_cdr statement: ".$dupdbh->errstr;
$sth_duplicate_mos_data = $dupdbh->prepare(
'insert into cdr_mos_data ('.
join(',', 'cdr_id',@mos_data_fields,'cdr_start_time').
') values (?,'.
join(',', (map {'?'} @mos_data_fields)).
',?) ON DUPLICATE KEY UPDATE ' . join(',',map { $_ . ' = ?'; } @mos_data_fields)
) or FATAL "Error preparing duplicate_mos_data statement: ".$dupdbh->errstr;
$sth_duplicate_upsert_cdr_period_costs = $dupdbh->prepare(
$upsert_cdr_period_costs_stmt
) or FATAL "Error preparing duplicate upsert cdr period costs statement: ".$dupdbh->errstr;
$sth_duplicate_get_cdr_period_costs = $dupdbh->prepare(
$get_cdr_period_costs_stmt
) or FATAL "Error preparing duplicate get cdr period costs statement: ".$dupdbh->errstr;
prepare_cdr_col_models($dupdbh,
$dup_cash_balance_col_model_key,
$dup_time_balance_col_model_key,
$dup_relation_col_model_key,
$dup_tag_col_model_key,
'duplication');
}
foreach (keys %cdr_col_models) {
init_cdr_col_model($_);
}
return 1;
}
sub prepare_cdr_col_models {
my $dbh = shift;
my $cash_balance_col_model_key = shift;
my $time_balance_col_model_key = shift;
my $relation_col_model_key = shift;
my $tag_col_model_key = shift;
my $description_prefix = shift;
prepare_cdr_col_model($dbh,$cash_balance_col_model_key,$description_prefix.' cdr cash balance column model',$description_prefix,
[ 'provider', 'direction', 'cash_balance' ], # avoid using Tie::IxHash
{
provider => {
sql => 'SELECT * FROM accounting.cdr_provider',
description => "get $description_prefix cdr provider cols",
},
direction => { # the name "direction" for "source" and "destination" is not ideal
sql => 'SELECT * FROM accounting.cdr_direction',
description => "get $description_prefix cdr direction cols",
},
cash_balance => {
sql => 'SELECT * FROM accounting.cdr_cash_balance',
description => "get $description_prefix cdr cash balance cols",
},
},{
sql => "INSERT INTO accounting.cdr_cash_balance_data".
" (cdr_id,cdr_start_time,provider_id,direction_id,cash_balance_id,val_before,val_after) VALUES".
" (?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE ".
"val_before = ?, val_after = ?",
description => "write $description_prefix cdr cash balance col data",
},{
sql => "SELECT val_before, val_after FROM accounting.cdr_cash_balance_data".
" WHERE cdr_id = ? AND provider_id = ? AND direction_id = ? AND cash_balance_id = ?",
description => "read $description_prefix cdr cash balance col data",
}
);
prepare_cdr_col_model($dbh,$time_balance_col_model_key,$description_prefix.' cdr time balance column model',$description_prefix,
[ 'provider', 'direction', 'time_balance' ],
{
provider => {
sql => 'SELECT * FROM accounting.cdr_provider',
description => "get $description_prefix cdr provider cols",
},
direction => {
sql => 'SELECT * FROM accounting.cdr_direction',
description => "get $description_prefix cdr direction cols",
},
time_balance => {
sql => 'SELECT * FROM accounting.cdr_time_balance',
description => "get $description_prefix cdr time balance cols",
},
},{
sql => "INSERT INTO accounting.cdr_time_balance_data".
" (cdr_id,cdr_start_time,provider_id,direction_id,time_balance_id,val_before,val_after) VALUES".
" (?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE ".
"val_before = ?, val_after = ?",
description => "write $description_prefix cdr time balance col data",
},{
sql => "SELECT val_before, val_after FROM accounting.cdr_time_balance_data".
" WHERE cdr_id = ? AND provider_id = ? AND direction_id = ? AND time_balance_id = ?",
description => "read $description_prefix cdr time balance col data",
}
);
prepare_cdr_col_model($dbh,$relation_col_model_key,$description_prefix.' cdr relation column model',$description_prefix,
[ 'provider', 'direction', 'relation' ],
{
provider => {
sql => 'SELECT * FROM accounting.cdr_provider',
description => "get $description_prefix cdr provider cols",
},
direction => {
sql => 'SELECT * FROM accounting.cdr_direction',
description => "get $description_prefix cdr direction cols",
},
relation => {
sql => 'SELECT * FROM accounting.cdr_relation',
description => "get $description_prefix relation cols",
},
},{
sql => "INSERT INTO accounting.cdr_relation_data".
" (cdr_id,cdr_start_time,provider_id,direction_id,relation_id,val) VALUES".
" (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE ".
"val = ?",
description => "write $description_prefix cdr relation col data",
},{
sql => "SELECT val FROM accounting.cdr_relation_data".
" WHERE cdr_id = ? AND provider_id = ? AND direction_id = ? AND relation_id = ?",
description => "read $description_prefix cdr relation col data",
}
);
prepare_cdr_col_model($dbh,$tag_col_model_key,$description_prefix.' cdr tag column model',$description_prefix,
[ 'provider', 'direction', 'tag' ],
{
provider => {
sql => 'SELECT * FROM accounting.cdr_provider',
description => "get $description_prefix cdr provider cols",
},
direction => {
sql => 'SELECT * FROM accounting.cdr_direction',
description => "get $description_prefix cdr direction cols",
},
tag => {
sql => 'SELECT * FROM accounting.cdr_tag',
description => "get $description_prefix tag cols",
},
},{
sql => "INSERT INTO accounting.cdr_tag_data".
" (cdr_id,cdr_start_time,provider_id,direction_id,tag_id,val) VALUES".
" (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE ".
"val = ?",
description => "write $description_prefix cdr tag col data",
},{
sql => "SELECT val FROM accounting.cdr_tag_data".
" WHERE cdr_id = ? AND provider_id = ? AND direction_id = ? AND tag_id = ?",
description => "read $description_prefix cdr tag col data",
}
);
}
sub lock_cdr {
my $cdr = shift;
my $sth = $sth_lock_cdr;
$sth->execute($cdr->{id})
or FATAL "Error executing cdr row lock selection statement: ".$sth->errstr;
my ($id,$rating_status) = $sth->fetchrow_array;
$sth->finish;
return $rating_status;
}
sub lock_contracts {
my $cdr = shift;
# we lock all contracts when rating a single CDR, which will
# eventually need a contract_balances catchup. that are up to 4.
# there must be a single 'for update' select statement to lock
# the contracts all at once, otherwise deadlock situations are
# guaranteed. this final lock statement must avoid joins, otherwise
# all rows of joined tables can get locked, since innodb poorly
# locks rows by touching an index value. to prepare the lock
# statement, we need to determine the 4 contract ids saparately
# before:
my %provider_cids = ();
# caller "provider" contract:
$provider_cids{$cdr->{source_provider_id}} = 1 if $cdr->{source_provider_id} ne "0";
# callee "provider" contract:
$provider_cids{$cdr->{destination_provider_id}} = 1 if $cdr->{destination_provider_id} ne "0";
my @pcids = keys %provider_cids;
my $pcid_count = scalar @pcids;
my $sth = undef;
my %lock_cids = ();
#if ($pcid_count > 0) {
# $sth = $billdbh->prepare("SELECT c.id from billing.contracts c ".
# "WHERE c.id IN (" . substr(',?' x $pcid_count,1) . ")")
# or FATAL "Error preparing contract row lock selection statement: ".$billdbh->errstr;
# $sth->execute(@pcids)
# or FATAL "Error executing contract row lock selection statement: ".$sth->errstr;
# while (my @res = $sth->fetchrow_array) {
# $lock_cids{$res[0]} = 1;
# }
# $sth->finish;
#}
my %user_ids = ();
# callee subscriber contract:
WARNING "empty source_user_id for CDR ID $cdr->{id}" unless length($cdr->{source_user_id}) > 0;
$user_ids{$cdr->{source_user_id}} = 1 if $cdr->{source_user_id} ne "0";
# (onnet) caller subscriber:
WARNING "empty destination_user_id for CDR ID $cdr->{id}" unless length($cdr->{destination_user_id}) > 0;
$user_ids{$cdr->{destination_user_id}} = 1 if $cdr->{destination_user_id} ne "0";
my @uuids = keys %user_ids;
my $uuid_count = scalar @uuids;
if ($uuid_count > 0) {
$sth = $billdbh->prepare("SELECT DISTINCT c.id from billing.contracts c ".
" JOIN billing.voip_subscribers s ON c.id = s.contract_id ".
"WHERE s.uuid IN (" . substr(',?' x $uuid_count,1) . ")")
or FATAL "Error preparing subscriber contract row lock selection statement: ".$billdbh->errstr;
$sth->execute(@uuids)
or FATAL "Error executing subscriber contract row lock selection statement: ".$sth->errstr;
while (my @res = $sth->fetchrow_array) {
$lock_cids{$res[0]} = 1;
}
$sth->finish;
}
my @cids = keys %lock_cids;
my $lock_count = scalar @cids;
if ($pcid_count > 0 or $lock_count > 0) {
push(@cids,@pcids);
$lock_count = scalar @cids;
@cids = sort { $a <=> $b } @cids; #"Access your tables and rows in a fixed order."
my $sth = $billdbh->prepare("SELECT c.id from billing.contracts c ".
"WHERE c.id IN (" . substr(',?' x $lock_count,1) . ") FOR UPDATE WAIT $lock_timeout")
or FATAL "Error preparing contract row lock statement: ".$billdbh->errstr;
#finally lock the contract rows at this point:
$sth->execute(@cids)
or FATAL "Error executing contract row lock statement: ".$sth->errstr;
$sth->finish;
DEBUG "$lock_count contract(s) locked: ".join(', ',@cids);
}
return $lock_count;
}
sub add_interval {
my ($unit,$count,$from_time,$align_eom_time,$src) = @_;
my $to_time;
my ($from_year,$from_month,$from_day,$from_hour,$from_minute,$from_second) = (localtime($from_time))[5,4,3,2,1,0];
if($unit eq "minute") {
$to_time = mktime($from_second,$from_minute + $count,$from_hour,$from_day,$from_month,$from_year);
} elsif($unit eq "hour") {
$to_time = mktime($from_second,$from_minute,$from_hour + $count,$from_day,$from_month,$from_year);
} elsif($unit eq "day") {
$to_time = mktime($from_second,$from_minute,$from_hour,$from_day + $count,$from_month,$from_year);
} elsif($unit eq "week") {
$to_time = mktime($from_second,$from_minute,$from_hour,$from_day + 7*$count,$from_month,$from_year);
} elsif($unit eq "month") {
$to_time = mktime($from_second,$from_minute,$from_hour,$from_day,$from_month + $count,$from_year);
#DateTime's "preserve" mode would get from 30.Jan to 30.Mar, when adding 2 months
#When adding 1 month two times, we get 28.Mar or 29.Mar, so we adjust:
if (defined $align_eom_time) {
my $align_eom_day = (localtime($align_eom_time))[3]; #local or not is irrelavant here
my $to_day = (localtime($to_time))[3]; #local or not is irrelavant here
if ($to_day > $align_eom_day
&& $from_day == last_day_of_month($from_time)) {
my $delta = last_day_of_month($align_eom_time) - $align_eom_day;
$to_day = last_day_of_month($to_time) - $delta;
$to_time = mktime($from_second,$from_minute,$from_hour,$to_day,$from_month,$from_year);
}
}
} else {
FATAL "Invalid interval unit '$unit' in $src";
}
return $to_time;
}
sub truncate_day {
my $t = shift;
my ($year,$month,$day,$hour,$minute,$second) = (localtime($t))[5,4,3,2,1,0];
return mktime(0,0,0,$day,$month,$year);
}
sub set_subscriber_first_int_attribute_value {
my $contract_id = shift;
my $new_value = shift;
my $attribute = shift;
my $readonly = shift;
my $changed = 0;
my $attr_id = undef;
my $sth;
unless ($sth_get_provisioning_voip_subscribers &&
$sth_get_usr_preference_attribute &&
$sth_get_usr_preference_value &&
$sth_create_usr_preference_value &&
$sth_update_usr_preference_value &&
$sth_delete_usr_preference_value) {
return $changed;
}
$sth_get_billing_voip_subscribers->execute($contract_id)
or FATAL "Error executing get billing voip subscribers statement: ".
$sth_get_billing_voip_subscribers->errstr;
while (my @res = $sth_get_billing_voip_subscribers->fetchrow_array) {
my $uuid = $res[0];
$sth = $sth_get_provisioning_voip_subscribers;
$sth->execute($uuid)
or FATAL "Error executing get provisioning voip subscribers statement: ".$sth->errstr;
my ($prov_subs_id) = $sth->fetchrow_array();
$sth->finish;
if (defined $prov_subs_id) {
unless (defined $attr_id) {
$sth = $sth_get_usr_preference_attribute;
$sth->execute($attribute)
or FATAL "Error executing get '$attribute' usr preference attribute statement: ".$sth->errstr;
($attr_id) = $sth->fetchrow_array();
$sth->finish;
FATAL "Cannot find '$attribute' usr preference attribute" unless defined $attr_id;
}
$sth = $sth_get_usr_preference_value;
$sth->execute($attr_id,$prov_subs_id)
or FATAL "Error executing get '$attribute' usr preference value statement: ".$sth->errstr;
my ($val_id,$old_value) = $sth->fetchrow_array();
$sth->finish;
undef $sth;
if (defined $val_id) {
if ($readonly) {
if ($old_value != $new_value) {
WARNING "'$attribute' usr preference value ID $val_id should be '$new_value' instead of '$old_value'";
} else {
DEBUG "'$attribute' usr preference value ID $val_id value '$new_value' is correct";
}
} else {
if ($new_value == 0) {
$sth = $sth_delete_usr_preference_value;
$sth->execute($val_id)
or FATAL "Error executing delete '$attribute' usr preference value statement: ".$sth->errstr;
$changed++;
INFO "'$attribute' usr preference value ID $val_id with value '$old_value' deleted";
} else {
$sth = $sth_update_usr_preference_value;
$sth->execute($new_value,$val_id)
or FATAL "Error executing update usr preference value statement: ".$sth->errstr;
$changed++;
INFO "'$attribute' usr preference value ID $val_id updated from old value '$old_value' to new value '$new_value'";
}
}
} elsif ($new_value > 0) {
if ($readonly) {
WARNING "creating '$attribute' usr preference value '$new_value' skipped for prov subscriber ID $prov_subs_id";
} else {
$sth = $sth_create_usr_preference_value;
$sth->execute($prov_subs_id,$attr_id,$new_value)
or FATAL "Error executing create usr preference value statement: ".$sth->errstr;
$changed++;
INFO "'$attribute' usr preference value ID ".$provdbh->{'mysql_insertid'}." with value '$new_value' created";
}
} else {
DEBUG "'$attribute' usr preference value does not exists and no value is to be set";
}
$sth->finish if $sth;
}
}
$sth_get_billing_voip_subscribers->finish;
return $changed;
}
sub set_subscriber_lock_level {
my $contract_id = shift;
my $lock_level = shift; #int
my $readonly = shift;
return set_subscriber_first_int_attribute_value($contract_id,$lock_level // 0,'lock',$readonly);
}
sub set_subscriber_status {
my $contract_id = shift;
my $lock_level = shift; #int
my $readonly = shift;
my $changed = 0;
my $sth;
if ($readonly) {
#todo: warn about billing subscriber discrepancies
} else {
if (defined $lock_level && $lock_level > 0) {
$sth = $sth_lock_billing_subscribers;
$changed = $sth->execute($contract_id);
if ($changed) {
DEBUG "status of $changed billing subscriber(s) set to 'locked'";
}
} else {
$sth = $sth_unlock_billing_subscribers;
$changed = $sth->execute($contract_id);
if ($changed) {
DEBUG "status of $changed billing subscriber(s) set to 'active'";
}
}
}
$sth->finish if $sth;
return $changed;
}
sub add_profile_mappings {
my $contract_id = shift;
my $stime = shift;
my $package_id = shift;
my $profiles = shift;
$billdbh->do("CALL billing.create_contract_billing_profile_network_from_package(?,?,?,?)",undef,$contract_id,$stime,$package_id,$profiles)
or FATAL "Error executing create billing mappings statement: ".$DBI::errstr;
}
sub add_period_costs {
my $dup = shift;
my $cdr_id = shift;
my $contract_id = shift;
my $stime = shift;
my $duration = shift;
my $billing_profile_id = shift;
my $customer_cost = shift;
my $reseller_cost = shift;
$sth_profile_fraud_info->execute($billing_profile_id)
or FATAL "Error executing profile fraud info statement: ".$sth_profile_fraud_info->errstr;
my ($profile_fraud_interval_limit,
$profile_fraud_daily_limit,
$profile_fraud_interval_lock,
$profile_fraud_daily_lock,
$fraud_use_reseller_rates) = $sth_profile_fraud_info->fetchrow_array();
$sth_contract_fraud_info->execute($contract_id)
or FATAL "Error executing contracts fraud info statement: ".$sth_contract_fraud_info->errstr;
my ($contract_fraud_interval_limit,
$contract_fraud_daily_limit,
$contract_fraud_interval_lock,
$contract_fraud_daily_lock) = $sth_contract_fraud_info->fetchrow_array();
my ($month_period_date,$day_period_date);
{
my ($y, $m, $d, $H, $M, $S) = (localtime(ceil($stime + $duration)))[5,4,3,2,1,0];
$y += 1900;
$m += 1;
$day_period_date = sprintf('%04d-%02d-%02d', $y, $m, $d);
$month_period_date = sprintf('%04d-%02d-01', $y, $m);
}
my $direction = "out";
my ($fraud_limit_type,$fraud_limit,$month_lock,$daily_lock);
my ($upsert_sth, $get_sth);
if ($dup) {
$upsert_sth = $sth_duplicate_upsert_cdr_period_costs;
$get_sth = $sth_duplicate_get_cdr_period_costs;
} else {
$upsert_sth = $sth_upsert_cdr_period_costs;
$get_sth = $sth_get_cdr_period_costs;
}
if (defined $contract_fraud_interval_limit and $contract_fraud_interval_limit > 0.0) {
$fraud_limit = $contract_fraud_interval_limit;
$fraud_limit_type = "contract";
$month_lock = $contract_fraud_interval_lock;
} elsif (defined $profile_fraud_interval_limit and $profile_fraud_interval_limit > 0.0) {
$fraud_limit = $profile_fraud_interval_limit;
$fraud_limit_type = "billing_profile";
$month_lock = $profile_fraud_interval_lock;
} else {
$fraud_limit = undef;
$fraud_limit_type = undef;
$month_lock = undef;
}
my @bind_params = ($contract_id,
"month",
$month_period_date,
$direction,
$customer_cost,
$reseller_cost,
$fraud_use_reseller_rates,
$reseller_cost, $fraud_limit,
$customer_cost, $fraud_limit,
$fraud_limit_type,
$stime,
$cdr_id,
$stime,
$cdr_id,
$fraud_use_reseller_rates,
$reseller_cost, $fraud_limit,
$customer_cost, $fraud_limit,
$customer_cost,
$reseller_cost,
$fraud_limit_type,
$stime,$stime,$cdr_id,$cdr_id,
$stime,$stime,$cdr_id,$cdr_id,
);
DEBUG sub { "month fraud check: ".(Dumper {
fraud_limit => $fraud_limit,
fraud_limit_type => $fraud_limit_type,
month_lock => $month_lock,
bind => \@bind_params,
}) };
$upsert_sth->execute(
@bind_params
) or FATAL "Error executing upsert cdr month period costs statement: ".$upsert_sth->errstr;
$get_sth->execute() or FATAL "Error executing get cdr day period costs statement: ".$get_sth->errstr;
my ($month_limit_exceeded,$month_customer_cost,$month_reseller_cost,$month_cdr_count) = $get_sth->fetchrow_array();
if ($month_limit_exceeded) {
INFO "contract ID $contract_id month period costs $month_customer_cost (customer), $month_reseller_cost (reseller) exceed $fraud_limit_type limit of $fraud_limit ($month_cdr_count cdrs)";
} else {
$month_lock = undef;
DEBUG "contract ID $contract_id month period costs $month_customer_cost (customer), $month_reseller_cost (reseller) ($month_cdr_count cdrs)";
}
if (defined $contract_fraud_daily_limit and $contract_fraud_daily_limit > 0.0) {
$fraud_limit = $contract_fraud_daily_limit;
$fraud_limit_type = "contract";
$daily_lock = $contract_fraud_daily_lock;
} elsif (defined $profile_fraud_daily_limit and $profile_fraud_daily_limit > 0.0) {
$fraud_limit = $profile_fraud_daily_limit;
$fraud_limit_type = "billing_profile";
$daily_lock = $profile_fraud_daily_lock;
} else {
$fraud_limit = undef;
$fraud_limit_type = undef;
$daily_lock = undef;
}
@bind_params = (
$contract_id,
"day",
$day_period_date,
$direction,
$customer_cost,
$reseller_cost,
$fraud_use_reseller_rates,
$reseller_cost, $fraud_limit,
$customer_cost, $fraud_limit,
$fraud_limit_type,
$stime,
$cdr_id,
$stime,
$cdr_id,
$fraud_use_reseller_rates,
$reseller_cost, $fraud_limit,
$customer_cost, $fraud_limit,
$customer_cost,
$reseller_cost,
$fraud_limit_type,
$stime,$stime,$cdr_id,$cdr_id,
$stime,$stime,$cdr_id,$cdr_id,
);
DEBUG sub { "day fraud check: ".(Dumper {
fraud_limit => $fraud_limit,
fraud_limit_type => $fraud_limit_type,
daily_lock => $daily_lock,
bind => \@bind_params,
}) };
$upsert_sth->execute(
@bind_params
) or FATAL "Error executing upsert cdr day period costs statement: ".$upsert_sth->errstr;
$get_sth->execute() or FATAL "Error executing get cdr day period costs statement: ".$get_sth->errstr;
my ($day_limit_exceeded,$day_customer_cost,$day_reseller_cost,$day_cdr_count) = $get_sth->fetchrow_array();
if ($day_limit_exceeded) {
INFO "contract ID $contract_id day period costs $day_customer_cost (customer), $day_reseller_cost (reseller) exceed $fraud_limit_type limit of $fraud_limit ($day_cdr_count cdrs)";
} else {
$daily_lock = undef;
DEBUG "contract ID $contract_id day period costs $day_customer_cost (customer), $day_reseller_cost (reseller) ($day_cdr_count cdrs)";
}
return $month_lock // $daily_lock;
}
sub get_notopup_expiration {
my $contract_id = shift;
my $last_start_time = shift;
my $notopup_discard_intervals = shift;
my $interval_unit = shift;
my $align_eom_time = shift;
my $package_id = shift;
my $sth;
my $notopup_expiration = undef;
my $last_topup_start_time;
my $last_topup_end_time;
if ($notopup_discard_intervals) { #get notopup_expiration:
if (defined $last_start_time) {
$last_topup_start_time = $last_start_time;
} else {
$sth = $sth_get_last_topup_cbalance;
$sth->execute($contract_id) or FATAL "Error executing get latest contract balance statement: ".$sth->errstr;
($last_topup_start_time,$last_topup_end_time) = $sth->fetchrow_array();
$sth->finish;
if (!$last_topup_start_time) {
$sth = $sth_get_first_cbalance;
$sth->execute($contract_id) or FATAL "Error executing get first contract balance statement: ".$sth->errstr;
($last_topup_start_time,$last_topup_end_time) = $sth->fetchrow_array();
$sth->finish;
}
if ($last_topup_start_time) {
if (!is_infinite_unix($last_topup_end_time)) {
$last_topup_start_time = $last_topup_end_time + 1;
}
}
}
if ($last_topup_start_time) {
$notopup_expiration = add_interval($interval_unit, $notopup_discard_intervals,
$last_topup_start_time, $align_eom_time, "package id " . $package_id);
}
}
return $notopup_expiration;
}
sub get_timely_end {
my $last_start_time = shift;
my $interval_value = shift;
my $interval_unit = shift;
#my $align_eom = shift;
my $carry_over_mode = shift;
my $package_id = shift;
my $timely_end_time = undef;
if ("carry_over_timely" eq $carry_over_mode) {
$timely_end_time = add_interval($interval_unit, $interval_value,
$last_start_time, undef, "package id " . $package_id);
$timely_end_time--;
}
return $timely_end_time;
}
sub catchup_contract_balance {
my $cdr = shift;
my $call_start_time = shift;
my $call_end_time = shift;
my $contract_id = shift;
my $r_package_info = shift;
DEBUG "catching up contract ID $contract_id balance rows";
my $sth = $sth_get_contract_info;
$sth->execute($contract_id) or FATAL "Error executing get info statement: ".$sth->errstr;
my ($create_time,$modify,$contact_reseller_id,$package_id,$interval_unit,$interval_value,
$start_mode,$carry_over_mode,$notopup_discard_intervals,$underrun_profile_threshold,
$underrun_lock_threshold,$underrun_lock_level,$underrun_profiles_count,$class) = $sth->fetchrow_array();
$sth->finish;
$create_time ||= $modify; #contract create_timestamp might be 0000-00-00 00:00:00
my $create_time_aligned;
my $has_package = defined $package_id && defined $contact_reseller_id;
if (!$has_package) { #backward-defaults
$start_mode = "1st";
$carry_over_mode = "carry_over";
}
RESTART_BALANCE_CATCHUP:
$sth = $sth_get_last_cbalance;
$sth->execute($contract_id) or FATAL "Error executing get latest contract balance statement: ".$sth->errstr;
my ($last_id,$last_start,$last_end,$last_cash_balance,$last_cash_balance_int,$last_free_balance,$last_free_balance_int,$last_topups,$last_timely_topups) = $sth->fetchrow_array();
$sth->finish;
my $last_profile = undef;
my $next_start;
my $profile;
my ($stime,$etime);
my $align_eom_time;
if (("create" eq $start_mode or "create_tz" eq $start_mode) && defined $create_time) {
$align_eom_time = $create_time;
} #no eom preserve, since we don't have the begin of the first topup interval
#} elsif ("topup_interval" eq $start_mode && defined x) {
# $align_eom_time = x;
#}
my $ratio;
my $old_free_cash;
my $cash_balance;
my $cash_balance_interval;
my $free_cash;
my $free_time;
my $free_time_balance;
my $free_time_balance_interval;
my $balances_count = 0;
my ($underrun_lock_applied,$underrun_profiles_applied) = (0,0);
my ($underrun_profiles_time,$underrun_lock_time) = (undef,undef);
my $notopup_expiration = 0;
my $timely_end = 0;
my $now = time;
my $bal;
while (defined $last_id && !is_infinite_unix($last_end) && $last_end < $call_end_time) {
$next_start = $last_end + 1;
if ($has_package && $balances_count == 0) {
#we have two queries here, so do it only if really creating contract_balances
$notopup_expiration = get_notopup_expiration($contract_id,undef,$notopup_discard_intervals,$interval_unit,$align_eom_time,$package_id);
}
#profile of last and next interval:
unless($last_profile) {
#no ip here - same as in panel: for now we assume that the profiles in a contracts'
#profile+network mapping schedule have the same free_time/free cash!
$last_profile = {};
get_billing_info($last_start < $create_time ? $create_time : $last_start, $contract_id, undef, $last_profile) or
FATAL "Error getting billing info for date '".($last_start < $create_time ? $create_time : $last_start)."' and contract_id $contract_id\n";
}
($underrun_profiles_time,$underrun_lock_time) = (undef,undef);
PREPARE_BALANCE_CATCHUP:
$profile = {};
get_billing_info($next_start, $contract_id, undef, $profile) or
FATAL "Error getting billing info for date '".$next_start."' and contract_id $contract_id\n";
#stime, etime:
$interval_unit = $has_package ? $interval_unit : ($profile->{int_unit} // 'month'); #backward-defaults
$interval_value = $has_package ? $interval_value : ($profile->{int_count} // 1);
$stime = $next_start;
if ("topup" eq $start_mode) {
$etime = undef;
} else {
$etime = add_interval($interval_unit, $interval_value, $next_start, $align_eom_time, $has_package ? "package id " . $package_id : "profile id " . $profile->{profile_id});
$etime--;
}
#balance values:
$cash_balance = 0;
if (("carry_over" eq $carry_over_mode || ("carry_over_timely" eq $carry_over_mode && $last_timely_topups > 0))
&& (!$notopup_expiration || $stime < $notopup_expiration)) {
$ratio = 1.0;
if($create_time > $last_start and $create_time < $last_end) {
$create_time_aligned = truncate_day($create_time);
$create_time_aligned = $create_time if $create_time_aligned < $last_start;
DEBUG sub { "last ratio = " . ($last_end + 1 - $create_time_aligned) . ' / ' . ($last_end + 1 - $last_start) . ", create_time = $create_time, create_time_aligned = $create_time_aligned"; };
$ratio = ($last_end + 1 - $create_time_aligned) / ($last_end + 1 - $last_start);
}
DEBUG "last ratio: $ratio";
#take the previous interval's (old) free cash, e.g. 5euro:
$old_free_cash = $ratio * ($last_profile->{int_free_cash} // 0.0);
#carry over the last cash balance value, e.g. 23euro:
$cash_balance = $last_cash_balance;
if ($last_cash_balance_int < $old_free_cash) {
# the customer didn't spent all of the the old free cash, but
# only e.g. 2euro overall. to get the raw balance, subtract the
# unused rest of the old free cash, e.g. -3euro.
DEBUG sub { "cash balance = $cash_balance, last_cash_balance_int = $last_cash_balance_int, old_free_cash = $old_free_cash"; };
$cash_balance += $last_cash_balance_int - $old_free_cash;
DEBUG sub { "free cash refill: " . (($last_cash_balance_int - $old_free_cash) + ($profile->{int_free_cash} // 0.0)); };
}
} else {
DEBUG "discarding cash balance (mode '$carry_over_mode'".($notopup_expiration ? ", notopup expiration " . $notopup_expiration : "").")";
}
$ratio = 1.0;
$free_cash = $ratio * ($profile->{int_free_cash} // 0.0); #backward-defaults
$cash_balance += $free_cash; #add new free cash
$cash_balance_interval = 0.0;
$free_time = $ratio * ($profile->{int_free_time} // 0);
$free_time_balance = $free_time; #just set free cash for now
$free_time_balance_interval = 0;
if (!$underrun_lock_applied && defined $underrun_lock_threshold && $last_cash_balance >= $underrun_lock_threshold && $cash_balance < $underrun_lock_threshold) {
$underrun_lock_applied = 1;
DEBUG "cash balance was decreased from $last_cash_balance to $cash_balance and dropped below underrun lock threshold $underrun_lock_threshold";
if (defined $underrun_lock_level) {
set_subscriber_lock_level($contract_id,$underrun_lock_level,0);
set_subscriber_status($contract_id,$underrun_lock_level,0);
$underrun_lock_time = $now;
}
}
if (!$underrun_profiles_applied && defined $underrun_profile_threshold && $last_cash_balance >= $underrun_profile_threshold && $cash_balance < $underrun_profile_threshold) {
$underrun_profiles_applied = 1;
DEBUG "cash balance was decreased from $last_cash_balance to $cash_balance and dropped below underrun profile threshold $underrun_profile_threshold";
if ($underrun_profiles_count > 0) {
add_profile_mappings($contract_id,$stime,$package_id,'underrun',0);
$underrun_profiles_time = $now;
goto PREPARE_BALANCE_CATCHUP;
}
}
#exec create statement:
$sth = (defined $etime ? $sth_new_cbalance : $sth_new_cbalance_infinite_future);
($last_cash_balance,$last_cash_balance_int,$last_free_balance,$last_free_balance_int) =
(truncate_cash_balance($cash_balance), truncate_cash_balance($cash_balance_interval),
truncate_free_time_balance($free_time_balance), truncate_free_time_balance($free_time_balance_interval));
my @bind_parms = ($contract_id,
($last_cash_balance) x 2,$last_cash_balance_int,($last_free_balance) x 2,$last_free_balance_int,
((defined $underrun_profiles_time ? $underrun_profiles_time : 0)) x 2,((defined $underrun_lock_time ? $underrun_lock_time : 0)) x 2,$stime);
push(@bind_parms,$etime) if defined $etime;
unless ($sth->execute(@bind_parms)
or FATAL "Error executing new contract balance statement: ".$sth->errstr) {
$sth->finish;
INFO "cash balance record ($contract_id, $stime) was created elsewhere, starting over";
goto RESTART_BALANCE_CATCHUP;
}
$sth->finish;
$balances_count++;
#reload the contract balance to have mysql's local timezone applied to $last_start, $last_end by UNIX_TIMESTAMP:
$sth = $sth_get_cbalance;
$sth->execute($billdbh->{'mysql_insertid'}) or FATAL "Error executing reload contract balance statement: ".$sth->errstr;
($last_id,$last_start,$last_end,$last_cash_balance,$last_cash_balance_int,$last_free_balance,$last_free_balance_int,$last_topups,$last_timely_topups) = $sth->fetchrow_array();
$sth->finish;
$bal = {
id => $last_id,
cash_balance => $last_cash_balance,
cash_balance_interval => $last_cash_balance_int,
free_time_balance => $last_free_balance,
free_time_balance_interval => $last_free_balance_int,
start_unix => $last_start,
end_unix => $last_end,
};
DEBUG sub { "contract balance created: ".(Dumper $bal) };
$last_profile = $profile;
}
# in case of "topup" or "topup_interval" start modes, the current interval end can be
# infinite and no new contract balances are created. for this infinite end interval,
# the interval start represents the time the last topup happened in case of "topup".
# in case of "topup_interval", the interval start represents the contract creation.
# the cash balance should be discarded when
# 1. the current/call time is later than than $notopup_discard_intervals periods
# after the interval start, or
# 2. we have the "carry_over_timely" mode, and the current/call time is beyond
# the timely end already
if ($has_package && defined $last_id && is_infinite_unix($last_end)) {
$notopup_expiration = get_notopup_expiration($contract_id,$last_start,$notopup_discard_intervals,$interval_unit,$align_eom_time,$package_id);
$timely_end = get_timely_end($last_start,$interval_value,$interval_unit,$carry_over_mode,$package_id);
if ((defined $notopup_expiration && $call_start_time >= $notopup_expiration)
|| (defined $timely_end && $call_start_time > $timely_end)) {
DEBUG "discarding cash balance (mode '$carry_over_mode'".($timely_end ? ", timely end " . $timely_end : "").
($notopup_expiration ? ", notopup expiration " . $notopup_expiration : "").")";
$bal = {
id => $last_id,
cash_balance => 0,
cash_balance_interval => $last_cash_balance_int,
free_time_balance => $last_free_balance,
free_time_balance_interval => $last_free_balance_int,
underrun_profile_time => undef,
underrun_lock_time => undef,
};
if (!$underrun_lock_applied && defined $underrun_lock_threshold && $last_cash_balance >= $underrun_lock_threshold && 0.0 < $underrun_lock_threshold) {
$underrun_lock_applied = 1;
DEBUG "cash balance was decreased from $last_cash_balance to 0 and dropped below underrun lock threshold $underrun_lock_threshold";
if (defined $underrun_lock_level) {
set_subscriber_lock_level($contract_id,$underrun_lock_level,0);
set_subscriber_status($contract_id,$underrun_lock_level,0);
$bal->{underrun_lock_time} = $now;
}
}
if (!$underrun_profiles_applied && defined $underrun_profile_threshold && $last_cash_balance >= $underrun_profile_threshold && 0.0 < $underrun_profile_threshold) {
$underrun_profiles_applied = 1;
DEBUG "cash balance was decreased from $last_cash_balance to 0 and dropped below underrun profile threshold $underrun_profile_threshold";
if ($underrun_profiles_count > 0) {
add_profile_mappings($contract_id,$call_start_time,$package_id,'underrun',0);
$underrun_profiles_time = $now;
$bal->{underrun_profile_time} = $now;
}
}
update_contract_balance($cdr,[$bal])
or FATAL "Error updating customer contract balance\n";
}
}
$r_package_info->{id} = $package_id;
$r_package_info->{class} = $class;
$r_package_info->{underrun_profile_threshold} = $underrun_profile_threshold;
$r_package_info->{underrun_lock_threshold} = $underrun_lock_threshold;
$r_package_info->{underrun_lock_level} = $underrun_lock_level;
$r_package_info->{underrun_lock_applied} = $underrun_lock_applied;
$r_package_info->{underrun_profiles_applied} = $underrun_profiles_applied;
$r_package_info->{underrun_profiles_count} = $underrun_profiles_count;
DEBUG "$balances_count contract balance rows created";
return $balances_count;
}
sub get_contract_balances {
my $cdr = shift;
my $contract_id = shift;
my $r_package_info = shift;
my $r_balances = shift;
my $start_time = $cdr->{start_time};
my $duration = $cdr->{duration};
catchup_contract_balance($cdr,int($start_time),int($start_time + $duration),$contract_id,$r_package_info);
my $sth = $sth_get_cbalances;
$sth->execute($contract_id, int($start_time))
or FATAL "Error executing get contract balance statement: ".$sth->errstr;
my $res = $sth->fetchall_arrayref({});
$sth->finish;
foreach my $bal (@$res) {
# restore balances & create balances savepoint:
$bal->{cash_balance} -= (get_balance_delta($cdr, $bal->{id}, "cash_balance") // 0.0);
$bal->{cash_balance_old} = $bal->{cash_balance};
$bal->{free_time_balance} -= (get_balance_delta($cdr, $bal->{id}, "free_time_balance") // 0);
$bal->{free_time_balance_old} = $bal->{free_time_balance};
$bal->{cash_balance_interval} -= (get_balance_delta($cdr, $bal->{id}, "cash_balance_interval") // 0.0);
$bal->{cash_balance_interval_old} = $bal->{cash_balance_interval};
$bal->{free_time_balance_interval} -= (get_balance_delta($cdr, $bal->{id}, "free_time_balance_interval") // 0);
$bal->{free_time_balance_interval_old} = $bal->{free_time_balance_interval};
push(@$r_balances,$bal);
}
return scalar @$res;
}
sub update_contract_balance {
my $cdr = shift;
my $r_balances = shift;
my $changed = 0;
for my $bal (@$r_balances) {
my @bind_parms = (
($bal->{cash_balance} // 0.0) - ($bal->{cash_balance_old} // 0.0),
$bal->{cash_balance_interval} - $bal->{cash_balance_interval_old},
($bal->{free_time_balance} // 0) - ($bal->{free_time_balance_old} // 0),
$bal->{free_time_balance_interval} - $bal->{free_time_balance_interval_old});
my $sth;
if (defined $bal->{underrun_profile_time} && defined $bal->{underrun_lock_time}) {
push(@bind_parms,$bal->{underrun_profile_time});
push(@bind_parms,$bal->{underrun_lock_time});
$sth = $sth_update_cbalance_w_underrun_profiles_lock;
} elsif (defined $bal->{underrun_profile_time}) {
push(@bind_parms,$bal->{underrun_profile_time});
$sth = $sth_update_cbalance_w_underrun_profiles;
} elsif (defined $bal->{underrun_lock_time}) {
push(@bind_parms,$bal->{underrun_lock_time});
$sth = $sth_update_cbalance_w_underrun_lock;
} else {
$sth = $sth_update_cbalance;
}
push(@bind_parms,$bal->{id});
$sth->execute(@bind_parms) or FATAL "Error executing update contract balance statement: ".$sth->errstr;
$sth->finish;
$changed++;
set_balance_delta($cdr, $bal->{id}, "cash_balance", ($bal->{cash_balance} // 0.0) - ($bal->{cash_balance_old} // 0.0));
set_balance_delta($cdr, $bal->{id}, "cash_balance_interval", $bal->{cash_balance_interval} - $bal->{cash_balance_interval_old});
set_balance_delta($cdr, $bal->{id}, "free_time_balance", ($bal->{free_time_balance} // 0) - ($bal->{free_time_balance_old} // 0));
set_balance_delta($cdr, $bal->{id}, "free_time_balance_interval", $bal->{free_time_balance_interval} - $bal->{free_time_balance_interval_old});
}
DEBUG $changed . " contract balance row(s) updated";
return 1;
}
sub get_subscriber_contract_id {
my $uuid = shift;
my $sth = $sth_get_subscriber_contract_id;
$sth->execute($uuid) or
FATAL "Error executing get_subscriber_contract_id statement: ".$sth->errstr;
my @res = $sth->fetchrow_array();
FATAL "No contract id found for uuid '$uuid'\n" unless @res;
return $res[0];
}
sub get_billing_info {
my $start = shift;
my $contract_id = shift;
my $source_ip = shift;
my $r_info = shift;
my $label;
my $sth;
if ($source_ip) {
$sth = $sth_billing_info_network;
$sth->execute($contract_id, $start, $source_ip) or
FATAL "Error executing billing info statement: ".$sth->errstr;
$label = " and address $source_ip";
} else {
$sth = $sth_billing_info;
$sth->execute($contract_id, $start) or
FATAL "Error executing billing info statement: ".$sth->errstr;
$label = "";
}
my @res = $sth->fetchrow_array();
FATAL "No billing info found for contract_id $contract_id\n" unless @res;
$r_info->{contract_id} = $contract_id;
$r_info->{profile_id} = $res[0];
$r_info->{prepaid} = $res[1];
$r_info->{int_charge} = $res[2];
$r_info->{int_free_time} = $res[3];
$r_info->{int_free_cash} = $res[4];
$r_info->{int_unit} = $res[5];
$r_info->{int_count} = $res[6];
$r_info->{ignore_domain} = $res[7];
DEBUG "contract ID $contract_id billing mapping is profile id $r_info->{profile_id} for time $start" . $label;
$sth->finish;
return 1;
}
sub get_profile_info {
my $bpid = shift;
my $type = shift;
my $direction = shift;
my $source = shift;
my $destination = shift;
my $lnp_number = shift; #force lnp fee lookup
my $b_info = shift;
my $start_time = shift;
my @res;
if (defined $lnp_number and $lnp_number =~ /^\d+$/) {
# let's see if we find the number in our LNP database
$sth_lnp_number->execute($lnp_number, $start_time)
or FATAL "Error executing LNP number statement: ".$sth_lnp_number->errstr;
my ($lnppid,$lnpnumbertype) = $sth_lnp_number->fetchrow_array();
if ($lnppid) {
# let's see if we have a billing fee entry for the LNP provider ID
$sth_lnp_profile_info->execute($bpid, $type, $direction, 'lnp:'.$lnppid)
or FATAL "Error executing LNP profile info statement: ".$sth_lnp_profile_info->errstr;
@res = $sth_lnp_profile_info->fetchrow_array();
FATAL "Error fetching LNP profile info: ".$sth_lnp_profile_info->errstr
if $sth_lnp_profile_info->err;
unless (@res) {
if (length($lnpnumbertype)) {
$sth_lnp_profile_info->execute($bpid, $type, $direction, 'lnpnumbertype:'.$lnpnumbertype)
or FATAL "Error executing LNP profile info statement: ".$sth_lnp_profile_info->errstr;
@res = $sth_lnp_profile_info->fetchrow_array();
FATAL "Error fetching LNP profile info: ".$sth_lnp_profile_info->errstr
if $sth_lnp_profile_info->err;
}
}
}
}
my $sth = $sth_profile_info;
unless (@res) {
$sth->execute($bpid, $type, $direction, $source, $destination)
or FATAL "Error executing profile info statement: ".$sth->errstr;
@res = $sth->fetchrow_array();
}
return 0 unless @res;
$b_info->{fee_id} = $res[0];
$b_info->{source_pattern} = $res[1];
$b_info->{pattern} = $res[2];
$b_info->{on_init_rate} = $res[3];
$b_info->{on_init_interval} = $res[4] == 0 ? 1 : $res[4]; # prevent loops
$b_info->{on_follow_rate} = $res[5];
$b_info->{on_follow_interval} = $res[6] == 0 ? 1 : $res[6];
$b_info->{off_init_rate} = $res[7];
$b_info->{off_init_interval} = $res[8] == 0 ? 1 : $res[8];
$b_info->{off_follow_rate} = $res[9];
$b_info->{off_follow_interval} = $res[10] == 0 ? 1 : $res[10];
$b_info->{zone_id} = $res[11];
$b_info->{off_use_free_time} = $res[12];
$b_info->{on_use_free_time} = $res[13];
$b_info->{on_extra_second} = $res[14];
$b_info->{on_extra_rate} = $res[15];
$b_info->{off_extra_second} = $res[16];
$b_info->{off_extra_rate} = $res[17];
$sth->finish;
return 1;
}
sub get_offpeak {
my $bpid = shift;
my $subscriber_contract_id = shift;
my $start = shift;
my $duration = shift;
my $r_offpeaks = shift;
my $sth;
if ($subscriber_offpeak_tz) {
$sth = $sth_offpeak_subscriber;
$sth->execute(
$subscriber_contract_id,$subscriber_contract_id,
$start,
$start,$duration,
$bpid,
$subscriber_contract_id,$subscriber_contract_id,
$bpid,
$start,$duration,
$start
) or FATAL "Error executing offpeak subscriber statement: ".$sth->errstr;
} else {
$sth = $sth_offpeak;
$sth->execute(
$start,
$start,$duration,
$bpid,
$bpid,
$start,$duration,
$start
) or FATAL "Error executing offpeak statement: ".$sth->errstr;
}
while(my @res = $sth->fetchrow_array())
{
my %e = ();
$e{start} = $res[0];
$e{end} = $res[1];
push @$r_offpeaks, \%e;
}
return 1;
}
sub is_offpeak {
my $start = shift;
my $offset = shift;
my $r_offpeaks = shift;
my $secs = $start + $offset; # we have unix-timestamp as reference
foreach my $r_o(@$r_offpeaks) {
return 1 if($secs >= $r_o->{start} && $secs <= $r_o->{end});
}
return 0;
}
sub get_start_time {
my $cdr = shift;
if ($cdr->{is_fragmented}) {
my $id;
while (($id) = get_cdr_col_data($acc_relation_col_model_key,$cdr->{id},
{ direction => 'source', provider => 'customer', relation => 'prev_fragment_id' })) {
$sth_get_cdr->execute($id) or FATAL "Error executing get cdr statement: ".$sth_get_cdr->errstr;
$cdr = $sth_get_cdr->fetchrow_hashref();
WARNING "missing cdr fragment ID $id" unless $cdr;
}
DEBUG "first cdr fragment ID is $id" if $id;
}
return $cdr->{start_time};
}
sub check_shutdown {
if ($shutdown) {
WARNING 'Shutdown detected, aborting work in progress';
return 1;
}
return 0;
}
sub get_unrated_cdrs {
my $r_cdrs = shift;
my $r_last_cdr_id = shift;
my @cdrs;
my $nodename;
my $sth = $sth_unrated_cdrs;
FETCH_CDRS:
$sth->execute($multi_master_stall ? 0 : $$r_last_cdr_id) or die("Error executing unrated cdr statement: ".$sth->errstr);
@cdrs = ();
$nodename = get_hostname();
#set to undef if corosync reports there is no other working node left:
#$nodename = undef;
my $cnt = 0;
while (my $cdr = $sth->fetchrow_hashref()) {
if (not $multi_master or not length($nodename) or $nodename eq 'spce') {
push(@cdrs,$cdr);
} elsif (substr($nodename,-1,1) eq '1' or substr($nodename,-1,1) eq 'a') {
push(@cdrs,$cdr) if (
(($cdr->{id} % 2) == 1
and ($cdr->{id} % 4) == 3)
or
(($cdr->{id} % 2) == 0
and ($cdr->{id} % 4) == 2)
);
} elsif (substr($nodename,-1,1) eq '2' or substr($nodename,-1,1) eq 'b') {
push(@cdrs,$cdr) if (
(($cdr->{id} % 2) == 1
and ($cdr->{id} % 4) == 1)
or
(($cdr->{id} % 2) == 0
and ($cdr->{id} % 4) == 0)
);
} else {
push(@cdrs,$cdr);
INFO "Unknown hostname '$nodename'";
}
check_shutdown() and return 0;
$cnt++;
$$r_last_cdr_id = $cdr->{id};
}
# the while above may have been interrupted because there is no
# data left, or because there was an error. To decide what
# happened, we have to query $sth->err()
die("Error fetching unrated cdr's: ". $sth->errstr) if $sth->err;
$sth->finish;
if ((not $multi_master_stall) and $cnt > 0 and (scalar @cdrs) == 0) {
goto FETCH_CDRS;
}
if ($shuffle_batch) {
# if concurrent rate-o-mat instances grab the same cdr batch, there
# can be a contention due to waits on same caller/callee contract
# lock attempts when they start processing the batch in the same order.
foreach my $cdr (shuffle @cdrs) {
push(@$r_cdrs,$cdr);
}
} else {
@$r_cdrs = @cdrs;
}
return 1;
}
sub get_balance_delta_field {
my $field = shift;
return unless $field;
return 'cb' if $field eq 'cash_balance';
return 'cbi' if $field eq 'cash_balance_interval';
return 'ftb' if $field eq 'free_time_balance';
return 'ftbi' if $field eq 'free_time_balance_interval';
}
sub get_balance_delta {
my $cdr = shift;
my $bal_id = shift;
my $field = shift;
unless ($cdr->{balance_delta_old}) {
($cdr->{balance_delta_old}) = get_cdr_col_data($acc_tag_col_model_key,$cdr->{id},
{ direction => 'source', provider => 'customer', tag => 'balance_delta' });
if ($cdr->{balance_delta_old}) {
my $deserialized = decode_json($cdr->{balance_delta_old});
$cdr->{balance_delta_old} = $deserialized;
}
$cdr->{balance_delta_old} //= {};
}
if ($bal_id and $field = get_balance_delta_field($field)
and exists $cdr->{balance_delta_old}->{$bal_id}
and exists $cdr->{balance_delta_old}->{$bal_id}->{$field}) {
return $cdr->{balance_delta_old}->{$bal_id}->{$field};
}
return;
}
sub set_balance_delta {
my $cdr = shift;
my $bal_id = shift;
my $field = shift;
my $val = shift;
return unless $val;
return unless $bal_id;
return unless $field = get_balance_delta_field($field);
unless ($cdr->{balance_delta}) {
$cdr->{balance_delta} = {};
}
unless ($cdr->{balance_delta}->{$bal_id}) {
$cdr->{balance_delta}->{$bal_id} = {};
}
$cdr->{balance_delta}->{$bal_id}->{$field} = $val;
}
sub save_balance_delta {
my $cdr = shift;
if ($cdr->{balance_delta}) {
my $serialized = encode_json($cdr->{balance_delta});
return write_cdr_col_data($acc_tag_col_model_key,$cdr,$cdr->{id},
{ direction => 'source', provider => 'customer', tag => 'balance_delta' }, $serialized);
}
return 0;
}
sub update_cdr {
my $cdr = shift;
$cdr->{rating_status} = 'ok';
$cdr->{rated_at} = sql_time(time());
my $sth = $sth_update_cdr;
$sth->execute(
$cdr->{source_carrier_cost}, $cdr->{source_reseller_cost}, $cdr->{source_customer_cost},
$cdr->{source_carrier_free_time}, $cdr->{source_reseller_free_time}, $cdr->{source_customer_free_time},
$cdr->{rated_at}, $cdr->{rating_status},
$cdr->{source_carrier_billing_fee_id}, $cdr->{source_reseller_billing_fee_id}, $cdr->{source_customer_billing_fee_id},
$cdr->{source_carrier_billing_zone_id}, $cdr->{source_reseller_billing_zone_id}, $cdr->{source_customer_billing_zone_id},
$cdr->{destination_carrier_cost}, $cdr->{destination_reseller_cost}, $cdr->{destination_customer_cost},
$cdr->{destination_carrier_free_time}, $cdr->{destination_reseller_free_time}, $cdr->{destination_customer_free_time},
$cdr->{destination_carrier_billing_fee_id}, $cdr->{destination_reseller_billing_fee_id}, $cdr->{destination_customer_billing_fee_id},
$cdr->{destination_carrier_billing_zone_id}, $cdr->{destination_reseller_billing_zone_id}, $cdr->{destination_customer_billing_zone_id},
$cdr->{frag_carrier_onpeak}, $cdr->{frag_reseller_onpeak}, $cdr->{frag_customer_onpeak},
$cdr->{is_fragmented} // 0, $cdr->{duration},
$cdr->{id})
or FATAL "Error executing update cdr statement: ".$sth->errstr;
if ($sth->rows > 0) {
DEBUG "cdr ID $cdr->{id} updated";
my $fraud_lock;
if (not $dupdbh
and $cdr->{source_account_id}) {
unless ($cdr->{source_customer_billing_profile_id}) {
my %billing_info = ();
get_billing_info($cdr->{start_time}, $cdr->{source_account_id}, $cdr->{source_ip}, \%billing_info) or
FATAL "Error getting source_customer billing info\n";
$cdr->{source_customer_billing_profile_id} = $billing_info{profile_id};
}
$fraud_lock = add_period_costs(0,
$cdr->{id},
$cdr->{source_account_id},
$cdr->{start_time},
$cdr->{duration},
$cdr->{source_customer_billing_profile_id},
-1.0 * ($cdr->{source_customer_cost_old} || 0.0) + $cdr->{source_customer_cost},
-1.0 * ($cdr->{source_reseller_cost_old} || 0.0) + $cdr->{source_reseller_cost},
) if $cdr->{source_customer_billing_profile_id};
}
write_cdr_cols($cdr,$cdr->{id},
$acc_cash_balance_col_model_key,
$acc_time_balance_col_model_key,
$acc_relation_col_model_key,
$acc_tag_col_model_key);
save_balance_delta($cdr);
if ($dupdbh) {
$sth_duplicate_cdr->execute(@$cdr{@cdr_fields})
or FATAL "Error executing duplicate cdr statement: ".$sth_duplicate_cdr->errstr;
my $dup_cdr_id = $dupdbh->{'mysql_insertid'};
if ($dup_cdr_id) {
DEBUG "local cdr ID $cdr->{id} was duplicated to duplication cdr ID $dup_cdr_id";
if ($cdr->{source_account_id}) {
unless ($cdr->{source_customer_billing_profile_id}) {
my %billing_info = ();
get_billing_info($cdr->{start_time}, $cdr->{source_account_id}, $cdr->{source_ip}, \%billing_info) or
FATAL "Error getting source_customer billing info\n";
$cdr->{source_customer_billing_profile_id} = $billing_info{profile_id};
}
$fraud_lock = add_period_costs(1,
$dup_cdr_id,
$cdr->{source_account_id},
$cdr->{start_time},
$cdr->{duration},
$cdr->{source_customer_billing_profile_id},
-1.0 * ($cdr->{source_customer_cost_old} || 0.0) + $cdr->{source_customer_cost},
-1.0 * ($cdr->{source_reseller_cost_old} || 0.0) + $cdr->{source_reseller_cost},
) if $cdr->{source_customer_billing_profile_id};
}
write_cdr_cols($cdr,$dup_cdr_id,
$dup_cash_balance_col_model_key,
$dup_time_balance_col_model_key,
$dup_relation_col_model_key,
$dup_tag_col_model_key);
copy_cdr_col_data($acc_tag_col_model_key,$dup_tag_col_model_key,$cdr,$cdr->{id},$dup_cdr_id,
{ direction => 'destination', provider => 'customer', tag => 'furnished_charging_info' });
copy_cdr_col_data($acc_tag_col_model_key,$dup_tag_col_model_key,$cdr,$cdr->{id},$dup_cdr_id,
{ direction => 'source', provider => 'customer', tag => 'header=P-Asserted-Identity' });
copy_cdr_col_data($acc_tag_col_model_key,$dup_tag_col_model_key,$cdr,$cdr->{id},$dup_cdr_id,
{ direction => 'source', provider => 'customer', tag => 'header=P-Preferred-Identity' });
copy_cdr_col_data($acc_tag_col_model_key,$dup_tag_col_model_key,$cdr,$cdr->{id},$dup_cdr_id,
{ direction => 'destination', provider => 'customer', tag => 'header=Diversion' });
copy_cdr_col_data($acc_tag_col_model_key,$dup_tag_col_model_key,$cdr,$cdr->{id},$dup_cdr_id,
{ direction => 'destination', provider => 'customer', tag => 'hg_ext_response' });
copy_cdr_mos_data($cdr,$cdr->{id},$dup_cdr_id);
} else {
FATAL "cdr ID $cdr->{id} and col data could not be duplicated";
}
}
if (defined $fraud_lock and $fraud_lock > 0) {
set_subscriber_lock_level($cdr->{source_account_id},$fraud_lock,not $apply_fraud_lock);
set_subscriber_status($cdr->{source_account_id},$fraud_lock,not $apply_fraud_lock);
}
} else {
$rollback = 1;
FATAL "cdr ID $cdr->{id} seems to be already processed by someone else";
}
return 1;
}
sub write_cdr_cols {
my $cdr = shift;
my $cdr_id = shift;
my $cash_balance_col_model_key = shift;
my $time_balance_col_model_key = shift;
my $relation_col_model_key = shift;
my $tag_col_model_key = shift;
foreach my $dir (('source', 'destination')) {
foreach my $provider (('carrier','reseller','customer')) {
write_cdr_col_data($cash_balance_col_model_key,$cdr,$cdr_id,
{ direction => $dir, provider => $provider, cash_balance => 'cash_balance' },
$cdr->{$dir.'_'.$provider."_cash_balance_before"},
$cdr->{$dir.'_'.$provider."_cash_balance_after"}) if $write_cash_balance_before_after;
write_cdr_col_data($time_balance_col_model_key,$cdr,$cdr_id,
{ direction => $dir, provider => $provider, time_balance => 'free_time_balance' },
$cdr->{$dir.'_'.$provider."_free_time_balance_before"},
$cdr->{$dir.'_'.$provider."_free_time_balance_after"}) if $write_free_time_balance_before_after;
write_cdr_col_data($relation_col_model_key,$cdr,$cdr_id,
{ direction => $dir, provider => $provider, relation => 'profile_package_id' },
$cdr->{$dir.'_'.$provider."_profile_package_id"}) if $write_profile_package_id;
write_cdr_col_data($relation_col_model_key,$cdr,$cdr_id,
{ direction => $dir, provider => $provider, relation => 'contract_balance_id' },
$cdr->{$dir.'_'.$provider."_contract_balance_id"}) if $write_contract_balance_id;
write_cdr_col_data($tag_col_model_key,$cdr,$cdr_id,
{ direction => $dir, provider => $provider, tag => 'extra_rate' },
$cdr->{$dir.'_'.$provider."_extra_rate"});
}
}
}
sub get_call_cost {
my $cdr = shift;
my $type = shift;
my $direction = shift;
my $contract_id = shift;
my $subscriber_contract_id = shift;
my $profile_id = shift;
my $ignore_domain = shift;
my $readonly = shift;
my $prepaid = shift;
my $r_profile_info = shift;
my $r_package_info = shift;
my $r_cost = shift;
my $r_real_cost = shift;
my $r_free_time = shift;
my $r_rating_duration = shift;
my $r_onpeak = shift;
my $r_extra_rate = shift;
my $r_balances = shift;
my $src_user;
if($offnet_anonymous_source_cli_fallback
and $cdr->{source_user_id} eq "0"
and $cdr->{source_cli} =~ /anonymous/i
and $cdr->{source_user} =~ /^[+ 0-9]+$/) {
$src_user = $cdr->{source_user};
} else {
$src_user = $cdr->{source_cli};
}
my $src_user_domain = $src_user.'@'.$cdr->{source_domain};
my $dst_user = $cdr->{destination_user_in};
my $dst_user_domain = $cdr->{destination_user_in}.'@'.$cdr->{destination_domain};
DEBUG "calculating call cost for profile_id $profile_id with type $type, direction $direction, ".
"src_user_domain $src_user_domain, dst_user_domain $dst_user_domain" unless $ignore_domain;
if($ignore_domain or not get_profile_info($profile_id, $type, $direction, $src_user_domain, $dst_user_domain, $dst_user,
$r_profile_info, $cdr->{start_time})) {
DEBUG "trying user only for profile_id $profile_id with type $type, direction $direction, ".
"src_user_domain $src_user, dst_user_domain $dst_user";
unless(get_profile_info($profile_id, $type, $direction, $src_user, $dst_user, undef,
$r_profile_info, $cdr->{start_time})) {
# we gracefully ignore missing profile infos for inbound direction
FATAL "No outbound fee info for profile $profile_id and ".
"source user '$src_user' or user/domain '$src_user_domain' and ".
"destination user '$dst_user' or user/domain '$dst_user_domain' ".
"found\n" if($direction eq "out");
$$r_cost = 0;
$$r_free_time = 0;
return 1;
}
}
$$r_rating_duration = 0; # ensure we start with zero length
DEBUG sub { "billing fee is ".(Dumper $r_profile_info) };
my @offpeak = ();
get_offpeak($profile_id, $subscriber_contract_id, $cdr->{_start_time},
$cdr->{start_time} - $cdr->{_start_time} + $cdr->{duration}, \@offpeak) or
FATAL "Error getting offpeak info\n";
DEBUG sub { "offpeak info: " . Dumper \@offpeak; };
$$r_cost = 0;
$$r_real_cost = 0;
$$r_free_time = 0;
my $interval = 0;
my $rate = 0;
my $offset = 0;
my $onpeak = 0;
my $init = $cdr->{is_fragmented} // 0;
my $extra_second;
my $extra_rate;
my $use_free_time;
if (is_offpeak($cdr->{_start_time}, 0, \@offpeak)) {
$extra_second = $r_profile_info->{off_extra_second};
$extra_rate = $r_profile_info->{off_extra_rate} // 0.0;
$use_free_time = $r_profile_info->{off_use_free_time};
} else {
$extra_second = $r_profile_info->{on_extra_second};
$extra_rate = $r_profile_info->{on_extra_rate} // 0.0;
$use_free_time = $r_profile_info->{on_use_free_time};
}
my $duration = (defined $cdr->{rating_duration} and $cdr->{rating_duration} < $cdr->{duration}) ? $cdr->{rating_duration} : $cdr->{duration};
my $prev_bal_id = undef;
my @cash_balance_rates = ();
my $prev_cash_balance = undef;
my $last_bal = undef;
my $cash_balance_rate_sum;
my ($underrun_lock_applied,$underrun_profiles_applied) = ($r_package_info->{underrun_lock_applied},$r_package_info->{underrun_profiles_applied});
my ($underrun_profiles_time,$underrun_lock_time) = (undef,undef);
my %bal_map = map { $_->{id} => $_; } @$r_balances;
if($duration == 0) { # zero duration call, yes these are possible
if(is_offpeak($cdr->{start_time}, $offset, \@offpeak)) {
$$r_onpeak = 0;
} else {
$$r_onpeak = 1;
}
}
while ($duration > 0) {
DEBUG "try to rate remaining duration of $duration secs";
if(is_offpeak($cdr->{start_time}, $offset, \@offpeak)) {
$onpeak = 0;
} else {
#print "offset $offset is onpeak\n";
$onpeak = 1;
}
unless($init) {
$init = 1;
$interval = $onpeak == 1 ?
$r_profile_info->{on_init_interval} : $r_profile_info->{off_init_interval};
$rate = $onpeak == 1 ?
$r_profile_info->{on_init_rate} : $r_profile_info->{off_init_rate};
DEBUG "add init rate $rate per sec to costs";
} else {
$interval = $onpeak == 1 ?
$r_profile_info->{on_follow_interval} : $r_profile_info->{off_follow_interval};
$rate = $onpeak == 1 ?
$r_profile_info->{on_follow_rate} : $r_profile_info->{off_follow_rate};
DEBUG "add follow rate $rate per sec to costs";
}
$$r_onpeak = $onpeak unless defined $$r_onpeak;
if ($split_peak_parts #break the cdr, if
and not defined $cdr->{rating_duration} #is the first attempt to calculate,
and defined($$r_onpeak) #it started with onpeak or offpeak in the first interval,
and $$r_onpeak != $onpeak) { #and switched onpeak/offpeak in the next interval
DEBUG (($$r_onpeak ? 'onpeak' : 'offpeak').' -> '.($onpeak ? 'onpeak' : 'offpeak').' transition, rating_duration = ' . $$r_rating_duration);
#$split = 1;
last;
}
$rate *= $interval;
DEBUG "interval is $interval, so rate for this interval is $rate";
#my @bals = grep {($_->{start_unix} + $offset) <= $cdr->{start_time}} @$r_balances;
my $current_call_time = int($cdr->{start_time} + $offset);
my @bals = grep {
$_->{start_unix} <= $current_call_time &&
(is_infinite_unix($_->{end_unix}) || $current_call_time <= $_->{end_unix})
} @$r_balances;
@bals or FATAL "No contract balance for CDR $cdr->{id} found";
WARNING "overlapping contract balances for CDR $cdr->{id} found: ".(Dumper \@bals) if (scalar @bals) > 1;
foreach my $bal (@bals) {
delete $bal_map{$bal->{id}};
}
@bals = @{ sort_contract_balances(\@bals) };
my $bal = $bals[0];
$last_bal = $bal;
if (defined $prev_bal_id) {
if ($bal->{id} != $prev_bal_id) { #contract balance transition
DEBUG sub { "next contract balance entered: ".(Dumper $bal) };
$prev_cash_balance = $bal->{cash_balance};
#carry over the costs so far:
$cash_balance_rate_sum = 0;
foreach my $cash_balance_rate (@cash_balance_rates) {
if ($cash_balance_rate <= $bal->{cash_balance}) {
$bal->{cash_balance} -= $cash_balance_rate;
$cash_balance_rate_sum += $cash_balance_rate;
}
}
DEBUG "carry over costs - rates of $cash_balance_rate_sum so far were subtracted from cash balance $prev_cash_balance";
$prev_bal_id = $bal->{id};
}
} else {
DEBUG sub { "starting with contract balance: ".(Dumper $bal) };
$prev_bal_id = $bal->{id};
$prev_cash_balance = $bal->{cash_balance};
}
if ($use_free_time && $bal->{free_time_balance} >= $interval) {
DEBUG "subtracting $interval sec from free_time_balance $$bal{free_time_balance} and skip costs for this interval";
$$r_rating_duration += $interval;
$duration -= $interval;
$bal->{free_time_balance} -= $interval;
$bal->{free_time_balance_interval} += $interval;
$$r_free_time += $interval;
next;
}
if ($use_free_time && $bal->{free_time_balance} > 0) {
DEBUG "using $$bal{free_time_balance} sec free time for this interval and calculate cost for remaining interval chunk";
$$r_free_time += $bal->{free_time_balance};
$$r_rating_duration += $bal->{free_time_balance};
$duration -= $bal->{free_time_balance};
$bal->{free_time_balance_interval} += $bal->{free_time_balance};
$rate *= 1.0 - ($bal->{free_time_balance} / $interval);
$interval -= $bal->{free_time_balance};
$bal->{free_time_balance} = 0;
DEBUG "calculate cost for remaining interval chunk $interval";
}
if (defined $extra_second) {
my $extra_second_time = int($cdr->{_start_time}) + $extra_second;
if ($extra_second_time >= $current_call_time
and $extra_second_time < ($current_call_time + $interval)
and ($current_call_time + int($duration)) >= $extra_second_time
and int($cdr->{start_time}) <= $extra_second_time) {
DEBUG "add extra second ($extra_second) cost $extra_rate to rate $rate";
$rate += $extra_rate;
$$r_extra_rate = $extra_rate;
undef $extra_second;
}
}
if (($rate > 0 || $prepaid) and $rate <= $bal->{cash_balance}) {
DEBUG "we still have cash balance $$bal{cash_balance} left, subtract rate $rate from that";
$bal->{cash_balance} -= $rate;
push(@cash_balance_rates,$rate);
} else {
DEBUG "add current interval cost $rate to total cost $$r_cost";
$$r_cost += $rate;
}
$bal->{cash_balance_interval} += $rate;
$$r_real_cost += $rate;
$duration -= $interval;
$$r_rating_duration += $interval;
$offset += $interval;
}
if (defined $cdr->{rating_duration} # we are in the second attempt,
and $cdr->{rating_duration} >= $cdr->{duration} # must be last, final fragment,
and $cdr->{rating_duration} > $$r_rating_duration) { # set $$r_rating_duration to the max rating duration, if its not
DEBUG "set rating_duration from $$r_rating_duration to rating_duration = $cdr->{rating_duration}";
$$r_rating_duration = $cdr->{rating_duration}; # will result in identical rating durations, and the fragment will pass.
} else {
DEBUG ("rating_duration = $$r_rating_duration");
}
if ((scalar @cash_balance_rates) > 0) {
my @remaining_bals = @{ sort_contract_balances([ values %bal_map ]) };
foreach my $bal (@remaining_bals) {
DEBUG sub { "remaining contract balance: ".(Dumper $bal) };
$last_bal = $bal;
$prev_cash_balance = $bal->{cash_balance};
$cash_balance_rate_sum = 0;
foreach my $cash_balance_rate (@cash_balance_rates) {
if ($cash_balance_rate <= $bal->{cash_balance}) {
$bal->{cash_balance} -= $cash_balance_rate;
$cash_balance_rate_sum += $cash_balance_rate;
}
}
DEBUG "carry over costs - rates of $cash_balance_rate_sum so far were subtracted from cash balance $prev_cash_balance";
}
}
if (defined $last_bal && defined $prev_cash_balance) {
my $now = time;
if (!$underrun_lock_applied && defined $r_package_info->{underrun_lock_threshold} && $prev_cash_balance >= $r_package_info->{underrun_lock_threshold} && $last_bal->{cash_balance} < $r_package_info->{underrun_lock_threshold}) {
$underrun_lock_applied = 1;
DEBUG "cash balance was decreased from $prev_cash_balance to $last_bal->{cash_balance} and dropped below underrun lock threshold $r_package_info->{underrun_lock_threshold}";
if (defined $r_package_info->{underrun_lock_level}) {
set_subscriber_lock_level($contract_id,$r_package_info->{underrun_lock_level},$readonly);
set_subscriber_status($contract_id,$r_package_info->{underrun_lock_level},$readonly);
$last_bal->{underrun_lock_time} = $now;
}
}
if (!$underrun_profiles_applied && defined $r_package_info->{underrun_profile_threshold} && $prev_cash_balance >= $r_package_info->{underrun_profile_threshold} && $last_bal->{cash_balance} < $r_package_info->{underrun_profile_threshold}) {
$underrun_profiles_applied = 1;
DEBUG "cash balance was decreased from $prev_cash_balance to $last_bal->{cash_balance} and dropped below underrun profile threshold $r_package_info->{underrun_profile_threshold}";
if (not $readonly and $r_package_info->{underrun_profiles_count} > 0) {
add_profile_mappings($contract_id,$cdr->{start_time} + $cdr->{duration},$r_package_info->{id},'underrun');
$last_bal->{underrun_profile_time} = $now;
}
}
}
return 1;
}
sub truncate_cash_balance {
return sprintf("%.4f",shift);
}
sub truncate_free_time_balance {
return sprintf("%.0f",shift);
}
sub sort_contract_balances {
my $balances = shift;
my $desc = shift;
$desc = ($desc ? -1 : 1);
my @bals = sort { ($a->{start_unix} <=> $b->{start_unix}) * $desc; } @$balances;
return \@bals;
}
sub get_prepaid {
my $cdr = shift;
my $billing_info = shift;
my $prefix = shift;
# strictly take it from the (scheduled) billing profile:
my $prepaid = (defined $billing_info ? $billing_info->{prepaid} : undef);
return $prepaid;
}
sub get_snapshot_contract_balance {
my $balances = shift;
return sort_contract_balances($balances)->[-1];
}
sub populate_prepaid_cost_cache {
if (!defined $prepaid_costs_cache) {
DEBUG "empty prepaid_costs cache, populate it";
$sth_prepaid_costs_count->execute()
or FATAL "Error executing get prepaid costs count statement: ".$sth_prepaid_costs_count->errstr;
my ($count) = $sth_prepaid_costs_count->fetchrow_array();
if ($count > $prepaid_costs_cache_limit) {
WARNING "over $prepaid_costs_cache_limit pending prepaid_costs records, too many to preload";
} else {
$prepaid_costs_cache = {};
$sth_prepaid_costs_cache->execute()
or FATAL "Error executing get prepaid costs cache statement: ".$sth_prepaid_costs_cache->errstr;
while (my $prepaid_cost = $sth_prepaid_costs_cache->fetchrow_hashref()) {
$prepaid_costs_cache->{$prepaid_cost->{call_id}} //= {};
my $map = $prepaid_costs_cache->{$prepaid_cost->{call_id}};
$map->{$prepaid_cost->{source_user_id}} //= {};
$map = $map->{$prepaid_cost->{source_user_id}};
if (exists $map->{$prepaid_cost->{destination_user_id}}) {
DEBUG "duplicate prepaid_costs call_id = $prepaid_cost->{call_id}, source_user_id = $prepaid_cost->{source_user_id}, destination_user_id = $prepaid_cost->{destination_user_id}";
}
$map->{$prepaid_cost->{destination_user_id}} = $prepaid_cost;
}
DEBUG "prepaid_costs cache populated, $count records";
return 1;
}
} else {
DEBUG "prepaid_costs cache already populated";
}
return 0;
}
sub clear_prepaid_cost_cache {
undef $prepaid_costs_cache;
}
sub get_prepaid_cost {
my $cdr = shift;
my $entry = undef;
my @call_ids = (
$cdr->{call_id},
$cdr->{call_id} . '_pbx-1',
);
if (defined $prepaid_costs_cache) {
foreach my $call_id (@call_ids) {
if (exists $prepaid_costs_cache->{$call_id}) {
my $map = $prepaid_costs_cache->{$call_id};
if (exists $map->{$cdr->{source_user_id}}) {
$map = $map->{$cdr->{source_user_id}};
if (exists $map->{$cdr->{destination_user_id}}) {
DEBUG "prepaid_costs call_id = $cdr->{call_id}, source_user_id = $cdr->{source_user_id}, destination_user_id = $cdr->{destination_user_id} found in cache";
$entry = $map->{$cdr->{destination_user_id}};
last;
}
}
}
}
} else {
foreach my $call_id (@call_ids) {
$sth_prepaid_cost->execute($call_id,$cdr->{source_user_id},$cdr->{destination_user_id})
or FATAL "Error executing get prepaid cost statement: ".$sth_prepaid_cost->errstr;
my $prepaid_cost = $sth_prepaid_cost->fetchall_hashref('destination_user_id');
if ($prepaid_cost && exists $prepaid_cost->{$cdr->{destination_user_id}}) {
DEBUG "prepaid cost record for call ID $cdr->{call_id} retrieved";
$entry = $prepaid_cost->{$cdr->{destination_user_id}};
#last;
}
}
}
return $entry;
}
sub drop_prepaid_cost {
my $entry = shift;
my $count = $sth_delete_prepaid_cost->execute($entry->{call_id},$entry->{source_user_id},$entry->{destination_user_id})
or FATAL "Error executing delete prepaid cost statement: ".$sth_delete_prepaid_cost->errstr;
if ($count > 1) {
WARNING "multiple prepaid_costs call_id = $entry->{call_id}, source_user_id = $entry->{source_user_id}, destination_user_id = $entry->{destination_user_id} deleted";
} elsif ($count == 1) {
DEBUG "prepaid_costs call_id = $entry->{call_id}, source_user_id = $entry->{source_user_id}, destination_user_id = $entry->{destination_user_id} deleted";
} elsif ($count == 1) {
WARNING "no prepaid_costs call_id = $entry->{call_id}, source_user_id = $entry->{source_user_id}, destination_user_id = $entry->{destination_user_id} deleted";
}
if (defined $prepaid_costs_cache) {
if (exists $prepaid_costs_cache->{$entry->{call_id}}) {
my $map = $prepaid_costs_cache->{$entry->{call_id}};
if (exists $map->{$entry->{source_user_id}}) {
$map = $map->{$entry->{source_user_id}};
if (exists $map->{$entry->{destination_user_id}}) {
delete $map->{$entry->{destination_user_id}};
my $empty = (scalar keys %$map) == 0;
$map = $prepaid_costs_cache->{$entry->{call_id}};
delete $map->{$entry->{source_user_id}} if $empty;
$empty = (scalar keys %$map) == 0;
delete $prepaid_costs_cache->{$entry->{call_id}} if $empty;
DEBUG "dropped prepaid_costs call_id = $entry->{call_id}, source_user_id = $entry->{source_user_id}, destination_user_id = $entry->{destination_user_id} from cache";
}
}
}
}
return $count;
}
sub prepare_cdr_col_model {
my $dbh = shift;
my $col_model_key = shift;
#print "prepare: $col_model_key\n";
my $model_description = shift;
my $description_prefix = shift;
my $dimensions = shift;
my $col_dimension_stmt_map = shift;
my $write_stmt = shift;
my $read_stmt = shift;
$cdr_col_models{$col_model_key} = {
description => $model_description,
description_prefix => $description_prefix,
};
my $model = $cdr_col_models{$col_model_key};
$model->{dimensions} = $dimensions;
my %col_dimension_map = ();
foreach my $dimension (@$dimensions) {
my $stmt = $col_dimension_stmt_map->{$dimension}->{sql};
my $description = $col_dimension_stmt_map->{$dimension}->{description};
my $get_col = { description => $description, };
$get_col->{sth} = $dbh->prepare($stmt)
or FATAL "Error preparing $description statement: ".$dbh->errstr;
$col_dimension_map{$dimension} = $get_col;
}
$model->{dimension_sths} = \%col_dimension_map;
$model->{write_sth} = { description => $write_stmt->{description}, };
$model->{write_sth}->{sth} = $dbh->prepare($write_stmt->{sql})
or FATAL "Error preparing ".$write_stmt->{description}." statement: ".$dbh->errstr;
$model->{read_sth} = { description => $read_stmt->{description}, };
$model->{read_sth}->{sth} = $dbh->prepare($read_stmt->{sql})
or FATAL "Error preparing ".$read_stmt->{description}." statement: ".$dbh->errstr;
}
sub init_cdr_col_model {
my $col_model_key = shift;
#print "init: $col_model_key\n";
FATAL "unknown column model key $col_model_key" unless exists $cdr_col_models{$col_model_key};
my $model = $cdr_col_models{$col_model_key};
$model->{dimension_dictionaries} = {};
foreach my $dimension (keys %{$model->{dimension_sths}}) {
my $sth = $model->{dimension_sths}->{$dimension}->{sth};
$sth->execute()
or FATAL "Error executing ".
$model->{dimension_sths}->{$dimension}->{description}
." statement: ".$sth->errstr;
$model->{dimension_dictionaries}->{$dimension} = $sth->fetchall_hashref('type');
$sth->finish;
}
INFO $model->{description} . " loaded\n";
}
sub write_cdr_col_data {
my $col_model_key = shift;
my $cdr = shift;
my $cdr_id = shift;
my $lookup = shift;
my @vals = @_;
FATAL "unknown column model key $col_model_key" unless exists $cdr_col_models{$col_model_key};
my $model = $cdr_col_models{$col_model_key};
my @bind_parms = ($cdr_id,$cdr->{start_time});
my $virtual_col_name = '';
foreach my $dimension (@{$model->{dimensions}}) {
my $dimension_value = $lookup->{$dimension};
unless ($dimension_value) {
FATAL "missing '$dimension' dimension for writing ".$model->{description_prefix}." col data of ".$model->{description};
}
my $dictionary = $model->{dimension_dictionaries}->{$dimension};
my $dimension_value_lookup = $dictionary->{$dimension_value};
unless ($dimension_value_lookup) {
FATAL "unknown '$dimension' col name '$dimension_value' for writing ".$model->{description_prefix}." col data of ".$model->{description};
}
push(@bind_parms,$dimension_value_lookup->{id});
$virtual_col_name .= '_' if length($virtual_col_name) > 0;
$virtual_col_name .= $lookup->{$dimension};
}
if ((scalar @vals) == 0 || (scalar grep { defined $_ } @vals) == 0) {
DEBUG "empty '$virtual_col_name' ".$model->{description_prefix}." col data for cdr id ".$cdr_id.', skipping';
return 0;
} else {
push(@bind_parms,@vals);
push(@bind_parms,@vals);
}
my $sth = $model->{write_sth}->{sth};
$sth->execute(@bind_parms)
or FATAL "Error executing ".
$model->{write_sth}->{description}
."statement: ".$sth->errstr;
if ($sth->rows == 1) {
DEBUG $model->{description_prefix}.' col data created or up to date for cdr id '.$cdr_id.", column '$virtual_col_name': ".join(', ',@vals);
} elsif ($sth->rows > 1) {
DEBUG $model->{description_prefix}.' col data updated for cdr id '.$cdr_id.", column '$virtual_col_name': ".join(', ',@vals);
#} else {
# DEBUG 'no '.$model->{description_prefix}.' col data written for cdr id '.$cdr_id.", column '$virtual_col_name': ".join(', ',@vals);
}
return $sth->rows;
}
sub get_cdr_col_data {
my $col_model_key = shift;
my $cdr_id = shift;
my $lookup = shift;
FATAL "unknown column model key $col_model_key" unless exists $cdr_col_models{$col_model_key};
my $model = $cdr_col_models{$col_model_key};
my @bind_parms = ($cdr_id);
my $virtual_col_name = '';
foreach my $dimension (@{$model->{dimensions}}) {
my $dimension_value = $lookup->{$dimension};
unless ($dimension_value) {
FATAL "missing '$dimension' dimension for writing ".$model->{description_prefix}." col data of ".$model->{description};
}
my $dictionary = $model->{dimension_dictionaries}->{$dimension};
my $dimension_value_lookup = $dictionary->{$dimension_value};
unless ($dimension_value_lookup) {
FATAL "unknown '$dimension' col name '$dimension_value' for reading ".$model->{description_prefix}." col data of ".$model->{description};
}
push(@bind_parms,$dimension_value_lookup->{id});
$virtual_col_name .= '_' if length($virtual_col_name) > 0;
$virtual_col_name .= $lookup->{$dimension};
}
my $sth = $model->{read_sth}->{sth};
$sth->execute(@bind_parms) or FATAL "Error executing ".$model->{read_sth}->{description}."statement: ".$sth->errstr;
my @vals = $sth->fetchrow_array;
return @vals;
}
sub copy_cdr_col_data {
my $src_col_model_key = shift;
my $dst_col_model_key = shift;
my $cdr = shift;
my $src_cdr_id = shift;
my $dst_cdr_id = shift;
my $lookup = shift;
my @vals = get_cdr_col_data($src_col_model_key,$src_cdr_id,$lookup);
return write_cdr_col_data($dst_col_model_key,$cdr,$dst_cdr_id,$lookup,@vals);
}
sub copy_cdr_mos_data {
my $cdr = shift;
my $src_cdr_id = shift;
my $dst_cdr_id = shift;
my $row_count = 0;
$sth_mos_data->execute($src_cdr_id) or FATAL "Error executing mos data statement: ".$sth_mos_data->errstr;
while (my $mos_data = $sth_mos_data->fetchrow_hashref()) {
my @bind_values = ($dst_cdr_id);
foreach my $mos_data_field (@mos_data_fields) {
push(@bind_values,$mos_data->{$mos_data_field});
}
push(@bind_values,$cdr->{start_time});
foreach my $mos_data_field (@mos_data_fields) {
push(@bind_values,$mos_data->{$mos_data_field});
}
$sth_duplicate_mos_data->execute(@bind_values) or FATAL "Error executing duplicate mos data statement: ".$sth_duplicate_mos_data->errstr;
if ($sth_duplicate_mos_data->rows == 1) {
DEBUG 'mos data created or up to date for cdr id '.$src_cdr_id.': '.join(', ',@bind_values);
} elsif ($sth_duplicate_mos_data->rows > 1) {
DEBUG 'mos data updated for cdr id '.$src_cdr_id.': '.join(', ',@bind_values);
}
$row_count += 1;
}
return $row_count;
}
sub get_hostname {
return '' unless length($hostname_filepath);
my $fh;
if (not open($fh, '<', $hostname_filepath)) {
DEBUG 'cannot open file ' . $hostname_filepath . ': ' . $!;
return '';
}
my @linebuffer = <$fh>;
close $fh;
my $hostname = $linebuffer[0];
chomp $hostname;
return $hostname;
}
sub get_customer_call_cost {
my $cdr = shift;
my $type = shift;
my $direction = shift;
my $readonly = shift;
my $r_cost = shift;
my $r_free_time = shift;
my $r_rating_duration = shift;
my $onpeak;
my $real_cost = 0;
my $extra_rate;
my $dir;
if($direction eq "out") {
$dir = "source_";
} else {
$dir = "destination_";
}
my $contract_id = get_subscriber_contract_id($cdr->{$dir."user_id"});
my $subscriber_contract_id = ('0' eq $cdr->{$dir."user_id"} ? $cdr->{$dir."provider_id"} : $contract_id);
my @balances = ();
my %package_info = ();
get_contract_balances($cdr, $contract_id, \%package_info, \@balances)
or FATAL "Error getting ".$dir."customer contract ID $contract_id balances\n";
my %billing_info = (); #profiles might have switched due to underrun while was carry over discarded
get_billing_info($cdr->{start_time}, $contract_id, $cdr->{source_ip}, \%billing_info) or
FATAL "Error getting ".$dir."customer billing info\n";
DEBUG sub { $dir."customer info is " . Dumper({
billing => \%billing_info,
package => \%package_info,
balances => \@balances,
})};
unless($billing_info{profile_id}) {
$$r_rating_duration = $cdr->{duration};
DEBUG "no billing info for ".$dir."customer contract ID $contract_id, skip";
return -1;
}
$cdr->{$dir."customer_billing_profile_id"} = $billing_info{profile_id};
my $prepaid = get_prepaid($cdr, \%billing_info, $dir.'user_');
my $outgoing_prepaid = ($prepaid == 1 && $direction eq "out");
my $prepaid_cost_entry = undef;
if ($outgoing_prepaid) {
DEBUG "billing profile is prepaid";
populate_prepaid_cost_cache();
$prepaid_cost_entry = get_prepaid_cost($cdr);
}
my %profile_info = ();
get_call_cost($cdr, $type, $direction,$contract_id,$subscriber_contract_id,
$billing_info{profile_id}, $billing_info{ignore_domain}, $readonly || ($outgoing_prepaid && defined $prepaid_cost_entry), $prepaid,
\%profile_info, \%package_info, $r_cost, \$real_cost, $r_free_time,
$r_rating_duration, \$onpeak, \$extra_rate, \@balances)
or FATAL "Error getting ".$dir."customer call cost\n";
DEBUG "got call cost $$r_cost and free time $$r_free_time";
my $snapshot_bal = get_snapshot_contract_balance(\@balances);
$cdr->{$dir."customer_cash_balance_before"} = $snapshot_bal->{cash_balance_old};
$cdr->{$dir."customer_free_time_balance_before"} = $snapshot_bal->{free_time_balance_old};
$cdr->{$dir."customer_cash_balance_after"} = $snapshot_bal->{cash_balance_old};
$cdr->{$dir."customer_free_time_balance_after"} = $snapshot_bal->{free_time_balance_old};
$cdr->{$dir."customer_profile_package_id"} = $package_info{id};
$cdr->{$dir."customer_contract_balance_id"} = $snapshot_bal->{id};
$cdr->{$dir."customer_billing_fee_id"} = $profile_info{fee_id};
$cdr->{$dir."customer_billing_zone_id"} = $profile_info{zone_id};
$cdr->{frag_customer_onpeak} = $onpeak if $split_peak_parts;
$cdr->{$dir."customer_extra_rate"} = $extra_rate;
if ($outgoing_prepaid) { #prepaid out
# overwrite the calculated costs with the ones from our table
if (defined $prepaid_cost_entry) {
$$r_cost = $prepaid_cost_entry->{cost}; #prepaid: update balance AND show full costs
$$r_free_time = $prepaid_cost_entry->{free_time_used};
drop_prepaid_cost($prepaid_cost_entry) unless $readonly;
# it would be more safe to add *_balance_before/after columns to the prepaid_costs table,
# instead of reconstructing the balance values:
$cdr->{$dir."customer_cash_balance_before"} = truncate_cash_balance($cdr->{$dir."customer_cash_balance_before"} * 1.0 + $prepaid_cost_entry->{cost});
$cdr->{$dir."customer_free_time_balance_before"} = truncate_free_time_balance($cdr->{$dir."customer_free_time_balance_before"} * 1.0 + $prepaid_cost_entry->{free_time_used});
} else {
# maybe another rateomat was faster and already processed+deleted it?
# in that case we should bail out here.
WARNING "no prepaid cost record found for call ID $cdr->{call_id}, applying calculated costs";
if ((not $readonly) and $prepaid_update_balance) {
update_contract_balance($cdr,\@balances)
or FATAL "Error updating ".$dir."customer contract balance\n";
}
$$r_cost = $real_cost; #prepaid: update balance AND show full costs
$cdr->{$dir."customer_cash_balance_after"} = $snapshot_bal->{cash_balance};
$cdr->{$dir."customer_free_time_balance_after"} = $snapshot_bal->{free_time_balance};
}
} else { #postpaid in, postpaid out, prepaid in
# we don't do prepaid for termination fees for now, so treat it as post-paid
if($prepaid == 1 && $direction eq "in") { #prepaid in
DEBUG "treat pre-paid billing profile as post-paid for termination fees";
$$r_cost = $real_cost; #prepaid: always update balance AND show full costs
} else { #postpaid in, postpaid out
DEBUG "billing profile is post-paid, update contract balance";
if ($use_customer_real_cost) {
$$r_cost = $real_cost;
}
}
unless ($readonly) {
update_contract_balance($cdr,\@balances)
or FATAL "Error updating ".$dir."customer contract balance\n";
}
$cdr->{$dir."customer_cash_balance_after"} = $snapshot_bal->{cash_balance};
$cdr->{$dir."customer_free_time_balance_after"} = $snapshot_bal->{free_time_balance};
}
DEBUG "cost for this call is $$r_cost";
return 1;
}
sub get_provider_call_cost {
my $cdr = shift;
my $type = shift;
my $direction = shift;
my $readonly = shift;
my $provider_info = shift;
my $r_cost = shift;
my $r_free_time = shift;
my $r_rating_duration = shift;
my $onpeak;
my $real_cost = 0;
my $extra_rate;
my $dir;
if($direction eq "out") {
$dir = "source_";
} else {
$dir = "destination_";
}
my $contract_id = $provider_info->{billing}->{contract_id};
my $subscriber_contract_id = ('0' eq $cdr->{$dir."user_id"} ? $cdr->{$dir."provider_id"} : get_subscriber_contract_id($cdr->{$dir."user_id"}));
unless($provider_info->{billing}->{profile_id}) {
$$r_rating_duration = $cdr->{duration};
DEBUG "no billing info for ".$dir."provider contract ID $contract_id, skip";
return -1;
}
my $provider_type;
if ($provider_info->{package}->{class} eq "reseller") {
$provider_type = "reseller_";
} else {
$provider_type = "carrier_";
}
my $prepaid = get_prepaid($cdr, $provider_info->{billing},$dir.'provider_');
my %profile_info = ();
get_call_cost($cdr, $type, $direction,$contract_id,$subscriber_contract_id,
$provider_info->{billing}->{profile_id}, $provider_info->{billing}->{ignore_domain}, $readonly || $prepaid, $prepaid, # no underruns for providers with prepaid profile
\%profile_info, $provider_info->{package}, $r_cost, \$real_cost, $r_free_time,
$r_rating_duration, \$onpeak, \$extra_rate, $provider_info->{balances})
or FATAL "Error getting ".$dir."provider call cost\n";
my $snapshot_bal = get_snapshot_contract_balance($provider_info->{balances});
$cdr->{$dir.$provider_type."package_id"} = $provider_info->{package}->{id};
$cdr->{$dir.$provider_type."contract_balance_id"} = $snapshot_bal->{id};
$cdr->{$dir.$provider_type."billing_fee_id"} = $profile_info{fee_id};
$cdr->{$dir.$provider_type."billing_zone_id"} = $profile_info{zone_id};
$cdr->{'frag_'.$provider_type.'onpeak'} = $onpeak if $split_peak_parts;
$cdr->{$dir.$provider_type."extra_rate"} = $extra_rate;
unless($prepaid == 1) {
$cdr->{$dir.$provider_type."cash_balance_before"} = $snapshot_bal->{cash_balance_old};
$cdr->{$dir.$provider_type."free_time_balance_before"} = $snapshot_bal->{free_time_balance_old};
$cdr->{$dir.$provider_type."cash_balance_after"} = $snapshot_bal->{cash_balance_old};
$cdr->{$dir.$provider_type."free_time_balance_after"} = $snapshot_bal->{free_time_balance_old};
unless ($readonly) {
update_contract_balance($cdr,$provider_info->{balances})
or FATAL "Error updating ".$dir.$provider_type."provider contract balance\n";
}
$cdr->{$dir.$provider_type."cash_balance_after"} = $snapshot_bal->{cash_balance};
$cdr->{$dir.$provider_type."free_time_balance_after"} = $snapshot_bal->{free_time_balance};
} else {
WARNING $dir.$provider_type."provider is prepaid\n";
# there are no prepaid cost records for providers, so we cannot
# restore the original balance and leave the fields empty
# no balance update for providers with prepaid profile
}
if ($use_provider_real_cost) {
$$r_cost = $real_cost;
}
return 1;
}
sub rate_cdr {
my $cdr = shift;
my $type = shift;
my $source_customer_cost = 0;
my $source_carrier_cost = 0;
my $source_reseller_cost = 0;
my $source_customer_free_time = 0;
my $source_carrier_free_time = 0;
my $source_reseller_free_time = 0;
my $destination_customer_cost = 0;
my $destination_carrier_cost = 0;
my $destination_reseller_cost = 0;
my $destination_customer_free_time = 0;
my $destination_carrier_free_time = 0;
my $destination_reseller_free_time = 0;
$cdr->{source_user_id} = '0' if lc($cdr->{source_user_id}) eq '<null>';
$cdr->{destination_user_id} = '0' if lc($cdr->{destination_user_id}) eq '<null>';
unless($cdr->{call_status} eq "ok") {
DEBUG "cdr #$$cdr{id} has call_status $$cdr{call_status}, skip.";
$cdr->{source_carrier_cost} = $source_carrier_cost;
$cdr->{source_reseller_cost} = $source_reseller_cost;
$cdr->{source_customer_cost} = $source_customer_cost;
$cdr->{source_carrier_free_time} = $source_carrier_free_time;
$cdr->{source_reseller_free_time} = $source_reseller_free_time;
$cdr->{source_customer_free_time} = $source_customer_free_time;
$cdr->{destination_carrier_cost} = $destination_carrier_cost;
$cdr->{destination_reseller_cost} = $destination_reseller_cost;
$cdr->{destination_customer_cost} = $destination_customer_cost;
$cdr->{destination_carrier_free_time} = $destination_carrier_free_time;
$cdr->{destination_reseller_free_time} = $destination_reseller_free_time;
$cdr->{destination_customer_free_time} = $destination_customer_free_time;
return 1;
}
DEBUG "fetching source provider info for source_provider_id #$$cdr{source_provider_id}";
my %source_provider_billing_info = ();
my %source_provider_package_info = ();
my @source_provider_balances = ();
if($cdr->{source_provider_id} eq "0") {
WARNING "Missing source_provider_id for source_user_id ".$cdr->{source_user_id}." in cdr #".$cdr->{id}."\n";
} else {
# we have to catchup balances at this point before getting the profile, since underrun profiles could get applied:
get_contract_balances($cdr, $cdr->{source_provider_id}, \%source_provider_package_info, \@source_provider_balances)
or FATAL "Error getting source provider contract ID $cdr->{source_provider_id} balances\n";
get_billing_info($cdr->{start_time}, $cdr->{source_provider_id}, $cdr->{source_ip}, \%source_provider_billing_info)
or FATAL "Error getting source provider billing info for cdr #".$cdr->{id}."\n";
}
my $source_provider_info = {
billing => \%source_provider_billing_info,
package => \%source_provider_package_info,
balances => \@source_provider_balances,
};
DEBUG sub { "source_provider_info is ".(Dumper $source_provider_info) };
#unless($source_provider_billing_info{profile_info}) {
# FATAL "Missing billing profile for source_provider_id ".$cdr->{source_provider_id}." for cdr #".$cdr->{id}."\n";
#}
DEBUG "fetching destination provider info for destination_provider_id #$$cdr{destination_provider_id}";
my %destination_provider_billing_info = ();
my %destination_provider_package_info = ();
my @destination_provider_balances = ();
if($cdr->{destination_provider_id} eq "0") {
WARNING "Missing destination_provider_id for destination_user_id ".$cdr->{destination_user_id}." in cdr #".$cdr->{id}."\n";
} else {
# we have to catchup balances at this point before getting the profile, since underrun profiles could get applied:
get_contract_balances($cdr, $cdr->{destination_provider_id}, \%destination_provider_package_info, \@destination_provider_balances)
or FATAL "Error getting destination provider contract ID $cdr->{destination_provider_id} balances\n";
get_billing_info($cdr->{start_time}, $cdr->{destination_provider_id}, $cdr->{source_ip}, \%destination_provider_billing_info)
or FATAL "Error getting destination provider billing info for cdr #".$cdr->{id}."\n";
}
my $destination_provider_info = {
billing => \%destination_provider_billing_info,
package => \%destination_provider_package_info,
balances => \@destination_provider_balances,
};
DEBUG sub { "destination_provider_info is ".(Dumper $destination_provider_info) };
$cdr->{_start_time} = get_start_time($cdr);
my @rating_durations;
my $rating_attempts = 0;
my $readonly;
$cdr->{rating_duration} = undef;
RATING_DURATION_FOUND:
$rating_attempts += 1;
@rating_durations = ();
$source_customer_cost = 0;
$source_carrier_cost = 0;
$source_reseller_cost = 0;
$source_customer_free_time = 0;
$source_carrier_free_time = 0;
$source_reseller_free_time = 0;
$destination_customer_cost = 0;
$destination_carrier_cost = 0;
$destination_reseller_cost = 0;
$destination_customer_free_time = 0;
$destination_carrier_free_time = 0;
$destination_reseller_free_time = 0;
$source_provider_info->{balances} = dclone(\@source_provider_balances);
$destination_provider_info->{balances} = dclone(\@destination_provider_balances);
$readonly = ($split_peak_parts ? ($rating_attempts == 1) : 0);
if ($readonly) {
DEBUG "### $rating_attempts. readonly pass ###";
} else {
DEBUG "### $rating_attempts. write pass ###";
}
#unless($destination_provider_billing_info{profile_info}) {
# FATAL "Missing billing profile for destination_provider_id ".$cdr->{destination_provider_id}." for cdr #".$cdr->{id}."\n";
#}
# call from local subscriber
if($cdr->{source_user_id} ne "0") {
DEBUG "call from local subscriber, source_user_id is $$cdr{source_user_id}";
# if we have a call from local subscriber, the source provider MUST be a reseller
if($source_provider_billing_info{profile_id} && $source_provider_package_info{class} ne "reseller") {
FATAL "The local source_user_id ".$cdr->{source_user_id}." has a source_provider_id ".$cdr->{source_provider_id}.
" which is not a reseller in cdr #".$cdr->{id}."\n";
}
if($cdr->{destination_user_id} ne "0") {
DEBUG "call to local subscriber, destination_user_id is $$cdr{destination_user_id}";
# call to local subscriber (on-net)
# there is no carrier cost for on-net calls
# for calls towards a local user, termination fees might apply if
# we find a fee with direction "in"
if($destination_provider_billing_info{profile_id}) {
DEBUG "destination provider has billing profile $destination_provider_billing_info{profile_id}, get reseller termination cost";
get_provider_call_cost($cdr, $type, "in", $readonly,
$destination_provider_info, \$destination_reseller_cost, \$destination_reseller_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting destination reseller cost for local destination_provider_id ".
$cdr->{destination_provider_id}." for cdr ".$cdr->{id}."\n";
DEBUG "destination reseller termination cost is $destination_reseller_cost";
} else {
# up to 2.8, there is one hardcoded reseller id 1, which doesn't have a billing profile, so skip this step here.
# in theory, all resellers MUST have a billing profile, so we could bail out here
DEBUG "destination provider $$cdr{destination_provider_id} has no billing profile, skip reseller termination cost";
}
DEBUG "get customer termination cost for destination_user_id $$cdr{destination_user_id}";
get_customer_call_cost($cdr, $type, "in", $readonly,
\$destination_customer_cost, \$destination_customer_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting destination customer cost for local destination_user_id ".
$cdr->{destination_user_id}." for cdr ".$cdr->{id}."\n";
DEBUG "destination customer termination cost is $destination_customer_cost";
} else {
# we can't charge termination fees to the callee if it's not local
# for the carrier cost, we use the destination billing profile of a peer
# (this is what the peering provider is charging the carrier)
if($destination_provider_billing_info{profile_id}) {
DEBUG sub { "fetching source_carrier_cost based on destination_provider_billing_info ".(Dumper \%destination_provider_billing_info) };
get_provider_call_cost($cdr, $type, "out", $readonly,
$destination_provider_info, \$source_carrier_cost, \$source_carrier_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting source carrier cost for cdr ".$cdr->{id}."\n";
} else {
WARNING "missing destination profile, so we can't calculate source_carrier_cost for destination_provider_billing_info ".(Dumper \%destination_provider_billing_info);
}
}
# get reseller cost
if($source_provider_billing_info{profile_id}) {
get_provider_call_cost($cdr, $type, "out", $readonly,
$source_provider_info, \$source_reseller_cost, \$source_reseller_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting source reseller cost for cdr ".$cdr->{id}."\n";
} else {
# up to 2.8, there is one hardcoded reseller id 1, which doesn't have a billing profile, so skip this step here.
# in theory, all resellers MUST have a billing profile, so we could bail out here
}
# get customer cost
get_customer_call_cost($cdr, $type, "out", $readonly,
\$source_customer_cost, \$source_customer_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting source customer cost for local source_user_id ".
$cdr->{source_user_id}." for cdr ".$cdr->{id}."\n";
} else {
# call from a foreign caller
# in this case, termination fees for the callee might still apply
if($cdr->{destination_user_id} ne "0") {
# call to local subscriber
# for calls towards a local user, termination fees might apply if
# we find a fee with direction "in"
# we use the source provider info (the one of the peer) for the carrier termination fees,
# as this is what the peer is charging us
if($source_provider_billing_info{profile_id}) {
DEBUG sub { "fetching destination_carrier_cost based on source_provider_billing_info ".(Dumper \%source_provider_billing_info) };
get_provider_call_cost($cdr, $type, "in", $readonly,
$source_provider_info, \$destination_carrier_cost, \$destination_carrier_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting destination carrier cost for local destination_provider_id ".
$cdr->{destination_provider_id}." for cdr ".$cdr->{id}."\n";
} else {
WARNING "missing source profile, so we can't calculate destination_carrier_cost for source_provider_billing_info ".(Dumper \%source_provider_billing_info);
}
if($destination_provider_billing_info{profile_id}) {
DEBUG sub { "fetching destination_reseller_cost based on source_provider_billing_info ".(Dumper \%destination_provider_billing_info) };
get_provider_call_cost($cdr, $type, "in", $readonly,
$destination_provider_info, \$destination_reseller_cost, \$destination_reseller_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting destination reseller cost for local destination_provider_id ".
$cdr->{destination_provider_id}." for cdr ".$cdr->{id}."\n";
} else {
# up to 2.8, there is one hardcoded reseller id 1, which doesn't have a billing profile, so skip this step here.
# in theory, all resellers MUST have a billing profile, so we could bail out here
WARNING "missing destination profile, so we can't calculate destination_reseller_cost for destination_provider_billing_info ".(Dumper \%destination_provider_billing_info);
}
get_customer_call_cost($cdr, $type, "in", $readonly,
\$destination_customer_cost, \$destination_customer_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting destination customer cost for local destination_user_id ".
$cdr->{destination_user_id}." for cdr ".$cdr->{id}."\n";
} else {
if($source_provider_billing_info{profile_id}) {
DEBUG sub { "fetching destination_carrier_cost based on source_provider_billing_info ".(Dumper \%source_provider_billing_info) };
get_provider_call_cost($cdr, $type, "in", $readonly,
$source_provider_info, \$destination_carrier_cost, \$destination_carrier_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting destination carrier cost for local destination_provider_id ".
$cdr->{destination_provider_id}." for cdr ".$cdr->{id}."\n";
} else {
WARNING "missing source profile, so we can't calculate destination_carrier_cost for source_provider_billing_info ".(Dumper \%source_provider_billing_info);
}
if($destination_provider_billing_info{profile_id}) {
DEBUG sub { "fetching source_carrier_cost based on destination_provider_billing_info ".(Dumper \%destination_provider_billing_info) };
get_provider_call_cost($cdr, $type, "out", $readonly,
$destination_provider_info, \$source_carrier_cost, \$source_carrier_free_time,
\$rating_durations[@rating_durations])
or FATAL "Error getting source carrier cost for cdr ".$cdr->{id}."\n";
} else {
WARNING "missing destination profile, so we can't calculate source_carrier_cost for destination_provider_billing_info ".(Dumper \%destination_provider_billing_info);
}
}
}
if ($split_peak_parts) {
# We require the onpeak/offpeak thresholds to be the same for all rating fee profiles used by any
# one particular CDR, so that CDR fragmentations are uniform across customer/carrier/reseller/etc
# entries. Mismatching onpeak/offpeak thresholds are a fatal error (which also results in a
# transaction rollback).
my %rating_durations;
for my $rd (@rating_durations) {
if (defined($rd)) {
$rating_durations{$rd} = 1;
$cdr->{rating_duration} //= 0;
$cdr->{rating_duration} = $rd if $rd > $cdr->{rating_duration};
}
}
if (scalar(keys(%rating_durations)) > 1) {
DEBUG 'Inconsistent rating fragment durations '.join(', ',keys(%rating_durations))." for cdr ID $cdr->{id}";
if ($rating_attempts > 1) {
FATAL "Error getting consistent rating fragment for cdr ".$cdr->{id}.". Rating profiles don't match.";
} else {
DEBUG 'trying again';
goto RATING_DURATION_FOUND;
}
} elsif ($rating_attempts == 1) { # coherent rating durations on first attempt
goto RATING_DURATION_FOUND; # just do it again to write stuff
}
my $rating_duration = (keys(%rating_durations))[0] // $cdr->{duration};
if ($rating_duration < $cdr->{duration}) {
my $sth = $sth_create_cdr_fragment; # start_time is advanced, duration decreased
$sth->execute($rating_duration, $rating_duration, $cdr->{id})
or FATAL "Error executing create cdr fragment statement: ".$sth->errstr;
if ($sth->rows > 0) {
DEBUG "New rating fragment CDR with ".($cdr->{duration} - $rating_duration)." secs duration created from cdr ID $cdr->{id}";
write_cdr_col_data($acc_relation_col_model_key,$cdr,$acctdbh->{'mysql_insertid'},
{ direction => 'source', provider => 'customer', relation => 'prev_fragment_id' },
$cdr->{id});
} else {
$rollback = 1;
FATAL "cdr ID $cdr->{id} seems to be already processed by someone else";
}
$cdr->{is_fragmented} = 1;
$cdr->{duration} = $rating_duration;
}
}
$cdr->{source_carrier_cost} = $source_carrier_cost;
$cdr->{source_reseller_cost_old} = $cdr->{source_reseller_cost};
$cdr->{source_reseller_cost} = $source_reseller_cost;
$cdr->{source_customer_cost_old} = $cdr->{source_customer_cost};
$cdr->{source_customer_cost} = $source_customer_cost;
$cdr->{source_carrier_free_time} = $source_carrier_free_time;
$cdr->{source_reseller_free_time} = $source_reseller_free_time;
$cdr->{source_customer_free_time} = $source_customer_free_time;
$cdr->{destination_carrier_cost} = $destination_carrier_cost;
$cdr->{destination_reseller_cost} = $destination_reseller_cost;
$cdr->{destination_customer_cost} = $destination_customer_cost;
$cdr->{destination_carrier_free_time} = $destination_carrier_free_time;
$cdr->{destination_reseller_free_time} = $destination_reseller_free_time;
$cdr->{destination_customer_free_time} = $destination_customer_free_time;
return 1;
}
sub create_pidfile {
my $pidfh;
open $pidfh, '>>', $pidfile or FATAL "Can't open '$pidfile' for writing: $!\n";
flock($pidfh, LOCK_EX | LOCK_NB) or FATAL "Unable to lock pidfile '$pidfile': $!\n";
return $pidfh;
}
sub write_pidfile {
my $pidfh = shift;
seek $pidfh, 0, SEEK_SET;
truncate $pidfh, 0;
printflush $pidfh "$$\n";
}
sub notify_send {
my $message = shift;
if ($ENV{NOTIFY_SOCKET}) {
my $addr = $ENV{NOTIFY_SOCKET} =~ s/^@/\0/r;
my $sock = IO::Socket::UNIX->new(
Type => SOCK_DGRAM(),
Peer => $addr,
) or warn "cannot connect to socket $ENV{NOTIFY_SOCKET}: $!\n";
if ($sock) {
$sock->autoflush(1);
print { $sock } $message
or warn "cannot send to socket $ENV{NOTIFY_SOCKET}: $!\n";
close $sock;
}
} else {
warn "NOTIFY_SOCKET not set\n";
}
}
sub daemonize {
my $pidfile = shift;
my $pidfh;
chdir '/' or FATAL "Can't chdir to /: $!\n";
$pidfh = create_pidfile($pidfile);
defined(my $pid = fork) or FATAL "Can't fork: $!\n";
exit if $pid;
setsid or FATAL "Can't start a new session: $!\n";
write_pidfile($pidfh);
return $pidfh;
}
sub signal_handler {
$shutdown = 1;
}
sub debug_rating_time {
my $t = shift;
my $cdr_id = shift;
my $error = shift;
DEBUG sub { "rating cdr ID $cdr_id " . ($error ? "aborted after" : "completed successfully in") . ' ' . sprintf("%.3f",Time::HiRes::time() - $t) . " secs" };
}
sub _cps_delay {
if ($cps_info->{delay} > 0.0) {
INFO "Sleeping for ".sprintf("%.3f",$cps_info->{delay})." seconds";
Time::HiRes::sleep($cps_info->{delay});
}
}
sub _update_cps {
my $num_of_cdrs = shift;
$cps_info->{rated_old} = $cps_info->{rated};
$cps_info->{rated} += $num_of_cdrs;
$cps_info->{d_rated_old} = $cps_info->{d_rated};
$cps_info->{d_rated} = $cps_info->{rated} - $cps_info->{rated_old};
$cps_info->{dd_rated} = $cps_info->{d_rated} - $cps_info->{d_rated_old}; # if 2nd order is to be used.
$cps_info->{t_old} = $cps_info->{t};
$cps_info->{t} = Time::HiRes::time();
$cps_info->{dt} = $cps_info->{t} - $cps_info->{t_old};
if ($cps_info->{dt} > 0.0) {
$cps_info->{cps} = $cps_info->{d_rated} / $cps_info->{dt};
DEBUG sprintf("%.1f",$cps_info->{cps} )." CDRs per sec";
} else {
$cps_info->{cps} = ~0;
}
if ($cps_info->{d_rated} > 0) { # using first order for now.
if (($cps_info->{delay} + $cps_info->{speedup}) > 0.0) {
$cps_info->{delay} -= $cps_info->{speedup};
DEBUG "reducing delay";
}
} else { #if ($cps_info->{dd_rated} < 0.0) {
if ($cps_info->{delay} < ($loop_interval - $cps_info->{speeddown})) {
$cps_info->{delay} += $cps_info->{speeddown};
DEBUG "increasing delay";
}
#} else {
}
}
sub main {
my $pidfh;
# Without autoflush logs are buffered due to
# journald which is buffering Perl STDOUT
# (STDERR flushed immediately which confusing)
select->autoflush(1);
INFO "Starting rate-o-mat.\n";
if ($fork != 0) {
$pidfh = daemonize($pidfile);
} elsif ($pidfile) {
$pidfh = create_pidfile($pidfile);
write_pidfile($pidfh);
}
local $SIG{TERM} = \&signal_handler;
local $SIG{INT} = \&signal_handler;
local $SIG{QUIT} = \&signal_handler;
local $SIG{HUP} = \&signal_handler;
if ($maintenance_mode eq 'yes') {
INFO "Up and doing nothing in the maintenance mode.\n";
notify_send("READY=1\n");
while (!$shutdown) {
sleep(1);
}
exit(0);
}
DEBUG "Init DB on start...\n";
init_db or FATAL "Error initializing database handlers\n";
my $rated = 0;
my $next_del = 10000;
my %failed_counter_map = ();
my $init = 0;
my $last_cdr_id = 0;
INFO "Up and running.\n";
notify_send("READY=1\n");
BATCH: while (!$shutdown) {
$log_fatal = 1;
if ($init) {
DEBUG "Init DB in loop...\n";
init_db or FATAL "Error initializing database handlers\n";
}
clear_prepaid_cost_cache();
my $error;
my @cdrs = ();
if ($billdbh && $acctdbh && $provdbh) {
eval {
get_unrated_cdrs(\@cdrs,\$last_cdr_id);
INFO "Grabbed ".(scalar @cdrs)." CDRs" if (scalar @cdrs) > 0;
};
$error = $@;
if ($error) {
if ($DBI::err and $DBI::err == 2006) {
INFO "DB connection gone, retrying...";
close_db();
$init = 1;
next BATCH;
}
FATAL "Error getting next bunch of CDRs: " . $error;
}
} else {
WARNING "no-op loop since mandatory db connections are n/a";
}
$shutdown and last BATCH;
my $rated_batch = 0;
my $t;
my $cdr_id;
my $info_prefix;
my $failed = 0;
eval {
## no critic (TestingAndDebugging::ProhibitNoWarnings)
no warnings qw/ exiting /;
CDR: foreach my $cdr (@cdrs) {
$rollback = 0;
$log_fatal = 0;
$info_prefix = ($rated_batch + 1) . "/" . (scalar @cdrs) . " - ";
eval {
$t = Time::HiRes::time();
$cdr_id = $cdr->{id};
DEBUG "start rating CDR ID $cdr_id";
begin_transaction($acctdbh);
if ('unrated' ne lock_cdr($cdr)) {
commit_transaction($acctdbh);
check_shutdown() and last BATCH;
next CDR;
}
# required to avoid contract_balances duplications during catchup:
if ($multi_master) {
begin_transaction($billdbh);
} else {
begin_transaction($billdbh, 'READ COMMITTED');
}
# row locks are released upon commit/rollback and have to cover
# the whole transaction. thus locking contract rows for preventing
# concurrent catchups will be our very first SQL statement in the
# billingdb transaction:
lock_contracts($cdr);
begin_transaction($provdbh);
begin_transaction($dupdbh);
INFO $info_prefix."rate CDR ID ".$cdr->{id};
rate_cdr($cdr, $type) && update_cdr($cdr);
# we would need a XA/distributed transaction manager for this:
commit_transaction($acctdbh);
commit_transaction($billdbh);
commit_transaction($provdbh);
commit_transaction($dupdbh);
$rated_batch++;
delete $failed_counter_map{$cdr_id};
debug_rating_time($t,$cdr_id,0);
check_shutdown() and last BATCH;
_update_cps(1); # unless ($rated_batch % 5);
_cps_delay();
};
$error = $@;
if ($error) {
debug_rating_time($t,$cdr_id,1);
if ($rollback) {
INFO $info_prefix."rolling back changes for CDR ID $cdr_id";
rollback_all();
next CDR; #move on to the next cdr of the batch
} else {
$failed_counter_map{$cdr_id} = 0 if !exists $failed_counter_map{$cdr_id};
if ($failed_counter_map{$cdr_id} < $failed_cdr_max_retries && !defined $DBI::err) {
WARNING $info_prefix."rating CDR ID $cdr_id aborted " .
($failed_counter_map{$cdr_id} > 0 ? " (retry $failed_counter_map{$cdr_id})" : "") .
": " . $error;
$failed_counter_map{$cdr_id} = $failed_counter_map{$cdr_id} + 1;
$failed += 1;
rollback_all();
next CDR; #move on to the next cdr of the batch
} else {
die($error); #rethrow
}
}
}
}
};
$log_fatal = 1;
$error = $@;
if ($error) {
if (defined $DBI::err) {
INFO "Caught DBI:err ".$DBI::err, "\n";
if ($DBI::err == 2006) {
INFO "DB connection gone, retrying...";
# disconnect from all of them so transactions are on par
rollback_all();
$billdbh->disconnect;
$provdbh and ($provdbh->disconnect);
$acctdbh->disconnect;
$dupdbh and ($dupdbh->disconnect);
close_db();
$init = 1;
next BATCH; #fetch new batch
} elsif ($DBI::err == 1213) {
INFO "Transaction concurrency problem, rolling back and retrying...";
rollback_all();
close_db();
$init = 1;
next BATCH; #fetch new batch
} else {
rollback_all();
FATAL $error; #terminate upon other DB errors
}
} else {
rollback_all();
FATAL $info_prefix."rating CDR ID $cdr_id aborted (failed ".
($failed_cdr_max_retries + 1)." times), please fix it manually: " . $error; #terminate
}
}
$rated += $rated_batch;
INFO "Batch of $rated_batch CDRs completed. $rated CDRs rated overall so far.\n";
unless (@cdrs) {
_update_cps(0);
_cps_delay();
}
if ($debug && $split_peak_parts && (scalar @cdrs) < 5) {
sleep $loop_interval; #split peak parts testcase
}
$shutdown and last BATCH;
if ($rated >= $next_del) { # not ideal imho
$next_del = $rated + 10000;
while ($sth_delete_old_prepaid->execute > 0) {
WARNING $sth_delete_old_prepaid->rows;
}
}
if ($failed > 0) {
INFO "There were $failed failed CDRs, sleep $failed_cdr_retry_delay";
sleep($failed_cdr_retry_delay);
}
close_db();
$init = 1;
}
notify_send("STOPPING=1\n");
INFO "Shutting down.\n";
close $pidfh;
unlink $pidfile;
}
sub close_db {
DEBUG "Closing DB connections.\n";
$sth_get_subscriber_contract_id->finish;
$sth_billing_info_network->finish;
$sth_billing_info->finish;
$sth_profile_info->finish;
$sth_profile_fraud_info->finish;
$sth_contract_fraud_info->finish;
$sth_upsert_cdr_period_costs->finish;
$sth_get_cdr_period_costs->finish;
$sth_offpeak->finish;
$sth_offpeak_subscriber->finish;
$sth_unrated_cdrs->finish;
$sth_get_cdr->finish;
$sth_lock_cdr->finish;
$sth_update_cdr->finish;
$split_peak_parts and $sth_create_cdr_fragment->finish;
$sth_mos_data->finish;
$sth_get_cbalances->finish;
$sth_update_cbalance_w_underrun_profiles_lock->finish;
$sth_update_cbalance_w_underrun_lock->finish;
$sth_update_cbalance_w_underrun_profiles->finish;
$sth_update_cbalance->finish;
$sth_new_cbalance->finish;
$sth_new_cbalance_infinite_future->finish;
$sth_get_last_cbalance->finish;
$sth_get_cbalance->finish;
$sth_get_first_cbalance->finish;
$sth_get_last_topup_cbalance->finish;
$sth_lnp_number->finish;
$sth_lnp_profile_info->finish;
$sth_get_contract_info->finish;
$sth_prepaid_costs_cache->finish;
$sth_prepaid_costs_count->finish;
$sth_prepaid_cost->finish;
$sth_delete_prepaid_cost->finish;
$sth_delete_old_prepaid->finish;
$sth_get_billing_voip_subscribers->finish;
$sth_lock_billing_subscribers->finish;
$sth_unlock_billing_subscribers->finish;
$sth_get_provisioning_voip_subscribers and $sth_get_provisioning_voip_subscribers->finish;
$sth_get_usr_preference_attribute and $sth_get_usr_preference_attribute->finish;
$sth_get_usr_preference_value and $sth_get_usr_preference_value->finish;
$sth_create_usr_preference_value and $sth_create_usr_preference_value->finish;
$sth_update_usr_preference_value and $sth_update_usr_preference_value->finish;
$sth_delete_usr_preference_value and $sth_delete_usr_preference_value->finish;
$sth_duplicate_cdr and $sth_duplicate_cdr->finish;
$sth_duplicate_mos_data and $sth_duplicate_mos_data->finish;
$sth_duplicate_upsert_cdr_period_costs and $sth_duplicate_upsert_cdr_period_costs->finish;
$sth_duplicate_get_cdr_period_costs and $sth_duplicate_get_cdr_period_costs->finish;
foreach (keys %cdr_col_models) {
my $model = $cdr_col_models{$_};
$model->{write_sth}->{sth}->finish;
$model->{read_sth}->{sth}->finish;
foreach (values %{$model->{dimension_sths}}) {
$_->{sth}->finish;
}
}
$billdbh->disconnect;
$acctdbh->disconnect;
$provdbh and $provdbh->disconnect;
$dupdbh and $dupdbh->disconnect;
}