# Source code for leancloud.file_

# coding: utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import re
import io
import hashlib
import logging
import threading

import six
import requests

import leancloud
from leancloud import client
from leancloud import utils
from leancloud.errors import LeanCloudError

__author__ = "asaka <lan@leancloud.rocks>"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Default timeout value; presumably seconds, matching the 30 used for
# File.timeout -- TODO confirm the unit against the upload backends.
DEFAULT_TIMEOUT = 30


class File(object):
    """A file that can be uploaded to, fetched from, or deleted on LeanCloud.

    A ``File`` is either backed by local data (a file-like object or raw
    bytes), which ``save()`` uploads to the storage provider named by the
    server (``qiniu``, ``qcloud`` or ``s3``), or by an external URL (see
    ``create_with_url``), in which case only the record is created.
    """

    _class_name = "_File"  # walks like a leancloud.Object

    # Captures everything after the last dot of a file name.
    _EXTENSION_PATTERN = re.compile(r"\.([^.]*)$")

    def __init__(self, name="", data=None, mime_type=None):
        """Create a file object, optionally wrapping local data.

        :param name: file name; its extension (lower-cased, without the dot)
            is stored in ``self.extension``.
        :param data: ``None``, a seekable file-like object, raw bytes
            (``bytes`` / ``memoryview``; on Python 2 also ``buffer`` / ``str``),
            or any object with a ``read()`` method.
        :param mime_type: MIME type to store for the file.
        :raises TypeError: if ``data`` is of none of the supported kinds.
        """
        self._name = name
        self.key = None
        self.id = None
        self._url = None
        self._acl = None
        self.current_user = leancloud.User.get_current()
        self.timeout = DEFAULT_TIMEOUT
        self._metadata = {"owner": "unknown"}
        if (
            self.current_user and self.current_user != None
        ):  # NOQA: self.current_user may be a thread_local object
            self._metadata["owner"] = self.current_user.id

        extension = self._EXTENSION_PATTERN.findall(name)
        self.extension = extension[0].lower() if extension else ""

        self._mime_type = mime_type

        if data is None:
            self._source = None
            return

        try:
            # Probe for a seekable file-like object.
            data.read
            data.tell
            data.seek(0, os.SEEK_END)
            data.seek(0, os.SEEK_SET)
        except Exception:
            if (six.PY3 and isinstance(data, (memoryview, bytes))) or (
                six.PY2 and isinstance(data, (buffer, memoryview, str))  # noqa: F821
            ):
                data = io.BytesIO(data)
            elif getattr(data, "read", None):
                # Readable but not seekable: buffer it in memory.
                data = io.BytesIO(data.read())
            else:
                # getattr() above keeps objects without ``read`` from raising
                # AttributeError before this branch can be reached.
                raise TypeError(
                    "Do not know how to handle data, accepts file like object or bytes"
                )

        data.seek(0, os.SEEK_SET)
        checksum = hashlib.md5()
        while True:
            chunk = data.read(4096)
            if not chunk:
                break
            try:
                checksum.update(chunk)
            except TypeError:
                # Text-mode streams yield text; hash its UTF-8 encoding.
                checksum.update(chunk.encode("utf-8"))
        self._metadata["_checksum"] = checksum.hexdigest()
        self._metadata["size"] = data.tell()

        # 3.75 MB is what a 1 Mbps link transfers in 30 s; scale the timeout
        # by the number of whole 3.75 MB units in the payload.
        if self._metadata["size"] > 3750000:
            self.timeout = self.timeout * int(self._metadata["size"] / 3750000)

        data.seek(0, os.SEEK_SET)

        self._source = data

    @utils.classproperty
    def query(self):
        """Return a ``leancloud.Query`` built from this class."""
        return leancloud.Query(self)

    @classmethod
    def create_with_url(cls, name, url, meta_data=None, mime_type=None):
        """Create a file that points at an external URL instead of local data.

        :param name: file name.
        :param url: external URL; must be a string.
        :param meta_data: optional dict merged into the file's metadata.
        :param mime_type: MIME type to store for the file.
        :raises ValueError: if ``url`` is not a string.
        """
        f = File(name, None, mime_type)
        if meta_data:
            f._metadata.update(meta_data)

        if isinstance(url, six.string_types):
            f._url = url
        else:
            raise ValueError("url must be a str / unicode")

        # Marks the file so that save() only creates the record.
        f._metadata["__source"] = "external"
        return f

    @classmethod
    def create_without_data(cls, object_id):
        """Create an empty file object carrying only the given object id."""
        f = File("")
        f.id = object_id
        return f

    def get_acl(self):
        """Return the ACL assigned to this file, or ``None``."""
        return self._acl

    def set_acl(self, acl):
        """Assign an ACL to this file.

        :raises TypeError: if ``acl`` is not a ``leancloud.ACL`` instance.
        """
        if not isinstance(acl, leancloud.ACL):
            raise TypeError("acl must be a leancloud.ACL instance")
        self._acl = acl

    @property
    def name(self):
        """File name."""
        return self._name

    @property
    def url(self):
        """URL of the file, or ``None`` if not known yet."""
        return self._url

    @property
    def mime_type(self):
        """MIME type of the file."""
        return self._mime_type

    @mime_type.setter
    def mime_type(self, mime_type):
        self._mime_type = mime_type

    # The setter used to be registered under this name, which left
    # ``mime_type`` read-only; keep the old name working as an alias.
    set_mime_type = mime_type

    @property
    def size(self):
        """Size recorded in the metadata; raises ``KeyError`` if absent."""
        return self._metadata["size"]

    @property
    def owner_id(self):
        """Owner recorded in the metadata (``"unknown"`` without a user)."""
        return self._metadata["owner"]

    @property
    def metadata(self):
        """The metadata dict itself (not a copy)."""
        return self._metadata

    def get_thumbnail_url(
        self, width, height, quality=100, scale_to_fit=True, fmt="png"
    ):
        """Build a thumbnail URL by appending an ``imageView`` query string.

        :param width: thumbnail width; must not be negative.
        :param height: thumbnail height; must not be negative.
        :param quality: value in the range (0, 100].
        :param scale_to_fit: selects mode ``2`` when true, mode ``1`` otherwise.
        :param fmt: value placed in the ``format`` segment.
        :raises ValueError: if the file has no URL or an argument is out of
            range.
        """
        if not self._url:
            raise ValueError("invalid url")

        if width < 0 or height < 0:
            raise ValueError("invalid height or width params")

        if quality > 100 or quality <= 0:
            raise ValueError("quality must between 0 and 100")

        mode = 2 if scale_to_fit else 1

        return self.url + "?imageView/{0}/w/{1}/h/{2}/q/{3}/format/{4}".format(
            mode, width, height, quality, fmt
        )

    def destroy(self):
        """Delete the file on the server.

        :return: ``False`` if the file has no id; otherwise ``None``.
        :raises LeanCloudError: if the server does not answer with status 200.
        """
        if not self.id:
            return False
        response = client.delete("/files/{0}".format(self.id))
        if response.status_code != 200:
            raise LeanCloudError(1, "the file is not sucessfully destroyed")

    def _save_to_qiniu_internal_py3(self, token, key):
        """Read the whole source and upload it through qiniu's form upload."""
        from qiniu.services.storage.uploader import crc32, _form_put
        from qiniu.config import _BLOCK_SIZE

        # Collect chunks and join once; an empty source yields b"" (bytes),
        # which is what the checksum expects.
        chunks = []
        while True:
            chunk = self._source.read(_BLOCK_SIZE)
            if not chunk:
                break
            chunks.append(chunk)
        final_data = b"".join(chunks)

        crc = crc32(final_data)
        return _form_put(token, key, final_data, None, self.mime_type, crc)

    def _save_to_qiniu(self, token, key):
        """Upload the source to qiniu and report the result to the server.

        :raises LeanCloudError: if qiniu does not answer with status 200.
        """
        self._source.seek(0)

        import qiniu

        qiniu.set_default(connection_timeout=self.timeout)

        if six.PY3:
            # use patched put_data implementation for py3k
            ret, info = self._save_to_qiniu_internal_py3(token, key)
        else:
            # use put_data implementation provided by qiniu-sdk
            ret, info = qiniu.put_data(token, key, self._source)
        self._source.seek(0)

        if info.status_code != 200:
            self._save_callback(token, False)
            raise LeanCloudError(
                1,
                "the file is not saved, qiniu status code: {0}".format(
                    info.status_code
                ),
            )
        self._save_callback(token, True)

    def _save_to_s3(self, token, upload_url):
        """PUT the source to ``upload_url`` and report the result.

        :raises LeanCloudError: if the response status is not 200.
        """
        self._source.seek(0)
        response = requests.put(
            upload_url, data=self._source, headers={"Content-Type": self.mime_type}
        )
        # Rewind whatever the outcome, as the other upload paths do.
        self._source.seek(0)
        if response.status_code != 200:
            self._save_callback(token, False)
            raise LeanCloudError(1, "The file is not successfully saved to S3")
        self._save_callback(token, True)

    def _save_external(self):
        """Create the record for an externally hosted file and store its id."""
        data = {
            "name": self._name,
            "ACL": self._acl,
            "metaData": self._metadata,
            "mime_type": self.mime_type,
            "url": self._url,
        }
        response = client.post("/files", data)
        content = response.json()
        self.id = content["objectId"]

    def _save_to_qcloud(self, token, upload_url):
        """POST the source to ``upload_url`` as a multipart form and report.

        :raises LeanCloudError: if the ``code`` field of the response is not 0.
        """
        headers = {
            "Authorization": token,
        }
        self._source.seek(0)
        data = {
            "op": "upload",
            "filecontent": self._source.read(),
        }
        response = requests.post(upload_url, headers=headers, files=data)
        self._source.seek(0)
        info = response.json()
        if info["code"] != 0:
            self._save_callback(token, False)
            raise LeanCloudError(
                1,
                "this file is not saved, qcloud cos status code: {}".format(
                    info["code"]
                ),
            )
        self._save_callback(token, True)

    def _save_callback(self, token, successed):
        """Report the upload outcome to ``/fileCallback`` on a new thread.

        Does nothing when ``token`` is falsy. A ``LeanCloudError`` raised by
        the request is logged as a warning instead of being propagated.
        """
        if not token:
            return

        def f():
            try:
                client.post("/fileCallback", {"token": token, "result": successed})
            except LeanCloudError as e:
                logger.warning("call file callback failed, error: %s", e)

        threading.Thread(target=f).start()

    def save(self):
        """Persist the file.

        External-URL files only get their record created. Files without
        local data are left untouched. Otherwise an upload token is
        requested and the data is sent to the provider it names.

        :raises RuntimeError: if the provider is not ``qiniu``, ``qcloud``
            or ``s3``.
        """
        if self._url and self.metadata.get("__source") == "external":
            self._save_external()
        elif not self._source:
            pass
        else:
            content = self._get_file_token()
            self._mime_type = content["mime_type"]
            if content["provider"] == "qiniu":
                self._save_to_qiniu(content["token"], content["key"])
            elif content["provider"] == "qcloud":
                self._save_to_qcloud(content["token"], content["upload_url"])
            elif content["provider"] == "s3":
                self._save_to_s3(content.get("token"), content["upload_url"])
            else:
                raise RuntimeError("The provider field in the fetched content is empty")
            self._update_data(content)

    def _update_data(self, server_data):
        """Copy the fields present in ``server_data`` onto this object."""
        if "objectId" in server_data:
            self.id = server_data.get("objectId")
        if "name" in server_data:
            self._name = server_data.get("name")
        if "url" in server_data:
            self._url = server_data.get("url")
        if "mime_type" in server_data:
            self._mime_type = server_data["mime_type"]
        if "metaData" in server_data:
            self._metadata = server_data.get("metaData")

    def _get_file_token(self):
        """Request an upload token from ``/fileTokens``.

        Stores the returned ``objectId``, ``url`` and ``key`` on this object.

        :return: the decoded JSON response.
        """
        data = {
            "name": self._name,
            "ACL": self._acl,
            "mime_type": self.mime_type,
            "metaData": self._metadata,
        }
        if self.key is not None:
            data["key"] = self.key
        response = client.post("/fileTokens", data)
        content = response.json()
        self.id = content["objectId"]
        self._url = content["url"]
        self.key = content["key"]
        return content

    def fetch(self):
        """Reload this file's fields from ``/files/<id>`` on the server."""
        response = client.get("/files/{0}".format(self.id))
        content = response.json()
        self._update_data(content)