Re: [reSIProcate] asynchronous registration handling
Attached is the mod for asynchronous database handling.
There is a sequence/state diagram documented at
ServerRegistration::AsyncState.
If there are no objections, I will add this to the main branch as it keeps
the existing logic intact.
Please have a look and send back any comments.
Thanks,
-justin
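For anyone reading the patch without ServerRegistration.hxx to hand, the transitions that are visible in the attached ServerRegistration.cxx can be sketched as a small standalone state machine. This is only an illustration and not the diagram from the header: the state names come from the attachment, but the Event enum, next() and walkAcceptedRegister() are hypothetical helpers, and "back to asyncStateNone" stands in for the usage deleting itself.

```cpp
#include <cassert>

// State names are taken from the attached ServerRegistration.cxx.
enum AsyncState
{
   asyncStateNone,
   asyncStateWaitingForInitialContactList,
   asyncStateProcessingRegistration,
   asyncStateWaitingForAcceptReject,
   asyncStateQueryOnly,
   asyncStateAcceptedWaitingForFinalContactList,
   asyncStateProvidedFinalContacts
};

// Hypothetical events, named after the calls that trigger them.
enum Event
{
   evRegisterReceived,  // dispatch() calls asyncGetContacts()
   evContactsProvided,  // application calls asyncProvideContacts()
   evQueryParsed,       // REGISTER carried no Contact headers
   evChangesParsed,     // REGISTER added, refreshed or removed contacts
   evAccepted,          // application calls accept()
   evRejected           // application calls reject()
};

// Returns true and updates 'state' if the event is legal in that state;
// otherwise leaves 'state' untouched and returns false.
bool next(AsyncState& state, Event ev)
{
   switch (state)
   {
      case asyncStateNone:
         if (ev == evRegisterReceived) { state = asyncStateWaitingForInitialContactList; return true; }
         break;
      case asyncStateWaitingForInitialContactList:
         if (ev == evContactsProvided) { state = asyncStateProcessingRegistration; return true; }
         break;
      case asyncStateProcessingRegistration:
         if (ev == evQueryParsed) { state = asyncStateQueryOnly; return true; }
         if (ev == evChangesParsed) { state = asyncStateWaitingForAcceptReject; return true; }
         break;
      case asyncStateQueryOnly:
         // The response is sent immediately and the usage goes away.
         if (ev == evAccepted || ev == evRejected) { state = asyncStateNone; return true; }
         break;
      case asyncStateWaitingForAcceptReject:
         if (ev == evAccepted) { state = asyncStateAcceptedWaitingForFinalContactList; return true; }
         if (ev == evRejected) { state = asyncStateNone; return true; }
         break;
      case asyncStateAcceptedWaitingForFinalContactList:
         if (ev == evContactsProvided) { state = asyncStateProvidedFinalContacts; return true; }
         break;
      case asyncStateProvidedFinalContacts:
         // The 200 is sent from here; no further events are expected.
         break;
   }
   return false;
}

// Walks the path of a REGISTER that changes bindings and is accepted.
AsyncState walkAcceptedRegister()
{
   AsyncState s = asyncStateNone;
   const Event path[] = { evRegisterReceived, evContactsProvided,
                          evChangesParsed, evAccepted, evContactsProvided };
   for (unsigned i = 0; i < sizeof(path) / sizeof(path[0]); ++i)
   {
      bool ok = next(s, path[i]);
      assert(ok);
      (void)ok; // avoids an unused-variable warning when NDEBUG is defined
   }
   return s;
}
```

Note that providing contacts while the registration is still waiting for accept()/reject() is refused here, which matches the assert in asyncProvideContacts() in the attachment.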
-----Original Message-----
From: Robert Backhouse [mailto:robertb@xxxxxxxxxxxxx]
Sent: Thursday, March 12, 2009 1:36 PM
To: Justin Matthews
Cc: 'resiprocate-devel'
Subject: Re: [reSIProcate] asynchronous registration handling
Hi Justin,
Thanks for your reply. I've still got a couple of questions (sorry if
I'm missing something):
1. I presume you're suggesting that we reject the registration in
RegistrationHandler::onXxx(). How does the information that the
RegistrationPersistenceManager has failed reach the handler? The only
way I can see is to keep a map from aor to status, provide a public
interface to get at it, and cast the persistence manager to the
appropriate subclass in order to decide whether to accept or reject the
registration.
2. What happens if, say, there is a connection problem when we try to
roll back to the previous registration details, or if we fail to take
out the lock even (since the database is shared)?
It sounds as though your new implementation will make this a lot easier
and clearer.
Thanks again,
Rob.
Justin Matthews wrote:
> Hi Rob,
>
> Currently you would handle this by trapping all database failures in your
> persistence manager and then calling ServerRegistration::reject(). All the
> db operations are performed before the calls to onRefresh, onRemove, onAdd,
> etc. If you reject() the registration, then ServerRegistration tries to
> roll back the changes by removing the registration (via removeAor()) and
> then re-applying the original list that it saved (via addAor()).
>
> The implementation I am working on will allow more flexible error handling
> without blocking the DUM thread, will not require roll back if the
> registration is rejected and will require fewer DB calls. I should have
> more details in a few days.
>
> Thanks,
>
> -justin
>
> -----Original Message-----
> From: resiprocate-devel-bounces@xxxxxxxxxxxxxxx
> [mailto:resiprocate-devel-bounces@xxxxxxxxxxxxxxx] On Behalf Of Robert
> Backhouse
> Sent: Thursday, March 12, 2009 9:53 AM
> To: 'resiprocate-devel'
> Subject: Re: [reSIProcate] asynchronous registration handling
>
> Hi,
>
> I have a separate but related question. In the last few days I have also
> been having trouble in this area while trying to implement a
> database-based RegistrationPersistenceManager.
>
> My problem is that there doesn't seem to be any mechanism to cope with
> an action/query failing - if there is a problem with the database
> connection, the SQL syntax, or just a random deadlock or transaction
> timeout then I can't see a way to fail the registration.
>
> I presume some people must have successfully written database backed
> RegistrationPersistenceManager implementations - can anyone explain how
> they dealt with SQL exceptions without a response to signify failure or
> a suitable exception to throw?
>
> Otherwise, if the ServerRegistration logic is being rewritten, do you
> think this would be a sensible opportunity to support failures from the
> RegistrationPersistenceManager in some way?
>
> Thanks,
>
> Rob.
>
>
>> 2009/3/9 Justin Matthews <jmatthewsr@xxxxxxxxx>
>>
>> Hi,
>>
>> I am looking at adding asynchronous REGISTER handling in DUM. Has anyone
>> thought about or actually implemented this? There are some notes in
>> DialogSet.cxx about moving REGISTER handling to DialogUsageManager;
>> would this mean converting REGISTER handling to be a DumFeature? Would
>> this make it easier to post back responses to a DUM feature, as
>> opposed to implementing some kind of postback to a ServerRegistration?
>>
>> Basically I need to send the DB queries off to another thread to avoid
>> blocking.
>>
>> Also, is anyone else interested in this?
>>
>> Thanks,
>>
>> justin
>
--
Robert Backhouse <robertb@xxxxxxxxxxxxx>
Software Developer
Tel: +44 (0) 845 666 7778
Fax: +44 (0) 870 163 4694
http://www.mxtelecom.com
#include "precompile.h"
#include "resip/dum/ContactInstanceRecord.hxx"
#include "rutil/Timer.hxx"
#include "resip/stack/SipMessage.hxx"
using namespace resip;
ContactInstanceRecord::ContactInstanceRecord() :
mRegExpires(0),
mLastUpdated(Timer::getTimeSecs()),
mRegId(0),
mUserInfo(0)
{
}
bool
ContactInstanceRecord::operator==(const ContactInstanceRecord& rhs) const
{
return (mRegId == rhs.mRegId &&
mInstance == rhs.mInstance &&
mContact.uri() == rhs.mContact.uri());
}
ContactInstanceRecord
ContactInstanceRecord::makeRemoveDelta(const NameAddr& contact)
{
ContactInstanceRecord c;
c.mContact = contact;
return c;
}
ContactInstanceRecord
ContactInstanceRecord::makeUpdateDelta(const NameAddr& contact,
UInt64 expires, // absolute time in secs
const SipMessage& msg)
{
ContactInstanceRecord c;
c.mContact = contact;
c.mRegExpires = expires;
c.mReceivedFrom = msg.getSource();
if (msg.exists(h_Paths))
{
c.mSipPath = msg.header(h_Paths);
}
if (contact.exists(p_Instance))
{
c.mInstance = contact.param(p_Instance);
}
if (contact.exists(p_regid))
{
c.mRegId = contact.param(p_regid);
}
// !jf! need to fill in mServerSessionId here
return c;
}
#if !defined(resip_ContactInstanceRecord_hxx)
#define resip_ContactInstanceRecord_hxx
#include <list>
#include <vector>
#include <deque>
#include "resip/stack/NameAddr.hxx"
#include "rutil/Data.hxx"
#include "resip/stack/Tuple.hxx"
#include "rutil/SharedPtr.hxx"
namespace resip
{
/** A single contact record, bound to an Aor during registration.
*/
class ContactInstanceRecord
{
public:
ContactInstanceRecord();
static ContactInstanceRecord makeRemoveDelta(const NameAddr& contact);
static ContactInstanceRecord makeUpdateDelta(const NameAddr& contact,
UInt64 expires, // absolute time in secs
const SipMessage& msg);
NameAddr mContact; //!< can contain callee caps and q-values
UInt64 mRegExpires; //!< in seconds
UInt64 mLastUpdated; //!< in seconds
Tuple mReceivedFrom; //!< source transport, IP address, and port
NameAddrs mSipPath; //!< Value of SIP Path header from the request
Data mInstance; //!< From the instance parameter; usually a UUID URI
UInt32 mRegId; //!< From regid parameter of Contact header
Data mServerSessionId; //!< if there is no SIP Path header, the connection/session identifier
// Uri gruu; (GRUU is currently derived)
void *mUserInfo; //!< can be used to map user record information (database record id for faster updates?)
bool operator==(const ContactInstanceRecord& rhs) const;
};
typedef std::list<ContactInstanceRecord> ContactList;
/** Used to reduce copying ContactInstanceRecord objects when processing
registration.
*/
typedef std::list<resip::SharedPtr<ContactInstanceRecord> > ContactPtrList;
/** Records a log of the database transactions that were performed when
processing a local registration using the
ServerRegistration::AsyncLocalStore.
*/
class ContactRecordTransaction
{
public:
typedef enum Operation
{
none,
update,
create,
remove,
removeAll
} Operation;
ContactRecordTransaction()
:mOp(none)
{}
ContactRecordTransaction(Operation op,
resip::SharedPtr<ContactInstanceRecord> rec)
:mOp(op),mRec(rec)
{}
Operation mOp; //!< the operation that was performed in this transaction.
/** For create & update: the newly modified record; for remove: the removed
record; for removeAll: 0.
*/
resip::SharedPtr<ContactInstanceRecord> mRec;
};
typedef std::deque<resip::SharedPtr<ContactRecordTransaction> >
ContactRecordTransactionLog;
class RegistrationBinding
{
public:
Data mAor; // canonical URI for this AOR and its aliases
ContactList mContacts;
std::vector<Uri> mAliases;
};
struct RegistrationBindingDelta
{
Data mAor;
std::vector<ContactInstanceRecord> mInserts;
std::vector<ContactInstanceRecord> mUpdates;
std::vector<ContactInstanceRecord> mRemoves;
};
}
#endif
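The ContactRecordTransactionLog above is what an application receives in asyncUpdateContacts(). One way a handler might consume it is to replay the operations in order against its own storage. The following is a minimal sketch under stated assumptions, not code from the patch: Record, Transaction, Store and replay() are simplified, hypothetical stand-ins (std::shared_ptr replaces resip::SharedPtr, the log holds entries by value, and a std::map stands in for one AOR's rows in a database).

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <memory>
#include <string>

// Simplified stand-in for ContactInstanceRecord; only the fields used here.
struct Record
{
   std::string contactUri;
   unsigned long long expires; // absolute time in seconds
};

// Simplified stand-in for ContactRecordTransaction.
struct Transaction
{
   enum Operation { none, update, create, remove, removeAll };
   Operation op;
   std::shared_ptr<Record> rec; // null for removeAll
};

typedef std::deque<Transaction> TransactionLog;

// Hypothetical storage for one AOR: contact URI -> absolute expiry.
typedef std::map<std::string, unsigned long long> Store;

// Applies each logged operation, in the order it was recorded.
void replay(const TransactionLog& log, Store& store)
{
   for (TransactionLog::const_iterator it = log.begin(); it != log.end(); ++it)
   {
      switch (it->op)
      {
         case Transaction::none:
            break;
         case Transaction::create:
         case Transaction::update:
            assert(it->rec != nullptr);
            store[it->rec->contactUri] = it->rec->expires;
            break;
         case Transaction::remove:
            assert(it->rec != nullptr);
            store.erase(it->rec->contactUri);
            break;
         case Transaction::removeAll:
            store.clear();
            break;
      }
   }
}
```

A real handler would more likely translate each entry into an INSERT, UPDATE or DELETE and run them in a single database transaction on a worker thread; the in-order replay is the part this sketch is meant to show.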
#if !defined(RESIP_REGISTRATIONHANDLER_HXX)
#define RESIP_REGISTRATIONHANDLER_HXX
#include <memory>
#include "resip/dum/Handles.hxx"
#include "rutil/SharedPtr.hxx"
#include "resip/dum/ContactInstanceRecord.hxx"
namespace resip
{
class SipMessage;
class NameAddr;
class MasterProfile;
class ClientRegistrationHandler
{
public:
virtual ~ClientRegistrationHandler() { }
/// Called when registration succeeds or each time it is successfully
/// refreshed.
virtual void onSuccess(ClientRegistrationHandle, const SipMessage&
response)=0;
// Called when all of my bindings have been removed
virtual void onRemoved(ClientRegistrationHandle, const SipMessage&
response) = 0;
/// Called on Retry-After failure.
/// return values: -1 = fail, 0 = retry immediately, N = retry in N seconds
virtual int onRequestRetry(ClientRegistrationHandle, int retrySeconds,
const SipMessage& response)=0;
/// Called if registration fails, usage will be destroyed (unless a
/// Registration retry interval is enabled in the Profile)
virtual void onFailure(ClientRegistrationHandle, const SipMessage&
response)=0;
};
class ServerRegistrationHandler
{
public:
virtual ~ServerRegistrationHandler() {}
/// Called when registration is refreshed
virtual void onRefresh(ServerRegistrationHandle, const SipMessage& reg)=0;
/// called when one or more specified contacts is removed
virtual void onRemove(ServerRegistrationHandle, const SipMessage& reg)=0;
/// Called when all the contacts are removed using "Contact: *"
virtual void onRemoveAll(ServerRegistrationHandle, const SipMessage&
reg)=0;
/** Called when one or more contacts are added. This is after
authentication has all succeeded */
virtual void onAdd(ServerRegistrationHandle, const SipMessage& reg)=0;
/// Called when a client queries for the list of current registrations
virtual void onQuery(ServerRegistrationHandle, const SipMessage& reg)=0;
/// When processing a REGISTER request, return the desired expires value
/// when processing the "Expires" header.
/// @param expires Set this to the desired expiration value for the set of
/// contacts that do not explicitly set the "expires" param.
/// @param returnCode If the REGISTER should be rejected, use this return
/// code. A value of 423 will result in the Min-Expires header being added
/// to the response.
///
virtual void getGlobalExpires(const SipMessage& msg,
SharedPtr<MasterProfile> masterProfile,
UInt32 &expires, UInt32 &returnCode);
/// When processing a REGISTER request, return the desired expires value
/// by processing this contact's expires parameter. If the expires value is
/// not modified in this function, the global expires will be used.
/// @param expires Set this to the desired expiration value for this
/// contact.
/// @param returnCode If the REGISTER should be rejected, use this return
/// code. A value of 423 will result in the Min-Expires header being added
/// to the response.
///
virtual void getContactExpires(const NameAddr &contact,
SharedPtr<MasterProfile> masterProfile,
UInt32 &expires, UInt32 &returnCode);
/** If true, the registration processing will use the async* functions
here and will not use the RegistrationPersistenceManager.
*/
virtual bool asyncProcessing(void) const
{
return false;
}
/** Called when a REGISTER is first received to retrieve the current
list. This list is then updated by the
registration logic in DUM. When the list is ready, call
asyncProvideContacts() on the specified ServerRegistration.
*/
virtual void asyncGetContacts(ServerRegistrationHandle,const Uri& aor)
{}
/** Notifies the handler to update the current contact list for the AOR
to the specified list that has been updated by
DUM's registration processing. This is normally called after the
REGISTER has been processed and accepted by the user
(ServerRegistration::accept())
*/
virtual void asyncUpdateContacts(ServerRegistrationHandle,const Uri& aor,
std::auto_ptr<ContactPtrList> modifiedContactList,
std::auto_ptr<ContactRecordTransactionLog> transactionLog)
{}
/** Notifies the handler to remove the entries specified in the contacts
parameter.
No further processing is required after receiving this message.
*/
virtual void asyncRemoveExpired(ServerRegistrationHandle,const
resip::Uri& aor,
std::auto_ptr<resip::ContactPtrList> contacts)
{}
};
}
#endif
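The getGlobalExpires()/getContactExpires() hooks above leave the expiry policy to the application. As a rough illustration of what an override might compute, here is a standalone helper that follows the same (expires, returnCode) out-parameter convention. Everything in it is an assumption made for the example: applyExpiresPolicy(), its min/max/default parameters and the choice to always allow expires=0 are hypothetical and not part of reSIProcate. On a 423 it leaves the minimum in expires, since the attached processRegistration() copies that value into Min-Expires.

```cpp
#include <cassert>

// Hypothetical policy helper. 'requested' is the value from the request,
// 'hasExpires' says whether the request carried one at all.
void applyExpiresPolicy(unsigned int requested, bool hasExpires,
                        unsigned int minExpires, unsigned int maxExpires,
                        unsigned int defaultExpires,
                        unsigned int& expires, unsigned int& returnCode)
{
   assert(minExpires <= maxExpires);
   returnCode = 0;
   if (!hasExpires)
   {
      // Nothing requested: fall back to the configured default.
      expires = defaultExpires;
      return;
   }
   if (requested == 0)
   {
      // A zero expiry is a de-registration and is never "too brief".
      expires = 0;
      return;
   }
   if (requested < minExpires)
   {
      // 423 Interval Too Brief: hand back the minimum for Min-Expires.
      expires = minExpires;
      returnCode = 423;
      return;
   }
   // Accept the request, capped at the configured maximum.
   expires = requested > maxExpires ? maxExpires : requested;
}
```

An override of getGlobalExpires() could read the Expires header from the message, call a helper like this with limits taken from the profile, and pass the two results straight through.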
/* ====================================================================
* The Vovida Software License, Version 1.0
*
* Copyright (c) 2000 Vovida Networks, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The names "VOCAL", "Vovida Open Communication Application Library",
* and "Vovida Open Communication Application Library (VOCAL)" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact vocal@xxxxxxxxxxx
*
* 4. Products derived from this software may not be called "VOCAL", nor
* may "VOCAL" appear in their name, without prior written
* permission of Vovida Networks, Inc.
*
* THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
* NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA
* NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
* IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by Vovida
* Networks, Inc. and many individuals on behalf of Vovida Networks,
* Inc. For more information on Vovida Networks, Inc., please see
* <http://www.vovida.org/>.
*
*/
#include "precompile.h"
#include "resip/stack/ExtensionParameter.hxx"
#include "resip/stack/InteropHelper.hxx"
#include "resip/stack/SipMessage.hxx"
#include "resip/dum/DialogUsageManager.hxx"
#include "resip/dum/MasterProfile.hxx"
#include "resip/dum/ServerRegistration.hxx"
#include "resip/dum/Dialog.hxx"
#include "resip/dum/RegistrationHandler.hxx"
#include "resip/dum/RegistrationPersistenceManager.hxx"
#include "rutil/Logger.hxx"
#include "rutil/Timer.hxx"
#include "rutil/WinLeakCheck.hxx"
#define RESIPROCATE_SUBSYSTEM Subsystem::DUM
using namespace resip;
ServerRegistrationHandle
ServerRegistration::getHandle()
{
return ServerRegistrationHandle(mDum, getBaseHandle().getId());
}
ServerRegistration::ServerRegistration(DialogUsageManager& dum, DialogSet&
dialogSet, const SipMessage& request)
: NonDialogUsage(dum, dialogSet),
mRequest(request),
mDidOutbound(false),
mAsyncState(asyncStateNone)
{}
ServerRegistration::~ServerRegistration()
{
mDialogSet.mServerRegistration = 0;
}
void
ServerRegistration::end()
{
}
void
ServerRegistration::accept(SipMessage& ok)
{
ok.remove(h_Contacts);
InfoLog( << "accepted a registration " << mAor );
if(mDidOutbound)
{
static Token outbound("outbound");
ok.header(h_Supporteds).push_back(outbound);
}
if (!mDum.mServerRegistrationHandler->asyncProcessing())
{
// Add all registered contacts to the message.
RegistrationPersistenceManager *database =
mDum.mRegistrationPersistenceManager;
ContactList contacts;
contacts = database->getContacts(mAor);
database->unlockRecord(mAor);
// Removes expired entries from the ok msg and also calls the database
// to remove expired contacts.
processFinalOkMsg(ok,contacts);
SharedPtr<SipMessage> msg(static_cast<SipMessage*>(ok.clone()));
mDum.send(msg);
delete(this);
}
else
{
if (mAsyncState == asyncStateQueryOnly)
{
if (!mAsyncLocalStore.get())
{
assert(0);
}
else
{
// Remove expired contacts from the ok msg and also from the database.
// The local store holds the current list of contacts, which was provided
// asynchronously during initial REGISTER processing.
std::auto_ptr<ContactRecordTransactionLog> log;
std::auto_ptr<ContactPtrList> contacts;
mAsyncLocalStore->releaseLog(log,contacts);
if (contacts.get())
{
asyncProcessFinalOkMsg(ok,*contacts);
}
SharedPtr<SipMessage> msg(static_cast<SipMessage*>(ok.clone()));
mDum.send(msg);
delete(this);
}
}
else
{
if (!mAsyncLocalStore.get())
{
assert(0);
return;
}
// This REGISTER was accepted, but we still need to apply the changes it
// made and then receive a final contact list before sending the 200.
mAsyncState = asyncStateAcceptedWaitingForFinalContactList;
std::auto_ptr<ContactRecordTransactionLog> log;
std::auto_ptr<ContactPtrList> modifiedContacts;
mAsyncLocalStore->releaseLog(log,modifiedContacts);
mAsyncLocalStore->destroy(); // drop ownership of resources in the local store.
// Store the 200 before notifying the handler: if the handler calls
// asyncProvideContacts() synchronously, this object sends mAsyncOkMsg and
// deletes itself, so no member may be touched after the call below.
mAsyncOkMsg =
SharedPtr<SipMessage>(static_cast<SipMessage*>(ok.clone()));
mDum.mServerRegistrationHandler->asyncUpdateContacts(getHandle(),mAor,modifiedContacts,log);
}
}
}
void
ServerRegistration::accept(int statusCode)
{
SipMessage success;
mDum.makeResponse(success, mRequest, statusCode);
accept(success);
}
void
ServerRegistration::reject(int statusCode)
{
InfoLog( << "rejected a registration " << mAor << " with statusCode=" <<
statusCode );
ServerRegistrationHandler* handler = mDum.mServerRegistrationHandler;
// First, we roll back the contact database to
// the state it was in before the registration request.
// Async processing hasn't actually updated the database yet, so there is
// no need to roll back.
if (!handler->asyncProcessing())
{
RegistrationPersistenceManager *database =
mDum.mRegistrationPersistenceManager;
database->removeAor(mAor);
database->addAor(mAor, *mOriginalContacts.get());
database->unlockRecord(mAor);
}
SharedPtr<SipMessage> failure(new SipMessage);
mDum.makeResponse(*failure, mRequest, statusCode);
failure->remove(h_Contacts);
mDum.send(failure);
delete(this);
}
void
ServerRegistration::dispatch(const SipMessage& msg)
{
DebugLog( << "got a registration" );
assert(msg.isRequest());
ServerRegistrationHandler* handler = mDum.mServerRegistrationHandler;
RegistrationPersistenceManager *database =
mDum.mRegistrationPersistenceManager;
if (!handler || (!handler->asyncProcessing() && !database))
{
// ?bwc? This is a server error; why are we sending a 4xx?
// ?jmatthewsr? Possibly because of the note in section 21.5.2 about
// recognizing the method, but not supporting it?
DebugLog( << "No handler or DB - sending 405" );
SharedPtr<SipMessage> failure(new SipMessage);
mDum.makeResponse(*failure, msg, 405);
mDum.send(failure);
delete(this);
return;
}
mAor = msg.header(h_To).uri().getAorAsUri();
// Checks to see whether this scheme is valid, and supported.
if( !( (mAor.scheme()=="sip" || mAor.scheme()=="sips")
&& mDum.getMasterProfile()->isSchemeSupported(mAor.scheme()) ) )
{
DebugLog( << "Bad scheme in Aor" );
SharedPtr<SipMessage> failure(new SipMessage);
mDum.makeResponse(*failure, msg, 400);
failure->header(h_StatusLine).reason() = "Bad/unsupported scheme in To: " + mAor.scheme();
mDum.send(failure);
delete(this);
return;
}
if (handler->asyncProcessing())
{
handler->asyncGetContacts(getHandle(),mAor);
mAsyncState = asyncStateWaitingForInitialContactList;
return;
}
processRegistration(msg);
}
void
ServerRegistration::processRegistration(const SipMessage& msg)
{
ServerRegistrationHandler* handler = mDum.mServerRegistrationHandler;
RegistrationPersistenceManager *database =
mDum.mRegistrationPersistenceManager;
enum {ADD, REMOVE, REFRESH} operation = REFRESH;
UInt32 globalExpires=3600;
UInt32 returnCode=0;
handler->getGlobalExpires(msg,mDum.getMasterProfile(),globalExpires,returnCode);
bool async = handler->asyncProcessing();
if (returnCode >= 400)
{
SharedPtr<SipMessage> failure(new SipMessage);
mDum.makeResponse(*failure, msg, returnCode);
if (423 == returnCode)
{
failure->header(h_StatusLine).reason() = "Interval Too Brief";
failure->header(h_MinExpires).value() = globalExpires;
}
mDum.send(failure);
// The record has not been locked yet (lockRecord() is called below), so
// there is nothing to unlock on this path.
delete(this);
return;
}
if (!async)
{
database->lockRecord(mAor);
mOriginalContacts = resip::SharedPtr<ContactList>(new ContactList);
*mOriginalContacts = database->getContacts(mAor);
}
// If no contacts are present in the request, this is simply a query.
if (!msg.exists(h_Contacts))
{
if (async)
{
mAsyncState = asyncStateQueryOnly;
}
handler->onQuery(getHandle(), msg);
return;
}
ParserContainer<NameAddr> contactList(msg.header(h_Contacts));
ParserContainer<NameAddr>::iterator i(contactList.begin());
ParserContainer<NameAddr>::iterator iEnd(contactList.end());
UInt64 now=Timer::getTimeSecs();
UInt32 expires=0;
for (; i != iEnd; ++i )
{
if(!i->isWellFormed())
{
SharedPtr<SipMessage> failure(new SipMessage);
mDum.makeResponse(*failure, msg, 400, "Malformed Contact");
mDum.send(failure);
if (!async)
{
database->unlockRecord(mAor);
}
delete(this);
return;
}
expires = globalExpires;
handler->getContactExpires(*i,mDum.getMasterProfile(),expires,returnCode);
// Check for "Contact: *" style deregistration
if (i->isAllContacts())
{
if (contactList.size() > 1 || expires != 0)
{
SharedPtr<SipMessage> failure(new SipMessage);
mDum.makeResponse(*failure, msg, 400, "Invalid use of 'Contact: *'");
mDum.send(failure);
if (!async)
{
database->unlockRecord(mAor);
}
delete(this);
return;
}
if (!async)
{
database->removeAor(mAor);
}
else
{
mAsyncLocalStore->removeAllContacts();
mAsyncState = asyncStateWaitingForAcceptReject;
}
handler->onRemoveAll(getHandle(), msg);
return;
}
ContactInstanceRecord rec;
rec.mContact=*i;
rec.mRegExpires=(UInt64)expires+now;
if(i->exists(p_Instance))
{
rec.mInstance=i->param(p_Instance);
}
if(!msg.empty(h_Paths))
{
rec.mSipPath=msg.header(h_Paths);
}
rec.mLastUpdated=now;
bool supportsOutbound = processOutbound(*i,rec,msg);
// Check to see if this is a removal.
if (expires == 0)
{
if (operation == REFRESH)
{
operation = REMOVE;
}
if (!async)
{
database->removeContact(mAor, rec);
}
else
{
mAsyncLocalStore->removeContact(rec);
}
}
else // Otherwise, it's an addition or refresh.
{
RegistrationPersistenceManager::update_status_t status;
InfoLog (<< "Adding " << mAor << " -> " << *i );
DebugLog(<< "Contact has tuple " << rec.mReceivedFrom);
if (!async)
status = database->updateContact(mAor, rec);
else
status = mAsyncLocalStore->updateContact(rec);
if (status == RegistrationPersistenceManager::CONTACT_CREATED)
{
operation = ADD;
}
}
// !bwc! If we perform outbound processing for any Contact, we need to
// set this to true.
mDidOutbound |= supportsOutbound;
}
// The way this works is:
//
// - If no additions or removals are performed, this is a refresh
//
// - If at least one contact is removed and none are added, this
// is a removal.
//
// - If at least one contact is added, this is an addition, *even*
// *if* a contact was also removed.
// For async processing, we need to wait for accept()/reject(). If accepted,
// the modifications made here will be sent to the user via
// asyncUpdateContacts(). The user then returns a final contact list, which
// is then processed and sent back in the 200.
if (async)
{
mAsyncState = asyncStateWaitingForAcceptReject;
}
switch (operation)
{
case REFRESH:
handler->onRefresh(getHandle(), msg);
break;
case REMOVE:
handler->onRemove(getHandle(), msg);
break;
case ADD:
handler->onAdd(getHandle(), msg);
break;
default:
assert(0);
}
}
void
ServerRegistration::dispatch(const DumTimeout& msg)
{
}
EncodeStream&
ServerRegistration::dump(EncodeStream& strm) const
{
strm << "ServerRegistration " << mAor;
return strm;
}
bool
ServerRegistration::processOutbound(resip::NameAddr &naddr,
ContactInstanceRecord &rec, const resip::SipMessage &msg)
{
// .bwc. If, in the end, this is true, it means all necessary conditions
// for outbound support have been met.
bool supportsOutbound=InteropHelper::getOutboundSupported();
// .bwc. We only store flow information if we have a direct flow to the
// endpoint. We do not create a flow if there is an edge-proxy, because if
// our connection to the edge proxy fails, the flow from the edge-proxy to
// the endpoint is still good, so we should not discard the registration.
bool haveDirectFlow=true;
if(supportsOutbound)
{
try
{
if(!naddr.exists(p_Instance) || !naddr.exists(p_regid))
{
DebugLog(<<"instance or reg-id missing");
supportsOutbound=false;
}
if(!msg.empty(h_Paths))
{
haveDirectFlow=false;
if(!msg.header(h_Paths).back().exists(p_ob))
{
DebugLog(<<"last Path doesn't have ob");
supportsOutbound=false;
}
}
else if(msg.header(h_Vias).size() > 1)
{
DebugLog(<<"more than one Via, and no Path");
supportsOutbound=false;
}
}
catch(resip::ParseBuffer::Exception&)
{
supportsOutbound=false;
}
}
else
{
DebugLog(<<"outbound support disabled");
}
// .bwc. The outbound processing
if(supportsOutbound)
{
rec.mRegId=naddr.param(p_regid);
if(haveDirectFlow)
{
// .bwc. We NEVER record the source if outbound is not being used.
// There is nothing we can do with this info in the non-outbound
// case that doesn't flagrantly violate spec (yet).
// (Example: If we remember this info with the intent of sending
// incoming stuff to this source directly, we end up being forced
// to record-route with a flow token to give in-dialog stuff a
// chance of working. Once we have record-routed, we have set this
// decision in stone for the rest of the dialog, preventing target
// refresh requests from working. If record-route was mutable,
// maybe it would be okay to do this, but until this is officially
// allowed, we shouldn't touch it.)
rec.mReceivedFrom=msg.getSource();
// .bwc. In the outbound case, we should fail if the connection is
// gone. No recovery should be attempted by the server.
rec.mReceivedFrom.onlyUseExistingConnection=true;
DebugLog(<<"Set rec.mReceivedFrom: " << rec.mReceivedFrom);
}
}
else if(InteropHelper::getRRTokenHackEnabled())
{
// .bwc. If we are going to do the broken thing, and record-route with
// flow-tokens every time, we enable it here. Keep in mind, this will
// either break target-refreshes (if we have a direct connection to the
// endpoint), or we have a good chance of inadvertently including a
// proxy that didn't record-route in the dialog.
// !bwc! TODO remove this once mid-dialog connection reuse is handled
// by most endpoints.
rec.mReceivedFrom=msg.getSource();
rec.mReceivedFrom.onlyUseExistingConnection=false;
DebugLog(<<"Set rec.mReceivedFrom: " << rec.mReceivedFrom);
}
return supportsOutbound;
}
void
ServerRegistration::asyncProcessFinalOkMsg(SipMessage &msg, ContactPtrList
&contacts)
{
if (contacts.size() > 0)
{
ContactPtrList::iterator it(contacts.begin());
ContactPtrList::iterator itEnd(contacts.end());
std::auto_ptr<ContactPtrList> expired;
UInt64 now=Timer::getTimeSecs();
for(;it != itEnd;++it)
{
resip::SharedPtr<ContactInstanceRecord> rec(*it);
if (!rec)
{
assert(0);
continue;
}
if (rec->mRegExpires <= now)
{
if (!expired.get())
{
expired = std::auto_ptr<ContactPtrList>(new
ContactPtrList());
}
expired->push_back(rec);
continue;
}
rec->mContact.param(p_expires) = UInt32(rec->mRegExpires -
now);
msg.header(h_Contacts).push_back(rec->mContact);
}
if (expired.get() && expired->size() > 0)
{
mDum.mServerRegistrationHandler->asyncRemoveExpired(getHandle(),mAor,expired);
}
}
}
void
ServerRegistration::processFinalOkMsg(SipMessage &msg, ContactList &contacts)
{
// Build the 200 OK and remove any expired entries.
// The non-asynchronous behavior is to call the database directly; the async
// behavior (see asyncProcessFinalOkMsg()) is to build a list of all expired
// entries and send it to the handler.
if (contacts.size() > 0)
{
ContactList::iterator it(contacts.begin());
ContactList::iterator itEnd(contacts.end());
RegistrationPersistenceManager *database =
mDum.mRegistrationPersistenceManager;
bool async = mDum.mServerRegistrationHandler->asyncProcessing();
UInt64 now=Timer::getTimeSecs();
for(;it != itEnd;++it)
{
if (it->mRegExpires <= now)
{
database->removeContact(mAor,*it);
continue;
}
it->mContact.param(p_expires) = UInt32(it->mRegExpires - now);
msg.header(h_Contacts).push_back(it->mContact);
}
}
}
bool
ServerRegistration::asyncProvideContacts(std::auto_ptr<resip::ContactPtrList>
contacts)
{
switch (mAsyncState)
{
case asyncStateWaitingForInitialContactList:
{
assert(mAsyncLocalStore.get() == 0);
mAsyncLocalStore = resip::SharedPtr<AsyncLocalStore>(new
AsyncLocalStore(contacts));
mAsyncState = asyncStateProcessingRegistration;
processRegistration(mRequest);
break;
}
case asyncStateWaitingForAcceptReject:
{
// Need to call accept() or reject(), wait for asyncUpdateContacts(),
// and only then call this function.
assert(0);
return false;
}
case asyncStateAcceptedWaitingForFinalContactList:
{
mAsyncState = asyncStateProvidedFinalContacts;
asyncProcessFinalContacts(contacts);
break;
}
default:
{
assert(0);
return false;
}
}
return true;
}
void
ServerRegistration::asyncProcessFinalContacts(std::auto_ptr<resip::ContactPtrList>
contacts)
{
if (contacts.get())
{
if (!mAsyncOkMsg.get())
{
assert(0);
}
else
{
asyncProcessFinalOkMsg(*mAsyncOkMsg,*contacts);
}
}
mAsyncState = asyncStateNone;
mDum.send(mAsyncOkMsg);
mAsyncOkMsg.reset();
delete(this);
}
void
ServerRegistration::AsyncLocalStore::create(std::auto_ptr<ContactPtrList>
originalContacts)
{
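// Take ownership of the caller's list. std::auto_ptr assignment leaves
// originalContacts null, so it must not be dereferenced afterwards.
mModifiedContacts = originalContacts;
// The log starts empty; entries are appended as contacts are updated or
// removed.
mLog = std::auto_ptr<ContactRecordTransactionLog>(new
ContactRecordTransactionLog());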
}
void
ServerRegistration::AsyncLocalStore::destroy(void)
{
mModifiedContacts.reset();
mLog.reset();
}
RegistrationPersistenceManager::update_status_t
ServerRegistration::AsyncLocalStore::updateContact(const ContactInstanceRecord
&rec)
{
if (!mModifiedContacts.get() || !mLog.get())
{
assert(0);
return RegistrationPersistenceManager::CONTACT_UPDATED;
}
ContactPtrList::iterator it(mModifiedContacts->begin());
ContactPtrList::iterator itEnd(mModifiedContacts->end());
resip::SharedPtr<ContactRecordTransaction> logEntry;
// See if the contact is already present. We use URI matching rules here.
for (; it != itEnd; ++it)
{
if ((*it) && **it == rec)
{
**it = rec;
logEntry = resip::SharedPtr<ContactRecordTransaction>(new
ContactRecordTransaction(ContactRecordTransaction::update,*it));
mLog->push_back(logEntry);
return RegistrationPersistenceManager::CONTACT_UPDATED;
}
}
// This is a new contact, so we add it to the list.
resip::SharedPtr<ContactInstanceRecord> newRec(new
ContactInstanceRecord(rec));
logEntry = resip::SharedPtr<ContactRecordTransaction>(new
ContactRecordTransaction(ContactRecordTransaction::create,newRec));
mLog->push_back(logEntry);
mModifiedContacts->push_back(newRec);
return RegistrationPersistenceManager::CONTACT_CREATED;
}
void
ServerRegistration::AsyncLocalStore::removeContact(const ContactInstanceRecord
&rec)
{
if (!mModifiedContacts.get() || !mLog.get())
{
assert(0);
return;
}
ContactPtrList::iterator it(mModifiedContacts->begin());
ContactPtrList::iterator itEnd(mModifiedContacts->end());
// See if the contact is present. We use URI matching rules here.
for (; it != itEnd; ++it)
{
if ((*it) && **it == rec)
{
resip::SharedPtr<ContactRecordTransaction>
logEntry(resip::SharedPtr<ContactRecordTransaction>
(new
ContactRecordTransaction(ContactRecordTransaction::remove,*it)));
mLog->push_back(logEntry);
mModifiedContacts->erase(it);
return;
}
}
}
void
ServerRegistration::AsyncLocalStore::removeAllContacts(void)
{
if (!mModifiedContacts.get() || !mLog.get())
{
return;
}
resip::SharedPtr<ContactInstanceRecord> recNull;
resip::SharedPtr<ContactRecordTransaction> logEntry(
new ContactRecordTransaction(ContactRecordTransaction::removeAll, recNull));
mLog->push_back(logEntry);
mModifiedContacts->clear();
}
/* ====================================================================
* The Vovida Software License, Version 1.0
*
* Copyright (c) 2000 Vovida Networks, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The names "VOCAL", "Vovida Open Communication Application Library",
* and "Vovida Open Communication Application Library (VOCAL)" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact vocal@xxxxxxxxxxx
*
* 4. Products derived from this software may not be called "VOCAL", nor
* may "VOCAL" appear in their name, without prior written
* permission of Vovida Networks, Inc.
*
* THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
* NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA
* NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
* IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by Vovida
* Networks, Inc. and many individuals on behalf of Vovida Networks,
* Inc. For more information on Vovida Networks, Inc., please see
* <http://www.vovida.org/>.
*
*/
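For anyone reviewing without the resip headers to hand, the update/remove/log
behaviour of AsyncLocalStore above can be tried out in isolation with a small
stand-in. This is only a sketch under stated assumptions: Contact, LogEntry,
Op and LocalStore are hypothetical names (they are not the resip classes),
std::shared_ptr stands in for resip::SharedPtr, and contact equality is
reduced to a plain string comparison rather than real URI matching rules.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ContactInstanceRecord; equality looks at the URI only.
struct Contact
{
   std::string uri;
   int expires;
};

inline bool operator==(const Contact& a, const Contact& b)
{
   return a.uri == b.uri;
}

// Hypothetical stand-in for ContactRecordTransaction.
enum class Op { create, update, remove, removeAll };

struct LogEntry
{
   Op op;
   std::shared_ptr<Contact> rec;   // null for removeAll
};

// Hypothetical stand-in for AsyncLocalStore: every change to the working
// contact list is mirrored by one entry in the transaction log.
class LocalStore
{
public:
   explicit LocalStore(std::list<std::shared_ptr<Contact>> original)
      : mContacts(std::move(original))
   {
   }

   // Returns true when a new contact was created, false when an existing
   // contact was refreshed in place.
   bool updateContact(const Contact& rec)
   {
      for (auto& c : mContacts)
      {
         if (c && *c == rec)
         {
            *c = rec;
            mLog.push_back(LogEntry{Op::update, c});
            return false;
         }
      }
      std::shared_ptr<Contact> added = std::make_shared<Contact>(rec);
      mLog.push_back(LogEntry{Op::create, added});
      mContacts.push_back(added);
      return true;
   }

   // Removes the first matching contact, if any, and logs the removal.
   void removeContact(const Contact& rec)
   {
      for (auto it = mContacts.begin(); it != mContacts.end(); ++it)
      {
         if (*it && **it == rec)
         {
            mLog.push_back(LogEntry{Op::remove, *it});
            mContacts.erase(it);
            return;
         }
      }
   }

   // Logs a single removeAll entry and empties the working list.
   void removeAllContacts()
   {
      mLog.push_back(LogEntry{Op::removeAll, nullptr});
      mContacts.clear();
   }

   const std::vector<LogEntry>& log() const { return mLog; }
   std::size_t size() const { return mContacts.size(); }

private:
   std::list<std::shared_ptr<Contact>> mContacts;
   std::vector<LogEntry> mLog;
};
```

In this stand-in, as in the code above, a removal of an unknown contact
leaves both the list and the log untouched, and removeAllContacts() records
one log entry regardless of how many contacts were present.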
#if !defined(RESIP_SERVERREGISTRATION_HXX)
#define RESIP_SERVERREGISTRATION_HXX
#include <memory> // std::auto_ptr
#include "resip/dum/NonDialogUsage.hxx"
#include "resip/dum/RegistrationPersistenceManager.hxx"
#include "resip/stack/SipMessage.hxx"
#include "rutil/HashMap.hxx"
namespace resip
{
class ServerRegistration: public NonDialogUsage
{
public:
ServerRegistrationHandle getHandle();
/// accept a SIP registration with a specific response
void accept(SipMessage& ok);
/// accept a SIP registration with the contacts known to the DUM
void accept(int statusCode = 200);
/// reject a SIP registration
void reject(int statusCode);
virtual void end();
virtual void dispatch(const SipMessage& msg);
virtual void dispatch(const DumTimeout& timer);
virtual EncodeStream& dump(EncodeStream& strm) const;
/** Used when useAsyncProcessing() is true. Provides the current set of
 * contacts for this registration for processing. This is required during
 * initial registration processing to provide a local copy of the registered
 * contacts, which are then updated by the registration processing. At the
 * end of the registration processing, this function must provide a final
 * list of contacts after any user-defined manipulation.
 *
 * !CAUTION! This function must be called from the DUM thread.
 */
bool asyncProvideContacts(std::auto_ptr<resip::ContactPtrList> contacts);
protected:
virtual ~ServerRegistration();
private:
friend class DialogSet;
ServerRegistration(DialogUsageManager& dum, DialogSet& dialogSet, const SipMessage& request);
bool processOutbound(resip::NameAddr &naddr, ContactInstanceRecord &rec, const resip::SipMessage &msg);
SipMessage mRequest;
Uri mAor;
resip::SharedPtr<ContactList> mOriginalContacts;
bool mDidOutbound;
// disabled
ServerRegistration(const ServerRegistration&);
ServerRegistration& operator=(const ServerRegistration&);
/** States are used to keep track of asynchronous requests made to the database.

    Typical activity and state progression for a successful registration
    (onQuery() and reject() are slightly different):

        DUM               ServerRegistration               RegistrationHandler
    ___________________________________________________________________________________________
     1) dispatch()/asyncStateNone--->
     2)                   asyncGetContacts()/asyncStateWaitingForInitialContactList--->
    ===========================================================================================
     3)                   <---asyncProvideContacts()/asyncStateProcessingRegistration
     4)                   processRegistration()
     5)                   (onRefresh()|onRemove()|onAdd()|onRemoveAll()|onQuery())/
                             asyncStateWaitingForAcceptReject--->
    ===========================================================================================
     6)                   <---accept()
     7)                   asyncUpdateContacts()/asyncStateAcceptedWaitingForFinalContactList--->
    ===========================================================================================
     8)                   <---asyncProvideContacts()/asyncStateProvidedFinalContacts
     9)                   asyncProcessFinalContacts()/asyncStateNone
    10) <---send(200Ok)
    ___________________________________________________________________________________________

    * onQuery() processing is similar, except it immediately sends the 200Ok
      after handling the accept().
    * ServerRegistration::reject() does not require roll-back, and the error
      is immediately sent back to DUM.
    * It is possible to call ServerRegistration::reject() after calling
      accept() (DB failure, app failure, etc.).
*/
typedef enum AsyncState
{
asyncStateNone,
asyncStateWaitingForInitialContactList, //!< Sent an asynchronous request to get the current contact list from the DB.
asyncStateProcessingRegistration, //!< Received the initial contact list; processing the current REGISTER request and updating the contact list.
asyncStateWaitingForAcceptReject, //!< RegistrationHandler called; waiting for accept() or reject() from the user.
asyncStateAcceptedWaitingForFinalContactList, //!< asyncUpdateContacts() has been called; waiting for the final list.
asyncStateProvidedFinalContacts, //!< Received the final contact list; process the accepted REGISTER and send the response.
asyncStateQueryOnly //!< The REGISTER is a query, so just send back the current list in accept().
} AsyncState;
AsyncState mAsyncState;
/** Look at the contacts provided by the incoming REGISTER.
*/
void processRegistration(const SipMessage& msg);
void asyncProcessFinalOkMsg(SipMessage &msg, ContactPtrList &contacts);
/** Add contacts to msg, adding the expires param. Also removes expired
 * entries from contacts and calls the appropriate DB functions to remove
 * them.
 */
void processFinalOkMsg(SipMessage &msg, ContactList &contacts);
/** After the user calls accept(), ServerRegistration will apply the local
 * contact list and wait for a final contact list. Once the final list is
 * received via asyncProvideContacts(), this function finishes the REGISTER
 * processing.
 */
void asyncProcessFinalContacts(std::auto_ptr<resip::ContactPtrList> contacts);
/** Local datastore used to aggregate all changes to the current contact
 * list when using the asynchronous logic.
 */
class AsyncLocalStore
{
public:
AsyncLocalStore(std::auto_ptr<ContactPtrList> originalContacts)
{
create(originalContacts);
}
~AsyncLocalStore(void)
{
destroy();
}
/** Set up this object in preparation for updating the records. Updates
 * occur when processing a REGISTER message.
 */
void create(std::auto_ptr<ContactPtrList> originalContacts);
void destroy(void);
void removeContact(const ContactInstanceRecord &rec);
/** Remove all original contacts and record the removal in the
 * transaction log.
 */
void removeAllContacts(void);
/** Could be an addition or refresh.
*/
RegistrationPersistenceManager::update_status_t
updateContact(const ContactInstanceRecord &rec);
/** Remove the transaction log and updated contact list, transferring
 * ownership of both to the caller. This object should be considered
 * destroyed and must not be used after releasing.
 */
void releaseLog(std::auto_ptr<ContactRecordTransactionLog> &log,
std::auto_ptr<ContactPtrList> &modifiedContacts)
{
log = mLog;
modifiedContacts = mModifiedContacts;
}
private:
std::auto_ptr<ContactRecordTransactionLog> mLog;
std::auto_ptr<ContactPtrList> mModifiedContacts;
};
resip::SharedPtr<AsyncLocalStore> mAsyncLocalStore;
/** Message stored during the accept() call while waiting for the final
 * contact list from the database. It is eventually used as the 200Ok sent
 * back to DUM when all is finished.
 */
resip::SharedPtr<SipMessage> mAsyncOkMsg;
};
}
#endif
/* ====================================================================
* The Vovida Software License, Version 1.0
*
* Copyright (c) 2000 Vovida Networks, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The names "VOCAL", "Vovida Open Communication Application Library",
* and "Vovida Open Communication Application Library (VOCAL)" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact vocal@xxxxxxxxxxx
*
* 4. Products derived from this software may not be called "VOCAL", nor
* may "VOCAL" appear in their name, without prior written
* permission of Vovida Networks, Inc.
*
* THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
* NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA
* NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
* IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by Vovida
* Networks, Inc. and many individuals on behalf of Vovida Networks,
* Inc. For more information on Vovida Networks, Inc., please see
* <http://www.vovida.org/>.
*
*/
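To make the AsyncState progression documented in the header easier to reason
about, here is a minimal transition-table sketch. It is not taken from the
patch: the Event names and the advance() helper are hypothetical, the enum is
a standalone copy rather than the nested ServerRegistration type, and the
query-only path is left out. The reject transitions are my reading of the
notes under the diagram (reject() is allowed while waiting for accept/reject
and also after accept()).

```cpp
#include <cassert>

// Standalone copy of the documented states (names shortened for the sketch).
enum class AsyncState
{
   None,
   WaitingForInitialContactList,
   ProcessingRegistration,
   WaitingForAcceptReject,
   AcceptedWaitingForFinalContactList,
   ProvidedFinalContacts,
   QueryOnly
};

// Hypothetical events, one per arrow in the sequence diagram.
enum class Event
{
   Dispatch,          // steps 1-2: REGISTER arrives, asyncGetContacts() issued
   ContactsProvided,  // steps 3 and 8: asyncProvideContacts()
   HandlerCalled,     // step 5: onAdd()/onRefresh()/onRemove()/onRemoveAll()
   Accept,            // steps 6-7: accept(), asyncUpdateContacts() issued
   Reject,            // reject(): the error goes straight back, no roll-back
   ResponseSent       // steps 9-10: final contacts processed, 200Ok sent
};

// Applies the event if it is legal in the current state and returns true;
// otherwise leaves the state unchanged and returns false.
bool advance(AsyncState& state, Event event)
{
   switch (state)
   {
      case AsyncState::None:
         if (event == Event::Dispatch)
         {
            state = AsyncState::WaitingForInitialContactList;
            return true;
         }
         break;
      case AsyncState::WaitingForInitialContactList:
         if (event == Event::ContactsProvided)
         {
            state = AsyncState::ProcessingRegistration;
            return true;
         }
         break;
      case AsyncState::ProcessingRegistration:
         if (event == Event::HandlerCalled)
         {
            state = AsyncState::WaitingForAcceptReject;
            return true;
         }
         break;
      case AsyncState::WaitingForAcceptReject:
         if (event == Event::Accept)
         {
            state = AsyncState::AcceptedWaitingForFinalContactList;
            return true;
         }
         if (event == Event::Reject)
         {
            state = AsyncState::None;
            return true;
         }
         break;
      case AsyncState::AcceptedWaitingForFinalContactList:
         if (event == Event::ContactsProvided)
         {
            state = AsyncState::ProvidedFinalContacts;
            return true;
         }
         if (event == Event::Reject)   // assumption: reject() after accept()
         {
            state = AsyncState::None;
            return true;
         }
         break;
      case AsyncState::ProvidedFinalContacts:
         if (event == Event::ResponseSent)
         {
            state = AsyncState::None;
            return true;
         }
         break;
      case AsyncState::QueryOnly:
         break;   // query path intentionally not modelled in this sketch
   }
   return false;
}

// Walks the successful-registration path from the diagram and returns the
// final state.
AsyncState runSuccessfulRegistration()
{
   AsyncState s = AsyncState::None;
   const Event path[] = { Event::Dispatch, Event::ContactsProvided,
                          Event::HandlerCalled, Event::Accept,
                          Event::ContactsProvided, Event::ResponseSent };
   for (Event e : path)
   {
      bool ok = advance(s, e);
      assert(ok);
      (void)ok;
   }
   return s;
}
```

A table like this is mainly useful as a review aid: any call that arrives in
a state where advance() would return false (for example accept() before the
initial contact list has been provided) is a case the real implementation
needs to guard against.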