#include <netinet/in.h>
#include <stdlib.h>
/* Position of each 32-bit word inside a frame descriptor. */
enum pstream_descriptor_index {
    PSTREAM_DESCRIPTOR_LENGTH = 0,  /* payload length in bytes */
    PSTREAM_DESCRIPTOR_CHANNEL,     /* channel id; 0 marks a packet frame */
    PSTREAM_DESCRIPTOR_DELTA,       /* delta value sent with memblock frames */
    PSTREAM_DESCRIPTOR_MAX          /* number of descriptor words */
};
15 typedef uint32_t pstream_descriptor
[PSTREAM_DESCRIPTOR_MAX
];
17 #define PSTREAM_DESCRIPTOR_SIZE (PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
18 #define FRAME_SIZE_MAX (1024*64)
/*
 * Fields of one queued send item.  The enclosing struct header lies outside
 * this excerpt -- presumably struct item_info, given the i->type, i->chunk
 * and i->packet accesses elsewhere in this file (TODO confirm).
 */
/* Discriminates between the two payload kinds handled by the send path. */
21 enum { PSTREAM_ITEM_PACKET
, PSTREAM_ITEM_MEMBLOCK
} type
;
/* Memory chunk payload; the send path reads chunk.memblock, chunk.index and chunk.length. */
24 struct memchunk chunk
;
/* Packet payload; pstream_send_packet() stores the result of packet_ref() here. */
29 struct packet
*packet
;
/*
 * Fields of the stream object.  The enclosing struct header and several
 * members (e.g. the io, dead, write.data/index and read.data/index fields
 * used elsewhere in this file) are not visible in this excerpt --
 * presumably this is struct pstream (TODO confirm).
 */
33 struct mainloop
*mainloop
;
/* Main loop source created in pstream_new() with prepare_callback; enabled when an item is queued. */
34 struct mainloop_source
*mainloop_source
;
/* Pending send items; pushed by pstream_send_packet()/pstream_send_memblock(), popped by prepare_next_write_item(). */
36 struct queue
*send_queue
;
/*
 * Called as die_callback(p, die_callback_userdata) from do_write() and do_read().
 * NOTE(review): the parameter name `userdad` looks like a typo for `userdata`.
 */
39 void (*die_callback
) (struct pstream
*p
, void *userdad
);
40 void *die_callback_userdata
;
/* Accessed as p->write.current: the item currently being written, or NULL. */
43 struct item_info
* current
;
/* Accessed as p->write.descriptor: descriptor of the current item, filled with htonl() values. */
44 pstream_descriptor descriptor
;
/* Invoked from do_write() when set and the send queue is empty. */
49 void (*send_callback
) (struct pstream
*p
, void *userdata
);
50 void *send_callback_userdata
;
/* Accessed as p->read.memblock: target of a memblock frame being received, or NULL. */
53 struct memblock
*memblock
;
/* Accessed as p->read.packet: target of a packet frame being received, or NULL. */
54 struct packet
*packet
;
/* Accessed as p->read.descriptor: descriptor of the frame being received; decoded with ntohl(). */
55 pstream_descriptor descriptor
;
/* Invoked from do_read() with the received packet once a packet frame is complete. */
60 int (*recieve_packet_callback
) (struct pstream
*p
, struct packet
*packet
, void *userdata
);
61 void *recieve_packet_callback_userdata
;
/* Invoked from do_read() for memblock payload; do_read() tests its result for < 0. */
63 int (*recieve_memblock_callback
) (struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
, void *userdata
);
64 void *recieve_memblock_callback_userdata
;
/* Forward declarations of the two I/O workers defined further down in this file. */
static void do_read(struct pstream *p);
static void do_write(struct pstream *p);
/*
 * I/O channel callback; pstream_new() registers it with
 * iochannel_set_callback() and passes the stream as userdata.  Checks that
 * the reporting channel is the stream's own.  The remainder of the body is
 * not visible in this excerpt.
 */
70 static void io_callback(struct iochannel
*io
, void *userdata
) {
71 struct pstream
*p
= userdata
;
72 assert(p
&& p
->io
== io
);
/*
 * Main loop source callback; pstream_new() registers it through
 * mainloop_source_new_fixed() and passes the stream as userdata.  Checks
 * that the firing source is the stream's own.  The remainder of the body is
 * not visible in this excerpt.
 */
77 static void prepare_callback(struct mainloop_source
*s
, void*userdata
) {
78 struct pstream
*p
= userdata
;
79 assert(p
&& p
->mainloop_source
== s
);
/*
 * Create a packet stream for main loop `m` and I/O channel `io`.
 * Only part of the body is visible in this excerpt: the object is
 * malloc()ed, io_callback is installed on the channel, a main loop source
 * and a send queue are created, and the callbacks plus the read/write
 * state pointers are reset to NULL.
 */
84 struct pstream
*pstream_new(struct mainloop
*m
, struct iochannel
*io
) {
/* NOTE(review): no check of the malloc() result is visible here -- confirm one exists. */
88 p
= malloc(sizeof(struct pstream
));
/* Route channel events to io_callback with the stream as userdata. */
92 iochannel_set_callback(io
, io_callback
, p
);
95 p
->die_callback
= NULL
;
96 p
->die_callback_userdata
= NULL
;
/* The source is created with prepare_callback and then passed to mainloop_source_enable() with 0 (presumably: start disabled). */
99 p
->mainloop_source
= mainloop_source_new_fixed(m
, prepare_callback
, p
);
100 mainloop_source_enable(p
->mainloop_source
, 0);
102 p
->send_queue
= queue_new();
103 assert(p
->send_queue
);
/* No item is in flight yet. */
105 p
->write
.current
= NULL
;
/* No frame is being received yet. */
108 p
->read
.memblock
= NULL
;
109 p
->read
.packet
= NULL
;
/* All user callbacks start out unset. */
112 p
->send_callback
= NULL
;
113 p
->send_callback_userdata
= NULL
;
115 p
->recieve_packet_callback
= NULL
;
116 p
->recieve_packet_callback_userdata
= NULL
;
118 p
->recieve_memblock_callback
= NULL
;
119 p
->recieve_memblock_callback_userdata
= NULL
;
124 static void item_free(void *item
, void *p
) {
125 struct item_info
*i
= item
;
128 if (i
->type
== PSTREAM_ITEM_PACKET
) {
129 assert(i
->chunk
.memblock
);
130 memblock_unref(i
->chunk
.memblock
);
132 assert(i
->type
== PSTREAM_ITEM_MEMBLOCK
);
134 packet_unref(i
->packet
);
/*
 * Tear down a packet stream.  Visible steps: free the I/O channel, free the
 * send queue (destroying queued items with item_free), release a partially
 * written item and any partially received memblock/packet, then free the
 * main loop source.  Parts of the body are not visible in this excerpt.
 */
140 void pstream_free(struct pstream
*p
) {
143 iochannel_free(p
->io
);
/* Queued items are destroyed through item_free with NULL as its second argument. */
144 queue_free(p
->send_queue
, item_free
, NULL
);
/* An item that was popped but not fully written is no longer in the queue. */
146 if (p
->write
.current
)
147 item_free(p
->write
.current
, NULL
);
149 if (p
->read
.memblock
)
150 memblock_unref(p
->read
.memblock
);
/*
 * NOTE(review): no guard for this call is visible although pstream_new()
 * initialises read.packet to NULL -- confirm a NULL check precedes it.
 */
153 packet_unref(p
->read
.packet
);
155 mainloop_source_free(p
->mainloop_source
);
159 void pstream_set_send_callback(struct pstream
*p
, void (*callback
) (struct pstream
*p
, void *userdata
), void *userdata
) {
160 assert(p
&& callback
);
162 p
->send_callback
= callback
;
163 p
->send_callback_userdata
= userdata
;
/*
 * Queue `packet` for transmission on stream `p`.  Visible steps: allocate an
 * item, mark it PSTREAM_ITEM_PACKET, store a new reference to the packet,
 * push the item onto the send queue and enable the main loop source.
 * Declarations and argument checks are not visible in this excerpt.
 */
166 void pstream_send_packet(struct pstream
*p
, struct packet
*packet
) {
/* NOTE(review): no check of the malloc() result is visible here -- confirm one exists. */
170 i
= malloc(sizeof(struct item_info
));
172 i
->type
= PSTREAM_ITEM_PACKET
;
/* The item keeps its own reference to the packet. */
173 i
->packet
= packet_ref(packet
);
175 queue_push(p
->send_queue
, i
);
176 mainloop_source_enable(p
->mainloop_source
, 1);
/*
 * Queue a memory chunk for transmission on `channel`.  Visible steps:
 * allocate an item, mark it PSTREAM_ITEM_MEMBLOCK, record the channel, take
 * a reference on the item's memblock, push the item onto the send queue and
 * enable the main loop source.  The assignments of the item's chunk and
 * delta fields are not visible in this excerpt.
 */
179 void pstream_send_memblock(struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
) {
/* channel must be non-zero: do_read() treats a channel field of 0 as a packet frame. */
181 assert(p
&& channel
&& chunk
);
/* NOTE(review): no check of the malloc() result is visible here -- confirm one exists. */
183 i
= malloc(sizeof(struct item_info
));
185 i
->type
= PSTREAM_ITEM_MEMBLOCK
;
187 i
->channel
= channel
;
/* The item keeps its own reference to the memblock. */
190 memblock_ref(i
->chunk
.memblock
);
192 queue_push(p
->send_queue
, i
);
193 mainloop_source_enable(p
->mainloop_source
, 1);
196 void pstream_set_recieve_packet_callback(struct pstream
*p
, int (*callback
) (struct pstream
*p
, struct packet
*packet
, void *userdata
), void *userdata
) {
197 assert(p
&& callback
);
199 p
->recieve_packet_callback
= callback
;
200 p
->recieve_packet_callback_userdata
= userdata
;
203 void pstream_set_recieve_memblock_callback(struct pstream
*p
, int (*callback
) (struct pstream
*p
, uint32_t channel
, int32_t delta
, struct memchunk
*chunk
, void *userdata
), void *userdata
) {
204 assert(p
&& callback
);
206 p
->recieve_memblock_callback
= callback
;
207 p
->recieve_memblock_callback_userdata
= userdata
;
/*
 * Pop the next item from the send queue into p->write.current and set up
 * p->write.data and p->write.descriptor for it.  Descriptor words are stored
 * with htonl().  The statement executed when the queue is empty and the
 * branch separator between the two item kinds are not visible in this
 * excerpt.
 */
210 static void prepare_next_write_item(struct pstream
*p
) {
/* queue_pop() result is tested for NULL (nothing to send). */
213 if (!(p
->write
.current
= queue_pop(p
->send_queue
)))
/* Packet item: payload is the packet's data; channel and delta words are 0. */
218 if (p
->write
.current
->type
== PSTREAM_ITEM_PACKET
) {
219 assert(p
->write
.current
->packet
);
220 p
->write
.data
= p
->write
.current
->packet
->data
;
221 p
->write
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->packet
->length
);
222 p
->write
.descriptor
[PSTREAM_DESCRIPTOR_CHANNEL
] = 0;
223 p
->write
.descriptor
[PSTREAM_DESCRIPTOR_DELTA
] = 0;
/* Memblock item: payload starts chunk.index bytes into the memblock's data. */
225 assert(p
->write
.current
->type
== PSTREAM_ITEM_MEMBLOCK
&& p
->write
.current
->chunk
.memblock
);
226 p
->write
.data
= p
->write
.current
->chunk
.memblock
->data
+ p
->write
.current
->chunk
.index
;
227 p
->write
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
] = htonl(p
->write
.current
->chunk
.length
);
228 p
->write
.descriptor
[PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
229 p
->write
.descriptor
[PSTREAM_DESCRIPTOR_DELTA
] = htonl(p
->write
.current
->delta
);
/*
 * Write as much as possible of the current send item (descriptor first,
 * then payload) to the I/O channel.  p->write.index counts bytes of the
 * frame, descriptor included.  Local declarations, the early-exit
 * statements, the index update and the error path are not visible in this
 * excerpt.
 */
233 static void do_write(struct pstream
*p
) {
239 mainloop_source_enable(p
->mainloop_source
, 0);
/* Nothing to do on a dead stream or a channel that cannot take data. */
241 if (p
->dead
|| !iochannel_is_writable(p
->io
))
/* No item in flight: fetch the next one from the send queue. */
244 if (!p
->write
.current
)
245 prepare_next_write_item(p
);
/* Still none: the queue was empty. */
247 if (!p
->write
.current
)
250 assert(p
->write
.data
);
/* Descriptor not fully sent yet: continue inside the descriptor. */
252 if (p
->write
.index
< PSTREAM_DESCRIPTOR_SIZE
) {
/* NOTE(review): arithmetic on void* is a compiler extension, not ISO C. */
253 d
= (void*) p
->write
.descriptor
+ p
->write
.index
;
254 l
= PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
/* Descriptor sent: continue inside the payload at offset index - PSTREAM_DESCRIPTOR_SIZE. */
256 d
= (void*) p
->write
.data
+ p
->write
.index
- PSTREAM_DESCRIPTOR_SIZE
;
/*
 * NOTE(review): with the payload offset being index - PSTREAM_DESCRIPTOR_SIZE,
 * the remaining byte count would be length - (index - PSTREAM_DESCRIPTOR_SIZE);
 * the expression below subtracts PSTREAM_DESCRIPTOR_SIZE instead of adding
 * it -- confirm whether this is intended.
 */
257 l
= ntohl(p
->write
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
]) - p
->write
.index
- PSTREAM_DESCRIPTOR_SIZE
;
/* A negative result from iochannel_write() is treated as a failure. */
260 if ((r
= iochannel_write(p
->io
, d
, l
)) < 0)
/* Whole frame (descriptor + payload) written: release the item. */
265 if (p
->write
.index
>= PSTREAM_DESCRIPTOR_SIZE
+ntohl(p
->write
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
])) {
266 assert(p
->write
.current
);
267 item_free(p
->write
.current
, (void *) 1);
268 p
->write
.current
= NULL
;
/* Tell the user when a send callback is set and nothing is left in the queue. */
270 if (p
->send_callback
&& queue_is_empty(p
->send_queue
))
271 p
->send_callback(p
, p
->send_callback_userdata
);
/* NOTE(review): the condition guarding this call (and any NULL check of die_callback) is not visible here. */
279 p
->die_callback(p
, p
->die_callback_userdata
);
/*
 * Read the next part of an incoming frame (descriptor first, then payload)
 * from the I/O channel and dispatch it.  p->read.index counts bytes of the
 * frame, descriptor included.  A channel field of 0 marks a packet frame;
 * anything else is a memblock frame.  Local declarations, early-exit
 * statements, the index update, several branch separators and the error
 * path are not visible in this excerpt.
 */
282 static void do_read(struct pstream
*p
) {
288 mainloop_source_enable(p
->mainloop_source
, 0);
/* Nothing to do on a dead stream or a channel without pending data. */
290 if (p
->dead
|| !iochannel_is_readable(p
->io
))
/* Descriptor not complete yet: continue filling it. */
293 if (p
->read
.index
< PSTREAM_DESCRIPTOR_SIZE
) {
/* NOTE(review): arithmetic on void* is a compiler extension, not ISO C. */
294 d
= (void*) p
->read
.descriptor
+ p
->read
.index
;
295 l
= PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
/* Descriptor complete: continue inside the payload at offset index - PSTREAM_DESCRIPTOR_SIZE. */
297 assert(p
->read
.data
);
298 d
= (void*) p
->read
.data
+ p
->read
.index
- PSTREAM_DESCRIPTOR_SIZE
;
/*
 * NOTE(review): with the payload offset being index - PSTREAM_DESCRIPTOR_SIZE,
 * the remaining byte count would be length - (index - PSTREAM_DESCRIPTOR_SIZE);
 * the expression below subtracts PSTREAM_DESCRIPTOR_SIZE instead of adding
 * it -- confirm whether this is intended.
 */
299 l
= ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
]) - p
->read
.index
- PSTREAM_DESCRIPTOR_SIZE
;
/* A result of zero or less from iochannel_read() is treated as a failure. */
302 if ((r
= iochannel_read(p
->io
, d
, l
)) <= 0)
307 if (p
->read
.index
== PSTREAM_DESCRIPTOR_SIZE
) {
308 /* Reading of frame descriptor complete */
310 /* Frame size too large */
311 if (ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
]) > FRAME_SIZE_MAX
)
/* No previous frame may still be in progress. */
314 assert(!p
->read
.packet
&& !p
->read
.memblock
);
316 if (ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_CHANNEL
]) == 0) {
317 /* Frame is a packet frame */
318 p
->read
.packet
= packet_new(ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
]));
319 assert(p
->read
.packet
);
320 p
->read
.data
= p
->read
.packet
->data
;
322 /* Frame is a memblock frame */
323 p
->read
.memblock
= memblock_new(ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
]));
324 assert(p
->read
.memblock
);
325 p
->read
.data
= p
->read
.memblock
->data
;
328 } else if (p
->read
.index
> PSTREAM_DESCRIPTOR_SIZE
) {
329 /* Frame payload available */
331 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) { /* Is this memblock data? Then pass it to the user */
/*
 * Number of payload bytes delivered by the last read: if the read started
 * inside the descriptor, only the bytes beyond the descriptor count;
 * otherwise all r bytes do.
 */
334 l
= p
->read
.index
- r
< PSTREAM_DESCRIPTOR_SIZE
? p
->read
.index
- PSTREAM_DESCRIPTOR_SIZE
: r
;
337 struct memchunk chunk
;
339 chunk
.memblock
= p
->read
.memblock
;
/* Offset within the payload at which the newly read bytes start. */
340 chunk
.index
= p
->read
.index
- PSTREAM_DESCRIPTOR_SIZE
- l
;
/* Hand the new data to the user; a negative result is tested (its handling is not visible here). */
343 if (p
->recieve_memblock_callback(p
,
344 ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_CHANNEL
]),
345 (int32_t) ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_DELTA
]),
347 p
->recieve_memblock_callback_userdata
) < 0)
/* Whole frame (descriptor + payload) received. */
353 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PSTREAM_DESCRIPTOR_LENGTH
]) + PSTREAM_DESCRIPTOR_SIZE
) {
354 if (p
->read
.memblock
) {
355 assert(!p
->read
.packet
);
/* Drop the stream's reference to the finished memblock. */
357 memblock_unref(p
->read
.memblock
);
358 p
->read
.memblock
= NULL
;
361 assert(p
->read
.packet
);
/* Deliver the finished packet; the callback's result is stored in r. */
363 if (p
->recieve_packet_callback
)
364 r
= p
->recieve_packet_callback(p
, p
->read
.packet
, p
->recieve_packet_callback_userdata
);
366 packet_unref(p
->read
.packet
);
367 p
->read
.packet
= NULL
;
/* NOTE(review): the condition guarding this call (and any NULL check of die_callback) is not visible here. */
382 p
->die_callback(p
, p
->die_callback_userdata
);
386 void pstream_set_die_callback(struct pstream
*p
, void (*callback
)(struct pstream
*p
, void *userdata
), void *userdata
) {
387 assert(p
&& callback
);
388 p
->die_callback
= callback
;
389 p
->die_callback_userdata
= userdata
;