diff --git a/Qiniu-librtmp/Pod/Classes/pili-librtmp/PushModule.c b/Qiniu-librtmp/Pod/Classes/pili-librtmp/PushModule.c new file mode 100644 index 0000000..63c85c7 --- /dev/null +++ b/Qiniu-librtmp/Pod/Classes/pili-librtmp/PushModule.c @@ -0,0 +1,138 @@ +// +// PushModule.c +// rtmpdump_test +// +// Created by 李雪岩 on 2017/4/26. +// Copyright © 2017年 hongduoxing. All rights reserved. +// + +#include "PushModule.h" +#include "rtmp.h" +#include + +extern panda_push_module_t xypush_module; +extern panda_push_module_t rtmppush_module; + +/* 定义所有模块 */ +panda_push_module_t *global_modules[] = { + &rtmppush_module, + &xypush_module + /* 其他厂商的模块加在这里即可 */ +}; + + + +/*导入所有模块*/ +int +expore_all_module(char *negotiate) +{ + int end = sizeof(global_modules)/sizeof(global_modules[0]); + int i; + for(i = 0; i < end; i++) { + strcat(negotiate, global_modules[i]->module_name); + if(i < end-1) + strcat(negotiate, ","); + PILI_RTMP_Log(PILI_RTMP_LOGDEBUG, "export module name=[%s].\n",global_modules[i]->module_name); + } + PILI_RTMP_Log(PILI_RTMP_LOGDEBUG, "negotiate: %s\n",negotiate); + return 0; + +} + + +int +rtmp_packet_to_flv(PILI_RTMPPacket *packet, char *flv_tag, int tag_size) +{ + if(tag_size != (1+3+4+packet->m_nBodySize)) { + return -1; + } + memcpy(flv_tag, packet->m_packetType, sizeof(packet->m_packetType)); /*type*/ + memcpy(flv_tag, packet->m_nBodySize, 3); /*datalen*/ + memcpy(flv_tag, packet->m_nTimeStamp, 4); /*timestamp3 + extra1*/ + memcpy(flv_tag, 0, 3); /*stream id always 0*/ + memcpy(flv_tag, packet->m_body, packet->m_nBodySize); /*body*/ + + return 0; +} + +/* 根据服务器返回选择传输模块 */ +panda_push_module_t * +select_module(PILI_AVal *negotiate) +{ + int i; + + for (i = 0; i < sizeof(global_modules)/sizeof(global_modules[0]); ++i) { + + if(strncmp(global_modules[i]->module_name, negotiate->av_val, negotiate->av_len) == 0) + { + PILI_RTMP_Log(PILI_RTMP_LOGINFO, "Get module [%s].", global_modules[i]->module_name); + return global_modules[i]; + } + + } + return &rtmppush_module; +} + +/* 定义星域推流模块 */ +static int xypush_module_init(void *arg, void *err); +static int xypush_module_release(void *arg); +static int xypush_module_push(void*, void*, uint32_t, void*); + +//static struct XYPushSession *s = NULL; + +panda_push_module_t xypush_module = +{ + "XYPushModule", + xypush_module_init, + xypush_module_release, + xypush_module_push +}; + + +int xypush_module_init(void *arg, void *err) +{ + return TRUE; +} + +int xypush_module_release(void *arg) +{ + return 0; +} + +int xypush_module_push(void *rtmp, void *buf, uint32_t size, void *err) +{ + return TRUE; +} + + +/* 定义rtmp默认推流模块 */ +static int rtmp_module_init(void *arg, void *err); +static int rtmp_module_release(void *arg); +static int rtmp_module_push(void*, void*, uint32_t, void*); + +panda_push_module_t rtmppush_module = +{ + "RTMPPushModule", + rtmp_module_init, + rtmp_module_release, + rtmp_module_push +}; + + +//return TRUE for ok, FALSE or other for err; +int rtmp_module_init(void *arg, void *err) +{ + return PILI_RTMP_ConnectStream_Module(arg, err); +} +int rtmp_module_release(void *arg) +{ + return 0; +} + + +int rtmp_module_push(void* rtmp, void* buf, uint32_t size, void* err) +{ + return PILI_RTMP_Write_Module(rtmp, buf, size, err); +} + + diff --git a/Qiniu-librtmp/Pod/Classes/pili-librtmp/PushModule.h b/Qiniu-librtmp/Pod/Classes/pili-librtmp/PushModule.h new file mode 100644 index 0000000..892fd08 --- /dev/null +++ b/Qiniu-librtmp/Pod/Classes/pili-librtmp/PushModule.h @@ -0,0 +1,47 @@ +// +// PushModule.h +// rtmpdump_test +// +// Created by 李雪岩 on 2017/4/26. +// Copyright © 2017年 hongduoxing. All rights reserved. +// + +#ifndef PushModule_h +#define PushModule_h + +#include +#include "amf.h" +#include "log.h" +#include "rtmp.h" +#ifdef __cplusplus +extern "C" { +#endif + + + /* 模块定义 */ + typedef struct panda_push_module_s { + + const char *module_name; + /* (PILI_RTMP*, RTMP_Error*) */ + int (*init)(void*, void*); + + /* PILI_RTMP* */ + int (*release)(void*); + + /* PILI_RTMP*, const char*, int, RTMP_Error* */ + int (*push_message_push)(void *rtmp, void *buf, uint32_t size, void *err); + + }panda_push_module_t; + + int + rtmp_packet_to_flv(struct PILI_RTMPPacket *packet, char *flv_tag, int flv_tag_size); + + int expore_all_module(char *negotiate); + panda_push_module_t *select_module(PILI_AVal *negotiate); + +#ifdef __cplusplus +} +#endif + + +#endif /* PushModule_h */ diff --git a/Qiniu-librtmp/Pod/Classes/pili-librtmp/log.c b/Qiniu-librtmp/Pod/Classes/pili-librtmp/log.c index 8714f1d..b15950e 100644 --- a/Qiniu-librtmp/Pod/Classes/pili-librtmp/log.c +++ b/Qiniu-librtmp/Pod/Classes/pili-librtmp/log.c @@ -32,7 +32,7 @@ #define MAX_PRINT_LEN 2048 -PILI_RTMP_LogLevel PILI_RTMP_debuglevel = PILI_RTMP_LOGERROR; +PILI_RTMP_LogLevel PILI_RTMP_debuglevel = PILI_RTMP_LOGALL; static int neednl; diff --git a/Qiniu-librtmp/Pod/Classes/pili-librtmp/main.cpp b/Qiniu-librtmp/Pod/Classes/pili-librtmp/main.cpp new file mode 100644 index 0000000..79ffa47 --- /dev/null +++ b/Qiniu-librtmp/Pod/Classes/pili-librtmp/main.cpp @@ -0,0 +1,410 @@ +/** + * Simplest Librtmp Send FLV + * + * * This program can send local flv file to net server as a rtmp live stream. + */ + +#include +#include +#include +#include +#ifndef WIN32 +#include +#endif + + +#include "rtmp_sys.h" +#include "log.h" +//#include "PushModule.h" + +#define HTON16(x) ((x>>8&0xff)|(x<<8&0xff00)) +#define HTON24(x) ((x>>16&0xff)|(x<<16&0xff0000)|(x&0xff00)) +#define HTON32(x) ((x>>24&0xff)|(x>>8&0xff00)|\ +(x<<8&0xff0000)|(x<<24&0xff000000)) +#define HTONTIME(x) ((x>>16&0xff)|(x<<16&0xff0000)|(x&0xff00)|(x&0xff000000)) + +/*read 1 byte*/ +int ReadU8(uint32_t *u8,FILE*fp){ + if(fread(u8,1,1,fp)!=1) + return 0; + return 1; +} +/*read 2 byte*/ +int ReadU16(uint32_t *u16,FILE*fp){ + if(fread(u16,2,1,fp)!=1) + return 0; + *u16=HTON16(*u16); + return 1; +} +/*read 3 byte*/ +int ReadU24(uint32_t *u24,FILE*fp){ + if(fread(u24,3,1,fp)!=1) + return 0; + *u24=HTON24(*u24); + return 1; +} +/*read 4 byte*/ +int ReadU32(uint32_t *u32,FILE*fp){ + if(fread(u32,4,1,fp)!=1) + return 0; + *u32=HTON32(*u32); + return 1; +} +/*read 1 byte,and loopback 1 byte at once*/ +int PeekU8(uint32_t *u8,FILE*fp){ + if(fread(u8,1,1,fp)!=1) + return 0; + fseek(fp,-1,SEEK_CUR); + return 1; +} +/*read 4 byte and convert to time format*/ +int ReadTime(uint32_t *utime,FILE*fp){ + if(fread(utime,4,1,fp)!=1) + return 0; + *utime=HTONTIME(*utime); + return 1; +} + +int InitSockets() +{ +#ifdef WIN32 + WORD version; + WSADATA wsaData; + version=MAKEWORD(2,2); + return (WSAStartup(version, &wsaData) == 0); +#endif + return TRUE; +} + +void CleanupSockets() +{ +#ifdef WIN32 + WSACleanup(); +#endif +} + +//Publish using RTMP_SendPacket() +int publish_using_packet(){ + + PILI_RTMP *rtmp=NULL; + PILI_RTMPPacket *packet=NULL; + uint32_t start_time=0; + uint32_t now_time=0; + //the timestamp of the previous frame + long pre_frame_time=0; + long lasttime=0; + int bNextIsKey=1; + uint32_t preTagsize=0; + + //packet attributes + uint32_t type=0; + uint32_t datalength=0; + uint32_t timestamp=0; + uint32_t streamid=0; + + FILE*fp=NULL; + fp=fopen("/Users/leexueyan/片段.flv","rb"); + if (!fp){ + PILI_RTMP_LogPrintf("Open File Error.\n"); + CleanupSockets(); + return -1; + } + + /* set log level */ + PILI_RTMP_LogLevel loglvl = PILI_RTMP_LOGALL; + PILI_RTMP_LogSetLevel(loglvl); + + if (!InitSockets()){ + PILI_RTMP_LogPrintf("Init Socket Err\n"); + return -1; + } + + rtmp=PILI_RTMP_Alloc(); + PILI_RTMP_Init(rtmp); + //set connection timeout,default 30s + rtmp->Link.timeout=5; + RTMPError* err = NULL; + if(!PILI_RTMP_SetupURL(rtmp,"rtmp://live.test.com/live/stream1", err)) + { + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"SetupURL Err\n"); + PILI_RTMP_Free(rtmp); + CleanupSockets(); + return -1; + } + + //if unable,the AMF command would be 'play' instead of 'publish' + PILI_RTMP_EnableWrite(rtmp); + + if (!PILI_RTMP_Connect(rtmp,NULL, err)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"Connect Err\n"); + PILI_RTMP_Free(rtmp); + CleanupSockets(); + return -1; + } + + if (!PILI_RTMP_ConnectStream(rtmp,0,err)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"ConnectStream Err\n"); + PILI_RTMP_Close(rtmp,err); + PILI_RTMP_Free(rtmp); + CleanupSockets(); + return -1; + } + + packet=(PILI_RTMPPacket*)malloc(sizeof(PILI_RTMPPacket)); + PILI_RTMPPacket_Alloc(packet,1024*256); + PILI_RTMPPacket_Reset(packet); + + packet->m_hasAbsTimestamp = 0; + packet->m_nChannel = 0x04; + packet->m_nInfoField2 = rtmp->m_stream_id; + + PILI_RTMP_LogPrintf("Start to send data ...\n"); + + //jump over FLV Header + fseek(fp,9,SEEK_SET); + //jump over previousTagSizen + fseek(fp,4,SEEK_CUR); + start_time=PILI_RTMP_GetTime(); + while(1) + { + if((((now_time=PILI_RTMP_GetTime())-start_time) + <(pre_frame_time)) && bNextIsKey){ + //wait for 1 sec if the send process is too fast + //this mechanism is not very good,need some improvement + if(pre_frame_time>lasttime){ + PILI_RTMP_LogPrintf("TimeStamp:%8lu ms\n",pre_frame_time); + lasttime=pre_frame_time; + } + sleep(1000); + continue; + } + + //not quite the same as FLV spec + if(!ReadU8(&type,fp)) + break; + if(!ReadU24(&datalength,fp)) + break; + if(!ReadTime(×tamp,fp)) + break; + if(!ReadU24(&streamid,fp)) + break; + + if (type!=0x08&&type!=0x09){ + //jump over non_audio and non_video frame, + //jump over next previousTagSizen at the same time + fseek(fp,datalength+4,SEEK_CUR); + continue; + } + + if(fread(packet->m_body,1,datalength,fp)!=datalength) + break; + + packet->m_headerType = RTMP_PACKET_SIZE_LARGE; + packet->m_nTimeStamp = timestamp; + packet->m_packetType = type; + packet->m_nBodySize = datalength; + pre_frame_time=timestamp; + + if (!PILI_RTMP_IsConnected(rtmp)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"rtmp is not connect\n"); + break; + } + if (!PILI_RTMP_SendPacket(rtmp,packet,0,err)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"Send Error\n"); + break; + } + + if(!ReadU32(&preTagsize,fp)) + break; + + if(!PeekU8(&type,fp)) + break; + if(type==0x09){ + if(fseek(fp,11,SEEK_CUR)!=0) + break; + if(!PeekU8(&type,fp)){ + break; + } + if(type==0x17) + bNextIsKey=1; + else + bNextIsKey=0; + + fseek(fp,-11,SEEK_CUR); + } + } + + PILI_RTMP_LogPrintf("\nSend Data Over\n"); + + if(fp) + fclose(fp); + + if (rtmp!=NULL){ + PILI_RTMP_Close(rtmp,err); + PILI_RTMP_Free(rtmp); + rtmp=NULL; + } + if (packet!=NULL){ + PILI_RTMPPacket_Free(packet); + free(packet); + packet=NULL; + } + + CleanupSockets(); + return 0; +} + +//Publish using RTMP_Write() +int publish_using_write(){ + uint32_t start_time=0; + uint32_t now_time=0; + uint32_t pre_frame_time=0; + uint32_t lasttime=0; + int bNextIsKey=0; + char* pFileBuf=NULL; + + //read from tag header + uint32_t type=0; + uint32_t datalength=0; + uint32_t timestamp=0; + + PILI_RTMP *rtmp=NULL; + RTMPError *err; + FILE*fp=NULL; + fp=fopen("/Users/leexueyan/片段.flv","rb"); + if (!fp){ + PILI_RTMP_LogPrintf("Open File Error.\n"); + CleanupSockets(); + return -1; + } + + /* set log level */ + //PILI_RTMP_LogLevel loglvl=PILI_RTMP_LOGDEBUG; + //PILI_RTMP_LogSetLevel(loglvl); + + if (!InitSockets()){ + PILI_RTMP_LogPrintf("Init Socket Err\n"); + return -1; + } + + rtmp=PILI_RTMP_Alloc(); + PILI_RTMP_Init(rtmp); + //set connection timeout,default 30s + rtmp->Link.timeout=5; + if(!PILI_RTMP_SetupURL(rtmp,"rtmp://live.test.com:2004/live/stream1",err)) + { + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"SetupURL Err\n"); + PILI_RTMP_Free(rtmp); + CleanupSockets(); + return -1; + } + + PILI_RTMP_EnableWrite(rtmp); + //1hour + PILI_RTMP_SetBufferMS(rtmp, 3600*1000); + if (!PILI_RTMP_Connect(rtmp,NULL, err)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"Connect Err\n"); + PILI_RTMP_Free(rtmp); + CleanupSockets(); + return -1; + } + + if (!PILI_RTMP_ConnectStream(rtmp,0,err)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"ConnectStream Err\n"); + PILI_RTMP_Close(rtmp,err); + PILI_RTMP_Free(rtmp); + CleanupSockets(); + return -1; + } + + printf("Start to send data ...\n"); + + //jump over FLV Header + fseek(fp,9,SEEK_SET); + //jump over previousTagSizen + fseek(fp,4,SEEK_CUR); + start_time=PILI_RTMP_GetTime(); + while(1) + { + if((((now_time=PILI_RTMP_GetTime())-start_time) + <(pre_frame_time)) && bNextIsKey){ + //wait for 1 sec if the send process is too fast + //this mechanism is not very good,need some improvement + if(pre_frame_time>lasttime){ + PILI_RTMP_LogPrintf("TimeStamp:%8lu ms\n",pre_frame_time); + lasttime=pre_frame_time; + } + sleep(1000); + continue; + } + + //jump over type + fseek(fp,1,SEEK_CUR); + if(!ReadU24(&datalength,fp)) + break; + if(!ReadTime(×tamp,fp)) + break; + //jump back + fseek(fp,-8,SEEK_CUR); + + pFileBuf=(char*)malloc(11+datalength+4); + memset(pFileBuf,0,11+datalength+4); + if(fread(pFileBuf,1,11+datalength+4,fp)!=(11+datalength+4)) + break; + + pre_frame_time=timestamp; + + if (!PILI_RTMP_IsConnected(rtmp)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"rtmp is not connect\n"); + break; + } + if (!PILI_RTMP_Write(rtmp,pFileBuf,11+datalength+4, err)){ + PILI_RTMP_Log(PILI_RTMP_LOGERROR,"Rtmp Write Error\n"); + break; + } + + free(pFileBuf); + pFileBuf=NULL; + + if(!PeekU8(&type,fp)) + break; + if(type==0x09){ + if(fseek(fp,11,SEEK_CUR)!=0) + break; + if(!PeekU8(&type,fp)){ + break; + } + if(type==0x17) + bNextIsKey=1; + else + bNextIsKey=0; + fseek(fp,-11,SEEK_CUR); + } + } + + PILI_RTMP_LogPrintf("\nSend Data Over\n"); + + if(fp) + fclose(fp); + + if (rtmp!=NULL){ + PILI_RTMP_Close(rtmp, err); + PILI_RTMP_Free(rtmp); + rtmp=NULL; + } + + if(pFileBuf){ + free(pFileBuf); + pFileBuf=NULL; + } + + CleanupSockets(); + return 0; +} + +int main(int argc, char* argv[]){ + //2 Methods: + publish_using_packet(); + //publish_using_write(); + return 0; +} diff --git a/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.c b/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.c index 6aaab01..b3e13e2 100644 --- a/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.c +++ b/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.c @@ -33,6 +33,8 @@ #include "log.h" #include "rtmp_sys.h" #include "time.h" +#include "rtmp.h" + #ifdef CRYPTO #ifdef USE_POLARSSL @@ -145,6 +147,57 @@ uint32_t #endif } + + + +int AMF_Dump_name(PILI_AMFObject *obj, PILI_AVal *p_name) +{ + int n, ret; + for (n = 0; n < obj->o_num; ++n) + { + ret = AMFProp_Dump_name(&obj->o_props[n], p_name); + if(ret == 0) + { + return ret; + } + } + return -1; +} + + +int +AMFProp_Dump_name(PILI_AMFObjectProperty *prop, PILI_AVal *p_name) +{ + PILI_AVal name; + int ret; + if (prop->p_type != PILI_AMF_STRING && prop->p_type != PILI_AMF_OBJECT) + { + return -1; + } + if (prop->p_type == PILI_AMF_OBJECT) + { + ret = AMF_Dump_name(&prop->p_vu.p_object, p_name); + return ret; + } + + if (prop->p_name.av_len) + { + PILI_RTMP_Log(PILI_RTMP_LOGDEBUG, "prop_name:%s.", prop->p_name.av_val); + if(strncmp(prop->p_name.av_val, p_name->av_val, p_name->av_len) == 0) + { + snprintf(p_name->av_val, 255, "%.*s", prop->p_vu.p_aval.av_len, + prop->p_vu.p_aval.av_val); + p_name->av_len = prop->p_vu.p_aval.av_len; + return 0; + } + return -1; + } + return -1; +} + + + + void PILI_RTMP_UserInterrupt() { PILI_RTMP_ctrlC = TRUE; } @@ -1027,6 +1080,12 @@ int PILI_RTMP_Connect1(PILI_RTMP *r, PILI_RTMPPacket *cp, RTMPError *error) { int PILI_RTMP_Connect(PILI_RTMP *r, PILI_RTMPPacket *cp, RTMPError *error) { //获取hub char hub[5] = {0}; + char negotiate[4096] = {0}; + + expore_all_module(negotiate); + r->Link.negotiate.av_val = negotiate; + r->Link.negotiate.av_len = strlen(negotiate); + if (r->Link.app.av_len>4) { strncpy(hub, r->Link.app.av_val,4); }else if(r->Link.app.av_len>0){ @@ -1128,10 +1187,13 @@ static int int PILI_RTMP_ConnectStream(PILI_RTMP *r, int seekTime, RTMPError *error) { PILI_RTMPPacket packet = {0}; - + int ret; /* seekTime was already set by SetupStream / SetupURL. * This is only needed by ReconnectStream. */ + if(r->push_module != NULL) + goto push_module; + if (seekTime > 0) r->Link.seekTime = seekTime; @@ -1150,10 +1212,77 @@ int PILI_RTMP_ConnectStream(PILI_RTMP *r, int seekTime, RTMPError *error) { } PILI_RTMP_ClientPacket(r, &packet); + if(r->push_module) { + ret = r->push_module->init(r, error); + return ret; + } PILI_RTMPPacket_Free(&packet); } } +push_module: + + if(r->push_module) { + + r->push_module->release(r); + } + ret = r->push_module->init(r, error); + if(ret == 0) { + return TRUE; + } + r->push_module->release(r); + error->code = PILI_RTMPErrorRTMPConnectStreamFailed; + return ret; +} + + +static int SendReleaseStream(PILI_RTMP *r, RTMPError *error); +static int SendFCPublish(PILI_RTMP *r, RTMPError *error); + +int PILI_RTMP_ConnectStream_Module(PILI_RTMP *r, RTMPError *error) { + PILI_RTMPPacket packet = {0}; + + if (r->Link.protocol & RTMP_FEATURE_WRITE) + { + SendReleaseStream(r, error); + SendFCPublish(r, error); + } + else + { + PILI_RTMP_SendServerBW(r, error); + PILI_RTMP_SendCtrl(r, 3, 0, 300, error); + } + PILI_RTMP_SendCreateStream(r, error); + + if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) + { + /* Send the FCSubscribe if live stream or if subscribepath is set */ + if (r->Link.subscribepath.av_len) + PILI_SendFCSubscribe(r, &r->Link.subscribepath, error); + else if (r->Link.lFlags & RTMP_LF_LIVE) + PILI_SendFCSubscribe(r, &r->Link.playpath,error); + } + + while (!r->m_bPlaying && PILI_RTMP_IsConnected(r) && PILI_RTMP_ReadPacket(r, &packet)) + { + if (RTMPPacket_IsReady(&packet)) + { + if (!packet.m_nBodySize) + continue; + if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) || + (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) || + (packet.m_packetType == RTMP_PACKET_TYPE_INFO)) + { + PILI_RTMP_Log(PILI_RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring."); + PILI_RTMPPacket_Free(&packet); + continue; + } + + PILI_RTMP_ClientPacket(r, &packet); + PILI_RTMPPacket_Free(&packet); + } + } + if (!r->m_bPlaying && error) { char *msg = "PILI_RTMP connect stream failed."; PILI_RTMPError_Alloc(error, strlen(msg)); @@ -1164,7 +1293,9 @@ int PILI_RTMP_ConnectStream(PILI_RTMP *r, int seekTime, RTMPError *error) { return r->m_bPlaying; } -int PILI_RTMP_ReconnectStream(PILI_RTMP *r, int seekTime, RTMPError *error) { + + +int PILI_RTMP_ReconnecConn(PILI_RTMP *r, int seekTime, RTMPError *error) { PILI_RTMP_DeleteStream(r, error); PILI_RTMP_SendCreateStream(r, error); @@ -1653,6 +1784,7 @@ SAVC(secureTokenResponse); SAVC(type); SAVC(nonprivate); SAVC(xreqid); +SAVC(negotiate); static int PILI_SendConnectPacket(PILI_RTMP *r, PILI_RTMPPacket *cp, RTMPError *error) { @@ -1709,6 +1841,11 @@ static int if (!enc) return FALSE; } + if (r->Link.negotiate.av_len) { + enc = PILI_AMF_EncodeNamedString(enc, pend, &av_negotiate, &r->Link.negotiate); + if (!enc) + return FALSE; + } if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) { enc = PILI_AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE); if (!enc) @@ -2477,6 +2614,11 @@ static int PILI_HandleInvoke(PILI_RTMP *r, const char *body, unsigned int nBodySize) { PILI_AMFObject obj; PILI_AVal method; + PILI_AVal negotiate; + char p_name[256] = "negotiate"; + negotiate.av_val = p_name; + negotiate.av_len = strlen(p_name); + int txn; int ret = 0, nRes; if (body[0] != 0x02) /* make sure it is a string method name we start with */ @@ -2527,6 +2669,19 @@ static int SendSecureTokenResponse(r, &p.p_vu.p_aval, &error); } } + if (r->Link.negotiate.av_len) { + AMF_Dump_name(&obj, &negotiate); + if(negotiate.av_len != 0) + { + + r->push_module = select_module(&negotiate); + + free(methodInvoked.av_val); + PILI_AMF_Reset(&obj); + return 0; + } + + } if (r->Link.protocol & RTMP_FEATURE_WRITE) { SendReleaseStream(r, &error); SendFCPublish(r, &error); @@ -3293,6 +3448,33 @@ int PILI_RTMP_SendChunk(PILI_RTMP *r, PILI_RTMPChunk *chunk, RTMPError *error) { } int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPError *error) { + static i = 0; + int ret; + int tag_size = 1 + 3 + 3 + 1 + 3 + packet->m_nBodySize; + + if(r->push_module == NULL || strncmp(r->push_module->module_name, "RTMPPushModule", strlen("RTMPPushModule")) == 0) { + ret = PILI_RTMP_SendPacket_Module(r, packet, queue, error); + } else { + char *flv_tag = malloc(tag_size); + if(flv_tag == NULL) { + PILI_RTMP_Log(PILI_RTMP_LOGERROR, "malloc error"); + return FALSE; + } + memset(flv_tag, 0, tag_size); + + PILI_RTMP_Log(PILI_RTMP_LOGERROR, "malloc1 %p, tag_size: %d", flv_tag, tag_size); + ret = rtmp_packet_to_flv(packet, flv_tag, tag_size); + r->push_module->push_message_push(r, flv_tag, tag_size, error); + free(flv_tag); + flv_tag = NULL; + + } + + return ret; + +} + +int PILI_RTMP_SendPacket_Module(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPError *error) { const PILI_RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel]; uint32_t last = 0; int nSize; @@ -3302,17 +3484,17 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE char *buffer, *tbuf = NULL, *toff = NULL; int nChunkSize; int tlen; - + if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE) { /* compress a bit by using the prev packet's attributes */ if (prevPacket->m_nBodySize == packet->m_nBodySize && prevPacket->m_packetType == packet->m_packetType && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM) packet->m_headerType = RTMP_PACKET_SIZE_SMALL; - + if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp && packet->m_headerType == RTMP_PACKET_SIZE_SMALL) packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM; last = prevPacket->m_nTimeStamp; } - + if (packet->m_headerType > 3) /* sanity */ { if (error) { @@ -3321,18 +3503,18 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE error->code = PILI_RTMPErrorSanityFailed; strcpy(error->message, msg); } - + PILI_RTMP_Log(PILI_RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.", - (unsigned char)packet->m_headerType); - + (unsigned char)packet->m_headerType); + return FALSE; } - + nSize = packetSize[packet->m_headerType]; hSize = nSize; cSize = 0; t = packet->m_nTimeStamp - last; - + if (packet->m_body) { header = packet->m_body - nSize; hend = packet->m_body; @@ -3340,7 +3522,7 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE header = hbuf + 6; hend = hbuf + sizeof(hbuf); } - + if (packet->m_nChannel > 319) cSize = 2; else if (packet->m_nChannel > 63) @@ -3349,12 +3531,12 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE header -= cSize; hSize += cSize; } - + if (nSize > 1 && t >= 0xffffff) { header -= 4; hSize += 4; } - + hptr = header; c = packet->m_headerType << 6; switch (cSize) { @@ -3374,28 +3556,28 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE if (cSize == 2) *hptr++ = tmp >> 8; } - + if (nSize > 1) { hptr = PILI_AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t); } - + if (nSize > 4) { hptr = PILI_AMF_EncodeInt24(hptr, hend, packet->m_nBodySize); *hptr++ = packet->m_packetType; } - + if (nSize > 8) hptr += EncodeInt32LE(hptr, packet->m_nInfoField2); - + if (nSize > 1 && t >= 0xffffff) hptr = PILI_AMF_EncodeInt32(hptr, hend, t); - + nSize = packet->m_nBodySize; buffer = packet->m_body; nChunkSize = r->m_outChunkSize; - + PILI_RTMP_Log(PILI_RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket, - nSize); + nSize); /* send all chunks in one HTTP request */ if (r->Link.protocol & RTMP_FEATURE_HTTP) { int chunks = (nSize + nChunkSize - 1) / nChunkSize; @@ -3409,10 +3591,10 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE } while (nSize + hSize) { int wrote; - + if (nSize < nChunkSize) nChunkSize = nSize; - + PILI_RTMP_LogHexString(PILI_RTMP_LOGDEBUG2, (uint8_t *)header, hSize); PILI_RTMP_LogHexString(PILI_RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize); if (tbuf) { @@ -3426,7 +3608,7 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE nSize -= nChunkSize; buffer += nChunkSize; hSize = 0; - + if (nSize > 0) { header = buffer - 1; hSize = 1; @@ -3450,7 +3632,7 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE if (!wrote) return FALSE; } - + /* we invoked a remote method */ if (packet->m_packetType == 0x14) { PILI_AVal method; @@ -3466,13 +3648,14 @@ int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPE AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn); } } - + if (!r->m_vecChannelsOut[packet->m_nChannel]) r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(PILI_RTMPPacket)); memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(PILI_RTMPPacket)); return TRUE; } + int PILI_RTMP_Serve(PILI_RTMP *r, RTMPError *error) { return SHandShake(r, error); } @@ -4281,6 +4464,14 @@ int PILI_RTMP_Read(PILI_RTMP *r, char *buf, int size) { static const PILI_AVal av_setDataFrame = AVC("@setDataFrame"); int PILI_RTMP_Write(PILI_RTMP *r, const char *buf, int size, RTMPError *error) { + int ret = 0; + if(r->push_module->push_message_push != NULL) { + ret = r->push_module->push_message_push(r, (void*)buf, size, error); + } + return ret; +} + +int PILI_RTMP_Write_Module(PILI_RTMP *r, const char *buf, int size, RTMPError *error) { PILI_RTMPPacket *pkt = &r->m_write; char *pend, *enc; int s2 = size, ret, num; diff --git a/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.h b/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.h index 719d82a..033dc88 100644 --- a/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.h +++ b/Qiniu-librtmp/Pod/Classes/pili-librtmp/rtmp.h @@ -36,6 +36,7 @@ #include #include "amf.h" +#include "PushModule.h" #include "error.h" #ifdef __cplusplus @@ -130,6 +131,7 @@ typedef struct PILI_RTMP_LNK { PILI_AVal playpath0; /* parsed from URL */ PILI_AVal playpath; /* passed in explicitly */ PILI_AVal tcUrl; + PILI_AVal negotiate; PILI_AVal swfUrl; PILI_AVal pageUrl; PILI_AVal app; @@ -217,7 +219,7 @@ typedef struct PILI_CONNECTION_TIME { typedef void (*PILI_RTMP_ConnectionTimeCallback)( PILI_CONNECTION_TIME *conn_time, void *userData); - +struct panda_push_module_s; typedef struct PILI_RTMP { int m_inChunkSize; int m_outChunkSize; @@ -255,6 +257,7 @@ typedef struct PILI_RTMP { int m_polling; int m_resplen; int m_unackd; + struct panda_push_module_s *push_module; PILI_AVal m_clientID; PILI_RTMP_READ m_read; @@ -301,6 +304,10 @@ int PILI_RTMP_Serve(PILI_RTMP *r, RTMPError *error); int PILI_RTMP_ReadPacket(PILI_RTMP *r, PILI_RTMPPacket *packet); int PILI_RTMP_SendPacket(PILI_RTMP *r, PILI_RTMPPacket *packet, int queue, RTMPError *error); +int PILI_RTMP_SendPacket_Module(PILI_RTMP *r, PILI_RTMPPacket *packet, + int queue, + RTMPError *error); + int PILI_RTMP_SendChunk(PILI_RTMP *r, PILI_RTMPChunk *chunk, RTMPError *error); int PILI_RTMP_IsConnected(PILI_RTMP *r); int PILI_RTMP_Socket(PILI_RTMP *r); @@ -309,6 +316,8 @@ double PILI_RTMP_GetDuration(PILI_RTMP *r); int PILI_RTMP_ToggleStream(PILI_RTMP *r, RTMPError *error); int PILI_RTMP_ConnectStream(PILI_RTMP *r, int seekTime, RTMPError *error); +int PILI_RTMP_ConnectStream_Module(PILI_RTMP *r, RTMPError *error); + int PILI_RTMP_ReconnectStream(PILI_RTMP *r, int seekTime, RTMPError *error); void PILI_RTMP_DeleteStream(PILI_RTMP *r, RTMPError *error); int PILI_RTMP_GetNextMediaPacket(PILI_RTMP *r, PILI_RTMPPacket *packet); @@ -346,6 +355,9 @@ int PILI_RTMP_SendClientBW(PILI_RTMP *r, RTMPError *error); void PILI_RTMP_DropRequest(PILI_RTMP *r, int i, int freeit); int PILI_RTMP_Read(PILI_RTMP *r, char *buf, int size); int PILI_RTMP_Write(PILI_RTMP *r, const char *buf, int size, RTMPError *error); +int PILI_RTMP_Write_Module(PILI_RTMP *r, const char *buf, int size, + RTMPError *error); + #define MAJOR 1 #define MINOR 0