Skip to content
Snippets Groups Projects
Select Git revision
  • 43127148b2422abad26cc8b1d27d833c086f0df0
  • main default protected
  • tickets/156
  • tickets/86
  • add_to_basket
5 results

zooniverse.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    zooniverse.py 7.94 KiB
    import requests
    import json
    import io
    import getpass
    import pandas as pd
    
    from typing import Union
    from warnings import warn
    
    from panoptes_client import Panoptes, Project, Workflow
    from panoptes_client.panoptes import PanoptesAPIException
    
    from .baseConnector import BaseConnector
    
    class Zooniverse(BaseConnector):
        """Connector for generating and retrieving Zooniverse (panoptes) exports.

        Items are entries from a shopping basket, given either as a raw `dict`
        (whose `"item_data"` value is a serialised mapping) or as a converted
        `pd.Series`. Each item is expected to provide a `"catalog"` entry (a key
        of `entity_types`), a matching `"<catalog>_id"` entry and a `"category"`
        entry naming the export type.
        """

        # Connector identifiers; NOTE(review): presumably consumed by
        # BaseConnector or a connector registry - not visible in this file.
        name = "zooniverse"
        archive = "zooniverse"
        # Maps an item's "catalog" entry to the panoptes_client class used to
        # look the entity up (see `_get_entity`).
        entity_types = {"workflow": Workflow, "project": Project}
        # Per-category column converters handed to `pd.read_csv()`; the listed
        # columns are decoded with `json.loads`.
        category_converters = {
            "subjects": dict(metadata=json.loads, locations=json.loads),
            "classifications": dict(metadata=json.loads, annotations=json.loads),
        }

        def __init__(self, username: str, password: Union[str, None] = None):
            """Constructor.

            Parameters
            ----------
            username : str
                Zooniverse (panoptes) account username.
            password : str, optional
                Zooniverse (panoptes) account password. If omitted, it is
                requested interactively via `getpass.getpass()`.
            """
            self.username = username
            self.password = getpass.getpass() if password is None else password
            self.panoptes = Panoptes.connect(
                username=self.username, password=self.password
            )

        def is_available(
            self, item: Union[dict, pd.Series], verbose: bool = False
        ) -> bool:
            """Check whether the export described by `item` can be described.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                A single item from a retrieved shopping basket.
            verbose : bool
                If `True`, print the export description when it is available.

            Returns
            -------
            bool
                `True` if `describe_export` succeeds, `False` if it raises a
                `PanoptesAPIException`.
            """
            try:
                description = self._get_entity(item).describe_export(
                    self._get_item_entry(item, "category")
                )
            except PanoptesAPIException:
                return False
            if verbose:
                print(description)
            return True

        def generate(
            self, item: Union[dict, pd.Series], wait: bool = False, **read_csv_args
        ) -> Union[requests.Response, pd.DataFrame, None]:
            """Generate an export of data from the Zooniverse panoptes database
            specified by an item from the shopping basket.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                A single item from a retrieved shopping basket - either a raw `dict`
                or a converted `pd.Series`.
            wait : bool
                If `True` blocks until the requested item has been generated.
            **read_csv_args : type
                Accepted for signature compatibility with `retrieve`; not used
                by this method.

            Returns
            -------
            Union[requests.Response, pd.DataFrame, None]
                The response returned by `get_export` when `wait` is `True` and
                the response is OK; `None` otherwise.

            """
            print("Generating requested export...")
            if wait:
                print("\t\tWaiting for generation to complete...")
            else:
                print("\t\tNot waiting for generation to complete...")
            response = self._get_entity(item).get_export(
                self._get_item_entry(item, "category"), generate=True, wait=wait
            )
            return response if (response.ok and wait) else None

        def retrieve(
            self,
            item: Union[dict, pd.Series],
            generate: bool = False,
            wait: bool = False,
            convert_to_pandas: bool = True,
            chunked_retrieve: bool = False,
            chunk_size: int = int(1e5),
            **read_csv_args,
        ) -> Union[requests.Response, pd.DataFrame, None]:
            """Retrieve data specified by an item from the shopping basket from the
            Zooniverse panoptes database. Optionally (re)generate the requested
            data.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                A single item from a retrieved shopping basket - either a raw `dict`
                or a converted `pd.Series`.
            generate : bool
                If `True` generate the requested data item. If the item has already
                been generated, it will be regenerated. If the item does not exist
                and `generate` is `False` a warning is shown and `None` is returned.
            wait : bool
                If `generate` is `True`, setting `wait` to `True` blocks until the
                requested item has been generated.
            convert_to_pandas : bool
                If `True` the retrieved data are parsed into a pd.DataFrame,
                otherwise the raw response is returned.
            chunked_retrieve : bool
                If `True` read the requested data objects in chunks to avoid
                exhausting memory.
            chunk_size : int
                The number of lines of returned data in each chunk if
                `chunked_retrieve` is `True`.
            **read_csv_args : type
                Extra arguments passed to `pd.read_csv()` when parsing the retrieved
                data. A `converters` mapping is merged over the built-in
                per-category converters. Note that with `chunked_retrieve`,
                `skiprows`/`nrows` are applied to data rows by
                `_chunked_content` rather than by pandas.

            Returns
            -------
            Union[requests.Response, pd.DataFrame, None]
                The parsed data, the raw response (if `convert_to_pandas` is
                `False`), or `None` when nothing could be retrieved.

            """
            if generate:
                # No availability probe needed: the export is (re)generated
                # regardless, and `generate` reports its own progress.
                response = self.generate(item, wait)
            elif self.is_available(item):
                response = self._get_entity(item).get_export(
                    self._get_item_entry(item, "category"), generate=False, wait=wait
                )
            else:
                warn(
                    "Requested resource is not available and you have specified generate==False"
                )
                return None

            if response is None:
                warn("No data immediately available. Returning NoneType")
                return None
            if not response.ok:
                return None
            if not convert_to_pandas:
                return response
            if chunked_retrieve:
                return self._chunked_content(
                    item, response, chunk_size=chunk_size, **read_csv_args
                )
            converters = self._csv_converters(item, read_csv_args)
            return pd.read_csv(
                io.BytesIO(response.content), converters=converters, **read_csv_args
            )

        def _csv_converters(self, item, read_csv_args: dict):
            """Build the `converters` argument for `pd.read_csv()`.

            Starts from the built-in converters for the item's category (none if
            the category has no entry in `category_converters`) and overlays any
            `converters` supplied by the caller, which is removed from
            `read_csv_args` so it is not passed to pandas twice.

            Returns
            -------
            Union[dict, None]
                The merged mapping, or `None` if it would be empty.
            """
            category = self._get_item_entry(item, "category")
            merged = dict(Zooniverse.category_converters.get(category, {}))
            merged.update(read_csv_args.pop("converters", None) or {})
            return merged or None

        def _chunked_content(
            self,
            item: Union[dict, pd.Series],
            response: requests.Response,
            chunk_size: int = int(1e5),
            **read_csv_args,
        ) -> pd.DataFrame:
            """Parse a CSV response in blocks of `chunk_size` lines.

            Parameters
            ----------
            item : Union[dict, pd.Series]
                The basket item the response belongs to (selects the converters).
            response : requests.Response
                Response whose body is the CSV export; its first non-blank line
                is treated as the header.
            chunk_size : int
                Number of lines parsed per block. Must be at least 1.
            **read_csv_args : type
                Extra arguments for `pd.read_csv()`. `nrows` and `skiprows`
                (integers) are applied to the concatenated data rows, `header`
                is ignored, and `names` replaces the column names from the
                header line.

            Returns
            -------
            pd.DataFrame
                The concatenated rows, or an empty frame if the response has no
                content.

            Raises
            ------
            ValueError
                If `chunk_size` is smaller than 1.
            """
            if chunk_size < 1:
                raise ValueError("chunk_size must be a positive integer")

            nrows = read_csv_args.pop("nrows", None)
            skiprows = read_csv_args.pop("skiprows", 0)
            # The header position is dictated by the chunking scheme below.
            read_csv_args.pop("header", None)
            names = read_csv_args.pop("names", None)
            converters = self._csv_converters(item, read_csv_args)
            rows_wanted = None if nrows is None else skiprows + nrows

            # Read the body in reasonably sized pieces; a one-byte read size
            # makes line iteration extremely slow.
            line_iterator = response.iter_lines(chunk_size=8192)
            chunk_frames = []
            rows_read = 0
            # Always parse at least one block so the column names are known,
            # then stop as soon as enough rows are held.
            while (
                not chunk_frames or rows_wanted is None or rows_read < rows_wanted
            ):
                # `range` comes first in `zip` so that no line is pulled from
                # the iterator and discarded once the block is full.
                lines = [line for _, line in zip(range(chunk_size), line_iterator)]
                if not lines:
                    # response iterator exhausted
                    break
                chunk = b"\n".join(line for line in lines if line)
                if not chunk:
                    # Block held only blank lines; more data may follow.
                    continue
                if chunk_frames:
                    # Later blocks carry no header line: reuse the columns.
                    header, columns = None, chunk_frames[0].columns
                else:
                    header, columns = 0, names
                frame = pd.read_csv(
                    io.BytesIO(chunk),
                    converters=converters,
                    header=header,
                    names=columns,
                    **read_csv_args,
                )
                chunk_frames.append(frame)
                rows_read += len(frame)
            print("All data received.")

            if not chunk_frames:
                return pd.DataFrame()
            return (
                pd.concat(chunk_frames, axis=0, ignore_index=True)
                .iloc[slice(skiprows, rows_wanted)]
                .reset_index(drop=True)
            )

        def _get_entity(self, item):
            """Look up the panoptes entity (project or workflow) for `item`.

            Raises `KeyError` if the item's `"catalog"` entry is not a key of
            `entity_types`.
            """
            entity_class = Zooniverse.entity_types[self._get_item_entry(item, "catalog")]
            entity_id = self._get_item_entry(item, self._catalogue_to_id_string(item))
            return entity_class.find(int(entity_id))

        def _get_item_entry(self, item, entry):
            """Return the value stored under `entry` for `item`, or `None`.

            For a raw `dict` item the serialised `"item_data"` string is decoded
            first; any other item is queried directly through its `get` method.
            """
            if isinstance(item, dict):
                raw = item["item_data"]
                try:
                    # Prefer the data as-is so that apostrophes inside valid
                    # JSON strings are not corrupted.
                    item = json.loads(raw)
                except json.JSONDecodeError:
                    # Fall back to treating single quotes as JSON string quotes.
                    item = json.loads(raw.replace("'", '"'))
            return item.get(entry, None)

        def _catalogue_to_id_string(self, item):
            """Return the name of the item's id entry, i.e. `"<catalog>_id"`."""
            return self._get_item_entry(item, "catalog") + "_id"