Updates keepalive implementation to detect uplink drops. Use of XEP-0199 xmpp ping.

Fix jabber authorization ignore.
cusax-fix
Damian Minkov 14 years ago
parent 2f33d1f22d
commit a1f49f30d2

@ -8,7 +8,6 @@
import java.util.*;
import net.java.sip.communicator.impl.protocol.jabber.extensions.keepalive.*;
import net.java.sip.communicator.impl.protocol.jabber.extensions.mailnotification.*;
import net.java.sip.communicator.service.protocol.*;
import net.java.sip.communicator.service.protocol.Message;
@ -42,21 +41,6 @@ public class OperationSetBasicInstantMessagingJabberImpl
private static final Logger logger =
Logger.getLogger(OperationSetBasicInstantMessagingJabberImpl.class);
/**
* KeepAlive interval for sending packets
*/
private static final long KEEPALIVE_INTERVAL = 180000l; // 3 minutes
/**
* The interval after which a packet is considered to be lost
*/
private static final long KEEPALIVE_WAIT = 20000l;
/**
* Indicates whether we should be sending our own keep alive packets.
*/
private boolean keepAliveEnabled = false;
/**
* The maximum number of unread threads that we'd be notifying the user of.
*/
@ -98,16 +82,6 @@ private class TargetAddress
*/
private static final long JID_INACTIVITY_TIMEOUT = 10*60*1000;//10 min.
/**
* The task sending packets
*/
private KeepAliveSendTask keepAliveSendTask = null;
/**
* The timer executing tasks on specified intervals
*/
private Timer keepAliveTimer;
/**
* Indicates the time of the last Mailbox report that we received from
* Google (if this is a Google server we are talking to). Should be included
@ -115,17 +89,6 @@ private class TargetAddress
*/
private long lastReceivedMailboxResultTime = -1;
/**
* The queue holding the received packets
*/
private final LinkedList<KeepAliveEvent> receivedKeepAlivePackets
= new LinkedList<KeepAliveEvent>();
/**
* Stores the number of keep alive packets that we haven't heard back for.
*/
private int failedKeepalivePackets = 0;
/**
* The provider that created us.
*/
@ -166,12 +129,6 @@ private class TargetAddress
this.jabberProvider = provider;
provider.addRegistrationStateChangeListener(
new RegistrationStateListener());
// register the KeepAlive Extension in the smack library
ProviderManager.getInstance()
.addIQProvider(KeepAliveEventProvider.ELEMENT_NAME,
KeepAliveEventProvider.NAMESPACE,
new KeepAliveEventProvider());
}
/**
@ -566,26 +523,6 @@ else if (evt.getNewState() == RegistrationState.REGISTERED)
if (enableGmailNotifications)
subscribeForGmailNotifications();
// run keep alive thread
if((keepAliveSendTask == null || keepAliveTimer == null)
&& keepAliveEnabled)
{
jabberProvider.getConnection().addPacketListener(
new KeepalivePacketListener(),
new PacketTypeFilter(
KeepAliveEvent.class));
keepAliveSendTask = new KeepAliveSendTask();
keepAliveTimer = new Timer("Jabber keepalive timer for <"
+ evt.getProvider().getAccountID() + ">", true);
keepAliveTimer.scheduleAtFixedRate(
keepAliveSendTask,
KEEPALIVE_INTERVAL,
KEEPALIVE_INTERVAL);
}
}
else if(evt.getNewState() == RegistrationState.UNREGISTERED
|| evt.getNewState() == RegistrationState.CONNECTION_FAILED
@ -598,17 +535,6 @@ else if(evt.getNewState() == RegistrationState.UNREGISTERED
smackMessageListener);
}
if(keepAliveSendTask != null)
{
keepAliveSendTask.cancel();
keepAliveSendTask = null;
}
if(keepAliveTimer != null)
{
keepAliveTimer.cancel();
keepAliveTimer = null;
}
smackMessageListener = null;
}
}
@ -767,161 +693,6 @@ public void processPacket(Packet packet)
}
/**
* Receives incoming KeepAlive Packets
*/
private class KeepalivePacketListener
implements PacketListener
{
/**
* Handles incoming keep alive packets.
*
* @param packet the packet that we need to handle if it is a keep alive
* one.
*/
public void processPacket(Packet packet)
{
if(packet != null && !(packet instanceof KeepAliveEvent))
return;
KeepAliveEvent keepAliveEvent = (KeepAliveEvent)packet;
if(logger.isDebugEnabled())
{
if (logger.isDebugEnabled())
logger.debug("Received keepAliveEvent from "
+ keepAliveEvent.getFromUserID()
+ " the message : "
+ keepAliveEvent.toXML());
}
receivedKeepAlivePackets.addLast(keepAliveEvent);
}
}
/**
* Task sending packets on intervals.
* The task is runned on specified intervals by the keepAliveTimer
*/
private class KeepAliveSendTask
extends TimerTask
{
/**
* Sends a single <tt>KeepAliveEvent</tt>.
*/
public void run()
{
// if we are not registerd do nothing
if(!jabberProvider.isRegistered())
{
if (logger.isTraceEnabled())
logger.trace("provider not registered. "
+"won't send keep alive. acc.id="
+ jabberProvider.getAccountID()
.getAccountUniqueID());
return;
}
KeepAliveEvent keepAliveEvent =
new KeepAliveEvent(jabberProvider.getConnection().getUser());
keepAliveEvent.setSrcOpSetHash(
OperationSetBasicInstantMessagingJabberImpl.this.hashCode());
keepAliveEvent.setSrcProviderHash(jabberProvider.hashCode());
// schedule the check task
keepAliveTimer.schedule(
new KeepAliveCheckTask(), KEEPALIVE_WAIT);
if (logger.isTraceEnabled())
logger.trace(
"send keepalive for acc: "
+ jabberProvider.getAccountID().getAccountUniqueID());
jabberProvider.getConnection().sendPacket(keepAliveEvent);
}
}
/**
* Check if the first received packet in the queue
* is ok and if its not or the queue has no received packets
* the this means there is some network problem, so fire event
*/
private class KeepAliveCheckTask
extends TimerTask
{
/**
* Checks if the first received packet in the queue is ok and if it is
* not or if the queue has no received packets then this means there
* is some network problem, so we fire an event
*/
public void run()
{
try
{
// check till we find a correct message
// or if NoSuchElementException is thrown
// there is no message
while(!checkFirstPacket());
failedKeepalivePackets = 0;
}
catch (NoSuchElementException ex)
{
logger.error(
"Did not receive last keep alive packet for account "
+ jabberProvider.getAccountID().getAccountUniqueID());
failedKeepalivePackets++;
// if we have 3 keepalive fails then unregister
if(failedKeepalivePackets == 3)
{
logger.error("unregistering.");
jabberProvider.unregister(false);
jabberProvider.fireRegistrationStateChanged(
jabberProvider.getRegistrationState(),
RegistrationState.CONNECTION_FAILED,
RegistrationStateChangeEvent.REASON_SERVER_NOT_FOUND,
null);
failedKeepalivePackets = 0;
}
}
}
/**
* Checks whether first packet in queue is ok
* @return <tt>true</tt> if the topmost keep alive packet seems to be ok
* and <tt>false</tt> otherwise.
*
* @throws NoSuchElementException if the topmost packet is malformed.
*/
private boolean checkFirstPacket()
throws NoSuchElementException
{
KeepAliveEvent receivedEvent
= receivedKeepAlivePackets.removeLast();
return
(jabberProvider.hashCode() == receivedEvent.getSrcProviderHash()
&& OperationSetBasicInstantMessagingJabberImpl.this.hashCode()
== receivedEvent.getSrcOpSetHash()
&& jabberProvider.getAccountID().getUserID()
.equals(receivedEvent.getFromUserID()));
}
}
/**
* Enable sending keep alive packets
* @param keepAliveEnabled boolean
*/
public void setKeepAliveEnabled(boolean keepAliveEnabled)
{
this.keepAliveEnabled = keepAliveEnabled;
}
/**
* A filter that prevents this operation set from handling multi user chat
* messages.

@ -1111,23 +1111,30 @@ public void run()
AuthorizationRequest req = new AuthorizationRequest();
AuthorizationResponse response
= handler.processAuthorisationRequest(req, srcContact);
Presence.Type responsePresenceType;
Presence.Type responsePresenceType = null;
if(response != null
&& response.getResponseCode()
.equals(AuthorizationResponse.ACCEPT))
{
responsePresenceType = Presence.Type.subscribed;
if (logger.isInfoEnabled())
logger.info("Sending Accepted Subscription");
}
else
if(response != null)
{
responsePresenceType = Presence.Type.unsubscribed;
if (logger.isInfoEnabled())
logger.info("Sending Rejected Subscription");
if(response.getResponseCode()
.equals(AuthorizationResponse.ACCEPT))
{
responsePresenceType = Presence.Type.subscribed;
if (logger.isInfoEnabled())
logger.info("Sending Accepted Subscription");
}
else if(response.getResponseCode()
.equals(AuthorizationResponse.REJECT))
{
responsePresenceType = Presence.Type.unsubscribed;
if (logger.isInfoEnabled())
logger.info("Sending Rejected Subscription");
}
}
// subscription ignored
if(responsePresenceType == null)
return;
Presence responsePacket = new Presence(
responsePresenceType);

@ -15,6 +15,7 @@
import javax.net.ssl.*;
import net.java.sip.communicator.impl.protocol.jabber.debugger.*;
import net.java.sip.communicator.impl.protocol.jabber.extensions.keepalive.*;
import net.java.sip.communicator.service.protocol.*;
import net.java.sip.communicator.service.protocol.event.*;
import net.java.sip.communicator.service.protocol.jabberconstants.*;
@ -1229,7 +1230,7 @@ public void unregister()
* Unregister and fire the event if requested
* @param fireEvent boolean
*/
void unregister(boolean fireEvent)
public void unregister(boolean fireEvent)
{
synchronized(initializationLock)
{
@ -1343,9 +1344,9 @@ protected void initialize(String screenname,
OperationSetBasicInstantMessagingJabberImpl basicInstantMessaging =
new OperationSetBasicInstantMessagingJabberImpl(this);
if (keepAliveStrValue != null)
basicInstantMessaging.setKeepAliveEnabled(Boolean
.parseBoolean(keepAliveStrValue));
if (keepAliveStrValue == null
|| !keepAliveStrValue.equalsIgnoreCase(Boolean.FALSE.toString()))
new KeepAliveManager(this);
addSupportedOperationSet(
OperationSetBasicInstantMessaging.class,
@ -1644,7 +1645,7 @@ public AccountID getAccountID()
* @return a reference to the <tt>XMPPConnection</tt> last opened by this
* provider.
*/
protected XMPPConnection getConnection()
public XMPPConnection getConnection()
{
return connection;
}

@ -7,14 +7,11 @@
package net.java.sip.communicator.impl.protocol.jabber.extensions.keepalive;
import org.jivesoftware.smack.packet.*;
import org.jivesoftware.smack.util.*;
/**
* KeepAlive Event. Events are send on specified interval
* and must be received from the sendin provider.
* Carries the information for the source ProtocolProvider and
* source OperationSet - so we can be sure that we are sending and receiving the
* same package.
* KeepAlive Event. Events are sent if there are no received packets
* for a specified interval of time.
* XEP-0199: XMPP Ping.
*
* @author Damian Minkov
*/
@ -22,17 +19,14 @@ public class KeepAliveEvent
extends IQ
{
/**
* Element name for source provider hash.
* Element name for ping.
*/
public static final String SOURCE_PROVIDER_HASH = "src-provider-hash";
public static final String ELEMENT_NAME = "ping";
/**
* Element name for source opset hash.
* Namespace for ping.
*/
public static final String SOURCE_OPSET_HASH = "src-opset-hash";
private int srcProviderHash = -1;
private int srcOpSetHash = -1;
public static final String NAMESPACE = "urn:xmpp:ping";
/**
* Constructs empty packet
@ -45,13 +39,15 @@ public KeepAliveEvent()
*
* @param to the address of the contact that the packet is to be sent to.
*/
public KeepAliveEvent(String to)
public KeepAliveEvent(String from, String to)
{
if (to == null)
{
throw new IllegalArgumentException("Parameter cannot be null");
}
setType(Type.GET);
setTo(to);
setFrom(from);
}
/**
@ -62,69 +58,10 @@ public KeepAliveEvent(String to)
public String getChildElementXML()
{
StringBuffer buf = new StringBuffer();
buf.append("<").append(KeepAliveEventProvider.ELEMENT_NAME).
append(" xmlns=\"").append(KeepAliveEventProvider.NAMESPACE).
append("\">");
buf.append("<").
append(SOURCE_PROVIDER_HASH).append(">").
append(getSrcProviderHash()).append("</").
append(SOURCE_PROVIDER_HASH).append(">");
buf.append("<").append(ELEMENT_NAME).
append(" xmlns=\"").append(NAMESPACE).
append("\"/>");
buf.append("<").
append(SOURCE_OPSET_HASH).append(">").
append(getSrcOpSetHash()).append("</").
append(SOURCE_OPSET_HASH).append(">");
buf.append("</").append(KeepAliveEventProvider.ELEMENT_NAME).append(">");
return buf.toString();
}
/**
* The user id sending this packet
* @return String user id
*/
public String getFromUserID()
{
if(getFrom() != null)
return StringUtils.parseBareAddress(getFrom());
else
return null;
}
/**
* Returns the hash of the source opeartion set sending this message
* @return int hash of the operation set
*/
public int getSrcOpSetHash()
{
return srcOpSetHash;
}
/**
* Returns the hash of the source provider sending this message
* @return int hash of the provider
*/
public int getSrcProviderHash()
{
return srcProviderHash;
}
/**
* Sets the hash of the source provider that will send the message
* @param srcProviderHash int hash of the provider
*/
public void setSrcProviderHash(int srcProviderHash)
{
this.srcProviderHash = srcProviderHash;
}
/**
* Sets the hash of the source opeartion set that will send the message
* @param srcOpSetHash int hash of the operation set
*/
public void setSrcOpSetHash(int srcOpSetHash)
{
this.srcOpSetHash = srcOpSetHash;
}
}

@ -12,23 +12,13 @@
/**
* The KeepAliveEventProvider parses KeepAlive Event packets.
* The KeepAliveEventProvider parses ping iq packets.
*
* @author Damian Minkov
*/
public class KeepAliveEventProvider
implements IQProvider
{
/**
* Element name for keepalive.
*/
public static final String ELEMENT_NAME = "keepalive";
/**
* Namespace for keepalive.
*/
public static final String NAMESPACE = "jitsi:iq:keepalive";
/**
* Creates a new KeepAliveEventProvider.
* ProviderManager requires that every PacketExtensionProvider has a public,
@ -38,7 +28,7 @@ public KeepAliveEventProvider()
{}
/**
* Parses a KeepAliveEvent packet .
* Parses a ping iq packet .
*
* @param parser an XML parser.
* @return a new IQ instance.
@ -49,38 +39,15 @@ public IQ parseIQ(XmlPullParser parser)
{
KeepAliveEvent result = new KeepAliveEvent();
boolean done = false;
while (!done)
{
try
{
int eventType = parser.next();
if(eventType == XmlPullParser.START_TAG)
{
if(parser.getName().equals(KeepAliveEvent.
SOURCE_PROVIDER_HASH))
{
result.setSrcProviderHash(Integer.parseInt(parser.
nextText()));
}
if(parser.getName().equals(KeepAliveEvent.SOURCE_OPSET_HASH))
{
result.setSrcOpSetHash(Integer.parseInt(parser.nextText()));
}
}
else if(eventType == XmlPullParser.END_TAG)
{
if(parser.getName().equals(ELEMENT_NAME))
{
done = true;
}
}
}
catch(NumberFormatException ex)
{
ex.printStackTrace();
}
}
String type = parser.getAttributeValue(null, "type");
String id = parser.getAttributeValue(null, "id");
String from = parser.getAttributeValue(null, "from");
String to = parser.getAttributeValue(null, "to");
result.setType(IQ.Type.fromString(type));
result.setPacketID(id);
result.setFrom(from);
result.setTo(to);
return result;
}

@ -0,0 +1,217 @@
package net.java.sip.communicator.impl.protocol.jabber.extensions.keepalive;
import net.java.sip.communicator.impl.protocol.jabber.*;
import net.java.sip.communicator.service.protocol.*;
import net.java.sip.communicator.service.protocol.event.*;
import net.java.sip.communicator.util.*;
import org.jivesoftware.smack.*;
import org.jivesoftware.smack.packet.*;
import org.jivesoftware.smack.provider.*;
import java.util.*;
/**
* XEP-0199: XMPP Ping. Tracks received packets and if for some interval
* there is nothing received.
*
* @author Damian Minkov
*/
public class KeepAliveManager
implements RegistrationStateChangeListener,
PacketListener
{
/**
* Our class logger
*/
private static final Logger logger =
Logger.getLogger(KeepAliveManager.class);
/**
* The task sending packets
*/
private KeepAliveSendTask keepAliveSendTask = null;
/**
* The timer executing tasks on specified intervals
*/
private Timer keepAliveTimer;
/**
* The last received packet from server.
*/
private long lastReceiveActivity = 0;
/**
* The interval between checks.
*/
private int keepAliveCheckInterval;
/**
* If we didn't receive a packet between two checks, we send a packet,
* so we can receive something error or reply.
*/
private String waitingForPacketWithID = null;
/**
* Our parent provider.
*/
private ProtocolProviderServiceJabberImpl parentProvider = null;
/**
* Creates manager.
* @param parentProvider the parent provider.
*/
public KeepAliveManager(ProtocolProviderServiceJabberImpl parentProvider)
{
this.parentProvider = parentProvider;
this.parentProvider.addRegistrationStateChangeListener(this);
// register the KeepAlive Extension in the smack library
// used only if somebody ping us
ProviderManager.getInstance()
.addIQProvider(KeepAliveEvent.ELEMENT_NAME,
KeepAliveEvent.NAMESPACE,
new KeepAliveEventProvider());
}
/**
* The method is called by a ProtocolProvider implementation whenever
* a change in the registration state of the corresponding provider had
* occurred.
* @param evt ProviderStatusChangeEvent the event describing the status
* change.
*/
public void registrationStateChanged(RegistrationStateChangeEvent evt)
{
if (logger.isDebugEnabled())
logger.debug("The provider changed state from: "
+ evt.getOldState()
+ " to: " + evt.getNewState());
if (evt.getNewState() == RegistrationState.REGISTERED)
{
parentProvider.getConnection().removePacketListener(this);
parentProvider.getConnection().addPacketListener(this, null);
keepAliveSendTask = new KeepAliveSendTask();
keepAliveCheckInterval =
2 * SmackConfiguration.getKeepAliveInterval();
if(keepAliveCheckInterval == 0)
keepAliveCheckInterval = 60000;
keepAliveTimer = new Timer("Jabber keepalive timer for <"
+ parentProvider.getAccountID() + ">", true);
keepAliveTimer.scheduleAtFixedRate(
keepAliveSendTask,
keepAliveCheckInterval,
keepAliveCheckInterval);
}
else if(evt.getNewState() == RegistrationState.UNREGISTERED
|| evt.getNewState() == RegistrationState.CONNECTION_FAILED
|| evt.getNewState() == RegistrationState.AUTHENTICATION_FAILED)
{
if(parentProvider.getConnection() != null)
parentProvider.getConnection().removePacketListener(this);
if(keepAliveSendTask != null)
{
keepAliveSendTask.cancel();
keepAliveSendTask = null;
}
if(keepAliveTimer != null)
{
keepAliveTimer.cancel();
keepAliveTimer = null;
}
}
}
/**
* A packet Listener for all incoming packets.
* @param packet an incoming packet
*/
public void processPacket(Packet packet)
{
// store that we have received
lastReceiveActivity = System.currentTimeMillis();
if(waitingForPacketWithID != null &&
waitingForPacketWithID.equals(packet.getPacketID()))
{
// we are no more waiting for this packet
waitingForPacketWithID = null;
}
if(packet instanceof KeepAliveEvent)
{
// replay only to server pings, to avoid leak of presence
KeepAliveEvent evt = (KeepAliveEvent)packet;
if(evt.getFrom() != null
&& evt.getFrom()
.equals(parentProvider.getAccountID().getService()))
{
parentProvider.getConnection().sendPacket(
IQ.createResultIQ(evt));
}
}
}
/**
* Task sending packets on intervals.
* The task is runned on specified intervals by the keepAliveTimer
*/
private class KeepAliveSendTask
extends TimerTask
{
/**
* Sends a single <tt>KeepAliveEvent</tt>.
*/
public void run()
{
// if we are not registered do nothing
if(!parentProvider.isRegistered())
{
if (logger.isTraceEnabled())
logger.trace("provider not registered. "
+"won't send keep alive for "
+ parentProvider.getAccountID().getDisplayName());
return;
}
if(System.currentTimeMillis() - lastReceiveActivity >
keepAliveCheckInterval)
{
if(waitingForPacketWithID != null)
{
logger.error("un-registering not received ping packet.");
parentProvider.unregister(false);
parentProvider.fireRegistrationStateChanged(
parentProvider.getRegistrationState(),
RegistrationState.CONNECTION_FAILED,
RegistrationStateChangeEvent.REASON_SERVER_NOT_FOUND,
null);
return;
}
// lets send a ping
KeepAliveEvent ping = new KeepAliveEvent(
parentProvider.getOurJID(),
parentProvider.getAccountID().getService()
);
waitingForPacketWithID = ping.getPacketID();
if (logger.isTraceEnabled())
logger.trace("send keepalive for acc: "
+ parentProvider.getAccountID().getDisplayName());
parentProvider.getConnection().sendPacket(ping);
}
}
}
}
Loading…
Cancel
Save