Commit 47f97380 authored by John Romein

bug 225:

Minor speed improvements.
parent 7770e8e3
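The diff below replaces the pthread mutex that serialized tree-packet injection with a Blue Gene/P test-and-set spin lock, selected at compile time through USE_SPIN_LOCKS. A minimal sketch of that locking pattern, assuming the BG/P SPI primitives used in the diff (_BGP_Atomic, _bgp_test_and_set, _bgp_msync); the helper names are illustrative and not taken from the source:

#include <pthread.h>
// The BG/P SPI headers providing _BGP_Atomic, _bgp_test_and_set and
// _bgp_msync are assumed to be included, as in the original file.

#if defined USE_SPIN_LOCKS
static _BGP_Atomic sendLock = {0};               // illustrative name

static inline void acquireSendLock()
{
  while (!_bgp_test_and_set(&sendLock, 1))
    ;                                            // spin until the 0 -> 1 transition succeeds
}

static inline void releaseSendLock()
{
  _bgp_msync();                                  // order the injected packet data before unlocking
  sendLock.atom = 0;                             // plain store releases the lock
}
#else
static pthread_mutex_t sendLock = PTHREAD_MUTEX_INITIALIZER;

static inline void acquireSendLock() { pthread_mutex_lock(&sendLock); }
static inline void releaseSendLock() { pthread_mutex_unlock(&sendLock); }
#endif

On a dedicated I/O-node core the busy wait avoids the system-call overhead of a contended pthread_mutex_lock, which is presumably the "minor speed improvement" the commit message refers to.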
@@ -25,6 +25,7 @@
#include "fcnp_ion.h"
#include "protocol.h"
#define USE_SPIN_LOCKS
#undef USE_TIMER
@@ -110,7 +111,13 @@ static bool initialized[256]; // FIXME
static std::vector<Handshake *> scheduledWriteRequests;
static uint32_t vc0;
static int fd;
static _BGP_Atomic sendMutex = {0};
#if defined USE_SPIN_LOCKS
static _BGP_Atomic sendMutex = {0};
#else
static pthread_mutex_t sendMutex = PTHREAD_MUTEX_INITIALIZER;
#endif
static pthread_mutex_t scheduledRequestsLock = PTHREAD_MUTEX_INITIALIZER;
static Semaphore recvSema(1); // could have used mutex, but this is slower!??? livelock???
static volatile bool stop, stopped;
@@ -196,16 +203,22 @@ static void handshakeComplete(Handshake *handshake)
static inline void sendPacket(_BGP_TreePtpHdr *header, const void *ptr)
{
//pthread_mutex_lock(&sendMutex);
#if defined USE_SPIN_LOCKS
while (!_bgp_test_and_set(&sendMutex, 1))
;
#else
pthread_mutex_lock(&sendMutex);
#endif
waitForFreeSendSlot();
_bgp_vcX_pkt_inject(&header->word, const_cast<void *>(ptr), vc0);
//pthread_mutex_unlock(&sendMutex);
#if defined USE_SPIN_LOCKS
_bgp_msync();
sendMutex.atom = 0;
#else
pthread_mutex_unlock(&sendMutex);
#endif
}
@@ -214,18 +227,24 @@ static inline void sendPacket(_BGP_TreePtpHdr *header, const void *ptr)
static inline void send16Packets(_BGP_TreePtpHdr *header, void *ptr)
{
//pthread_mutex_lock(&sendMutex);
#if defined USE_SPIN_LOCKS
while (!_bgp_test_and_set(&sendMutex, 1))
;
#else
pthread_mutex_lock(&sendMutex);
#endif
for (char *p = (char *) ptr, *end = p + 16 * _BGP_TREE_PKT_MAX_BYTES; p < end; p += _BGP_TREE_PKT_MAX_BYTES) {
waitForFreeSendSlot();
_bgp_vcX_pkt_inject(&header->word, p, vc0);
}
//pthread_mutex_unlock(&sendMutex);
#if defined USE_SPIN_LOCKS
_bgp_msync();
sendMutex.atom = 0;
#else
pthread_mutex_unlock(&sendMutex);
#endif
}
@@ -355,8 +374,8 @@ static void *pollThread(void *)
unsigned nrInterrupts = 0;
if (useInterrupts) {
while (!stop) {
while (!stop) {
if (useInterrupts) {
stat.status_word = _bgp_In32((uint32_t *) (vc0 + _BGP_TRx_Sx));
if (stat.RecHdrCount == 0) {
@@ -365,41 +384,22 @@ static void *pollThread(void *)
read(fd, &word, sizeof word); // wait for Irq packet
++ nrInterrupts;
}
}
recvSema.down();
if (checkForIncomingPacket()) {
_bgp_vcX_pkt_receive(&header.word, &request, vc0);
assert(header.Irq);
handleRequest(&request);
}
recvSema.down();
recvSema.up();
if (checkForIncomingPacket()) {
_bgp_vcX_pkt_receive(&header.word, &request, vc0);
assert(header.Irq);
handleRequest(&request);
}
recvSema.up();
}
if (useInterrupts) {
std::clog << "received " << nrInterrupts << " vc0 interrupts" << std::endl;
stopped = true;
} else {
while (!stop) {
if (checkForIncomingPacket()) {
_bgp_vcX_pkt_receive(&header.word, &request, vc0);
assert(header.Irq);
handleRequest(&request);
}
pthread_mutex_lock(&scheduledRequestsLock);
if (scheduledWriteRequests.size() > 0) {
Handshake *handshake = scheduledWriteRequests.back();
scheduledWriteRequests.pop_back();
pthread_mutex_unlock(&scheduledRequestsLock);
handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size);
handshake->writeFinished.up();
} else {
pthread_mutex_unlock(&scheduledRequestsLock);
}
}
}
return 0;
@@ -411,33 +411,25 @@ void IONtoCN_ZeroCopy(unsigned rankInPSet, const void *ptr, size_t size)
assert(size % 16 == 0 && (size_t) ptr % 16 == 0);
Handshake *handshake = &handshakes[rankInPSet][RequestPacket::ZERO_COPY_READ];
pthread_mutex_lock(&handshake->ionRequest.mutex);
//pthread_mutex_lock(&handshake->ionRequest.mutex);
while (size > 0) {
handshake->cnRequest.slotFilled.down();
// handle all read requests sequentially (and definitely those from multiple
// cores from the same node!)
#if 0
static pthread_mutex_t streamingSendMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&streamingSendMutex);
handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size);
pthread_mutex_unlock(&streamingSendMutex);
#else
static Semaphore streamingSendSema(1);
streamingSendSema.down();
handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size);
streamingSendSema.up();
#endif
size -= handshake->cnRequest.packet.size;
ptr = (const void *) ((const char *) ptr + handshake->cnRequest.packet.size);
handshake->cnRequest.slotFree.up();
}
pthread_mutex_unlock(&handshake->ionRequest.mutex);
//pthread_mutex_unlock(&handshake->ionRequest.mutex);
}
@@ -446,7 +438,7 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size)
assert(size % 16 == 0 && (size_t) ptr % 16 == 0);
Handshake *handshake = &handshakes[rankInPSet][RequestPacket::ZERO_COPY_WRITE];
pthread_mutex_lock(&handshake->ionRequest.mutex);
//pthread_mutex_lock(&handshake->ionRequest.mutex);
while (size > 0) {
handshake->ionRequest.size = size;
@@ -454,17 +446,9 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size)
handshake->cnRequest.slotFilled.down();
if (useInterrupts) {
recvSema.down();
handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size);
recvSema.up();
} else {
pthread_mutex_lock(&scheduledRequestsLock);
scheduledWriteRequests.push_back(handshake);
pthread_mutex_unlock(&scheduledRequestsLock);
handshake->writeFinished.down();
}
recvSema.down();
handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size);
recvSema.up();
size -= handshake->cnRequest.packet.size;
ptr = (void *) ((char *) ptr + handshake->cnRequest.packet.size);
@@ -472,7 +456,7 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size)
handshake->cnRequest.slotFree.up();
}
pthread_mutex_unlock(&handshake->ionRequest.mutex);
//pthread_mutex_unlock(&handshake->ionRequest.mutex);
}
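The same commit also drops the scheduledWriteRequests queue from the polled path: both the interrupt-driven and polled cases now serialize compute-node writes against the poll thread through recvSema. A rough sketch of that ordering, reusing recvSema, Handshake and handleWriteRequest() from the diff; serializedWrite() itself is a made-up helper, not part of the source:

static void serializedWrite(Handshake *handshake)
{
  recvSema.down();                               // keep the poll thread from receiving concurrently
  handleWriteRequest(&handshake->cnRequest.packet,
                     handshake->ionRequest.ptr,
                     handshake->ionRequest.size);
  recvSema.up();                                 // let the poll thread resume receiving
}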