diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index bada732845fdb362daa2441d22def51add1f1aa6..2cbe35fa7274dbed0d8c7be68622f7db353ab324 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -71,7 +71,7 @@ class RADatabase: except psycopg2.IntegrityError as e: logger.error("Rolling back query=\'%s\' args=\'%s\' due to error: \'%s\'" % (query, ', '.join(str(x) for x in qargs), e)) self.conn.rollback() - return -1; + return -1 if fetch == _FETCH_ONE: return self.cursor.fetchone() @@ -191,7 +191,7 @@ class RADatabase: id = self._executeQuery(query, (mom_id, otdb_id, task_status, task_type, specification_id), fetch=_FETCH_ONE)['id'] if commit: - self.conn.commit() + self.commit() return id def deleteTask(self, task_id, commit=True): @@ -200,7 +200,7 @@ class RADatabase: self._executeQuery(query, [task_id]) if commit: - self.conn.commit() + self.commit() return self.cursor.rowcount > 0 def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None, task_type=None, specification_id=None, commit=True): @@ -239,7 +239,7 @@ class RADatabase: self._executeQuery(query, values) if commit: - self.conn.commit() + self.commit() return self.cursor.rowcount > 0 @@ -287,13 +287,13 @@ class RADatabase: id = self._executeQuery(query, (task_id, predecessor_id), fetch=_FETCH_ONE)['id'] if commit: - self.conn.commit() + self.commit() return id def insertTaskPredecessors(self, task_id, predecessor_ids, commit=True): ids = [self.insertTaskPredecessor(task_id, predecessor_id, false) for predecessor_id in predecessor_ids] if commit: - self.conn.commit() + self.commit() return ids def getSpecifications(self): @@ -315,7 +315,7 @@ class RADatabase: id = self._executeQuery(query, (starttime, endtime, content), fetch=_FETCH_ONE)['id'] if commit: - self.conn.commit() + self.commit() return id def deleteSpecification(self, specification_id, commit=True): @@ -324,7 +324,7 @@ class RADatabase: self._executeQuery(query, [specification_id]) if commit: - self.conn.commit() + self.commit() return self.cursor.rowcount > 0 def updateSpecification(self, specification_id, starttime=None, endtime=None, content=None, commit=True): @@ -353,7 +353,7 @@ class RADatabase: self._executeQuery(query, values) if commit: - self.conn.commit() + self.commit() return self.cursor.rowcount > 0 @@ -527,7 +527,7 @@ class RADatabase: id = self._executeQuery(query, (resource_id, task_id, starttime, endtime, status, session_id, claim_size, nr_of_parts, username, user_id), fetch=_FETCH_ONE)['id'] if commit: - self.conn.commit() + self.commit() return id def deleteResourceClaim(self, resource_claim_id, commit=True): @@ -536,7 +536,7 @@ class RADatabase: self._executeQuery(query, [resource_claim_id]) if commit: - self.conn.commit() + self.commit() return self.cursor.rowcount > 0 def updateResourceClaim(self, resource_claim_id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, session_id=None, claim_size=None, nr_of_parts=None, username=None, user_id=None, commit=True): @@ -597,7 +597,7 @@ class RADatabase: self._executeQuery(query, values) if commit: - self.conn.commit() + self.commit() return self.cursor.rowcount > 0 @@ -660,10 +660,34 @@ class RADatabase: updated &= self.cursor.rowcount > 0 if commit: - self.conn.commit() + self.commit() return updated + def insertSpecificationAndTask(self, mom_id, otdb_id, task_status, task_type, starttime, endtime, content, commit=True): + ''' + Insert a new specification and task in one transaction. + Removes existing task with same otdb_id if present in the same transaction. + ''' + try: + task = self.getTask(otdb_id=otdb_id) + + if task: + # delete old specification, task, and resource claims using cascaded delete + self.deleteSpecification(task['specification_id'], False) + + specId = self.insertSpecification(starttime, endtime, content, False) + taskId = self.insertTask(mom_id, otdb_id, task_status, task_type, specId, False) + + if specId >= 0 and taskId >= 0: + if commit: + self.commit() + return {'inserted': True, 'specification_id': specId, 'task_id': taskId} + except: + self.conn.rollback() + + return {'inserted': False, 'specification_id': None, 'task_id': None} + if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', @@ -727,17 +751,28 @@ if __name__ == '__main__': #print db.updateTask(taskId, task_status='scheduled', otdb_id=723, task_type='PIPELINE') - #resultPrint(db.getSpecifications) + resultPrint(db.getSpecifications) + resultPrint(db.getTasks) + + #raw_input() #for s in db.getSpecifications(): #db.deleteSpecification(s['id']) #resultPrint(db.getSpecifications) - #specId = db.insertSpecification(datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=4), "") - #taskId = db.insertTask(1234, 5678, 600, 0, specId) + result = db.insertSpecificationAndTask(1234, 5678, 600, 0, datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), "", False) + print result - #resultPrint(db.getSpecifications) + resultPrint(db.getSpecifications) + resultPrint(db.getTasks) + + #raw_input() + + db.commit() + + resultPrint(db.getSpecifications) + resultPrint(db.getTasks) #claims = db.getResourceClaims() #for c in claims: