# coding: utf-8
"""Implementation of the main program executable.
Warning:
Only `.cli.main` and `.cli.logger` are guaranteed to be stable, do not
rely on any other member from this package !
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import functools
import logging
import getpass
import os
import sys
import traceback
import warnings
import coloredlogs
import docopt
import fs
import six
import verboselogs
from .. import __version__
from ..looters import InstaLooter, HashtagLooter, ProfileLooter, PostLooter
from ..pbar import TqdmProgressBar
from ..batch import BatchRunner, logger as batch_logger
from . import logutils
from .constants import HELP, USAGE, WARNING_ACTIONS
from .time import get_times_from_cli
from .login import login, logger as login_logger
__all__ = ["main", "logger"]
#: A `~logging.Logger` instance used within theĀ `.cli` module.
logger = verboselogs.VerboseLogger(__name__)
[docs]@logutils.wrap_warnings(logger)
def main(argv=None, stream=None):
"""Run from the command line interface.
Arguments:
argv (list): The positional arguments to read. Defaults to
`sys.argv` to use CLI arguments.
stream (~io.IOBase): A file where to write error messages.
Leave to `None` to use the `~coloredlogs.StandardErrorHandler`
for logs, and `sys.stderr` for error messages.
Returns:
int: An error code, or 0 if the program executed successfully.
"""
_print = functools.partial(print, file=stream or sys.stderr)
# Parse command line arguments
try:
args = docopt.docopt(
HELP, argv, version='instalooter {}'.format(__version__))
except docopt.DocoptExit as de:
_print(de)
return 1
# Print usage and exit if required (docopt does not do this !)
if args['--usage']:
_print(USAGE)
return 0
# Set the loggers up with the requested logging level
level = "ERROR" if args['--quiet'] else args.get("--loglevel", "INFO")
for logger_ in (logger, login_logger, batch_logger):
coloredlogs.install(
level=int(level) if level.isdigit() else level,
stream=stream,
logger=logger_)
# Check the requested logging level
if args['-W'] not in WARNING_ACTIONS:
_print("Unknown warning action:", args['-W'])
_print(" available actions:", ', '.join(WARNING_ACTIONS))
return 1
with warnings.catch_warnings():
warnings.simplefilter(args['-W'])
try:
# Run in batch mode
if args['batch']:
# Load the batch configuration from the given file
with open(args['<batch_file>']) as batch_file:
batch_runner = BatchRunner(batch_file, args)
# Run the batch
batch_runner.run_all()
return 0
# Login if requested
if args['login']:
try:
if not args['--username']:
args['--username'] = six.moves.input('Username: ')
login(args)
return 0
except ValueError as ve:
logger.error("%s", ve)
if args["--traceback"]:
traceback.print_exc()
return 1
# Logout if requested
if args['logout']:
if InstaLooter._cachefs().exists(InstaLooter._COOKIE_FILE):
InstaLooter._logout()
logger.success('Logged out.')
else:
warnings.warn('Cookie file not found.')
return 0
# Normal download mode:
if args['user']:
looter_cls = ProfileLooter
target = args['<profile>']
elif args['hashtag']:
looter_cls = HashtagLooter
target = args['<hashtag>']
elif args['post']:
looter_cls = PostLooter
target = args['<post_token>']
else:
raise NotImplementedError("TODO")
# Instantiate the looter
looter = looter_cls(
target,
add_metadata=args['--add-metadata'],
get_videos=args['--get-videos'],
videos_only=args['--videos-only'],
jobs=int(args['--jobs']) if args['--jobs'] is not None else 16,
template=args['--template'],
dump_json=args['--dump-json'],
dump_only=args['--dump-only'],
extended_dump=args['--extended-dump']
)
# Attempt to login and extract the timeframe
if args['--username']:
login(args)
if args['--num-to-dl']:
args['--num-to-dl'] = int(args['--num-to-dl'])
try:
if args['--time'] is not None:
args['--time'] = get_times_from_cli(args['--time'])
except ValueError as ve:
_print("invalid format for --time parameter:", args["--time"])
_print(" (format is [D]:[D] where D is an ISO 8601 date)")
return 1
logger.debug("Opening destination filesystem")
dest_url = args.get('<directory>') or os.getcwd()
dest_fs = fs.open_fs(dest_url, create=True)
logger.notice("Starting download of `%s`", target)
n = looter.download(
destination=dest_fs,
media_count=args['--num-to-dl'],
timeframe=args['--time'],
new_only=args['--new'],
pgpbar_cls=None if args['--quiet'] else TqdmProgressBar,
dlpbar_cls=None if args['--quiet'] else TqdmProgressBar)
if n > 1:
logger.success("Downloaded %i posts.", n)
elif n == 1:
logger.success("Downloaded %i post.", n)
except (Exception, KeyboardInterrupt) as e:
from .threadutils import threads_force_join, threads_count
# Show error traceback if any
if not isinstance(e, KeyboardInterrupt):
logger.critical("%s", e)
if args["--traceback"]:
traceback.print_exc()
else:
logger.critical("Interrupted")
# Close remaining threads spawned by InstaLooter.download
count = threads_count()
if count:
logger.notice("Terminating %i remaining workers...", count)
threads_force_join()
# Return the error number if any
errno = e.errno if hasattr(e, "errno") else None
return errno if errno is not None else 1
else:
return 0
finally:
logger.debug("Closing destination filesystem")
try:
dest_fs.close()
except Exception:
pass