# coding: utf-8
"""Instagram looters implementations.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import abc
import atexit
import copy
import functools
import random
import re
import threading
import time
import typing
import warnings
import fs
import six
from requests import Session
from six.moves.queue import Queue
from six.moves.http_cookiejar import FileCookieJar, LWPCookieJar
from . import __author__, __name__ as __appname__, __version__
from ._impl import length_hint, json
from ._uadetect import get_user_agent
from ._utils import NameGenerator, get_shared_data, get_additional_data
from .medias import TimedMediasIterator, MediasIterator
from .pages import ProfileIterator, HashtagIterator
from .pbar import ProgressBar
from .worker import InstaDownloader
if typing.TYPE_CHECKING:
from datetime import datetime
from typing import (
Any, Callable, Dict, Iterator, Iterable, List,
Optional, Text, Tuple, Type, Union)
from fs.base import FS
from six.moves.http_cookiejar import CookieJar
_T = typing.TypeVar("_T")
_Timeframe = Tuple[Optional[datetime], Optional[datetime]]
__all__ = [
"InstaLooter",
"ProfileLooter",
"HashtagLooter",
"PostLooter",
]
@six.add_metaclass(abc.ABCMeta)
class InstaLooter(object):
"""A brutal Instagram looter that raids without API tokens.
"""
@classmethod
def _cachefs(cls):
# type: () -> FS
"""Get the a persistent filesystem to store the program cache.
"""
url = "usercache://{}:{}:{}".format(__appname__, __author__, __version__)
return fs.open_fs(url, create=True)
@classmethod
def _user_agent(cls):
# type: () -> Text
"""Get the user agent of the default web browser on the local machine.
"""
cache = cls._cachefs()
if not cache.isfile(cls._USERAGENT_FILE):
ua = get_user_agent(cache=cache.getsyspath(cls._USERAGENT_FILE))
if ua is None:
warnings.warn("Could not detect user agent, using default")
ua = "Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0"
with cache.open("user-agent.txt", "w") as f:
f.write(ua)
with cache.open(cls._USERAGENT_FILE) as f:
return f.read()
# str: The name of the user agent file in the cache filesystem
_USERAGENT_FILE = "user-agent.txt"
# str: The name of the cookie file in the cache filesystem
_COOKIE_FILE = "cookies.txt"
@classmethod
def _init_session(cls, session=None):
# type: (Optional[Session]) -> Session
"""Initialise the given session and load class cookies to its jar.
Arguments:
session (~requests.Session, optional): a `requests`
session, or `None` to create a new one.
Returns:
~requests.Session: an initialised session instance.
"""
session = session or Session()
# Load cookies
path = cls._cachefs().getsyspath(cls._COOKIE_FILE)
session.cookies = LWPCookieJar(path) # type: ignore
try:
typing.cast(FileCookieJar, session.cookies).load()
except IOError:
pass
session.cookies.clear_expired_cookies() # type: ignore
return session
@classmethod
def _login(cls, username, password, session=None):
# type: (str, str, Optional[Session]) -> None
"""Login with provided credentials and session.
Arguments:
username (str): the username to log in with.
password (str): the password to log in with.
session (~requests.Session, optional): the session to use,
or `None` to create a new session.
Note:
Code taken from LevPasha/instabot.py
"""
session = cls._init_session(session)
headers = copy.deepcopy(session.headers)
homepage = "https://www.instagram.com/"
login_url = "https://www.instagram.com/accounts/login/ajax/"
enc_password = "#PWD_INSTAGRAM_BROWSER:0:{}:{}".format(time.time(), password)
data = {'username': username, 'enc_password': enc_password}
try:
session.headers.update({
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.8',
'Connection': 'keep-alive',
'Content-Length': '0',
'Host': 'www.instagram.com',
'Origin': 'https://www.instagram.com',
'Referer': 'https://www.instagram.com',
'User-Agent': cls._user_agent(),
'X-Instagram-AJAX': '1',
'X-Requested-With': 'XMLHttpRequest'
})
with session.get(homepage) as res:
token = get_shared_data(res.text)['config']['csrf_token']
session.headers.update({'X-CSRFToken': token})
time.sleep(5 * random.random()) # nosec
with session.post(login_url, data, allow_redirects=True) as login:
token = next(c.value for c in login.cookies if c.name == 'csrftoken')
session.headers.update({'X-CSRFToken': token})
if not login.ok:
raise SystemError("Login error: check your connection")
data = json.loads(login.text)
if not data.get('authenticated', False):
raise ValueError('Login error: check your login data')
time.sleep(5 * random.random()) # nosec
with session.get(homepage) as res:
if res.text.find(username) == -1:
raise ValueError('Login error: check your login data')
try:
typing.cast(FileCookieJar, session.cookies).save()
except IOError:
pass
finally:
session.headers = headers
@classmethod
def _logout(cls, session=None):
# type: (Optional[Session]) -> None
"""Log out from current session.
Also deletes any cookie file left in the cache directory,
to prevent new connections from using the old session ID.
Arguments:
session (~requests.Session): the session to use, or `None`
to create a new session.
Note:
Code taken from LevPasha/instabot.py
"""
session = cls._init_session(session)
sessionid = cls._sessionid(session)
if sessionid is not None:
url = "https://www.instagram.com/accounts/logout/"
session.post(url, data={"csrfmiddlewaretoken": sessionid})
cache = cls._cachefs()
if cache.exists(cls._COOKIE_FILE):
cache.remove(cls._COOKIE_FILE)
@classmethod
def _logged_in(cls, session=None):
# type: (Optional[Session]) -> bool
"""Check if there is an open Instagram session.
Arguments:
session (~requests.Session): the session to use, or `None`
to create a new session.
Returns:
bool: `True` if there's an active session, `False` otherwise.
"""
return cls._sessionid(session) is not None
@classmethod
def _sessionid(cls, session=None):
# type: (Optional[Session]) -> Optional[Text]
"""Get the ID of the currently opened Instagram session.
Arguments:
session (~requests.Session): the session to use, or `None`
to create a new session.
Returns:
str or None: the session ID, if any, or `None`.
"""
_session = cls._init_session(session)
_cookies = typing.cast(FileCookieJar, _session.cookies)
return next((ck.value for ck in _cookies
if ck.domain == ".instagram.com"
and ck.name == "ds_user_id"
and ck.path == "/"), None)
def __init__(self,
add_metadata=False, # type: bool
get_videos=False, # type: bool
videos_only=False, # type: bool
jobs=16, # type: int
template="{id}", # type: Text
dump_json=False, # type: bool
dump_only=False, # type: bool
extended_dump=False, # type: bool
session=None # type: Optional[Session]
):
# type: (...) -> None
"""Create a new looter instance.
Arguments:
add_metadata (bool): Add date and comment metadata to
the downloaded pictures.
get_videos (bool): Also get the videos from the given target.
videos_only (bool): Only download videos (implies
``get_videos=True``).
jobs (int): the number of parallel threads to use to
download media (12 or more is advised for a truly parallel
download of media files).
template (str): a filename format, in Python new-style-formatting
format. See the :ref:`Template` page of the documentation
for available keys.
dump_json (bool): Save each resource metadata to a
JSON file next to the actual image/video.
dump_only (bool): Only save metadata and discard the actual
resource.
extended_dump (bool): Attempt to fetch as much metadata as
possible, at the cost of more time. Set to `True` if, for
instance, you always want the top comments to be downloaded
in the dump.
session (~requests.Session or None): a `requests` session,
or `None` to create a new one.
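Example:
A minimal sketch of a typical configuration, using the
concrete `ProfileLooter` subclass (the username and the
template keys below are placeholders picked for illustration):
>>> looter = ProfileLooter(
...     "a_username",            # placeholder profile
...     get_videos=True,
...     template="{username}.{id}",
...     dump_json=True,
... )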
"""
self.add_metadata = add_metadata
self.get_videos = get_videos or videos_only
self.videos_only = videos_only
self.jobs = jobs
self.namegen = NameGenerator(template)
self.dump_only = dump_only
self.dump_json = dump_json or dump_only
self.extended_dump = extended_dump
self.session = self._init_session(session)
atexit.register(self.session.close)
# Set the default webbrowser user agent
if self.session.headers['User-Agent'].startswith('python-requests'):
self.session.headers['User-Agent'] = self._user_agent()
# Get CSRFToken and RHX
with self.session.get('https://www.instagram.com/') as res:
data = get_shared_data(res.text)
self.session.headers['X-CSRFToken'] = data['config']['csrf_token']
self.rhx = data.get('rhx_gis', '')
@abc.abstractmethod
def pages(self):
# type: () -> Iterator[Dict[Text, Any]]
"""Obtain an iterator over Instagram post pages.
Returns:
PageIterator: an iterator over the Instagram post pages.
"""
return NotImplemented
def _medias(self,
pages_iterator, # type: Iterable[Dict[Text, Any]]
timeframe=None # type: Optional[_Timeframe]
):
# type: (...) -> Iterator[Dict[Text, Any]]
"""Obtain an iterator over the medias of the given pages iterator.
Arguments:
pages_iterator (Iterator): an iterator over the Instagram
pages, returned by `InstaLooter.pages`.
timeframe (tuple or None): an optional pair of `~datetime.datetime`
objects used to only accept medias within that time frame.
Returns:
MediasIterator: an iterator over the medias in every page.
"""
if timeframe is not None:
return TimedMediasIterator(pages_iterator, timeframe)
return MediasIterator(pages_iterator)
def get_post_info(self, code):
# type: (str) -> dict
"""Get media information from a given post code.
Arguments:
code (str): the code of the post (can be obtained either
from the ``shortcode`` attribute of media dictionaries, or
from a post URL: ``https://www.instagram.com/p/<code>/``)
Returns:
dict: a media dictionary, in the format used by Instagram.
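Example:
A sketch assuming an instantiated looter and a public post
(the code below is a placeholder, not a real post):
>>> media = looter.get_post_info("aBcDeFgHiJk")
>>> media["shortcode"]
'aBcDeFgHiJk'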
"""
url = "https://www.instagram.com/p/{}/".format(code)
with self.session.get(url) as res:
data = get_shared_data(res.text)
if 'graphql' in data['entry_data']['PostPage'][0]:
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']
data = get_additional_data(res.text)
return data['graphql']['shortcode_media']
def download_pictures(self,
destination, # type: Union[str, fs.base.FS]
media_count=None, # type: Optional[int]
timeframe=None, # type: Optional[_Timeframe]
new_only=False, # type: bool
pgpbar_cls=None, # type: Optional[Type[ProgressBar]]
dlpbar_cls=None # type: Optional[Type[ProgressBar]]
):
# type: (...) -> int
"""Download all the pictures to the provided destination.
Actually a shortcut for `.download` with ``condition`` set
to accept only images.
"""
return self.download(
destination,
condition=lambda media: not media["is_video"],
media_count=media_count,
timeframe=timeframe,
new_only=new_only,
pgpbar_cls=pgpbar_cls,
dlpbar_cls=dlpbar_cls,
)
def download_videos(self,
destination, # type: Union[str, fs.base.FS]
media_count=None, # type: Optional[int]
timeframe=None, # type: Optional[_Timeframe]
new_only=False, # type: bool
pgpbar_cls=None, # type: Optional[Type[ProgressBar]]
dlpbar_cls=None, # type: Optional[Type[ProgressBar]]
):
# type: (...) -> int
"""Download all videos to the provided destination.
Actually a shortcut for `.download` with ``condition`` set
to accept only videos.
"""
return self.download(
destination,
condition=lambda media: media["is_video"],
media_count=media_count,
timeframe=timeframe,
new_only=new_only,
pgpbar_cls=pgpbar_cls,
dlpbar_cls=dlpbar_cls,
)
def download(self,
destination, # type: Union[str, fs.base.FS]
condition=None, # type: Optional[Callable[[dict], bool]]
media_count=None, # type: Optional[int]
timeframe=None, # type: Optional[_Timeframe]
new_only=False, # type: bool
pgpbar_cls=None, # type: Optional[Type[ProgressBar]]
dlpbar_cls=None, # type: Optional[Type[ProgressBar]]
):
# type: (...) -> int
"""Download all medias passing ``condition`` to destination.
Arguments:
destination (~fs.base.FS or str): the filesystem where to
store the downloaded files, as a filesystem instance or
FS URL.
condition (function): the condition to filter the
medias with. If `None` is given, a function is created using
the ``get_videos`` and ``videos_only`` passed at object
initialisation.
media_count (int or None): the maximum number of medias
to download. Leave to ``None`` to download everything from
the target. *Note that more files can be downloaded, since
a post with multiple images/videos is considered to be a
single media*.
timeframe (tuple or None): a tuple of two `~datetime.datetime`
objects to enforce a time frame (the first item must be
more recent). Leave to `None` to ignore times.
new_only (bool): stop media discovery when already
downloaded medias are encountered.
pgpbar_cls (type or None): an optional `~.pbar.ProgressBar`
subclass to use to display page scraping progress.
dlpbar_cls (type or None): an optional `~.pbar.ProgressBar`
subclass to use to display file download progress.
Returns:
int: the number of queued medias.
May not be equal to the number of downloaded medias if some
errors occurred during background download.
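Example:
A sketch downloading at most ten medias posted during the
last month (``looter`` is any concrete looter instance, and
the destination path is a placeholder):
>>> from datetime import datetime, timedelta
>>> now = datetime.now()
>>> looter.download(
...     "./instagram",
...     media_count=10,
...     timeframe=(now, now - timedelta(days=30)),
... )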
"""
# Open the destination filesystem
destination, close_destination = self._init_destfs(destination)
# Create an iterator over the pages with an optional progress bar
pages_iterator = self.pages() # type: Iterable[Dict[Text, Any]]
pages_iterator = pgpbar = self._init_pbar(pages_iterator, pgpbar_cls)
# Create an iterator over the medias
medias_iterator = self._medias(iter(pages_iterator), timeframe)
# Create the media download bar from a dummy iterator
dlpbar = self._init_pbar(
six.moves.range(length_hint(medias_iterator)), dlpbar_cls)
# Start a group of workers
workers, queue = self._init_workers(
dlpbar if dlpbar_cls is not None else None, destination)
# Make sure exiting the main thread will shutdown workers
atexit.register(self._shutdown_workers, workers)
# Queue all medias
medias_queued = self._fill_media_queue(
queue, destination, medias_iterator, media_count,
new_only, condition)
# Once queuing the medias is finished, finish the page progress bar
# and set a new maximum on the download progress bar.
if pgpbar_cls is not None:
pgpbar.finish() # type: ignore
if dlpbar_cls is not None:
dlpbar.set_maximum(medias_queued) # type: ignore
# If no medias were queued, issue a warning
# TODO: refine warning depending on download parameters
if medias_queued == 0:
warnings.warn("No medias found.")
# Add poison pills to the queue and wait for workers to finish
self._poison_workers(workers, queue)
self._join_workers(workers, queue)
# Once downloading is finished, finish the download progress bar
# and close the destination if needed.
if dlpbar_cls is not None:
dlpbar.finish() # type: ignore
if close_destination:
destination.close()
return medias_queued
def login(self, username, password):
# type: (str, str) -> None
"""Log the instance in using the given credentials.
Arguments:
username (str): the username to log in with.
password (str): the password to log in with.
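Example:
A sketch with placeholder credentials:
>>> looter.login("my_username", "my_password")
>>> looter.logged_in()
True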
"""
self._login(username, password, session=self.session)
def logout(self):
# type: () -> None
"""Log the instance out from the currently opened session.
"""
self._logout(session=self.session)
def logged_in(self):
# type: () -> bool
"""Check if there's an open Instagram session.
"""
return self._logged_in(self.session)
def _init_pbar(self,
it, # type: Iterable[_T]
pbar_cls=None, # type: Optional[Type[ProgressBar]]
):
# type: (...) -> Iterable[_T]
"""Wrap an iterable within a `ProgressBar`.
Arguments:
it (~collections.Iterable): an iterable to wrap.
pbar_cls (type or None): an optional `ProgressBar` subclass
to use, or `None` to avoid using a progress bar.
Returns:
~collections.Iterable: the wrapped iterable.
"""
if pbar_cls is not None:
if not issubclass(pbar_cls, ProgressBar):
raise TypeError("pbar must implement the ProgressBar interface !")
maximum = length_hint(it)
it = pbar = pbar_cls(it)
pbar.set_maximum(maximum)
pbar.set_lock(threading.RLock())
return it
def _init_destfs(self, destination, create=True):
# type: (Union[str, fs.base.FS], bool) -> Tuple[fs.base.FS, bool]
"""Open a filesystem either from a FS URL or filesystem instance.
Arguments:
destination (~fs.base.FS or str): the destination filesystem
to open, as a filesystem instance or FS URL.
create (bool): whether or not to create a new
filesystem if it does not exist.
Returns:
(~fs.base.FS, bool): the open FS, and whether to close it.
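Example:
Both forms below are accepted (the OS path is a placeholder;
``mem://`` opens an in-memory filesystem):
>>> looter._init_destfs("./downloads")         # FS URL or OS path
>>> looter._init_destfs(fs.open_fs("mem://"))  # already-open FS instance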
"""
close_destination = False
if isinstance(destination, six.binary_type):
destination = destination.decode('utf-8')
if isinstance(destination, six.text_type):
destination = fs.open_fs(destination, create=create)
close_destination = True
if not isinstance(destination, fs.base.FS):
raise TypeError("<destination> must be a FS URL or FS instance.")
return destination, close_destination
def _fill_media_queue(self,
queue, # type: Queue
destination, # type: fs.base.FS
medias_iter, # type: Iterable[Any]
media_count=None, # type: Optional[int]
new_only=False, # type: bool
condition=None, # type: Optional[Callable[[dict], bool]]
):
# type: (...) -> int
"""Fill the download queue with medias from the provided iterator.
Arguments:
queue (~queue.Queue): the download queue to fill.
destination (~fs.base.FS): the filesystem where to download
the files.
medias_iter (~collections.Iterable): an iterable over the
Instagram medias to download.
media_count (int or None): the maximum number of new medias to
download, or ``None`` to download all discoverable medias.
new_only (bool): stop media discovery when a media that was
already downloaded is encountered.
condition (function or None): the condition to filter the medias
with. If `None` is given, a function is created using the
``get_videos`` and ``videos_only`` passed at object
initialisation.
Returns:
int: the number of queued medias.
May not be equal to the number of downloaded medias if some
errors occurred during downloads.
"""
# Create a condition from parameters if needed
if condition is not None:
_condition = condition # type: Callable[[dict], bool]
else:
if self.videos_only:
def _condition(media): return media['is_video']
elif not self.get_videos:
def _condition(media): return not media['is_video']
else:
def _condition(media): return True
# Queue all medias fulfilling the condition
medias_queued = 0
for media in six.moves.filter(_condition, medias_iter):
# Check if the whole post info is required
if self.namegen.needs_extended(media) or media["__typename"] != "GraphImage":
media = self.get_post_info(media['shortcode'])
# Check that sidecar children fit the condition
if media['__typename'] == "GraphSidecar":
# Check that each node fits the condition
for sidecar in media['edge_sidecar_to_children']['edges'][:]:
if not _condition(sidecar['node']):
media['edge_sidecar_to_children']['edges'].remove(sidecar)
# Check that the nodelist is not depleted
if not media['edge_sidecar_to_children']['edges']:
continue
# Check that the file does not exist
# FIXME: not working well with sidecar
if new_only and destination.exists(self.namegen.file(media)):
break
# Put the medias in the queue
queue.put(media)
medias_queued += 1
if media_count is not None and medias_queued >= media_count:
break
return medias_queued
# WORKERS UTILS
def _init_workers(self,
pbar, # type: Union[ProgressBar, Iterable, None]
destination, # type: fs.base.FS
):
# type: (...) -> Tuple[List[InstaDownloader], Queue]
workers = [] # type: List[InstaDownloader]
queue = Queue() # type: Queue
for _ in six.moves.range(self.jobs):
worker = InstaDownloader(
queue=queue,
destination=destination,
namegen=self.namegen,
add_metadata=self.add_metadata,
dump_json=self.dump_json,
dump_only=self.dump_only,
pbar=pbar,
session=self.session)
worker.start()
workers.append(worker)
return workers, queue
def _poison_workers(self, workers, queue):
# type: (List[InstaDownloader], Queue) -> None
for worker in workers:
queue.put(None)
def _join_workers(self, workers, queue):
# type: (List[InstaDownloader], Queue) -> None
if any(w.is_alive() for w in workers):
for worker in workers:
worker.join()
def _shutdown_workers(self, workers):
# type: (List[InstaDownloader]) -> None
for worker in workers:
worker.terminate()
class ProfileLooter(InstaLooter):
"""A looter targeting medias on a user profile.
"""
def __init__(self, username, **kwargs):
# type: (str, **Any) -> None
"""Create a new profile looter.
Arguments:
username (str): the username of the profile.
See `InstaLooter.__init__` for more details about accepted
keyword arguments.
"""
super(ProfileLooter, self).__init__(**kwargs)
self._username = username
self._owner_id = None
def pages(self):
# type: () -> ProfileIterator
"""Obtain an iterator over Instagram post pages.
Returns:
PageIterator: an iterator over the Instagram post pages.
Raises:
ValueError: when the requested user does not exist.
RuntimeError: when the account is private and there is
no logged-in user (or the logged-in user does not
follow that account).
"""
if self._owner_id is None:
it = ProfileIterator.from_username(self._username, self.session)
self._owner_id = it.owner_id
return it
return ProfileIterator(self._owner_id, self.session, self.rhx)
class HashtagLooter(InstaLooter):
"""A looter targeting medias tagged with a hashtag.
"""
def __init__(self, hashtag, **kwargs):
# type: (str, **Any) -> None
"""Create a new hashtag looter.
Arguments:
hashtag (str): the hashtag to search for.
See `InstaLooter.__init__` for more details about accepted
keyword arguments.
"""
super(HashtagLooter, self).__init__(**kwargs)
self._hashtag = hashtag
def pages(self): # noqa: D102
# type: () -> HashtagIterator
return HashtagIterator(self._hashtag, self.session, self.rhx)
class PostLooter(InstaLooter):
"""A looter targeting a specific post.
"""
_RX_URL = re.compile(
r'(?:https?://)?(?:www\.instagram\.com|instagr\.am)/p/([0-9a-zA-Z_\-]{10,11})'
)
_RX_CODE = re.compile(
r'^[0-9a-zA-Z_\-]{10,11}$'
)
def __init__(self, code, **kwargs):
# type: (str, **Any) -> None
"""Create a new hashtag looter.
Arguments:
code (str): the code of the post to get.
See `InstaLooter.__init__` for more details about accepted
keyword arguments.
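Example:
Both a bare post code and a full post URL are accepted
(the values below are placeholders):
>>> looter = PostLooter("aBcDeFgHiJk")
>>> looter = PostLooter("https://www.instagram.com/p/aBcDeFgHiJk/")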
"""
super(PostLooter, self).__init__(**kwargs)
self._info = None # type: Optional[dict]
match = self._RX_URL.match(code)
if match is not None:
self.code = match.group(1)
elif self._RX_CODE.match(code) is None:
raise ValueError("invalid post code: '{}'".format(code))
else:
self.code = code
@property
def info(self):
# type: () -> dict
if self._info is None:
self._info = self.get_post_info(self.code)
return self._info
def pages(self):
# type: () -> Iterator[Dict[Text, Any]]
"""Return a generator that yields a page with only the refered post.
Yields:
dict: a page dictionary with only a single media.
"""
yield {"edge_owner_to_timeline_media": {
"count": 1,
"page_info": {
"has_next_page": False,
"end_cursor": None,
},
"edges": [
{"node": self.info}
],
}}
def medias(self, timeframe=None):
# type: (Optional[_Timeframe]) -> Iterator[Dict[Text, Any]]
"""Return a generator that yields only the referred post.
Yields:
dict: a media dictionary obtained from the given post.
Note:
If the post does not fit the given timeframe, the
generator yields nothing.
"""
info = self.info
if timeframe is not None:
start, end = TimedMediasIterator.get_times(timeframe)
timestamp = info.get("taken_at_timestamp") or info["date"]
if not (start >= timestamp >= end):
return  # PEP 479: do not raise StopIteration inside a generator
yield info
def download(self,
destination, # type: Union[str, fs.base.FS]
condition=None, # type: Optional[Callable[[dict], bool]]
media_count=None, # type: Optional[int]
timeframe=None, # type: Optional[_Timeframe]
new_only=False, # type: bool
pgpbar_cls=None, # type: Optional[Type[ProgressBar]]
dlpbar_cls=None, # type: Optional[Type[ProgressBar]]
):
# type: (...) -> int
"""Download the refered post to the destination.
See `InstaLooter.download` for argument reference.
Note:
This method, unlike the other *looter* implementations, will
not spawn new threads, but simply uses the main thread to
download the files.
Since a worker is in charge of downloading a *media* at a time
(and not a *file*), there would be no point in spawning more.
"""
destination, close_destination = self._init_destfs(destination)
queue = Queue() # type: Queue[Optional[Dict]]
medias_queued = self._fill_media_queue(
queue, destination, iter(self.medias()), media_count,
new_only, condition)
queue.put(None)
worker = InstaDownloader(
queue=queue,
destination=destination,
namegen=self.namegen,
add_metadata=self.add_metadata,
dump_json=self.dump_json,
dump_only=self.dump_only,
pbar=None,
session=self.session)
worker.run()
return medias_queued