To test a PR of the autorope/donkeycar repo:
https://github.com/autorope/donkeycar/pull/782/files
(Add datastore metadata function and test autorope#782)
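
One way to test it locally (a sketch, assuming a git clone of donkeycar) is to
fetch the PR branch with "git fetch origin pull/782/head:pr-782", check it out
with "git checkout pr-782", and reinstall the package. Note that the new
TrainRemote command below imports requests_toolbelt, which may need to be
installed separately.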
Billy Cheung committed May 27, 2021
1 parent 5dcc43a commit 76c7874
Showing 5 changed files with 238 additions and 72 deletions.
74 changes: 74 additions & 0 deletions donkeycar/management/base.py
@@ -453,6 +453,79 @@ def run(self, args):
else:
print("Unrecognized framework: {}. Please specify one of 'tensorflow' or 'pytorch'".format(framework))

# train remotely =====================================================================================================
import requests
import tempfile
import tarfile
from pathlib import Path
from requests_toolbelt.multipart.encoder import MultipartEncoder

class TrainRemote(BaseCommand):

    def generate_tub_archive(self, tub_paths, carapp_path):
print("generating tub archive")
f = tempfile.NamedTemporaryFile(mode='w+b', suffix='.tar.gz', delete=False)

with tarfile.open(fileobj=f, mode='w:gz') as tar:
for tub_path in tub_paths:
p = Path(tub_path)
tar.add(p, arcname=p.name)
tar.add(f"{carapp_path}/myconfig.py", arcname="myconfig.py")
f.close()

return f.name
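
    # A quick sanity check of the generated archive (a sketch; 'data/tub_1'
    # is a hypothetical tub path):
    #   import tarfile
    #   archive = TrainRemote().generate_tub_archive(['data/tub_1'], '.')
    #   with tarfile.open(archive) as tar:
    #       print(tar.getnames())  # expect 'tub_1' and 'myconfig.py'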

    def submit_train_job(self, carapp_path, submit_job_url, tub_paths):
filename = self.generate_tub_archive(tub_paths, carapp_path)
deviceID = "device_id"
hostname = "hostname"
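        # NOTE: placeholder values; a real client would presumably send
        # identifying details of the submitting device here.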

mp_encoder = MultipartEncoder(
fields={
'device_id': deviceID,
'hostname': hostname,
'tub_archive_file': ('file.tar.gz', open(filename, 'rb'), 'application/gzip'),
}
)

r = requests.post(
submit_job_url,
data=mp_encoder, # The MultipartEncoder is posted as data, don't use files=...!
# The MultipartEncoder provides the content-type header with the boundary:
headers={'Content-Type': mp_encoder.content_type}
)

        if r.status_code == 200 and 'job_uuid' in r.json():
            print(r.json()['job_uuid'])
        else:
            raise Exception('Failed to call submit job')

def parse_args(self, args):
        parser = argparse.ArgumentParser(prog='trainremote', usage='%(prog)s [options]')
parser.add_argument('--tub', nargs='+', help='tub data for training')
parser.add_argument('--server', default=None, help='url of the training server')
parser.add_argument('--carpath', default='.', help='path of mycar folder')
parsed_args = parser.parse_args(args)
return parsed_args

    def run(self, args):
        args = self.parse_args(args)
        if args.server:
            self.submit_train_job(args.carpath, args.server, args.tub)
        else:
            self.submit_train_job(args.carpath,
                                  "https://hq.robocarstore.com/train/submit_job",
                                  args.tub)
# =====================================================================================================

def execute_from_command_line():
"""
@@ -469,6 +542,7 @@ def execute_from_command_line():
'cnnactivations': ShowCnnActivations,
'update': UpdateCar,
'train': Train,
'trainremote': TrainRemote,
}

args = sys.argv[:]
168 changes: 113 additions & 55 deletions donkeycar/parts/datastore_v2.py
@@ -10,19 +10,22 @@


class Seekable(object):
    """
    A seekable file reader and writer that deals with newline-delimited
    records. \n
    This reader maintains an index of line lengths, so seeking to a line
    is an O(1) operation.
    """

def __init__(self, file, read_only=False, line_lengths=list()):
self.line_lengths = list()
self.cumulative_lengths = list()
self.method = 'r' if read_only else 'a+'
self.file = open(file, self.method, newline=NEWLINE)
        # If the file is read-only, improve performance by memory-mapping it.
        if self.method == 'r':
            self.file = mmap.mmap(self.file.fileno(), length=0,
                                  access=mmap.ACCESS_READ)
self.total_length = 0
if len(line_lengths) <= 0:
self._read_contents()
@@ -74,7 +77,8 @@ def _line_end_offset(self, line_number):

def _offset_until(self, line_index):
end_index = line_index - 1
        return self.cumulative_lengths[end_index] \
            if 0 <= end_index < len(self.cumulative_lengths) else 0

def readline(self):
contents = self.file.readline()
@@ -92,7 +96,8 @@ def seek_end_of_file(self):
def truncate_until_end(self, line_number):
self.line_lengths = self.line_lengths[:line_number]
self.cumulative_lengths = self.cumulative_lengths[:line_number]
        self.total_length = self.cumulative_lengths[-1] \
            if len(self.cumulative_lengths) > 0 else 0
self.seek_end_of_file()
self.file.truncate()

@@ -133,15 +138,18 @@ def __exit__(self, type, value, traceback):
class Catalog(object):
    '''
    A newline-delimited file of JSON records, one record per line. \n
    [ json object record ] \n
    [ json object record ] \n
    ...
    '''
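
    # A hypothetical two-record catalog, using donkeycar-style record keys:
    #   {"cam/image_array": "0_cam.jpg", "user/angle": 0.0, "user/throttle": 0.0}
    #   {"cam/image_array": "1_cam.jpg", "user/angle": 0.1, "user/throttle": 0.2}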
def __init__(self, path, read_only=False, start_index=0):
self.path = Path(os.path.expanduser(path))
        self.manifest = CatalogMetadata(self.path,
                                        read_only=read_only,
                                        start_index=start_index)
        self.seekable = Seekable(self.path.as_posix(),
                                 line_lengths=self.manifest.line_lengths(),
                                 read_only=read_only)

def _exit_handler(self):
self.close()
@@ -164,8 +172,9 @@ class CatalogMetadata(object):
'''
def __init__(self, catalog_path, read_only=False, start_index=0):
path = Path(catalog_path)
        manifest_name = f'{path.stem}.catalog_manifest'
        self.manifest_path = Path(os.path.join(path.parent.as_posix(),
                                               manifest_name))
self.seekeable = Seekable(self.manifest_path, read_only=read_only)
has_contents = False
if os.path.exists(self.manifest_path) and self.seekeable.has_content():
@@ -207,7 +216,6 @@ def close(self):
class Manifest(object):
'''
A newline delimited file, with the following format.
[ json array of inputs ]\n
[ json array of types ]\n
[ json object with user metadata ]\n
@@ -230,50 +238,75 @@ def __init__(self, base_path, inputs=[], types=[], metadata=[],
self.catalog_paths = list()
self.catalog_metadata = dict()
self.deleted_indexes = set()
self._updated_session = False
has_catalogs = False

if self.manifest_path.exists():
self.seekeable = Seekable(self.manifest_path, read_only=self.read_only)
if self.seekeable.has_content():
self._read_contents()
has_catalogs = len(self.catalog_paths) > 0

else:
created_at = time.time()
self.manifest_metadata['created_at'] = created_at
if not self.base_path.exists():
self.base_path.mkdir(parents=True, exist_ok=True)
            print(f'Created a new datastore at {self.base_path.as_posix()}')
self.seekeable = Seekable(self.manifest_path, read_only=self.read_only)

if not has_catalogs:
self._write_contents()
self._add_catalog()
else:
            last_known_catalog = os.path.join(self.base_path,
                                              self.catalog_paths[-1])
            print(f'Using catalog {last_known_catalog}')
            self.current_catalog = Catalog(last_known_catalog,
                                           read_only=self.read_only,
                                           start_index=self.current_index)
# Create a new session_id, which will be added to each record in the
# tub, when Tub.write_record() is called.
self.session_id = self.create_new_session()

def write_record(self, record):
        new_catalog = self.current_index > 0 \
            and (self.current_index % self.max_len) == 0
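        # e.g. with max_len == 1000, record indexes 1000, 2000, ... each
        # trigger a new catalog file.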
if new_catalog:
self._add_catalog()

self.current_catalog.write_record(record)
self.current_index += 1
# Update metadata to keep track of the last index
self._update_catalog_metadata(update=True)
# Set session_id update status to True if this method is called at
# least once. Then session id metadata will be updated when the
# session gets closed
if not self._updated_session:
self._updated_session = True

def delete_record(self, record_index):
# Does not actually delete the record, but marks it as deleted.
self.deleted_indexes.add(record_index)
self._update_catalog_metadata(update=True)

def update_metadata(self, metadata):
self.metadata = {**self.metadata, **metadata}
self._write_contents()

def restore_record(self, record_index):
        # Un-marks a record that was previously marked as deleted.
self.deleted_indexes.discard(record_index)
self._update_catalog_metadata(update=True)

def _add_catalog(self):
current_length = len(self.catalog_paths)
        catalog_name = f'catalog_{current_length}.catalog'
catalog_path = os.path.join(self.base_path, catalog_name)
current_catalog = self.current_catalog
        self.current_catalog = Catalog(catalog_path,
                                       start_index=self.current_index,
                                       read_only=self.read_only)
# Store relative paths
self.catalog_paths.append(catalog_name)
self._update_catalog_metadata(update=True)
@@ -318,7 +351,30 @@ def _update_catalog_metadata(self, update=True):
self.catalog_metadata = catalog_metadata
self.seekeable.writeline(json.dumps(catalog_metadata))

def create_new_session(self):
""" Creates a new session id and appends it to the metadata."""
sessions = self.manifest_metadata.get('sessions', {})
last_id = -1
if sessions:
last_id = sessions['last_id']
else:
sessions['all_full_ids'] = []
this_id = last_id + 1
date = time.strftime('%y-%m-%d')
this_full_id = date + '_' + str(this_id)
sessions['last_id'] = this_id
sessions['last_full_id'] = this_full_id
sessions['all_full_ids'].append(this_full_id)
self.manifest_metadata['sessions'] = sessions
return this_full_id
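
    # Sketch of the effect: after two sessions created on 2021-05-27,
    # create_new_session() returns '21-05-27_1' and
    # manifest_metadata['sessions'] holds:
    #   {'last_id': 1, 'last_full_id': '21-05-27_1',
    #    'all_full_ids': ['21-05-27_0', '21-05-27_1']}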

def close(self):
""" Closing tub closes open files for catalog, catalog manifest and
manifest.json"""
# If records were received, write updated session_id dictionary into
# the metadata, otherwise keep the session_id information unchanged
if self._updated_session:
self.seekeable.update_line(4, json.dumps(self.manifest_metadata))
self.current_catalog.close()
self.seekeable.close()

@@ -331,11 +387,10 @@ def __len__(self):


class ManifestIterator(object):
    """
    An iterator for the Manifest type. \n
    Returns catalog entries lazily when a consumer calls __next__().
    """
def __init__(self, manifest):
self.manifest = manifest
self.has_catalogs = len(self.manifest.catalog_paths) > 0
@@ -344,39 +399,42 @@ def __init__(self, manifest):
self.current_catalog = None

def __next__(self):
        while True:
            if not self.has_catalogs:
                raise StopIteration('No catalogs')

            if self.current_catalog_index >= len(self.manifest.catalog_paths):
                raise StopIteration('No more catalogs')

            if self.current_catalog is None:
                current_catalog_path = os.path.join(
                    self.manifest.base_path,
                    self.manifest.catalog_paths[self.current_catalog_index])
                self.current_catalog = Catalog(current_catalog_path,
                                               read_only=self.manifest.read_only)
                self.current_catalog.seekable.seek_line_start(1)

            contents = self.current_catalog.seekable.readline()
            if contents is not None and len(contents) > 0:
                # Check for current_index when we are ready to advance the
                # underlying iterator.
                current_index = self.current_index
                self.current_index += 1
                if current_index in self.manifest.deleted_indexes:
                    # Skip over index, because it has been marked deleted
                    continue
                else:
                    try:
                        record = json.loads(contents)
                        return record
                    except Exception:
                        print(f'Ignoring record at index {current_index}')
                        continue
            else:
                self.current_catalog = None
                self.current_catalog_index += 1

next = __next__

def __len__(self):
        return self.manifest.__len__()
