diff --git a/RTCP/FCNP/src/fcnp_ion.cc b/RTCP/FCNP/src/fcnp_ion.cc index 8e84d4bdec860f1ff53ad62a25dd589606343282..38f5f3391371089d4057518a1cea1841f168c54a 100644 --- a/RTCP/FCNP/src/fcnp_ion.cc +++ b/RTCP/FCNP/src/fcnp_ion.cc @@ -25,6 +25,7 @@ #include "fcnp_ion.h" #include "protocol.h" +#define USE_SPIN_LOCKS #undef USE_TIMER @@ -110,7 +111,13 @@ static bool initialized[256]; // FIXME static std::vector<Handshake *> scheduledWriteRequests; static uint32_t vc0; static int fd; -static _BGP_Atomic sendMutex = {0}; + +#if defined USE_SPIN_LOCKS +static _BGP_Atomic sendMutex = {0}; +#else +static pthread_mutex_t sendMutex = PTHREAD_MUTEX_INITIALIZER; +#endif + static pthread_mutex_t scheduledRequestsLock = PTHREAD_MUTEX_INITIALIZER; static Semaphore recvSema(1); // could have used mutex, but this is slower!??? livelock??? static volatile bool stop, stopped; @@ -196,16 +203,22 @@ static void handshakeComplete(Handshake *handshake) static inline void sendPacket(_BGP_TreePtpHdr *header, const void *ptr) { - //pthread_mutex_lock(&sendMutex); +#if defined USE_SPIN_LOCKS while (!_bgp_test_and_set(&sendMutex, 1)) ; +#else + pthread_mutex_lock(&sendMutex); +#endif waitForFreeSendSlot(); _bgp_vcX_pkt_inject(&header->word, const_cast<void *>(ptr), vc0); - //pthread_mutex_unlock(&sendMutex); +#if defined USE_SPIN_LOCKS _bgp_msync(); sendMutex.atom = 0; +#else + pthread_mutex_unlock(&sendMutex); +#endif } @@ -214,18 +227,24 @@ static inline void sendPacket(_BGP_TreePtpHdr *header, const void *ptr) static inline void send16Packets(_BGP_TreePtpHdr *header, void *ptr) { - //pthread_mutex_lock(&sendMutex); +#if defined USE_SPIN_LOCKS while (!_bgp_test_and_set(&sendMutex, 1)) ; +#else + pthread_mutex_lock(&sendMutex); +#endif for (char *p = (char *) ptr, *end = p + 16 * _BGP_TREE_PKT_MAX_BYTES; p < end; p += _BGP_TREE_PKT_MAX_BYTES) { waitForFreeSendSlot(); _bgp_vcX_pkt_inject(&header->word, p, vc0); } - //pthread_mutex_unlock(&sendMutex); +#if defined USE_SPIN_LOCKS _bgp_msync(); sendMutex.atom = 0; +#else + pthread_mutex_unlock(&sendMutex); +#endif } @@ -355,8 +374,8 @@ static void *pollThread(void *) unsigned nrInterrupts = 0; - if (useInterrupts) { - while (!stop) { + while (!stop) { + if (useInterrupts) { stat.status_word = _bgp_In32((uint32_t *) (vc0 + _BGP_TRx_Sx)); if (stat.RecHdrCount == 0) { @@ -365,41 +384,22 @@ static void *pollThread(void *) read(fd, &word, sizeof word); // wait for Irq packet ++ nrInterrupts; } + } - recvSema.down(); - - if (checkForIncomingPacket()) { - _bgp_vcX_pkt_receive(&header.word, &request, vc0); - assert(header.Irq); - handleRequest(&request); - } + recvSema.down(); - recvSema.up(); + if (checkForIncomingPacket()) { + _bgp_vcX_pkt_receive(&header.word, &request, vc0); + assert(header.Irq); + handleRequest(&request); } + recvSema.up(); + } + + if (useInterrupts) { std::clog << "received " << nrInterrupts << " vc0 interrupts" << std::endl; stopped = true; - } else { - while (!stop) { - if (checkForIncomingPacket()) { - _bgp_vcX_pkt_receive(&header.word, &request, vc0); - assert(header.Irq); - handleRequest(&request); - } - - pthread_mutex_lock(&scheduledRequestsLock); - - if (scheduledWriteRequests.size() > 0) { - Handshake *handshake = scheduledWriteRequests.back(); - scheduledWriteRequests.pop_back(); - pthread_mutex_unlock(&scheduledRequestsLock); - - handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); - handshake->writeFinished.up(); - } else { - pthread_mutex_unlock(&scheduledRequestsLock); - } - } } return 0; @@ -411,33 +411,25 @@ void IONtoCN_ZeroCopy(unsigned rankInPSet, const void *ptr, size_t size) assert(size % 16 == 0 && (size_t) ptr % 16 == 0); Handshake *handshake = &handshakes[rankInPSet][RequestPacket::ZERO_COPY_READ]; - pthread_mutex_lock(&handshake->ionRequest.mutex); + //pthread_mutex_lock(&handshake->ionRequest.mutex); while (size > 0) { handshake->cnRequest.slotFilled.down(); // handle all read requests sequentially (and definitely those from multiple // cores from the same node!) -#if 0 static pthread_mutex_t streamingSendMutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&streamingSendMutex); handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size); pthread_mutex_unlock(&streamingSendMutex); -#else - static Semaphore streamingSendSema(1); - - streamingSendSema.down(); - handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size); - streamingSendSema.up(); -#endif size -= handshake->cnRequest.packet.size; ptr = (const void *) ((const char *) ptr + handshake->cnRequest.packet.size); handshake->cnRequest.slotFree.up(); } - pthread_mutex_unlock(&handshake->ionRequest.mutex); + //pthread_mutex_unlock(&handshake->ionRequest.mutex); } @@ -446,7 +438,7 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size) assert(size % 16 == 0 && (size_t) ptr % 16 == 0); Handshake *handshake = &handshakes[rankInPSet][RequestPacket::ZERO_COPY_WRITE]; - pthread_mutex_lock(&handshake->ionRequest.mutex); + //pthread_mutex_lock(&handshake->ionRequest.mutex); while (size > 0) { handshake->ionRequest.size = size; @@ -454,17 +446,9 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size) handshake->cnRequest.slotFilled.down(); - if (useInterrupts) { - recvSema.down(); - handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); - recvSema.up(); - } else { - pthread_mutex_lock(&scheduledRequestsLock); - scheduledWriteRequests.push_back(handshake); - pthread_mutex_unlock(&scheduledRequestsLock); - - handshake->writeFinished.down(); - } + recvSema.down(); + handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); + recvSema.up(); size -= handshake->cnRequest.packet.size; ptr = (void *) ((char *) ptr + handshake->cnRequest.packet.size); @@ -472,7 +456,7 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size) handshake->cnRequest.slotFree.up(); } - pthread_mutex_unlock(&handshake->ionRequest.mutex); + //pthread_mutex_unlock(&handshake->ionRequest.mutex); }