Source code for geextract

"""geextract"""

__version__ = "0.5.0"

import ee
import sqlite3
import pandas as pd
import re
from datetime import datetime
import warnings

# Silence pandas warning
warnings.simplefilter(action='ignore')

ee.Initialize()


BANDS_TO_COLORS = {'LT4': {'B1': 'blue',
                           'B2': 'green',
                           'B3': 'red',
                           'B4': 'nir',
                           'B5': 'swir1',
                           'B7': 'swir2',
                           'id': 'id'},
                   'LC8': {'B2': 'blue',
                           'B3': 'green',
                           'B4': 'red',
                           'B5': 'nir',
                           'B6': 'swir1',
                           'B7': 'swir2',
                           'id': 'id'}}

BANDS_TO_COLORS['LT5'] = BANDS_TO_COLORS['LT4']
BANDS_TO_COLORS['LE7'] = BANDS_TO_COLORS['LT4']
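
The same mapping is shared by the TM and ETM+ sensors and differs only for OLI (LC8); a quick check of the aliases, whose values follow directly from the dictionaries defined above:

    >>> BANDS_TO_COLORS['LE7']['B4']
    'nir'
    >>> BANDS_TO_COLORS['LC8']['B5']
    'nir'
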


def get_date(filename):
    """Retrieve date information from typical Landsat filenames

    Args:
        filename (str): Landsat file name

    Returns:
        datetime.date : The corresponding date of the filename.

    Examples:
        >>> import geextract
        >>> geextract.get_date('LC81970292013106')
    """
    p0 = re.compile(r'(?P<sensor>LC8|LE7|LT5|LT4)(?P<pathrow>\d{6})(?P<date>\d{7})')
    p1 = re.compile(r'(?P<sensor>LC08|LE07|LT04|LT05)_(?P<pathrow>\d{6})_(?P<date>\d{8})')
    if p0.search(filename):
        m = p0.search(filename)
        d = datetime.strptime(m.group('date'), '%Y%j').date()
    elif p1.search(filename):
        m = p1.search(filename)
        d = datetime.strptime(m.group('date'), '%Y%m%d').date()
    else:
        raise ValueError('Unknown pattern')
    return d
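
A short usage sketch of the two naming conventions handled above; the collection 1 scene ID below is hypothetical but well formed, and the dates follow from the regular expressions and strptime formats:

    >>> get_date('LC81970292013106')          # pre-collection ID, year + day of year
    datetime.date(2013, 4, 16)
    >>> get_date('LE07_197029_20010716')      # collection 1 ID, year-month-day
    datetime.date(2001, 7, 16)
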
def ts_extract(sensor, start, tiers=['T1', 'T2'], lon=None, lat=None,
               end=datetime.today(), radius=None, feature=None,
               bands=None, stats='mean'):
    """Perform a spatio-temporal query to extract Landsat surface reflectance
        data from gee

    Args:
        lon (float): Center longitude in decimal degree
        lat (float): Center latitude in decimal degree
        sensor (str): Landsat sensor to query data from. Must be one of 'LT4',
            'LT5', 'LE7', 'LC8'
        tiers (list): List of tiers to order. ``'T1'`` corresponds to Tier 1.
            Default is ``['T1', 'T2']``
        start (datetime.datetime): Start date
        end (datetime.datetime): Optional end date; automatically set to today
            if unset
        radius (float): Optional radius around center point in meters. If unset,
            the time-series of a single pixel is queried. Otherwise a reducer is
            used to spatially aggregate the pixels intersecting the circular
            feature built.
        feature (dict): Optional dictionary representation of a polygon feature
            in longlat CRS. If unset, the time-series of a single pixel is
            queried. Otherwise a reducer is used to spatially aggregate the
            pixels intersecting the given feature.
        bands (list): List of Landsat band names. Optional, defaults to
            ['B2', 'B3', 'B4', 'B5', 'B6', 'B7'] in the case of the LC8 sensor
            and to ['B1', 'B2', 'B3', 'B4', 'B5', 'B7'] otherwise.
        stats (str): Spatial aggregation function to use. Only relevant
            if a radius value is set.

    Returns:
        list: A list of dictionaries representing the data returned by the
            gee platform.

    Example:
        >>> import geextract
        >>> from pprint import pprint
        >>> from datetime import datetime
        >>> lon = -89.8107197
        >>> lat = 20.4159611
        >>> out = geextract.ts_extract(lon=lon, lat=lat, sensor='LE7',
        ...                            start=datetime(1980, 1, 1, 0, 0),
        ...                            radius=500)
        >>> pprint(out)
    """
    # Define some internal functions to be mapped over imageCollections
    def _mask_clouds(image):
        """Cloud masking function"""
        # Collection 1 cloud masking example:
        # https://code.earthengine.google.com/52e39cc00de3471c905509e374c52284
        # Pre-collection masking example:
        # https://code.earthengine.google.com/37ffd688d1b2d2c977fa5c536a023356
        clear = image.select('pixel_qa').bitwiseAnd(0x2).neq(0)
        valid_range_mask = image.gte(0).And(image.lte(10000))
        return image.updateMask(clear).updateMask(valid_range_mask)

    # Check inputs
    if sensor not in ['LT4', 'LT5', 'LC8', 'LE7']:
        raise ValueError('Unknown sensor (must be one of LT4, LT5, LE7, LC8)')
    if bands is None:
        if sensor in ['LT4', 'LT5', 'LE7']:
            bands = ['B1', 'B2', 'B3', 'B4', 'B5', 'B7']
        else:
            bands = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7']
    # Rewrite the sensor string to its collection 1 form (e.g. LC8 -> LC08)
    sensor = re.sub(r'(LC|LT|LE)(\d{1})', r'\g<1>0\g<2>', sensor)
    collection_name_template = 'LANDSAT/%s/C01/%%s_SR' % sensor
    # Iterate over tiers to load and merge all corresponding image collections
    landsat_ic = ee.ImageCollection(collection_name_template % tiers[0])
    for tier in tiers[1:]:
        tier_ic = ee.ImageCollection(collection_name_template % tier)
        landsat_ic = ee.ImageCollection(landsat_ic.merge(tier_ic))
    # Prepare image collection
    landsat = landsat_ic\
        .filterDate(start=start, opt_end=end)\
        .map(_mask_clouds)\
        .select(bands)
    if radius is not None or feature is not None:
        # Define spatial aggregation function
        if stats == 'mean':
            fun = ee.Reducer.mean()
        elif stats == 'median':
            fun = ee.Reducer.median()
        elif stats == 'max':
            fun = ee.Reducer.max()
        elif stats == 'min':
            fun = ee.Reducer.min()
        else:
            raise ValueError('Unknown spatial aggregation function. '
                             'Must be one of mean, median, max, or min')
        if feature is not None:
            geometry = ee.Geometry.Polygon(feature['geometry']['coordinates'])
        else:
            # Geometry defined by point and radius
            geometry = ee.Geometry.Point(lon, lat).buffer(radius)
        # Define function to map over imageCollection to perform spatial aggregation
        def _reduce_region(image):
            """Spatial aggregation function for a single image and a polygon feature"""
            stat_dict = image.reduceRegion(fun, geometry, 30)
            # Feature needs to be rebuilt because the backend doesn't accept
            # mapping functions that return dictionaries
            return ee.Feature(None, stat_dict)
        fc = landsat.filterBounds(geometry).map(_reduce_region).getInfo()
        out = simplify(fc)
    else:
        # Extraction at a single point, no spatial aggregation
        geometry = ee.Geometry.Point(lon, lat)
        l = landsat.filterBounds(geometry).getRegion(geometry, 30).getInfo()
        out = dictify(l)
        # Pop longitude, latitude and time keys from each dict so that band
        # aliases can later be replaced by their color names
        [d.pop('longitude', None) for d in out]
        [d.pop('latitude', None) for d in out]
        [d.pop('time', None) for d in out]
    return out
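
A hedged usage sketch of both query modes; it requires a successful ee.Initialize() and network access to the gee platform, the coordinates are those of the docstring example, and the date range and buffer size are arbitrary:

    >>> from datetime import datetime
    >>> # Single pixel: no radius or feature, so getRegion is used under the hood
    >>> pixel_ts = ts_extract(sensor='LC8', start=datetime(2013, 4, 1),
    ...                       end=datetime(2014, 4, 1),
    ...                       lon=-89.8107197, lat=20.4159611)
    >>> # 300 m buffer around the same point, spatially aggregated with the median
    >>> buffer_ts = ts_extract(sensor='LC8', start=datetime(2013, 4, 1),
    ...                        lon=-89.8107197, lat=20.4159611,
    ...                        radius=300, stats='median')
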
def simplify(fc):
    """Take a feature collection, as returned by mapping a reducer to an
        ImageCollection, and reshape it into a simpler list of dictionaries

    Args:
        fc (dict): Dictionary representation of a feature collection, as
            returned by mapping a reducer to an ImageCollection

    Returns:
        list: A list of dictionaries.

    Examples:
        >>> fc = {u'columns': {},
        ...       u'features': [{u'geometry': None,
        ...                      u'id': u'LC81970292013106',
        ...                      u'properties': {u'B1': 651.8054424353023,
        ...                                      u'B2': 676.6018246419446},
        ...                      u'type': u'Feature'},
        ...                     {u'geometry': None,
        ...                      u'id': u'LC81970292013122',
        ...                      u'properties': {u'B1': 176.99323997958842,
        ...                                      u'B2': 235.83196553144882},
        ...                      u'type': u'Feature'}]}
        >>> simplify(fc)
    """
    def feature2dict(f):
        id = f['id']
        out = f['properties']
        out.update(id=id)
        return out
    out = [feature2dict(x) for x in fc['features']]
    return out
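
For the fc shown in the docstring example, simplify(fc) evaluates to the following flat list (dictionary key order may differ):

    [{u'B1': 651.8054424353023, u'B2': 676.6018246419446, 'id': u'LC81970292013106'},
     {u'B1': 176.99323997958842, u'B2': 235.83196553144882, 'id': u'LC81970292013122'}]
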
def dictify(x):
    """Build a list of dictionaries from a list of lists as returned by
        running getRegion on an Image collection

    Args:
        x (list): A list of lists. The first list element contains the keys
            while the following list elements contain values.

    Returns:
        list: A list of dictionaries

    Examples:
        >>> l = [[u'id', u'B1', u'B2', u'B3', u'B7'],
        ...      [u'LC81970292013106', 649, 683, 910, 1365],
        ...      [u'LC81970292013122', 140, 191, 521, 965]]
        >>> dictify(l)
    """
    out = [dict(zip(x[0], values)) for values in x[1:]]
    return out
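
For the list l in the docstring example, dictify(l) returns one dictionary per scene, with the header row providing the keys:

    [{u'id': u'LC81970292013106', u'B1': 649, u'B2': 683, u'B3': 910, u'B7': 1365},
     {u'id': u'LC81970292013122', u'B1': 140, u'B2': 191, u'B3': 521, u'B7': 965}]
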
def relabel(dl, sensor):
    """Rename the keys of each element of a list of dictionaries

    Args:
        dl (list): List of dictionaries
        sensor (str): Landsat sensor to which the data belong. Must be one of
            'LT4', 'LT5', 'LE7' or 'LC8'

    Returns:
        list: A list of dictionaries
    """
    def change_keys(d, dr):
        return dict((dr[key], value) for (key, value) in d.items())
    dl_out = [change_keys(d, BANDS_TO_COLORS[sensor]) for d in dl]
    return dl_out
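
A minimal sketch of the renaming with made-up reflectance values; note that every key of each dictionary must be present in BANDS_TO_COLORS[sensor], which is why ts_extract pops the longitude, latitude and time keys beforehand:

    >>> dl = [{'id': 'LC81970292013106', 'B2': 683, 'B4': 910, 'B5': 1365}]
    >>> relabel(dl, 'LC8')
    [{'id': 'LC81970292013106', 'blue': 683, 'red': 910, 'nir': 1365}]
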
def date_append(dl):
    """Add time key to each element of a list of dictionaries

    Args:
        dl (list): List of dictionaries, each dictionary should at least contain
            the key 'id' in which a classic Landsat scene ID parsable by
            get_date is stored.

    Returns:
        list: A list of dictionaries
    """
    # Add time key to each dict of dl
    for item in dl:
        item.update(time=get_date(item['id']))
    return dl
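
A minimal sketch with made-up values; the 'time' value follows from get_date applied to the 'id' field:

    >>> dl = [{'id': 'LC81970292013106', 'blue': 683, 'nir': 1365}]
    >>> date_append(dl)
    [{'id': 'LC81970292013106', 'blue': 683, 'nir': 1365, 'time': datetime.date(2013, 4, 16)}]
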
def dictlist2sqlite(dl, site, sensor, db_src, table):
    """Write a list of dictionaries to a sqlite database

    Args:
        dl (list): List of dictionaries
        site (str): Name of the site to which the data belong; written to the
            'site' column of the table
        sensor (str): Landsat sensor the data were acquired by; written to the
            'sensor' column of the table
        db_src (str): Path to an sqlite database (created in case it does not
            exist)
        table (str): Name of the database table to write the data to

    Returns:
        This function is used for its side effect of writing data to a
            database; it does not return anything
    """
    df = pd.DataFrame(dl)
    # Drop any row that contains no-data
    # TODO: Filter only rows for which all bands are NaN
    df2 = df.dropna(how='any')
    df2['sensor'] = sensor
    df2['site'] = site
    con = sqlite3.connect(db_src)
    df2.to_sql(name=table, con=con, if_exists='append')
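
A hedged end-to-end sketch of how these helpers compose, from extraction to database write; it assumes a working Earth Engine session, and the site name, database path and table name are illustrative placeholders:

    >>> from datetime import datetime
    >>> raw = ts_extract(sensor='LE7', start=datetime(1999, 1, 1),
    ...                  lon=-89.8107197, lat=20.4159611, radius=500)
    >>> colored = relabel(raw, 'LE7')
    >>> with_dates = date_append(colored)
    >>> dictlist2sqlite(with_dates, site='site_01', sensor='LE7',
    ...                 db_src='/tmp/landsat_ts.sqlite', table='le7_time_series')
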