package NGCP::Cleanup; use strict; use warnings; use English; use DBI; use DateTime; use Data::Dumper; use Log::Log4perl; use Log::Log4perl::Level; use Redis; my @queue = (); my %env = (); my %cmds = (); my $log; my $MAX_PARTITIONS = 24; sub new { my $class = shift; my (%args) = @_; my $self = bless {}, $class; my $config_file = $args{config_file} or die "config_file argument is required\n"; $self->init_config($config_file); $self->init_log(); $self->init_cmds(); local $OUTPUT_AUTOFLUSH = 1; return $self; } sub cmd { my ($self, $cmd) = @_; die "No command found $cmd" unless $cmds{$cmd}; return $cmds{$cmd}; } sub queue { my $self = shift; return \@queue; } sub env { my ($self, @val) = @_; return unless @val; if ($#val == 0) { return $env{$val[0]} // undef; } for (my $i = 0; $i < $#val; $i++) { next unless ($i % 2) == 0; $env{$val[$i]} = $val[$i+1]; } return ''; } sub logger { my ($self, $level, $str) = @_; my @caller = (caller 2); unless (@caller) { @caller = (caller 1); } my $caller = $caller[3]; $caller = (split(/::/, $caller))[-1] // ''; $log->log($level, $caller,': ',$str); return; } sub info { shift->logger($INFO, shift); } sub debug { shift->logger($DEBUG, shift); } sub error { shift->logger($ERROR, shift); } sub init_log { my $self = shift; my $debug = $self->env('debug') ? 'DEBUG' : 'INFO'; Log::Log4perl->init(\<get_logger("ngcp-cleanup"); } sub init_cmds { %cmds = ( connect => sub { my ($self, $db) = @_; my $host = $self->env('host'); my $user = $self->env('user') // $self->env('username'); my $pass = $self->env('pass') // $self->env('password'); my $dbi = "dbi:mysql:$db;host=$host"; my $dbh = DBI->connect($dbi, $user, $pass, { PrintError => 0, mysql_auto_reconnect => 1 }) or die "Failed to connect to DB $db ($host):".$DBI::errstr; $self->env(dbh => $dbh); $self->env(own_db => $db); }, 'connect-redis' => sub { my ($self, $db) = @_; my $host = $self->env('host'); my $port = $self->env('redis-port'); my $redis = Redis->new(server => $host.':'.$port) or die "Failed to connect to Redis $host:$port"; $redis->select($db); $self->env(redis => $redis); }, backup => sub { my ($self, $table) = @_; $table or die "No table name provided in the 'backup' command"; $self->env('dbh') or die "Not connected to a DB in the 'backup' command"; if ($self->env('timestamp-column')) { $self->env('time-column-mode' => 'timestamp'); $self->env('time-column' => $self->env('timestamp-column')); } elsif ($self->env('time-column')) { $self->env('time-column-mode' => 'time'); } foreach my $v (qw(time-column time-column-mode keep-months)) { $self->env($v) or die "Variable '$v' is not set in the 'backup' command"; } # deprecated commands, config upgrade is mandatory $self->env('backup-months') and die "backup-months is deprecated and replaced by keep-months, please upgrade or adjust the respective config file\n"; $self->env('retro-months') and die "retro-months is deprecated and removed, plase upgarde or adjust the respective config file\n"; # $self->backup_table($table); }, archive => sub { my ($self, $table) = @_; $table or die "No table name provided in the 'archive' command"; $self->env('dbh') or die "Not connected to a DB in the 'archive' command"; foreach my $v (qw(archive-months archive-target)) { $self->env($v) or die "Variable '$v' is not set in the 'archive' command"; } $self->archive_dump($table); }, cleanup => sub { my ($self, $table) = @_; $table or die "No table name provided in the 'cleanup' command"; $self->env('dbh') or die "Not connected to a DB in the 'cleanup' command"; if ($self->env('timestamp-column')) { $self->env('time-column-mode' => 'timestamp'); $self->env('time-column' => $self->env('timestamp-column')); } elsif ($self->env('time-column')) { $self->env('time-column-mode' => 'time'); } foreach my $v (qw(time-column time-column-mode cleanup-days)) { $self->env($v) or die "Variable '$v' is not set in the 'cleanup' command"; } $self->cleanup_table($table); }, 'cleanup-redis' => sub { my ($self, $scan_keys) = @_; $scan_keys or die "No key-pattern provided in the 'cleanup-redis' command"; foreach my $v (qw(time-column cleanup-days cleanup-mode)) { $self->env($v) or die "Variable '$v' is not set in the 'cleanup-redis' command"; } $self->env('redis') or die "Not connected to Redis in the 'cleanup-redis' command"; $self->cleanup_redis($scan_keys); }, ); } sub init_config { my ($self, $config_file) = @_; open(my $config_fh, '<', $config_file) or die "Couldn't open configuration file '$config_file'.\n"; while (my $line = <$config_fh>) { $line =~ s/^\s*//s; $line =~ s/\s*$//s; next if $line =~ /^#/; next if $line =~ /^$/; if ($line =~ /^([\w-]+)\s*=\s*(\S*)$/) { if (lc($1) eq 'maintenance' and $2 eq 'yes') { @queue = (); last; } $env{$1} = $2; if ($1 eq "time-column") { $env{"timestamp-column"} = undef; } elsif ($1 eq "timestamp-column") { $env{"time-column"} = undef; } next; } if ($line =~ /^([\w-]+)(?:\s+(.*?))?$/) { my ($cmd, $arg) = ($1, $2); push @queue, {cmd => $cmd, arg => $arg, env => {%env}}; } } close $config_fh; } sub process { my $self = shift; foreach my $q (@{$self->queue}) { $self->env(%{$q->{env}}); $self->cmd($q->{cmd})->($self, $q->{arg}); } } sub fetch_row { my ($self, $db, $table, $cols, $where) = @_; my $dbh = $self->env('dbh'); $db ||= $self->env('own_db'); $table = $db.'.'.$table; $where ||= ''; my @res = $dbh->selectrow_array(<fetch_row( 'information_schema', 'tables', 'table_name', "where table_schema = 'accounting' and table_name = '$table'"); return $pvalue; } sub get_table_row_format { my ($self, $table) = @_; my ($row_format) = $self->fetch_row( 'information_schema', 'tables', 'row_format', "where table_schema = 'accounting' and table_name = '$table'"); return $row_format; } sub check_partition_exists { my ($self, $table, $pname) = @_; my ($pvalue) = $self->fetch_row( 'information_schema', 'partitions', 'partition_description', "where table_schema = 'accounting' and table_name = '$table' and partition_name = '$pname'"); return $pvalue; } sub get_partition_row_format { my ($self, $table, $pname) = @_; my ($row_format) = $self->fetch_row( 'information_schema', 'innodb_sys_tables', 'row_format', "where name = 'accounting/$table#P#$pname'"); return $row_format; } sub check_table_partitioned { my ($self, $table) = @_; my ($pvalue) = $self->fetch_row( 'information_schema', 'partitions', 'partition_name', "where table_schema = 'accounting' and table_name = '$table' and partition_name IS NOT NULL limit 1"); return $pvalue; } sub build_partitions_list { my ($self, $table, $mpart, $mdt, $gap) = @_; my @parts = (); for (my $i = $gap; $i >= 0; $i--) { last if $i ==0 && $mpart eq 'pmax'; my $pname = $mdt->strftime('p%Y%m'); my $ndt = $mdt->clone; $ndt->add(months => 1)->truncate(to => "month"); my $nymd = $ndt->ymd('-'); push @parts, sprintf( "PARTITION %s VALUES LESS THAN(UNIX_TIMESTAMP('%s'))", $pname, $nymd); $mdt->subtract(months => 1); } @parts = reverse @parts; if ($mpart eq 'pmax') { push @parts, sprintf("PARTITION %s VALUES LESS THAN(MAXVALUE)", $mpart); } return join(",\n", @parts); } sub reorganize_partitions { my ($self, $table, $mpart, $mdt, $gap) = @_; my $dbh = $self->env('dbh'); my $parts = $self->build_partitions_list($table, $mpart, $mdt, $gap); my $sql = <debug($sql); $dbh->do($sql); die "Cannot reorganize partitions: ".$DBI::errstr if $DBI::err; return; } sub create_partitions { my ($self, $table, $mpart, $mdt, $gap) = @_; my $dbh = $self->env('dbh'); my $col = $self->env('time-column'); my $parts = $self->build_partitions_list($table, $mpart, $mdt, $gap); my $sql = <debug($sql); $dbh->do($sql); die "Cannot create partitions: ".$DBI::errstr if $DBI::err; } sub drop_partition { my ($self, $table, $pname) = @_; my $dbh = $self->env('dbh'); $dbh->do(<debug("drop partition=$pname from table=$table"); die "Cannot drop partition $pname: ".$DBI::errstr if $DBI::err; } sub update_partitions { my ($self, $table) = @_; my $dbh = $self->env('dbh'); my $db = $self->env('own_db'); my $keep_months = $self->env('keep-months'); my $col = $self->env('time-column'); my @cols = ("min($col)", "max($col)"); my ($min_ts, $max_ts) = $self->fetch_row(undef, $table, join(',', @cols)); return unless $min_ts; my $dt_min = DateTime->now(epoch => $min_ts, time_zone => 'local'); $dt_min->truncate(to => "month"); my $dt_max = DateTime->now(epoch => $max_ts, time_zone => 'local'); $dt_max->add(months => 1); $dt_max->truncate(to => 'month'); my $dt_diff = $dt_max-$dt_min; my $months = ($dt_diff->years*12+$dt_diff->months) * ($dt_diff->is_negative() ? -1 : 1); $self->debug(sprintf "table=%s checking start=%s end=%s", $table, $dt_min->strftime('%Y-%m'), $dt_max->strftime('%Y-%m')); if ($months >= $MAX_PARTITIONS) { die sprintf "%s%s", "Cannot update partitions for table=$table, ", "the gap between max record time and min record time is too large (more than 24 months)."; } if ($self->check_partition_exists($table, 'pmax')) { # already partitionined my $pname = $dt_max->strftime('p%Y%m'); my $mpart = 'pmax'; my $mdt = $dt_max->clone; my $gap = 0; my $all_parts = $dbh->selectall_hashref(<{pmax}; while ($dt_max >= $dt_min) { my $pname = $dt_max->strftime('p%Y%m'); unless ($self->check_partition_exists($table, $pname)) { $gap++; } else { if ($gap) { $self->reorganize_partitions($table, $mpart, $mdt, $gap); } $gap = 0; $mpart = $pname; $mdt = $dt_max->clone; delete $all_parts->{$pname}; } $dt_max->subtract(months => 1); } if ($gap) { $self->reorganize_partitions($table, $mpart, $mdt, $gap); } # drop existing unused partitions foreach my $part (sort { $a cmp $b } keys %{$all_parts}) { my $pdt = DateTime->now(time_zone => 'local', epoch => $all_parts->{$part}->{value}); # drop partitions older than the possible min if ($pdt <= $dt_min) { $self->drop_partition($table, $part); } } } else { my $mpart = 'pmax'; my $mdt = $dt_max->clone; $self->create_partitions($table, $mpart, $mdt, $months+1); } } sub backup_partition { my ($self, $table, $bm) = @_; my $dbh = $self->env('dbh'); my $col = $self->env('time-column'); my @cols = ("min($col)", "max($col)"); my $mtable = $table . '_' . $bm->strftime('%Y%m'); my $pname = 'p'.$bm->strftime('%Y%m'); $self->debug("checking table=$table pname=$pname"); if ($self->check_partition_exists($table, $pname)) { my ($min_ts, $max_ts) = $self->fetch_row(undef, $table, join(',', @cols), "PARTITION ($pname)"); return unless $min_ts; $self->debug("table=$table backup=$mtable"); if ($self->check_table_exists($mtable)) { $self->info("table $mtable already exists, fallback to the table backup method"); $self->delete_loop($table, $bm); } else { $dbh->do("create table $mtable like $table"); die "Cannot create table: $mtable: ".$DBI::errstr if $DBI::err; if ($self->check_table_partitioned($mtable)) { # drop partitioning layout inherited from copied table creation $dbh->do("alter table $mtable remove partitioning"); } my $mtable_row_format = lc($self->get_table_row_format($mtable)); my $partition_row_format = lc($self->get_partition_row_format($table,$pname)); if ($mtable_row_format ne $partition_row_format) { $self->info("table $mtable row_format ($mtable_row_format) differs from table $table partition $pname row_format ($partition_row_format), fallback to the table backup method"); $self->delete_loop($table, $bm); } else { $dbh->do("alter table $table exchange partition $pname with table $mtable"); die "Cannot exchange partition $table -> $mtable: ".$DBI::errstr if $DBI::err; } } } } sub delete_loop { my ($self, $table, $bm) = @_; my $dbh = $self->env('dbh'); my $batch = $self->env('batch') // 0; my $limit = $batch ? "limit $batch" : ''; my $col = $self->env('time-column'); my $col_mode = $self->env('time-column-mode'); my $mstart = $bm->strftime('%Y-%m-01 00:00:00'); my $mend = $bm->clone->add(months => 1)->strftime('%Y-%m-%d %H:%M:%S'); my $mtable = $table . '_' . $bm->strftime('%Y%m'); $self->debug("table=$table backup=$mtable"); my $sth = $dbh->prepare("show fields from $table"); $sth->execute; my $fieldinfo = $sth->fetchall_hashref('Field'); $sth->finish; my @keycols = (); my @upsertcols = (); foreach my $fieldname (keys %$fieldinfo) { if (uc($fieldinfo->{$fieldname}->{'Key'}) eq 'PRI') { push @keycols,$fieldname; } else { push @upsertcols,$fieldname; } } die "No primary key columns for table $table" unless @keycols; my $primary_key_cols = join(",",@keycols); my $upsert_cols = join(",", map { $_ . '=VALUES(' . $_ . ')'; } @upsertcols); $dbh->do("create table if not exists $mtable like $table"); while (1) { my $temp_table = $table . "_tmp"; my $size = 0; if ($col_mode eq "time") { $size = $dbh->do(<= ? AND $col < ? $limit) SQL or die "Failed to create temporary table $temp_table: " . $DBI::errstr; } else { $size = $dbh->do(<= UNIX_TIMESTAMP(?) AND $col < UNIX_TIMESTAMP(?) $limit) SQL or die "Failed to create temporary table $temp_table: " . $DBI::errstr; } if ($size > 0) { if (scalar @upsertcols) { $dbh->do(<do(<do(<do("DROP TEMPORARY TABLE $temp_table") or die "Failed to drop temporary table $temp_table: " . $DBI::errstr; last unless $size > 0; } } sub archive_dump { my ($self, $table) = @_; $self->debug($table); my $dbh = $self->env('dbh'); my $db = $self->env('own_db'); my $month = $self->env('archive-months'); my $target = $self->env('archive-target'); my $compress = $self->env('compress'); my $user = $self->env('user') // $self->env('username'); my $pass = $self->env('pass') // $self->env('password'); my $host = $self->env('host'); my $now = DateTime->now(time_zone => 'local'); my $bm = $now->clone; while ($bm->strftime('%Y%m') ge '198001') { $now = DateTime->now(time_zone => 'local'); $bm = $now->clone; $bm->subtract(months => $month)->truncate(to => "month"); my $mtable = $table . "_" . $bm->strftime('%Y%m'); my $res = $dbh->selectcol_arrayref("show table status like ?", undef, $mtable); $month++; ($res && @$res && $res->[0]) or next; if ($target ne '/dev/null') { my $fname = sprintf("%s/%s.%s.sql", $target, $mtable, $now->strftime("%Y%m%d%H%M%S")); my @cmd = ('mysqldump'); $user and push(@cmd, "-u" . $user); $pass and push(@cmd, "-p" . $pass); $host and push(@cmd, "-h" . $host); push(@cmd, "--opt", $db, $mtable); for (@cmd) { s/'/'"'"'/g; $_ = "'$_'" } my $cmd = join(' ', @cmd); if (system("$cmd > $fname")) { unlink($fname); die "MySQL DUMP of table $mtable into file $fname failed\n"; } if ($compress eq 'gzip') { if (system("nice gzip -9 $fname")) { unlink($fname, "$fname.gz"); die "Gzipping of dump file $fname failed\n"; } } $self->debug("created backup=$fname"); } $self->debug("drop table=$mtable"); $dbh->do("drop table $mtable"); } } sub backup_table { my ($self, $table) = @_; my $use_part = $self->env('use-partitioning') // 'no'; if ($use_part eq 'yes') { eval { $self->update_partitions($table) }; if ($EVAL_ERROR) { $self->debug("table=$table cannot be used for partitioning, falling back to sql backups"); $use_part = 'no'; } } my $data_moved = 0;; my $keep_months = $self->env('keep-months'); my $col = $self->env('time-column'); my @cols = ("min($col)", "max($col)"); my ($min_ts, $max_ts) = $self->fetch_row(undef, $table, join(',', @cols)); return unless $min_ts; my $dt_min = DateTime->now(epoch => $min_ts, time_zone => 'local'); $dt_min->truncate(to => "month"); my $dt_max = DateTime->now(time_zone => 'local'); $dt_max->truncate(to => "month"); $dt_max->subtract(months => $keep_months); while ($dt_min <= $dt_max) { if ($use_part eq 'yes') { $self->backup_partition($table, $dt_min); } else { $self->delete_loop($table, $dt_min); } $dt_min->add(months => 1); $data_moved = 1; } if ($use_part eq 'yes' && $data_moved) { $self->update_partitions($table); } return 1; } sub cleanup_table { my ($self, $table) = @_; $self->debug($table); my $batch = $self->env('batch') // 0; my $limit = $batch ? "limit $batch" : ''; my $col = $self->env('time-column'); my $col_mode = $self->env('time-column-mode'); my $dbh = $self->env('dbh'); my $cleanup_days = $self->env('cleanup-days'); my $deleted_rows = 0; while (1) { my $aff = 0; if ($col_mode eq "time") { $aff = $dbh->do(<do(<error("Cannot delete: $DBI::errstr") if $DBI::err; $aff //= 0; $deleted_rows += $aff; last unless $aff > 0; } $self->debug("table=$table deleted rows=$deleted_rows"); } sub cleanup_redis { my ($self, $scan_keys) = @_; $self->debug("cleanup-redis: ".$scan_keys); my $batch = $self->env('redis-batch') // 0; my $redis = $self->env('redis'); my $time_col = $self->env('time-column'); my $dbh = $self->env('dbh') // undef; my $db = $self->env('own_db') // undef; my $table = 'acc_backup'; my $cleanup_days = $self->env('cleanup-days'); my $cleanup_mode = $self->env('cleanup-mode'); my $deleted_rows = 0; my $cursor = 0; my $now = time; my @sql_buffer; my @cols = qw(method callid); my $query = ""; if ($cleanup_mode eq "mysql") { @cols = map { $_->[0] } @{$dbh->selectall_arrayref(<scan($cursor, MATCH => $scan_keys, COUNT => $batch); $cursor = shift @{ $res }; my $keys = shift @{ $res }; my $sql = "INSERT IGNORE INTO $table (" . join(',', @cols) . ") VALUES "; my %vals = (); foreach my $key (@{ $keys }) { my %data = $redis->hgetall($key); my $data_ok = 1; foreach my $c (($time_col, qw/callid method/)) { unless ($data{$c}) { $self->error("missing '$c' column") ; $data_ok = 0; last; } if ($c eq $time_col && $data{$c} !~ /^\d+(\.\d+)?$/) { $self->error("invalid time column '$time_col' format, expected unixtime"); $data_ok = 0; last; } } last if not $data_ok; my $time = $data{$time_col}; next if (($time + $cleanup_days*86400) > $now); if ($cleanup_mode eq "mysql") { my @query_data = map { quote_field($data{$_},",","'","\\"); } @cols; $query .= "(" . join(",", @query_data) . "),"; } $vals{$key} = { meth => $data{method}, cid => $data{callid}, }; } last unless %vals; $sql .= substr($query, 0, -1); my $sql_ok = $cleanup_mode eq "mysql" && $dbh->do($sql); if ($cleanup_mode eq "delete" || $sql_ok) { foreach my $key (keys %vals) { # delete direct entry $redis->del($key); my $cid = $vals{$key}->{cid}; my $meth = $vals{$key}->{meth}; # delete from cid map $redis->srem("acc:cid::$cid", $key) if defined $cid; # delete from meth map $redis->srem("acc:meth::$meth", $key) if defined $meth; } } else { $self->error("insert into mysql: ".$DBI::errstr) if $DBI::err; last; } $deleted_rows += $#$keys+1; last unless $cursor; } if ($cleanup_mode eq "mysql") { $self->debug("redis=$scan_keys mode=$cleanup_mode table=$db.$table moved rows=$deleted_rows"); } else { $self->debug("redis=$scan_keys mode=$cleanup_mode deleted rows=$deleted_rows"); } return; } sub quote_field { my ($field,$sep,$quotes,$escape_symbol) = @_; if (defined $quotes and length($quotes) > 0) { if (defined $escape_symbol and length($escape_symbol) > 0) { if (defined $field and length($field) > 0) { foreach my $escape ($escape_symbol,$quotes) { my $escape_re = quotemeta($escape); #fun $field =~ s/($escape_re)/$escape_symbol$1/g; } $field = $quotes . $field . $quotes; } else { $field = $quotes . $quotes; } } else { if (defined $field and length($field) > 0) { $field = $quotes . $field . $quotes; } else { $field = $quotes . $quotes; } } } else { if (defined $escape_symbol and length($escape_symbol) > 0) { if (defined $field and length($field) > 0) { foreach my $escape ($escape_symbol,$sep) { my $escape_re = quotemeta($escape); #fun $field =~ s/($escape_re)/$escape_symbol$1/g; } } else { $field = ''; } } else { if (not defined $field or length($field) == 0) { $field = ''; } } } return $field; } 1;