👽 Wrap the json serialization/deserialization to force the UUID representation to LEGACY

Changed all the references in the `bin` scripts to also use the new wrappers to
serialize/deserialize UUID objects.

This is a follow-on to e-mission@51a6881, which also has additional context.

Additional Context:
- e-mission/e-mission-docs#856 (comment)
- https://pymongo.readthedocs.io/en/stable/examples/uuid.html
- 9c683cf
- 6ac02a2
- edd8b77

As of pymongo 4.3.3, `LEGACY_JSON_OPTIONS` also has an `UNSPECIFIED` UUID representation:

>  bson.json_util.LEGACY_JSON_OPTIONS: bson.json_util.JSONOptions =
>  JSONOptions(strict_number_long=False, datetime_representation=0,
>  strict_uuid=False, json_mode=0, document_class=dict, tz_aware=False,
>  uuid_representation=UuidRepresentation.UNSPECIFIED,
>  unicode_decode_error_handler='strict', tzinfo=None,
>  type_registry=TypeRegistry(type_codecs=[], fallback_encoder=None),
>  datetime_conversion=DatetimeConversion.DATETIME)
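
For reference, here is a minimal sketch of what such a wrapper module could look like. This is an assumption, not the actual contents of `emission/storage/json_wrappers.py`; only the `wrapped_object_hook`/`wrapped_default` names are taken from the diff below.

```
# Sketch only -- the real emission/storage/json_wrappers.py may differ.
from functools import partial

import bson.json_util as bju
from bson.binary import UuidRepresentation

# Start from LEGACY_JSON_OPTIONS, but pin the UUID representation to
# PYTHON_LEGACY instead of the UNSPECIFIED default shown above.
wrapped_json_options = bju.LEGACY_JSON_OPTIONS.with_options(
    uuid_representation=UuidRepresentation.PYTHON_LEGACY)

# Drop-in replacements for bju.object_hook / bju.default, usable as
# json.load(fd, object_hook=wrapped_object_hook) and
# json.dump(obj, fd, default=wrapped_default)
wrapped_object_hook = partial(bju.object_hook, json_options=wrapped_json_options)
wrapped_default = partial(bju.default, json_options=wrapped_json_options)
```

With wrappers along these lines, the `bin` scripts only need to swap `bju.object_hook` for `esj.wrapped_object_hook` and `bju.default` for `esj.wrapped_default`, as the diff below shows.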

Testing done:
- Ensured that there were no imports of json_utils

```
$ find bin -name \*.py | xargs grep json_utils
$
```

- Ensured that all `bju` prefixes were replaced

```
$ find bin -name \*.py | xargs grep bju
$
```

- Ensured that all `esj` imports used the `wrapped` version of the name

```
$ find bin -name \*.py | xargs grep esj | egrep -v "import|esj.wrapped"
$
```
shankari committed May 8, 2023
1 parent e96bd8e commit 39f2944
Showing 17 changed files with 34 additions and 36 deletions.
4 changes: 2 additions & 2 deletions bin/debug/export_participants_trips_csv.py
```
@@ -6,7 +6,7 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj
 import arrow
 import argparse

@@ -111,7 +111,7 @@ def export_demographic_table_as_csv(uuid_list, args):
         uuid_list = esdu.get_all_uuids()
     elif args.file:
         with open(args.file) as fd:
-            uuid_entries = json.load(fd, object_hook=bju.object_hook)
+            uuid_entries = json.load(fd, object_hook=esj.wrapped_object_hook)
         uuid_list = [ue["uuid"] for ue in uuid_entries]
     export_participant_table_as_csv(uuid_list, args)
     export_trip_tables_as_csv(uuid_list, args)
```
6 changes: 3 additions & 3 deletions bin/debug/extract_geojson_for_time_range_and_user.py
```
@@ -18,7 +18,7 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj
 import arrow
 import argparse

@@ -50,7 +50,7 @@ def export_geojson(user_id, start_day_str, end_day_str, timezone, file_name):
     geojson_filename = "%s_%s.gz" % (file_name, user_id)
     with gzip.open(geojson_filename, "wt") as gcfd:
         json.dump(user_gj,
-            gcfd, default=bju.default, allow_nan=False, indent=4)
+            gcfd, default=esj.wrapped_default, allow_nan=False, indent=4)

 def export_geojson_for_users(user_id_list, args):
     for curr_uuid in user_id_list:
@@ -85,6 +85,6 @@ def export_geojson_for_users(user_id_list, args):
         uuid_list = esdu.get_all_uuids()
     elif args.file:
         with open(args.file) as fd:
-            uuid_entries = json.load(fd, object_hook=bju.object_hook)
+            uuid_entries = json.load(fd, object_hook=esj.wrapped_object_hook)
         uuid_list = [ue["uuid"] for ue in uuid_entries]
     export_geojson_for_users(uuid_list, args)
```
4 changes: 2 additions & 2 deletions bin/debug/extract_timeline_for_day_and_user.py
```
@@ -13,7 +13,7 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 import emission.core.get_database as edb

@@ -34,7 +34,7 @@ def export_timeline(user_id_str, day_str, file_name):
     final_query.update(date_query)
     entry_list = list(edb.get_timeseries_db().find(final_query))
     logging.info("Found %d entries" % len(entry_list))
-    json.dump(entry_list, open(file_name, "w"), default=bju.default, allow_nan=False, indent=4)
+    json.dump(entry_list, open(file_name, "w"), default=esj.wrapped_default, allow_nan=False, indent=4)

 if __name__ == '__main__':
     if len(sys.argv) != 4:
```
8 changes: 4 additions & 4 deletions bin/debug/extract_timeline_for_day_range_and_user.py
```
@@ -16,7 +16,7 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj
 import arrow
 import argparse

@@ -117,7 +117,7 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name):
     combined_filename = "%s_%s.gz" % (file_name, user_id)
     with gzip.open(combined_filename, "wt") as gcfd:
         json.dump(combined_list,
-            gcfd, default=bju.default, allow_nan=False, indent=4)
+            gcfd, default=esj.wrapped_default, allow_nan=False, indent=4)

 import emission.core.get_database as edb

@@ -129,7 +129,7 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name):
     pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id)
     with gzip.open(pipeline_filename, "wt") as gpfd:
         json.dump(pipeline_state_list,
-            gpfd, default=bju.default, allow_nan=False, indent=4)
+            gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)

 def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
     MAX_LIMIT = 25 * 10000
@@ -173,6 +173,6 @@ def export_timeline_for_users(user_id_list, args):
         uuid_list = esdu.get_all_uuids()
     elif args.file:
         with open(args.file) as fd:
-            uuid_entries = json.load(fd, object_hook=bju.object_hook)
+            uuid_entries = json.load(fd, object_hook=esj.wrapped_object_hook)
         uuid_list = [ue["uuid"] for ue in uuid_entries]
     export_timeline_for_users(uuid_list, args)
```
4 changes: 2 additions & 2 deletions bin/debug/extract_trips_for_day_and_user.py
```
@@ -13,7 +13,7 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 import emission.core.get_database as edb

@@ -31,7 +31,7 @@ def export_timeline(user_id_str, start_day_str, end_day_str, file_name):
     print("query = %s" % query)
     entry_list = list(edb.get_analysis_timeseries_db().find(query))
     logging.info("Found %d entries" % len(entry_list))
-    json.dump(entry_list, open(file_name, "w"), default=bju.default, allow_nan=False, indent=4)
+    json.dump(entry_list, open(file_name, "w"), default=esj.wrapped_default, allow_nan=False, indent=4)

 if __name__ == '__main__':
     if len(sys.argv) != 5:
```
1 change: 0 additions & 1 deletion bin/debug/fix_usercache_processing.py
```
@@ -20,7 +20,6 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju

 import emission.core.get_database as edb
 import emission.net.usercache.abstract_usercache_handler as euah
```
4 changes: 2 additions & 2 deletions bin/debug/get_users_for_channel.py
```
@@ -10,7 +10,7 @@
 import argparse
 import logging
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 import emission.core.get_database as edb

@@ -30,4 +30,4 @@
 logging.debug("Mapped %d entries for channel %s" % (len(matched_email2uuid_it), args.channel))

 out_fd = sys.stdout if args.outfile is None else open(args.outfile, "w")
-json.dump(matched_email2uuid_it, out_fd, default=bju.default)
+json.dump(matched_email2uuid_it, out_fd, default=esj.wrapped_default)
```
8 changes: 4 additions & 4 deletions bin/debug/load_multi_timeline_for_range.py
```
@@ -11,7 +11,7 @@
 import logging

 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj
 import argparse

 import common
@@ -39,7 +39,7 @@ def register_fake_users(prefix, unique_user_list):
         user = ecwu.User.registerWithUUID(username, uuid)

 def register_mapped_users(mapfile, unique_user_list):
-    uuid_entries = json.load(open(mapfile), object_hook=bju.object_hook)
+    uuid_entries = json.load(open(mapfile), object_hook=esj.wrapped_object_hook)
     logging.info("Creating user entries for %d users from map of length %d" % (len(unique_user_list), len(mapfile)))

     lookup_map = dict([(eu["uuid"], eu) for eu in uuid_entries])
@@ -69,7 +69,7 @@ def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error):
         print("Loading pipeline state for %s from %s" %
             (curr_uuid, pipeline_filename))
         with gzip.open(pipeline_filename) as gfd:
-            states = json.load(gfd, object_hook = bju.object_hook)
+            states = json.load(gfd, object_hook = esj.wrapped_object_hook)
             if args.verbose:
                 logging.debug("Loading states of length %s" % len(states))
             if len(states) > 0:
@@ -154,7 +154,7 @@ def post_check(unique_user_list, all_rerun_list):
     logging.info("=" * 50)
     logging.info("Loading data from file %s" % filename)

-    entries = json.load(gzip.open(filename), object_hook = bju.object_hook)
+    entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook)

     # Obtain uuid and rerun information from entries
     curr_uuid_list, needs_rerun = common.analyse_timeline(entries)
```
4 changes: 2 additions & 2 deletions bin/debug/load_timeline_for_day_and_user.py
```
@@ -6,7 +6,7 @@
 standard_library.install_aliases()
 from builtins import *
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj
 import emission.storage.timeseries.cache_series as estcs
 import argparse
 import emission.core.wrapper.user as ecwu
@@ -31,7 +31,7 @@
         user = ecwu.User.register(args.user_email)
         override_uuid = user.uuid
         print("After registration, %s -> %s" % (args.user_email, override_uuid))
-    entries = json.load(open(fn), object_hook = bju.object_hook)
+    entries = json.load(open(fn), object_hook = esj.wrapped_object_hook)
     munged_entries = []
     for i, entry in enumerate(entries):
         entry["user_id"] = override_uuid
```
4 changes: 2 additions & 2 deletions bin/debug/purge_multi_timeline_for_range.py
```
@@ -9,7 +9,7 @@
 import argparse
 import json
 import gzip
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 # Our imports
 import common
@@ -50,7 +50,7 @@
     logging.info("=" * 50)
     logging.info("Deleting data from file %s" % filename)

-    entries = json.load(gzip.open(filename), object_hook = bju.object_hook)
+    entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook)

     # Obtain uuid and rerun information from entries
     curr_uuid_list, needs_rerun = common.analyse_timeline(entries)
```
6 changes: 3 additions & 3 deletions bin/debug/save_ground_truth.py
```
@@ -1,6 +1,6 @@
 import attrdict as ad
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj
 import sys
 from uuid import UUID
 import argparse
@@ -13,14 +13,14 @@ def save_diary(args):
     print("Saving data for %s, %s to file %s" % (args.sel_uuid, args.date, args.file_name))
     tj = edb.get_usercache_db().find_one({'metadata.key': "diary/trips-%s" % args.date, "user_id": args.sel_uuid})
     print("Retrieved object is of length %s" % len(tj))
-    json.dump(tj, open(args.file_name, "w"), indent=4, default=bju.default)
+    json.dump(tj, open(args.file_name, "w"), indent=4, default=esj.wrapped_default)

 def save_ct_list(args):
     print("Saving confirmed trip list for %s to file %s" % (args.sel_uuid, args.file_name))
     ts = esta.TimeSeries.get_time_series(args.sel_uuid)
     composite_trips = list(ts.find_entries(["analysis/composite_trip"], None))
     print("Retrieved object is of length %s" % len(composite_trips))
-    json.dump(composite_trips, open(args.file_name, "w"), indent=4, default=bju.default)
+    json.dump(composite_trips, open(args.file_name, "w"), indent=4, default=esj.wrapped_default)

 if __name__ == '__main__':
     parser = argparse.ArgumentParser(prog="save_ground_truth")
```
4 changes: 2 additions & 2 deletions bin/debug/simulate_server_to_phone.py
```
@@ -13,7 +13,7 @@
 import uuid
 import datetime as pydt
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 import emission.net.api.usercache as enau

@@ -24,7 +24,7 @@ def save_server_to_phone(user_id_str, file_name):
     # TODO: Convert to call to get_timeseries once we get that working
     # Or should we even do that?
     retVal = enau.sync_server_to_phone(uuid.UUID(user_id_str))
-    json.dump(retVal, open(file_name, "w"), default=bju.default, allow_nan=False, indent=4)
+    json.dump(retVal, open(file_name, "w"), default=esj.wrapped_default, allow_nan=False, indent=4)

 if __name__ == '__main__':
     if len(sys.argv) != 3:
```
2 changes: 1 addition & 1 deletion bin/monitor/check_participant_status.py
```
@@ -18,5 +18,5 @@
         print(f"For {ue['user_email']}: Trip count = {trip_count}, location count = {location_count}, first trip = {first_trip_time}, last trip = {last_trip_time}, confirmed_pct ({month_ago} -> {now}) = exactly {confirmed_pct:.2f}")
     else:
         confirmed_count = edb.get_analysis_timeseries_db().count_documents({"user_id": ue["uuid"], "metadata.key": "analysis/confirmed_trip", "data.user_input": {"$ne": {}}})
-        confirmed_pct = confirmed_count / trip_count
+        confirmed_pct = confirmed_count / trip_count if trip_count != 0 else 0
         print(f"For {ue['user_email']}: Trip count = {trip_count}, location count = {location_count}, first trip = {first_trip_time}, last trip = {last_trip_time}, confirmed_pct = approximately {confirmed_pct:.2f}")
```
1 change: 0 additions & 1 deletion bin/monitor/find_active_users.py
```
@@ -2,7 +2,6 @@
 import logging
 import json
 import argparse
-import bson.json_util as bju
 from uuid import UUID
 import emission.core.get_database as edb

```
4 changes: 2 additions & 2 deletions bin/public/extract_uuids_from_email_list.py
```
@@ -15,7 +15,7 @@
 import gzip
 import json
 import argparse
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 import emission.core.wrapper.user as ecwu

@@ -46,4 +46,4 @@
     uuids.append(uuid)

 uuid_strs = [{"uuid": u} for u in uuids]
-json.dump(uuid_strs, out_fd, default=bju.default)
+json.dump(uuid_strs, out_fd, default=esj.wrapped_default)
```
4 changes: 2 additions & 2 deletions bin/public/request_public_data.py
```
@@ -11,7 +11,7 @@
 import arrow
 from uuid import UUID
 import json
-import bson.json_util as bju
+import emission.storage.json_wrappers as esj

 # This script pulls public data from the server and then loads it to a local server
 parser = argparse.ArgumentParser(prog="request_public_data")
@@ -66,4 +66,4 @@
 else:
     assert(args.output_file is not None)
     with open(args.output_file, "w") as fp:
-        json.dump(entries, fp, default=bju.default, allow_nan=False, indent=4)
+        json.dump(entries, fp, default=esj.wrapped_default, allow_nan=False, indent=4)
```
2 changes: 1 addition & 1 deletion bin/reset_pipeline.py
```
@@ -90,7 +90,7 @@ def _email_2_user_list(email_list):
     # Handle the second row in the table
     day_dt = arrow.get(args.date, "YYYY-MM-DD")
     logging.debug("day_dt is %s" % day_dt)
-    day_ts = day_dt.timestamp
+    day_ts = day_dt.timestamp()
     logging.debug("day_ts is %s" % day_ts)
     user_list = _get_user_list(args)
     logging.info("received list with %s users" % user_list)
```
