# ZTF single day file exploration with Async, Promises and Orchestrator

In [1]:
import orchestrator as orch
import time
import sys
import tarfile
import os
import functools

from pprint import pprint
from promise import Promise

from fastavro import reader, writer
import julian

In [2]:
ztf_file="data/ztf_public_20190728.tar.gz"
ztf_date="20190728"

In [3]:
# untar the file
untar_file = False

if untar_file:
 print("extracting alerts from %s" % ztf_file)
 tar = tarfile.open(ztf_file)
 tar.extractall(path="in/%s" % ztf_date)
 tar.close()
else:
 print("skipping file extraction")
 if not os.path.exists("in/%s" % ztf_date):
 raise(RuntimeError("ZTF alerts directory %s does not exists"))
 else:
 print("Using in/%s as ZTF alert directory" % ztf_date)

ztf_in="in/%s" % ztf_date


skipping file extraction
Using in/20190728 as ZTF alert directory


In [4]:
# count the alerts in the directory

num_alerts=0

avro_files = []
avro_limit = 10000

for file in os.listdir(ztf_in):
 if file.endswith(".avro"):
 num_alerts+=1
 if num_alerts < avro_limit:
 avro_files.append(file)

print("num alerts: %d" % num_alerts)

num alerts: 125586


In [5]:
# rebuild the list?
rebuild_avro_list = False
if rebuild_avro_list:
 num_alerts=0

 avro_files = []
 avro_limit = 100000

 for file in os.listdir(ztf_in):
 if file.endswith(".avro"):
 num_alerts+=1
 if num_alerts < avro_limit:
 avro_files.append(file)

 print("num alerts: %d" % num_alerts)

number_of_chunks = 10

chunk_size = int(len(avro_files)/number_of_chunks)+1
print("chunk_size: ",chunk_size)

log_each = 1000

chunks = lambda l, n: [l[x: x+n] for x in range(0, len(l), n)]
avro_lists = chunks(avro_files,chunk_size)

print("number of lists:",len(avro_lists))

class Counter:
 def __init__(self,count=0,log_each=100,total = 0):
 self.count = count
 self.log_each = log_each
 self.total = total
 def add(self, c):
 self.count+=c
 def show_completed(self):
 if (self.count % self.log_each) == 0:
 print("%.2f %% completed" % ((self.count/self.total) * 100))
 def __repr__(self):
 return self.count
 
@orch.Async
def map_list_ts_key(file_list, counter):
 local_dates = {}
 for file in file_list:
 num_records = 0
 with open("%s/%s" % (ztf_in,file), 'rb') as fo:
 avro_reader = reader(fo)
 for record in avro_reader:
 #pprint(record)
 num_records+=1
 jd = record["candidate"]["jd"]
 ts = julian.from_jd(jd, fmt='jd')
 ts_key = ts.strftime("%Y%m%d")
 if ts_key in local_dates:
 local_dates[ts_key]+=1
 else:
 local_dates[ts_key]=1

 counter.add(num_records)
 counter.show_completed()

 print(len(file_list),local_dates)
 return local_dates

counter = Counter(0,log_each, len(avro_files))

counts = [ Promise.resolve(map_list_ts_key(x,counter)) for x in avro_lists ]

local_dates = []
# wait for the async calls
for x in counts:
 x.get().wait()
 # one get fo the promise, and the other get for the Async call
 local_dates.append(x.get().get())

# putting all local_dates together

def merge(x,y):
 ret={}
 keys = set(list(x.keys()) + list(y.keys()))
 for key in keys:
 if key in ret:
 ret[key]+=(x[key]+y[key])
 else:
 ret[key]=(x[key]+y[key])
 return ret

dates = functools.reduce(merge,local_dates)
print(dates)

{'candid': 938318155215010004,
 'candidate': {'aimage': 0.7630000114440918,
 'aimagerat': 0.27745455503463745,
 'bimage': 0.7200000286102295,
 'bimagerat': 0.26181817054748535,
 'candid': 938318155215010004,
 'chinr': 1.3009999990463257,
 'chipsf': 37.39431381225586,
 'classtar': 0.9829999804496765,
 'clrcoeff': 0.08901599794626236,
 'clrcounc': 9.455599865759723e-06,
 'clrmed': 0.9570000171661377,
 'clrrms': 0.27230900526046753,
 'dec': 22.6338266,
 'decnr': 22.6338191,
 'diffmaglim': 20.799943923950195,
 'distnr': 0.042584728449583054,
 'distpsnr1': 0.06271562725305557,
 'distpsnr2': 0.2556854486465454,
 'distpsnr3': 2.1249427795410156,
 'drb': 0.9395135641098022,
 'drbversion': 'd6_m7',
 'dsdiff': -152.47169494628906,
 'dsnrms': 194.2211456298828,
 'elong': 1.0597221851348877,
 'exptime': 30.0,
 'fid': 2,
 'field': 591,
 'fwhm': 2.75,
 'isdiffpos': 'f',
 'jd': 2458692.8181597,
 'jdendhist': 2458692.8186111,
 'jdendref': 2458294.833785,
 'jdstarthist': 2458291.9795139,
 'jdstartref':

In [17]:
# let's try to use different nodes to see if performance can be better

# rebuild the list?
rebuild_avro_list = True
if rebuild_avro_list:
 num_alerts=0

 avro_files = []
 avro_limit = 50000

 for file in os.listdir(ztf_in):
 if file.endswith(".avro"):
 num_alerts+=1
 if num_alerts < avro_limit:
 avro_files.append(file)

 print("num alerts: %d" % num_alerts)

number_of_chunks = 10

chunk_size = int(len(avro_files)/number_of_chunks)+1
print("chunk_size: ",chunk_size)

log_each = 1000

chunks = lambda l, n: [l[x: x+n] for x in range(0, len(l), n)]
avro_lists = chunks(avro_files,chunk_size)

@orch.Async
def new_map_list_ts_key(file_list):
 
 from fastavro import reader, writer
 import julian
 
 log_each = 100
 local_dates = {}
 ztf_date = "20190728"
 ztf_in = "in/%s" % ztf_date
 num_records = 0
 for file in file_list:
 with open("%s/%s" % (ztf_in,file), 'rb') as fo:
 avro_reader = reader(fo)
 for record in avro_reader:
 #pprint(record)
 num_records+=1
 jd = record["candidate"]["jd"]
 ts = julian.from_jd(jd, fmt='jd')
 ts_key = ts.strftime("%Y%m%d")
 if ts_key in local_dates:
 local_dates[ts_key]+=1
 else:
 local_dates[ts_key]=1
 if num_records % log_each == 0:
 print("%.2f %% completed" % ((num_records/len(file_list)) * 100))
 
 return local_dates

def mapping_job(file_list):
 job=orch.Job(params={\
 "ntasks":"1",
 "nodes":"1",
 "job-name":"avro_map_ts_key",
 "cpus-per-task":"5"
 })
 job.setVerbose(False)
 ret_list = job.run(new_map_list_ts_key, file_list)
 return(ret_list)

res = []
for avro_list in avro_lists:
 # process each list as a promise 
 p = Promise.resolve(mapping_job(avro_list))
 res.append(p)

# gather results
local_dates = Promise.all(res).then(lambda x: functools.reduce(merge,[p.get().get() for p in x]))

# print the result
print(local_dates.get())


num alerts: 125586
chunk_size: 5000
{'20190728': 49999}
