upload_docs.py 7.14 KB
Newer Older
Sartika Aritonang committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
# -*- coding: utf-8 -*-
"""upload_docs

Implements a Distutils 'upload_docs' subcommand (upload documentation to
PyPI's pythonhosted.org).
"""

from base64 import standard_b64encode
from distutils import log
from distutils.errors import DistutilsOptionError
import os
import socket
import zipfile
import tempfile
import shutil
import itertools
import functools

from setuptools.extern import six
from setuptools.extern.six.moves import http_client, urllib

from pkg_resources import iter_entry_points
from .upload import upload


def _encode(s):
    errors = 'surrogateescape' if six.PY3 else 'strict'
    return s.encode('utf-8', errors)


class upload_docs(upload):
    # override the default repository as upload_docs isn't
    # supported by Warehouse (and won't be).
    DEFAULT_REPOSITORY = 'https://pypi.python.org/pypi/'

    description = 'Upload documentation to PyPI'

    user_options = [
        ('repository=', 'r',
         "url of repository [default: %s]" % upload.DEFAULT_REPOSITORY),
        ('show-response', None,
         'display full response text from server'),
        ('upload-dir=', None, 'directory to upload'),
    ]
    boolean_options = upload.boolean_options

    def has_sphinx(self):
        if self.upload_dir is None:
            for ep in iter_entry_points('distutils.commands', 'build_sphinx'):
                return True

    sub_commands = [('build_sphinx', has_sphinx)]

    def initialize_options(self):
        upload.initialize_options(self)
        self.upload_dir = None
        self.target_dir = None

    def finalize_options(self):
        upload.finalize_options(self)
        if self.upload_dir is None:
            if self.has_sphinx():
                build_sphinx = self.get_finalized_command('build_sphinx')
                self.target_dir = build_sphinx.builder_target_dir
            else:
                build = self.get_finalized_command('build')
                self.target_dir = os.path.join(build.build_base, 'docs')
        else:
            self.ensure_dirname('upload_dir')
            self.target_dir = self.upload_dir
        if 'pypi.python.org' in self.repository:
            log.warn("Upload_docs command is deprecated. Use RTD instead.")
        self.announce('Using upload directory %s' % self.target_dir)

    def create_zipfile(self, filename):
        zip_file = zipfile.ZipFile(filename, "w")
        try:
            self.mkpath(self.target_dir)  # just in case
            for root, dirs, files in os.walk(self.target_dir):
                if root == self.target_dir and not files:
                    tmpl = "no files found in upload directory '%s'"
                    raise DistutilsOptionError(tmpl % self.target_dir)
                for name in files:
                    full = os.path.join(root, name)
                    relative = root[len(self.target_dir):].lstrip(os.path.sep)
                    dest = os.path.join(relative, name)
                    zip_file.write(full, dest)
        finally:
            zip_file.close()

    def run(self):
        # Run sub commands
        for cmd_name in self.get_sub_commands():
            self.run_command(cmd_name)

        tmp_dir = tempfile.mkdtemp()
        name = self.distribution.metadata.get_name()
        zip_file = os.path.join(tmp_dir, "%s.zip" % name)
        try:
            self.create_zipfile(zip_file)
            self.upload_file(zip_file)
        finally:
            shutil.rmtree(tmp_dir)

    @staticmethod
    def _build_part(item, sep_boundary):
        key, values = item
        title = '\nContent-Disposition: form-data; name="%s"' % key
        # handle multiple entries for the same name
        if not isinstance(values, list):
            values = [values]
        for value in values:
            if isinstance(value, tuple):
                title += '; filename="%s"' % value[0]
                value = value[1]
            else:
                value = _encode(value)
            yield sep_boundary
            yield _encode(title)
            yield b"\n\n"
            yield value
            if value and value[-1:] == b'\r':
                yield b'\n'  # write an extra newline (lurve Macs)

    @classmethod
    def _build_multipart(cls, data):
        """
        Build up the MIME payload for the POST data
        """
        boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
        sep_boundary = b'\n--' + boundary
        end_boundary = sep_boundary + b'--'
        end_items = end_boundary, b"\n",
        builder = functools.partial(
            cls._build_part,
            sep_boundary=sep_boundary,
        )
        part_groups = map(builder, data.items())
        parts = itertools.chain.from_iterable(part_groups)
        body_items = itertools.chain(parts, end_items)
        content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii')
        return b''.join(body_items), content_type

    def upload_file(self, filename):
        with open(filename, 'rb') as f:
            content = f.read()
        meta = self.distribution.metadata
        data = {
            ':action': 'doc_upload',
            'name': meta.get_name(),
            'content': (os.path.basename(filename), content),
        }
        # set up the authentication
        credentials = _encode(self.username + ':' + self.password)
        credentials = standard_b64encode(credentials)
        if six.PY3:
            credentials = credentials.decode('ascii')
        auth = "Basic " + credentials

        body, ct = self._build_multipart(data)

        msg = "Submitting documentation to %s" % (self.repository)
        self.announce(msg, log.INFO)

        # build the Request
        # We can't use urllib2 since we need to send the Basic
        # auth right with the first request
        schema, netloc, url, params, query, fragments = \
            urllib.parse.urlparse(self.repository)
        assert not params and not query and not fragments
        if schema == 'http':
            conn = http_client.HTTPConnection(netloc)
        elif schema == 'https':
            conn = http_client.HTTPSConnection(netloc)
        else:
            raise AssertionError("unsupported schema " + schema)

        data = ''
        try:
            conn.connect()
            conn.putrequest("POST", url)
            content_type = ct
            conn.putheader('Content-type', content_type)
            conn.putheader('Content-length', str(len(body)))
            conn.putheader('Authorization', auth)
            conn.endheaders()
            conn.send(body)
        except socket.error as e:
            self.announce(str(e), log.ERROR)
            return

        r = conn.getresponse()
        if r.status == 200:
            msg = 'Server response (%s): %s' % (r.status, r.reason)
            self.announce(msg, log.INFO)
        elif r.status == 301:
            location = r.getheader('Location')
            if location is None:
                location = 'https://pythonhosted.org/%s/' % meta.get_name()
            msg = 'Upload successful. Visit %s' % location
            self.announce(msg, log.INFO)
        else:
            msg = 'Upload failed (%s): %s' % (r.status, r.reason)
            self.announce(msg, log.ERROR)
        if self.show_response:
            print('-' * 75, r.read(), '-' * 75)