defuze.me
Client
|
00001 /************************************************************************** 00002 ** defuze.me Epitech Innovative Project 00003 ** 00004 ** Copyright 2010-2011 00005 ** Athena Calmettes - Jocelyn De La Rosa - Francois Gaillard 00006 ** Adrien Jarthon - Alexandre Moore - Luc Peres - Arnaud Sellier 00007 ** 00008 ** All rights reserved. 00009 **************************************************************************/ 00010 00011 #include "servicesync.hpp" 00012 #include "queuetrack.hpp" 00013 #include "logger.hpp" 00014 #include "status.hpp" 00015 #include <QTimer> 00016 00017 using namespace WebService; 00018 00019 ServiceSync::ServiceSync() : reply(NULL), ws(0), ws_tries(0), ws_delay(1) 00020 { 00021 } 00022 00023 ServiceSync::~ServiceSync() 00024 { 00025 } 00026 00027 void ServiceSync::init() 00028 { 00029 queue = plugins->cast<Queue::PlayQueue>("queue"); 00030 connect(queue, SIGNAL(initialized()), SLOT(sendPlayQueue())); 00031 connect(queue, SIGNAL(addQueueElem(Queue::Queueable*)), SLOT(sendAddQueueElem(Queue::Queueable*))); 00032 connect(queue, SIGNAL(removeQueueElem(Queue::Queueable*)), SLOT(sendRemoveQueueElem(Queue::Queueable*))); 00033 connect(queue, SIGNAL(altered()), SLOT(sendPlayQueue())); 00034 connect(cores->net(), SIGNAL(APIAuthenticated()), SLOT(sendNextRequest())); 00035 connect(cores->net(), SIGNAL(APIAuthenticated()), SLOT(wsConnect())); 00036 wsConnect(); 00037 } 00038 00039 void ServiceSync::wsConnect() 00040 { 00041 int radio_id = cores->net()->getRadioInfo()["id"].toInt(); 00042 if (radio_id && !ws) 00043 { 00044 ws = new Network::WebSocket(QString("ws://%1/push/%2/client").arg(gl_PUSH_HOSTNAME).arg(radio_id)); 00045 ws->onMessage(this, SLOT(onMessage(QByteArray))); 00046 ws->onOpen(this, SLOT(onOpen())); 00047 ws->onClose(this, SLOT(onClose(QString))); 00048 ws->connect(); 00049 } 00050 } 00051 00052 void ServiceSync::defineParams() 00053 { 00054 } 00055 00056 bool ServiceSync::hasRequests() const 00057 { 00058 return (!requests.empty()); 00059 } 00060 00061 bool ServiceSync::isRequesting() const 00062 { 00063 return (reply != NULL); 00064 } 00065 00066 void ServiceSync::sendNextRequest() 00067 { 00068 if (hasRequests() && !isRequesting()) 00069 { 00070 ServiceRequest *sRequest = requests.front(); 00071 QNetworkRequest netRequest = cores->net()->apiRequest(sRequest->url); 00072 00073 // Notify 00074 Notification::Status::gMessage(tr("Synchronisation with website"), Notification::SYNC); 00075 00076 // Store serialized data to a buffer 00077 buffer.setData(cores->net()->apiParser().serialize(sRequest->data)); 00078 00079 // send request to web service 00080 reply = cores->net()->web().sendCustomRequest(netRequest, sRequest->verb, &buffer); 00081 // handle events 00082 connect(reply, SIGNAL(finished()), SLOT(receiveResponse())); 00083 } 00084 else 00085 { 00086 //qDebug() << "can't send next request yet (hasRequests = " << hasRequests() << ", isRequesting = " << isRequesting() << ")"; 00087 } 00088 } 00089 00090 void ServiceSync::sendPlayQueue() 00091 { 00092 ServiceRequest *req = newRequest("PUT", "radios/my"); 00093 QVariantMap radio; 00094 QVariantList elems; 00095 00096 // Ignore changes if queue is being altered 00097 if (queue->isBeingAlteredALot()) 00098 return; 00099 // build PUT data hash 00100 foreach(Queue::Queueable* elem, queue->getQueue()) 00101 { 00102 QVariantMap hash; 00103 hash["position"] = elem->getPosition() + 1; 00104 hash["kind"] = elem->queueType(); 00105 hash["play_at"] = elem->getPlayTime(); 00106 if (!elem->getQueueAttributes().empty()) 00107 hash["properties"] = elem->getQueueAttributes(); 00108 if (elem->isTrack()) 00109 hash["track_attributes"] = elem->toQueueTrack()->getContent(true); 00110 elems << hash; 00111 } 00112 radio["queue_elems_attributes"] = elems; 00113 (*req)["radio"] = radio; 00114 00115 req->enqueue(); 00116 } 00117 00118 void ServiceSync::sendRemoveQueueElem(Queue::Queueable* elem) 00119 { 00120 int position = elem->getPosition() + 1; 00121 ServiceRequest *req; 00122 00123 // Ignore changes if queue is being altered 00124 if (queue->isBeingAlteredALot()) 00125 return; 00126 if (!sync_mutex.tryLock()) 00127 return; 00128 sync_mutex.unlock(); 00129 req = newRequest("DELETE", QString("radios/my/queue/%1").arg(position)); 00130 req->enqueue(); 00131 } 00132 00133 void ServiceSync::sendAddQueueElem(Queue::Queueable* elem) 00134 { 00135 ServiceRequest *req; 00136 QVariantMap queue_elem; 00137 00138 // Ignore changes if queue is being altered 00139 if (queue->isBeingAlteredALot()) 00140 return; 00141 if (!sync_mutex.tryLock()) 00142 return; 00143 sync_mutex.unlock(); 00144 // build POST data hash 00145 req = newRequest("POST", "radios/my/queue"); 00146 queue_elem["position"] = elem->getPosition() + 1; 00147 queue_elem["kind"] = elem->queueType(); 00148 queue_elem["play_at"] = elem->getPlayTime(); 00149 if (!elem->getQueueAttributes().empty()) 00150 queue_elem["properties"] = elem->getQueueAttributes(); 00151 if (elem->isTrack()) 00152 queue_elem["track_attributes"] = elem->toQueueTrack()->getContent(true); 00153 00154 (*req)["queue_elem"] = queue_elem; 00155 req->enqueue(); 00156 } 00157 00158 void ServiceSync::receiveResponse() 00159 { 00160 ServiceRequest* sRequest = requests.front(); 00161 00162 if (reply->error()) 00163 { 00164 if (reply->attribute(QNetworkRequest::HttpStatusCodeAttribute) == 401) 00165 { 00166 //cores->net()->invalidateWebToken(); 00167 reply->deleteLater(); 00168 reply = NULL; 00169 Notification::Message *msg = plugins->cast<Notification::Status>("status")->message(tr("Bad or expired credentials"), Notification::WARN); 00170 msg->setAction(tr("Sign in"), cores->net(), SLOT(invalidateWebToken())); 00171 return; 00172 } 00173 QVariant data = cores->net()->apiParser().parse(reply->readAll()); 00174 QString error = data.toMap()["error"].toString(); 00175 if (error == "") 00176 error = reply->errorString(); 00177 Logger::log(QString("Web service: error: %1").arg(error)); 00178 Notification::Status::gMessage(tr("Synchronisation failed: %1").arg(error), Notification::ERR); 00179 } 00180 else 00181 { 00182 QVariant data = cores->net()->apiParser().parse(reply->readAll()); 00183 } 00184 reply->deleteLater(); 00185 reply = NULL; 00186 requests.pop_front(); 00187 delete sRequest; 00188 sendNextRequest(); 00189 } 00190 00191 ServiceRequest* ServiceSync::newRequest(const QString& verb, const QString& url) 00192 { 00193 ServiceRequest *request; 00194 00195 // create request 00196 request = new ServiceRequest(this, url); 00197 request->verb = verb.toUtf8(); 00198 00199 return request; 00200 } 00201 00202 void ServiceSync::onMessage(const QByteArray& data) 00203 { 00204 QVariantMap hash = Network::JsonParser().parse(data).toMap(); 00205 QMutexLocker locker(&sync_mutex); 00206 00207 if (hash.contains("move")) 00208 wsMove(hash["move"].toMap()); 00209 } 00210 00211 void ServiceSync::onOpen() 00212 { 00213 qDebug() << "WebSocket connection OK!"; 00214 ws_tries = 0; 00215 ws_delay = 1; 00216 } 00217 00218 void ServiceSync::onClose(const QString& error) 00219 { 00220 qDebug() << "WebSocket connection error: " << error; 00221 if (ws_tries < 5) 00222 { 00223 ws_tries++; 00224 qDebug() << "Retry" << ws_tries << "in" << ws_delay << "s"; 00225 QTimer::singleShot(ws_delay * 1000, ws, SLOT(connect())); 00226 ws_delay *= 2; 00227 } 00228 } 00229 00230 void ServiceSync::wsMove(const QVariantMap& data) 00231 { 00232 int oldPosition = data["position"].toInt() - 1; 00233 int newPosition = data["newPosition"].toInt() - 1; 00234 Queue::Queueable* elem = queue->getQueue()[oldPosition]; 00235 Library::AudioTrack *track = elem->toQueueTrack()->getTrack(); 00236 00237 00238 queue->remove(oldPosition); 00239 queue->add(new Queue::QueueTrack(*track), newPosition); 00240 queue->emitAltered(); 00241 }