blob: 7743301553a027e77202a4b6e4300fb35bfeb4e5 [file] [log] [blame]
Joe Onorato1754d742016-11-21 17:51:35 -08001/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "incidentd"
18
19#include "FdBuffer.h"
20
21#include <cutils/log.h>
22#include <utils/SystemClock.h>
23
24#include <fcntl.h>
25#include <poll.h>
26#include <unistd.h>
Yi Jin0a3406f2017-06-22 19:23:11 -070027#include <wait.h>
Joe Onorato1754d742016-11-21 17:51:35 -080028
Yi Jin0a3406f2017-06-22 19:23:11 -070029const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB
Joe Onorato1754d742016-11-21 17:51:35 -080030const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max
31
Joe Onorato1754d742016-11-21 17:51:35 -080032FdBuffer::FdBuffer()
33 :mBuffers(),
34 mStartTime(-1),
35 mFinishTime(-1),
36 mCurrentWritten(-1),
37 mTimedOut(false),
38 mTruncated(false)
39{
40}
41
42FdBuffer::~FdBuffer()
43{
44 const int N = mBuffers.size();
45 for (int i=0; i<N; i++) {
46 uint8_t* buf = mBuffers[i];
47 free(buf);
48 }
49}
50
51status_t
52FdBuffer::read(int fd, int64_t timeout)
53{
54 struct pollfd pfds = {
55 .fd = fd,
56 .events = POLLIN
57 };
58 mStartTime = uptimeMillis();
59
60 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
61
62 uint8_t* buf = NULL;
63 while (true) {
64 if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
65 if (mBuffers.size() == MAX_BUFFER_COUNT) {
66 mTruncated = true;
67 break;
68 }
69 buf = (uint8_t*)malloc(BUFFER_SIZE);
70 if (buf == NULL) {
71 return NO_MEMORY;
72 }
73 mBuffers.push_back(buf);
74 mCurrentWritten = 0;
75 }
76
77 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
78 if (remainingTime <= 0) {
79 mTimedOut = true;
80 break;
81 }
82
83 int count = poll(&pfds, 1, remainingTime);
84 if (count == 0) {
85 mTimedOut = true;
86 break;
87 } else if (count < 0) {
88 return -errno;
89 } else {
90 if ((pfds.revents & POLLERR) != 0) {
91 return errno != 0 ? -errno : UNKNOWN_ERROR;
92 } else {
93 ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
94 if (amt < 0) {
95 if (errno == EAGAIN || errno == EWOULDBLOCK) {
96 continue;
97 } else {
98 return -errno;
99 }
100 } else if (amt == 0) {
101 break;
102 }
103 mCurrentWritten += amt;
104 }
105 }
106 }
107
108 mFinishTime = uptimeMillis();
109 return NO_ERROR;
110}
111
Yi Jin0a3406f2017-06-22 19:23:11 -0700112status_t
113FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs)
114{
115 struct pollfd pfds[] = {
116 { .fd = fd, .events = POLLIN },
117 { .fd = toFd, .events = POLLOUT },
118 { .fd = fromFd, .events = POLLIN },
119 };
120
121 mStartTime = uptimeMillis();
122
123 // mark all fds non blocking
124 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
125 fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
126 fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
127
128 // A circular buffer holds data read from fd and writes to parsing process
129 uint8_t cirBuf[BUFFER_SIZE];
130 size_t cirSize = 0;
131 int rpos = 0, wpos = 0;
132
133 // This is the buffer used to store processed data
134 uint8_t* buf = NULL;
135 while (true) {
136 if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
137 if (mBuffers.size() == MAX_BUFFER_COUNT) {
138 mTruncated = true;
139 break;
140 }
141 buf = (uint8_t*)malloc(BUFFER_SIZE);
142 if (buf == NULL) {
143 return NO_MEMORY;
144 }
145 mBuffers.push_back(buf);
146 mCurrentWritten = 0;
147 }
148
149 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
150 if (remainingTime <= 0) {
151 mTimedOut = true;
152 break;
153 }
154
155 // wait for any pfds to be ready to perform IO
156 int count = poll(pfds, 3, remainingTime);
157 if (count == 0) {
158 mTimedOut = true;
159 break;
160 } else if (count < 0) {
161 return -errno;
162 }
163
164 // make sure no errors occur on any fds
165 for (int i = 0; i < 3; ++i) {
166 if ((pfds[i].revents & POLLERR) != 0) {
167 return errno != 0 ? -errno : UNKNOWN_ERROR;
168 }
169 }
170
171 // read from fd
172 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
173 ssize_t amt;
174 if (rpos >= wpos) {
175 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
176 } else {
177 amt = :: read(fd, cirBuf + rpos, wpos - rpos);
178 }
179 if (amt < 0) {
180 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
181 return -errno;
182 } // otherwise just continue
183 } else if (amt == 0) { // reach EOF so don't have to poll pfds[0].
184 ::close(pfds[0].fd);
185 pfds[0].fd = -1;
186 } else {
187 rpos += amt;
188 cirSize += amt;
189 }
190 }
191
192 // write to parsing process
193 if (cirSize > 0 && pfds[1].fd != -1) {
194 ssize_t amt;
195 if (rpos > wpos) {
196 amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
197 } else {
198 amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
199 }
200 if (amt < 0) {
201 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
202 return -errno;
203 } // otherwise just continue
204 } else {
205 wpos += amt;
206 cirSize -= amt;
207 }
208 }
209
210 // if buffer is empty and fd is closed, close write fd.
211 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
212 ::close(pfds[1].fd);
213 pfds[1].fd = -1;
214 }
215
216 // circular buffer, reset rpos and wpos
217 if (rpos >= BUFFER_SIZE) {
218 rpos = 0;
219 }
220 if (wpos >= BUFFER_SIZE) {
221 wpos = 0;
222 }
223
224 // read from parsing process
225 ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
226 if (amt < 0) {
227 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
228 return -errno;
229 } // otherwise just continue
230 } else if (amt == 0) {
231 break;
232 } else {
233 mCurrentWritten += amt;
234 }
235 }
236
237 mFinishTime = uptimeMillis();
238 return NO_ERROR;
239}
240
Joe Onorato1754d742016-11-21 17:51:35 -0800241size_t
242FdBuffer::size()
243{
244 return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
245}
246
247status_t
248FdBuffer::write(ReportRequestSet* reporter)
249{
250 const int N = mBuffers.size() - 1;
251 for (int i=0; i<N; i++) {
252 reporter->write(mBuffers[i], BUFFER_SIZE);
253 }
254 reporter->write(mBuffers[N], mCurrentWritten);
255 return NO_ERROR;
256}
257
258