Zope Subversion Repository |
|
Zope: zc.buildout/trunk/src/zc/buildout/easy_install.py
File:
[Zope] /
zc.buildout /
trunk /
src /
zc /
buildout / easy_install.py
(
download)
(
as text)
Revision:
72218,
Wed Jan 24 23:11:02 2007 UTC (6 years, 3 months ago) by
jim
File size: 22812 byte(s)
Fixed bug:
Explicitly specifying a Python executable failed if the output of
running Python with the -V option included a 2-digit (rather than a
3-digit) version number.
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Python easy_install API
This module provides a high-level Python API for installing packages.
It doesn't install scripts. It uses setuptools and requires it to be
installed.
$Id$
"""
import glob, logging, os, re, shutil, sys, tempfile, urlparse, zipimport
import distutils.errors
import pkg_resources
import setuptools.command.setopt
import setuptools.package_index
import setuptools.archive_util
import zc.buildout
logger = logging.getLogger('zc.buildout.easy_install')

# Optional override for the package index URL, read from the environment.
# NOTE(review): the variable name suggests it is set by the test suite --
# confirm.  ``None`` means "let setuptools pick its default index".
default_index_url = os.environ.get('buildout-testing-index-url', None)

# Matches the "scheme://" prefix of a URL.
url_match = re.compile('[a-z0-9+.-]+://').match

# Location of the setuptools distribution active in this process.
setuptools_loc = pkg_resources.working_set.find(
    pkg_resources.Requirement.parse('setuptools')).location

# Include buildout and setuptools eggs in paths
buildout_and_setuptools_path = [setuptools_loc]
buildout_and_setuptools_path.append(
    pkg_resources.working_set.find(
        pkg_resources.Requirement.parse('zc.buildout')).location)

# Cache of "major.minor" version strings keyed by interpreter path,
# pre-seeded with the interpreter that is running this module.
_versions = {}
_versions[sys.executable] = '%d.%d' % sys.version_info[:2]
def _get_version(executable):
    """Return the ``major.minor`` version string of a Python executable.

    Results are cached in ``_versions`` so each executable is run at most
    once.  For an executable that is not cached yet, ``<executable> -V`` is
    run and its output parsed.

    Raises ``zc.buildout.UserError`` if the output does not look like
    ``Python X.Y...``.
    """
    try:
        return _versions[executable]
    except KeyError:
        pass

    # popen4 merges the child's stdout and stderr into a single stream, so
    # the version banner is captured whichever stream it is written to.
    i, o = os.popen4(executable + ' -V')
    i.close()
    output = o.read().strip()
    o.close()

    # Only the leading "major.minor" is needed.  Anything after it (a micro
    # version of any width, "b1", "+", ...) is deliberately ignored so that
    # banners such as "Python 2.7.10" or "Python 2.5b1" are accepted.
    match = re.match(r'Python\s+(\d+[.]\d+)', output)
    if match is None:
        raise zc.buildout.UserError(
            "Couldn't determine the Python version of %s (got %r)."
            % (executable, output))

    version = match.group(1)
    _versions[executable] = version
    return version
# Cache of PackageIndex objects, keyed by (executable, index_url, find_links).
_indexes = {}

def _get_index(executable, index_url, find_links):
    """Return a (cached) setuptools PackageIndex for the given settings.

    The cache key uses ``index_url`` exactly as passed in, i.e. before any
    fall-back to ``default_index_url``.  When no index URL is known at all,
    the PackageIndex is created without one.
    """
    cache_key = executable, index_url, tuple(find_links)
    cached = _indexes.get(cache_key)
    if cached is not None:
        return cached

    if index_url is None:
        index_url = default_index_url

    # Only pass a URL positionally when we actually have one.
    if index_url is None:
        positional = ()
    else:
        positional = (index_url, )
    package_index = setuptools.package_index.PackageIndex(
        *positional, **dict(python=_get_version(executable)))

    if find_links:
        package_index.add_find_links(find_links)

    _indexes[cache_key] = package_index
    return package_index
def _satisfied(req, env, dest, executable, index, links):
    """Return an already-present distribution that satisfies ``req``, or None.

    ``env`` is the environment searched for candidates.  A develop
    distribution always wins.  Otherwise the best candidate is returned when
    it is known to be the best obtainable one: either it sits exactly on the
    upper version bound of the requirement, or the package index offers
    nothing newer.  The index is consulted only when ``dest`` is not None
    (i.e. when installing something is allowed).

    ``None`` means "nothing usable is present" or "something newer is
    available".
    """
    dists = [dist for dist in env[req.project_name] if dist in req]
    if not dists:
        logger.debug('We have no distributions for %s', req.project_name)
        return None

    # Note that dists are sorted from best to worst, as promised by
    # env.__getitem__
    for dist in dists:
        if dist.precedence == pkg_resources.DEVELOP_DIST:
            logger.debug('We have a develop egg for %s', req)
            return dist

    # Find an upper limit in the specs, if there is one.  The specs are
    # walked in ascending version order; ``greater`` records whether the
    # most recent bound leaves the range open towards higher versions.
    specs = [(pkg_resources.parse_version(v), op) for (op, v) in req.specs]
    specs.sort()
    maxv = None
    greater = False
    lastv = None
    for v, op in specs:
        if op == '==' and not greater:
            maxv = v
        elif op in ('>', '>=', '!='):
            maxv = None
            greater = True
        elif op == '<':
            maxv = None
            greater = False
        elif op == '<=':
            maxv = v
            greater = False

        if v == lastv:
            # Repeated versions values are undefined, so
            # all bets are off
            maxv = None
            greater = True
        else:
            lastv = v

    best_we_have = dists[0] # Because dists are sorted from best to worst

    # Check if we have the upper limit.  maxv is a *parsed* version, so it
    # must be compared with the parsed version of the distribution, not
    # with its version string.
    if maxv is not None and best_we_have.parsed_version == maxv:
        logger.debug('We have the best distribution that satisfies\n%s',
                     req)
        return best_we_have

    # We have some installed distros.  There might, theoretically, be
    # newer ones.  Let's find out which ones are available and see if
    # any are newer.  We only do this if we're willing to install
    # something, which is only true if dest is not None:
    if dest is not None:
        best_available = _get_index(executable, index, links).obtain(req)
    else:
        best_available = None

    if best_available is None:
        # That's a bit odd.  There aren't any distros available.
        # We should use the best one we have that meets the requirement.
        logger.debug(
            'There are no distros available that meet %s. Using our best.', req)
        return best_we_have

    # Let's find out if we already have the best available:
    if best_we_have.parsed_version >= best_available.parsed_version:
        # Yup. Use it.
        logger.debug('We have the best distribution that satisfies\n%s', req)
        return best_we_have

    return None
if sys.platform != 'win32':
    # Arguments are passed to the spawned process unchanged.
    _safe_arg = str
else:
    # work around spawn lameness on windows
    # XXX need safe quoting (see subprocess.list2cmdline) and test
    def _safe_arg(arg):
        """Wrap ``arg`` in double quotes (no escaping is performed)."""
        return '"%s"' % arg

# Value of the ``-c`` option used to run easy_install in a child interpreter.
_easy_install_cmd = _safe_arg(
    'from setuptools.command.easy_install import main; main()'
    )
def _call_easy_install(spec, env, ws, dest, links, index,
                       executable, always_unzip):
    """Run easy_install for ``spec`` in a child ``executable`` process.

    The child is started with PYTHONPATH set to the location of a
    setuptools distribution (looked up, and possibly fetched, through
    ``_get_dist``) and installs into ``dest``.  Verbosity follows the
    effective level of this module's logger.  A non-zero exit code trips
    an assertion.
    """
    setuptools_dist = _get_dist(
        pkg_resources.Requirement.parse('setuptools'),
        env, ws, dest, links, index, executable, False)
    path = setuptools_dist.location

    argv = ['-c', _easy_install_cmd, '-mUNxd', _safe_arg(dest)]
    if always_unzip:
        argv.append('-Z')

    level = logger.getEffectiveLevel()
    if level > logging.DEBUG:
        argv.append('-q')
    elif level < logging.DEBUG:
        argv.append('-v')

    argv.append(spec)

    if level <= logging.DEBUG:
        logger.debug('Running easy_install:\n%s "%s"\npath=%s\n',
                     executable, '" "'.join(argv), path)

    # os.spawnle takes the child's environment as its final argument.
    child_env = dict(os.environ, PYTHONPATH=path)

    sys.stdout.flush() # We want any pending output first

    exit_code = os.spawnle(
        os.P_WAIT, executable, executable, *(argv + [child_env]))
    assert exit_code == 0
def _get_dist(requirement, env, ws,
              dest, links, index_url, executable, always_unzip):
    """Return a distribution satisfying ``requirement``, fetching if needed.

    If ``_satisfied`` finds nothing suitable and ``dest`` is not None, the
    best distribution offered by the package index is installed into
    ``dest``: eggs are copied or unpacked directly, anything else is handed
    to easy_install.  With ``dest`` None, only ``env`` is searched.

    ``links`` is mutated: links from the distribution's
    ``dependency_links.txt`` metadata are appended to it.

    Raises ``zc.buildout.UserError`` when no distribution can be found or
    downloaded, and ``ValueError`` when nothing in ``env`` matches in the
    end.
    """
    # Maybe an existing dist is already the best dist that satisfies the
    # requirement
    dist = _satisfied(requirement, env, dest, executable, index_url, links)

    if dist is None:
        if dest is not None:
            logger.info("Getting new distribution for %s", requirement)

            # Retrieve the dist:
            index = _get_index(executable, index_url, links)
            dist = index.obtain(requirement)
            if dist is None:
                raise zc.buildout.UserError(
                    "Couldn't find a distribution for %s."
                    % requirement)

            # Decide by file name; for URLs only the path part matters.
            fname = dist.location
            if url_match(fname):
                fname = urlparse.urlparse(fname)[2]

            if fname.endswith('.egg'):
                # It's already an egg, just fetch it into the dest
                tmp = tempfile.mkdtemp('get_dist')
                try:
                    dist = index.fetch_distribution(requirement, tmp)
                    if dist is None:
                        raise zc.buildout.UserError(
                            "Couldn't download a distribution for %s."
                            % requirement)

                    newloc = os.path.join(
                        dest, os.path.basename(dist.location))

                    if os.path.isdir(dist.location):
                        # we got a directory. It must have been
                        # obtained locally.  Just copy it.
                        shutil.copytree(dist.location, newloc)
                    else:
                        if always_unzip:
                            should_unzip = True
                        else:
                            # Unzip unless the egg is explicitly marked
                            # zip-safe (and not marked not-zip-safe).
                            metadata = pkg_resources.EggMetadata(
                                zipimport.zipimporter(dist.location)
                                )
                            should_unzip = (
                                metadata.has_metadata('not-zip-safe')
                                or not metadata.has_metadata('zip-safe')
                                )

                        if should_unzip:
                            setuptools.archive_util.unpack_archive(
                                dist.location, newloc)
                        else:
                            shutil.copyfile(dist.location, newloc)
                finally:
                    shutil.rmtree(tmp)

            else:
                # It's some other kind of dist.  We'll download it to
                # a temporary directory and let easy_install have its
                # way with it:
                tmp = tempfile.mkdtemp('get_dist')
                try:
                    dist = index.fetch_distribution(requirement, tmp)
                    if dist is None:
                        # Without this check the failure would surface as
                        # an AttributeError on ``dist.location`` below.
                        raise zc.buildout.UserError(
                            "Couldn't download a distribution for %s."
                            % requirement)

                    # May need a new one.  Call easy_install
                    _call_easy_install(
                        dist.location, env, ws, dest, links, index_url,
                        executable, always_unzip)
                finally:
                    shutil.rmtree(tmp)

            # Because we have added a new egg, we need to rescan
            # the destination directory.

            # We may overwrite distributions, so clear importer
            # cache.
            sys.path_importer_cache.clear()
            env.scan([dest])
            dist = env.best_match(requirement, ws)
            logger.info("Got %s", dist)
        else:
            dist = env.best_match(requirement, ws)

    if dist is None:
        raise ValueError("Couldn't find", requirement)

    # XXX Need test for this
    if dist.has_metadata('dependency_links.txt'):
        for link in dist.get_metadata_lines('dependency_links.txt'):
            link = link.strip()
            if link not in links:
                links.append(link)

    return dist
def _maybe_add_setuptools(ws, dist, env, dest, links, index, executable):
    """Add setuptools to ``ws`` when ``dist`` needs it but does not say so.

    This applies to distributions that carry ``namespace_packages.txt``
    metadata without listing setuptools among their requirements.  For
    develop distributions a warning is logged as well.
    """
    if not dist.has_metadata('namespace_packages.txt'):
        return

    for required in dist.requires():
        if required.project_name == 'setuptools':
            # The dependency is declared; nothing to add here.
            return

    # We have a namespace package but no requirement for setuptools
    if dist.precedence == pkg_resources.DEVELOP_DIST:
        logger.warn(
            "Develop distribution for %s\n"
            "uses namespace packages but the distribution "
            "does not require setuptools.",
            dist)

    setuptools_req = pkg_resources.Requirement.parse('setuptools')
    if ws.find(setuptools_req) is not None:
        return

    setuptools_dist = _get_dist(setuptools_req, env, ws,
                                dest, links, index, executable,
                                False)
    ws.add(setuptools_dist)
def install(specs, dest,
            links=(), index=None,
            executable=sys.executable, always_unzip=False,
            path=None, working_set=None):
    """Collect distributions for ``specs`` and return them as a working set.

    ``specs`` are requirement strings.  ``dest`` is the directory new
    distributions are installed into; when it is None only what is found on
    ``path`` (plus the buildout and setuptools locations) is used.  The
    distributions are added to ``working_set`` if one is given, otherwise to
    a fresh, empty working set, which is returned.
    """
    logger.debug('Installing %r', specs)

    # Work on a copy of the caller's path.
    if path:
        path = path[:]
    else:
        path = []
    if dest is not None and dest not in path:
        path.insert(0, dest)
    path += buildout_and_setuptools_path

    links = list(links) # make copy, because we may need to mutate

    # For each spec, see if it is already installed.  We create a working
    # set to keep track of what we've collected and to make sure that the
    # distributions assembled are consistent.
    env = pkg_resources.Environment(path, python=_get_version(executable))
    requirements = [pkg_resources.Requirement.parse(spec) for spec in specs]

    ws = working_set
    if ws is None:
        ws = pkg_resources.WorkingSet([])

    for requirement in requirements:
        dist = _get_dist(requirement, env, ws,
                         dest, links, index, executable, always_unzip)
        ws.add(dist)
        _maybe_add_setuptools(ws, dist,
                              env, dest, links, index, executable)

    # OK, we have the requested distributions and they're in the working
    # set, but they may have unmet requirements.  We'll simply keep
    # trying to resolve requirements, adding missing requirements as they
    # are reported.
    #
    # Note that we don't pass in the environment, because we
    # want to look for new eggs unless what we have is the best that matches
    # the requirement.
    while True:
        try:
            ws.resolve(requirements)
        except pkg_resources.DistributionNotFound:
            # The exception carries the one requirement that is missing.
            (requirement, ) = sys.exc_info()[1].args
            if dest:
                logger.debug('Getting required %s', requirement)
            dist = _get_dist(requirement, env, ws,
                             dest, links, index, executable, always_unzip)
            ws.add(dist)
            _maybe_add_setuptools(ws, dist,
                                  env, dest, links, index, executable)
            continue
        break

    return ws
def build(spec, dest, build_ext,
          links=(), index=None,
          executable=sys.executable,
          path=None):
    """Build an egg for ``spec`` from source with custom build_ext options.

    If a satisfying distribution is already present, a one-element list
    holding its location is returned.  Otherwise a source distribution is
    fetched and unpacked, ``build_ext`` is written to the ``[build_ext]``
    section of its setup.cfg, easy_install is run on it, and the resulting
    egg is moved into ``dest``.

    NOTE(review): the build path returns whatever ``_copyeggs`` returns (a
    single path), not a list as the early return does -- callers should be
    checked before harmonising the two.

    Raises ``zc.buildout.UserError`` if no source distribution is found and
    ``distutils.errors.DistutilsError`` if the unpacked source has no, or
    more than one, setup script.
    """
    index_url = index

    logger.debug('Building %r', spec)

    # Work on a copy of the caller's path.
    if path:
        path = path[:]
    else:
        path = []
    if dest is not None:
        path.insert(0, dest)
    path += buildout_and_setuptools_path

    links = list(links) # make copy, because we may need to mutate

    # See if the spec is already satisfied by something on the path.
    env = pkg_resources.Environment(path, python=_get_version(executable))
    requirement = pkg_resources.Requirement.parse(spec)

    dist = _satisfied(requirement, env, dest, executable, index_url, links)
    if dist is not None:
        return [dist.location]

    # Clean-up callables; run in reverse order of registration on the way
    # out, whether or not the build succeeded.
    undo = []
    try:
        unpack_dir = tempfile.mkdtemp('build')
        undo.append(lambda: shutil.rmtree(unpack_dir))
        download_dir = tempfile.mkdtemp('build')
        undo.append(lambda: shutil.rmtree(download_dir))

        package_index = _get_index(executable, index_url, links)
        dist = package_index.fetch_distribution(
            requirement, download_dir, False, True)
        if dist is None:
            raise zc.buildout.UserError(
                "Couldn't find a source distribution for %s."
                % requirement)
        setuptools.archive_util.unpack_archive(dist.location, unpack_dir)

        # The setup script is either at the top of the unpacked tree or
        # exactly one directory below it.
        if os.path.exists(os.path.join(unpack_dir, 'setup.py')):
            base = unpack_dir
        else:
            setups = glob.glob(os.path.join(unpack_dir, '*', 'setup.py'))
            if len(setups) == 0:
                raise distutils.errors.DistutilsError(
                    "Couldn't find a setup script in %s"
                    % os.path.basename(dist.location)
                    )
            elif len(setups) > 1:
                raise distutils.errors.DistutilsError(
                    "Multiple setup scripts in %s"
                    % os.path.basename(dist.location)
                    )
            base = os.path.dirname(setups[0])

        setup_cfg = os.path.join(base, 'setup.cfg')
        if not os.path.exists(setup_cfg):
            # edit_config needs an existing file; create an empty one.
            open(setup_cfg, 'w').close()
        setuptools.command.setopt.edit_config(
            setup_cfg, dict(build_ext=build_ext))

        # Build into a scratch directory inside dest, then move the egg.
        egg_dir = tempfile.mkdtemp('build', dir=dest)
        undo.append(lambda: shutil.rmtree(egg_dir))

        _call_easy_install(base, env, pkg_resources.WorkingSet(),
                           egg_dir, links, index_url, executable, True)

        return _copyeggs(egg_dir, dest, '.egg', undo)
    finally:
        undo.reverse()
        for undo_action in undo:
            undo_action()
def _rm(*paths):
    """Remove each given path: directories recursively, other paths as files.

    Paths that do not exist are skipped.
    """
    for target in paths:
        if not os.path.isdir(target):
            if os.path.exists(target):
                os.remove(target)
            continue
        shutil.rmtree(target)
def _copyeggs(src, dest, suffix, undo):
    """Move the single ``suffix`` entry found in ``src`` into ``dest``.

    An existing entry of the same name in ``dest`` is removed first.  While
    the move is in progress a clean-up callable that removes everything
    moved so far sits on ``undo``; it is popped again once exactly one entry
    has been moved.  Returns the new path of that entry.
    """
    moved = []
    undo.append(lambda: _rm(*moved))

    matching = [name for name in os.listdir(src) if name.endswith(suffix)]
    for name in matching:
        target = os.path.join(dest, name)
        _rm(target)
        os.rename(os.path.join(src, name), target)
        moved.append(target)

    assert len(moved) == 1
    # Success: the partial-move clean-up is no longer wanted.
    undo.pop()

    return moved[0]
def develop(setup, dest,
            build_ext=None,
            executable=sys.executable):
    """Run ``setup.py develop`` and move the resulting egg link into ``dest``.

    ``setup`` is either a setup script or the directory containing a
    ``setup.py``.  When ``build_ext`` is given, it is written to the
    ``[build_ext]`` section of a temporary setup.cfg; any existing setup.cfg
    is moved aside for the duration and restored afterwards.  The setup
    script is run by ``executable`` through a small generated wrapper
    script.

    Returns what ``_copyeggs`` returns for the ``.egg-link`` file.  A
    non-zero exit code of the child process trips an assertion.
    """
    if os.path.isdir(setup):
        directory = setup
        setup = os.path.join(directory, 'setup.py')
    else:
        directory = os.path.dirname(setup)

    # Clean-up callables; run in reverse order of registration on the way
    # out, whether or not the command succeeded.
    undo = []
    try:
        if build_ext:
            setup_cfg = os.path.join(directory, 'setup.cfg')
            if os.path.exists(setup_cfg):
                os.rename(setup_cfg, setup_cfg+'-develop-aside')
                def restore_old_setup():
                    if os.path.exists(setup_cfg):
                        os.remove(setup_cfg)
                    os.rename(setup_cfg+'-develop-aside', setup_cfg)
                undo.append(restore_old_setup)
            else:
                # Create an empty file and close it right away so that no
                # handle is left open while edit_config rewrites it.
                open(setup_cfg, 'w').close()
                undo.append(lambda: os.remove(setup_cfg))
            setuptools.command.setopt.edit_config(
                setup_cfg, dict(build_ext=build_ext))

        fd, tsetup = tempfile.mkstemp()
        undo.append(lambda: os.remove(tsetup))
        undo.append(lambda: os.close(fd))

        os.write(fd, runsetup_template % dict(
            setuptools=setuptools_loc,
            setupdir=directory,
            setup=setup,
            __file__ = setup,
            ))

        tmp3 = tempfile.mkdtemp('build', dir=dest)
        undo.append(lambda: shutil.rmtree(tmp3))

        args = [
            zc.buildout.easy_install._safe_arg(tsetup),
            '-q', 'develop', '-mxN',
            '-d', _safe_arg(tmp3),
            ]

        # args[1] is the verbosity flag: quiet by default, dropped at DEBUG,
        # verbose below DEBUG.
        log_level = logger.getEffectiveLevel()
        if log_level <= logging.DEBUG:
            if log_level == logging.DEBUG:
                del args[1]
            else:
                args[1] = '-v'
            logger.debug("in: %s\n%r", directory, args)

        # Run the command outside the assert statement so that it is still
        # executed when assertions are compiled away (python -O).
        exit_code = os.spawnl(os.P_WAIT, executable, executable, *args)
        assert exit_code == 0

        return _copyeggs(tmp3, dest, '.egg-link', undo)
    finally:
        undo.reverse()
        for undo_action in undo:
            undo_action()
def working_set(specs, executable, path):
    """Return a working set for ``specs`` without installing anything.

    This is ``install`` called with no destination directory.
    """
    return install(specs, None, path=path, executable=executable)
def scripts(reqs, working_set, executable, dest,
            scripts=None,
            extra_paths=(),
            arguments='',
            interpreter=None,
            initialization='',
            ):
    """Generate console scripts in ``dest`` and return the files written.

    ``reqs`` is an iterable whose items are either requirement strings,
    whose ``console_scripts`` entry points are looked up in ``working_set``,
    or ready-made ``(name, module_name, attrs)`` tuples.  A bare string is
    rejected with ``TypeError``.

    ``scripts``, when given, maps entry point names to script names; entry
    points missing from it are skipped.  ``extra_paths`` are added to the
    generated ``sys.path`` after the working set locations.  ``arguments``
    and ``initialization`` are inserted into the generated script text.
    When ``interpreter`` is given, an interpreter script of that name is
    generated too.
    """
    path = [dist.location for dist in working_set]
    path.extend(extra_paths)
    # Format every entry on its own.  Splitting repr() of the whole list on
    # ', ' would also split inside any path that contains that sequence.
    path = ',\n '.join([repr(entry) for entry in path])
    generated = []

    if isinstance(reqs, str):
        raise TypeError('Expected iterable of requirements or entry points,'
                        ' got string.')

    if initialization:
        initialization = '\n'+initialization+'\n'

    entry_points = []
    for req in reqs:
        if isinstance(req, str):
            req = pkg_resources.Requirement.parse(req)
            dist = working_set.find(req)
            for name in pkg_resources.get_entry_map(dist, 'console_scripts'):
                entry_point = dist.get_entry_info('console_scripts', name)
                entry_points.append(
                    (name, entry_point.module_name,
                     '.'.join(entry_point.attrs))
                    )
        else:
            entry_points.append(req)

    for name, module_name, attrs in entry_points:
        if scripts is not None:
            sname = scripts.get(name)
            if sname is None:
                # Not requested by the caller.
                continue
        else:
            sname = name

        sname = os.path.join(dest, sname)
        generated.extend(
            _script(module_name, attrs, path, sname, executable, arguments,
                    initialization)
            )

    if interpreter:
        sname = os.path.join(dest, interpreter)
        generated.extend(_pyscript(path, sname, executable))

    return generated
def _script(module_name, attrs, path, dest, executable, arguments,
initialization):
generated = []
if sys.platform == 'win32':
# generate exe file and give the script a magic name:
open(dest+'.exe', 'wb').write(
pkg_resources.resource_string('setuptools', 'cli.exe')
)
generated.append(dest+'.exe')
dest += '-script.py'
open(dest, 'w').write(script_template % dict(
python = executable,
path = path,
module_name = module_name,
attrs = attrs,
arguments = arguments,
initialization = initialization,
))
try:
os.chmod(dest, 0755)
except (AttributeError, os.error):
pass
generated.append(dest)
return generated
# Text of a generated console script.  Filled in by _script() with the
# keys python, path, initialization, module_name, attrs and arguments.
# The call under the ``if`` must be indented for the generated script to
# be valid Python.
script_template = '''\
#!%(python)s
import sys
sys.path[0:0] = [
%(path)s,
]
%(initialization)s
import %(module_name)s
if __name__ == '__main__':
    %(module_name)s.%(attrs)s(%(arguments)s)
'''
def _pyscript(path, dest, executable):
generated = []
if sys.platform == 'win32':
# generate exe file and give the script a magic name:
open(dest+'.exe', 'wb').write(
pkg_resources.resource_string('setuptools', 'cli.exe')
)
generated.append(dest+'.exe')
dest += '-script.py'
open(dest, 'w').write(py_script_template % dict(
python = executable,
path = path,
))
try:
os.chmod(dest,0755)
except (AttributeError, os.error):
pass
generated.append(dest)
return generated
# Text of a generated interpreter script.  Filled in by _pyscript() with
# the keys python and path.  The generated script accepts -i and -c and an
# optional script file to run.  The nested bodies must be indented for the
# generated script to be valid Python.
py_script_template = '''\
#!%(python)s
import sys
sys.path[0:0] = [
%(path)s,
]
_interactive = True
if len(sys.argv) > 1:
    import getopt
    _options, _args = getopt.getopt(sys.argv[1:], 'ic:')
    _interactive = False
    for (_opt, _val) in _options:
        if _opt == '-i':
            _interactive = True
        elif _opt == '-c':
            exec _val
    if _args:
        sys.argv[:] = _args
        execfile(sys.argv[0])
if _interactive:
    import code
    code.interact(banner="", local=globals())
'''
# Text of the wrapper script that develop() writes to a temporary file and
# runs with the target interpreter.  Filled in with the keys setuptools,
# __file__, setupdir and setup.  It puts the setuptools location first on
# sys.path, imports setuptools, changes into the setup directory and then
# executes the setup script with sys.argv[0] and __file__ pointing at it.
runsetup_template = """
import sys
sys.path.insert(0, %(setuptools)r)
import os, setuptools
__file__ = %(__file__)r
os.chdir(%(setupdir)r)
sys.argv[0] = %(setup)r
execfile(%(setup)r)
"""