parent
4a56e3f8e4
commit
f654b0470f
@ -0,0 +1,580 @@
|
||||
package DBIx::RetryOverDisconnects;
|
||||
use base 'DBI';
|
||||
use strict;
|
||||
use 5.006;
|
||||
|
||||
our $VERSION = '0.05';
|
||||
our ($errstr, $err);
|
||||
use Exception::Class;
|
||||
use constant PRIV => 'private_DBIx-RetryOverDisconnects_data';
|
||||
|
||||
=head1 NAME
|
||||
|
||||
DBIx::RetryOverDisconnects - DBI wrapper that helps to deal with databases connection problems
|
||||
|
||||
=head1 SYNOPSIS
|
||||
|
||||
use DBIx::RetryOverDisconnects;
|
||||
my $dbh = DBIx::RetryOverDisconnects->connect($dsn, $user, $pass, {
|
||||
ReconnectRetries => 5,
|
||||
ReconnectInterval => 1,
|
||||
ReconnectTimeout => 3,
|
||||
TxnRetries => 3,
|
||||
});
|
||||
|
||||
#All of this 3 methods will be successfuly completed despite of
|
||||
#possible connection losses except for sql errors.
|
||||
$dbh->do("...");
|
||||
my $sth = $dbh->prepare("...");
|
||||
$sth->execute(...);
|
||||
|
||||
#other functionality that DBI supports
|
||||
|
||||
$dbh->begin_work;
|
||||
my $ok = eval {
|
||||
$dbh->do("...");
|
||||
#...code
|
||||
$dbh->do("...");
|
||||
$dbh->commit;
|
||||
1;
|
||||
};
|
||||
|
||||
unless ($ok) {
|
||||
if ($dbh->is_trans_disconnect) {
|
||||
#connection to database has been lost during transaction
|
||||
#$dbh has been already reconnected to database as we felt here.
|
||||
#It is now safe to retry the transaction from the beginning.
|
||||
}
|
||||
elsif($dbh->is_fatal_disconnect) {
|
||||
#database is down and reconnect retries limit is reached
|
||||
}
|
||||
elsif($dbh->is_sql_error) {
|
||||
#all other DBI's errors that are not related to connection problems
|
||||
$dbh->rollback;
|
||||
#deal with sql error;
|
||||
}
|
||||
}
|
||||
|
||||
#or simply run the perl code in transaction mode.
|
||||
$dbh->txn_do(sub {
|
||||
$dbh->do("...");
|
||||
#...code
|
||||
$dbh->do("...");
|
||||
});
|
||||
#successful completion is guaranteed except for sql or perl errors.
|
||||
|
||||
=head1 DESCRIPTION
|
||||
|
||||
This wrapper intercepts all requests. If some request fails this module
|
||||
detects the reason of fail. If the reason was database connection problem
|
||||
then wrapper would automatically reconnect and restart the query. Otherwise
|
||||
it would rethrow the exception.
|
||||
|
||||
If you are not in transaction then you can just do
|
||||
|
||||
$dbh->do('...');
|
||||
$sth->execute(...);
|
||||
|
||||
This might have 2 fatal cases:
|
||||
|
||||
=over
|
||||
|
||||
=item *
|
||||
|
||||
SQL error (a good reason to die).
|
||||
|
||||
=item *
|
||||
|
||||
Reconnect retries limit reached (database is completely down or network failure).
|
||||
|
||||
=back
|
||||
|
||||
For example, if the connection to database were lost during 'execute' call, the module
|
||||
would reconnect to database with a timeout 'ReconnectTimeout'. If reconnect failed it
|
||||
would reconnect again 'ReconnectRetries' times with 'ReconnectInterval' interval
|
||||
(in seconds). If reconnect retries limit was reached it would raise an error and
|
||||
$dbh->is_fatal_disconnect would be true.
|
||||
|
||||
If you are in transaction then even DB disconnect will raise an error.
|
||||
But you can check $dbh->is_trans_disconnect and restart the transaction if it is 'true'.
|
||||
Other possible errors are the same: sql error and reconnect limit.
|
||||
|
||||
The recommended way of using transactions is
|
||||
|
||||
$dbh->txn_do($code_ref);
|
||||
|
||||
because 'txn_do' would automatically restart the transaction if it was failed because
|
||||
of database disconnect. The transaction can be restarted at most 'TxnRetries' times.
|
||||
If 'TxnRetries' limit was reached then error would be raised and
|
||||
$dbh->is_fatal_trans_disconnect set to true.
|
||||
Other error cases are the same as above.
|
||||
|
||||
'txn_do' would try do to rollback if there was a perl or sql error (no rollback needed
|
||||
when you loose connection to database: DB server already has done it).
|
||||
Rollback is successul when $@ =~ /Rollback OK/;
|
||||
|
||||
Note: For the perfomance reasons, DBI attribute 'RaiseError' is always set to 'true'.
|
||||
|
||||
=head1 METHODS
|
||||
|
||||
=head1 Class methods
|
||||
|
||||
=head2 connect
|
||||
|
||||
DBIx::RetryOverDisconnects->connect($dsn, $user, $pass, $attrs);
|
||||
|
||||
All parameters are passed directly to DBI.
|
||||
Additional $attrs are
|
||||
|
||||
=over
|
||||
|
||||
=item *
|
||||
|
||||
ReconnectRetries - How many times DBIx::RetryOverDisconnects will try to reconnect
|
||||
to database. Default to 5.
|
||||
|
||||
=item *
|
||||
|
||||
ReconnectInterval - Interval (in seconds) between reconnect attemps.
|
||||
Default to 2.
|
||||
|
||||
=item *
|
||||
|
||||
ReconnectTimeout - Timeout (in seconds) for waiting the database to accept
|
||||
connection (because sometimes DBI->connect can block your application).
|
||||
Default to 5.
|
||||
|
||||
=item *
|
||||
|
||||
TxnRetries - How many times the wrapper would try to restart transaction if it was
|
||||
failed because of database connection problems. Default to 4.
|
||||
|
||||
=back
|
||||
|
||||
=cut
|
||||
|
||||
sub connect {
|
||||
my ($this, $dsn, $user, $pass, $attrs) = @_;
|
||||
|
||||
my $self_attrs = $this->get_self_attrs($attrs);
|
||||
$attrs->{RaiseError} = 1;
|
||||
my $self = $this->SUPER::connect($dsn, $user, $pass, $attrs);
|
||||
my $driver = $self->{Driver}{Name};
|
||||
Exception::Class::Base->new("Sorry, driver '$driver' is not yet supported\n")->throw
|
||||
unless DBIx::RetryOverDisconnects::db->can('is_disconnect_'.lc($driver));
|
||||
$self_attrs->{AutoCommit} = $self->{AutoCommit};
|
||||
$self->{PRIV()} = $self_attrs;
|
||||
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub get_self_attrs {
|
||||
my ($this, $attrs) = @_;
|
||||
return {
|
||||
retries => exists $attrs->{ReconnectRetries} ? (delete $attrs->{ReconnectRetries}) : 5,
|
||||
interval => (delete $attrs->{ReconnectInterval}) || 1,
|
||||
timeout => (delete $attrs->{ReconnectTimeout}) || 5,
|
||||
txn_retries => (delete $attrs->{TxnRetries}) || 4,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
package DBIx::RetryOverDisconnects::db;
|
||||
use base 'DBI::db';
|
||||
use strict;
|
||||
|
||||
use constant PRIV => DBIx::RetryOverDisconnects::PRIV();
|
||||
|
||||
sub clone {
|
||||
my $self = shift;
|
||||
local $^W = 0;
|
||||
my $new_self = $self->SUPER::clone(@_) or return;
|
||||
$new_self->{PRIV()} = { %{ $self->{PRIV()} } };
|
||||
return $new_self;
|
||||
}
|
||||
|
||||
=head1 Database handle object methods
|
||||
|
||||
=head2 set_callback
|
||||
|
||||
$dbh->set_callback(afterReconnect => $code_ref);
|
||||
|
||||
Set callbacks for some events. Currently only afterReconnect is supported.
|
||||
It is called after every successful reconnect to database.
|
||||
|
||||
=cut
|
||||
|
||||
sub set_callback {
|
||||
my ($self, %callbacks) = @_;
|
||||
my $old = $self->{PRIV()}->{callback} || {};
|
||||
$self->{PRIV()}->{callback} = {%$old, %callbacks};
|
||||
return;
|
||||
}
|
||||
|
||||
sub exc_conn_trans {
|
||||
my $self = shift;
|
||||
my $msg = 'Connection to database lost while in transaction';
|
||||
$DBIx::RetryOverDisconnects::errstr = $msg;
|
||||
$DBIx::RetryOverDisconnects::err = 3;
|
||||
Exception::Class::Base->new($msg);
|
||||
}
|
||||
|
||||
sub exc_conn_trans_fatal {
|
||||
my $self = shift;
|
||||
my $msg = 'Connection to database lost while in transaction (retries exceeded)';
|
||||
$DBIx::RetryOverDisconnects::errstr = $msg;
|
||||
$DBIx::RetryOverDisconnects::err = 4;
|
||||
Exception::Class::Base->new($msg);
|
||||
}
|
||||
|
||||
=head2 is_fatal_trans_disconnect
|
||||
|
||||
Returns 'true' if last failed operation was txn_do and TxnRetries limit
|
||||
was reached.
|
||||
|
||||
=cut
|
||||
|
||||
sub is_fatal_trans_disconnect {$DBIx::RetryOverDisconnects::err == 4}
|
||||
|
||||
=head2 is_trans_disconnect
|
||||
|
||||
Return 'true' if last failed operation was a transaction and it could be restarted.
|
||||
The database handle was successfuly reconnected again.
|
||||
|
||||
=cut
|
||||
|
||||
sub is_trans_disconnect {$DBIx::RetryOverDisconnects::err == 3}
|
||||
|
||||
=head2 is_fatal_disconnect
|
||||
|
||||
Return 'true' if reconnect retries limit has been reached. In this case the
|
||||
database handle is not connected.
|
||||
|
||||
=cut
|
||||
|
||||
sub is_fatal_disconnect {$DBIx::RetryOverDisconnects::err == 2}
|
||||
|
||||
=head2 is_sql_error
|
||||
|
||||
Return 'true' if query failed because of some other reason, not related to
|
||||
database connection problems. See $DBI::errstr for details.
|
||||
|
||||
=cut
|
||||
|
||||
sub is_sql_error {$DBIx::RetryOverDisconnects::err == 1}
|
||||
|
||||
sub exc_conn_fatal {
|
||||
my $self = shift;
|
||||
my $msg = 'Connection to database lost (retries exceeded)';
|
||||
$DBIx::RetryOverDisconnects::errstr = $msg;
|
||||
$DBIx::RetryOverDisconnects::err = 2;
|
||||
Exception::Class::Base->new($msg);
|
||||
}
|
||||
|
||||
sub exc_flush {
|
||||
my $self = shift;
|
||||
$DBIx::RetryOverDisconnects::errstr = undef;
|
||||
$DBIx::RetryOverDisconnects::err = undef;
|
||||
}
|
||||
|
||||
sub exc_std {
|
||||
my ($self, $e) = @_;
|
||||
$DBIx::RetryOverDisconnects::errstr = 'standart DBI error';
|
||||
$DBIx::RetryOverDisconnects::err = 1;
|
||||
$e;
|
||||
}
|
||||
|
||||
foreach my $func (qw/
|
||||
prepare do statistics_info begin_work commit rollback
|
||||
selectrow_array selectrow_arrayref selectall_arrayref
|
||||
selectall_hashref
|
||||
/)
|
||||
{
|
||||
no strict 'refs';
|
||||
*$func = sub {
|
||||
my $self = shift;
|
||||
my $super_method = "SUPER::$func";
|
||||
my $data = $self->{PRIV()};
|
||||
return $self->$super_method(@_) if $data->{Intercept}; #Already protected
|
||||
|
||||
my ($retval, @retval);
|
||||
my $wa = wantarray;
|
||||
my $autocommit = $self->{AutoCommit};
|
||||
|
||||
while(1) {
|
||||
|
||||
$data->{Intercept} = 1;
|
||||
my $ok = eval {
|
||||
defined $wa ? $wa ? (@retval = $self->$super_method(@_)) :
|
||||
($retval = $self->$super_method(@_)) :
|
||||
$self->$super_method(@_);
|
||||
1;
|
||||
};
|
||||
$data->{Intercept} = 0;
|
||||
|
||||
last if $ok;
|
||||
|
||||
my $e = Exception::Class::Base->new( $DBI::errstr or $@ );
|
||||
return unless $self->take_measures($e, undef, $autocommit);
|
||||
}
|
||||
|
||||
return $wa ? @retval : $retval;
|
||||
};
|
||||
}
|
||||
|
||||
=head2 ping
|
||||
|
||||
Always returns 'true' or dies ($dbh->is_fatal_disconnect = true). Does original DBI::db's
|
||||
ping and if it is false then it reconnects.
|
||||
|
||||
=cut
|
||||
|
||||
sub ping {
|
||||
my $self = shift;
|
||||
return 1 if $self->SUPER::ping;
|
||||
my $in_trans = !$self->{AutoCommit};
|
||||
$self->reconnect;
|
||||
$self->exc_conn_trans->throw if $in_trans;
|
||||
return 1;
|
||||
}
|
||||
|
||||
sub take_measures {
|
||||
my ($self, $e, $sth, $autocommit) = @_;
|
||||
$self->exc_flush;
|
||||
$self->SUPER::ping and $self->exc_std($e)->rethrow;
|
||||
|
||||
my $is_disconnect_method = 'is_disconnect_'.lc($self->{Driver}->{Name});
|
||||
if ($self->$is_disconnect_method($e)) {
|
||||
warn "Disconnected!\n" if $self->{PrintError};
|
||||
return unless $self->reconnect($sth);
|
||||
$self->exc_conn_trans->throw unless $autocommit;
|
||||
return 1;
|
||||
}
|
||||
|
||||
$self->exc_std($e)->rethrow;
|
||||
}
|
||||
|
||||
sub is_disconnect_mysql {
|
||||
my $self = shift;
|
||||
local $_ = shift;
|
||||
return 1 if /lost\s+connection/i or /can't\s+connect/i or
|
||||
/server\s+shutdown/i or /MySQL\s+server\s+has\s+gone\s+away/i;
|
||||
return;
|
||||
}
|
||||
|
||||
sub is_disconnect_pg {
|
||||
my $self = shift;
|
||||
local $_ = shift;
|
||||
return 1 if /server\s+closed\s+the\s+connection\s+unexpectedly/i or
|
||||
/terminating connection/;
|
||||
return;
|
||||
}
|
||||
*is_disconnect_pgpp = *is_disconnect_pg;
|
||||
|
||||
sub is_disconnect_sqlite {} #SQLite has no connection problems. Isn't that right?
|
||||
*is_disconnect_sqlite2 = *is_disconnect_sqlite;
|
||||
|
||||
sub is_disconnect_oracle {
|
||||
#?
|
||||
}
|
||||
|
||||
sub is_disconnect_sybase {
|
||||
#?
|
||||
}
|
||||
|
||||
sub is_disconnect_db2 {
|
||||
#?
|
||||
}
|
||||
|
||||
sub reconnect {
|
||||
my ($self, $sth) = @_;
|
||||
my $data = $self->{PRIV()};
|
||||
my $new_dbh;
|
||||
|
||||
for (my $i = 1; (!$data->{retries} || $i <= $data->{retries}); $i++) {
|
||||
warn "Reconnect try #$i\n" if $self->{PrintError};
|
||||
my $alarm;
|
||||
local $SIG{ALRM} = sub {
|
||||
alarm(0);
|
||||
die($alarm = 1);
|
||||
};
|
||||
eval {
|
||||
alarm($data->{timeout});
|
||||
eval {
|
||||
local $^W = 0;
|
||||
$new_dbh = $self->clone;
|
||||
};
|
||||
alarm(0);
|
||||
};
|
||||
if ($new_dbh) {
|
||||
warn "Reconnected!\n" if $self->{PrintError};
|
||||
last;
|
||||
}
|
||||
sleep $data->{interval};
|
||||
}
|
||||
|
||||
($self->disconnect, $self->exc_conn_fatal->throw) unless $new_dbh;
|
||||
|
||||
$self->swap_inner_handle($new_dbh);
|
||||
$self->{PRIV()} = $data;
|
||||
$new_dbh->{PRIV()} = undef;
|
||||
$new_dbh->STORE('Active', 0);
|
||||
|
||||
$self->STORE('CachedKids', {});
|
||||
if ($sth) {
|
||||
my $new_sth = $self->prepare_cached($sth->{Statement});
|
||||
$sth->swap_inner_handle($new_sth, 1);
|
||||
$sth->restore_params($new_sth);
|
||||
$new_sth->finish;
|
||||
}
|
||||
$self->STORE('CachedKids', {});
|
||||
|
||||
#Now autocommit is broken (has been copied from disconnected handle)
|
||||
$self->{AutoCommit} = $data->{AutoCommit}; #Set initial value
|
||||
$new_dbh->disconnect;
|
||||
|
||||
#Call callback. Currently only one supported.
|
||||
if($self->{PRIV()}{callback} && (my $code = $self->{PRIV()}{callback}{afterReconnect})) {
|
||||
$code->($self, $sth) if ref $code eq 'CODE';
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
=head2 txn_do
|
||||
|
||||
$dbh->txn_do($code_ref);
|
||||
|
||||
Executes $code_ref in a transaction environment. Automatically reconnects and
|
||||
restarts the transaction in any case of connection problems.
|
||||
'txn_do' is able to die with one of the is_fatal_disconnect, is_sql_error,
|
||||
is_fatal_trans_disconnect set to true.
|
||||
|
||||
In most cases you don't need to wrap it into 'eval' because all of this exceptions
|
||||
are subject to die (database completely down, network down, bussiness logic error, etc).
|
||||
|
||||
=cut
|
||||
|
||||
sub txn_do {
|
||||
my ($self, $coderef) = (shift, shift);
|
||||
|
||||
Exception::Class::Base->new('$coderef must be a CODE reference')->throw
|
||||
unless ref $coderef eq 'CODE';
|
||||
|
||||
return $coderef->(@_) unless $self->{AutoCommit};
|
||||
|
||||
my $wa = wantarray;
|
||||
my (@result, $result, $i);
|
||||
while ('preved') {
|
||||
my $ok = eval {
|
||||
$self->begin_work;
|
||||
defined $wa ? $wa ? (@result = $coderef->(@_)) :
|
||||
($result = $coderef->(@_)) :
|
||||
$coderef->(@_);
|
||||
$self->commit;
|
||||
1;
|
||||
};
|
||||
last if $ok;
|
||||
|
||||
$self->exc_conn_trans_fatal->throw if $self->{PRIV()}{txn_retries} <= ++$i;
|
||||
next if $self->is_trans_disconnect;
|
||||
$@->rethrow if $self->is_fatal_disconnect;
|
||||
my $txn_err = $@;
|
||||
my $rollback_ok = eval {$self->rollback; 1};
|
||||
$txn_err .= $rollback_ok ? ' (Rollback OK)' : "(Rollback failed: $@)";
|
||||
Exception::Class::Base->new($txn_err)->throw;
|
||||
}
|
||||
|
||||
return $wa ? @result : $result;
|
||||
}
|
||||
|
||||
|
||||
package DBIx::RetryOverDisconnects::st;
|
||||
use base 'DBI::st';
|
||||
use strict;
|
||||
|
||||
use constant PRIV => DBIx::RetryOverDisconnects::PRIV();
|
||||
|
||||
foreach my $func (qw/execute execute_array execute_for_fetch/) {
|
||||
no strict 'refs';
|
||||
*$func = sub {
|
||||
my $self = shift;
|
||||
my $super_method = "SUPER::$func";
|
||||
my $dbh = $self->{Database};
|
||||
my $data = $dbh->{PRIV()};
|
||||
return $self->$super_method(@_) if $data->{Intercept}; #Already protected
|
||||
|
||||
my ($retval, @retval);
|
||||
my $wa = wantarray;
|
||||
my $autocommit = $dbh->{AutoCommit};
|
||||
|
||||
while(1) {
|
||||
|
||||
$data->{Intercept} = 1;
|
||||
my $ok = eval {
|
||||
defined $wa ? $wa ? (@retval = $self->$super_method(@_)) :
|
||||
($retval = $self->$super_method(@_)) :
|
||||
$self->$super_method(@_);
|
||||
1;
|
||||
};
|
||||
$data->{Intercept} = 0;
|
||||
|
||||
last if $ok;
|
||||
|
||||
my $e = Exception::Class::Base->new( $DBI::errstr or $@ );
|
||||
return unless $dbh->take_measures($e, $self, $autocommit);
|
||||
}
|
||||
return $wa ? @retval : $retval;
|
||||
};
|
||||
}
|
||||
|
||||
sub restore_params {
|
||||
my $self = shift;
|
||||
my $from = shift;
|
||||
|
||||
my $types = $from->{ParamTypes} || {};
|
||||
#Restore possible ParamArrays
|
||||
my $param_arrays = $from->{ParamArrays} || {};
|
||||
while (my($bind, $array) = each %$param_arrays) {
|
||||
$self->bind_param_array($bind, $array, $types->{$bind} ? $types->{$bind} : ());
|
||||
}
|
||||
|
||||
#Restore normal ph's values
|
||||
my $param_values = $from->{ParamValues} || {};
|
||||
my $i = 1;
|
||||
foreach my $bind_name (sort {($a =~ /(\d+)/)[0] <=> ($b =~ /(\d+)/)[0]} keys %$param_values) {
|
||||
$self->bind_param($i++, $param_values->{$bind_name}, $types->{$bind_name} ? $types->{$bind_name} : ());
|
||||
}
|
||||
}
|
||||
|
||||
=head1 OVERLOADED METHODS
|
||||
|
||||
=head2 Database handle object methods
|
||||
|
||||
prepare, do, statistics_info, begin_work, commit, rollback, selectrow_array,
|
||||
selectrow_arrayref, selectall_arrayref, selectall_hashref
|
||||
|
||||
=head2 Database statement object methods
|
||||
|
||||
execute, execute_array, execute_for_fetch
|
||||
|
||||
=head1 DATABASE SUPPORT
|
||||
|
||||
Currently PostgreSQL, MySQL and SQLite are supported. In the nearest future
|
||||
DB2, MSSQL, Oracle and Sybase will be supported.
|
||||
|
||||
=head1 SEE ALSO
|
||||
|
||||
L<DBI>, L<DBIx::Class>.
|
||||
|
||||
=head1 AUTHOR
|
||||
|
||||
Pronin Oleg <syber@cpan.org>
|
||||
|
||||
=head1 LICENSE
|
||||
|
||||
You may distribute this code under the same terms as Perl itself.
|
||||
|
||||
=cut
|
||||
|
||||
1;
|
@ -1 +1,2 @@
|
||||
usr/sbin
|
||||
usr/share/ngcp-rate-o-mat/DBIx
|
||||
|
Loading…
Reference in new issue