From 27bc5e9ae5e7beab1a4ff3a0b353f93edca60d52 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 12 Nov 2021 12:33:51 +0100 Subject: [PATCH] raop: more improvements Create sync and timing packets, reply to timing requests. Send the right amount of data. Parse protocol/encryption/codec parameters and use them. --- src/modules/module-raop-sink.c | 478 +++++++++++++++++++++++++-------- 1 file changed, 373 insertions(+), 105 deletions(-) diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index f414994fe..128b6eec9 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -76,10 +76,10 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define AES_CHUNK_SIZE 16 -#define MAX_PORT_RETRY 16 +#define MAX_PORT_RETRY 128 #define DEFAULT_FORMAT "S16" -#define DEFAULT_RATE 48000 +#define DEFAULT_RATE 44100 #define DEFAULT_CHANNELS "2" #define DEFAULT_POSITION "[ FL FR ]" @@ -100,6 +100,21 @@ static const struct spa_dict_item module_props[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; +enum { + PROTO_TCP, + PROTO_UDP, +}; +enum { + CRYPTO_NONE, + CRYPTO_RSA, +}; +enum { + CODEC_PCM, + CODEC_ALAC, + CODEC_AAC, + CODEC_AAC_ELD, +}; + struct impl { struct pw_context *context; @@ -112,6 +127,8 @@ struct impl { struct spa_hook module_listener; int protocol; + int encryption; + int codec; struct pw_core *core; struct spa_hook core_proxy_listener; @@ -140,12 +157,23 @@ struct impl { int control_fd; uint16_t timing_port; int timing_fd; + struct spa_source *timing_source; uint16_t server_port; int server_fd; + uint32_t block_size; + uint32_t delay; + uint16_t seq; uint32_t rtptime; uint32_t ssrc; + uint32_t sync; + uint32_t sync_period; + unsigned int first:1; + unsigned int recording:1; + + uint8_t buffer[FRAMES_PER_TCP_PACKET * 4]; + uint32_t filled; }; static void do_unload_module(void *obj, void *data, int res, uint32_t id) @@ -189,10 +217,8 @@ static void stream_state_changed(void *d, enum pw_stream_state old, static inline void bit_writer(uint8_t **p, int *pos, uint8_t data, int len) { int lb, rb; - lb = 7 - *pos; rb = lb - len + 1; - if (rb >= 0) { **p = (*pos ? **p : 0) | (data << rb); *pos += len; @@ -223,30 +249,71 @@ static int aes_encrypt(struct impl *impl, uint8_t *data, int len) return i; } - -static int add_to_packet(struct impl *impl, uint8_t *data, size_t size) +static inline uint64_t timespec_to_ntp(struct timespec *ts) { - const size_t max = 12 + 8 + (FRAMES_PER_UDP_PACKET * 4); - uint8_t *pkt, *bp; - int bpos = 0, res; + uint64_t ntp = (uint64_t) ts->tv_nsec * UINT32_MAX / SPA_NSEC_PER_SEC; + return ntp | (uint64_t) (ts->tv_sec + 0x83aa7e80) << 32; +} - pkt = alloca(max); - pkt[0] = 0x80; /* RTP v2: 0x80 */ - pkt[1] = 0x60; /* Payload type: 0x60 */ - pkt[2] = impl->seq >> 8; /* Sequence number: 0x0000 */ - pkt[3] = impl->seq; - pkt[4] = impl->rtptime >> 24; /* Timestamp */ - pkt[5] = impl->rtptime >> 16; - pkt[6] = impl->rtptime >> 8; - pkt[7] = impl->rtptime; - pkt[8] = impl->ssrc >> 24; - pkt[9] = impl->ssrc >> 16; - pkt[10] = impl->ssrc >> 8; - pkt[11] = impl->ssrc; +static inline uint64_t ntp_now(int clockid) +{ + struct timespec now; + clock_gettime(clockid, &now); + return timespec_to_ntp(&now); +} - bp = &pkt[12]; +static int send_udp_sync_packet(struct impl *impl) +{ + uint32_t pkt[5]; + uint32_t rtptime = impl->rtptime; + uint32_t delay = impl->delay; + uint64_t transmitted; - size = SPA_MIN(size, FRAMES_PER_UDP_PACKET * 4u); + pkt[0] = htonl(0x80d40007); + if (impl->first) + pkt[0] |= htonl(0x10000000); + rtptime -= delay; + pkt[1] = htonl(rtptime); + transmitted = ntp_now(CLOCK_MONOTONIC); + pkt[2] = htonl(transmitted >> 32); + pkt[3] = htonl(transmitted & 0xffffffff); + rtptime += delay; + pkt[4] = htonl(rtptime); + + pw_log_info("sync: delayed:%u now:%"PRIu64" rtptime:%u", + rtptime - delay, transmitted, rtptime); + + return write(impl->control_fd, pkt, sizeof(pkt)); +} + +static int send_udp_timing_packet(struct impl *impl, uint64_t remote, uint64_t received) +{ + uint32_t pkt[8]; + uint64_t transmitted; + + pkt[0] = htonl(0x80d30007); + pkt[1] = 0x00000000; + pkt[2] = htonl(remote >> 32); + pkt[3] = htonl(remote & 0xffffffff); + pkt[4] = htonl(received >> 32); + pkt[5] = htonl(received & 0xffffffff); + transmitted = ntp_now(CLOCK_MONOTONIC); + pkt[6] = htonl(transmitted >> 32); + pkt[7] = htonl(transmitted & 0xffffffff); + + pw_log_info("sync: remote:%"PRIu64" received:%"PRIu64" transmitted:%"PRIu64, + remote, received, transmitted); + + return write(impl->timing_fd, pkt, sizeof(pkt)); +} + +static int write_codec_pcm(void *dst, void *frames, uint32_t n_frames) +{ + uint8_t *bp, *b, *d = frames; + int bpos = 0; + uint32_t i; + + b = bp = dst; bit_writer(&bp, &bpos, 1, 3); /* channel=1, stereo */ bit_writer(&bp, &bpos, 0, 4); /* Unknown */ @@ -255,32 +322,109 @@ static int add_to_packet(struct impl *impl, uint8_t *data, size_t size) bit_writer(&bp, &bpos, 1, 1); /* Hassize */ bit_writer(&bp, &bpos, 0, 2); /* Unused */ bit_writer(&bp, &bpos, 1, 1); /* Is-not-compressed */ - /* Size of data, integer, big endian. */ - bit_writer(&bp, &bpos, (size >> 24) & 0xff, 8); - bit_writer(&bp, &bpos, (size >> 16) & 0xff, 8); - bit_writer(&bp, &bpos, (size >> 8) & 0xff, 8); - bit_writer(&bp, &bpos, (size) & 0xff, 8); + bit_writer(&bp, &bpos, (n_frames >> 24) & 0xff, 8); + bit_writer(&bp, &bpos, (n_frames >> 16) & 0xff, 8); + bit_writer(&bp, &bpos, (n_frames >> 8) & 0xff, 8); + bit_writer(&bp, &bpos, (n_frames) & 0xff, 8); - impl->rtptime += size / 4; - - while (size > 4) { - /* Byte swap stereo data. */ - bit_writer(&bp, &bpos, *(data + 1), 8); - bit_writer(&bp, &bpos, *(data + 0), 8); - bit_writer(&bp, &bpos, *(data + 3), 8); - bit_writer(&bp, &bpos, *(data + 2), 8); - data += 4; - size -= 4; + for (i = 0; i < n_frames; i++) { + bit_writer(&bp, &bpos, *(d + 1), 8); + bit_writer(&bp, &bpos, *(d + 0), 8); + bit_writer(&bp, &bpos, *(d + 3), 8); + bit_writer(&bp, &bpos, *(d + 2), 8); + d += 4; } + return bp - b + 1; +} +static int flush_to_udp_packet(struct impl *impl) +{ + const size_t max = 12 + 8 + impl->block_size; + uint32_t pkt[max], len, n_frames; + uint8_t *dst; + int res; + + if (!impl->recording) + return 0; + + impl->sync++; + if (impl->first || impl->sync == impl->sync_period) { + impl->sync = 0; + send_udp_sync_packet(impl); + } + pkt[0] = htonl(0x80600000); + if (impl->first) + pkt[0] |= htonl((uint32_t)0x80 << 16); + pkt[0] |= htonl((uint32_t)impl->seq); + pkt[1] = htonl(impl->rtptime); + pkt[2] = htonl(impl->ssrc); + + n_frames = impl->filled / impl->frame_size; + dst = (uint8_t*)&pkt[3]; + + switch (impl->codec) { + case CODEC_PCM: + len = write_codec_pcm(dst, impl->buffer, n_frames); + break; + default: + len = 8 + impl->block_size; + memset(dst, 0, len); + break; + } + aes_encrypt(impl, dst, len); + + impl->rtptime += n_frames; impl->seq = (impl->seq + 1) & 0xffff; - aes_encrypt(impl, pkt + 12, max - 12); + pw_log_debug("send %u", len + 12); + res = write(impl->server_fd, pkt, len + 12); - pw_log_info("send %zu", max); - res = write(impl->server_fd, pkt, max); + impl->first = false; - return 0; + return res; +} + +static int flush_to_tcp_packet(struct impl *impl) +{ + const size_t max = 16 + 8 + impl->block_size; + uint32_t pkt[max], len, n_frames; + uint8_t *dst; + int res; + + if (!impl->recording) + return 0; + + pkt[0] = htonl(0x24000000); + pkt[1] = htonl(0x80e00000); + pkt[1] |= htonl((uint32_t)impl->seq); + pkt[2] = htonl(impl->rtptime); + pkt[3] = htonl(impl->ssrc); + + n_frames = impl->filled / impl->frame_size; + dst = (uint8_t*)&pkt[4]; + + switch (impl->codec) { + case CODEC_PCM: + len = write_codec_pcm(dst, impl->buffer, n_frames); + break; + default: + len = 8 + impl->block_size; + memset(dst, 0, len); + break; + } + aes_encrypt(impl, dst, len); + + pkt[0] |= htonl((uint32_t) len + 12); + + impl->rtptime += n_frames; + impl->seq = (impl->seq + 1) & 0xffff; + + pw_log_debug("send %u", len + 16); + res = write(impl->server_fd, pkt, len + 16); + + impl->first = false; + + return res; } static void playback_stream_process(void *d) @@ -288,21 +432,43 @@ static void playback_stream_process(void *d) struct impl *impl = d; struct pw_buffer *buf; struct spa_data *bd; - void *data; + uint8_t *data; uint32_t size; - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("out of buffers: %m"); return; } bd = &buf->buffer->datas[0]; - data = SPA_PTROFF(bd->data, bd->chunk->offset, void); + data = SPA_PTROFF(bd->data, bd->chunk->offset, uint8_t); size = bd->chunk->size; - if (impl->server_fd > 0) - add_to_packet(impl, data, size); + while (size > 0) { + uint32_t avail, to_fill; + + avail = impl->block_size - impl->filled; + to_fill = SPA_MIN(avail, size); + + memcpy(&impl->buffer[impl->filled], data, to_fill); + + impl->filled += to_fill; + avail -= to_fill; + size -= to_fill; + data += to_fill; + + if (avail == 0) { + switch (impl->protocol) { + case PROTO_UDP: + flush_to_udp_packet(impl); + break; + case PROTO_TCP: + flush_to_tcp_packet(impl); + break; + } + impl->filled = 0; + } + } pw_stream_queue_buffer(impl->stream, buf); } @@ -427,11 +593,20 @@ error: static void rtsp_record_reply(void *data, int status, const struct spa_dict *headers) { + struct impl *impl = data; + pw_log_info("reply %d", status); + + impl->first = true; + impl->sync = 0; + impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size); + impl->recording = true; } -static void rtsp_do_record(struct impl *impl) +static int rtsp_do_record(struct impl *impl) { + int res; + pw_getrandom(&impl->seq, sizeof(impl->seq), 0); pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0); @@ -439,11 +614,38 @@ static void rtsp_do_record(struct impl *impl) pw_properties_setf(impl->headers, "RTP-Info", "seq=%u;rtptime=%u", impl->seq, impl->rtptime); - pw_rtsp_client_send(impl->rtsp, "RECORD", &impl->headers->dict, + res = pw_rtsp_client_send(impl->rtsp, "RECORD", &impl->headers->dict, NULL, NULL, rtsp_record_reply, impl); pw_properties_set(impl->headers, "Range", NULL); pw_properties_set(impl->headers, "RTP-Info", NULL); + + return res; +} + +static void +on_timing_source_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + uint32_t packet[8]; + ssize_t bytes; + + if (mask & SPA_IO_IN) { + uint64_t remote, received; + + received = ntp_now(CLOCK_MONOTONIC); + bytes = read(impl->timing_fd, packet, sizeof(packet)); + if (bytes != sizeof(packet)) { + pw_log_warn("discarding short (%zd < %zd) timing packet", + bytes, sizeof(bytes)); + return; + } + if (packet[0] != ntohl(0x80d20007)) + return; + + remote = ((uint64_t)ntohl(packet[6])) << 32 | ntohl(packet[7]); + send_udp_timing_packet(impl, remote, received); + } } static void rtsp_setup_reply(void *data, int status, const struct spa_dict *headers) @@ -451,6 +653,7 @@ static void rtsp_setup_reply(void *data, int status, const struct spa_dict *head struct impl *impl = data; const char *str, *state = NULL, *s; size_t len; + uint64_t ntp; uint16_t control_port, timing_port; pw_log_info("reply %d", status); @@ -478,29 +681,53 @@ static void rtsp_setup_reply(void *data, int status, const struct spa_dict *head timing_port = atoi(s + 12); } } - if (impl->server_port == 0 || control_port == 0 || timing_port == 0) { - pw_log_error("missing ports in Transport"); + if (impl->server_port == 0) { + pw_log_error("missing server port in Transport"); return; } - pw_log_info("server port:%u control:%u timing:%u", - impl->server_port, control_port, timing_port); + pw_log_info("server port:%u", impl->server_port); - if ((impl->server_fd = connect_udp_socket(impl, -1, impl->server_port)) <= 0) - return; - if ((impl->control_fd = connect_udp_socket(impl, impl->control_fd, control_port)) <= 0) - return; - if ((impl->timing_fd = connect_udp_socket(impl, impl->timing_fd, timing_port)) <= 0) - return; + switch (impl->protocol) { + case PROTO_TCP: + break; + case PROTO_UDP: + if (control_port == 0 || timing_port == 0) { + pw_log_error("missing UDP ports in Transport"); + return; + } + pw_log_info("control:%u timing:%u", control_port, timing_port); + + if ((impl->server_fd = connect_udp_socket(impl, -1, impl->server_port)) <= 0) + return; + if ((impl->control_fd = connect_udp_socket(impl, impl->control_fd, control_port)) <= 0) + return; + if ((impl->timing_fd = connect_udp_socket(impl, impl->timing_fd, timing_port)) <= 0) + return; + + ntp = ntp_now(CLOCK_MONOTONIC); + send_udp_timing_packet(impl, ntp, ntp); + + impl->timing_source = pw_loop_add_io(impl->loop, impl->timing_fd, + SPA_IO_IN, false, on_timing_source_io, impl); + break; + default: + return; + } rtsp_do_record(impl); } -static void rtsp_do_setup(struct impl *impl) +static int rtsp_do_setup(struct impl *impl) { - if (impl->protocol == 1) { + int res; + + switch (impl->protocol) { + case PROTO_TCP: pw_properties_set(impl->headers, "Transport", "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"); - } else { + break; + + case PROTO_UDP: impl->control_port = DEFAULT_UDP_CONTROL_PORT; impl->timing_port = DEFAULT_UDP_TIMING_PORT; @@ -513,15 +740,18 @@ static void rtsp_do_setup(struct impl *impl) "RTP/AVP/UDP;unicast;interleaved=0-1;mode=record;" "control_port=%u;timing_port=%u", impl->control_port, impl->timing_port); + break; + + default: + return -ENOTSUP; } - pw_rtsp_client_send(impl->rtsp, "SETUP", &impl->headers->dict, + res = pw_rtsp_client_send(impl->rtsp, "SETUP", &impl->headers->dict, NULL, NULL, rtsp_setup_reply, impl); pw_properties_set(impl->headers, "Transport", NULL); - return; - + return res; error: if (impl->control_fd > 0) close(impl->control_fd); @@ -529,6 +759,7 @@ error: if (impl->timing_fd > 0) close(impl->timing_fd); impl->timing_fd = -1; + return -EIO; } static void rtsp_announce_reply(void *data, int status, const struct spa_dict *headers) @@ -617,7 +848,7 @@ static int rsa_encrypt(uint8_t *data, int len, uint8_t *res) return size; } -static void rtsp_do_announce(struct impl *impl) +static int rtsp_do_announce(struct impl *impl) { const char *host; uint8_t rsakey[512]; @@ -629,51 +860,56 @@ static void rtsp_do_announce(struct impl *impl) host = pw_properties_get(impl->props, "raop.hostname"); - if (impl->protocol == 0) + if (impl->protocol == PROTO_TCP) frames = FRAMES_PER_TCP_PACKET; else frames = FRAMES_PER_UDP_PACKET; - pw_getrandom(impl->key, sizeof(impl->key), 0); - AES_set_encrypt_key(impl->key, 128, &impl->aes); - pw_getrandom(impl->iv, sizeof(impl->iv), 0); - - pw_log_info("aes %p", &impl->aes); - - i = rsa_encrypt(impl->key, 16, rsakey); - base64_encode(rsakey, i, key, '='); - base64_encode(impl->iv, 16, iv, '='); + impl->block_size = frames * impl->frame_size; pw_rtsp_client_get_local_ip(impl->rtsp, &ip_version, local_ip, sizeof(local_ip)); -#if 0 - asprintf(&sdp, "v=0\r\n" - "o=iTunes %s 0 IN IP%d %s\r\n" - "s=iTunes\r\n" - "c=IN IP%d %s\r\n" - "t=0 0\r\n" - "m=audio 0 RTP/AVP 96\r\n" - "a=rtpmap:96 AppleLossless\r\n" - "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 44100\r\n", - impl->session_id, ip_version, local_ip, - ip_version, host, frames); -#else - asprintf(&sdp, "v=0\r\n" - "o=iTunes %s 0 IN IP%d %s\r\n" - "s=iTunes\r\n" - "c=IN IP%d %s\r\n" - "t=0 0\r\n" - "m=audio 0 RTP/AVP 96\r\n" - "a=rtpmap:96 AppleLossless\r\n" - "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 44100\r\n" - "a=rsaaeskey:%s\r\n" - "a=aesiv:%s\r\n", - impl->session_id, ip_version, local_ip, - ip_version, host, frames, key, iv); -#endif + switch (impl->encryption) { + case CRYPTO_NONE: + asprintf(&sdp, "v=0\r\n" + "o=iTunes %s 0 IN IP%d %s\r\n" + "s=iTunes\r\n" + "c=IN IP%d %s\r\n" + "t=0 0\r\n" + "m=audio 0 RTP/AVP 96\r\n" + "a=rtpmap:96 AppleLossless\r\n" + "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 44100\r\n", + impl->session_id, ip_version, local_ip, + ip_version, host, frames); + break; - pw_rtsp_client_send(impl->rtsp, "ANNOUNCE", &impl->headers->dict, + case CRYPTO_RSA: + pw_getrandom(impl->key, sizeof(impl->key), 0); + AES_set_encrypt_key(impl->key, 128, &impl->aes); + pw_getrandom(impl->iv, sizeof(impl->iv), 0); + + i = rsa_encrypt(impl->key, 16, rsakey); + base64_encode(rsakey, i, key, '='); + base64_encode(impl->iv, 16, iv, '='); + + asprintf(&sdp, "v=0\r\n" + "o=iTunes %s 0 IN IP%d %s\r\n" + "s=iTunes\r\n" + "c=IN IP%d %s\r\n" + "t=0 0\r\n" + "m=audio 0 RTP/AVP 96\r\n" + "a=rtpmap:96 AppleLossless\r\n" + "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 44100\r\n" + "a=rsaaeskey:%s\r\n" + "a=aesiv:%s\r\n", + impl->session_id, ip_version, local_ip, + ip_version, host, frames, key, iv); + break; + default: + return -ENOTSUP; + } + return pw_rtsp_client_send(impl->rtsp, "ANNOUNCE", &impl->headers->dict, "application/sdp", sdp, rtsp_announce_reply, impl); } @@ -745,7 +981,7 @@ static int create_stream(struct impl *impl) const char *hostname, *port; uint32_t session_id; - impl->stream = pw_stream_new(impl->core, "example sink", impl->stream_props); + impl->stream = pw_stream_new(impl->core, "RAOP sink", impl->stream_props); impl->stream_props = NULL; if (impl->stream == NULL) @@ -763,7 +999,6 @@ static int create_stream(struct impl *impl) if ((res = pw_stream_connect(impl->stream, PW_DIRECTION_INPUT, PW_ID_ANY, - PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS, params, n_params)) < 0) @@ -1005,6 +1240,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, pw_properties_get(props, PW_KEY_NODE_NAME)); + if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) + pw_properties_set(props, PW_KEY_NODE_LATENCY, "352/44100"); if ((str = pw_properties_get(props, "stream.props")) != NULL) pw_properties_update_string(impl->stream_props, str, strlen(str)); @@ -1025,6 +1262,37 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto error; } + if ((str = pw_properties_get(props, "raop.transport")) == NULL) + str = "udp"; + if (spa_streq(str, "udp")) + impl->protocol = PROTO_UDP; + else if (spa_streq(str, "tcp")) + impl->protocol = PROTO_TCP; + else { + pw_log_error( "can't handle transport %s", str); + goto error; + } + + if ((str = pw_properties_get(props, "raop.encryption.type")) == NULL) + str = "none"; + if (spa_streq(str, "none")) + impl->encryption = CRYPTO_NONE; + else if (spa_streq(str, "RSA")) + impl->encryption = CRYPTO_RSA; + else { + pw_log_error( "can't handle encryption type %s", str); + goto error; + } + + if ((str = pw_properties_get(props, "raop.audio.codec")) == NULL) + str = "PCM"; + if (spa_streq(str, "PCM")) + impl->codec = CODEC_PCM; + else { + pw_log_error( "can't handle codec type %s", str); + goto error; + } + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME);