Source code for pulsar.managers.util.aws_batch
"""Interface layer for Boto's batch library shared between Galaxy and Pulsar."""
BOTO3_IMPORT_MSG = (
"The Python 'boto3' package is required to use "
"this feature, please install it or correct the "
"following error:\nImportError {msg!s}"
)
try:
import boto3
except ImportError as e:
boto3 = None
BOTO3_IMPORT_MSG.format(msg=str(e))
[docs]
def ensure_client_available() -> None:
if boto3 is None:
raise Exception(BOTO3_IMPORT_MSG)
[docs]
def get_client(access_key_id=None, secret_access_key=None, region_name=None):
session = boto3.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=region_name,
)
return session.client("batch")
__all__ = (
"ensure_client_available",
"get_client",
)