2 This file is part of PulseAudio.
4 Copyright 2006 Lennart Poettering
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
29 #include <pulsecore/thread.h>
30 #include <pulsecore/semaphore.h>
31 #include <pulsecore/macro.h>
33 #include <pulse/mainloop-api.h>
35 #include "thread-mq.h"
37 PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq
);
39 static void asyncmsgq_read_cb(pa_mainloop_api
*api
, pa_io_event
*e
, int fd
, pa_io_event_flags_t events
, void *userdata
) {
40 pa_thread_mq
*q
= userdata
;
43 pa_assert(events
== PA_IO_EVENT_INPUT
);
45 if (pa_asyncmsgq_read_fd(q
->outq
) == fd
)
46 pa_asyncmsgq_ref(aq
= q
->outq
);
47 else if (pa_asyncmsgq_read_fd(q
->inq
) == fd
)
48 pa_asyncmsgq_ref(aq
= q
->inq
);
50 pa_assert_not_reached();
52 pa_asyncmsgq_read_after_poll(aq
);
61 /* Check whether there is a message for us to process */
62 while (pa_asyncmsgq_get(aq
, &object
, &code
, &data
, &offset
, &chunk
, 0) >= 0) {
65 if (!object
&& code
== PA_MESSAGE_SHUTDOWN
) {
66 pa_asyncmsgq_done(aq
, 0);
71 ret
= pa_asyncmsgq_dispatch(object
, code
, data
, offset
, &chunk
);
72 pa_asyncmsgq_done(aq
, ret
);
75 if (pa_asyncmsgq_read_before_poll(aq
) == 0)
79 pa_asyncmsgq_unref(aq
);
82 static void asyncmsgq_write_inq_cb(pa_mainloop_api
*api
, pa_io_event
*e
, int fd
, pa_io_event_flags_t events
, void *userdata
) {
83 pa_thread_mq
*q
= userdata
;
85 pa_assert(pa_asyncmsgq_write_fd(q
->inq
) == fd
);
86 pa_assert(events
== PA_IO_EVENT_INPUT
);
88 pa_asyncmsgq_write_after_poll(q
->inq
);
89 pa_asyncmsgq_write_before_poll(q
->inq
);
92 static void asyncmsgq_write_outq_cb(pa_mainloop_api
*api
, pa_io_event
*e
, int fd
, pa_io_event_flags_t events
, void *userdata
) {
93 pa_thread_mq
*q
= userdata
;
95 pa_assert(pa_asyncmsgq_write_fd(q
->outq
) == fd
);
96 pa_assert(events
== PA_IO_EVENT_INPUT
);
98 pa_asyncmsgq_write_after_poll(q
->outq
);
99 pa_asyncmsgq_write_before_poll(q
->outq
);
102 void pa_thread_mq_init_thread_mainloop(pa_thread_mq
*q
, pa_mainloop_api
*main_mainloop
, pa_mainloop_api
*thread_mainloop
) {
104 pa_assert(main_mainloop
);
105 pa_assert(thread_mainloop
);
107 pa_assert_se(q
->inq
= pa_asyncmsgq_new(0));
108 pa_assert_se(q
->outq
= pa_asyncmsgq_new(0));
110 q
->main_mainloop
= main_mainloop
;
111 q
->thread_mainloop
= thread_mainloop
;
113 pa_assert_se(pa_asyncmsgq_read_before_poll(q
->outq
) == 0);
114 pa_asyncmsgq_write_before_poll(q
->outq
);
115 pa_assert_se(q
->read_main_event
= main_mainloop
->io_new(main_mainloop
, pa_asyncmsgq_read_fd(q
->outq
), PA_IO_EVENT_INPUT
, asyncmsgq_read_cb
, q
));
116 pa_assert_se(q
->write_thread_event
= main_mainloop
->io_new(thread_mainloop
, pa_asyncmsgq_write_fd(q
->outq
), PA_IO_EVENT_INPUT
, asyncmsgq_write_outq_cb
, q
));
118 pa_asyncmsgq_read_before_poll(q
->inq
);
119 pa_asyncmsgq_write_before_poll(q
->inq
);
120 pa_assert_se(q
->read_thread_event
= thread_mainloop
->io_new(thread_mainloop
, pa_asyncmsgq_read_fd(q
->inq
), PA_IO_EVENT_INPUT
, asyncmsgq_read_cb
, q
));
121 pa_assert_se(q
->write_main_event
= main_mainloop
->io_new(main_mainloop
, pa_asyncmsgq_write_fd(q
->inq
), PA_IO_EVENT_INPUT
, asyncmsgq_write_inq_cb
, q
));
124 void pa_thread_mq_init(pa_thread_mq
*q
, pa_mainloop_api
*mainloop
, pa_rtpoll
*rtpoll
) {
128 q
->main_mainloop
= mainloop
;
129 q
->thread_mainloop
= NULL
;
131 pa_assert_se(q
->inq
= pa_asyncmsgq_new(0));
132 pa_assert_se(q
->outq
= pa_asyncmsgq_new(0));
134 pa_assert_se(pa_asyncmsgq_read_before_poll(q
->outq
) == 0);
135 pa_assert_se(q
->read_main_event
= mainloop
->io_new(mainloop
, pa_asyncmsgq_read_fd(q
->outq
), PA_IO_EVENT_INPUT
, asyncmsgq_read_cb
, q
));
137 pa_asyncmsgq_write_before_poll(q
->inq
);
138 pa_assert_se(q
->write_main_event
= mainloop
->io_new(mainloop
, pa_asyncmsgq_write_fd(q
->inq
), PA_IO_EVENT_INPUT
, asyncmsgq_write_inq_cb
, q
));
140 pa_rtpoll_item_new_asyncmsgq_read(rtpoll
, PA_RTPOLL_EARLY
, q
->inq
);
141 pa_rtpoll_item_new_asyncmsgq_write(rtpoll
, PA_RTPOLL_LATE
, q
->outq
);
144 void pa_thread_mq_done(pa_thread_mq
*q
) {
147 /* Since we are called from main context we can be sure that the
148 * inq is empty. However, the outq might still contain messages
149 * for the main loop, which we need to dispatch (e.g. release
150 * msgs, other stuff). Hence do so if we aren't currently
151 * dispatching anyway. */
153 if (!pa_asyncmsgq_dispatching(q
->outq
))
154 pa_asyncmsgq_flush(q
->outq
, true);
156 q
->main_mainloop
->io_free(q
->read_main_event
);
157 q
->main_mainloop
->io_free(q
->write_main_event
);
158 q
->read_main_event
= q
->write_main_event
= NULL
;
160 if (q
->thread_mainloop
) {
161 q
->thread_mainloop
->io_free(q
->read_thread_event
);
162 q
->thread_mainloop
->io_free(q
->write_thread_event
);
163 q
->read_thread_event
= q
->write_thread_event
= NULL
;
166 pa_asyncmsgq_unref(q
->inq
);
167 pa_asyncmsgq_unref(q
->outq
);
168 q
->inq
= q
->outq
= NULL
;
170 q
->main_mainloop
= NULL
;
171 q
->thread_mainloop
= NULL
;
174 void pa_thread_mq_install(pa_thread_mq
*q
) {
177 pa_assert(!(PA_STATIC_TLS_GET(thread_mq
)));
178 PA_STATIC_TLS_SET(thread_mq
, q
);
181 pa_thread_mq
*pa_thread_mq_get(void) {
182 return PA_STATIC_TLS_GET(thread_mq
);