Skip to content
Snippets Groups Projects
Commit d8903dff authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

TMSS-403: update migration script to work against current TMSS state, add some...

TMSS-403: update migration script to work against current TMSS state, add some of the missing info in projects and subtasks
parent e9ca968d
No related branches found
No related tags found
1 merge request!257Resolve TMSS-403
#!/usr/bin/env python3
from tmss.wsgi import application # required to set up django, even though not explicitly used
from tmss.tmssapp import models
from lofar.common import dbcredentials
......@@ -8,8 +6,23 @@ import logging
import datetime
import pymysql
from optparse import OptionParser
import os
import django
import sys
# Module logger with its own stdout handler and a timestamped line format.
# NOTE(review): only the handler's level is set in the lines visible here; the
# logger's own level is not set in this span -- confirm INFO records get through.
logger = logging.getLogger(__file__)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# 'mom2id' -> 'tmss object' mapping
# (so we know what has been created already and refer to that)
mom2id_to_tmss_url_mapping = {}
# Running counters of skipped / updated / created projects and subtasks;
# they are included in the progress log lines written during migration.
stats = {"projects_skipped": 0, "projects_updated": 0, "projects_created":0,
"subtasks_skipped": 0, "subtasks_updated": 0, "subtasks_created": 0}
def _execute_query(query, data=None):
try:
......@@ -32,20 +45,38 @@ def query_project_details_from_momdb():
project.allowtriggers,
mom2object.name,
mom2object.description,
mom2object.mom2id
mom2object.mom2id,
resource.resourcetypeid,
resource.projectpath
FROM project
JOIN mom2object ON project.mom2objectid=mom2object.id
LEFT JOIN resource ON projectid=project.id WHERE resource.resourcetypeid IN (2,5,12)
AND resource.projectpath IS NOT NULL
ORDER BY mom2id;
"""
results = _execute_query(query)
# dummy data:
# results = [{"mom2id": 42,
# "name": "dummyproject",
# "description": "fake description",
# "priority": 1234,
# "allowtriggers": True}]
# MoM resourcetypes
#
# mysql> SELECT * FROM lofar_mom_test_lsmr.resourcetype;
# +----+----------------------------------+--------------------------------------------------------------------------------+----------------+
# | id | name | hosturi | type |
# +----+----------------------------------+--------------------------------------------------------------------------------+----------------+
# | 1 | Lofar Observing Time | NULL | OBSERVING_TIME |
# | 2 | Lofar Storage (SARA) | srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/ | LTA_STORAGE |
# | 3 | Lofar Test Storage (SARA) | srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/test/projects/ | LTA_STORAGE |
# | 4 | Lofar Storage (TARGET) old | gsiftp://lotar1.staging.lofar/target/gpfs2/lofar/home/lofarops/ops/projects/ | LTA_STORAGE |
# | 5 | Lofar Storage (Jülich) | srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/ | LTA_STORAGE |
# | 6 | Lofar User Disk Storage (SARA) | srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data/lofar/user/disk/projects/ | LTA_STORAGE |
# | 7 | Lofar Tape Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/projects/ | LTA_STORAGE |
# | 8 | Lofar Tape Test Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/test/projects/ | LTA_STORAGE |
# | 9 | Lofar Disk Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/disk/projects/ | LTA_STORAGE |
# | 10 | Lofar Disk Test Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/disk/test/projects/ | LTA_STORAGE |
# | 11 | Lofar Processing Time | NULL | OBSERVING_TIME |
# | 12 | Lofar Storage (Poznan) | srm://lta-head.lofar.psnc.pl:8443/lofar/ops/projects/ | LTA_STORAGE |
# | 13 | Lofar Triggers | NULL | LOFAR_TRIGGERS |
# +----+----------------------------------+--------------------------------------------------------------------------------+----------------+
return results
......@@ -61,19 +92,37 @@ def get_project_details_from_momdb():
for mom_details in mom_results:
# create new tmss details based on MoM details
# derive values for TMSS:
# cluster
# todo: how to deal with test/disk/Target locations?
# Add all those as clusters? Which do we want to migrate at all?
cluster_map = {2: models.Cluster.objects.get(name="SARA"),
5: models.Cluster.objects.get(name="Jülich"),
12: models.Cluster.objects.get(name="Poznan")}
if mom_details['resourcetypeid']:
archive_location = cluster_map[mom_details['resourcetypeid']]
else:
archive_location = None
logger.warning('Missing archive location in MoM details, using None! mom2id=%(mom2id)s' % mom_details)
# create new tmss details
details = {"name": mom_details['name'],
"description": mom_details['description'],
"tags": ["migrated_from_MoM"],
"priority": mom_details['priority'],
"description": "" if not mom_details['description'] else mom_details['description'],
"tags": ["migrated_from_MoM", "migration_incomplete"],
"priority_rank": mom_details['priority'],
"trigger_priority": 1000,
"can_trigger": mom_details['allowtriggers'],
"private_data": True # todo: check project.releasedate and compare to now or how to determine???
"private_data": True, # todo: check project.releasedate and compare to now or how to determine???
"archive_subdirectory": mom_details['projectpath'],
# optional:
# "project_category":,
# "period_category":,
"archive_location": archive_location
}
# alterations to comply with constraints:
if details['description'] is None:
details['description'] = ''
# add to return dict
results[mom_details['mom2id']] = details
......@@ -81,7 +130,7 @@ def get_project_details_from_momdb():
def query_subtask_details_for_project_from_momdb(project_mom2id):
logger.info("Querying MoM database for tasks of project %s" % project_mom2id)
logger.info("Querying MoM database for subtasks of project %s" % project_mom2id)
# todo: double-check the correct use of ids. What refers to a mom2id and what refers to a database entry pk does not seem systematic and is very confusing.
query = '''SELECT mom2object.mom2id, mom2object.name, mom2object.description, mom2object.mom2objecttype, status.code,
lofar_pipeline.template, lofar_observation.default_template, lofar_pipeline.starttime, lofar_pipeline.endtime,
......@@ -104,26 +153,29 @@ def query_subtask_details_for_project_from_momdb(project_mom2id):
return results
def _dummy_subtask_template(name):
    """
    Return the placeholder SubtaskTemplate for a MoM template name, creating it if needed.

    The placeholder is stored under the name "[<name>]_dummy" so that every
    distinct MoM template name gets its own stand-in.

    :param name: the MoM template name for which no TMSS SubtaskTemplate exists
    :return: the existing or newly created models.SubtaskTemplate
    """
    template_name = "[%s]_dummy" % name

    # Explicit lookup instead of a bare `except:` around .get(), so that
    # unrelated errors (e.g. database problems) are not silently swallowed.
    existing = models.SubtaskTemplate.objects.filter(name=template_name).first()
    if existing is not None:
        return existing

    return models.SubtaskTemplate.objects.create(
        name=template_name,
        description="Dummy template for MoM migration, when no matching template in TMSS",
        version='1',
        schema={"$id": "http://tmss.lofar.org/api/schemas/subtasktemplate/empty/1#",
                "$schema": "http://json-schema.org/draft-06/schema#"},
        realtime=False,
        queue=False,
        tags=["DUMMY"],
        type=models.SubtaskType.objects.get(value='other'))
def _dummy_scheduling_set(project):
dummy_scheduling_set_details = {"name": 'dummy',
"description": "Dummy scheduling unit set",
"description": "Dummy scheduling unit set for MoM migration",
"tags": ["DUMMY"],
"generator_doc": "{}",
"generator_doc": {},
"project": project,
"generator_template": None}
......@@ -131,67 +183,83 @@ def _dummy_scheduling_set(project):
return models.SchedulingSet.objects.create(**dummy_scheduling_set_details)
def _dummy_scheduling_unit_template(name):
    """
    Return the placeholder SchedulingUnitTemplate for a MoM template name, creating it if needed.

    The placeholder is stored under the name "[<name>]_dummy".

    :param name: the MoM template name the placeholder stands in for
    :return: the existing or newly created models.SchedulingUnitTemplate
    """
    template_name = "[%s]_dummy" % name

    # Look up the same model and the same name that get created below.
    # (Looking up SubtaskTemplate here would hand back an object of the wrong
    # type, and creating under the fixed name "dummy" could never be found again.)
    existing = models.SchedulingUnitTemplate.objects.filter(name=template_name).first()
    if existing is not None:
        return existing

    return models.SchedulingUnitTemplate.objects.create(
        name=template_name,
        description="Dummy template for MoM migration, when no matching template in TMSS",
        version='v0.314159265359',
        schema={"$id": "http://tmss.lofar.org/api/schemas/subtasktemplate/empty/1#",
                "$schema": "http://json-schema.org/draft-06/schema#"},
        tags=["DUMMY"])
def _dummy_scheduling_unit_draft(scheduling_set, template):
    """
    Create a placeholder SchedulingUnitDraft for the MoM migration.

    :param scheduling_set: the SchedulingSet the draft belongs to
    :param template: the SchedulingUnitTemplate to use as requirements_template
    :return: the newly created models.SchedulingUnitDraft
    """
    # Optional fields deliberately left unset: copy_reason, copies,
    # generator_instance_doc, scheduling_constraints_doc,
    # scheduling_constraints_template, observation_strategy_template.
    return models.SchedulingUnitDraft.objects.create(
        name='dummy',
        description="Dummy scheduling unit draft for MoM migration",
        tags=["DUMMY"],
        requirements_doc={},
        scheduling_set=scheduling_set,
        requirements_template=template)
def _dummy_scheduling_unit_blueprint(draft, template):
    """
    Create a placeholder SchedulingUnitBlueprint for the MoM migration.

    :param draft: the SchedulingUnitDraft this blueprint is derived from
    :param template: the SchedulingUnitTemplate to use as requirements_template
    :return: the newly created models.SchedulingUnitBlueprint
    """
    return models.SchedulingUnitBlueprint.objects.create(
        name='dummy',
        description="Dummy scheduling unit blueprint for MoM migration",
        tags=["DUMMY"],
        requirements_doc={},
        do_cancel=False,
        draft=draft,
        requirements_template=template)
def _dummy_task_template(name):
    """
    Return the placeholder TaskTemplate for a MoM template name, creating it if needed.

    The placeholder is stored under the name "[<name>]_dummy".

    :param name: the MoM template name the placeholder stands in for
    :return: the existing or newly created models.TaskTemplate
    """
    template_name = "[%s]_dummy" % name

    # Look up the same model and the same name that get created below.
    # (Looking up SubtaskTemplate here would hand back an object of the wrong
    # type, and creating under the fixed name "dummy" could never be found again.)
    existing = models.TaskTemplate.objects.filter(name=template_name).first()
    if existing is not None:
        return existing

    return models.TaskTemplate.objects.create(
        name=template_name,
        description='Dummy task template for MoM migration, when no matching template in TMSS',
        validation_code_js="",
        version='v0.314159265359',
        schema={"$id": "http://tmss.lofar.org/api/schemas/subtasktemplate/empty/1#",
                "$schema": "http://json-schema.org/draft-06/schema#"},
        tags=["DUMMY"],
        type=models.TaskType.objects.get(value='other'))
def _dummy_task_draft(scheduling_unit_draft, template):
    """
    Create a placeholder TaskDraft for the MoM migration.

    :param scheduling_unit_draft: the SchedulingUnitDraft the task draft belongs to
    :param template: the TaskTemplate to use as specifications_template
    :return: the newly created models.TaskDraft
    """
    # Optional fields deliberately left unset: copy_reason, copies.
    return models.TaskDraft.objects.create(
        name='dummy',
        description="Dummy task draft for MoM migration",
        tags=["DUMMY"],
        specifications_doc={},
        scheduling_unit_draft=scheduling_unit_draft,
        specifications_template=template)
......@@ -199,94 +267,89 @@ def _dummy_task_draft(scheduling_unit_draft, template):
def _dummy_task_blueprint(draft, template, scheduling_unit_blueprint):
    """
    Create a placeholder TaskBlueprint for the MoM migration.

    :param draft: the TaskDraft this blueprint is derived from
    :param template: the TaskTemplate to use as specifications_template
    :param scheduling_unit_blueprint: the SchedulingUnitBlueprint the task blueprint belongs to
    :return: the newly created models.TaskBlueprint
    """
    return models.TaskBlueprint.objects.create(
        name='dummy',
        description="Dummy task blueprint for MoM migration",
        tags=["DUMMY"],
        specifications_doc={},
        do_cancel=False,
        draft=draft,
        specifications_template=template,
        scheduling_unit_blueprint=scheduling_unit_blueprint)
def get_subtask_details_from_momdb(project_mom2id, project):
def create_subtask_trees_for_project_in_momdb(project_mom2id, project):
"""
Migrates all observations and pipelines that belong to the given project as Subtasks to TMSS.
This also creates associated Task and SchedulingUnit drafts and blueprints in order to link the subtask to its project.
:param project_mom2id: The mom id of the project to migrate
:param project: The TMSS project object to refer to
"""
global stats
global mom2id_to_tmss_url_mapping
logger.info("Getting subtask details from MoM database")
mom_results = query_subtask_details_for_project_from_momdb(project_mom2id)
results = {}
logger.info("There are %s subtasks to migrate in project name=%s" % (len(mom_results), project.name))
for mom_details in mom_results:
# different types have some info in different spots, so they end up in different columns.
# put same information in same spot to keep following code same for all tasks.
# (maybe we want to instead separate these into different queries instead or union them in SQL?)
logger.info("...now migrating subtask mom2id=%s mom2objecttype=%s" % (mom_details['mom2id'], mom_details['mom2objecttype']))
# derive values for TMSS
# type and start/end times
if 'OBSERVATION' in mom_details['mom2objecttype']:
type = models.SubtaskType.objects.get(value='observation')
task_type = models.SubtaskType.objects.get(value='observation')
template_name = mom_details['default_template']
start_time = mom_details['obs_starttime']
end_time = mom_details['obs_endtime']
stop_time = mom_details['obs_endtime']
elif 'PIPELINE' in mom_details['mom2objecttype']:
type = models.SubtaskType.objects.get(value='pipeline')
task_type = models.SubtaskType.objects.get(value='pipeline')
template_name = mom_details['template']
start_time = mom_details['starttime']
end_time = mom_details['endtime']
stop_time = mom_details['endtime']
else:
logger.warning('Unknown type %(mom2objecttype)s' % mom_details)
logger.warning('Skipping %s' % mom_details)
logger.error('Unknown type %(mom2objecttype)s - Skipping subtask mom2id=%(mom2id)s' % mom_details)
stats['subtasks_skipped'] += 1
continue
# create new tmss details (leave out stuff that might go wrong)
# timestamps
details = {"type": type,
"start_time": None, # mandatory
"stop_time": None, # mandatory
"state": None, # mandatory
"requested_state": None, # mandatory
"specification": "{}",
"task_blueprint": None, # optional, but required for project reference
"template": None, # mandatory
"tags": ["migrated_from_MoM"]}
if start_time is None:
start_time = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint
# timestamps
if stop_time is None:
stop_time = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint
if start_time is not None:
details['start_time'] = start_time
else:
details['start_time'] = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint
if end_time is not None:
details['stop_time'] = end_time
else:
details['stop_time'] = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint
# state
# state
try:
state = models.SubtaskState.objects.get(value=mom_details['code'])
details['state'] = state
details['requested_state'] = state
code = mom_details['code']
# todo: map mom to tmss states
state = models.SubtaskState.objects.get(value=code)
except Exception as e:
logger.error("No state choice matching '%s' in tmss database! %s" % (mom_details['code'], e))
logger.warning('Skipping %s' % mom_details)
logger.error("No SubtaskState choice matching '%(code)s' in tmss database! - Skipping subtask mom2id=%(mom2id)s" % mom_details)
stats['subtasks_skipped'] += 1
continue
# template
# template
if template_name is not None:
try:
details['template'] = models.SubtaskTemplate.objects.get(name=template_name)
specifications_template = models.SubtaskTemplate.objects.get(name=template_name)
except Exception as e:
logger.warning("No task template matching '%s' in tmss database! Using dummy instead! %s" % (template_name, e))
logger.warning("No SubtaskTemplate matching '%s' in tmss database! Using dummy instead! mom2id=%s" % (template_name, mom_details['mom2id']))
# todo: create a lot of templates to reflect what was used for the actual task?
# todo: raise Exception (or continue) once we have proper templates for everything.
details["template"] = _dummy_subtask_template()
specifications_template = _dummy_subtask_template(template_name)
else:
logger.warning('Missing template name in MoM details!')
logger.warning('Skipping %s' % mom_details)
logger.error('Missing template name in MoM details! - Skipping subtask mom2id=%(mom2id)s' % mom_details)
stats['subtasks_skipped'] += 1
continue
# ----------------
......@@ -297,7 +360,7 @@ def get_subtask_details_from_momdb(project_mom2id, project):
scheduling_set = _dummy_scheduling_set(project)
# scheduling unit template
scheduling_unit_template = _dummy_scheduling_unit_template()
scheduling_unit_template = _dummy_scheduling_unit_template(template_name)
# scheduling unit draft
scheduling_unit_draft = _dummy_scheduling_unit_draft(scheduling_set, scheduling_unit_template)
......@@ -306,40 +369,94 @@ def get_subtask_details_from_momdb(project_mom2id, project):
scheduling_unit_blueprint = _dummy_scheduling_unit_blueprint(scheduling_unit_draft, scheduling_unit_template)
# work request template
task_template = _dummy_task_template()
task_template = _dummy_task_template(template_name)
# work request draft
task_draft = _dummy_task_draft(scheduling_unit_draft, task_template)
# work request blueprint
details['task_blueprint'] = _dummy_task_blueprint(task_draft,
task_template,
scheduling_unit_blueprint)
task_blueprint = _dummy_task_blueprint(task_draft, task_template, scheduling_unit_blueprint)
# ----------------
# add task mom2id and its details to return dict
results[mom_details['mom2id']] = details
return results
details = {"id": mom_details['mom2id'],
"state": state,
"specifications_doc": {},
"task_blueprint": task_blueprint,
"specifications_template": specifications_template,
"tags": ["migrated_from_MoM", "migration_incomplete"], # todo: set complete once it is verified that all info is present
"priority": project.priority_rank, # todo: correct to derive from project?
# optional:
"start_time": start_time,
"stop_time": stop_time,
"schedule_method": models.ScheduleMethod.objects.get(value="manual"), # todo: correct?
# "created_or_updated_by_user" = None,
# "raw_feedback" = None,
# "do_cancel": None,
"cluster": None # set below
}
subtask_qs = models.Subtask.objects.filter(id=details["id"])
if subtask_qs.count():
subtask_qs.update(**details)
subtask = subtask_qs.first()
logger.info("...updated existing subtask tmss id=%s" % subtask.id)
stats['subtasks_updated'] += 1
else:
subtask = models.Subtask.objects.create(**details)
logger.info("...created new subtask tmss id=%s" % subtask.id)
stats['subtasks_created'] += 1
mom2id_to_tmss_url_mapping[mom_details['mom2id']] = '/subtask/%s' % subtask.id
logger.info("...handled %s TMSS objects so far | %s" % (len(mom2id_to_tmss_url_mapping), stats))
def main():
    """
    Migrates data from a MoM database to a TMSS database.
    Existing objects in TMSS of same name or id are updated, otherwise new objects are created.

    Each project is handled inside its own try/except: a failure is logged,
    counted in stats["projects_skipped"], and the loop moves on to the next project.
    """
    # No `global` statements needed: the module-level dicts are only mutated, never rebound.

    # query details of all projects in MoM database
    project_details = get_project_details_from_momdb()
    logger.info("There are %s projects to migrate", len(project_details))

    # iterate projects
    for p_id, p_details in project_details.items():
        try:
            logger.info("Now migrating project mom_name=%s mom2id=%s", p_details['name'], p_id)

            # create or update project (matched by name)
            project_qs = models.Project.objects.filter(name=p_details["name"])
            if project_qs.exists():
                project_qs.update(**p_details)
                project = project_qs.first()
                logger.info("...updated existing project tmss_name=%s", project.name)
                stats["projects_updated"] += 1
            else:
                project = models.Project.objects.create(**p_details)
                logger.info("...created new project tmss_name=%s", project.name)
                stats["projects_created"] += 1

            # Use the primary key rather than str(project), so the mapped path
            # does not depend on how the model renders itself as a string.
            mom2id_to_tmss_url_mapping[p_id] = '/project/%s' % project.pk

            # create all subtasks and related objects for the project
            create_subtask_trees_for_project_in_momdb(p_id, project)

            logger.info("...done migrating project mom_name=%s mom2id=%s tmss_name=%s.",
                        p_details['name'], p_id, project.name)

        except Exception as ex:
            # Top-level boundary per project: log with traceback and continue with the next one.
            logger.error(ex, exc_info=True)
            logger.warning("Skipping migration of project mom_name=%s mom2id=%s details=%s.",
                           p_details['name'], p_id, p_details)
            stats["projects_skipped"] += 1

    logger.info("Done. Handled %s TMSS objects in total | %s", len(mom2id_to_tmss_url_mapping), stats)
if __name__ == "__main__":
......@@ -354,7 +471,7 @@ if __name__ == "__main__":
global dbcreds
dbcreds = dbcredentials.parse_options(options)
logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword())
logger.info("Using MoM dbcreds: %s", dbcreds.stringWithHiddenPassword())
# note: this requires a config file .lofar/dbcredentials/mom.ini, with contents as such (adapt as needed):
#
......@@ -366,5 +483,14 @@ if __name__ == "__main__":
# password = mompass
# database = lofar_mom_test_tmss
# set up Django
creds_name = os.environ.get('TMSS_DBCREDENTIALS', 'tmss')
os.environ['TMSS_DBCREDENTIALS'] = creds_name
tmss_dbcreds = dbcredentials.DBCredentials().get(creds_name)
logger.info("Using TMSS dbcreds: %s", tmss_dbcreds.stringWithHiddenPassword())
os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'lofar.sas.tmss.tmss.settings')
django.setup()
from lofar.sas.tmss.tmss.tmssapp import models # has to happen after Django setup
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment