Source code for OceanColor.inrange

"""Search for data in range (time and space)

Resources to extract the satellite pixels that are in range (time and space)
of given waypoints.
"""

import logging
import threading, queue
import os
import time
from typing import Any, Dict, Optional, Sequence

import numpy as np
import pandas as pd
from pyproj import Geod

from .cmr import bloom_filter
from .storage import OceanColorDB, FileSystem

module_logger = logging.getLogger("OceanColor.inrange")

try:
    from loky import ProcessPoolExecutor
    LOKY_AVAILABLE = True
    module_logger.debug("Will use package loky to search in parallel.")
except:
    LOKY_AVAILABLE = False
    module_logger.info("Missing package loky. Falling back to threading.")


[docs]class InRange(object): """Search and fetch Ocean Color pixels within range of given waypoints The satellite files are scanned in parallel in the background and checked against the given waypoints, so that it searches for the next matchup in advance before it is actually requested. Examples -------- >>> track = DataFrame([ {"time": datetime64("2016-09-01 10:00:00"), "lat": 35.6, "lon": -126.81}, {"time": datetime64("2016-09-01 22:00:00"), "lat": 34, "lon": -126} ]) >>> engine = InRange(os.getenv("NASA_USERNAME"), os.getenv("NASA_PASSWORD"), './', npes=3) >>> engine.search(track, sensor="aqua", dtype="L3m", dt_tol=timedelta64(12, 'h'), dL_tol=12e3) >>> for m in engine: >>> print(m) """
[docs] def __init__(self, username, password, path="./", npes=None): """ Parameters ---------- username : str NASA's EarthData username password : str NASA's EarthData password path : str, optional Path to save locally NASA's data files npes : int, optional Number of maximum parallel jobs """ module_logger.info("Initializing inrange.InRange") if npes is None: npes = 3 self.npes = npes module_logger.debug("Initializing searching engine with npes={}".format(npes)) self.queue = queue.Queue(int(3 * npes)) module_logger.debug("Initializing searching engine with a queue size {}".format(npes)) self.db = OceanColorDB(username, password) self.db.backend = FileSystem(path)
def __iter__(self): return self def __next__(self): return self.next() def next(self): output = self.queue.get() if isinstance(output, str) and (output == "END"): raise StopIteration return output def search(self, track, sensor, dtype, dt_tol, dL_tol): """Initiate a new search in the background Parameters ---------- track: sensor: dtype: dt_tol: dL_tol: """ module_logger.debug("Searching for matchups.") if LOKY_AVAILABLE: scanner = self.scanner module_logger.debug("Scanning in parallel with loky.") else: scanner = self.scanner_threading module_logger.debug("Scanning with threading.") parent = threading.current_thread() self.worker = threading.Thread( target=scanner, args=(self.queue, parent, self.npes, track, sensor, dtype, dt_tol, dL_tol), ) module_logger.debug("Starting scanner worker.") self.worker.start() def scanner_threading(self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol): timeout = 900 module_logger.debug("Scanner, pid: {}".format(os.getpid())) filenames = bloom_filter(track, sensor, dtype, dt_tol, dL_tol) module_logger.debug("Finished bloom filter") results = [] for f in filenames: module_logger.info("Scanning: {}".format(f)) if (len(results) >= npes) and parent.is_alive(): idx = [r.is_alive() for r in results] if np.all(idx): r = results.pop(0) module_logger.debug("Waiting for {}".format(r.name)) else: r = results.pop(idx.index(False)) r.join() module_logger.debug("Finished {}".format(r.name)) module_logger.debug("Getting {}".format(f)) ds = self.db[f].compute() module_logger.debug("Launching search on {}".format(f)) if not parent.is_alive(): return results.append( threading.Thread( target=inrange, args=(track, ds, dL_tol, dt_tol, queue) ) ) results[-1].start() for r in results: if not parent.is_alive(): return r.join() module_logger.debug("Finished {}".format(r.name)) module_logger.debug("Finished scanning all potential matchups.") queue.put("END") def scanner(self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol): timeout = 900 module_logger.debug("Scanner, pid: {}".format(os.getpid())) filenames = bloom_filter(track, sensor, dtype, dt_tol, dL_tol) module_logger.debug("Finished bloom filter") with ProcessPoolExecutor(max_workers=npes, timeout=timeout) as executor: results = [] for f in filenames: module_logger.info("Scanning: {}".format(f)) if (len(results) >= npes) and parent.is_alive(): idx = [r.done() for r in results] while not np.any(idx): time.sleep(1) idx = [r.done() for r in results] tmp = results.pop(idx.index(True)).result() module_logger.debug("Finished reading another file") if not tmp.empty: module_logger.warning("Found {} matchs".format(len(tmp))) queue.put(tmp) module_logger.debug("Getting {}".format(f)) ds = self.db[f].compute() if not parent.is_alive(): return module_logger.debug("Submitting a new inrange process") results.append(executor.submit(inrange, track, ds, dL_tol, dt_tol)) for tmp in (r.result(timeout) for r in results): if not parent.is_alive(): return module_logger.debug("Finished reading another file") if not tmp.empty: module_logger.warning("Found {} matchs".format(len(tmp))) queue.put(tmp) module_logger.debug("Finished scanning all potential matchups.") queue.put("END")
[docs]def matchup(track, ds, dL_tol: float, dt_tol, queue=None): """Search a granule for pixels within range (time/space) of a track For a given sequence of waypoints (`track`), it returns all satellite pixels (data) from `ds` which are at most `dL_tol` (distance tolerance) and `dt_tol` (time tolerance) from one of the waypoints. The output includes lat and lon of the found pixel, plus dL (distance), and dt (difference in time) between the matchup pixel - waypoint. This is the generic function that will choose which procedure to apply according to the processing level of the image, since they are structured in different projections. Parameters ---------- track: pandas.DataFrame A collection of waypoints containing {time, lat, lon}. The index on this DataFrame will be used as the reference on for the output. ds: xarray.Dataset An L2 granule, which is usually loaded from a netCDF. dL_tol: float Maximum distance in meters from a waypoint to be considered a matchup. dt_tol: np.timedelta64 Maximum accepted time difference to be considered a matchup. queue: Queue.queue, optional If given, the results are sent to this queue. Returns ------- matchup: pd.DataFrame All pixels within space and time range from the given track of waypoints. One pixel per row. If queue is given as an input, the returns are instead transmitted to that queue and the function returns None instead. See Also -------- matchup_L2 : Search an L2 dataset for pixels within a range matchup_L3m : Search an L3m dataset for pixels within a range """ assert ds.processing_level in ("L2", "L3 Mapped") if ds.processing_level == "L2": module_logger.debug("processing_level L2, using matchup_L2") output = matchup_L2(track, ds, dL_tol, dt_tol) elif ds.processing_level == "L3 Mapped": module_logger.debug("processing_level L3 mapped, using matchup_L3m") output = matchup_L3m(track, ds, dL_tol, dt_tol) else: return if queue is None: return output elif output.size > 0: module_logger.info( "Found {} matchups in {}".format(len(output.index), ds.product_name) ) queue.put(output) else: module_logger.info("No matchups from {}".format(ds.product_name))
[docs]def matchup_L2(track, ds, dL_tol: float, dt_tol): """Search an L2 Dataset for pixels within range of a track For a given data frame of waypoints, return all satellite data, including lat, lon, dL (distance), and dt (difference in time) in respect of all waypoints. Parameters ---------- track: pd.DataFrame A collection of waypoints containing {time, lat, lon}. The index on this DataFrame will be used as the reference on for the output. ds: xr.Dataset An L2 granule, which is usually loaded from a netCDF. dL_tol: float Maximum distance in meters from a waypoint to be considered a matchup. dt_tol: np.timedelta64 Maximum accepted time difference to be considered a matchup. Returns ------- matchup: pd.DataFrame All pixels within space and time range from the given track of waypoints. One pixel per row. See Also -------- matchup : Search a dataset for pixels within a range matchup_L3m : Search an L3m dataset for pixels within a range """ assert ds.processing_level == "L2", "matchup_L2() requires L2 satellite data" output = pd.DataFrame() # Removing the Zulu part of the date definition. Better double # check if it is UTC and then remove the tz. time_coverage_start = pd.to_datetime(ds.time_coverage_start.replace("Z", "",)) time_coverage_end = pd.to_datetime(ds.time_coverage_end.replace("Z", "",)) idx = (track.time >= (time_coverage_start - dt_tol)) & ( track.time <= (time_coverage_end + dt_tol) ) # Profiles possibly in the time window covered by the file subset = track[idx].copy() assert ds.pixel_control_points.shape == ds.pixels_per_line.shape ds = ds.swap_dims({"pixel_control_points": "pixels_per_line"}) # ==== Restrict to lines and columns within the latitude range ==== # Using 100 to get a larger deg_tol deg_tol = dL_tol / 100e3 idx = (ds.lat >= (subset.lat.min() - deg_tol)) & ( ds.lat <= (subset.lat.max() + deg_tol) ) if not idx.any(): return output # Meridians converge poleward, thus requiring a different criterion lon_tol = deg_tol / np.cos(np.pi / 180 * subset.lat.abs().max()) lon_start = subset.lon.min() - lon_tol lon_end = subset.lon.max() + lon_tol # Otherwise do the precise distance estimate to handle the day line. if (lon_start > -180) and (lon_end < 180): idx &= (ds.lon >= (subset.lon.min() - lon_tol)) & ( ds.lon <= (subset.lon.max() + lon_tol)) if not idx.any(): return output ds = ds.isel(pixels_per_line=idx.any(dim="number_of_lines")) ds = ds.isel(number_of_lines=idx.any(dim="pixels_per_line")) varnames = [ v for v in ds.variables.keys() if ds.variables[v].dims == ("number_of_lines", "pixels_per_line") ] varnames = [v for v in varnames if v not in ("lat", "lon")] # ds = ds[varnames] g = Geod(ellps="WGS84") # Use Clarke 1966 ellipsoid. assert ds.time.dims == ("number_of_lines",), "Assume time by n of lines" for i, p in subset.iterrows(): for _, grp in ds.groupby("number_of_lines"): # Only sat. Chl within a certain distance. dL = g.inv( grp.lon.data, grp.lat.data, np.ones(grp.lon.shape) * p.lon, np.ones(grp.lat.shape) * p.lat, )[2] idx = dL <= dL_tol if idx.any(): # Save the product_name?? tmp = { "waypoint_id": i, "lon": grp.lon.data[idx], "lat": grp.lat.data[idx], "dL": dL[idx].astype("i"), "dt": pd.to_datetime(grp.time.data) - p.time, } for v in varnames: tmp[v] = grp[v].data[idx] tmp = pd.DataFrame(tmp) # Remove rows where all varnames are NaN tmp = tmp[(~tmp[varnames].isna()).any(axis="columns")] output = output.append(tmp, ignore_index=True) if "product_name" in ds.attrs: output["product_name"] = ds.product_name return output
[docs]def matchup_L3m(track, ds, dL_tol: float, dt_tol): """Search an L3 Dataset for pixels within range of a track For a given data frame of waypoints, return all satellite data, including lat, lon, dL (distance), and dt (difference in time) in respect of all waypoints. Parameters ---------- track: pd.DataFrame A collection of waypoints containing {time, lat, lon}. The index on this DataFrame will be used as the reference on for the output. ds: xr.Dataset L3m composite. dL_tol: float Distance in meters around a waypoint to be considered a matchup. dt_tol: np.timedelta64 Time difference to be considered a matchup. Returns ------- matchup: pd.DataFrame All pixels within space and time range from the given track of waypoints. One pixel per row. See Also -------- matchup : Search a dataset for pixels within a range matchup_L2 : Search an L2 dataset for pixels within a range Notes ----- Since L3M product are daily means, dt=0 means a profile on the same day of the satellite measurement, while + 3hrs means the hour 3 of the following day. Further, dt_tol=0 limits to satellite mean for the same day of the profile, while dt_tol=12 limits to the day of the profile plus the previous day if spray measured in the morning or following day if measurement was done in the afternoon/evening. IDEA: Maybe crop nc by min/max lat and lon before estimate the distances. Return all satellite data in range of some profile For a given data frame of profiles, return all satellite data, including lat, lon, dL (distance), and dt (difference in time) in respect of all profiles. Since L3M product is daily means, dt=0 means a profile on the same day of the satellite measurement, while + 3hrs means the hour 3 of the following day. Further, dt_tol=0 limits to satellite mean for the same day of the profile, while dt_tol=12 limits to the day of the profile plus the previous day if spray measured in the morning or following day if measurement was done in the afternoon/evening. IDEA: Maybe crop nc by min/max lat and lon before estimate the distances. """ # if dt_tol is None: # dt_tol = pd.to_timedelta(0) # elif isinstance(dt_tol, datetime): # dt_tol = pd.to_timedelta(dt_tol) assert ( ds.processing_level == "L3 Mapped" ), "matchup_L3m() requires L3 Mapped satellite data" # Removing the Zulu part of the date definition. Better double # check if it is UTC and then remove the tz. time_coverage_start = pd.to_datetime(ds.time_coverage_start.replace("Z", "",)) time_coverage_end = pd.to_datetime(ds.time_coverage_end.replace("Z", "",)) time_reference = ( time_coverage_start + (time_coverage_end - time_coverage_start) / 2.0 ) idx = (track.time >= (time_coverage_start - dt_tol)) & ( track.time <= (time_coverage_end + dt_tol) ) # Profiles possibly in the time window covered by the file subset = track[idx].copy() # Using 110 to get a slightly larger deg_tol deg_tol = dL_tol / 110e3 ds = ds.isel(lat=(ds.lat >= subset.lat.min() - deg_tol)) ds = ds.isel(lat=(ds.lat <= subset.lat.max() + deg_tol)) Lon, Lat = np.meshgrid(ds.lon[:], ds.lat[:]) varnames = [ v for v in ds.variables.keys() if ds.variables[v].dims == ("lat", "lon") ] ds = ds[varnames] output = pd.DataFrame() g = Geod(ellps="WGS84") # Use Clarke 1966 ellipsoid. # Maybe filter for i, p in subset.iterrows(): # Only sat. Chl within a certain distance. dL = g.inv(Lon, Lat, np.ones(Lon.shape) * p.lon, np.ones(Lat.shape) * p.lat)[2] idx = dL <= dL_tol tmp = { "waypoint_id": i, "lon": Lon[idx], "lat": Lat[idx], "dL": dL[idx].astype("i"), } # What to do if idx is none? Need to do something here and stop earlier # Overlap between daily averages can result in more than 2 images # Any time inside the coverage range is considered dt=0 # if p.datetime < time_coverage_start: # tmp['dt'] = p.datetime - time_coverage_start # elif p.datetime > time_coverage_end: # tmp['dt'] = p.datetime - time_coverage_end # else: # tmp['dt'] = pd.Timedelta(0) tmp["dt"] = time_reference - p.time for v in varnames: tmp[v] = ds[v].data[idx] tmp = pd.DataFrame(tmp) # tmp.dropna(inplace=True) # Remove rows where all varnames are NaN tmp = tmp[(~tmp[varnames].isna()).any(axis="columns")] output = output.append(tmp, ignore_index=True) return output