7 #include "protocol-native-spec.h"
11 #include "socket-client.h"
12 #include "pstream-util.h"
16 #define DEFAULT_MAXLENGTH 20480
17 #define DEFAULT_TLENGTH 10240
18 #define DEFAULT_PREBUF 4096
19 #define DEFAULT_MINREQ 1024
21 #define DEFAULT_TIMEOUT (5*60)
22 #define DEFAULT_SERVER "/tmp/polypaudio/native"
26 struct pa_mainloop_api
* mainloop
;
27 struct pa_socket_client
*client
;
28 struct pa_pstream
*pstream
;
29 struct pa_pdispatch
*pdispatch
;
30 struct pa_dynarray
*streams
;
31 struct pa_stream
*first_stream
;
43 void (*connect_complete_callback
)(struct pa_context
*c
, int success
, void *userdata
);
44 void *connect_complete_userdata
;
46 void (*drain_complete_callback
)(struct pa_context
*c
, void *userdata
);
47 void *drain_complete_userdata
;
49 void (*die_callback
)(struct pa_context
*c
, void *userdata
);
52 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
56 struct pa_context
*context
;
57 struct pa_stream
*next
, *previous
;
60 struct pa_buffer_attr buffer_attr
;
61 struct pa_sample_spec sample_spec
;
62 uint32_t device_index
;
65 enum pa_stream_direction direction
;
67 enum { STREAM_LOOKING_UP
, STREAM_CREATING
, STREAM_READY
, STREAM_DEAD
} state
;
68 uint32_t requested_bytes
;
70 void (*read_callback
)(struct pa_stream
*p
, const void*data
, size_t length
, void *userdata
);
73 void (*write_callback
)(struct pa_stream
*p
, size_t length
, void *userdata
);
76 void (*create_complete_callback
)(struct pa_stream
*s
, int success
, void *userdata
);
77 void *create_complete_userdata
;
79 void (*drain_complete_callback
)(struct pa_stream
*s
, void *userdata
);
80 void *drain_complete_userdata
;
82 void (*die_callback
)(struct pa_stream
*c
, void *userdata
);
86 static void command_request(struct pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct pa_tagstruct
*t
, void *userdata
);
88 static const struct pa_pdispatch_command command_table
[PA_COMMAND_MAX
] = {
89 [PA_COMMAND_ERROR
] = { NULL
},
90 [PA_COMMAND_REPLY
] = { NULL
},
91 [PA_COMMAND_CREATE_PLAYBACK_STREAM
] = { NULL
},
92 [PA_COMMAND_DELETE_PLAYBACK_STREAM
] = { NULL
},
93 [PA_COMMAND_CREATE_RECORD_STREAM
] = { NULL
},
94 [PA_COMMAND_DELETE_RECORD_STREAM
] = { NULL
},
95 [PA_COMMAND_EXIT
] = { NULL
},
96 [PA_COMMAND_REQUEST
] = { command_request
},
99 struct pa_context
*pa_context_new(struct pa_mainloop_api
*mainloop
, const char *name
) {
100 struct pa_context
*c
;
101 assert(mainloop
&& name
);
103 c
= malloc(sizeof(struct pa_context
));
105 c
->name
= strdup(name
);
106 c
->mainloop
= mainloop
;
110 c
->streams
= pa_dynarray_new();
112 c
->first_stream
= NULL
;
113 c
->error
= PA_ERROR_OK
;
114 c
->state
= CONTEXT_UNCONNECTED
;
117 c
->connect_complete_callback
= NULL
;
118 c
->connect_complete_userdata
= NULL
;
120 c
->drain_complete_callback
= NULL
;
121 c
->drain_complete_userdata
= NULL
;
123 c
->die_callback
= NULL
;
124 c
->die_userdata
= NULL
;
126 pa_check_for_sigpipe();
130 void pa_context_free(struct pa_context
*c
) {
133 while (c
->first_stream
)
134 pa_stream_free(c
->first_stream
);
137 pa_socket_client_free(c
->client
);
139 pa_pdispatch_free(c
->pdispatch
);
141 pa_pstream_free(c
->pstream
);
143 pa_dynarray_free(c
->streams
, NULL
, NULL
);
149 static void stream_dead(struct pa_stream
*s
) {
152 if (s
->state
== STREAM_DEAD
)
155 if (s
->state
== STREAM_READY
) {
156 s
->state
= STREAM_DEAD
;
158 s
->die_callback(s
, s
->die_userdata
);
160 s
->state
= STREAM_DEAD
;
163 static void context_dead(struct pa_context
*c
) {
167 if (c
->state
== CONTEXT_DEAD
)
171 pa_pdispatch_free(c
->pdispatch
);
175 pa_pstream_free(c
->pstream
);
179 pa_socket_client_free(c
->client
);
182 for (s
= c
->first_stream
; s
; s
= s
->next
)
185 if (c
->state
== CONTEXT_READY
) {
186 c
->state
= CONTEXT_DEAD
;
188 c
->die_callback(c
, c
->die_userdata
);
190 s
->state
= CONTEXT_DEAD
;
193 static void pstream_die_callback(struct pa_pstream
*p
, void *userdata
) {
194 struct pa_context
*c
= userdata
;
199 static void pstream_packet_callback(struct pa_pstream
*p
, struct pa_packet
*packet
, void *userdata
) {
200 struct pa_context
*c
= userdata
;
201 assert(p
&& packet
&& c
);
203 if (pa_pdispatch_run(c
->pdispatch
, packet
, c
) < 0) {
204 fprintf(stderr
, "polyp.c: invalid packet.\n");
209 static void pstream_memblock_callback(struct pa_pstream
*p
, uint32_t channel
, int32_t delta
, struct pa_memchunk
*chunk
, void *userdata
) {
210 struct pa_context
*c
= userdata
;
212 assert(p
&& chunk
&& c
&& chunk
->memblock
&& chunk
->memblock
->data
);
214 if (!(s
= pa_dynarray_get(c
->streams
, channel
)))
217 if (s
->read_callback
)
218 s
->read_callback(s
, chunk
->memblock
->data
+ chunk
->index
, chunk
->length
, s
->read_userdata
);
221 static int handle_error(struct pa_context
*c
, uint32_t command
, struct pa_tagstruct
*t
) {
224 if (command
== PA_COMMAND_ERROR
) {
225 if (pa_tagstruct_getu32(t
, &c
->error
) < 0) {
226 c
->error
= PA_ERROR_PROTOCOL
;
233 c
->error
= (command
== PA_COMMAND_TIMEOUT
) ? PA_ERROR_TIMEOUT
: PA_ERROR_INTERNAL
;
237 static void setup_complete_callback(struct pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct pa_tagstruct
*t
, void *userdata
) {
238 struct pa_context
*c
= userdata
;
239 assert(pd
&& c
&& (c
->state
== CONTEXT_AUTHORIZING
|| c
->state
== CONTEXT_SETTING_NAME
));
241 if (command
!= PA_COMMAND_REPLY
) {
242 handle_error(c
, command
, t
);
245 if (c
->connect_complete_callback
)
246 c
->connect_complete_callback(c
, 0, c
->connect_complete_userdata
);
251 if (c
->state
== CONTEXT_AUTHORIZING
) {
252 struct pa_tagstruct
*t
;
253 c
->state
= CONTEXT_SETTING_NAME
;
254 t
= pa_tagstruct_new(NULL
, 0);
256 pa_tagstruct_putu32(t
, PA_COMMAND_SET_NAME
);
257 pa_tagstruct_putu32(t
, tag
= c
->ctag
++);
258 pa_tagstruct_puts(t
, c
->name
);
259 pa_pstream_send_tagstruct(c
->pstream
, t
);
260 pa_pdispatch_register_reply(c
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, c
);
262 assert(c
->state
== CONTEXT_SETTING_NAME
);
264 c
->state
= CONTEXT_READY
;
266 if (c
->connect_complete_callback
)
267 c
->connect_complete_callback(c
, 1, c
->connect_complete_userdata
);
273 static void on_connection(struct pa_socket_client
*client
, struct pa_iochannel
*io
, void *userdata
) {
274 struct pa_context
*c
= userdata
;
275 struct pa_tagstruct
*t
;
277 assert(client
&& io
&& c
&& c
->state
== CONTEXT_CONNECTING
);
279 pa_socket_client_free(client
);
283 c
->error
= PA_ERROR_CONNECTIONREFUSED
;
286 if (c
->connect_complete_callback
)
287 c
->connect_complete_callback(c
, 0, c
->connect_complete_userdata
);
292 c
->pstream
= pa_pstream_new(c
->mainloop
, io
);
294 pa_pstream_set_die_callback(c
->pstream
, pstream_die_callback
, c
);
295 pa_pstream_set_recieve_packet_callback(c
->pstream
, pstream_packet_callback
, c
);
296 pa_pstream_set_recieve_memblock_callback(c
->pstream
, pstream_memblock_callback
, c
);
298 c
->pdispatch
= pa_pdispatch_new(c
->mainloop
, command_table
, PA_COMMAND_MAX
);
299 assert(c
->pdispatch
);
301 t
= pa_tagstruct_new(NULL
, 0);
303 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
304 pa_tagstruct_putu32(t
, tag
= c
->ctag
++);
305 pa_tagstruct_put_arbitrary(t
, c
->auth_cookie
, sizeof(c
->auth_cookie
));
306 pa_pstream_send_tagstruct(c
->pstream
, t
);
307 pa_pdispatch_register_reply(c
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, c
);
308 c
->state
= CONTEXT_AUTHORIZING
;
311 int pa_context_connect(struct pa_context
*c
, const char *server
, void (*complete
) (struct pa_context
*c
, int success
, void *userdata
), void *userdata
) {
312 assert(c
&& c
->state
== CONTEXT_UNCONNECTED
);
314 if (pa_authkey_load_from_home(PA_NATIVE_COOKIE_FILE
, c
->auth_cookie
, sizeof(c
->auth_cookie
)) < 0) {
315 c
->error
= PA_ERROR_AUTHKEY
;
320 if (!(c
->client
= pa_socket_client_new_unix(c
->mainloop
, server
? server
: DEFAULT_SERVER
))) {
321 c
->error
= PA_ERROR_CONNECTIONREFUSED
;
325 c
->connect_complete_callback
= complete
;
326 c
->connect_complete_userdata
= userdata
;
328 pa_socket_client_set_callback(c
->client
, on_connection
, c
);
329 c
->state
= CONTEXT_CONNECTING
;
334 int pa_context_is_dead(struct pa_context
*c
) {
336 return c
->state
== CONTEXT_DEAD
;
339 int pa_context_is_ready(struct pa_context
*c
) {
341 return c
->state
== CONTEXT_READY
;
344 int pa_context_errno(struct pa_context
*c
) {
349 void pa_context_set_die_callback(struct pa_context
*c
, void (*cb
)(struct pa_context
*c
, void *userdata
), void *userdata
) {
351 c
->die_callback
= cb
;
352 c
->die_userdata
= userdata
;
355 static void command_request(struct pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct pa_tagstruct
*t
, void *userdata
) {
357 struct pa_context
*c
= userdata
;
358 uint32_t bytes
, channel
;
359 assert(pd
&& command
== PA_COMMAND_REQUEST
&& t
&& c
);
361 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
362 pa_tagstruct_getu32(t
, &bytes
) < 0 ||
363 !pa_tagstruct_eof(t
)) {
364 c
->error
= PA_ERROR_PROTOCOL
;
369 if (!(s
= pa_dynarray_get(c
->streams
, channel
)))
372 if (s
->state
!= STREAM_READY
)
375 s
->requested_bytes
+= bytes
;
377 if (s
->requested_bytes
&& s
->write_callback
)
378 s
->write_callback(s
, s
->requested_bytes
, s
->write_userdata
);
381 static void create_playback_callback(struct pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct pa_tagstruct
*t
, void *userdata
) {
382 struct pa_stream
*s
= userdata
;
383 assert(pd
&& s
&& s
->state
== STREAM_CREATING
);
385 if (command
!= PA_COMMAND_REPLY
) {
386 if (handle_error(s
->context
, command
, t
) < 0) {
387 context_dead(s
->context
);
392 if (s
->create_complete_callback
)
393 s
->create_complete_callback(s
, 0, s
->create_complete_userdata
);
398 if (pa_tagstruct_getu32(t
, &s
->channel
) < 0 ||
399 pa_tagstruct_getu32(t
, &s
->device_index
) < 0 ||
400 !pa_tagstruct_eof(t
)) {
401 s
->context
->error
= PA_ERROR_PROTOCOL
;
402 context_dead(s
->context
);
406 s
->channel_valid
= 1;
407 pa_dynarray_put(s
->context
->streams
, s
->channel
, s
);
409 s
->state
= STREAM_READY
;
410 if (s
->create_complete_callback
)
411 s
->create_complete_callback(s
, 1, s
->create_complete_userdata
);
414 static void create_stream(struct pa_stream
*s
, uint32_t tdev_index
) {
415 struct pa_tagstruct
*t
;
419 s
->state
= STREAM_CREATING
;
421 t
= pa_tagstruct_new(NULL
, 0);
424 pa_tagstruct_putu32(t
, s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_CREATE_PLAYBACK_STREAM
: PA_COMMAND_CREATE_RECORD_STREAM
);
425 pa_tagstruct_putu32(t
, tag
= s
->context
->ctag
++);
426 pa_tagstruct_puts(t
, s
->name
);
427 pa_tagstruct_put_sample_spec(t
, &s
->sample_spec
);
428 pa_tagstruct_putu32(t
, tdev_index
);
429 pa_tagstruct_putu32(t
, s
->buffer_attr
.maxlength
);
430 pa_tagstruct_putu32(t
, s
->buffer_attr
.tlength
);
431 pa_tagstruct_putu32(t
, s
->buffer_attr
.prebuf
);
432 pa_tagstruct_putu32(t
, s
->buffer_attr
.minreq
);
434 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
435 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_playback_callback
, s
);
438 static void lookup_device_callback(struct pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct pa_tagstruct
*t
, void *userdata
) {
439 struct pa_stream
*s
= userdata
;
441 assert(pd
&& s
&& s
->state
== STREAM_LOOKING_UP
);
443 if (command
!= PA_COMMAND_REPLY
) {
444 if (handle_error(s
->context
, command
, t
) < 0) {
445 context_dead(s
->context
);
450 if (s
->create_complete_callback
)
451 s
->create_complete_callback(s
, 0, s
->create_complete_userdata
);
455 if (pa_tagstruct_getu32(t
, &tdev
) < 0 ||
456 !pa_tagstruct_eof(t
)) {
457 s
->context
->error
= PA_ERROR_PROTOCOL
;
458 context_dead(s
->context
);
462 create_stream(s
, tdev
);
465 static void lookup_device(struct pa_stream
*s
, const char *tdev
) {
466 struct pa_tagstruct
*t
;
470 s
->state
= STREAM_LOOKING_UP
;
472 t
= pa_tagstruct_new(NULL
, 0);
475 pa_tagstruct_putu32(t
, s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_LOOKUP_SINK
: PA_COMMAND_LOOKUP_SOURCE
);
476 pa_tagstruct_putu32(t
, tag
= s
->context
->ctag
++);
477 pa_tagstruct_puts(t
, tdev
);
479 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
480 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, lookup_device_callback
, s
);
483 struct pa_stream
* pa_stream_new(
484 struct pa_context
*c
,
485 enum pa_stream_direction dir
,
488 const struct pa_sample_spec
*ss
,
489 const struct pa_buffer_attr
*attr
,
490 void (*complete
) (struct pa_stream
*s
, int success
, void *userdata
),
495 assert(c
&& name
&& ss
&& c
->state
== CONTEXT_READY
);
497 s
= malloc(sizeof(struct pa_stream
));
501 s
->read_callback
= NULL
;
502 s
->read_userdata
= NULL
;
503 s
->write_callback
= NULL
;
504 s
->write_userdata
= NULL
;
505 s
->die_callback
= NULL
;
506 s
->die_userdata
= NULL
;
507 s
->create_complete_callback
= complete
;
508 s
->create_complete_userdata
= NULL
;
510 s
->name
= strdup(name
);
511 s
->state
= STREAM_CREATING
;
512 s
->requested_bytes
= 0;
514 s
->channel_valid
= 0;
515 s
->device_index
= (uint32_t) -1;
517 s
->sample_spec
= *ss
;
519 s
->buffer_attr
= *attr
;
521 s
->buffer_attr
.maxlength
= DEFAULT_MAXLENGTH
;
522 s
->buffer_attr
.tlength
= DEFAULT_TLENGTH
;
523 s
->buffer_attr
.prebuf
= DEFAULT_PREBUF
;
524 s
->buffer_attr
.minreq
= DEFAULT_MINREQ
;
527 s
->next
= c
->first_stream
;
529 s
->next
->previous
= s
;
534 lookup_device(s
, dev
);
536 create_stream(s
, (uint32_t) -1);
541 void pa_stream_free(struct pa_stream
*s
) {
542 assert(s
&& s
->context
);
544 if (s
->context
->pdispatch
)
545 pa_pdispatch_unregister_reply(s
->context
->pdispatch
, s
);
549 if (s
->channel_valid
&& s
->context
->state
== CONTEXT_READY
) {
550 struct pa_tagstruct
*t
= pa_tagstruct_new(NULL
, 0);
553 pa_tagstruct_putu32(t
, PA_COMMAND_DELETE_PLAYBACK_STREAM
);
554 pa_tagstruct_putu32(t
, s
->context
->ctag
++);
555 pa_tagstruct_putu32(t
, s
->channel
);
556 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
559 if (s
->channel_valid
)
560 pa_dynarray_put(s
->context
->streams
, s
->channel
, NULL
);
563 s
->next
->previous
= s
->previous
;
565 s
->previous
->next
= s
->next
;
567 s
->context
->first_stream
= s
->next
;
572 void pa_stream_set_write_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*p
, size_t length
, void *userdata
), void *userdata
) {
574 s
->write_callback
= cb
;
575 s
->write_userdata
= userdata
;
578 void pa_stream_write(struct pa_stream
*s
, const void *data
, size_t length
) {
579 struct pa_memchunk chunk
;
580 assert(s
&& s
->context
&& data
&& length
&& s
->state
== STREAM_READY
);
582 chunk
.memblock
= pa_memblock_new(length
);
583 assert(chunk
.memblock
&& chunk
.memblock
->data
);
584 memcpy(chunk
.memblock
->data
, data
, length
);
586 chunk
.length
= length
;
588 pa_pstream_send_memblock(s
->context
->pstream
, s
->channel
, 0, &chunk
);
589 pa_memblock_unref(chunk
.memblock
);
591 /*fprintf(stderr, "Sent %u bytes\n", length);*/
593 if (length
< s
->requested_bytes
)
594 s
->requested_bytes
-= length
;
596 s
->requested_bytes
= 0;
599 size_t pa_stream_writable_size(struct pa_stream
*s
) {
600 assert(s
&& s
->state
== STREAM_READY
);
601 return s
->requested_bytes
;
604 void pa_stream_set_read_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*p
, const void*data
, size_t length
, void *userdata
), void *userdata
) {
606 s
->read_callback
= cb
;
607 s
->read_userdata
= userdata
;
610 int pa_stream_is_dead(struct pa_stream
*s
) {
611 return s
->state
== STREAM_DEAD
;
614 int pa_stream_is_ready(struct pa_stream
*s
) {
615 return s
->state
== STREAM_READY
;
618 void pa_stream_set_die_callback(struct pa_stream
*s
, void (*cb
)(struct pa_stream
*s
, void *userdata
), void *userdata
) {
620 s
->die_callback
= cb
;
621 s
->die_userdata
= userdata
;
624 int pa_context_is_pending(struct pa_context
*c
) {
627 if (c
->state
!= CONTEXT_READY
)
630 return pa_pstream_is_pending(c
->pstream
) || pa_pdispatch_is_pending(c
->pdispatch
);
633 struct pa_context
* pa_stream_get_context(struct pa_stream
*p
) {
638 static void set_dispatch_callbacks(struct pa_context
*c
);
640 static void pdispatch_drain_callback(struct pa_pdispatch
*pd
, void *userdata
) {
641 set_dispatch_callbacks(userdata
);
644 static void pstream_drain_callback(struct pa_pstream
*s
, void *userdata
) {
645 set_dispatch_callbacks(userdata
);
648 static void set_dispatch_callbacks(struct pa_context
*c
) {
649 assert(c
&& c
->state
== CONTEXT_READY
);
651 pa_pstream_set_drain_callback(c
->pstream
, NULL
, NULL
);
652 pa_pdispatch_set_drain_callback(c
->pdispatch
, NULL
, NULL
);
654 if (pa_pdispatch_is_pending(c
->pdispatch
)) {
655 pa_pdispatch_set_drain_callback(c
->pdispatch
, pdispatch_drain_callback
, c
);
659 if (pa_pstream_is_pending(c
->pstream
)) {
660 pa_pstream_set_drain_callback(c
->pstream
, pstream_drain_callback
, c
);
664 assert(c
->drain_complete_callback
);
665 c
->drain_complete_callback(c
, c
->drain_complete_userdata
);
668 int pa_context_drain(
669 struct pa_context
*c
,
670 void (*complete
) (struct pa_context
*c
, void *userdata
),
673 assert(c
&& c
->state
== CONTEXT_READY
);
675 if (complete
== NULL
) {
676 c
->drain_complete_callback
= NULL
;
677 pa_pstream_set_drain_callback(c
->pstream
, NULL
, NULL
);
678 pa_pdispatch_set_drain_callback(c
->pdispatch
, NULL
, NULL
);
682 if (!pa_context_is_pending(c
))
685 c
->drain_complete_callback
= complete
;
686 c
->drain_complete_userdata
= userdata
;
688 set_dispatch_callbacks(c
);
693 static void stream_drain_callback(struct pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, struct pa_tagstruct
*t
, void *userdata
) {
694 struct pa_stream
*s
= userdata
;
697 if (command
!= PA_COMMAND_REPLY
) {
698 if (handle_error(s
->context
, command
, t
) < 0) {
699 context_dead(s
->context
);
707 if (s
->state
!= STREAM_READY
)
710 if (!pa_tagstruct_eof(t
)) {
711 s
->context
->error
= PA_ERROR_PROTOCOL
;
712 context_dead(s
->context
);
716 if (s
->drain_complete_callback
) {
717 void (*temp
) (struct pa_stream
*s
, void *userdata
) = s
->drain_complete_callback
;
718 s
->drain_complete_callback
= NULL
;
719 temp(s
, s
->drain_complete_userdata
);
724 void pa_stream_drain(struct pa_stream
*s
, void (*complete
) (struct pa_stream
*s
, void *userdata
), void *userdata
) {
725 struct pa_tagstruct
*t
;
727 assert(s
&& s
->state
== STREAM_READY
);
730 s
->drain_complete_callback
= NULL
;
734 s
->drain_complete_callback
= complete
;
735 s
->drain_complete_userdata
= userdata
;
737 t
= pa_tagstruct_new(NULL
, 0);
740 pa_tagstruct_putu32(t
, PA_COMMAND_DRAIN_PLAYBACK_STREAM
);
741 pa_tagstruct_putu32(t
, tag
= s
->context
->ctag
++);
742 pa_tagstruct_putu32(t
, s
->channel
);
743 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
744 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_drain_callback
, s
);