00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _PASSENGER_LOGGING_H_
00026 #define _PASSENGER_LOGGING_H_
00027
00028 #include <boost/shared_ptr.hpp>
00029 #include <oxt/system_calls.hpp>
00030 #include <oxt/backtrace.hpp>
00031
00032 #include <sys/types.h>
00033 #include <sys/time.h>
00034 #include <sys/file.h>
00035 #include <sys/resource.h>
00036 #include <unistd.h>
00037 #include <fcntl.h>
00038 #include <pthread.h>
00039 #include <string>
00040 #include <map>
00041 #include <ostream>
00042 #include <sstream>
00043 #include <cstdio>
00044 #include <ctime>
00045
00046 #include "RandomGenerator.h"
00047 #include "FileDescriptor.h"
00048 #include "MessageClient.h"
00049 #include "StaticString.h"
00050 #include "Exceptions.h"
00051 #include "Utils.h"
00052 #include "Utils/StrIntUtils.h"
00053 #include "Utils/MD5.h"
00054 #include "Utils/SystemTime.h"
00055
00056
00057 namespace Passenger {
00058
00059 using namespace std;
00060 using namespace boost;
00061 using namespace oxt;
00062
00063
00064
00065
00066 extern int _logLevel;
00067 extern ostream *_logStream;
00068
00069 int getLogLevel();
00070 void setLogLevel(int value);
00071 void setDebugFile(const char *logFile = NULL);
00072
00073
00074
00075
00076
00077
00078
/*
 * Writes one log line to `stream` when Passenger::_logLevel >= `level`.
 *
 * The line has the form
 *   [ pid=<pid> thr=<thread> file=<file>:<line> time=<YYYY-MM-DD HH:MM:SS.mmm> ]: <expr>
 * where `expr` is anything that can be chained onto an ostream with `<<`
 * (for example: "foo=" << foo). The whole line is formatted into a local
 * stringstream first, written to `stream` with a single `<<`, and then
 * `stream` is flushed.
 *
 * `stream` is a pointer to an ostream; when it is NULL nothing is logged.
 * Note that `stream` may be evaluated more than once.
 */
#define P_LOG_TO(level, expr, stream) \
	do { \
		if ((stream) != 0 && Passenger::_logLevel >= (level)) { \
			struct timeval tv; \
			time_t the_time; \
			struct tm the_tm; \
			char datetime_buf[60]; \
			size_t datetime_len; \
			std::stringstream sstream; \
			\
			/* Take a single clock reading so that the seconds and the */ \
			/* milliseconds printed below always belong to the same instant. */ \
			gettimeofday(&tv, NULL); \
			the_time = (time_t) tv.tv_sec; \
			localtime_r(&the_time, &the_tm); \
			datetime_len = strftime(datetime_buf, sizeof(datetime_buf), \
				"%F %H:%M:%S", &the_tm); \
			/* Zero-pad the milliseconds: 5 ms must print as ".005", not ".5". */ \
			snprintf(datetime_buf + datetime_len, sizeof(datetime_buf) - datetime_len, \
				".%03lu", (unsigned long) (tv.tv_usec / 1000)); \
			sstream << \
				"[ pid=" << ((unsigned long) getpid()) << \
				" thr=" << pthread_self() << \
				" file=" << __FILE__ << ":" << (unsigned long) __LINE__ << \
				" time=" << datetime_buf << \
				" ]: " << \
				expr << std::endl; \
			*(stream) << sstream.str(); \
			(stream)->flush(); \
		} \
	} while (false)
00103
00104
00105
00106
// Logs `expr` to the global stream (Passenger::_logStream) if the global log
// level is at least `level`. See P_LOG_TO for the line format.
#define P_LOG(level, expr) P_LOG_TO(level, expr, Passenger::_logStream)

// Errors are emitted whenever _logLevel >= -1, warnings whenever _logLevel >= 0.
#define P_ERROR(expr) P_LOG_TO(-1, expr, Passenger::_logStream)
#define P_WARN(expr) P_LOG_TO(0, expr, Passenger::_logStream)

// Debug messages are traced at level 1. P_TRACE only does anything when
// PASSENGER_DEBUG is defined; otherwise this expands to an empty statement.
#define P_DEBUG(expr) P_TRACE(1, expr)
00126
#ifndef PASSENGER_DEBUG
	// Release build: tracing and assertions expand to empty statements.
	// None of their arguments are evaluated.
	#define P_TRACE(level, expr) do { } while (false)
	#define P_ASSERT(expr, result_if_failed, message) do { } while (false)
	#define P_ASSERT_WITH_VOID_RETURN(expr, message) do { } while (false)
#else
	// Debug build: P_TRACE logs to the global stream exactly like P_LOG.
	#define P_TRACE(level, expr) P_LOG(level, expr)

	// If `expr` is false, logs "Assertion failed: <message>" through P_ERROR
	// and returns `result_if_failed` from the enclosing function.
	#define P_ASSERT(expr, result_if_failed, message) \
		do { \
			if (!(expr)) { \
				P_ERROR("Assertion failed: " << message); \
				return result_if_failed; \
			} \
		} while (false)

	// Same as P_ASSERT, for functions returning void.
	#define P_ASSERT_WITH_VOID_RETURN(expr, message) \
		do { \
			if (!(expr)) { \
				P_ERROR("Assertion failed: " << message); \
				return; \
			} \
		} while (false)
#endif
00150
00151
00152
00153
00154 struct AnalyticsLoggerSharedData {
00155 boost::mutex lock;
00156 MessageClient client;
00157
00158 void disconnect(bool checkErrorResponse = false) {
00159 if (checkErrorResponse && client.connected()) {
00160
00161
00162 TRACE_POINT();
00163 vector<string> args;
00164 bool hasData = true;
00165
00166 try {
00167 hasData = client.read(args);
00168 } catch (const SystemException &e) {
00169 if (e.code() != ECONNRESET) {
00170 throw;
00171 }
00172 }
00173
00174 UPDATE_TRACE_POINT();
00175 client.disconnect();
00176 if (hasData) {
00177 if (args[0] == "error") {
00178 throw IOException("The logging server responded with an error: " + args[1]);
00179 } else {
00180 throw IOException("The logging server sent an unexpected reply.");
00181 }
00182 }
00183 } else {
00184 client.disconnect();
00185 }
00186 }
00187 };
00188 typedef shared_ptr<AnalyticsLoggerSharedData> AnalyticsLoggerSharedDataPtr;
00189
00190 class AnalyticsLog {
00191 private:
00192 static const int INT64_STR_BUFSIZE = 22;
00193
00194 AnalyticsLoggerSharedDataPtr sharedData;
00195 string txnId;
00196 string groupName;
00197 string category;
00198 string unionStationKey;
00199 bool shouldFlushToDiskAfterClose;
00200
00201
00202
00203
00204 char *insertTxnIdAndTimestamp(char *buffer) {
00205 int size;
00206
00207
00208 memcpy(buffer, txnId.c_str(), txnId.size());
00209 buffer += txnId.size();
00210
00211
00212 *buffer = ' ';
00213 buffer++;
00214
00215
00216 size = snprintf(buffer, INT64_STR_BUFSIZE, "%llu", SystemTime::getUsec());
00217 if (size >= INT64_STR_BUFSIZE) {
00218
00219 throw IOException("Cannot format a new transaction log message timestamp.");
00220 }
00221 buffer += size;
00222
00223
00224 *buffer = ' ';
00225
00226 return buffer + 1;
00227 }
00228
00229 public:
00230 AnalyticsLog() { }
00231
00232 AnalyticsLog(const AnalyticsLoggerSharedDataPtr &sharedData, const string &txnId,
00233 const string &groupName, const string &category, const string &unionStationKey)
00234 {
00235 this->sharedData = sharedData;
00236 this->txnId = txnId;
00237 this->groupName = groupName;
00238 this->category = category;
00239 this->unionStationKey = unionStationKey;
00240 shouldFlushToDiskAfterClose = false;
00241 }
00242
00243 ~AnalyticsLog() {
00244 if (sharedData != NULL) {
00245 lock_guard<boost::mutex> l(sharedData->lock);
00246 if (sharedData->client.connected()) {
00247 try {
00248 char timestamp[2 * sizeof(unsigned long long) + 1];
00249 integerToHexatri<unsigned long long>(SystemTime::getUsec(),
00250 timestamp);
00251 sharedData->client.write("closeTransaction",
00252 txnId.c_str(), timestamp, NULL);
00253 } catch (const SystemException &e) {
00254 if (e.code() == EPIPE || e.code() == ECONNRESET) {
00255 TRACE_POINT();
00256 sharedData->disconnect(true);
00257 } else {
00258 throw;
00259 }
00260 }
00261
00262 if (shouldFlushToDiskAfterClose) {
00263 vector<string> args;
00264 sharedData->client.write("flush", NULL);
00265 sharedData->client.read(args);
00266 }
00267 }
00268 }
00269 }
00270
00271 void message(const StaticString &text) {
00272 if (sharedData != NULL) {
00273 lock_guard<boost::mutex> l(sharedData->lock);
00274 if (sharedData->client.connected()) {
00275 char timestamp[2 * sizeof(unsigned long long) + 1];
00276 integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestamp);
00277 sharedData->client.write("log", txnId.c_str(),
00278 timestamp, NULL);
00279 sharedData->client.writeScalar(text);
00280 }
00281 }
00282 }
00283
00284 void abort(const StaticString &text) {
00285 if (sharedData != NULL) {
00286 lock_guard<boost::mutex> l(sharedData->lock);
00287 if (sharedData->client.connected()) {
00288 message("ABORT");
00289 }
00290 }
00291 }
00292
00293 void flushToDiskAfterClose(bool value) {
00294 shouldFlushToDiskAfterClose = value;
00295 }
00296
00297 bool isNull() const {
00298 return sharedData == NULL;
00299 }
00300
00301 string getTxnId() const {
00302 return txnId;
00303 }
00304
00305 string getGroupName() const {
00306 return groupName;
00307 }
00308
00309 string getCategory() const {
00310 return category;
00311 }
00312
00313 string getUnionStationKey() const {
00314 return unionStationKey;
00315 }
00316 };
00317
00318 typedef shared_ptr<AnalyticsLog> AnalyticsLogPtr;
00319
00320 class AnalyticsScopeLog {
00321 private:
00322 AnalyticsLog *log;
00323 enum {
00324 NAME,
00325 GRANULAR
00326 } type;
00327 union {
00328 const char *name;
00329 struct {
00330 const char *endMessage;
00331 const char *abortMessage;
00332 } granular;
00333 } data;
00334 bool ok;
00335
00336 static string timevalToString(struct timeval &tv) {
00337 unsigned long long i = (unsigned long long) tv.tv_sec * 1000000 + tv.tv_usec;
00338 return usecToString(i);
00339 }
00340
00341 static string usecToString(unsigned long long usec) {
00342 char timestamp[2 * sizeof(unsigned long long) + 1];
00343 integerToHexatri<unsigned long long>(usec, timestamp);
00344 return timestamp;
00345 }
00346
00347 public:
00348 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *name) {
00349 this->log = log.get();
00350 type = NAME;
00351 data.name = name;
00352 ok = false;
00353 if (log != NULL && !log->isNull()) {
00354 string message;
00355 struct rusage usage;
00356
00357 message.reserve(150);
00358 message.append("BEGIN: ");
00359 message.append(name);
00360 message.append(" (");
00361 message.append(usecToString(SystemTime::getUsec()));
00362 message.append(",");
00363 if (getrusage(RUSAGE_SELF, &usage) == -1) {
00364 int e = errno;
00365 throw SystemException("getrusage() failed", e);
00366 }
00367 message.append(timevalToString(usage.ru_utime));
00368 message.append(",");
00369 message.append(timevalToString(usage.ru_stime));
00370 message.append(") ");
00371 log->message(message);
00372 }
00373 }
00374
00375 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *beginMessage,
00376 const char *endMessage, const char *abortMessage = NULL
00377 ) {
00378 this->log = log.get();
00379 if (log != NULL) {
00380 type = GRANULAR;
00381 data.granular.endMessage = endMessage;
00382 data.granular.abortMessage = abortMessage;
00383 ok = abortMessage == NULL;
00384 log->message(beginMessage);
00385 }
00386 }
00387
00388 ~AnalyticsScopeLog() {
00389 if (log == NULL) {
00390 return;
00391 }
00392 if (type == NAME) {
00393 if (!log->isNull()) {
00394 string message;
00395 struct rusage usage;
00396
00397 message.reserve(150);
00398 if (ok) {
00399 message.append("END: ");
00400 } else {
00401 message.append("FAIL: ");
00402 }
00403 message.append(data.name);
00404 message.append(" (");
00405 message.append(usecToString(SystemTime::getUsec()));
00406 message.append(",");
00407 if (getrusage(RUSAGE_SELF, &usage) == -1) {
00408 int e = errno;
00409 throw SystemException("getrusage() failed", e);
00410 }
00411 message.append(timevalToString(usage.ru_utime));
00412 message.append(",");
00413 message.append(timevalToString(usage.ru_stime));
00414 message.append(")");
00415 log->message(message);
00416 }
00417 } else {
00418 if (ok) {
00419 log->message(data.granular.endMessage);
00420 } else {
00421 log->message(data.granular.abortMessage);
00422 }
00423 }
00424 }
00425
00426 void success() {
00427 ok = true;
00428 }
00429 };
00430
/**
 * Creates AnalyticsLog objects backed by a connection to a logging server.
 *
 * An AnalyticsLogger with an empty server address is a null logger: it only
 * hands out null AnalyticsLog objects. Otherwise connection attempts are retried
 * up to `maxConnectTries` times. After a batch of failed attempts, connecting is
 * suppressed for `reconnectTimeout` microseconds.
 */
class AnalyticsLogger {
private:
	static const int RETRY_SLEEP = 200000;

	string serverAddress;
	string username;
	string password;
	string nodeName;
	RandomGenerator randomGenerator;
	unsigned int maxConnectTries;
	unsigned long long reconnectTimeout;
	unsigned long long nextReconnectTime;

	// Replaced by a fresh object on every disconnect. AnalyticsLog objects that
	// still hold the old one keep their own (disconnected) client.
	AnalyticsLoggerSharedDataPtr sharedData;

	bool connected() const {
		return sharedData->client.connected();
	}

	/**
	 * Connects `sharedData->client` and performs the "init" handshake.
	 * @throws SystemException(ECONNREFUSED) if the server closes the connection
	 *         or reports that it is shutting down.
	 * @throws IOException on any other unexpected reply.
	 */
	void connect() {
		TRACE_POINT();
		vector<string> args;

		sharedData->client.connect(serverAddress, username, password);
		sharedData->client.write("init", nodeName.c_str(), NULL);
		if (!sharedData->client.read(args)) {
			throw SystemException("Cannot connect to logging server", ECONNREFUSED);
		} else if (args.size() != 1) {
			throw IOException("Logging server returned an invalid reply for the 'init' command");
		} else if (args[0] == "server shutting down") {
			throw SystemException("Cannot connect to server", ECONNREFUSED);
		} else if (args[0] != "ok") {
			throw IOException("Logging server returned an invalid reply for the 'init' command");
		}

		sharedData->client.setAutoDisconnect(false);
	}

	/**
	 * Disconnects the current client and installs fresh shared data for future
	 * connections. The replacement is installed before the old connection is torn
	 * down, so it still happens when the error check in
	 * AnalyticsLoggerSharedData::disconnect() throws.
	 */
	void disconnect(bool checkErrorResponse = false) {
		AnalyticsLoggerSharedDataPtr oldData = sharedData;
		sharedData.reset(new AnalyticsLoggerSharedData());
		oldData->disconnect(checkErrorResponse);
	}

	bool isNetworkError(int code) const {
		return code == EPIPE || code == ECONNREFUSED || code == ECONNRESET
			|| code == EHOSTUNREACH || code == ENETDOWN || code == ENETUNREACH
			|| code == ETIMEDOUT;
	}

	/**
	 * Sends "openTransaction" for `txnId`, connecting first if needed, and returns
	 * a log object for that transaction.
	 *
	 * Failures with ENOENT (e.g. a socket file that does not exist yet) or a
	 * network error are retried up to `maxConnectTries` times, sleeping
	 * RETRY_SLEEP microseconds between attempts. If all attempts fail, it logs a
	 * warning, suppresses attempts for `reconnectTimeout` microseconds, and
	 * returns a null AnalyticsLog. Other SystemExceptions are rethrown after
	 * disconnecting.
	 */
	AnalyticsLogPtr openTransaction(const string &txnId, const string &groupName,
		const string &category, const string &unionStationKey,
		const char *timestampStr)
	{
		// disconnect() replaces `sharedData`. Keep our own reference to the object
		// whose mutex we lock, so that mutex stays alive until lock_guard releases
		// it. Without this, the mutex could be destroyed while still locked.
		// NOTE(review): retries after a disconnect operate on the new shared data
		// without holding its lock. Replacing the `sharedData` member is also not
		// synchronized across threads.
		AnalyticsLoggerSharedDataPtr lockedData = sharedData;
		lock_guard<boost::mutex> l(lockedData->lock);

		if (SystemTime::getUsec() < nextReconnectTime) {
			// A recent batch of attempts failed; don't try again yet.
			return ptr(new AnalyticsLog());
		}

		unsigned int tryCount = 0;
		while (tryCount < maxConnectTries) {
			try {
				if (!connected()) {
					TRACE_POINT();
					connect();
				}
				sharedData->client.write("openTransaction",
					txnId.c_str(),
					groupName.c_str(),
					"",
					category.c_str(),
					timestampStr,
					unionStationKey.c_str(),
					"true",
					NULL);
				return ptr(new AnalyticsLog(sharedData,
					txnId, groupName, category,
					unionStationKey));
			} catch (const SystemException &e) {
				TRACE_POINT();
				if (e.code() == ENOENT || isNetworkError(e.code())) {
					tryCount++;
					disconnect(true);
					if (tryCount < maxConnectTries) {
						syscalls::usleep(RETRY_SLEEP);
					}
				} else {
					disconnect();
					throw;
				}
			}
		}

		// Only reached once every attempt has failed. A success on any retry
		// returns above without arming the reconnect timeout.
		P_WARN("Cannot connect to the logging agent (" << serverAddress << "); " <<
			"retrying in " << reconnectTimeout / 1000000 << " seconds.");
		nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
		return ptr(new AnalyticsLog());
	}

public:
	/** Creates a null logger that only hands out null AnalyticsLog objects. */
	AnalyticsLogger() {
		maxConnectTries = 10;
		reconnectTimeout = 60 * 1000000;
		nextReconnectTime = 0;
	}

	AnalyticsLogger(const string &serverAddress, const string &username,
		const string &password, const string &nodeName = "")
	{
		this->serverAddress = serverAddress;
		this->username = username;
		this->password = password;
		if (nodeName.empty()) {
			this->nodeName = getHostName();
		} else {
			this->nodeName = nodeName;
		}
		if (!serverAddress.empty()) {
			sharedData.reset(new AnalyticsLoggerSharedData());
		}
		if (isLocalSocketAddress(serverAddress)) {
			maxConnectTries = 10;
		} else {
			maxConnectTries = 1;
		}
		// NOTE(review): this unconditionally overrides the address-dependent value
		// chosen above. Kept as-is to preserve behavior; confirm which is intended.
		maxConnectTries = 10;
		reconnectTimeout = 60 * 1000000;
		nextReconnectTime = 0;
	}

	/**
	 * Starts a new transaction. Its ID is "<minutes since epoch, encoded with
	 * integerToHexatri>-<11 random ASCII characters>".
	 * Returns a null AnalyticsLog if this is a null logger or the server is
	 * unreachable.
	 */
	AnalyticsLogPtr newTransaction(const string &groupName, const string &category = "requests",
		const string &unionStationKey = string())
	{
		if (serverAddress.empty()) {
			return ptr(new AnalyticsLog());
		}

		unsigned long long timestamp = SystemTime::getUsec();
		// Timestamp part + '-' separator + 11 random characters + NUL.
		char txnId[2 * sizeof(unsigned int) + 1 + 11 + 1];
		char *end;
		unsigned int timestampSize;
		char timestampStr[2 * sizeof(unsigned long long) + 1];

		timestampSize = integerToHexatri<unsigned int>(timestamp / 1000000 / 60,
			txnId);
		end = txnId + timestampSize;

		*end = '-';
		end++;

		randomGenerator.generateAsciiString(end, 11);
		end += 11;
		*end = '\0';

		integerToHexatri<unsigned long long>(timestamp, timestampStr);

		return openTransaction(string(txnId, end - txnId), groupName, category,
			unionStationKey, timestampStr);
	}

	/**
	 * Re-opens the existing transaction `txnId`. Returns a null AnalyticsLog if
	 * this is a null logger, `txnId` is empty, or the server is unreachable.
	 */
	AnalyticsLogPtr continueTransaction(const string &txnId, const string &groupName,
		const string &category = "requests", const string &unionStationKey = string())
	{
		if (serverAddress.empty() || txnId.empty()) {
			return ptr(new AnalyticsLog());
		}

		char timestampStr[2 * sizeof(unsigned long long) + 1];
		integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestampStr);

		return openTransaction(txnId, groupName, category, unionStationKey,
			timestampStr);
	}

	void setMaxConnectTries(unsigned int value) {
		// A null logger has no shared data (and thus no lock) to take.
		if (sharedData != NULL) {
			lock_guard<boost::mutex> l(sharedData->lock);
			maxConnectTries = value;
		} else {
			maxConnectTries = value;
		}
	}

	void setReconnectTimeout(unsigned long long usec) {
		if (sharedData != NULL) {
			lock_guard<boost::mutex> l(sharedData->lock);
			reconnectTimeout = usec;
		} else {
			reconnectTimeout = usec;
		}
	}

	bool isNull() const {
		return serverAddress.empty();
	}

	string getAddress() const {
		return serverAddress;
	}

	string getUsername() const {
		return username;
	}

	string getPassword() const {
		return password;
	}

	/** Returns the current connection, or an empty FileDescriptor for a null logger. */
	FileDescriptor getConnection() const {
		if (sharedData == NULL) {
			return FileDescriptor();
		}
		return sharedData->client.getConnection();
	}

	string getNodeName() const {
		return nodeName;
	}
};

typedef boost::shared_ptr<AnalyticsLogger> AnalyticsLoggerPtr;
00691
00692 }
00693
00694 #endif
00695