"""
Main functionality coordination
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains the class responsible for aggregating and
organizing the main functionality and usage of the package.
Specifically:
- :class:`Mixcloud`, providing the interface of the package.
Stores the basic configuration and preferences, holds the session
which API calls are made from and has all the methods necessary
to reach every endpoint of the API, as well as take advantage of
its capabilities.
"""
from json import JSONDecodeError
from os.path import getsize
import aiohttp
import yarl
from aiomixcloud.constants import API_ROOT, MP3_MAX_SIZE, \
MIXCLOUD_ROOT, OEMBED_ROOT
from aiomixcloud.decorators import displayed, paginated, \
personal, targeting, uploading
from aiomixcloud.exceptions import MixcloudError
from aiomixcloud.json import MixcloudJSONDecoder
from aiomixcloud.models import AccessDict, Resource, ResourceList
[docs]class Mixcloud:
"""Asynchronous Mixcloud API handler
This class orchestrates the interaction of the package's user with
the Mixcloud API. Being ready for use with no explicit
configuration at all, as well as being capable of customized
operation, it provides the methods to hit the API. A common to use
method and base for many of the rest, is the :meth:`get` method,
which takes a "key", a URL segment corresponding to a unique API
resource and returns a native, yet familiar and handy data
structure representing that resource. There, also, exist a family
of methods, like :meth:`popular`, which receive pagination
arguments and return a list of resources. The class provides some
personalized methods, that is methods which require authorization
to run, through an access token. These are methods like
:meth:`follow` and :meth:`unfollow`. In this category belong the
methods about uploading data to the server, :meth:`upload` and
:meth:`edit`. Finally, there are methods about getting embedding
information for some resource, like :meth:`embed_html`. The class
can be used as an asynchronous context manager to avoid having to
call :meth:`close` explicitly, which closes the session.
"""
#: Default Mixcloud root URL
mixcloud_root = MIXCLOUD_ROOT
#: Default Mixcloud API root URL
api_root = API_ROOT
#: Default Mixcloud oEmbed root URL
oembed_root = OEMBED_ROOT
#: Default JSON decoder class
json_decoder_class = MixcloudJSONDecoder
#: Resource model class
resource_class = Resource
#: Resource model list class
resource_list_class = ResourceList
def __init__(self, api_root=api_root, *, access_token=None,
mixcloud_root=mixcloud_root, oembed_root=oembed_root,
json_decoder_class=json_decoder_class,
resource_class=resource_class,
resource_list_class=resource_list_class,
raise_exceptions=False, session=None):
"""Store instance attributes. If no `session` is given,
start a new one.
"""
if session is None:
session = aiohttp.ClientSession()
#: Base URL for all API requests
self._api_root = api_root
#: OAuth Access token
self.access_token = access_token
#: Base Mixcloud URL
self._mixcloud_root = mixcloud_root
#: Base URL for all oEmbed requests
self._oembed_root = oembed_root
#: JSON decode function
self._json_decode = json_decoder_class().decode
#: Class for resource model
self._resource_class = resource_class
#: Class for resource model list
self._resource_list_class = resource_list_class
#: Whether to raise an exception when API responds
#: with error message
self._raise_exceptions = raise_exceptions
#: The :class:`~aiohttp.ClientSession` object to
#: make requests from
self._session = session
async def __aenter__(self):
"""Enable asynchronous context management."""
return self
async def __aexit__(self, *args):
"""Clean up."""
await self.close()
@staticmethod
def _url_join(url, segment):
"""Return a :class:`~yarl.URL` consisting of `url`, followed
by `segment`. Strip possibly existing leading slash of
`segment`, for joining to work.
"""
return yarl.URL(url) / segment.lstrip('/')
def _build_url(self, segment):
"""Return a :class:`~yarl.URL` consisting of API root,
followed by `segment`.
"""
return self._url_join(self._api_root, segment)
async def _process_response(self, response):
"""Return JSON-decoded data out of `response`. If fail to
decode, let :exc:`~json.JSONDecodeError` pass through in
case :attr:`_raise_exceptions` is set, otherwise return
``None``.
"""
try:
# Pass None as content type to avoid strict
# content type checking.
data = await response.json(
loads=self._json_decode, content_type=None)
except JSONDecodeError:
if self._raise_exceptions:
# Let JSONDecodeError pass through.
raise
# Inform caller that decoding failed.
return None
return data
[docs] async def get(self, url, *, relative=True,
create_connections=True, **params):
"""Send a GET request to API and return JSON-decoded data.
If `relative` is ``True``, `url` is considered relative to the
API root, otherwise it is considered as an absolute URL.
"""
# Relative or not, either way end up with a yarl URL.
if relative:
yarl_url = self._build_url(url)
else:
yarl_url = yarl.URL(url)
# Ask for detailed information by setting "metadata" to 1
# in the query.
params['metadata'] = 1
# If an access token is available, use it.
if self.access_token is not None:
params['access_token'] = self.access_token
# Use yarl's `update_query` rather than pass `params` to
# `session.get` to avoid adding possibly duplicate query
# parameters to the URL.
final_url = yarl_url.update_query(params)
async with self._session.get(final_url) as response:
data = await self._process_response(response)
if data is None:
# Could not decode JSON response, return empty data.
return AccessDict({}, mixcloud=self)
if 'error' in data:
if self._raise_exceptions:
raise MixcloudError(data)
# Error response, no resource
return AccessDict(data, mixcloud=self)
if 'data' in data:
# List of resources
return self._resource_list_class(data, mixcloud=self)
# Single resource
return self._resource_class(
data, full=True,
create_connections=create_connections, mixcloud=self)
[docs] @personal
async def me(self):
"""Get information about user authorized by
used access token.
"""
return await self.get('me')
[docs] async def discover(self, tag):
"""Get information about `tag`."""
return await self.get(f'discover/{tag}')
[docs] @paginated
async def popular(self, params):
"""Get information about popular cloudcasts."""
return await self.get('popular', **params)
[docs] @paginated
async def hot(self, params):
"""Get information about hot cloudcasts."""
return await self.get('popular/hot', **params)
[docs] @paginated
async def new(self, params):
"""Get information about new cloudcasts."""
return await self.get('new', **params)
[docs] @paginated
async def search(self, query, params, *, type='cloudcast'):
"""Search resources of `type` by `query` and return
found information.
"""
return await self.get('search', q=query, type=type, **params)
async def _native_result(self, response):
"""Wrap :meth:`_process_response` and return appropriate
:class:`~aiomixcloud.models.AccessDict` object.
"""
data = await self._process_response(response)
if data is None:
# Could not decode JSON response, return empty data.
return AccessDict({}, mixcloud=self)
if 'error' in data and self._raise_exceptions:
raise MixcloudError(data)
return AccessDict(data, mixcloud=self)
@personal
async def _do_action(self, url, action, method):
"""Make HTTP `method` request about `action`, to resource
specified by `url` and return results.
"""
# API requires URLs to end in a slash, when dealing
# with non-GET requests.
action = f'{action}/'
# Form path, build absolute URL and add
# access token GET parameter.
absolute_url = self._build_url(url) / action
final_url = absolute_url.with_query(access_token=self.access_token)
# Choose appropriate HTTP method.
make_request = getattr(self._session, method)
async with make_request(final_url) as response:
return await self._native_result(response)
async def _post_action(self, url, action):
"""Make HTTP POST request about `action`, to resource
specified by `url` and return results.
"""
return await self._do_action(url, action, 'post')
async def _delete_action(self, url, action):
"""Make HTTP DELETE request about `action`, to resource
specified by `url` and return results.
"""
return await self._do_action(url, action, 'delete')
[docs] @targeting
async def follow(self, user):
"""Follow `user` and return results of the request."""
return await self._post_action(user, 'follow')
[docs] @targeting
async def favorite(self, cloudcast):
"""Favorite `cloudcast` and return results of the request."""
return await self._post_action(cloudcast, 'favorite')
[docs] @targeting
async def repost(self, cloudcast):
"""Repost `cloudcast` and return results of the request."""
return await self._post_action(cloudcast, 'repost')
[docs] @targeting
async def listen_later(self, cloudcast):
"""Add `cloudcast` to "listen later" list and
return results of the request.
"""
return await self._post_action(cloudcast, 'listen-later')
[docs] @targeting
async def unfollow(self, user):
"""Unfollow `user` and return results of the request."""
return await self._delete_action(user, 'follow')
[docs] @targeting
async def unfavorite(self, cloudcast):
"""Unfavorite `cloudcast` and return results of the request."""
return await self._delete_action(cloudcast, 'favorite')
[docs] @targeting
async def unrepost(self, cloudcast):
"""Unrepost `cloudcast` and return results of the request."""
return await self._delete_action(cloudcast, 'repost')
[docs] @targeting
async def unlisten_later(self, cloudcast):
"""Remove `cloudcast` from "listen later" list and
return results of the request.
"""
return await self._delete_action(cloudcast, 'listen-later')
async def _proper_result(self, response):
"""Return the proper kind of result, based on Content-Type
of `response`.
"""
content_type = response.headers.get('content-type', '')
if 'javascript' in content_type or 'json' in content_type:
# Handle JSON response
return await self._native_result(response)
# Handle non-JSON response
return await response.text()
@displayed
async def _embed(self, cloudcast, params):
"""Get embed data for `cloudcast`, in desirable format
using specified display options.
"""
format = params.pop('format')
url = self._build_url(cloudcast) / f'embed-{format}'
async with self._session.get(url, params=params) as response:
return await self._proper_result(response)
[docs] @targeting
async def embed_json(self, *args, **kwargs):
"""Get embed data for given cloudcast, in JSON format
using specified display options.
"""
return await self._embed(*args, format='json', **kwargs)
[docs] @targeting
async def embed_html(self, *args, **kwargs):
"""Get embed data for given cloudcast, in HTML format
using specified display options.
"""
return await self._embed(*args, format='html', **kwargs)
[docs] @targeting
@displayed
async def oembed(self, key, params):
"""Get oEmbed data for resource identified by `key`,
in desirable format using specified display options.
"""
url = self._url_join(self._mixcloud_root, key)
params['url'] = str(url)
async with self._session.get(
self._oembed_root, params=params) as response:
return await self._proper_result(response)
async def _upload(self, params, data, url):
"""Add multipart fields from `params` to possibly half-filled
`data`, POST it to `url` and return results.
"""
# Add possibly existing picture.
picture = params.pop('picture', None)
if picture is not None:
picture_file = open(picture, 'rb')
data.add_field('picture', picture_file)
# Remove tags and sections from `params` to handle
# them separately.
tags = params.pop('tags', [])
sections = params.pop('sections', [])
# Add rest of the parameters.
for k, v in params.items():
data.add_field(k, v)
# Add possibly existing tags.
for i, tag in enumerate(tags):
data.add_field(f'tags-{i}-tag', tag)
# Add possibly existing sections.
for i, section in enumerate(sections):
for k, v in section.items():
data.add_field(f'sections-{i}-{k}', str(v))
final_url = url.with_query(access_token=self.access_token)
async with self._session.post(final_url, data=data) as response:
result = await self._native_result(response)
if picture is not None:
picture_file.close()
return result
[docs] @uploading
@personal
async def upload(self, mp3, name, params):
"""Upload file with filename indicated by `mp3`, named `name`
and described by specified parameters.
"""
message = f'mp3 file size must be {MP3_MAX_SIZE} bytes at most'
assert getsize(mp3) <= MP3_MAX_SIZE, message
url = self._build_url('upload/')
data = aiohttp.FormData()
data.add_field('name', name)
with open(mp3, 'rb') as mp3_file:
data.add_field('mp3', mp3_file)
return await self._upload(params, data, url)
[docs] @targeting
@uploading
@personal
async def edit(self, key, params, *, name=None):
"""Edit upload identified by `key`, as described by
specified parameters.
"""
if '/' not in key:
# `key` is just the cloudcast's key, without the username
# part, build full key by fetching current user's key.
user = await self.me()
segment = yarl.URL(user['key'])
key = str(segment / key)
# Strip leading slash for joining to work.
key = key.lstrip('/')
url = self._build_url('upload') / key / 'edit/'
data = aiohttp.FormData()
if name is not None:
data.add_field('name', name)
return await self._upload(params, data, url)
[docs] async def close(self):
"""Close :attr:`_session`."""
await self._session.close()