
Downloading Data from AWS


This document provides instructions on how to download data provided by Mobito from AWS S3.

For each use case, two examples are provided:

  • Using AWS CLI

  • Using Python’s Boto3 library

Installation

Before executing any commands, ensure you have installed your chosen tool or library: the AWS CLI or Python's Boto3 library.
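If you already work in a Python environment, one option is to install both via pip (this is just one route; AWS also provides standalone installers for the AWS CLI, and the Boto3 install command is repeated in its section below):

    pip install awscli boto3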

Downloading files

Using the AWS CLI

To authenticate and download data from S3, follow these steps:

  1. Configure AWS CLI: Run the following command in your terminal and enter your access key, secret key, default region, and output format:

    aws configure

  2. Download Files: After configuring AWS CLI, use the following command to download all files from the specified S3 path:

    aws s3 sync s3://example-data-bucket/folder-1/ /local/path/to/save/files

    1. Replace "s3://example-data-bucket/folder-1/" with the S3 path provided by Mobito in the AWS S3 Bucket Access Details document.

    2. Replace "/local/path/to/save/files" with the local directory where you want to save the downloaded files.

This command uses aws s3 sync to synchronize the specified S3 path with your local directory; only files that are missing or have changed locally are transferred, so the download is efficient even if you re-run the command.
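To preview which files would be transferred without downloading anything, you can add the --dryrun flag to the same command (shown here with the same placeholder paths):

    aws s3 sync s3://example-data-bucket/folder-1/ /local/path/to/save/files --dryrun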

Using Boto3 for Python

Before running the script, ensure Boto3 is installed. If not, install it using:

pip install boto3

Once installed, you can run the following script. It lists every object under the given prefix using a paginator, downloads the files in parallel with a thread pool, and writes a JSON report of the results:

import boto3
import os
from botocore.exceptions import NoCredentialsError
import time
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_file(s3_client, bucket_name, key, local_file_path):
    """Download a single file"""
    try:
        local_dir = os.path.dirname(local_file_path)
        os.makedirs(local_dir, exist_ok=True)

        # print(f"Starting download: {key}")

        # Download file synchronously (but thread will handle concurrency)
        s3_client.download_file(bucket_name, key, local_file_path)

        # print(f"Completed download: {key}")
        return key

    except Exception as e:
        print(f"Error downloading {key}: {e}")
        return None

def download_files_from_s3(access_key, secret_key, bucket_name, s3_prefix, local_directory, max_concurrent):
    """Download files from S3 using ThreadPoolExecutor"""

    # Create S3 client
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key
    )

    # Initialize results tracking for JSON output
    download_results = {
        'metadata': {
            'start_time': datetime.now().isoformat(),
            's3_prefix': s3_prefix,
            'local_directory': local_directory,
            'max_concurrent_workers': None  # Will be set when executor is created
        },
        'download_futures': [],
        'summary': {}
    }

    try:
        print(f"Starting to list objects with prefix: {s3_prefix}")

        # First, get all objects using paginator
        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=s3_prefix)

        all_objects = []
        total_files = 0

        # Collect all objects first (excluding folders)
        for page in page_iterator:
            page_objects = page.get('Contents', [])
            # page_count = len(page_objects)

            # Filter out folders/prefixes before counting
            file_objects = [obj for obj in page_objects if not obj['Key'].endswith('/')]
            file_count = len(file_objects)

            # if page_count > 0:
            #     print(f"Found page with {page_count} objects ({file_count} files, {page_count - file_count} folders)")

            # Add only file objects to our list for concurrent processing
            all_objects.extend(file_objects)
            total_files += file_count

        print(f"Total objects to download: {total_files}")

        if total_files == 0:
            print("No files found to download.")
            return

        # Create ThreadPoolExecutor
        start_time = time.time()

        results = []
        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            download_results['metadata']['max_concurrent_workers'] = executor._max_workers
            print(f"Starting downloads with {executor._max_workers} max threads...")

            # Submit all download tasks and track future details
            future_to_key = {}
            key_to_future_info = {}  # O(1) lookup dictionary

            for i, obj in enumerate(all_objects):
                key = obj['Key']
                local_file_path = os.path.join(local_directory, key)

                future = executor.submit(download_file, s3_client, bucket_name, key, local_file_path)
                future_to_key[future] = key

                # Track future details with file metadata
                future_info = {
                    'future_id': i + 1,
                    'key': key,
                    'local_file_path': local_file_path,
                    'size': obj['Size'],
                    'last_modified': obj['LastModified'].isoformat() if 'LastModified' in obj else None,
                    'etag': obj['ETag'],
                    'submitted_at': datetime.now().isoformat()
                }
                download_results['download_futures'].append(future_info)
                key_to_future_info[key] = future_info  # Store for O(1) lookup

            # Collect results as they complete
            completed_count = 0
            for future in as_completed(future_to_key):
                key = future_to_key[future]
                completion_time = datetime.now().isoformat()

                try:
                    result = future.result()
                    results.append(result)
                    status = "success"
                    error_message = None
                except Exception as exc:
                    print(f"Download generated an exception for {key}: {exc}")
                    results.append(None)
                    status = "failed"
                    error_message = str(exc)

                # Update the future info with completion data (O(1) lookup)
                future_info = key_to_future_info[key]
                future_info.update({
                    'completed_at': completion_time,
                    'status': status,
                    'error_message': error_message,
                    'result': result if status == "success" else None
                })

                completed_count += 1
                # print(f"Completed {completed_count}/{total_files}: {key} [{status}]")

        end_time = time.time()
        duration = end_time - start_time

        # Count successful downloads
        successful_downloads = sum(1 for result in results if result is not None)
        failed_downloads = total_files - successful_downloads

        # Update summary
        download_results['summary'] = {
            'total_files': total_files,
            'successful_downloads': successful_downloads,
            'failed_downloads': failed_downloads,
            'duration_seconds': duration,
            'end_time': datetime.now().isoformat()
        }

        print("\nDownload Summary:")
        print(f"Total files found: {total_files}")
        print(f"Successfully downloaded: {successful_downloads}")
        print(f"Failed downloads: {failed_downloads}")
        print(f"Total time: {duration:.2f} seconds")

        # Save results to JSON file
        output_file = f"download_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w') as f:
            json.dump(download_results, f, indent=2, default=str)

        print(f"\nDownload report saved to: {output_file}")

    except NoCredentialsError:
        print("Credentials not available or not valid.")
    except Exception as e:
        print(f"An error occurred: {e}")

def run_download():
    """Run S3 download function"""
    # Replace with your AWS credentials and other details
    aws_access_key = "<ACCESS-KEY-ID>"
    aws_secret_key = "<SECRET-KEY>"
    bucket_name = "example-data-bucket"
    s3_prefix = "folder-1"
    local_directory = "/local/path/to/save/files"

    # Max Threads
    # if None, it will default to min(32, os.cpu_count() + 4)
    max_concurrent = None

    print("Starting S3 download...")

    # Run the threaded function
    download_files_from_s3(
        aws_access_key,
        aws_secret_key,
        bucket_name,
        s3_prefix,
        local_directory,
        max_concurrent
    )

if __name__ == "__main__":
    run_download()
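
If you only need to fetch a single file rather than a whole prefix, a much smaller Boto3 snippet is enough. The sketch below assumes the same placeholder credentials, bucket, and local path as the script above; the object key shown is only an example and should be replaced with a real key from your bucket:

import os
import boto3

# Create the S3 client. If the access keys are omitted, Boto3 falls back to
# credentials from environment variables or ~/.aws/credentials (e.g. set via aws configure).
s3_client = boto3.client(
    's3',
    aws_access_key_id="<ACCESS-KEY-ID>",
    aws_secret_access_key="<SECRET-KEY>"
)

# Make sure the local target directory exists before downloading
os.makedirs("/local/path/to/save/files", exist_ok=True)

# Download one object to a local file (all three arguments are placeholders)
s3_client.download_file(
    "example-data-bucket",                         # bucket name
    "folder-1/example-file.csv",                   # object key
    "/local/path/to/save/files/example-file.csv"   # local destination path
)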
