Skip to content
Snippets Groups Projects
Commit c93fbfeb authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: made transactional insertSpecificationAndTask method

parent 76788e97
No related branches found
No related tags found
No related merge requests found
......@@ -71,7 +71,7 @@ class RADatabase:
except psycopg2.IntegrityError as e:
logger.error("Rolling back query=\'%s\' args=\'%s\' due to error: \'%s\'" % (query, ', '.join(str(x) for x in qargs), e))
self.conn.rollback()
return -1;
return -1
if fetch == _FETCH_ONE:
return self.cursor.fetchone()
......@@ -191,7 +191,7 @@ class RADatabase:
id = self._executeQuery(query, (mom_id, otdb_id, task_status, task_type, specification_id), fetch=_FETCH_ONE)['id']
if commit:
self.conn.commit()
self.commit()
return id
def deleteTask(self, task_id, commit=True):
......@@ -200,7 +200,7 @@ class RADatabase:
self._executeQuery(query, [task_id])
if commit:
self.conn.commit()
self.commit()
return self.cursor.rowcount > 0
def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None, task_type=None, specification_id=None, commit=True):
......@@ -239,7 +239,7 @@ class RADatabase:
self._executeQuery(query, values)
if commit:
self.conn.commit()
self.commit()
return self.cursor.rowcount > 0
......@@ -287,13 +287,13 @@ class RADatabase:
id = self._executeQuery(query, (task_id, predecessor_id), fetch=_FETCH_ONE)['id']
if commit:
self.conn.commit()
self.commit()
return id
def insertTaskPredecessors(self, task_id, predecessor_ids, commit=True):
ids = [self.insertTaskPredecessor(task_id, predecessor_id, false) for predecessor_id in predecessor_ids]
if commit:
self.conn.commit()
self.commit()
return ids
def getSpecifications(self):
......@@ -315,7 +315,7 @@ class RADatabase:
id = self._executeQuery(query, (starttime, endtime, content), fetch=_FETCH_ONE)['id']
if commit:
self.conn.commit()
self.commit()
return id
def deleteSpecification(self, specification_id, commit=True):
......@@ -324,7 +324,7 @@ class RADatabase:
self._executeQuery(query, [specification_id])
if commit:
self.conn.commit()
self.commit()
return self.cursor.rowcount > 0
def updateSpecification(self, specification_id, starttime=None, endtime=None, content=None, commit=True):
......@@ -353,7 +353,7 @@ class RADatabase:
self._executeQuery(query, values)
if commit:
self.conn.commit()
self.commit()
return self.cursor.rowcount > 0
......@@ -527,7 +527,7 @@ class RADatabase:
id = self._executeQuery(query, (resource_id, task_id, starttime, endtime, status, session_id, claim_size, nr_of_parts, username, user_id), fetch=_FETCH_ONE)['id']
if commit:
self.conn.commit()
self.commit()
return id
def deleteResourceClaim(self, resource_claim_id, commit=True):
......@@ -536,7 +536,7 @@ class RADatabase:
self._executeQuery(query, [resource_claim_id])
if commit:
self.conn.commit()
self.commit()
return self.cursor.rowcount > 0
def updateResourceClaim(self, resource_claim_id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, session_id=None, claim_size=None, nr_of_parts=None, username=None, user_id=None, commit=True):
......@@ -597,7 +597,7 @@ class RADatabase:
self._executeQuery(query, values)
if commit:
self.conn.commit()
self.commit()
return self.cursor.rowcount > 0
......@@ -660,10 +660,34 @@ class RADatabase:
updated &= self.cursor.rowcount > 0
if commit:
self.conn.commit()
self.commit()
return updated
def insertSpecificationAndTask(self, mom_id, otdb_id, task_status, task_type, starttime, endtime, content, commit=True):
'''
Insert a new specification and task in one transaction.
Removes existing task with same otdb_id if present in the same transaction.
'''
try:
task = self.getTask(otdb_id=otdb_id)
if task:
# delete old specification, task, and resource claims using cascaded delete
self.deleteSpecification(task['specification_id'], False)
specId = self.insertSpecification(starttime, endtime, content, False)
taskId = self.insertTask(mom_id, otdb_id, task_status, task_type, specId, False)
if specId >= 0 and taskId >= 0:
if commit:
self.commit()
return {'inserted': True, 'specification_id': specId, 'task_id': taskId}
except:
self.conn.rollback()
return {'inserted': False, 'specification_id': None, 'task_id': None}
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
......@@ -727,17 +751,28 @@ if __name__ == '__main__':
#print db.updateTask(taskId, task_status='scheduled', otdb_id=723, task_type='PIPELINE')
#resultPrint(db.getSpecifications)
resultPrint(db.getSpecifications)
resultPrint(db.getTasks)
#raw_input()
#for s in db.getSpecifications():
#db.deleteSpecification(s['id'])
#resultPrint(db.getSpecifications)
#specId = db.insertSpecification(datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=4), "")
#taskId = db.insertTask(1234, 5678, 600, 0, specId)
result = db.insertSpecificationAndTask(1234, 5678, 600, 0, datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), "", False)
print result
#resultPrint(db.getSpecifications)
resultPrint(db.getSpecifications)
resultPrint(db.getTasks)
#raw_input()
db.commit()
resultPrint(db.getSpecifications)
resultPrint(db.getTasks)
#claims = db.getResourceClaims()
#for c in claims:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment