HARDT - The Ham Radio DSP Toolkit
hbufferedwriter.h
1 #ifndef __HBUFFEREDWRITER_H
2 #define __HBUFFEREDWRITER_H
3 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

#include "hwriter.h"
#include "hwriterconsumer.h"
10 
/** Default value of the 'reserved' constructor argument of HBufferedWriter (number of block slots) */
#define DEFAULT_BLOCKS_RESERVED 1000
12 
16 template <class T>
17 class HBufferedWriter : public HWriter<T>, public HWriterConsumer<T>
18 {
19  private:
20 
21  size_t _blocksize;
22  int _blocksReserved;
23  HWriter<T>* _writer;
24  std::vector<T*> _buffer;
25  int _blocks;
26 
27  bool _enabled;
28 
29  std::thread* _drain;
30  bool _isDraining;
31 
32  std::mutex _drainMutex;
33  std::condition_variable _drainLock;
34 
35  std::mutex _readWriteMutex;
36 
37  public:
38 
40  HBufferedWriter(HWriterConsumer<T>* consumer, size_t blocksize, int reserved = DEFAULT_BLOCKS_RESERVED, bool enabled = true):
41  _blocksize(blocksize),
42  _blocksReserved(reserved),
43  _blocks(reserved),
44  _isDraining(false),
45  _enabled(enabled)
46  {
47  Init();
48 
49  consumer->SetWriter(this->Writer());
50  }
51 
53  HBufferedWriter(HWriter<T>* writer, size_t blocksize, int reserved = DEFAULT_BLOCKS_RESERVED, bool enabled = true):
54  _blocksize(blocksize),
55  _blocksReserved(reserved),
56  _blocks(reserved),
57  _isDraining(false),
58  _enabled(enabled),
59  _writer(writer)
60  {
61  Init();
62  }
63 
66  {
67  StopDrain();
68 
69  HLog("Deleting remaining items from the buffer");
70  while( _buffer.size() > 0 ) {
71  delete _buffer.at(0);
72  _buffer.erase( _buffer.begin(), _buffer.begin() + 1 );
73  }
74  HLog("Reducing the size of the buffer");
75  _buffer.shrink_to_fit();
76  HLog("Cleanup completed, ready to delete this object");
77  }
78 
80  int Write(T* src, size_t blocksize);
81 
83  bool Start()
84  {
85  HLog("Start() called");
86  if( _isDraining ) {
87  throw new HWriterIOException("Drain thread is running. Writer must not be started again without being stopped first");
88  }
89 
90  HLog("Starting downstream writer");
91  bool started = _writer->Start();
92 
93  HLog("Starting drain thread");
94  StartDrain();
95 
96  HLog("Drain thread is running");
97  return started;
98  }
99 
101  bool Stop()
102  {
103  HLog("Stop() called");
104  StopDrain();
105 
106  HLog("Drain thread is stopped, stopping downstream writer");
107  return _writer->Stop();
108  }
109 
116  bool Command(HCommand* command)
117  {
118  return _writer->Command(command);
119  }
120 
122  void SetWriter(HWriter<T>* writer)
123  {
124  _writer = writer;
125  }
126 
128  void Flush()
129  {
130  while( _isDraining && _buffer.size() > 0 ) {
131  usleep(100);
132  }
133  }
134 
136  int Reserved()
137  {
138  return _blocks;
139  }
140 
143  int Used()
144  {
145  return _buffer.size();
146  }
147 
149  bool Enabled()
150  {
151  return _enabled;
152  }
153 
154  private:
155 
156  void Init() {
157  _buffer.reserve( _blocks * sizeof(T*) );
158  }
159 
160  void StartDrain() {
161 
162  // Start a new drain thread
163  HLog("Starting drain thread");
164  _isDraining = true;
165  _drain = new std::thread( [this]() {
166  while( _isDraining ) {
167  Drain();
168  }
169  HLog("Drain thread exiting");
170  } );
171  HLog("Drain thread is running");
172  }
173 
174  void StopDrain() {
175 
176  // Stop a running drain thread
177  if( _isDraining && _drain != NULL ) {
178 
179  HLog("Signal halt to drain thread");
180  _isDraining = false;
181 
182  HLog("Awake a waiting drain thread");
183  _drainLock.notify_one();
184 
185  HLog("Joining drain thread");
186  _drain->join();
187  _drain = NULL;
188  HLog("Drain thread joined");
189  }
190  }
191 
192  void Drain();
193 };
194 
195 #endif
HBufferedWriter::SetWriter
void SetWriter(HWriter< T > *writer)
Definition: hbufferedwriter.h:122
HBufferedWriter::Write
int Write(T *src, size_t blocksize)
Definition: hbufferedwriter.cpp:11
HBufferedWriter::Enabled
bool Enabled()
Definition: hbufferedwriter.h:149
HWriter
Definition: hwriter.h:10
HBufferedWriter::Reserved
int Reserved()
Definition: hbufferedwriter.h:136
HWriterConsumer::SetWriter
virtual void SetWriter(HWriter< T > *writer)=0
HBufferedWriter::Start
bool Start()
Definition: hbufferedwriter.h:83
HBufferedWriter::~HBufferedWriter
~HBufferedWriter()
Definition: hbufferedwriter.h:65
HBufferedWriter::Command
bool Command(HCommand *command)
Definition: hbufferedwriter.h:116
HWriter::Command
virtual bool Command(HCommand *command)=0
HWriter::Start
virtual bool Start()
Definition: hwriter.h:21
HBufferedWriter::Stop
bool Stop()
Definition: hbufferedwriter.h:101
HWriterIOException
Definition: hexceptions.h:109
HBufferedWriter::HBufferedWriter
HBufferedWriter(HWriter< T > *writer, size_t blocksize, int reserved=DEFAULT_BLOCKS_RESERVED, bool enabled=true)
Definition: hbufferedwriter.h:53
HBufferedWriter::Flush
void Flush()
Definition: hbufferedwriter.h:128
HCommand
Definition: hcommand.h:81
HWriter::Writer
HWriter< T > * Writer()
Definition: hwriter.h:33
HBufferedWriter::Used
int Used()
Definition: hbufferedwriter.h:143
HWriter::Stop
virtual bool Stop()
Definition: hwriter.h:27
HBufferedWriter::HBufferedWriter
HBufferedWriter(HWriterConsumer< T > *consumer, size_t blocksize, int reserved=DEFAULT_BLOCKS_RESERVED, bool enabled=true)
Definition: hbufferedwriter.h:40
HBufferedWriter
Definition: hbufferedwriter.h:17
HWriterConsumer
Definition: hwriterconsumer.h:8