3 #include <netinet/in.h>
8 enum pa_pstream_descriptor_index
{
9 PA_PSTREAM_DESCRIPTOR_LENGTH
,
10 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
11 PA_PSTREAM_DESCRIPTOR_DELTA
,
12 PA_PSTREAM_DESCRIPTOR_MAX
15 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
17 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
18 #define FRAME_SIZE_MAX (1024*64)
21 enum { PA_PSTREAM_ITEM_PACKET
, PA_PSTREAM_ITEM_MEMBLOCK
} type
;
24 struct pa_memchunk chunk
;
29 struct pa_packet
*packet
;
33 struct pa_mainloop_api
*mainloop
;
34 struct mainloop_source
*mainloop_source
;
35 struct pa_iochannel
*io
;
36 struct pa_queue
*send_queue
;
38 int in_use
, shall_free
;
41 void (*die_callback
) (struct pa_pstream
*p
, void *userdata
);
42 void *die_callback_userdata
;
45 struct item_info
* current
;
46 pa_pstream_descriptor descriptor
;
52 struct pa_memblock
*memblock
;
53 struct pa_packet
*packet
;
54 pa_pstream_descriptor descriptor
;
59 void (*recieve_packet_callback
) (struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
);
60 void *recieve_packet_callback_userdata
;
62 void (*recieve_memblock_callback
) (struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, const struct pa_memchunk
*chunk
, void *userdata
);
63 void *recieve_memblock_callback_userdata
;
65 void (*drain_callback
)(struct pa_pstream
*p
, void *userdata
);
69 static void do_write(struct pa_pstream
*p
);
70 static void do_read(struct pa_pstream
*p
);
72 static void do_something(struct pa_pstream
*p
) {
73 assert(p
&& !p
->shall_free
);
74 p
->mainloop
->enable_fixed(p
->mainloop
, p
->mainloop_source
, 0);
79 if (pa_iochannel_is_hungup(p
->io
)) {
82 p
->die_callback(p
, p
->die_callback_userdata
);
87 if (pa_iochannel_is_writable(p
->io
)) {
98 if (pa_iochannel_is_readable(p
->io
)) {
109 static void io_callback(struct pa_iochannel
*io
, void *userdata
) {
110 struct pa_pstream
*p
= userdata
;
111 assert(p
&& p
->io
== io
);
115 static void fixed_callback(struct pa_mainloop_api
*m
, void *id
, void*userdata
) {
116 struct pa_pstream
*p
= userdata
;
117 assert(p
&& p
->mainloop_source
== id
&& p
->mainloop
== m
);
121 struct pa_pstream
*pa_pstream_new(struct pa_mainloop_api
*m
, struct pa_iochannel
*io
) {
122 struct pa_pstream
*p
;
125 p
= malloc(sizeof(struct pa_pstream
));
129 pa_iochannel_set_callback(io
, io_callback
, p
);
132 p
->die_callback
= NULL
;
133 p
->die_callback_userdata
= NULL
;
136 p
->mainloop_source
= m
->source_fixed(m
, fixed_callback
, p
);
137 m
->enable_fixed(m
, p
->mainloop_source
, 0);
139 p
->send_queue
= pa_queue_new();
140 assert(p
->send_queue
);
142 p
->write
.current
= NULL
;
145 p
->read
.memblock
= NULL
;
146 p
->read
.packet
= NULL
;
149 p
->recieve_packet_callback
= NULL
;
150 p
->recieve_packet_callback_userdata
= NULL
;
152 p
->recieve_memblock_callback
= NULL
;
153 p
->recieve_memblock_callback_userdata
= NULL
;
155 p
->drain_callback
= NULL
;
156 p
->drain_userdata
= NULL
;
158 p
->in_use
= p
->shall_free
= 0;
163 static void item_free(void *item
, void *p
) {
164 struct item_info
*i
= item
;
167 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
168 assert(i
->chunk
.memblock
);
169 pa_memblock_unref(i
->chunk
.memblock
);
171 assert(i
->type
== PA_PSTREAM_ITEM_PACKET
);
173 pa_packet_unref(i
->packet
);
179 void pa_pstream_free(struct pa_pstream
*p
) {
183 /* If this pstream object is used by someone else on the call stack, we have to postpone the freeing */
184 p
->dead
= p
->shall_free
= 1;
188 pa_iochannel_free(p
->io
);
189 pa_queue_free(p
->send_queue
, item_free
, NULL
);
191 if (p
->write
.current
)
192 item_free(p
->write
.current
, NULL
);
194 if (p
->read
.memblock
)
195 pa_memblock_unref(p
->read
.memblock
);
198 pa_packet_unref(p
->read
.packet
);
200 p
->mainloop
->cancel_fixed(p
->mainloop
, p
->mainloop_source
);
204 void pa_pstream_send_packet(struct pa_pstream
*p
, struct pa_packet
*packet
) {
208 i
= malloc(sizeof(struct item_info
));
210 i
->type
= PA_PSTREAM_ITEM_PACKET
;
211 i
->packet
= pa_packet_ref(packet
);
213 pa_queue_push(p
->send_queue
, i
);
214 p
->mainloop
->enable_fixed(p
->mainloop
, p
->mainloop_source
, 1);
217 void pa_pstream_send_memblock(struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, const struct pa_memchunk
*chunk
) {
219 assert(p
&& channel
!= (uint32_t) -1 && chunk
);
221 i
= malloc(sizeof(struct item_info
));
223 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
225 i
->channel
= channel
;
228 pa_memblock_ref(i
->chunk
.memblock
);
230 pa_queue_push(p
->send_queue
, i
);
231 p
->mainloop
->enable_fixed(p
->mainloop
, p
->mainloop_source
, 1);
234 void pa_pstream_set_recieve_packet_callback(struct pa_pstream
*p
, void (*callback
) (struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
), void *userdata
) {
235 assert(p
&& callback
);
237 p
->recieve_packet_callback
= callback
;
238 p
->recieve_packet_callback_userdata
= userdata
;
241 void pa_pstream_set_recieve_memblock_callback(struct pa_pstream
*p
, void (*callback
) (struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, const struct pa_memchunk
*chunk
, void *userdata
), void *userdata
) {
242 assert(p
&& callback
);
244 p
->recieve_memblock_callback
= callback
;
245 p
->recieve_memblock_callback_userdata
= userdata
;
248 static void prepare_next_write_item(struct pa_pstream
*p
) {
251 if (!(p
->write
.current
= pa_queue_pop(p
->send_queue
)))
256 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
257 assert(p
->write
.current
->packet
);
258 p
->write
.data
= p
->write
.current
->packet
->data
;
259 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
260 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
261 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
] = 0;
263 assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
&& p
->write
.current
->chunk
.memblock
);
264 p
->write
.data
= p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
265 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
266 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
267 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
] = htonl(p
->write
.current
->delta
);
271 static void do_write(struct pa_pstream
*p
) {
277 if (!p
->write
.current
)
278 prepare_next_write_item(p
);
280 if (!p
->write
.current
)
283 assert(p
->write
.data
);
285 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
286 d
= (void*) p
->write
.descriptor
+ p
->write
.index
;
287 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
289 d
= (void*) p
->write
.data
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
290 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
293 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
298 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
299 assert(p
->write
.current
);
300 item_free(p
->write
.current
, (void *) 1);
301 p
->write
.current
= NULL
;
303 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
304 p
->drain_callback(p
, p
->drain_userdata
);
312 p
->die_callback(p
, p
->die_callback_userdata
);
315 static void do_read(struct pa_pstream
*p
) {
321 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
322 d
= (void*) p
->read
.descriptor
+ p
->read
.index
;
323 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
325 assert(p
->read
.data
);
326 d
= (void*) p
->read
.data
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
327 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
330 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
335 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
336 /* Reading of frame descriptor complete */
338 /* Frame size too large */
339 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) > FRAME_SIZE_MAX
)
342 assert(!p
->read
.packet
&& !p
->read
.memblock
);
344 if (ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]) == (uint32_t) -1) {
345 /* Frame is a packet frame */
346 p
->read
.packet
= pa_packet_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
347 assert(p
->read
.packet
);
348 p
->read
.data
= p
->read
.packet
->data
;
350 /* Frame is a memblock frame */
351 p
->read
.memblock
= pa_memblock_new(ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]));
352 assert(p
->read
.memblock
);
353 p
->read
.data
= p
->read
.memblock
->data
;
356 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
357 /* Frame payload available */
359 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) { /* Is this memblock data? Than pass it to the user */
362 l
= (p
->read
.index
- r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
: (size_t) r
;
365 struct pa_memchunk chunk
;
367 chunk
.memblock
= p
->read
.memblock
;
368 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
371 if (p
->recieve_memblock_callback
)
372 p
->recieve_memblock_callback(
374 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
375 (int32_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_DELTA
]),
377 p
->recieve_memblock_callback_userdata
);
382 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
383 if (p
->read
.memblock
) {
384 assert(!p
->read
.packet
);
386 pa_memblock_unref(p
->read
.memblock
);
387 p
->read
.memblock
= NULL
;
389 assert(p
->read
.packet
);
391 if (p
->recieve_packet_callback
)
392 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->recieve_packet_callback_userdata
);
394 pa_packet_unref(p
->read
.packet
);
395 p
->read
.packet
= NULL
;
407 p
->die_callback(p
, p
->die_callback_userdata
);
411 void pa_pstream_set_die_callback(struct pa_pstream
*p
, void (*callback
)(struct pa_pstream
*p
, void *userdata
), void *userdata
) {
412 assert(p
&& callback
);
413 p
->die_callback
= callback
;
414 p
->die_callback_userdata
= userdata
;
417 int pa_pstream_is_pending(struct pa_pstream
*p
) {
423 return p
->write
.current
|| !pa_queue_is_empty(p
->send_queue
);
426 void pa_pstream_set_drain_callback(struct pa_pstream
*p
, void (*cb
)(struct pa_pstream
*p
, void *userdata
), void *userdata
) {
429 p
->drain_callback
= cb
;
430 p
->drain_userdata
= userdata
;