/* code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
 * Protect platform dependent headers with ifdefs.
 * [pulseaudio] / src / pulsecore / pstream.c */
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41 #include "winsock.h"
42
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49
50 #include "pstream.h"
51
/* Indices of the 32 bit words that make up a frame descriptor. Every
 * word is stored in network byte order (see the htonl()/ntohl() calls
 * in this file). */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH = 0,   /* payload length in bytes */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,      /* channel, or (uint32_t) -1 for packet frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,    /* upper 32 bits of the 64 bit offset */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,    /* lower 32 bits of the 64 bit offset */
    PA_PSTREAM_DESCRIPTOR_SEEK,         /* seek mode of a memblock frame */
    PA_PSTREAM_DESCRIPTOR_MAX           /* number of descriptor words */
};
60
61 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
62
63 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
64 #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
65
66 struct item_info {
67 enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
68
69 /* memblock info */
70 pa_memchunk chunk;
71 uint32_t channel;
72 int64_t offset;
73 pa_seek_mode_t seek_mode;
74
75 /* packet info */
76 pa_packet *packet;
77 #ifdef HAVE_CREDS
78 int with_creds;
79 pa_creds creds;
80 #endif
81 };
82
83 struct pa_pstream {
84 int ref;
85
86 pa_mainloop_api *mainloop;
87 pa_defer_event *defer_event;
88 pa_iochannel *io;
89 pa_queue *send_queue;
90
91 int dead;
92
93 struct {
94 struct item_info* current;
95 pa_pstream_descriptor descriptor;
96 void *data;
97 size_t index;
98 } write;
99
100 struct {
101 pa_memblock *memblock;
102 pa_packet *packet;
103 pa_pstream_descriptor descriptor;
104 void *data;
105 size_t index;
106 } read;
107
108 pa_pstream_packet_cb_t recieve_packet_callback;
109 void *recieve_packet_callback_userdata;
110
111 pa_pstream_memblock_cb_t recieve_memblock_callback;
112 void *recieve_memblock_callback_userdata;
113
114 pa_pstream_notify_cb_t drain_callback;
115 void *drain_callback_userdata;
116
117 pa_pstream_notify_cb_t die_callback;
118 void *die_callback_userdata;
119
120 pa_memblock_stat *memblock_stat;
121
122 #ifdef HAVE_CREDS
123 pa_creds read_creds, write_creds;
124 int read_creds_valid, send_creds_now;
125 #endif
126 };
127
128 static int do_write(pa_pstream *p);
129 static int do_read(pa_pstream *p);
130
131 static void do_something(pa_pstream *p) {
132 assert(p);
133
134 p->mainloop->defer_enable(p->defer_event, 0);
135
136 pa_pstream_ref(p);
137
138 if (!p->dead && pa_iochannel_is_readable(p->io)) {
139 if (do_read(p) < 0)
140 goto fail;
141 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
142 goto fail;
143
144 if (!p->dead && pa_iochannel_is_writable(p->io)) {
145 if (do_write(p) < 0)
146 goto fail;
147 }
148
149 pa_pstream_unref(p);
150 return;
151
152 fail:
153
154 p->dead = 1;
155
156 if (p->die_callback)
157 p->die_callback(p, p->die_callback_userdata);
158
159 pa_pstream_unref(p);
160 }
161
162 static void io_callback(pa_iochannel*io, void *userdata) {
163 pa_pstream *p = userdata;
164
165 assert(p);
166 assert(p->io == io);
167
168 do_something(p);
169 }
170
171 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
172 pa_pstream *p = userdata;
173
174 assert(p);
175 assert(p->defer_event == e);
176 assert(p->mainloop == m);
177
178 do_something(p);
179 }
180
181 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
182 pa_pstream *p;
183 assert(io);
184
185 p = pa_xnew(pa_pstream, 1);
186
187 p->ref = 1;
188 p->io = io;
189 pa_iochannel_set_callback(io, io_callback, p);
190
191 p->dead = 0;
192
193 p->mainloop = m;
194 p->defer_event = m->defer_new(m, defer_callback, p);
195 m->defer_enable(p->defer_event, 0);
196
197 p->send_queue = pa_queue_new();
198 assert(p->send_queue);
199
200 p->write.current = NULL;
201 p->write.index = 0;
202
203 p->read.memblock = NULL;
204 p->read.packet = NULL;
205 p->read.index = 0;
206
207 p->recieve_packet_callback = NULL;
208 p->recieve_packet_callback_userdata = NULL;
209
210 p->recieve_memblock_callback = NULL;
211 p->recieve_memblock_callback_userdata = NULL;
212
213 p->drain_callback = NULL;
214 p->drain_callback_userdata = NULL;
215
216 p->die_callback = NULL;
217 p->die_callback_userdata = NULL;
218
219 p->memblock_stat = s;
220
221 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
222 pa_iochannel_socket_set_sndbuf(io, 1024*8);
223
224 #ifdef HAVE_CREDS
225 p->send_creds_now = 0;
226 p->read_creds_valid = 0;
227 #endif
228 return p;
229 }
230
231 static void item_free(void *item, PA_GCC_UNUSED void *p) {
232 struct item_info *i = item;
233 assert(i);
234
235 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
236 assert(i->chunk.memblock);
237 pa_memblock_unref(i->chunk.memblock);
238 } else {
239 assert(i->type == PA_PSTREAM_ITEM_PACKET);
240 assert(i->packet);
241 pa_packet_unref(i->packet);
242 }
243
244 pa_xfree(i);
245 }
246
247 static void pstream_free(pa_pstream *p) {
248 assert(p);
249
250 pa_pstream_close(p);
251
252 pa_queue_free(p->send_queue, item_free, NULL);
253
254 if (p->write.current)
255 item_free(p->write.current, NULL);
256
257 if (p->read.memblock)
258 pa_memblock_unref(p->read.memblock);
259
260 if (p->read.packet)
261 pa_packet_unref(p->read.packet);
262
263 pa_xfree(p);
264 }
265
266 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
267 struct item_info *i;
268 assert(p && packet && p->ref >= 1);
269
270 if (p->dead)
271 return;
272
273 /* pa_log(__FILE__": push-packet %p", packet); */
274
275 i = pa_xnew(struct item_info, 1);
276 i->type = PA_PSTREAM_ITEM_PACKET;
277 i->packet = pa_packet_ref(packet);
278 #ifdef HAVE_CREDS
279 if ((i->with_creds = !!creds))
280 i->creds = *creds;
281 #endif
282
283 pa_queue_push(p->send_queue, i);
284 p->mainloop->defer_enable(p->defer_event, 1);
285 }
286
287 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
288 struct item_info *i;
289 assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
290
291 if (p->dead)
292 return;
293
294 /* pa_log(__FILE__": push-memblock %p", chunk); */
295
296 i = pa_xnew(struct item_info, 1);
297 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
298 i->chunk = *chunk;
299 i->channel = channel;
300 i->offset = offset;
301 i->seek_mode = seek_mode;
302 #ifdef HAVE_CREDS
303 i->with_creds = 0;
304 #endif
305
306 pa_memblock_ref(i->chunk.memblock);
307
308 pa_queue_push(p->send_queue, i);
309 p->mainloop->defer_enable(p->defer_event, 1);
310 }
311
312 static void prepare_next_write_item(pa_pstream *p) {
313 assert(p);
314
315 if (!(p->write.current = pa_queue_pop(p->send_queue)))
316 return;
317
318 p->write.index = 0;
319
320 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
321 /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
322
323 assert(p->write.current->packet);
324 p->write.data = p->write.current->packet->data;
325 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
326 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
327 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
328 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
329 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
330
331
332 } else {
333 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
334 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
335 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
336 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
337 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
338 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
339 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
340 }
341
342 #ifdef HAVE_CREDS
343 if ((p->send_creds_now = p->write.current->with_creds))
344 p->write_creds = p->write.current->creds;
345
346 #endif
347
348 }
349
350 static int do_write(pa_pstream *p) {
351 void *d;
352 size_t l;
353 ssize_t r;
354 assert(p);
355
356 if (!p->write.current)
357 prepare_next_write_item(p);
358
359 if (!p->write.current)
360 return 0;
361
362 assert(p->write.data);
363
364 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
365 d = (uint8_t*) p->write.descriptor + p->write.index;
366 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
367 } else {
368 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
369 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
370 }
371
372 #ifdef HAVE_CREDS
373 if (p->send_creds_now) {
374
375 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
376 return -1;
377
378 p->send_creds_now = 0;
379 } else
380 #endif
381
382 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
383 return -1;
384
385 p->write.index += r;
386
387 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
388 assert(p->write.current);
389 item_free(p->write.current, (void *) 1);
390 p->write.current = NULL;
391
392 if (p->drain_callback && !pa_pstream_is_pending(p))
393 p->drain_callback(p, p->drain_callback_userdata);
394 }
395
396 return 0;
397 }
398
399 static int do_read(pa_pstream *p) {
400 void *d;
401 size_t l;
402 ssize_t r;
403 assert(p);
404
405 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
406 d = (uint8_t*) p->read.descriptor + p->read.index;
407 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
408 } else {
409 assert(p->read.data);
410 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
411 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
412 }
413
414 #ifdef HAVE_CREDS
415 {
416 int b = 0;
417
418 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
419 return -1;
420
421 p->read_creds_valid = p->read_creds_valid || b;
422 }
423 #else
424 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
425 return -1;
426 #endif
427
428 p->read.index += r;
429
430 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
431 /* Reading of frame descriptor complete */
432
433 /* Frame size too large */
434 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
435 pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
436 return -1;
437 }
438
439 assert(!p->read.packet && !p->read.memblock);
440
441 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
442 /* Frame is a packet frame */
443 p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
444 p->read.data = p->read.packet->data;
445 } else {
446 /* Frame is a memblock frame */
447 p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
448 p->read.data = p->read.memblock->data;
449
450 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
451 pa_log_warn(__FILE__": Invalid seek mode");
452 return -1;
453 }
454 }
455
456 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
457 /* Frame payload available */
458
459 if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
460 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
461
462 if (l > 0) {
463 pa_memchunk chunk;
464
465 chunk.memblock = p->read.memblock;
466 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
467 chunk.length = l;
468
469 if (p->recieve_memblock_callback) {
470 int64_t offset;
471
472 offset = (int64_t) (
473 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
474 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
475
476 p->recieve_memblock_callback(
477 p,
478 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
479 offset,
480 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
481 &chunk,
482 p->recieve_memblock_callback_userdata);
483 }
484
485 /* Drop seek info for following callbacks */
486 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
487 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
488 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
489 }
490 }
491
492 /* Frame complete */
493 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
494 if (p->read.memblock) {
495 assert(!p->read.packet);
496
497 pa_memblock_unref(p->read.memblock);
498 p->read.memblock = NULL;
499 } else {
500 assert(p->read.packet);
501
502 if (p->recieve_packet_callback)
503 #ifdef HAVE_CREDS
504 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
505 #else
506 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
507 #endif
508
509 pa_packet_unref(p->read.packet);
510 p->read.packet = NULL;
511 }
512
513 p->read.index = 0;
514 #ifdef HAVE_CREDS
515 p->read_creds_valid = 0;
516 #endif
517 }
518 }
519
520 return 0;
521 }
522
523 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
524 assert(p);
525 assert(p->ref >= 1);
526
527 p->die_callback = cb;
528 p->die_callback_userdata = userdata;
529 }
530
531
532 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
533 assert(p);
534 assert(p->ref >= 1);
535
536 p->drain_callback = cb;
537 p->drain_callback_userdata = userdata;
538 }
539
540 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
541 assert(p);
542 assert(p->ref >= 1);
543
544 p->recieve_packet_callback = cb;
545 p->recieve_packet_callback_userdata = userdata;
546 }
547
548 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
549 assert(p);
550 assert(p->ref >= 1);
551
552 p->recieve_memblock_callback = cb;
553 p->recieve_memblock_callback_userdata = userdata;
554 }
555
556 int pa_pstream_is_pending(pa_pstream *p) {
557 assert(p);
558
559 if (p->dead)
560 return 0;
561
562 return p->write.current || !pa_queue_is_empty(p->send_queue);
563 }
564
565 void pa_pstream_unref(pa_pstream*p) {
566 assert(p);
567 assert(p->ref >= 1);
568
569 if (--p->ref == 0)
570 pstream_free(p);
571 }
572
573 pa_pstream* pa_pstream_ref(pa_pstream*p) {
574 assert(p);
575 assert(p->ref >= 1);
576
577 p->ref++;
578 return p;
579 }
580
581 void pa_pstream_close(pa_pstream *p) {
582 assert(p);
583
584 p->dead = 1;
585
586 if (p->io) {
587 pa_iochannel_free(p->io);
588 p->io = NULL;
589 }
590
591 if (p->defer_event) {
592 p->mainloop->defer_free(p->defer_event);
593 p->defer_event = NULL;
594 }
595
596 p->die_callback = NULL;
597 p->drain_callback = NULL;
598 p->recieve_packet_callback = NULL;
599 p->recieve_memblock_callback = NULL;
600 }