我想知道boto3中是否存在一个键。我可以循环桶内容并检查键是否匹配。

但这似乎太长了,也太过分了。Boto3官方文档明确说明了如何做到这一点。

也许我忽略了最明显的一点。有人能告诉我怎么做吗?


当前回答

FWIW,这里是我正在使用的非常简单的函数

import boto3

def get_resource(config: dict={}):
    """Loads the s3 resource.

    Expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to be in the environment
    or in a config dictionary.
    Looks in the environment first."""

    s3 = boto3.resource('s3',
                        aws_access_key_id=os.environ.get(
                            "AWS_ACCESS_KEY_ID", config.get("AWS_ACCESS_KEY_ID")),
                        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", config.get("AWS_SECRET_ACCESS_KEY")))
    return s3


def get_bucket(s3, s3_uri: str):
    """Get the bucket from the resource.
    A thin wrapper, use with caution.

    Example usage:

    >> bucket = get_bucket(get_resource(), s3_uri_prod)"""
    return s3.Bucket(s3_uri)


def isfile_s3(bucket, key: str) -> bool:
    """Returns T/F whether the file exists."""
    objs = list(bucket.objects.filter(Prefix=key))
    return len(objs) == 1 and objs[0].key == key


def isdir_s3(bucket, key: str) -> bool:
    """Returns T/F whether the directory exists."""
    objs = list(bucket.objects.filter(Prefix=key))
    return len(objs) > 1

其他回答

FWIW,这里是我正在使用的非常简单的函数

import boto3

def get_resource(config: dict={}):
    """Loads the s3 resource.

    Expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to be in the environment
    or in a config dictionary.
    Looks in the environment first."""

    s3 = boto3.resource('s3',
                        aws_access_key_id=os.environ.get(
                            "AWS_ACCESS_KEY_ID", config.get("AWS_ACCESS_KEY_ID")),
                        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", config.get("AWS_SECRET_ACCESS_KEY")))
    return s3


def get_bucket(s3, s3_uri: str):
    """Get the bucket from the resource.
    A thin wrapper, use with caution.

    Example usage:

    >> bucket = get_bucket(get_resource(), s3_uri_prod)"""
    return s3.Bucket(s3_uri)


def isfile_s3(bucket, key: str) -> bool:
    """Returns T/F whether the file exists."""
    objs = list(bucket.objects.filter(Prefix=key))
    return len(objs) == 1 and objs[0].key == key


def isdir_s3(bucket, key: str) -> bool:
    """Returns T/F whether the directory exists."""
    objs = list(bucket.objects.filter(Prefix=key))
    return len(objs) > 1

沿着这条线索,有人能得出结论,哪一种方法是检查S3中是否存在对象的最有效方法吗?

我认为head_object可能会赢,因为它只是检查元数据,比实际对象本身更轻

import boto3
client = boto3.client('s3')
s3_key = 'Your file without bucket name e.g. abc/bcd.txt'
bucket = 'your bucket name'
content = client.head_object(Bucket=bucket,Key=s3_key)
    if content.get('ResponseMetadata',None) is not None:
        print "File exists - s3://%s/%s " %(bucket,s3_key) 
    else:
        print "File does not exist - s3://%s/%s " %(bucket,s3_key)

有一种简单的方法可以检查文件是否存在于S3桶中。我们不需要为此使用异常

sesssion = boto3.Session(aws_access_key_id, aws_secret_access_key)
s3 = session.client('s3')

object_name = 'filename'
bucket = 'bucketname'
obj_status = s3.list_objects(Bucket = bucket, Prefix = object_name)
if obj_status.get('Contents'):
    print("File exists")
else:
    print("File does not exists")
S3_REGION="eu-central-1"
bucket="mybucket1"
name="objectname"

import boto3
from botocore.client import Config
client = boto3.client('s3',region_name=S3_REGION,config=Config(signature_version='s3v4'))
list = client.list_objects_v2(Bucket=bucket,Prefix=name)
for obj in list.get('Contents', []):
    if obj['Key'] == name: return True
return False