00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <ortp/ortp.h>
00021 #include "utils.h"
00022 #include "scheduler.h"
00023 #include "rtpsession_priv.h"
00024
00025
00026 extern void rtp_session_process (RtpSession * session, uint32_t time, RtpScheduler *sched);
00027
00028
00029 void rtp_scheduler_init(RtpScheduler *sched)
00030 {
00031 sched->list=0;
00032 sched->time_=0;
00033
00034 rtp_scheduler_set_timer(sched,&posix_timer);
00035 ortp_mutex_init(&sched->lock,NULL);
00036 ortp_cond_init(&sched->unblock_select_cond,NULL);
00037 sched->max_sessions=sizeof(SessionSet)*8;
00038 session_set_init(&sched->all_sessions);
00039 sched->all_max=0;
00040 session_set_init(&sched->r_sessions);
00041 sched->r_max=0;
00042 session_set_init(&sched->w_sessions);
00043 sched->w_max=0;
00044 session_set_init(&sched->e_sessions);
00045 sched->e_max=0;
00046 }
00047
00048 RtpScheduler * rtp_scheduler_new()
00049 {
00050 RtpScheduler *sched=(RtpScheduler *) ortp_malloc(sizeof(RtpScheduler));
00051 memset(sched,0,sizeof(RtpScheduler));
00052 rtp_scheduler_init(sched);
00053 return sched;
00054 }
00055
00056 void rtp_scheduler_set_timer(RtpScheduler *sched,RtpTimer *timer)
00057 {
00058 if (sched->thread_running){
00059 ortp_warning("Cannot change timer while the scheduler is running !!");
00060 return;
00061 }
00062 sched->timer=timer;
00063
00064 sched->timer_inc=(timer->interval.tv_usec/1000) + (timer->interval.tv_sec*1000);
00065 }
00066
00067 void rtp_scheduler_start(RtpScheduler *sched)
00068 {
00069 if (sched->thread_running==0){
00070 sched->thread_running=1;
00071 ortp_mutex_lock(&sched->lock);
00072 ortp_thread_create(&sched->thread, NULL, rtp_scheduler_schedule,(void*)sched);
00073 ortp_cond_wait(&sched->unblock_select_cond,&sched->lock);
00074 ortp_mutex_unlock(&sched->lock);
00075 }
00076 else ortp_warning("Scheduler thread already running.");
00077
00078 }
00079 void rtp_scheduler_stop(RtpScheduler *sched)
00080 {
00081 if (sched->thread_running==1)
00082 {
00083 sched->thread_running=0;
00084 ortp_thread_join(sched->thread, NULL);
00085 }
00086 else ortp_warning("Scheduler thread is not running.");
00087 }
00088
00089 void rtp_scheduler_destroy(RtpScheduler *sched)
00090 {
00091 if (sched->thread_running) rtp_scheduler_stop(sched);
00092 ortp_mutex_destroy(&sched->lock);
00093
00094 ortp_cond_destroy(&sched->unblock_select_cond);
00095 ortp_free(sched);
00096 }
00097
00098 void * rtp_scheduler_schedule(void * psched)
00099 {
00100 RtpScheduler *sched=(RtpScheduler*) psched;
00101 RtpTimer *timer=sched->timer;
00102 RtpSession *current;
00103
00104
00105
00106 ortp_mutex_lock(&sched->lock);
00107 ortp_cond_signal(&sched->unblock_select_cond);
00108 ortp_mutex_unlock(&sched->lock);
00109 timer->timer_init();
00110 while(sched->thread_running)
00111 {
00112
00113 ortp_mutex_lock(&sched->lock);
00114
00115 current=sched->list;
00116
00117 while (current!=NULL)
00118 {
00119 ortp_debug("scheduler: processing session=0x%x.\n",current);
00120 rtp_session_process(current,sched->time_,sched);
00121 current=current->next;
00122 }
00123
00124 ortp_cond_broadcast(&sched->unblock_select_cond);
00125 ortp_mutex_unlock(&sched->lock);
00126
00127
00128
00129
00130 timer->timer_do();
00131 sched->time_+=sched->timer_inc;
00132 }
00133
00134 timer->timer_uninit();
00135 return NULL;
00136 }
00137
00138 void rtp_scheduler_add_session(RtpScheduler *sched, RtpSession *session)
00139 {
00140 RtpSession *oldfirst;
00141 int i;
00142 if (session->flags & RTP_SESSION_IN_SCHEDULER){
00143
00144 return;
00145 }
00146 rtp_scheduler_lock(sched);
00147
00148 oldfirst=sched->list;
00149 sched->list=session;
00150 session->next=oldfirst;
00151 if (sched->max_sessions==0){
00152 ortp_error("rtp_scheduler_add_session: max_session=0 !");
00153 }
00154
00155 for (i=0;i<sched->max_sessions;i++){
00156 if (!ORTP_FD_ISSET(i,&sched->all_sessions.rtpset)){
00157 session->mask_pos=i;
00158 session_set_set(&sched->all_sessions,session);
00159
00160 if (session->flags & RTP_SESSION_RECV_NOT_STARTED)
00161 session_set_set(&sched->r_sessions,session);
00162 if (session->flags & RTP_SESSION_SEND_NOT_STARTED)
00163 session_set_set(&sched->w_sessions,session);
00164 if (i>sched->all_max){
00165 sched->all_max=i;
00166 }
00167 break;
00168 }
00169 }
00170
00171 rtp_session_set_flag(session,RTP_SESSION_IN_SCHEDULER);
00172 rtp_scheduler_unlock(sched);
00173 }
00174
00175 void rtp_scheduler_remove_session(RtpScheduler *sched, RtpSession *session)
00176 {
00177 RtpSession *tmp;
00178 int cond=1;
00179 return_if_fail(session!=NULL);
00180 if (!(session->flags & RTP_SESSION_IN_SCHEDULER)){
00181
00182 return;
00183 }
00184
00185 rtp_scheduler_lock(sched);
00186 tmp=sched->list;
00187 if (tmp==session){
00188 sched->list=tmp->next;
00189 rtp_session_unset_flag(session,RTP_SESSION_IN_SCHEDULER);
00190 session_set_clr(&sched->all_sessions,session);
00191 rtp_scheduler_unlock(sched);
00192 return;
00193 }
00194
00195 while(cond){
00196 if (tmp!=NULL){
00197 if (tmp->next==session){
00198 tmp->next=tmp->next->next;
00199 cond=0;
00200 }
00201 else tmp=tmp->next;
00202 }else {
00203
00204 ortp_warning("rtp_scheduler_remove_session: the session was not found in the scheduler list!");
00205 cond=0;
00206 }
00207 }
00208 rtp_session_unset_flag(session,RTP_SESSION_IN_SCHEDULER);
00209
00210 session_set_clr(&sched->all_sessions,session);
00211 rtp_scheduler_unlock(sched);
00212 }