Source code for lsst.sims.movingObjects.chebyFits

from __future__ import print_function, division
import os
import warnings
import numpy as np
from .chebyshevUtils import chebfit, makeChebMatrix, makeChebMatrixOnlyX
from .orbits import Orbits
from .ooephemerides import PyOrbEphemerides

__all__ = ['ChebyFits']


def three_sixty_to_neg(ra):
    """Wrap discontiguous RA values into more-contiguous results."""
    if (ra.min() < 100) and (ra.max() > 270):
        ra = np.where(ra > 270, ra - 360, ra)
    return ra


[docs]class ChebyFits(object): """Generates chebyshev coefficients for a provided set of orbits. Calculates true ephemerides using PyEphemerides, then fits these positions with a constrained Chebyshev Polynomial, using the routines in chebyshevUtils.py. Many chebyshev polynomials are used to fit one moving object over a given timeperiod; typically, the length of each segment is typically about 2 days for MBAs. The start and end of each segment must match exactly, and the entire segments must fit into the total timespan an integer number of times. This is accomplished by setting nDecimal to the number of decimal places desired in the 'time' value. For faster moving objects, this number needs be greater to allow for smaller subdivisions. It's tempting to allow flexibility to the point of not enforcing this for non-database use; however, then the resulting ephemeris may have multiple values depending on which polynomial segment was used to calculate the ephemeris. The length of each chebyshev polynomial is related to the number of ephemeris positions used to fit that polynomial by ngran: length = timestep * ngran The length of each polynomial is adjusted so that the residuals in RA/Dec position are less than skyTolerance - default = 2.5mas. The polynomial length (and the resulting residuals) is affected by ngran (i.e. timestep). Default values are based on Yusra AlSayaad's work. Parameters ---------- orbitsObj : Orbits The orbits for which to fit chebyshev polynomial coefficients. tStart : float The starting point in time to fit coefficients. MJD. tSpan : float The time span (starting at tStart) over which to fit coefficients. Days. timeScale : {'TAI', 'UTC', 'TT'} The timescale of the MJD time, tStart, and the timeScale that should be used with the chebyshev coefficients. obsCode : int, optional The observatory code of the location for which to generate ephemerides. Default 807 (CTIO). skyTolerance : float, optional The desired tolerance in mas between ephemerides calculated by OpenOrb and fitted values. Default 2.5 mas. nCoeff_position : int, optional The number of Chebyshev coefficients to fit for the RA/Dec positions. Default 14. nCoeff_vmag : int, optional The number of Chebyshev coefficients to fit for the V magnitude values. Default 9. nCoeff_delta : int, optional The number of Chebyshev coefficients to fit for the distance between Earth/Object. Default 5. nCoeff_elongation : int, optional The number of Chebyshev coefficients to fit for the solar elongation. Default 5. ngran : int, optional The number of ephemeris points within each Chebyshev polynomial segment. Default 64. ephFile : str, optional The path to the JPL ephemeris file to use. Default is '$OORB_DATA/de405.dat'. nDecimal : int, optional The number of decimal places to allow in the segment length (and thus the times of the endpoints) can be limited to nDecimal places. Default 10. For LSST SIMS moving object database, this should be 13 decimal places for NEOs and 0 for all others. """ def __init__(self, orbitsObj, tStart, tSpan, timeScale='TAI', obscode=807, skyTolerance=2.5, nCoeff_position=14, nCoeff_vmag=9, nCoeff_delta=5, nCoeff_elongation=6, ngran=64, ephFile=None, nDecimal=10): # Set up PyOrbEphemerides. if ephFile is None: self.ephFile = os.path.join(os.getenv('OORB_DATA'), 'de405.dat') else: self.ephFile = ephFile self.pyephems = PyOrbEphemerides(self.ephFile) # And then set orbits. self._setOrbits(orbitsObj) # Save input parameters. # We have to play some games with the start and end times, using Decimal, # in order to get the subdivision and times to match exactly, up to nDecimal places. self.nDecimal = int(nDecimal) self.tStart = round(tStart, self.nDecimal) self.tSpan = round(tSpan, self.nDecimal) self.tEnd = round(self.tStart + self.tSpan, self.nDecimal) # print('input times', self.tStart, self.tSpan, self.tEnd, orbitsObj.orbits.objId.as_matrix()) if timeScale.upper() == 'TAI': self.timeScale = 'TAI' elif timeScale.upper() == 'UTC': self.timeScale = 'UTC' elif timeScale.upper() == 'TT': self.timeScale = 'TT' else: raise ValueError('Do not understand timeScale; use TAI, UTC or TT.') self.obscode = obscode self.skyTolerance = skyTolerance self.nCoeff = {} self.nCoeff['position'] = int(nCoeff_position) self.nCoeff['geo_dist'] = int(nCoeff_delta) self.nCoeff['vmag'] = int(nCoeff_vmag) self.nCoeff['elongation'] = int(nCoeff_elongation) self.ngran = int(ngran) # Precompute multipliers (we only do this once, instead of per segment). self._precomputeMultipliers() # Initialize attributes to save the coefficients and residuals. self.coeffs = {'objId': [], 'tStart': [], 'tEnd': [], 'ra': [], 'dec': [], 'geo_dist': [], 'vmag': [], 'elongation': []} self.resids = {'objId': [], 'tStart': [], 'tEnd': [], 'pos': [], 'geo_dist': [], 'vmag': [], 'elongation': []} self.failed = [] def _setOrbits(self, orbitsObj): """Set the orbits, to be used to generate ephemerides. Parameters ---------- orbitsObj : Orbits The orbits to use to generate ephemerides. """ if not isinstance(orbitsObj, Orbits): raise ValueError('Need to provide an Orbits object.') self.orbitsObj = orbitsObj self.pyephems.setOrbits(self.orbitsObj) def _precomputeMultipliers(self): """Calculate multipliers for Chebyshev fitting. Calculate these once, rather than for each segment. """ # The nPoints are predetermined here, based on Yusra's earlier work. # The weight is based on Newhall, X. X. 1989, Celestial Mechanics, 45, p. 305-310 self.multipliers = {} self.multipliers['position'] = makeChebMatrix(self.ngran + 1, self.nCoeff['position'], weight=0.16) self.multipliers['vmag'] = makeChebMatrixOnlyX(self.ngran + 1, self.nCoeff['vmag']) self.multipliers['geo_dist'] = makeChebMatrixOnlyX(self.ngran + 1, self.nCoeff['geo_dist']) self.multipliers['elongation'] = makeChebMatrixOnlyX(self.ngran + 1, self.nCoeff['elongation']) def _lengthToTimestep(self, length): """Convert chebyshev polynomial segment lengths to the corresponding timestep over the segment. Parameters ---------- length : float The chebyshev polynomial segment length (nominally, days). Returns ------- float The corresponding timestep, = length/ngran (nominally, days). """ return length / self.ngran
[docs] def makeAllTimes(self): """Using tStart and tEnd, generate a numpy array containing times spaced at timestep = self.length/self.ngran. The expected use for this time array would be to generate ephemerides at each timestep. Returns ------- numpy.ndarray Numpy array of times. """ try: self.length except AttributeError: raise AttributeError('Need to set self.timestep first, using calcSegmentLength.') timestep = self._lengthToTimestep(self.length) times = np.arange(self.tStart, self.tEnd + timestep / 2, timestep) return times
[docs] def generateEphemerides(self, times, byObject=True, verbose=False): """Generate ephemerides using OpenOrb for all orbits. Parameters ---------- times : numpy.ndarray The times to use for ephemeris generation. """ return self.pyephems.generateEphemerides(times, obscode=self.obscode, ephMode='N', ephType='basic', timeScale=self.timeScale, byObject=byObject, verbose=verbose)
def _roundLength(self, length): """Modify length, to fit in an 'integer multiple' within the tStart/tEnd, and to have the desired number of decimal values. Parameters ---------- length : float The input length value to be rounded. Returns ------- float The rounded length value. """ length = round(length, self.nDecimal) length_in = length # Make length an integer value within the time interval, to last decimal place accuracy. counter = 0 prev_int_factor = 0 numTolerance = 10. ** (-1 * (self.nDecimal - 1)) while ((self.tSpan % length) > numTolerance) and (length > 0) and (counter < 20): int_factor = int(self.tSpan / length) + 1 # round up / ceiling if int_factor == prev_int_factor: int_factor = prev_int_factor + 1 prev_int_factor = int_factor length = round(self.tSpan / int_factor, self.nDecimal) counter += 1 if (self.tSpan % length) > numTolerance or (length <= 0): # Add this entire segment into the failed list. for objId in self.orbitsObj.orbits['objId'].as_matrix(): self.failed.append((objId, self.tStart, self.tEnd)) raise ValueError('Could not find a suitable length for the timespan (start %f, span %f), ' 'starting with length %s, ending with length value %f' % (self.tStart, self.tSpan, str(length_in), length)) return length def _testResiduals(self, length, cutoff=99): """Calculate the position residual, for a test case. Convenience function to make calcSegmentLength easier to read. """ # The pos_resid used will be the 'cutoff' percentile of all max residuals per object. max_pos_resids = np.zeros(len(self.orbitsObj), float) timestep = self._lengthToTimestep(length) # Test for one segment near the start (would do at midpoint, but for long timespans # this is not efficient .. a point near the start should be fine). times = np.arange(self.tStart, self.tStart + length + timestep / 2, timestep) # We must regenerate ephemerides here, because the timestep is different each time. ephs = self.generateEphemerides(times, byObject=True) # Look for the coefficients and residuals. for i, e in enumerate(ephs): coeff_ra, coeff_dec, max_pos_resids[i] = self._getCoeffsPosition(e) # Find a representative value and return. pos_resid = np.percentile(max_pos_resids, cutoff) ratio = pos_resid / self.skyTolerance return pos_resid, ratio
[docs] def calcSegmentLength(self, length=None): """Set the typical initial ephemeris timestep and segment length for all objects between tStart/tEnd. Sets self.length. The segment length will fit into the time period between tStart/tEnd an approximately integer multiple of times, and will only have a given number of decimal places. Parameters ---------- length : float, optional If specified, this value for the length is used, instead of calculating it here. """ # If length is specified, use it and do nothing else. if length is not None: length = self._roundLength(length) pos_resid, ratio = self._testResiduals(length) if pos_resid > self.skyTolerance: warnings.warn('Will set length and timestep, but this value of length ' 'produces residuals (%f) > skyTolerance (%f).' % (pos_resid, self.skyTolerance)) self.length = length return # Otherwise, calculate an appropriate length and timestep. # Give a guess at a very approximate segment length, given the skyTolerance, # purposefully trying to overestimate this value. # The actual behavior of the residuals is not linear with segment length. # There is a linear increase at low residuals < ~2 mas / segment length < 2 days # Then at around 2 days the residuals blow up, increasing rapidly to about 5000 mas # (depending on orbit .. TNOs, for example, increase but only to about 300 mas, # when the residuals resume ~linear growth out to 70 day segments if ngran=128) # Make an arbitrary cap on segment length at 60 days, (25000 mas) ~.5 arcminute accuracy. maxLength = 60 maxIterations = 50 if self.skyTolerance < 5: # This is the cap of the low-linearity regime, looping below will refine this value. length = 2.0 elif self.skyTolerance >= 5000: # Make a very rough guess. length = np.round((5000.0 / 20.0) * (self.skyTolerance - 5000.)) + 5.0 length = np.min([maxLength, int(length * 10) / 10.0]) else: # Try to pick a length somewhere in the middle of the fast increase. length = 4.0 # Tidy up some characteristics of "length": # make it fit an integer number of times into overall timespan. # and use a given number of decimal places (easier for database storage). length = self._roundLength(length) # Check the resulting residuals. pos_resid, ratio = self._testResiduals(length) counter = 0 # Now should be relatively close. Start to zero in using slope around the value.ngran while pos_resid > self.skyTolerance and counter <= maxIterations and length > 0: length = length / 2 length = self._roundLength(length) pos_resid, ratio = self._testResiduals(length) counter += 1 # print(counter, length, pos_resid, ratio) if counter > maxIterations or length <= 0: # Add this entire segment into the failed list. for objId in self.orbitsObj.orbits['objId'].as_matrix(): self.failed.append((objId, self.tStart, self.tEnd)) errorMessage = 'Could not find good segment length to meet skyTolerance %f' % (self.skyTolerance) errorMessage += ' milliarcseconds within %d iterations. ' % (maxIterations) errorMessage += 'Final residual was %f milli-arcseconds.' % (pos_resid) raise ValueError(errorMessage) else: self.length = length
def _getCoeffsPosition(self, ephs): """Calculate coefficients for the ra/dec values of a single objects ephemerides. Parameters ---------- times : numpy.ndarray The times of the ephemerides. ephs : numpy.ndarray The structured array returned by PyOrbEphemerides holding ephemeris values, for one object. Returns ------- numpy.ndarray The ra coefficients numpy.ndarray The dec coefficients float The positional error residuals between fit and ephemeris values, in mas. """ dradt_coord = ephs['dradt'] / np.cos(np.radians(ephs['dec'])) coeff_ra, resid_ra, rms_ra_resid, max_ra_resid = \ chebfit(ephs['time'], three_sixty_to_neg(ephs['ra']), dxdt=dradt_coord, xMultiplier=self.multipliers['position'][0], dxMultiplier=self.multipliers['position'][1], nPoly=self.nCoeff['position']) coeff_dec, resid_dec, rms_dec_resid, max_dec_resid = \ chebfit(ephs['time'], ephs['dec'], dxdt=ephs['ddecdt'], xMultiplier=self.multipliers['position'][0], dxMultiplier=self.multipliers['position'][1], nPoly=self.nCoeff['position']) max_pos_resid = np.max(np.sqrt(resid_dec**2 + (resid_ra * np.cos(np.radians(ephs['dec'])))**2)) # Convert position residuals to mas. max_pos_resid *= 3600.0 * 1000.0 return coeff_ra, coeff_dec, max_pos_resid def _getCoeffsOther(self, ephs): """Calculate coefficients for the ra/dec values of a single objects ephemerides. Parameters ---------- ephs : numpy.ndarray The structured array returned by PyOrbEphemerides holding ephemeris values, for one object. Returns ------- dict Dictionary containing the coefficients for each of 'geo_dist', 'vmag', 'elongation' dict Dictionary containing the max residual values for each of 'geo_dist', 'vmag', 'elongation'. """ coeffs = {} max_resids = {} for key, ephValue in zip(('geo_dist', 'vmag', 'elongation'), ('geo_dist', 'magV', 'solarelon')): coeffs[key], resid, rms, max_resids[key] = chebfit(ephs['time'], ephs[ephValue], dxdt=None, xMultiplier=self.multipliers[key], dxMultiplier=None, nPoly=self.nCoeff[key]) return coeffs, max_resids
[docs] def calcSegments(self): """Run the calculation of all segments over the entire time span. """ # First calculate ephemerides for all objects, over entire time span. # For some objects, we will end up recalculating the ephemeride values, but most should be fine. times = self.makeAllTimes() ephs = self.generateEphemerides(times) eps = self._lengthToTimestep(self.length)/4.0 # Loop through each object to generate coefficients. for orbitObj, e in zip(self.orbitsObj, ephs): tSegmentStart = self.tStart # Cycle through all segments until we reach the end of the period we're fitting. while tSegmentStart < (self.tEnd - eps): # Identify the subset of times and ephemerides which are relevant for this segment # (at the default segment size). tSegmentEnd = round(tSegmentStart + self.length, self.nDecimal) subset = np.where((times >= tSegmentStart) & (times < tSegmentEnd + eps)) self.calcOneSegment(orbitObj, e[subset]) tSegmentStart = tSegmentEnd
[docs] def calcOneSegment(self, orbitObj, ephs): """Calculate the coefficients for a single Chebyshev segment, for a single object. Calculates the coefficients and residuals, and saves this information to self.coeffs, self.resids, and (if there are problems), self.failed. Parameters ---------- orbitObj : Orbits The single Orbits object we're fitting at the moment. ephs : numpy.ndarray The ephemerides we're fitting at the moment (for the single object / single segment). """ objId = orbitObj.orbits.objId.iloc[0] tSegmentStart = ephs['time'][0] tSegmentEnd = ephs['time'][-1] coeff_ra, coeff_dec, max_pos_resid = self._getCoeffsPosition(ephs) if max_pos_resid > self.skyTolerance: # print('subdividing segments', orbitObj.orbits.objId.iloc[0]) self._subdivideSegment(orbitObj, ephs) else: # print('working on ', orbitObj.orbits.objId.iloc[0], 'at times', tSegmentStart, tSegmentEnd) coeffs, max_resids = self._getCoeffsOther(ephs) fitFailed = False for k in max_resids: if np.isnan(max_resids[k]): fitFailed = True if fitFailed: warnings.warn('Fit failed for orbitObj %d for times between %f and %f' % (objId, tSegmentStart, tSegmentEnd)) self.failed.append((orbitObj.orbits['objId'], tSegmentStart, tSegmentEnd)) else: # Consolidate items into the tracked coefficient values. self.coeffs['objId'].append(objId) self.coeffs['tStart'].append(tSegmentStart) self.coeffs['tEnd'].append(tSegmentEnd) self.coeffs['ra'].append(coeff_ra) self.coeffs['dec'].append(coeff_dec) self.coeffs['geo_dist'].append(coeffs['geo_dist']) self.coeffs['vmag'].append(coeffs['vmag']) self.coeffs['elongation'].append(coeffs['elongation']) # Consolidate items into the tracked residual values. self.resids['objId'].append(objId) self.resids['tStart'].append(tSegmentStart) self.resids['tEnd'].append(tSegmentEnd) self.resids['pos'].append(max_pos_resid) self.resids['geo_dist'].append(max_resids['geo_dist']) self.resids['vmag'].append(max_resids['geo_dist']) self.resids['elongation'].append(max_resids['elongation'])
def _subdivideSegment(self, orbitObj, ephs): """Subdivide a segment, then calculate the segment coefficients. Parameters ---------- orbitObj : Orbits The single Orbits object we're fitting at the moment. ephs : numpy.ndarray The ephemerides we're fitting at the moment (for the single object / single segment). """ newCheby = ChebyFits(orbitObj, ephs['time'][0], (ephs['time'][-1] - ephs['time'][0]), timeScale=self.timeScale, obscode=self.obscode, skyTolerance=self.skyTolerance, nCoeff_position=self.nCoeff['position'], nCoeff_vmag=self.nCoeff['vmag'], nCoeff_delta=self.nCoeff['geo_dist'], nCoeff_elongation=self.nCoeff['elongation'], ngran=self.ngran, ephFile=self.ephFile, nDecimal=self.nDecimal) try: newCheby.calcSegmentLength() except ValueError as ve: # Could not find a good segment length. warningmessage = 'Objid %s, segment %f to %f ' % (orbitObj.orbits.objId.iloc[0], ephs['time'][0], ephs['time'][-1]) warningmessage += ' - error: %s' % (ve) warnings.warn(warningmessage) self.failed += newCheby.failed return newCheby.calcSegments() # Add subdivided segment values into tracked values here. for k in self.coeffs: self.coeffs[k] += newCheby.coeffs[k] for k in self.resids: self.resids[k] += newCheby.resids[k] self.failed += newCheby.failed
[docs] def write(self, coeffFile, residFile, failedFile, append=False): """Write coefficients, residuals and failed fits to disk. Parameters ---------- coeffFile : str The filename for the coefficient values. residFile : str The filename for the residual values. failedFile : str The filename to write the failed fit information (if failed objects exist). append : bool, optional Flag to append (or overwrite) the output files. """ if append: openMode = 'aw' else: openMode = 'w' # Write a header to the coefficients file, if writing to a new file: if (not append) or (not os.path.isfile(coeffFile)): header = 'objId tStart tEnd ' header += ' '.join(['ra_%d' % x for x in range(self.nCoeff['position'])]) + ' ' header += ' '.join(['dec_%d' % x for x in range(self.nCoeff['position'])]) + ' ' header += ' '.join(['geo_dist_%d' % x for x in range(self.nCoeff['geo_dist'])]) + ' ' header += ' '.join(['vmag_%d' % x for x in range(self.nCoeff['vmag'])]) + ' ' header += ' '.join(['elongation_%d' % x for x in range(self.nCoeff['elongation'])]) else: header = None if (not append) or (not os.path.isfile(residFile)): resid_header = 'objId segNum tStart tEnd length pos geo_dist vmag elong' else: resid_header = None timeformat = '%.' + '%s' % self.nDecimal + 'f' with open(coeffFile, openMode) as f: if header is not None: print(header, file=f) for i, (objId, tStart, tEnd, cRa, cDec, cDelta, cVmag, cE) in \ enumerate(zip(self.coeffs['objId'], self.coeffs['tStart'], self.coeffs['tEnd'], self.coeffs['ra'], self.coeffs['dec'], self.coeffs['geo_dist'], self.coeffs['vmag'], self.coeffs['elongation'])): print("%s %s %s %s %s %s %s %s" % (objId, timeformat % tStart, timeformat % tEnd, " ".join('%.14e' % j for j in cRa), " ".join('%.14e' % j for j in cDec), " ".join('%.7e' % j for j in cDelta), " ".join('%.7e' % j for j in cVmag), " ".join('%.7e' % j for j in cE)), file=f) with open(residFile, openMode) as f: if resid_header is not None: print(resid_header, file=f) for i, (objId, tStart, tEnd, rPos, rDelta, rVmag, rE) in \ enumerate(zip(self.resids['objId'], self.resids['tStart'], self.resids['tEnd'], self.resids['pos'], self.resids['geo_dist'], self.resids['vmag'], self.resids['elongation'])): print("%s %i %.14f %.14f %.14f %.14e %.14e %.14e %.14e" % (objId, i + 1, tStart, tEnd, (tEnd - tStart), rPos, rDelta, rVmag, rE), file=f) if len(self.failed) > 0: with open(failedFile, openMode) as f: for i, failed in enumerate(self.failed): print(' '.join([str(x) for x in failed]), file=f)