This document provides instructions on how to download data from Mobito's AWS S3 bucket. Two approaches are covered:
Using the AWS CLI
Using Python's Boto3 library
Installation
Before executing any commands, ensure you have installed your chosen tool or library: the AWS CLI (see AWS's official installation guide) or Boto3 (installable with pip, as shown in the Boto3 section below).
Downloading files
Using the AWS CLI
To authenticate and download data from S3, follow these steps:
Configure the AWS CLI: Run the following command in your terminal and, when prompted, enter your access key, secret key, default region, and output format:
aws configure
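When you run it, the AWS CLI prompts for each value in turn. The transcript below is only an illustration; the values shown are placeholders, and the actual keys and region come from your AWS S3 Bucket Access Details document:
$ aws configure
AWS Access Key ID [None]: <ACCESS-KEY-ID>
AWS Secret Access Key [None]: <SECRET-KEY>
Default region name [None]: eu-central-1
Default output format [None]: json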
Download Files: After configuring the AWS CLI, use the following command to download all files from the specified S3 path:
aws s3 sync s3://example-data-bucket/folder-1/ /local/path/to/save/files
Replace "s3://example-data-bucket/folder-1/" with the S3 path provided by Mobito in the AWS S3 Bucket Access Details document.
Replace "/local/path/to/save/files" with the local directory where you want to save the downloaded files.
This command uses aws s3 sync to synchronize the specified S3 path with your local directory. It downloads every file under the prefix and, if you run it again, transfers only files that are new or have changed, which makes it efficient for large datasets.
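If you want to preview which files would be transferred before downloading anything, the sync command accepts a --dryrun flag that lists the planned operations without executing them:
aws s3 sync s3://example-data-bucket/folder-1/ /local/path/to/save/files --dryrun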
Using Python's Boto3 library
Before running the script, ensure Boto3 is installed. If not, install it using:
pip install boto3
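To confirm the installation succeeded, you can print the installed version (this one-liner only assumes a working Python interpreter on your PATH):
python -c "import boto3; print(boto3.__version__)"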
Once installed, you can run the following script:
import boto3
import os
from botocore.exceptions import NoCredentialsError
import time
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_file(s3_client, bucket_name, key, local_file_path):
    """Download a single file from S3 to the given local path."""
    try:
        # Make sure the target directory exists before downloading
        local_dir = os.path.dirname(local_file_path)
        os.makedirs(local_dir, exist_ok=True)
        # Download the file synchronously; the thread pool provides concurrency
        s3_client.download_file(bucket_name, key, local_file_path)
        return key
    except Exception as e:
        print(f"Error downloading {key}: {e}")
        return None


def download_files_from_s3(access_key, secret_key, bucket_name, s3_prefix, local_directory, max_concurrent):
    """Download all files under an S3 prefix using a ThreadPoolExecutor."""
    # Create the S3 client
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key
    )

    # Initialize results tracking for the JSON report
    download_results = {
        'metadata': {
            'start_time': datetime.now().isoformat(),
            's3_prefix': s3_prefix,
            'local_directory': local_directory,
            'max_concurrent_workers': None  # Set once the worker count is known
        },
        'download_futures': [],
        'summary': {}
    }

    try:
        print(f"Starting to list objects with prefix: {s3_prefix}")

        # List all objects under the prefix using a paginator
        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=s3_prefix)

        all_objects = []
        total_files = 0

        # Collect all objects, excluding folder placeholder keys (ending in '/')
        for page in page_iterator:
            page_objects = page.get('Contents', [])
            file_objects = [obj for obj in page_objects if not obj['Key'].endswith('/')]
            all_objects.extend(file_objects)
            total_files += len(file_objects)

        print(f"Total objects to download: {total_files}")
        if total_files == 0:
            print("No files found to download.")
            return

        start_time = time.time()
        results = []

        # Fall back to the ThreadPoolExecutor default when max_concurrent is None
        if max_concurrent is None:
            max_concurrent = min(32, (os.cpu_count() or 1) + 4)
        download_results['metadata']['max_concurrent_workers'] = max_concurrent

        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            print(f"Starting downloads with {max_concurrent} max threads...")

            # Submit all download tasks and track future details
            future_to_key = {}
            key_to_future_info = {}  # O(1) lookup dictionary
            for i, obj in enumerate(all_objects):
                key = obj['Key']
                local_file_path = os.path.join(local_directory, key)
                future = executor.submit(download_file, s3_client, bucket_name, key, local_file_path)
                future_to_key[future] = key
                # Track future details with file metadata
                future_info = {
                    'future_id': i + 1,
                    'key': key,
                    'local_file_path': local_file_path,
                    'size': obj['Size'],
                    'last_modified': obj['LastModified'].isoformat() if 'LastModified' in obj else None,
                    'etag': obj['ETag'],
                    'submitted_at': datetime.now().isoformat()
                }
                download_results['download_futures'].append(future_info)
                key_to_future_info[key] = future_info  # Store for O(1) lookup

            # Collect results as they complete
            completed_count = 0
            for future in as_completed(future_to_key):
                key = future_to_key[future]
                completion_time = datetime.now().isoformat()
                result = None
                try:
                    result = future.result()
                    results.append(result)
                    # download_file returns None when it caught an error itself
                    status = "success" if result is not None else "failed"
                    error_message = None
                except Exception as exc:
                    print(f"Download generated an exception for {key}: {exc}")
                    results.append(None)
                    status = "failed"
                    error_message = str(exc)

                # Update the future info with completion data (O(1) lookup)
                future_info = key_to_future_info[key]
                future_info.update({
                    'completed_at': completion_time,
                    'status': status,
                    'error_message': error_message,
                    'result': result if status == "success" else None
                })
                completed_count += 1
                # print(f"Completed {completed_count}/{total_files}: {key} [{status}]")

        duration = time.time() - start_time

        # Count successful downloads
        successful_downloads = sum(1 for result in results if result is not None)
        failed_downloads = total_files - successful_downloads

        download_results['summary'] = {
            'total_files': total_files,
            'successful_downloads': successful_downloads,
            'failed_downloads': failed_downloads,
            'duration_seconds': duration,
            'end_time': datetime.now().isoformat()
        }

        print("\nDownload Summary:")
        print(f"Total files found: {total_files}")
        print(f"Successfully downloaded: {successful_downloads}")
        print(f"Failed downloads: {failed_downloads}")
        print(f"Total time: {duration:.2f} seconds")

        # Save the per-file report to a timestamped JSON file
        output_file = f"download_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w') as f:
            json.dump(download_results, f, indent=2, default=str)
        print(f"\nDownload report saved to: {output_file}")

    except NoCredentialsError:
        print("Credentials not available or not valid.")
    except Exception as e:
        print(f"An error occurred: {e}")


def run_download():
    """Run the S3 download."""
    # Replace with your AWS credentials and the details provided by Mobito
    aws_access_key = "<ACCESS-KEY-ID>"
    aws_secret_key = "<SECRET-KEY>"
    bucket_name = "example-data-bucket"
    s3_prefix = "folder-1"
    local_directory = "/local/path/to/save/files"
    # Max threads; if None, defaults to min(32, os.cpu_count() + 4)
    max_concurrent = None

    print("Starting S3 download...")
    download_files_from_s3(
        aws_access_key,
        aws_secret_key,
        bucket_name,
        s3_prefix,
        local_directory,
        max_concurrent
    )


if __name__ == "__main__":
    run_download()
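In addition to the console summary, the script writes a per-file report to a timestamped JSON file. As a rough sketch of its shape (the values below are invented for illustration), the report looks like:
{
  "metadata": {...},
  "download_futures": [
    {"future_id": 1, "key": "folder-1/file.csv", "size": 1024, "status": "success", ...}
  ],
  "summary": {
    "total_files": 1,
    "successful_downloads": 1,
    "failed_downloads": 0,
    "duration_seconds": 0.42,
    "end_time": "..."
  }
}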