diff --git a/SAS/TMSS/src/migrate_momdb_to_tmss.py b/SAS/TMSS/src/migrate_momdb_to_tmss.py index 07fa5d2ffccbcf7298d5e273e6f688acc5d3149a..f0e4267fd9b0c2c65d76a2775d0d6e37cede087f 100755 --- a/SAS/TMSS/src/migrate_momdb_to_tmss.py +++ b/SAS/TMSS/src/migrate_momdb_to_tmss.py @@ -1,6 +1,4 @@ #!/usr/bin/env python3 -from tmss.wsgi import application # required to set up django, even though not explicitly used -from tmss.tmssapp import models from lofar.common import dbcredentials @@ -8,8 +6,23 @@ import logging import datetime import pymysql from optparse import OptionParser +import os +import django +import sys logger = logging.getLogger(__file__) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + + +# 'mom2id' -> 'tmss object' mapping +# (so we know what has been created already and refer to that) +mom2id_to_tmss_url_mapping = {} +stats = {"projects_skipped": 0, "projects_updated": 0, "projects_created":0, + "subtasks_skipped": 0, "subtasks_updated": 0, "subtasks_created": 0} def _execute_query(query, data=None): try: @@ -32,20 +45,38 @@ def query_project_details_from_momdb(): project.allowtriggers, mom2object.name, mom2object.description, - mom2object.mom2id + mom2object.mom2id, + resource.resourcetypeid, + resource.projectpath FROM project JOIN mom2object ON project.mom2objectid=mom2object.id + LEFT JOIN resource ON projectid=project.id WHERE resource.resourcetypeid IN (2,5,12) + AND resource.projectpath IS NOT NULL ORDER BY mom2id; """ results = _execute_query(query) - # dummy data: - # results = [{"mom2id": 42, - # "name": "dummyproject", - # "description": "fake description", - # "priority": 1234, - # "allowtriggers": True}] + # MoM resourcetypes + # + # mysql> SELECT * FROM lofar_mom_test_lsmr.resourcetype; + # +----+----------------------------------+--------------------------------------------------------------------------------+----------------+ + # | id | name | hosturi | type | + # +----+----------------------------------+--------------------------------------------------------------------------------+----------------+ + # | 1 | Lofar Observing Time | NULL | OBSERVING_TIME | + # | 2 | Lofar Storage (SARA) | srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/ | LTA_STORAGE | + # | 3 | Lofar Test Storage (SARA) | srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/test/projects/ | LTA_STORAGE | + # | 4 | Lofar Storage (TARGET) old | gsiftp://lotar1.staging.lofar/target/gpfs2/lofar/home/lofarops/ops/projects/ | LTA_STORAGE | + # | 5 | Lofar Storage (Jülich) | srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/ | LTA_STORAGE | + # | 6 | Lofar User Disk Storage (SARA) | srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data/lofar/user/disk/projects/ | LTA_STORAGE | + # | 7 | Lofar Tape Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/projects/ | LTA_STORAGE | + # | 8 | Lofar Tape Test Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/test/projects/ | LTA_STORAGE | + # | 9 | Lofar Disk Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/disk/projects/ | LTA_STORAGE | + # | 10 | Lofar Disk Test Storage (Target) | srm://srm.target.rug.nl:8444/lofar/ops/disk/test/projects/ | LTA_STORAGE | + # | 11 | Lofar Processing Time | NULL | OBSERVING_TIME | + # | 12 | Lofar Storage (Poznan) | srm://lta-head.lofar.psnc.pl:8443/lofar/ops/projects/ | LTA_STORAGE | + # | 13 | Lofar Triggers | NULL | LOFAR_TRIGGERS | + # +----+----------------------------------+--------------------------------------------------------------------------------+----------------+ return results @@ -61,19 +92,37 @@ def get_project_details_from_momdb(): for mom_details in mom_results: - # create new tmss details based on MoM details + # derive values for TMSS: + + # cluster + # todo: how to deal with test/disk/Target locations? + # Add all those as clusters? Which do we want to migrate at all? + cluster_map = {2: models.Cluster.objects.get(name="SARA"), + 5: models.Cluster.objects.get(name="Jülich"), + 12: models.Cluster.objects.get(name="Poznan")} + + if mom_details['resourcetypeid']: + archive_location = cluster_map[mom_details['resourcetypeid']] + else: + archive_location = None + logger.warning('Missing archive location in MoM details, using None! mom2id=%(mom2id)s' % mom_details) + + + # create new tmss details details = {"name": mom_details['name'], - "description": mom_details['description'], - "tags": ["migrated_from_MoM"], - "priority": mom_details['priority'], + "description": "" if not mom_details['description'] else mom_details['description'], + "tags": ["migrated_from_MoM", "migration_incomplete"], + "priority_rank": mom_details['priority'], + "trigger_priority": 1000, "can_trigger": mom_details['allowtriggers'], - "private_data": True # todo: check project.releasedate and compare to now or how to determine??? + "private_data": True, # todo: check project.releasedate and compare to now or how to determine??? + "archive_subdirectory": mom_details['projectpath'], + # optional: + # "project_category":, + # "period_category":, + "archive_location": archive_location } - # alterations to comply with constraints: - if details['description'] is None: - details['description'] = '' - # add to return dict results[mom_details['mom2id']] = details @@ -81,7 +130,7 @@ def get_project_details_from_momdb(): def query_subtask_details_for_project_from_momdb(project_mom2id): - logger.info("Querying MoM database for tasks of project %s" % project_mom2id) + logger.info("Querying MoM database for subtasks of project %s" % project_mom2id) # todo: double-check the correct use of ids. What refers to a mom2id and what refers to a database entry pk does not seem systematic and is very comfusing. query = '''SELECT mom2object.mom2id, mom2object.name, mom2object.description, mom2object.mom2objecttype, status.code, lofar_pipeline.template, lofar_observation.default_template, lofar_pipeline.starttime, lofar_pipeline.endtime, @@ -104,26 +153,29 @@ def query_subtask_details_for_project_from_momdb(project_mom2id): return results -def _dummy_subtask_template(): +def _dummy_subtask_template(name): + template_name = "[%s]_dummy" % name try: - return models.SubtaskTemplate.objects.get(name='dummy') + return models.SubtaskTemplate.objects.get(name=template_name) except: - dummy_template_details = {"name": "dummy", - "description": 'Dummy Template', + dummy_template_details = {"name": template_name, + "description": "Dummy template for MoM migration, when no matching template in TMSS", "version": '1', - "schema": {}, + "schema": {"$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/empty/1#", + "$schema": "http://json-schema.org/draft-06/schema#"}, "realtime": False, "queue": False, - "tags": ["DUMMY"]} + "tags": ["DUMMY"], + "type": models.SubtaskType.objects.get(value='other')} return models.SubtaskTemplate.objects.create(**dummy_template_details) def _dummy_scheduling_set(project): dummy_scheduling_set_details = {"name": 'dummy', - "description": "Dummy scheduling unit set", + "description": "Dummy scheduling unit set for MoM migration", "tags": ["DUMMY"], - "generator_doc": "{}", + "generator_doc": {}, "project": project, "generator_template": None} @@ -131,67 +183,83 @@ def _dummy_scheduling_set(project): return models.SchedulingSet.objects.create(**dummy_scheduling_set_details) -def _dummy_scheduling_unit_template(): - dummy_scheduling_unit_template_details = {"name": "dummy", - "description": 'Dummy scheduling unit template', +def _dummy_scheduling_unit_template(name): + template_name = "[%s]_dummy" % name + try: + return models.SubtaskTemplate.objects.get(name=template_name) + except: + dummy_scheduling_unit_template_details = {"name": "dummy", + "description": "Dummy template for MoM migration, when no matching template in TMSS", "version": 'v0.314159265359', - "schema": {"mykey": "my value"}, + "schema": {"$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/empty/1#", + "$schema": "http://json-schema.org/draft-06/schema#"}, "tags": ["DUMMY"]} - return models.RunTemplate.objects.create(**dummy_scheduling_unit_template_details) + return models.SchedulingUnitTemplate.objects.create(**dummy_scheduling_unit_template_details) def _dummy_scheduling_unit_draft(scheduling_set, template): - dummy_scheduling_unit_draft_details = {"name": 'dummy', - "description": "Dummy scheduling_unit draft", + + dummy_scheduling_unit_draft_details = { + "name": 'dummy', + "description": "Dummy scheduling unit draft for MoM migration", "tags": ["DUMMY"], - "requirements_doc": "{}", - "copy_reason": models.CopyReason.objects.get(value='template'), - "generator_instance_doc": "para", - "copies": None, + "requirements_doc": {}, "scheduling_set": scheduling_set, - "generator_source": None, - "template": template} + "requirements_template": template + #optional: + #"copy_reason": models.CopyReason.objects.get(value='template'), + #"copies": None, + #"generator_instance_doc" : {}, + #"scheduling_constraints_doc": {}, + #"scheduling_constraints_template": None, + #"observation_strategy_template": None, + } - return models.RunDraft.objects.create(**dummy_scheduling_unit_draft_details) + return models.SchedulingUnitDraft.objects.create(**dummy_scheduling_unit_draft_details) def _dummy_scheduling_unit_blueprint(draft, template): dummy_scheduling_unit_blueprint_details = {"name": 'dummy', - "description": "Dummy scheduling_unit blueprint", - "tags": ["DUMMY"], - "requirements_doc": "{}", - "do_cancel": False, - "draft": draft, - "template": template} - + "description": "Dummy scheduling unit blueprint for MoM migration", + "tags": ["DUMMY"], + "requirements_doc": {}, + "do_cancel": False, + "draft": draft, + "requirements_template": template} - return models.RunBlueprint.objects.create(**dummy_scheduling_unit_blueprint_details) + return models.SchedulingUnitBlueprint.objects.create(**dummy_scheduling_unit_blueprint_details) -def _dummy_task_template(): - dummy_task_template_details = {"name": "dummy", - "description": 'Dummy work request template', - "validation_code_js": "", - "version": 'v0.314159265359', - "schema": {"mykey": "my value"}, - "tags": ["DUMMY"]} +def _dummy_task_template(name): + template_name = "[%s]_dummy" % name + try: + return models.SubtaskTemplate.objects.get(name=template_name) + except: + dummy_task_template_details = {"name": "dummy", + "description": 'Dummy task template for MoM migration, when no matching template in TMSS', + "validation_code_js": "", + "version": 'v0.314159265359', + "schema": {"$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/empty/1#", + "$schema": "http://json-schema.org/draft-06/schema#"}, + "tags": ["DUMMY"], + "type": models.TaskType.objects.get(value='other')} - return models.TaskTemplate.objects.create(**dummy_task_template_details) + return models.TaskTemplate.objects.create(**dummy_task_template_details) def _dummy_task_draft(scheduling_unit_draft, template): dummy_task_draft_details = {"name": 'dummy', - "description": "Dummy work request draft", + "description": "Dummy task draft for MoM migration", "tags": ["DUMMY"], - "requirements_doc": "{}", - "copy_reason": models.CopyReason.objects.get(value='template'), - "copies": None, + "specifications_doc": {}, + #"copy_reason": models.CopyReason.objects.get(value='template'), + #"copies": None, "scheduling_unit_draft": scheduling_unit_draft, - "template": template} + "specifications_template": template} return models.TaskDraft.objects.create(**dummy_task_draft_details) @@ -199,94 +267,89 @@ def _dummy_task_draft(scheduling_unit_draft, template): def _dummy_task_blueprint(draft, template, scheduling_unit_blueprint): dummy_task_blueprint_details = {"name": 'dummy', - "description": "Dummy work request blueprint", + "description": "Dummy task blueprint for MoM migration", "tags": ["DUMMY"], - "requirements_doc": "{}", + "specifications_doc": {}, "do_cancel": False, "draft": draft, - "template": template, + "specifications_template": template, "scheduling_unit_blueprint": scheduling_unit_blueprint} return models.TaskBlueprint.objects.create(**dummy_task_blueprint_details) -def get_subtask_details_from_momdb(project_mom2id, project): +def create_subtask_trees_for_project_in_momdb(project_mom2id, project): + """ + Migrates all observations and pipelines that belong to the given project as Subtasks to TMSS. + This also creates associated Task and SchedulingUnit drafts and blueprints in order to link the subtask to its project. + :param project_mom2id: The mom id of the project to migrate + :param project: The TMSS project object to refer to + """ + + global stats + global mom2id_to_tmss_url_mapping logger.info("Getting subtask details from MoM database") mom_results = query_subtask_details_for_project_from_momdb(project_mom2id) - results = {} + logger.info("There are %s subtasks to migrate in project name=%s" % (len(mom_results), project.name)) for mom_details in mom_results: - # different types have some info in different spots, so they end up in different columns. - # put same information in same spot to keep following code same for all tasks. - # (maybe we want to instead separate these into different queries instead or union them in SQL?) + logger.info("...now migrating subtask mom2id=%s mom2objecttype=%s" % (mom_details['mom2id'], mom_details['mom2objecttype'])) + + # derive values for TMSS + + # type and start/end times if 'OBSERVATION' in mom_details['mom2objecttype']: - type = models.SubtaskType.objects.get(value='observation') + task_type = models.SubtaskType.objects.get(value='observation') template_name = mom_details['default_template'] start_time = mom_details['obs_starttime'] - end_time = mom_details['obs_endtime'] + stop_time = mom_details['obs_endtime'] elif 'PIPELINE' in mom_details['mom2objecttype']: - type = models.SubtaskType.objects.get(value='pipeline') + task_type = models.SubtaskType.objects.get(value='pipeline') template_name = mom_details['template'] start_time = mom_details['starttime'] - end_time = mom_details['endtime'] + stop_time = mom_details['endtime'] else: - logger.warning('Unknown type %(mom2objecttype)s' % mom_details) - logger.warning('Skipping %s' % mom_details) + logger.error('Unknown type %(mom2objecttype)s - Skipping subtask mom2id=%(mom2id)s' % mom_details) + stats['subtasks_skipped'] += 1 continue - # create new tmss details (leave out stuff that might go wrong) + # timestamps - details = {"type": type, - "start_time": None, # mandatory - "stop_time": None, # mandatory - "state": None, # mandatory - "requested_state": None, # mandatory - "specification": "{}", - "task_blueprint": None, # optional, but required for project reference - "template": None, # mandatory - "tags": ["migrated_from_MoM"]} + if start_time is None: + start_time = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint - # timestamps + if stop_time is None: + stop_time = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint - if start_time is not None: - details['start_time'] = start_time - else: - details['start_time'] = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint - - if end_time is not None: - details['stop_time'] = end_time - else: - details['stop_time'] = datetime.datetime.utcfromtimestamp(0).isoformat() # not-null constraint - - # state + # state try: - state = models.SubtaskState.objects.get(value=mom_details['code']) - details['state'] = state - details['requested_state'] = state + code = mom_details['code'] + # todo: map mom to tmss states + state = models.SubtaskState.objects.get(value=code) except Exception as e: - logger.error("No state choice matching '%s' in tmss database! %s" % (mom_details['code'], e)) - logger.warning('Skipping %s' % mom_details) + logger.error("No SubtaskState choice matching '%(code)s' in tmss database! - Skipping subtask mom2id=%(mom2id)s" % mom_details) + stats['subtasks_skipped'] += 1 continue - # template + # template if template_name is not None: try: - details['template'] = models.SubtaskTemplate.objects.get(name=template_name) + specifications_template = models.SubtaskTemplate.objects.get(name=template_name) except Exception as e: - logger.warning("No task template matching '%s' in tmss database! Using dummy instead! %s" % (template_name, e)) + logger.warning("No SubtaskTemplate matching '%s' in tmss database! Using dummy instead! mom2id=%s" % (template_name, mom_details['mom2id'])) # todo: create a lot of templates to reflect what was used for the actual task? # todo: raise Exception (or continue) once we have proper templates for everything. - details["template"] = _dummy_subtask_template() + specifications_template = _dummy_subtask_template(template_name) else: - logger.warning('Missing template name in MoM details!') - logger.warning('Skipping %s' % mom_details) + logger.error('Missing template name in MoM details! - Skipping subtask mom2id=%(mom2id)s' % mom_details) + stats['subtasks_skipped'] += 1 continue # ---------------- @@ -297,7 +360,7 @@ def get_subtask_details_from_momdb(project_mom2id, project): scheduling_set = _dummy_scheduling_set(project) # scheduling unit template - scheduling_unit_template = _dummy_scheduling_unit_template() + scheduling_unit_template = _dummy_scheduling_unit_template(template_name) # scheduling unit draft scheduling_unit_draft = _dummy_scheduling_unit_draft(scheduling_set, scheduling_unit_template) @@ -306,40 +369,94 @@ def get_subtask_details_from_momdb(project_mom2id, project): scheduling_unit_blueprint = _dummy_scheduling_unit_blueprint(scheduling_unit_draft, scheduling_unit_template) # work request template - task_template = _dummy_task_template() + task_template = _dummy_task_template(template_name) # work request draft task_draft = _dummy_task_draft(scheduling_unit_draft, task_template) # work request blueprint - details['task_blueprint'] = _dummy_task_blueprint(task_draft, - task_template, - scheduling_unit_blueprint) + task_blueprint = _dummy_task_blueprint(task_draft, task_template, scheduling_unit_blueprint) # ---------------- - # add task mom2id and its details to return dict - results[mom_details['mom2id']] = details - return results + details = {"id": mom_details['mom2id'], + "state": state, + "specifications_doc": {}, + "task_blueprint": task_blueprint, + "specifications_template": specifications_template, + "tags": ["migrated_from_MoM", "migration_incomplete"], # todo: set complete once it is verified that all info is present + "priority": project.priority_rank, # todo: correct to derive from project? + # optional: + "start_time": start_time, + "stop_time": stop_time, + "schedule_method": models.ScheduleMethod.objects.get(value="manual"), # todo: correct? + # "created_or_updated_by_user" = None, + # "raw_feedback" = None, + # "do_cancel": None, + "cluster": None # set below + } + + subtask_qs = models.Subtask.objects.filter(id=details["id"]) + if subtask_qs.count(): + subtask_qs.update(**details) + subtask = subtask_qs.first() + logger.info("...updated existing subtask tmss id=%s" % subtask.id) + stats['subtasks_updated'] += 1 + else: + subtask = models.Subtask.objects.create(**details) + logger.info("...created new subtask tmss id=%s" % subtask.id) + stats['subtasks_created'] += 1 + + mom2id_to_tmss_url_mapping[mom_details['mom2id']] = '/subtask/%s' % subtask.id + + logger.info("...handled %s TMSS objects so far | %s" % (len(mom2id_to_tmss_url_mapping), stats)) + def main(): + """ + Migrates data from a MoM database to a TMSS database. + Existing objects in TMSS of same name or id are updated, otherwise new objects are created. + """ + + global mom2id_to_tmss_url_mapping + global stats + + # query details of all projects in MoM database project_details = get_project_details_from_momdb() + logger.info("There are %s projects to migrate" % len(project_details)) + # iterate projects for p_id, p_details in project_details.items(): - logger.info("---\nNow migrating project %s..." % p_details['name']) - project = models.Project.objects.create(**p_details) - logger.info("...created new project with tmss id %s" % project.id) + try: + logger.info("Now migrating project mom_name=%s mom2id=%s" % (p_details['name'], p_id)) + + # create or update project + project_qs = models.Project.objects.filter(name=p_details["name"]) + if project_qs.count(): + project_qs.update(**p_details) + project = project_qs.first() + logger.info("...updated existing project tmss_name=%s" % project.name) + stats["projects_updated"] += 1 + else: + project = models.Project.objects.create(**p_details) + logger.info("...created new project tmss_name=%s" % project.name) + stats["projects_created"] += 1 + + mom2id_to_tmss_url_mapping[p_id] = '/project/%s' % project - task_details = get_subtask_details_from_momdb(p_id, project) - for t_id, t_details in task_details.items(): + # create all subtasks and related objects for the project + create_subtask_trees_for_project_in_momdb(p_id, project) - logger.info("...creating new task mom2id %s" % t_id) - task = models.Subtask.objects.create(**t_details) - logger.info("...created new task with tmss id %s" % task.id) + logger.info("...done migrating project mom_name=%s mom2id=%s tmss_name=%s." % (p_details['name'], p_id, project.name)) - logger.info("...done migrating project %s." % p_details['name']) + except Exception as ex: + logger.error(ex, exc_info=True) + logger.warning("Skipping migration of project mom_name=%s mom2id=%s details=%s." % (p_details['name'], p_id, p_details)) + stats["projects_skipped"] += 1 + + logger.info("Done. Handled %s TMSS objects in total | %s" % (len(mom2id_to_tmss_url_mapping), stats)) if __name__ == "__main__": @@ -354,7 +471,7 @@ if __name__ == "__main__": global dbcreds dbcreds = dbcredentials.parse_options(options) - logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword()) + logger.info("Using MoM dbcreds: %s", dbcreds.stringWithHiddenPassword()) # note: this requires a config file .lofar/dbcredentials/mom.ini, with contents as such (adapt as needed): # @@ -366,5 +483,14 @@ if __name__ == "__main__": # password = mompass # database = lofar_mom_test_tmss + # set up Django + creds_name = os.environ.get('TMSS_DBCREDENTIALS', 'tmss') + os.environ['TMSS_DBCREDENTIALS'] = creds_name + tmss_dbcreds = dbcredentials.DBCredentials().get(creds_name) + logger.info("Using TMSS dbcreds: %s", tmss_dbcreds.stringWithHiddenPassword()) + + os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'lofar.sas.tmss.tmss.settings') + django.setup() + from lofar.sas.tmss.tmss.tmssapp import models # has to happen after Django setup main()