Source code for instalooter.medias
# coding: utf-8
"""Iterators over Instagram medias.
Iterators defined in this module wrap `PageIterator` instances to yield
individual medias defined in each page instead of whole pages.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import datetime
import typing
import six
from .pages import PageIterator
if typing.TYPE_CHECKING:
from typing import Any, Dict, List, Optional, Iterable, Set, Text
_I = typing.TypeVar('_I', bound='MediasIterator')
__all__ = [
"MediasIterator",
"TimedMediasIterator",
]
[docs]class MediasIterator(typing.Iterator[typing.Dict[typing.Text, typing.Any]]):
"""An iterator over the medias obtained from a page iterator.
"""
def __init__(self, page_iterator):
# type: (Iterable[Dict[Text, Any]]) -> None
self._it = iter(page_iterator)
self._seen = set() # type: Set[Text]
self._edges = [] # type: List[Dict[Text, Dict[Text, Any]]]
self._finished = False
self._total = None # type: Optional[int]
self._done = 0
def __iter__(self):
# type: (_I) -> _I
return self
def _next_page(self):
# type: () -> Dict[Text, Any]
data = next(self._it)
section = next(s for s in six.iterkeys(data) if s.endswith('_media'))
return data[section]
[docs] def __next__(self):
# type: () -> Dict[Text, Any]
if self._finished:
raise StopIteration
if not self._edges:
page = self._next_page()
self._total = page['count']
self._edges.extend(page['edges'])
if not page['edges']:
raise StopIteration
media = self._edges.pop(0)
self._done += 1
if media['node']['id'] in self._seen:
self._finished = True
self._seen.add(media['node']['id'])
return media['node']
def __length_hint__(self):
if self._total is None:
try:
page = self._next_page()
self._total = page['count']
self._edges.extend(page['edges'])
except StopIteration:
self._total = 0
return self._total - self._done
if six.PY2:
next = __next__
[docs]class TimedMediasIterator(MediasIterator):
"""An iterator over the medias within a specific timeframe.
"""
@staticmethod
def get_times(timeframe):
if timeframe is None:
timeframe = (None, None)
try:
start_time = timeframe[0] or datetime.date.today()
end_time = timeframe[1] or datetime.date.fromtimestamp(0)
except (IndexError, AttributeError):
raise TypeError("'timeframe' must be a couple of dates!")
return start_time, end_time
def __init__(self, page_iterator, timeframe=None):
super(TimedMediasIterator, self).__init__(page_iterator)
self.start_time, self.end_time = self.get_times(timeframe)
[docs] def __next__(self):
number_old = 0
while True:
media = super(TimedMediasIterator, self).__next__()
timestamp = media.get('taken_at_timestamp') or media['date']
media_date = type(self.start_time).fromtimestamp(timestamp)
if self.start_time >= media_date >= self.end_time:
return media
elif media_date < self.end_time:
number_old += 1
if number_old >= PageIterator.PAGE_SIZE:
self._finished = True
raise StopIteration
if six.PY2:
next = __next__