aboutsummaryrefslogtreecommitdiff
path: root/stream.h
blob: b55c8ed2cbb72bd0d5f6332a48fbd536f60b17cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/*************************************************************
 *
 *    streaming external for PD
 *
 * File: stream.h
 *
 * Description: Declaration of the streamer class
 *
 * Author: Thomas Grill (t.grill@gmx.net)
 *
 *************************************************************/

#ifndef __STREAM_H
#define __STREAM_H

#include <pthread.h>
 
#include <assert.h>
#define ASSERT assert

#include <string>

#include <samplerate.h>

#include "fifo.h"
#include "socket.h"


/*! Class representing an abstract stream.
	\note Must be inherited for a special stream format
*/
class Stream
{
public:
	Stream();
	virtual ~Stream();

	//! Return true if in initializing state
	bool isInitializing() const { return state == ST_INIT || state == ST_RECONNECT; }
	//! Return true if socket is valid
	bool isOk() const { return file >= 0; }

	// Get/set size of encoder FIFO
	int getStreamBufSize() const { return encoded.Size(); }
	bool setStreamBufSize(int sz) { return encoded.Resize(sz,true); }

	// Get/set size of encoder chunk (amount of data written from the stream at once)
	int getStreamBufChunk() const { return encchunk; }
	void setStreamBufChunk(int sz) { if(encchunk > 0) encchunk = sz; }

	// Get/set buffer size ratio under which refilling is triggered (0...1)
	float getStreamBufThresh() const { return encthresh; }
	void setStreamBufThresh(float r) { if(encthresh > 0 && encthresh <= 1) encthresh = r; }

	//! Get ratio of filling (0...1)
	float getFilling() const { return (float)encoded.Have()/encoded.Size(); }

	//! name of stream
	virtual std::string getTag(const char *tag) const { return ""; }
	//! number of stream channels
	virtual int getChannels() const = 0;
	//! stream sample rate
	virtual float getSamplerate() const = 0;
	//! nominal stream bit rate
	virtual float getBitrate() const = 0;

	const std::string &getHostname() const { return hostname; }
	const std::string &getMountpoint() const { return mountpt; }
	int getPort() const { return port; }

	/*! Initialize stream */
	bool doInit(const char *url);

	/*! Disconnect from stream */
	bool doExit();

	//! Get a number of sample frames
	int doGet(int ch,float *const *buf,int frames,float sr);

	// Debug flag
	volatile bool debug;

protected:

	//! Reset encoder state (disconnect and clear FIFOs)
	virtual void Reset();
	//! Reset URL
	void ResetHost();

	//! Init decoder
	virtual bool WorkInit() = 0;
	//! Decode data to channel buffers
	virtual int DataRead(int frames) = 0;

	//! Read stream data to encoder FIFO
	int ReadChunk(int chunk,bool unlock);
	//! Read stream data to buffer
	int ReadChunk(char *buf,int chunk,bool unlock);

	//! Set hostname, mountpt, port
	bool SetURL(const char *url);

	std::string hostname,mountpt;
	int port;

	// --- FIFO for encoded stream data -----------------------

	int encchunk;		//! Size of data chunk to get from socket at once
	float encthresh;	//! Ratio of fifo filling to keep up to
	Fifo<char> encoded; //! Fifo for encoded stream data

	// --- low-level socket stuff ------------------------------

	//! stream socket
	volatile SOCKET file;

	//! Connect to stream
	static SOCKET Connect(const char *hostname,const char *mountpoint,int portno);
	//! Disonnect from stream
	static void Disconnect(SOCKET fd);
	//! Read data from stream
	static int Read(SOCKET fd,char *buf,int size,int timeout = 1000);

	// --- threading stuff -------------------------------------

	//! status type
	enum state_t {
		ST_IDLE,  // nothing to do
		ST_INIT,  // shall connect
		ST_PROCESS,  // do decoding
		ST_WAIT,  // wait a bit
		ST_RECONNECT,  // try to reconnect
		ST_FINISHED  // waiting for shutdown
	};

	volatile bool exit;  //! exit flag
	volatile state_t state;  //! decoder state 
	pthread_mutex_t mutex;  //! thread mutex
	pthread_cond_t cond;	//! thread conditional
	pthread_t thrid;  //! worker thread ID

	int waitgrain,waitreconnect;

	static void *thr_func(void *th);
	void Work();

	// --- channel buffers --------------------------------------

	int bufch;
	float **bufs;
	Fifo<float> *decoded;

	// --- SRC stuff --------------------------------------------

	int src_channels;
	double src_factor;
	SRC_STATE **src_state;
};

#endif // __STREAM_H