# coding: utf-8
"""Instagram looters implementations.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import abc
import atexit
import copy
import functools
import random
import re
import threading
import time
import typing
import warnings
import fake_useragent
import fs
import six
from requests import Session
from six.moves.queue import Queue
from six.moves.http_cookiejar import FileCookieJar, LWPCookieJar
from . import __author__, __name__ as __appname__, __version__
from ._impl import length_hint, json
from ._utils import NameGenerator, CachedClassProperty, get_shared_data
from .medias import TimedMediasIterator, MediasIterator
from .pages import ProfileIterator, HashtagIterator
from .pbar import ProgressBar
from .worker import InstaDownloader
if typing.TYPE_CHECKING:
    # Everything in this guard exists only for the benefit of static type
    # checkers: the names are referenced from `# type:` comments, never at
    # runtime, so none of these imports is executed by the interpreter.
    from datetime import datetime
    from typing import (
        Any,
        Callable,
        Dict,
        Iterator,
        Iterable,
        List,
        Optional,
        Text,
        Tuple,
        Type,
        Union,
    )
    from fs.base import FS
    from six.moves.http_cookiejar import CookieJar

    # Generic item type used when wrapping an iterable in a progress bar.
    _T = typing.TypeVar("_T")
    # A pair of optional datetimes used to restrict medias to a time frame.
    _Timeframe = Tuple[Optional[datetime], Optional[datetime]]


# Names exported by `from <module> import *`.
__all__ = [
    "InstaLooter",
    "ProfileLooter",
    "HashtagLooter",
    "PostLooter",
]
@six.add_metaclass(abc.ABCMeta)
class InstaLooter(object):
    """A brutal Instagram looter that raids without API tokens.

    This is an abstract base class: concrete looters must implement
    `pages`, which provides the post pages that `download` scrapes
    medias from.
    """

    @CachedClassProperty
    @classmethod
    def _cachefs(cls):
        """~fs.base.FS: the cache filesystem.
        """
        url = "usercache://{}:{}:{}".format(__appname__, __author__, __version__)
        return fs.open_fs(url, create=True)

    @CachedClassProperty
    @classmethod
    def _user_agents(cls):
        """~fake_useragent.UserAgent: a collection of fake user-agents.
        """
        # The user-agent database is stored in the cache filesystem, in a
        # file named after the `fake_useragent` version.
        filename = 'fake_useragent_{}.json'.format(fake_useragent.VERSION)
        return fake_useragent.UserAgent(
            path=cls._cachefs.getsyspath(filename),
            safe_attrs=['__name__', '__objclass__'])

    # str: The name of the cookie file in the cache filesystem
    _COOKIE_FILE = "cookies.txt"

    @classmethod
    def _init_session(cls, session=None):
        # type: (Optional[Session]) -> Session
        """Initialise the given session and load class cookies to its jar.

        The cookie jar of the session is *replaced* by a new
        `LWPCookieJar` backed by the cookie file of the cache filesystem.

        Arguments:
            session (~requests.Session, optional): a `requests`
                session, or `None` to create a new one.

        Returns:
            ~requests.Session: an initialised session instance.

        """
        session = session or Session()
        # Load cookies
        session.cookies = LWPCookieJar(
            cls._cachefs.getsyspath(cls._COOKIE_FILE))
        try:
            typing.cast(FileCookieJar, session.cookies).load()
        except IOError:
            # A missing or unreadable cookie file is not an error: the
            # session simply starts with an empty jar.
            pass
        typing.cast(FileCookieJar, session.cookies).clear_expired_cookies()
        return session

    @classmethod
    def _login(cls, username, password, session=None):
        # type: (str, str, Optional[Session]) -> None
        """Login with provided credentials and session.

        Arguments:
            username (str): the username to log in with.
            password (str): the password to log in with.
            session (~requests.Session, optional): the session to use,
                or `None` to create a new session.

        Raises:
            SystemError: when the login request did not succeed.
            ValueError: when the server did not authenticate the
                credentials, or when the username cannot be found on the
                homepage after logging in.

        Note:
            Code taken from LevPasha/instabot.py

        """
        session = cls._init_session(session)
        # Keep a copy of the original headers: the ones set below are only
        # needed during the login handshake and are rolled back afterwards.
        headers = copy.deepcopy(session.headers)
        homepage = "https://www.instagram.com/"
        login_url = "https://www.instagram.com/accounts/login/ajax/"
        data = {'username': username, 'password': password}

        try:
            session.headers.update({
                'Accept-Encoding': 'gzip, deflate',
                'Accept-Language': 'en-US,en;q=0.8',
                'Connection': 'keep-alive',
                'Content-Length': '0',
                'Host': 'www.instagram.com',
                'Origin': 'https://www.instagram.com',
                'Referer': 'https://www.instagram.com',
                'User-Agent': cls._user_agents.firefox,
                'X-Instagram-AJAX': '1',
                'X-Requested-With': 'XMLHttpRequest'
            })

            with session.get(homepage) as res:
                token = get_shared_data(res.text)['config']['csrf_token']
                session.headers.update({'X-CSRFToken': token})

            time.sleep(5 * random.random())  # nosec

            with session.post(login_url, data, allow_redirects=True) as login:
                # Check the status first, so that a failed request is
                # reported as such instead of as a missing cookie.
                if not login.ok:
                    raise SystemError("Login error: check your connection")
                # The response may not carry a new CSRF token: in that
                # case keep using the one obtained from the homepage.
                token = next(
                    (c.value for c in login.cookies if c.name == 'csrftoken'),
                    None)
                if token is not None:
                    session.headers.update({'X-CSRFToken': token})
                data = json.loads(login.text)
                if not data.get('authenticated', False):
                    raise ValueError('Login error: check your login data')

            time.sleep(5 * random.random())  # nosec

            with session.get(homepage) as res:
                if res.text.find(username) == -1:
                    raise ValueError('Login error: check your login data')
                try:
                    typing.cast(FileCookieJar, session.cookies).save()
                except IOError:
                    # Saving cookies is best-effort: the session stays
                    # usable even if the cookie file cannot be written.
                    pass

        finally:
            session.headers = headers

    @classmethod
    def _logout(cls, session=None):
        # type: (Optional[Session]) -> None
        """Log out from current session.

        Also deletes the eventual cookie file left in the cache directory,
        to prevent new connections from using the old session ID.

        Arguments:
            session (~requests.Session): the session to use, or `None`
                to create a new session.

        Note:
            Code taken from LevPasha/instabot.py

        """
        session = cls._init_session(session)
        sessionid = cls._sessionid(session)
        if sessionid is not None:
            url = "https://www.instagram.com/accounts/logout/"
            session.post(url, data={"csrfmiddlewaretoken": sessionid})

        if cls._cachefs.exists(cls._COOKIE_FILE):
            cls._cachefs.remove(cls._COOKIE_FILE)

    @classmethod
    def _logged_in(cls, session=None):
        # type: (Optional[Session]) -> bool
        """Check if there is an open Instagram session.

        Arguments:
            session (~requests.Session): the session to use, or `None`
                to create a new session.

        Returns:
            bool: `True` if there's an active session, `False` otherwise.

        """
        return cls._sessionid(session) is not None

    @classmethod
    def _sessionid(cls, session=None):
        # type: (Optional[Session]) -> Optional[Text]
        """Get the ID of the currently opened Instagram session.

        The value returned is the one of the ``ds_user_id`` cookie set
        on the ``.instagram.com`` domain for the root path.

        Arguments:
            session (~requests.Session): the session to use, or `None`
                to create a new session.

        Returns:
            str or None: the session ID, if any, or `None`.

        """
        _session = cls._init_session(session)
        _cookies = typing.cast(FileCookieJar, _session.cookies)
        return next((ck.value for ck in _cookies
                     if ck.domain == ".instagram.com"
                     and ck.name == "ds_user_id"
                     and ck.path == "/"), None)

    def __init__(self,
                 add_metadata=False,    # type: bool
                 get_videos=False,      # type: bool
                 videos_only=False,     # type: bool
                 jobs=16,               # type: int
                 template="{id}",       # type: Text
                 dump_json=False,       # type: bool
                 dump_only=False,       # type: bool
                 extended_dump=False,   # type: bool
                 session=None           # type: Optional[Session]
                 ):
        # type: (...) -> None
        """Create a new looter instance.

        Arguments:
            add_metadata (bool): Add date and comment metadata to
                the downloaded pictures.
            get_videos (bool): Also get the videos from the given target.
            videos_only (bool): Only download videos (implies
                ``get_videos=True``).
            jobs (int): the number of parallel threads to use to
                download media (12 or more is advised to have a true parallel
                download of media files).
            template (str): a filename format, in Python new-style-formatting
                format. See the :ref:`Template` page of the documentation
                for available keys.
            dump_json (bool): Save each resource metadata to a
                JSON file next to the actual image/video.
            dump_only (bool): Only save metadata and discard the actual
                resource (implies ``dump_json=True``).
            extended_dump (bool): Attempt to fetch as much metadata as
                possible, at the cost of more time. Set to `True` if, for
                instance, you always want the top comments to be downloaded
                in the dump.
            session (~requests.Session or None): a `requests` session,
                or `None` to create a new one.

        """
        self.add_metadata = add_metadata
        self.get_videos = get_videos or videos_only
        self.videos_only = videos_only
        self.jobs = jobs
        self.namegen = NameGenerator(template)
        self.dump_only = dump_only
        self.dump_json = dump_json or dump_only
        self.extended_dump = extended_dump

        self.session = self._init_session(session)
        atexit.register(self.session.close)

        # Set a fake User-Agent
        if self.session.headers['User-Agent'].startswith('python-requests'):
            self.session.headers['User-Agent'] = self._user_agents.firefox

        # Get CSRFToken and RHX (the homepage data is parsed only once)
        with self.session.get('https://www.instagram.com/') as res:
            shared_data = get_shared_data(res.text)
        self.session.headers['X-CSRFToken'] = shared_data['config']['csrf_token']
        self.rhx = shared_data['rhx_gis']

    @abc.abstractmethod
    def pages(self):
        # type: () -> Iterator[Dict[Text, Any]]
        """Obtain an iterator over Instagram post pages.

        Returns:
            PageIterator: an iterator over the instagram post pages.

        """
        return NotImplemented

    def _medias(self,
                pages_iterator,     # type: Iterable[Dict[Text, Any]]
                timeframe=None      # type: Optional[_Timeframe]
                ):
        # type: (...) -> Iterator[Dict[Text, Any]]
        """Obtain an iterator over the medias of the given pages iterator.

        Arguments:
            pages_iterator (Iterator): an iterator over the Instagram
                pages, returned by `InstaLooter.pages`
            timeframe (tuple or None): an optional time frame; when given,
                a `TimedMediasIterator` is used instead of a plain
                `MediasIterator`.

        Returns:
            MediasIterator: an iterator over the medias in every pages.

        """
        if timeframe is not None:
            return TimedMediasIterator(pages_iterator, timeframe)
        return MediasIterator(pages_iterator)

    def get_post_info(self, code):
        # type: (str) -> dict
        """Get media information from a given post code.

        Arguments:
            code (str): the code of the post (can be obtained either
                from the ``shortcode`` attribute of media dictionaries, or
                from a post URL: ``https://www.instagram.com/p/<code>/``)

        Returns:
            dict: a media dictionary, in the format used by Instagram.

        """
        url = "https://www.instagram.com/p/{}/".format(code)
        with self.session.get(url) as res:
            data = get_shared_data(res.text)
            return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']

    def download_pictures(self,
                          destination,      # type: Union[str, fs.base.FS]
                          media_count=None,  # type: Optional[int]
                          timeframe=None,   # type: Optional[_Timeframe]
                          new_only=False,   # type: bool
                          pgpbar_cls=None,  # type: Optional[Type[ProgressBar]]
                          dlpbar_cls=None   # type: Optional[Type[ProgressBar]]
                          ):
        # type: (...) -> int
        """Download all the pictures to the provided destination.

        Actually a shortcut for `.download` with ``condition`` set
        to accept only images.
        """
        return self.download(
            destination,
            condition=lambda media: not media["is_video"],
            media_count=media_count,
            timeframe=timeframe,
            new_only=new_only,
            pgpbar_cls=pgpbar_cls,
            dlpbar_cls=dlpbar_cls,
        )

    def download_videos(self,
                        destination,        # type: Union[str, fs.base.FS]
                        media_count=None,   # type: Optional[int]
                        timeframe=None,     # type: Optional[_Timeframe]
                        new_only=False,     # type: bool
                        pgpbar_cls=None,    # type: Optional[Type[ProgressBar]]
                        dlpbar_cls=None,    # type: Optional[Type[ProgressBar]]
                        ):
        # type: (...) -> int
        """Download all videos to the provided destination.

        Actually a shortcut for `.download` with ``condition`` set
        to accept only videos.
        """
        return self.download(
            destination,
            condition=lambda media: media["is_video"],
            media_count=media_count,
            timeframe=timeframe,
            new_only=new_only,
            pgpbar_cls=pgpbar_cls,
            dlpbar_cls=dlpbar_cls,
        )

    def download(self,
                 destination,       # type: Union[str, fs.base.FS]
                 condition=None,    # type: Optional[Callable[[dict], bool]]
                 media_count=None,  # type: Optional[int]
                 timeframe=None,    # type: Optional[_Timeframe]
                 new_only=False,    # type: bool
                 pgpbar_cls=None,   # type: Optional[Type[ProgressBar]]
                 dlpbar_cls=None,   # type: Optional[Type[ProgressBar]]
                 ):
        # type: (...) -> int
        """Download all medias passing ``condition`` to destination.

        Arguments:
            destination (~fs.base.FS or str): the filesystem where to
                store the downloaded files, as a filesystem instance or
                FS URL.
            condition (function): the condition to filter the
                medias with. If `None` is given, a function is created using
                the ``get_videos`` and ``videos_only`` passed at object
                initialisation.
            media_count (int or None): the maximum number of medias
                to download. Leave to ``None`` to download everything from
                the target. *Note that more files can be downloaded, since
                a post with multiple images/videos is considered to be a
                single media*.
            timeframe (tuple or None): a tuple of two `~datetime.datetime`
                objects to enforce a time frame (the first item must be
                more recent). Leave to `None` to ignore times.
            new_only (bool): stop media discovery when already
                downloaded medias are encountered.
            pgpbar_cls (type or None): an optional `~.pbar.ProgressBar`
                subclass to use to display page scraping progress.
            dlpbar_cls (type or None): an optional `~.pbar.ProgressBar`
                subclass to use to display file download progress.

        Returns:
            int: the number of queued medias.
                May not be equal to the number of downloaded medias if some
                errors occurred during background download.

        """
        # Open the destination filesystem
        destination, close_destination = self._init_destfs(destination)

        # Create an iterator over the pages with an optional progress bar
        pages_iterator = self.pages()  # type: Iterable[Dict[Text, Any]]
        pages_iterator = pgpbar = self._init_pbar(pages_iterator, pgpbar_cls)

        # Create an iterator over the medias
        medias_iterator = self._medias(iter(pages_iterator), timeframe)

        # Create the media download bar from a dummy iterator
        dlpbar = self._init_pbar(
            six.moves.range(length_hint(medias_iterator)), dlpbar_cls)

        # Start a group of workers (they only get a progress bar if a
        # progress bar class was actually requested)
        workers, queue = self._init_workers(
            dlpbar if dlpbar_cls is not None else None, destination)

        # Make sure exiting the main thread will shutdown workers
        atexit.register(self._shutdown_workers, workers)

        # Queue all medias
        medias_queued = self._fill_media_queue(
            queue, destination, medias_iterator, media_count,
            new_only, condition)

        # Once queuing the medias is finished, finish the page progress bar
        # and set a new maximum on the download progress bar.
        if pgpbar_cls is not None:
            pgpbar.finish()  # type: ignore
        if dlpbar_cls is not None:
            dlpbar.set_maximum(medias_queued)  # type: ignore

        # If no medias were queued, issue a warning
        # TODO: refine warning depending on download parameters
        if medias_queued == 0:
            warnings.warn("No medias found.")

        # Add poison pills to the queue and wait for workers to finish
        self._poison_workers(workers, queue)
        self._join_workers(workers, queue)

        # Once downloading is finished, finish the download progress bar
        # and close the destination if needed.
        if dlpbar_cls is not None:
            dlpbar.finish()  # type: ignore
        if close_destination:
            destination.close()

        return medias_queued

    def login(self, username, password):
        # type: (str, str) -> None
        """Log the instance in using the given credentials.

        Arguments:
            username (str): the username to log in with.
            password (str): the password to log in with.

        """
        self._login(username, password, session=self.session)

    def logout(self):
        # type: () -> None
        """Log the instance out from the currently opened session.
        """
        self._logout(session=self.session)

    def logged_in(self):
        # type: () -> bool
        """Check if there's an open Instagram session.
        """
        return self._logged_in(self.session)

    def _init_pbar(self,
                   it,              # type: Iterable[_T]
                   pbar_cls=None,   # type: Optional[Type[ProgressBar]]
                   ):
        # type: (...) -> Iterable[_T]
        """Wrap an iterable within a `ProgressBar`.

        Arguments:
            it (~collections.Iterable): an iterable to wrap.
            pbar_cls (type or None): an optional `ProgressBar` subclass
                to use, or `None` to avoid using a progress bar.

        Returns:
            ~collections.Iterable: the wrapped iterable, or ``it`` itself
            when no progress bar class is given.

        Raises:
            TypeError: when ``pbar_cls`` is not a `ProgressBar` subclass.

        """
        if pbar_cls is not None:
            if not issubclass(pbar_cls, ProgressBar):
                raise TypeError("pbar must implement the ProgressBar interface !")
            # The length hint must be taken before wrapping the iterable
            maximum = length_hint(it)
            it = pbar = pbar_cls(it)
            pbar.set_maximum(maximum)
            pbar.set_lock(threading.RLock())
        return it

    def _init_destfs(self, destination, create=True):
        # type: (Union[str, fs.base.FS], bool) -> Tuple[fs.base.FS, bool]
        """Open a filesystem either from a FS URL or filesystem instance.

        Arguments:
            destination (~fs.base.FS or str): the destination filesystem
                to open, as a filesystem instance or FS URL.
            create (bool): whether or not to create a new
                filesystem if it does not exist.

        Returns:
            (~fs.base.FS, bool): the open FS, and whether to close it
            (`True` only when the filesystem was opened here from a URL).

        Raises:
            TypeError: when ``destination`` is neither a string nor a
                filesystem instance.

        """
        close_destination = False
        if isinstance(destination, six.binary_type):
            destination = destination.decode('utf-8')
        if isinstance(destination, six.text_type):
            destination = fs.open_fs(destination, create=create)
            close_destination = True
        if not isinstance(destination, fs.base.FS):
            raise TypeError("<destination> must be a FS URL or FS instance.")
        return destination, close_destination

    def _fill_media_queue(self,
                          queue,             # type: Queue
                          destination,       # type: fs.base.FS
                          medias_iter,       # type: Iterable[Any]
                          media_count=None,  # type: Optional[int]
                          new_only=False,    # type: bool
                          condition=None,    # type: Optional[Callable[[dict], bool]]
                          ):
        # type: (...) -> int
        """Fill the download queue with medias from the provided iterator.

        Arguments:
            queue (~queue.Queue): the download queue to fill.
            destination (~fs.base.FS): the filesystem where to download
                the files.
            medias_iter (~collections.Iterable): an iterable over the
                Instagram medias to download.
            media_count (int or None): the maximum number of new medias to
                download, or ``None`` to download all discoverable medias.
            new_only (bool): stop media discovery when a media that was
                already downloaded is encountered.
            condition (function or None): the condition to filter the medias
                with. If `None` is given, a function is created using the
                ``get_videos`` and ``videos_only`` passed at object
                initialisation.

        Returns:
            int: the number of queued medias.
                May not be equal to the number of downloaded medias if some
                errors occurred during downloads.

        """
        # Create a condition from parameters if needed
        if condition is not None:
            _condition = condition  # type: Callable[[dict], bool]
        else:
            if self.videos_only:
                def _condition(media):
                    return media['is_video']
            elif not self.get_videos:
                def _condition(media):
                    return not media['is_video']
            else:
                def _condition(media):
                    return True

        # Queue all media filling the condition
        medias_queued = 0
        for media in six.moves.filter(_condition, medias_iter):

            # Check if the whole post info is required
            if self.namegen.needs_extended(media) or media["__typename"] != "GraphImage":
                media = self.get_post_info(media['shortcode'])

            # Check that sidecar children fit the condition
            if media['__typename'] == "GraphSidecar":
                # Keep only the nodes fitting the condition; the list is
                # updated in place so the media keeps the same list object.
                edges = media['edge_sidecar_to_children']['edges']
                edges[:] = [edge for edge in edges if _condition(edge['node'])]

                # Check that the nodelist is not depleted
                if not edges:
                    continue

            # Check that the file does not exist
            # FIXME: not working well with sidecar
            if new_only and destination.exists(self.namegen.file(media)):
                break

            # Put the medias in the queue
            queue.put(media)
            medias_queued += 1

            if media_count is not None and medias_queued >= media_count:
                break

        return medias_queued

    # WORKERS UTILS

    def _init_workers(self,
                      pbar,         # type: Union[ProgressBar, Iterable, None]
                      destination,  # type: fs.base.FS
                      ):
        # type: (...) -> Tuple[List[InstaDownloader], Queue]
        """Create and start ``self.jobs`` workers sharing a single queue.

        Arguments:
            pbar (ProgressBar or None): the progress bar handed to every
                worker, if any.
            destination (~fs.base.FS): the filesystem handed to every
                worker.

        Returns:
            (list, ~queue.Queue): the started workers, and the queue
            they were created with.

        """
        workers = []  # type: List[InstaDownloader]
        queue = Queue()  # type: Queue

        for _ in six.moves.range(self.jobs):
            worker = InstaDownloader(
                queue=queue,
                destination=destination,
                namegen=self.namegen,
                add_metadata=self.add_metadata,
                dump_json=self.dump_json,
                dump_only=self.dump_only,
                pbar=pbar,
                session=self.session)
            worker.start()
            workers.append(worker)

        return workers, queue

    def _poison_workers(self, workers, queue):
        # type: (List[InstaDownloader], Queue) -> None
        """Put one `None` sentinel in the queue for each worker.
        """
        for _ in workers:
            queue.put(None)

    def _join_workers(self, workers, queue):
        # type: (List[InstaDownloader], Queue) -> None
        """Join every worker, if at least one of them is still alive.
        """
        if any(w.is_alive() for w in workers):
            for worker in workers:
                worker.join()

    def _shutdown_workers(self, workers):
        # type: (List[InstaDownloader]) -> None
        """Call ``terminate`` on every worker.
        """
        for worker in workers:
            worker.terminate()
class ProfileLooter(InstaLooter):
    """A looter that collects the medias posted on a user profile.
    """

    def __init__(self, username, **kwargs):
        # type: (str, **Any) -> None
        """Create a new profile looter.

        Arguments:
            username (str): the username of the profile.

        See `InstaLooter.__init__` for more details about accepted
        keyword arguments.

        """
        super(ProfileLooter, self).__init__(**kwargs)
        # The owner ID is not known yet: it is filled in by the first
        # call to `pages`.
        self._owner_id = None
        self._username = username

    def pages(self):
        # type: () -> ProfileIterator
        """Obtain an iterator over Instagram post pages.

        On the first call, the iterator is built from the username and
        the owner ID it exposes is remembered; subsequent calls build
        the iterator directly from that owner ID.

        Returns:
            PageIterator: an iterator over the instagram post pages.

        Raises:
            ValueError: when the requested user does not exist.
            RuntimeError: when the user is a private account
                and there is no logged user (or the logged user
                does not follow that account).

        """
        # Fast path: the owner ID was already resolved by a previous call
        if self._owner_id is not None:
            return ProfileIterator(self._owner_id, self.session, self.rhx)

        iterator = ProfileIterator.from_username(self._username, self.session)
        self._owner_id = iterator.owner_id
        return iterator
class HashtagLooter(InstaLooter):
    """A looter that collects the medias tagged with a given hashtag.
    """

    def __init__(self, hashtag, **kwargs):
        # type: (str, **Any) -> None
        """Create a new hashtag looter.

        Arguments:
            hashtag (str): the hashtag to search for.

        See `InstaLooter.__init__` for more details about accepted
        keyword arguments.

        """
        super(HashtagLooter, self).__init__(**kwargs)
        self._hashtag = hashtag

    def pages(self):  # noqa: D102
        # type: () -> HashtagIterator
        # Build a fresh iterator on every call, from the hashtag given at
        # initialisation and the session state of this looter.
        hashtag_pages = HashtagIterator(self._hashtag, self.session, self.rhx)
        return hashtag_pages
class PostLooter(InstaLooter):
    """A looter targeting a specific post.
    """

    # Matches a post URL (scheme optional) and captures the post code
    _RX_URL = re.compile(
        r'(?:https?://)?(?:www\.instagram\.com|instagr\.am)/p/([0-9a-zA-Z_\-]{10,11})'
    )
    # Matches a bare post code
    _RX_CODE = re.compile(
        r'^[0-9a-zA-Z_\-]{10,11}$'
    )

    def __init__(self, code, **kwargs):
        # type: (str, **Any) -> None
        """Create a new post looter.

        Arguments:
            code (str): the code of the post to get, or the URL of
                the post.

        See `InstaLooter.__init__` for more details about accepted
        keyword arguments.

        Raises:
            ValueError: when ``code`` is neither a post URL nor a
                well-formed post code.

        """
        super(PostLooter, self).__init__(**kwargs)

        # The post information is fetched lazily (see `info`)
        self._info = None  # type: Optional[dict]

        match = self._RX_URL.match(code)
        if match is not None:
            self.code = match.group(1)
        elif self._RX_CODE.match(code) is None:
            raise ValueError("invalid post code: '{}'".format(code))
        else:
            self.code = code

    @property
    def info(self):
        # type: () -> dict
        """dict: the media dictionary of the post, fetched on first access.
        """
        if self._info is None:
            self._info = self.get_post_info(self.code)
        return self._info

    def pages(self):
        # type: () -> Iterator[Dict[Text, Any]]
        """Return a generator that yields a page with only the referred post.

        Yields:
            dict: a page dictionary with only a single media.

        """
        yield {"edge_owner_to_timeline_media": {
            "count": 1,
            "page_info": {
                "has_next_page": False,
                "end_cursor": None,
            },
            "edges": [
                {"node": self.info}
            ],
        }}

    def medias(self, timeframe=None):
        """Return a generator that yields only the referred post.

        Arguments:
            timeframe (tuple or None): an optional time frame, passed to
                `TimedMediasIterator.get_times`.

        Yields:
            dict: a media dictionary obtained from the given post. Nothing
            is yielded if the post does not fit the timeframe.

        """
        info = self.info
        if timeframe is not None:
            start, end = TimedMediasIterator.get_times(timeframe)
            timestamp = info.get("taken_at_timestamp") or info["media"]
            if not (start >= timestamp >= end):
                # Finish the generator with a plain `return`: raising
                # `StopIteration` from within a generator body is turned
                # into a `RuntimeError` on Python 3.7+ (PEP 479).
                return
        yield info

    def download(self,
                 destination,       # type: Union[str, fs.base.FS]
                 condition=None,    # type: Optional[Callable[[dict], bool]]
                 media_count=None,  # type: Optional[int]
                 timeframe=None,    # type: Optional[_Timeframe]
                 new_only=False,    # type: bool
                 pgpbar_cls=None,   # type: Optional[Type[ProgressBar]]
                 dlpbar_cls=None,   # type: Optional[Type[ProgressBar]]
                 ):
        # type: (...) -> int
        """Download the referred post to the destination.

        See `InstaLooter.download` for argument reference.

        Note:
            This function, opposed to other *looter* implementations, will
            not spawn new threads, but simply use the main thread to download
            the files.

            Since a worker is in charge of downloading a *media* at a time
            (and not a *file*), there would be no point in spawning more.

        Note:
            ``timeframe``, ``pgpbar_cls`` and ``dlpbar_cls`` are accepted
            for signature compatibility but are not used by this
            implementation.

        """
        destination, close_destination = self._init_destfs(destination)

        try:
            queue = Queue()  # type: Queue[Dict]
            medias_queued = self._fill_media_queue(
                queue, destination, iter(self.medias()), media_count,
                new_only, condition)
            # Sentinel placed after the queued medias for the worker
            queue.put(None)

            worker = InstaDownloader(
                queue=queue,
                destination=destination,
                namegen=self.namegen,
                add_metadata=self.add_metadata,
                dump_json=self.dump_json,
                dump_only=self.dump_only,
                pbar=None,
                session=self.session)
            # `run` is called directly (not `start`), so the download
            # happens synchronously in the calling thread.
            worker.run()
        finally:
            # Release the filesystem if it was opened here from a FS URL
            if close_destination:
                destination.close()

        return medias_queued