Skip to content
Snippets Groups Projects
Select Git revision
  • 9e39ce94f7d8e553879d50ddaf4acacfcd928a81
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

validation.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    zooniverse.py 7.94 KiB
    import requests
    import json
    import io
    import getpass
    import pandas as pd
    
    from typing import Union
    from warnings import warn
    
    from panoptes_client import Panoptes, Project, Workflow
    from panoptes_client.panoptes import PanoptesAPIException
    
    from .baseConnector import BaseConnector
    
    class Zooniverse(BaseConnector):
        """Connector for data exports of Zooniverse (panoptes) projects and workflows.

        Logs in through `panoptes_client` and uses the entries of a shopping
        basket item to check for, generate and retrieve the matching export.
        """

        # NOTE(review): presumably identifiers by which the surrounding package
        # selects this connector - confirm against BaseConnector.
        name = "zooniverse"
        archive = "zooniverse"
        # Maps an item's "catalog" entry to the panoptes_client class whose
        # `find()` is used to look the entity up.
        entity_types = {"workflow": Workflow, "project": Project}
        # Per export category: column name -> converter handed to `pd.read_csv`,
        # so that JSON encoded columns are decoded while parsing.
        category_converters = {
            "subjects": dict(metadata=json.loads, locations=json.loads),
            "classifications": dict(metadata=json.loads, annotations=json.loads),
        }
    
        def __init__(self, username: str, password: str = None):
            """Log in to Zooniverse and keep the resulting panoptes client.

            Parameters
            ----------
            username : str
                Zooniverse (panoptes) account username.
            password : str
                Zooniverse (panoptes) account password. When left as `None`
                the password is requested interactively via `getpass.getpass()`.
            """
            self.username = username
            # Prompt only when the caller did not hand over a password.
            self.password = getpass.getpass() if password is None else password

            credentials = dict(username=self.username, password=self.password)
            self.panoptes = Panoptes.connect(**credentials)
    
        def is_available(self, item: Union[dict, pd.Series], verbose: bool = False):
            """Report whether an export for the given basket item can be described.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                A single item from a retrieved shopping basket.
            verbose : bool
                If `True` print the export description returned by panoptes.

            Returns
            -------
            bool
                `True` when the export description could be obtained, `False`
                when panoptes answered with a `PanoptesAPIException`.
            """
            try:
                entity = self._get_entity(item)
                export_description = entity.describe_export(
                    self._get_item_entry(item, "category")
                )
            except PanoptesAPIException:
                return False

            if verbose:
                print(export_description)
            return True
    
        def generate(
            self, item: Union[dict, pd.Series], wait: bool = False, **read_csv_args
        ) -> Union[requests.Response, pd.DataFrame, None]:
            """Request (re)generation of the export described by a basket item.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                A single item from a retrieved shopping basket - either a raw `dict`
                or a converted `pd.Series`.
            wait : bool
                Forwarded to the entity's `get_export()`; the response is only
                handed back to the caller when this is `True`.
            **read_csv_args : type
                Accepted for signature compatibility; not used by this method.

            Returns
            -------
            Union[requests.Response, None]
                The export response when it reports `ok` and `wait` is `True`,
                otherwise `None`.

            """
            print("Generating requested export...")
            progress_note = (
                "\t\tWaiting for generation to complete..."
                if wait
                else "\t\tNot waiting for generation to complete..."
            )
            print(progress_note)

            entity = self._get_entity(item)
            response = entity.get_export(
                self._get_item_entry(item, "category"), generate=True, wait=wait
            )
            if not (response.ok and wait):
                return None
            return response
    
        def retrieve(
            self,
            item: Union[dict, pd.Series],
            generate: bool = False,
            wait: bool = False,
            convert_to_pandas: bool = True,
            chunked_retrieve: bool = False,
            chunk_size: int = int(1e5),
            **read_csv_args,
        ) -> Union[requests.Response, pd.DataFrame, None]:
            """Retrieve data specified by an item from the shopping basket from the
            Zooniverse panoptes database. Optionally (re)generate the requested
            data.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                A single item from a retrieved shopping basket - either a raw `dict`
                or a converted `pd.Series`.
            generate : bool
                If `True` generate the requested data item. If the item has already
                been generated, it will be regenerated. If the item does not exist
                and `generate` is `False` a warning is shown and `None` is returned.
            wait : bool
                Passed on to `generate()` when `generate` is `True`, otherwise
                passed to the entity's `get_export()`.
            convert_to_pandas : bool
                If `True` the retrieved data are parsed into a pd.DataFrame,
                otherwise the raw response is returned.
            chunked_retrieve : bool
                If `True` read the requested data objects in chunks to avoid
                exhausting memory.
            chunk_size : int
                The number of lines of returned data in each chunk if
                `chunked_retrieve` is `True`.
            **read_csv_args : type
                Extra arguments passed to `pd.read_csv()` when parsing the retrieved
                data. A `converters` entry is merged over the converters
                registered for the item's category.

            Returns
            -------
            Union[requests.Response, pd.DataFrame, None]
                The parsed data (or the raw response when `convert_to_pandas` is
                `False`); `None` when no data are available or the response is
                not `ok`.

            """
            category = self._get_item_entry(item, "category")

            if generate:
                # generate() prints its own progress messages. The availability
                # check is skipped because its outcome would not be used here.
                response = self.generate(item, wait)
            elif self.is_available(item):
                response = self._get_entity(item).get_export(
                    category, generate=False, wait=wait
                )
            else:
                warn(
                    "Requested resource is not available and you have specified generate==False"
                )
                return None

            if response is None:
                warn("No data immediately available. Returning NoneType")
                return None
            if not response.ok:
                return None
            if not convert_to_pandas:
                return response

            if chunked_retrieve:
                return self._chunked_content(
                    item, response, chunk_size=chunk_size, **read_csv_args
                )

            # Categories without registered converters are parsed without any.
            converters = dict(Zooniverse.category_converters.get(category, {}))
            # Caller supplied converters take precedence over the defaults.
            converters.update(read_csv_args.pop("converters", None) or {})
            return pd.read_csv(
                io.BytesIO(response.content), converters=converters, **read_csv_args
            )
    
        def _chunked_content(
            self,
            item: Union[dict, pd.Series],
            response: requests.Response,
            chunk_size: int = int(1e5),
            **read_csv_args,
        ):
            response_iterator = response.iter_lines(1)
            chunk_frames = []
            nrows = read_csv_args.pop("nrows", None)
            skiprows = read_csv_args.pop("skiprows", 0)
            _ = read_csv_args.pop("header", None)
            names = read_csv_args.pop("names", None)
            while True:
                chunk = b"\n".join(
                    [line for _, line in zip(range(chunk_size), response_iterator) if line]
                )
                if len(chunk) == 0 or (
                    nrows is not None
                    and len(chunk_frames) * chunk_size - 1 > nrows + skiprows
                ):
                    # response_iterator exhausted
                    print("All data received.")
                    break
                chunk_frames.append(
                    pd.read_csv(
                        io.BytesIO(chunk),
                        converters=Zooniverse.category_converters[
                            self._get_item_entry(item, "category")
                        ],
                        header=None if len(chunk_frames) else 0,
                        names=chunk_frames[0].columns
                        if len(chunk_frames)
                        else names
                        if names is not None
                        else None,
                        **read_csv_args,
                    )
                )
            end = (skiprows + nrows) if nrows is not None else None
            return (
                pd.concat(chunk_frames, axis=0, ignore_index=True)
                .iloc[slice(skiprows, end)]
                .reset_index(drop=True)
            )
    
        def _get_entity(self, item):
            """Look up the panoptes entity (project or workflow) a basket item refers to.

            The item's "catalog" entry selects the entity class from
            `entity_types`; the entry named "<catalog>_id" holds the identifier,
            which is converted to `int` and passed to that class' `find()`.
            """
            entity_class = Zooniverse.entity_types[self._get_item_entry(item, "catalog")]
            id_key = self._catalogue_to_id_string(item)
            entity_id = int(self._get_item_entry(item, id_key))
            return entity_class.find(entity_id)
    
        def _get_item_entry(self, item, entry):
            if type(item) == dict:
                print(item)
                item = json.loads(item["item_data"].replace("'", '"'))
            return item.get(entry, None)
    
        def _catalogue_to_id_string(self, item):
            return self._get_item_entry(item, "catalog") + "_id"