diff --git a/RTCP/FCNP/src/fcnp_ion.cc b/RTCP/FCNP/src/fcnp_ion.cc index c99e35e0a4c0581ca5302cf681c648337020465a..199a75469f93c350cf18e153413966c0a74f9aac 100644 --- a/RTCP/FCNP/src/fcnp_ion.cc +++ b/RTCP/FCNP/src/fcnp_ion.cc @@ -100,11 +100,15 @@ class Handshake { }; static Handshake handshakes[256][2] __attribute__ ((aligned(16))); // FIXME: variable size +static bool useInterrupts; static bool initialized[256]; // FIXME static std::vector<Handshake *> scheduledWriteRequests; static uint32_t vc0; +static int fd; static _BGP_Atomic sendMutex = {0}; static pthread_mutex_t scheduledRequestsLock = PTHREAD_MUTEX_INITIALIZER; +static Semaphore recvSema(1); // could have used mutex, but this is slower!??? livelock??? +static volatile bool stop; // Reading the tree status words seems to be expensive. These wrappers @@ -320,36 +324,56 @@ static void handleWriteRequest(RequestPacket *request, char *ptr, size_t bytesTo } -static volatile bool stop; - - -static void *receive_thread(void *) +static void *pollThread(void *) { RequestPacket request __attribute__((aligned(16))); + unsigned nrInterrupts = 0; while (!stop) { _BGP_TreePtpHdr header; - if (checkForIncomingPacket()) { - _bgp_vcX_pkt_receive(&header.word, &request, vc0); - assert(header.Irq); - handleRequest(&request); - } + if (useInterrupts) { + if (!checkForIncomingPacket()) { + int word; - pthread_mutex_lock(&scheduledRequestsLock); + read(fd, &word, sizeof word); // wait for Irq packet + ++ nrInterrupts; + } - if (scheduledWriteRequests.size() > 0) { - Handshake *handshake = scheduledWriteRequests.back(); - scheduledWriteRequests.pop_back(); - pthread_mutex_unlock(&scheduledRequestsLock); + recvSema.down(); - handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); - handshake->writeFinished.up(); + if (checkForIncomingPacket()) { + _bgp_vcX_pkt_receive(&header.word, &request, vc0); + assert(header.Irq); + handleRequest(&request); + } + + recvSema.up(); } else { - pthread_mutex_unlock(&scheduledRequestsLock); + if (checkForIncomingPacket()) { + _bgp_vcX_pkt_receive(&header.word, &request, vc0); + assert(header.Irq); + handleRequest(&request); + } + + pthread_mutex_lock(&scheduledRequestsLock); + + if (scheduledWriteRequests.size() > 0) { + Handshake *handshake = scheduledWriteRequests.back(); + scheduledWriteRequests.pop_back(); + pthread_mutex_unlock(&scheduledRequestsLock); + + handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); + handshake->writeFinished.up(); + } else { + pthread_mutex_unlock(&scheduledRequestsLock); + } } } + if (useInterrupts) + std::clog << "received " << nrInterrupts << " vc0 interrupts" << std::endl; + return 0; } @@ -366,11 +390,19 @@ void IONtoCN_ZeroCopy(unsigned rankInPSet, const void *ptr, size_t size) // handle all read requests sequentially (and definitely those from multiple // cores from the same node!) +#if 0 static pthread_mutex_t streamingSendMutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&streamingSendMutex); handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size); pthread_mutex_unlock(&streamingSendMutex); +#else + static Semaphore streamingSendSema(1); + + streamingSendSema.down(); + handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size); + streamingSendSema.up(); +#endif size -= handshake->cnRequest.packet.size; ptr = (const void *) ((const char *) ptr + handshake->cnRequest.packet.size); @@ -394,11 +426,18 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size) handshake->cnRequest.slotFilled.down(); - pthread_mutex_lock(&scheduledRequestsLock); - scheduledWriteRequests.push_back(handshake); - pthread_mutex_unlock(&scheduledRequestsLock); + if (useInterrupts) { + recvSema.down(); + handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); + recvSema.up(); + } else { + pthread_mutex_lock(&scheduledRequestsLock); + scheduledWriteRequests.push_back(handshake); + pthread_mutex_unlock(&scheduledRequestsLock); + + handshake->writeFinished.down(); + } - handshake->writeFinished.down(); size -= handshake->cnRequest.packet.size; ptr = (void *) ((char *) ptr + handshake->cnRequest.packet.size); @@ -445,7 +484,7 @@ static void setAffinity() static void openVC0() { - int fd = open("/dev/tree0", O_RDWR); + fd = open("/dev/tree0", O_RDWR); if (fd < 0) { perror("could not open /dev/tree0"); @@ -527,11 +566,16 @@ static pthread_t thread; void init(bool enableInterrupts) { + if (enableInterrupts) + std::clog << "Warning: FCNP with interrupts is not stable!" << std::endl; + + useInterrupts = enableInterrupts; + //setAffinity(); openVC0(); drainFIFO(); - if (pthread_create(&thread, 0, receive_thread, 0) != 0) { + if (pthread_create(&thread, 0, pollThread, 0) != 0) { perror("pthread_create"); exit(1); }