4 This file is part of polypaudio.
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
37 #include <polypcore/queue.h>
38 #include <polypcore/xmalloc.h>
39 #include <polypcore/log.h>
44 PA_PSTREAM_DESCRIPTOR_LENGTH
,
45 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
46 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
47 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
48 PA_PSTREAM_DESCRIPTOR_SEEK
,
49 PA_PSTREAM_DESCRIPTOR_MAX
52 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
54 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
55 #define FRAME_SIZE_MAX (1024*500) /* half a megabyte */
58 enum { PA_PSTREAM_ITEM_PACKET
, PA_PSTREAM_ITEM_MEMBLOCK
} type
;
64 pa_seek_mode_t seek_mode
;
73 pa_mainloop_api
*mainloop
;
74 pa_defer_event
*defer_event
;
79 void (*die_callback
) (pa_pstream
*p
, void *userdata
);
80 void *die_callback_userdata
;
83 struct item_info
* current
;
84 pa_pstream_descriptor descriptor
;
90 pa_memblock
*memblock
;
92 pa_pstream_descriptor descriptor
;
97 void (*recieve_packet_callback
) (pa_pstream
*p
, pa_packet
*packet
, void *userdata
);
98 void *recieve_packet_callback_userdata
;
100 void (*recieve_memblock_callback
) (pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
);
101 void *recieve_memblock_callback_userdata
;
103 void (*drain_callback
)(pa_pstream
*p
, void *userdata
);
104 void *drain_userdata
;
106 pa_memblock_stat
*memblock_stat
;
109 static int do_write(pa_pstream
*p
);
110 static int do_read(pa_pstream
*p
);
112 static void do_something(pa_pstream
*p
) {
115 p
->mainloop
->defer_enable(p
->defer_event
, 0);
119 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
122 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
125 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
138 p
->die_callback(p
, p
->die_callback_userdata
);
143 static void io_callback(pa_iochannel
*io
, void *userdata
) {
144 pa_pstream
*p
= userdata
;
152 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
153 pa_pstream
*p
= userdata
;
156 assert(p
->defer_event
== e
);
157 assert(p
->mainloop
== m
);
162 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_memblock_stat
*s
) {
166 p
= pa_xnew(pa_pstream
, 1);
170 pa_iochannel_set_callback(io
, io_callback
, p
);
173 p
->die_callback
= NULL
;
174 p
->die_callback_userdata
= NULL
;
177 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
178 m
->defer_enable(p
->defer_event
, 0);
180 p
->send_queue
= pa_queue_new();
181 assert(p
->send_queue
);
183 p
->write
.current
= NULL
;
186 p
->read
.memblock
= NULL
;
187 p
->read
.packet
= NULL
;
190 p
->recieve_packet_callback
= NULL
;
191 p
->recieve_packet_callback_userdata
= NULL
;
193 p
->recieve_memblock_callback
= NULL
;
194 p
->recieve_memblock_callback_userdata
= NULL
;
196 p
->drain_callback
= NULL
;
197 p
->drain_userdata
= NULL
;
199 p
->memblock_stat
= s
;
201 pa_iochannel_socket_set_rcvbuf(io
, 1024*8);
202 pa_iochannel_socket_set_sndbuf(io
, 1024*8);
207 static void item_free(void *item
, PA_GCC_UNUSED
void *p
) {
208 struct item_info
*i
= item
;
211 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
212 assert(i
->chunk
.memblock
);
213 pa_memblock_unref(i
->chunk
.memblock
);
215 assert(i
->type
== PA_PSTREAM_ITEM_PACKET
);
217 pa_packet_unref(i
->packet
);
223 static void pstream_free(pa_pstream
*p
) {
228 pa_queue_free(p
->send_queue
, item_free
, NULL
);
230 if (p
->write
.current
)
231 item_free(p
->write
.current
, NULL
);
233 if (p
->read
.memblock
)
234 pa_memblock_unref(p
->read
.memblock
);
237 pa_packet_unref(p
->read
.packet
);
242 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
) {
244 assert(p
&& packet
&& p
->ref
>= 1);
249 /* pa_log(__FILE__": push-packet %p\n", packet); */
251 i
= pa_xnew(struct item_info
, 1);
252 i
->type
= PA_PSTREAM_ITEM_PACKET
;
253 i
->packet
= pa_packet_ref(packet
);
255 pa_queue_push(p
->send_queue
, i
);
256 p
->mainloop
->defer_enable(p
->defer_event
, 1);
259 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
261 assert(p
&& channel
!= (uint32_t) -1 && chunk
&& p
->ref
>= 1);
266 /* pa_log(__FILE__": push-memblock %p\n", chunk); */
268 i
= pa_xnew(struct item_info
, 1);
269 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
271 i
->channel
= channel
;
273 i
->seek_mode
= seek_mode
;
275 pa_memblock_ref(i
->chunk
.memblock
);
277 pa_queue_push(p
->send_queue
, i
);
278 p
->mainloop
->defer_enable(p
->defer_event
, 1);
281 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, void (*callback
) (pa_pstream
*p
, pa_packet
*packet
, void *userdata
), void *userdata
) {
282 assert(p
&& callback
);
284 p
->recieve_packet_callback
= callback
;
285 p
->recieve_packet_callback_userdata
= userdata
;
288 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, void (*callback
) (pa_pstream
*p
, uint32_t channel
, int64_t delta
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
), void *userdata
) {
289 assert(p
&& callback
);
291 p
->recieve_memblock_callback
= callback
;
292 p
->recieve_memblock_callback_userdata
= userdata
;
295 static void prepare_next_write_item(pa_pstream
*p
) {
298 if (!(p
->write
.current
= pa_queue_pop(p
->send_queue
)))
303 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
304 /*pa_log(__FILE__": pop-packet %p\n", p->write.current->packet);*/
306 assert(p
->write
.current
->packet
);
307 p
->write
.data
= p
->write
.current
->packet
->data
;
308 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
309 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
310 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
311 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
312 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
] = 0;
314 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
&& p
->write
.current
->chunk
.memblock
);
315 p
->write
.data
= (uint8_t*) p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
316 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
317 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
318 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
319 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
320 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
] = htonl(p
->write
.current
->seek_mode
);
324 static int do_write(pa_pstream
*p
) {
330 if (!p
->write
.current
)
331 prepare_next_write_item(p
);
333 if (!p
->write
.current
)
336 assert(p
->write
.data
);
338 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
339 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
340 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
342 d
= (uint8_t*) p
->write
.data
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
343 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
346 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
351 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
352 assert(p
->write
.current
);
353 item_free(p
->write
.current
, (void *) 1);
354 p
->write
.current
= NULL
;
356 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
357 p
->drain_callback(p
, p
->drain_userdata
);
363 static int do_read(pa_pstream
*p
) {
369 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
370 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
371 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
373 assert(p
->read
.data
);
374 d
= (uint8_t*) p
->read
.data
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
375 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
378 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
383 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
384 /* Reading of frame descriptor complete */
386 /* Frame size too large */
387 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) > FRAME_SIZE_MAX
) {
388 pa_log_warn(__FILE__
": Frame size too large\n");
392 assert(!p
->read
.packet
&& !p
->read
.memblock
);
394 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]) == (uint32_t) -1) {
395 /* Frame is a packet frame */
396 p
->read
.packet
= pa_packet_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
397 p
->read
.data
= p
->read
.packet
->data
;
399 /* Frame is a memblock frame */
400 p
->read
.memblock
= pa_memblock_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]), p
->memblock_stat
);
401 p
->read
.data
= p
->read
.memblock
->data
;
403 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
]) > PA_SEEK_RELATIVE_END
) {
404 pa_log_warn(__FILE__
": Invalid seek mode\n");
409 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
410 /* Frame payload available */
412 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) { /* Is this memblock data? Than pass it to the user */
413 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
418 chunk
.memblock
= p
->read
.memblock
;
419 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
422 if (p
->recieve_memblock_callback
) {
426 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
427 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
429 p
->recieve_memblock_callback(
431 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
433 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
]),
435 p
->recieve_memblock_callback_userdata
);
438 /* Drop seek info for following callbacks */
439 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_SEEK
] =
440 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
441 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
446 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
447 if (p
->read
.memblock
) {
448 assert(!p
->read
.packet
);
450 pa_memblock_unref(p
->read
.memblock
);
451 p
->read
.memblock
= NULL
;
453 assert(p
->read
.packet
);
455 if (p
->recieve_packet_callback
)
456 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->recieve_packet_callback_userdata
);
458 pa_packet_unref(p
->read
.packet
);
459 p
->read
.packet
= NULL
;
469 void pa_pstream_set_die_callback(pa_pstream
*p
, void (*callback
)(pa_pstream
*p
, void *userdata
), void *userdata
) {
470 assert(p
&& callback
);
471 p
->die_callback
= callback
;
472 p
->die_callback_userdata
= userdata
;
475 int pa_pstream_is_pending(pa_pstream
*p
) {
481 return p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
484 void pa_pstream_set_drain_callback(pa_pstream
*p
, void (*cb
)(pa_pstream
*p
, void *userdata
), void *userdata
) {
488 p
->drain_callback
= cb
;
489 p
->drain_userdata
= userdata
;
492 void pa_pstream_unref(pa_pstream
*p
) {
500 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
508 void pa_pstream_close(pa_pstream
*p
) {
514 pa_iochannel_free(p
->io
);
518 if (p
->defer_event
) {
519 p
->mainloop
->defer_free(p
->defer_event
);
520 p
->defer_event
= NULL
;
523 p
->die_callback
= NULL
;
524 p
->drain_callback
= NULL
;
525 p
->recieve_packet_callback
= NULL
;
526 p
->recieve_memblock_callback
= NULL
;