Source code for OceanColor.storage

"""Store and manage NASA data

Different backends allow for different ways to handle the data from NASA.
"""

from datetime import datetime, timedelta
import logging
import numpy as np
import os
import random
import re
import tempfile
import time
import threading

import xarray as xr

from .gsfc import read_remote_file


module_logger = logging.getLogger("OceanColor.storage")


[docs]class OceanColorDB(object): """An abstraction of NASA's Ocean Color database While OceanColorDB provides access to NASA's ocean color data, it is the backend that manages the data accessed. Currently, there is only one backend based on local files and directories. But it is planned more alternatives such as AWS S3 storage. Examples -------- >>> db = OceanColorDB(username, password) >>> db.backend = FileSystem('./') >>> ds = db['T2004006.L3m_DAY_CHL_chlor_a_4km.nc'] >>> ds.attrs Notes ----- Think about the best way to define the backend. Maybe add an optional parameter path, which if available is used to define the backend as a FileSystem. """ lock = threading.Lock() time_last_download = datetime(1970, 1, 1)
[docs] def __init__(self, username: str, password: str, download: bool = True): """Initializes OceanColorDB Parameters ---------- username: str The username registered with EarthData password: str The password associated the the username download: bool, optional Download new data when required, otherwise limits to the already available datasets. Default is true, i.e. download when necessary. """ self.username = username self.password = password self.download = download
def __getitem__(self, key): """ Maybe use BytesIO?? or ds.compute()? """ module_logger.debug("Reading from backend: {}".format(key)) try: return self.backend[key] except KeyError: module_logger.debug("{} is not on the storage".format(key)) if not self.download: module_logger.info( "{} is not available and download is off.".format(key) ) raise KeyError module_logger.debug("Downloading from Ocean Color: {}".format(key)) # Probably move this reading from remote to another function content = self._remote_content(key) # ds = xr.open_dataset(BytesIO(content)) # Seems like it can't read groups using BytesIO with tempfile.NamedTemporaryFile(mode="w+b", delete=True) as tmp: tmp.write(content) tmp.flush() ds = xr.open_dataset(tmp.name) assert ds.processing_level in ( "L2", "L3 Mapped", ), "I only handle L2 or L3 Mapped" if ds.processing_level == "L2": geo = xr.open_dataset(tmp.name, group="geophysical_data") ds = ds.merge(geo) nav = xr.open_dataset(tmp.name, group="navigation_data") ds = ds.merge(nav) # Maybe include full scan line into ds sline = xr.open_dataset(tmp.name, group="scan_line_attributes") ds["time"] = ( (sline - 1970).year.astype("datetime64[Y]") + sline.day - np.timedelta64(1, "D") + sline.msec ) ds = ds.rename({"latitude": "lat", "longitude": "lon"}) self.backend[key] = ds return ds def __contains__(self, item: str): return self.backend.__contains__(item) def backend(self): """Placeholder to reinforce the use of a backend While OceanColorDB manages the access to NASA's database and does the front end with the user, the backend actually manages the data. See Also -------- OceanColor.storage.FileSystem : A storage backend based on directories and files """ module_logger.critical( "OceanColorDB requires a backend. Consider using OceanColor.storage.FileSystem" ) raise NotImplementedError("Must define a backend for OceanColorDB") def _remote_content(self, filename: str, t_min: int = 4, t_random: int = 4): """Read a remote file with a minimum time between downloads NASA monitors the downloads and excessive activity is temporarily banned, so this function guarantees a minimum time between downloads to avoid ovoerloading NASA servers. """ self.lock.acquire() module_logger.debug("remote_content aquired lock") dt = t_min + round(random.random() * t_random, 2) next_time = self.time_last_download + timedelta(seconds=(dt)) waiting_time = max((next_time - datetime.now()).total_seconds(), 0) module_logger.debug( "Waiting {} seconds before downloading.".format(waiting_time) ) time.sleep(waiting_time) try: module_logger.info("Downloading: {}".format(filename)) content = read_remote_file(filename, self.username, self.password) finally: self.time_last_download = datetime.now() module_logger.debug("remote_content releasing lock") self.lock.release() return content
# db.backend
[docs]class FileSystem(object): """Backend for OceanColorDB based on files and directories A file system backend for OceanColorDB to save the data files in directories. Distribute the files in a directory system close to the one in the OceanColor website, otherwise it could pile more than the OS can hold in the same directory. ToDo ---- Need to create some function that understands the filename syntax so that it can extract level of processing, platform and at least year so that the files can be split in multiple subdirectories, otherwise it can blow the contents limit for the operational system. Probably around several hundreds of files in the same directory. """
[docs] def __init__(self, root: str): """Initiate a FileSystem backend Paremeters ---------- root : str Base path where to build/find the local data structure. All data is contained inside this directory. """ module_logger.debug("Using FileSystem as storage at: {}".format(root)) if not os.path.isdir(root): module_logger.critical( "Invalid path for backend.FileSystem {}".format(root) ) raise FileNotFoundError self.root = os.path.abspath(root)
def __getitem__(self, key): filename = self.path(key) try: module_logger.debug("Openning file: {}".format(filename)) ds = xr.open_dataset(filename) except FileNotFoundError: raise KeyError return ds def __setitem__(self, key, ds): assert isinstance(ds, xr.Dataset) filename = self.path(key) d = os.path.dirname(filename) if not os.path.exists(d): module_logger.debug("Creating missing directory: {}".format(d)) os.makedirs(d) # ds.to_netcdf("{}.nc".format(filename)) ds.to_netcdf(filename) def __contains__(self, key: str): # Improve this: Better handle invalid granule name (key). try: filename = self.path(key) except: return False if os.path.exists(filename): return True else: return False def path(self, filename: str): """Standard path for the given filename Ocean Color filenames follow certain standards that can be used to infer the platform, sensor, year, DOY, etc. From that information it is defined the standard directory where to store/find that file. Parameters ---------- filename: str Filename, or granule as called at NASA Examples -------- >>> f = FileSystem('/data') >>> f.path('A2019109.L3m_DAY_CHL_chlor_a_4km.nc') '/data/MODIS-Aqua/L3m/2019/109/A2019109.L3m_DAY_CHL_chlor_a_4km.nc' """ return os.path.join(self.root, Filename(filename).path)
class Filename(object): """Parse implicit information on NASA's filename NASA's data filename, and granules, follows a logical standard that can be used to infer some information, such as instrument or year of the measuremnt. This class is used in support for the FileSystem backend to guide its directory structure. """ def __init__(self, filename: str): """ Parameters ---------- filename : str A filename following NASA's OceanColor standard Examples -------- >>> f = Filename("A2019109.L3m_DAY_CHL_chlor_a_4km.nc") >>> f.mission MODIS-Aqua """ self.filename = filename self.attrs = parse_filename(filename) @property def mission(self): attrs = self.attrs if attrs["platform"] == "S": return "SeaWIFS" elif attrs["platform"] == "A": return "MODIS-Aqua" elif attrs["platform"] == "T": return "MODIS-Terra" elif attrs["platform"] == "V": if attrs["instrument"] == "JPSS1": return "VIIRS-JPSS1" elif attrs["instrument"] == "SNPP": return "VIIRS-SNPP" @property def dirname(self): path = os.path.join( self.mission, self.attrs["mode"], self.attrs["year"], self.attrs["doy"] ) return path @property def path(self): return os.path.join(self.dirname, self.filename) def parse_filename(filename: str): """Parse an OceanColor data filename There is a logical standard on the filenames and this function takes advantage of that to extract information such as date, processing level, and platform. Parameters ---------- filename : str An Ocean Color dataset filename. Returns ------- dict : A dictionary with fields such as platform, year, day of year (doy), time, mode (data processing level), and instrument. It returns None when the field is not available. Notes ----- Examples of possible files: - S2002006003729.L2_[GAC_IOP|GAC_OC|MLAC_OC].nc - S2001006.L3m_DAY_[CHL_chlor_a|CHL_chl_ocx|ZLEE_Zeu_lee]_9km.nc - A2011010000000.L2[_LAC_OC|_LAC_IOP|SST|SST4].nc - T2004006.L3m[_DAY_CHL_chlor_a|_DAY_CHL_chl_ocx]_[4|9]km.nc - V2018007000000.L2_SNPP_OC.nc - V2015009.L3m_DAY_SNPP_CHL_chlor_a_4km.nc - V2018006230000.L2_JPSS1_OC.nc """ rule = r""" (?P<platform>[S|A|T|V]) (?P<year>\d{4}) (?P<doy>\d{3}) (?P<time>\d+)? \. (?P<mode>(L2)|(L3m)) (?:_DAY)? _ (?P<instrument>(?:SNPP)|(?:JPSS1))? .*? \.nc """ output = re.match(rule, filename, re.VERBOSE).groupdict() return output