Skip to content
Snippets Groups Projects
Commit 83675e4d authored by John Romein's avatar John Romein
Browse files

bug 225:

Merged interrupt code.  It is not stable, and does not yet terminate.
Use with care.
parent 39ceef4f
No related branches found
No related tags found
No related merge requests found
...@@ -100,11 +100,15 @@ class Handshake { ...@@ -100,11 +100,15 @@ class Handshake {
}; };
static Handshake handshakes[256][2] __attribute__ ((aligned(16))); // FIXME: variable size static Handshake handshakes[256][2] __attribute__ ((aligned(16))); // FIXME: variable size
static bool useInterrupts;
static bool initialized[256]; // FIXME static bool initialized[256]; // FIXME
static std::vector<Handshake *> scheduledWriteRequests; static std::vector<Handshake *> scheduledWriteRequests;
static uint32_t vc0; static uint32_t vc0;
static int fd;
static _BGP_Atomic sendMutex = {0}; static _BGP_Atomic sendMutex = {0};
static pthread_mutex_t scheduledRequestsLock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t scheduledRequestsLock = PTHREAD_MUTEX_INITIALIZER;
static Semaphore recvSema(1); // could have used mutex, but this is slower!??? livelock???
static volatile bool stop;
// Reading the tree status words seems to be expensive. These wrappers // Reading the tree status words seems to be expensive. These wrappers
...@@ -320,36 +324,56 @@ static void handleWriteRequest(RequestPacket *request, char *ptr, size_t bytesTo ...@@ -320,36 +324,56 @@ static void handleWriteRequest(RequestPacket *request, char *ptr, size_t bytesTo
} }
static volatile bool stop; static void *pollThread(void *)
static void *receive_thread(void *)
{ {
RequestPacket request __attribute__((aligned(16))); RequestPacket request __attribute__((aligned(16)));
unsigned nrInterrupts = 0;
while (!stop) { while (!stop) {
_BGP_TreePtpHdr header; _BGP_TreePtpHdr header;
if (checkForIncomingPacket()) { if (useInterrupts) {
_bgp_vcX_pkt_receive(&header.word, &request, vc0); if (!checkForIncomingPacket()) {
assert(header.Irq); int word;
handleRequest(&request);
}
pthread_mutex_lock(&scheduledRequestsLock); read(fd, &word, sizeof word); // wait for Irq packet
++ nrInterrupts;
}
if (scheduledWriteRequests.size() > 0) { recvSema.down();
Handshake *handshake = scheduledWriteRequests.back();
scheduledWriteRequests.pop_back();
pthread_mutex_unlock(&scheduledRequestsLock);
handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size); if (checkForIncomingPacket()) {
handshake->writeFinished.up(); _bgp_vcX_pkt_receive(&header.word, &request, vc0);
assert(header.Irq);
handleRequest(&request);
}
recvSema.up();
} else { } else {
pthread_mutex_unlock(&scheduledRequestsLock); if (checkForIncomingPacket()) {
_bgp_vcX_pkt_receive(&header.word, &request, vc0);
assert(header.Irq);
handleRequest(&request);
}
pthread_mutex_lock(&scheduledRequestsLock);
if (scheduledWriteRequests.size() > 0) {
Handshake *handshake = scheduledWriteRequests.back();
scheduledWriteRequests.pop_back();
pthread_mutex_unlock(&scheduledRequestsLock);
handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size);
handshake->writeFinished.up();
} else {
pthread_mutex_unlock(&scheduledRequestsLock);
}
} }
} }
if (useInterrupts)
std::clog << "received " << nrInterrupts << " vc0 interrupts" << std::endl;
return 0; return 0;
} }
...@@ -366,11 +390,19 @@ void IONtoCN_ZeroCopy(unsigned rankInPSet, const void *ptr, size_t size) ...@@ -366,11 +390,19 @@ void IONtoCN_ZeroCopy(unsigned rankInPSet, const void *ptr, size_t size)
// handle all read requests sequentially (and definitely those from multiple // handle all read requests sequentially (and definitely those from multiple
// cores from the same node!) // cores from the same node!)
#if 0
static pthread_mutex_t streamingSendMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t streamingSendMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&streamingSendMutex); pthread_mutex_lock(&streamingSendMutex);
handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size); handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size);
pthread_mutex_unlock(&streamingSendMutex); pthread_mutex_unlock(&streamingSendMutex);
#else
static Semaphore streamingSendSema(1);
streamingSendSema.down();
handleReadRequest(&handshake->cnRequest.packet, static_cast<const char *>(ptr), size);
streamingSendSema.up();
#endif
size -= handshake->cnRequest.packet.size; size -= handshake->cnRequest.packet.size;
ptr = (const void *) ((const char *) ptr + handshake->cnRequest.packet.size); ptr = (const void *) ((const char *) ptr + handshake->cnRequest.packet.size);
...@@ -394,11 +426,18 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size) ...@@ -394,11 +426,18 @@ void CNtoION_ZeroCopy(unsigned rankInPSet, void *ptr, size_t size)
handshake->cnRequest.slotFilled.down(); handshake->cnRequest.slotFilled.down();
pthread_mutex_lock(&scheduledRequestsLock); if (useInterrupts) {
scheduledWriteRequests.push_back(handshake); recvSema.down();
pthread_mutex_unlock(&scheduledRequestsLock); handleWriteRequest(&handshake->cnRequest.packet, handshake->ionRequest.ptr, handshake->ionRequest.size);
recvSema.up();
} else {
pthread_mutex_lock(&scheduledRequestsLock);
scheduledWriteRequests.push_back(handshake);
pthread_mutex_unlock(&scheduledRequestsLock);
handshake->writeFinished.down();
}
handshake->writeFinished.down();
size -= handshake->cnRequest.packet.size; size -= handshake->cnRequest.packet.size;
ptr = (void *) ((char *) ptr + handshake->cnRequest.packet.size); ptr = (void *) ((char *) ptr + handshake->cnRequest.packet.size);
...@@ -445,7 +484,7 @@ static void setAffinity() ...@@ -445,7 +484,7 @@ static void setAffinity()
static void openVC0() static void openVC0()
{ {
int fd = open("/dev/tree0", O_RDWR); fd = open("/dev/tree0", O_RDWR);
if (fd < 0) { if (fd < 0) {
perror("could not open /dev/tree0"); perror("could not open /dev/tree0");
...@@ -527,11 +566,16 @@ static pthread_t thread; ...@@ -527,11 +566,16 @@ static pthread_t thread;
void init(bool enableInterrupts) void init(bool enableInterrupts)
{ {
if (enableInterrupts)
std::clog << "Warning: FCNP with interrupts is not stable!" << std::endl;
useInterrupts = enableInterrupts;
//setAffinity(); //setAffinity();
openVC0(); openVC0();
drainFIFO(); drainFIFO();
if (pthread_create(&thread, 0, receive_thread, 0) != 0) { if (pthread_create(&thread, 0, pollThread, 0) != 0) {
perror("pthread_create"); perror("pthread_create");
exit(1); exit(1);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment