Skip to content

Commit

Permalink
[3.0] Cherry pick #9978 to 3.0 (#10040)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangchengming666 committed Jun 10, 2022
1 parent db15513 commit 25d9daa
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
Expand Up @@ -20,7 +20,6 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -35,10 +34,10 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -75,10 +74,12 @@ public class AccessLogFilter implements Filter {
// It's safe to declare it as singleton since it runs on single thread only
private final DateFormat fileNameFormatter = new SimpleDateFormat(FILE_DATE_FORMAT);

private final Map<String, Set<AccessLogData>> logEntries = new ConcurrentHashMap<>();
private final Map<String, Queue<AccessLogData>> logEntries = new ConcurrentHashMap<>();

private AtomicBoolean scheduled = new AtomicBoolean();

private static final String LINE_SEPARATOR = "line.separator";

/**
* Default constructor initialize demon thread for writing into access log file with names with access log key
* defined in url <b>accesslog</b>
Expand All @@ -104,7 +105,7 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
try {
String accessLogKey = invoker.getUrl().getParameter(ACCESS_LOG_KEY);
if (ConfigUtils.isNotEmpty(accessLogKey)) {
AccessLogData logData = AccessLogData.newLogData();
AccessLogData logData = AccessLogData.newLogData();
logData.buildAccessLogData(invoker, inv);
log(accessLogKey, logData);
}
Expand All @@ -115,20 +116,20 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
}

private void log(String accessLog, AccessLogData accessLogData) {
Set<AccessLogData> logSet = logEntries.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>());
Queue<AccessLogData> logQueue = logEntries.computeIfAbsent(accessLog, k -> new ConcurrentLinkedQueue<>());

if (logSet.size() < LOG_MAX_BUFFER) {
logSet.add(accessLogData);
if (logQueue.size() < LOG_MAX_BUFFER) {
logQueue.add(accessLogData);
} else {
logger.warn("AccessLog buffer is full. Do a force writing to file to clear buffer.");
//just write current logSet to file.
writeLogSetToFile(accessLog, logSet);
writeLogSetToFile(accessLog, logQueue);
//after force writing, add accessLogData to current logSet
logSet.add(accessLogData);
logQueue.add(accessLogData);
}
}

private void writeLogSetToFile(String accessLog, Set<AccessLogData> logSet) {
private void writeLogSetToFile(String accessLog, Queue<AccessLogData> logSet) {
try {
if (ConfigUtils.isDefault(accessLog)) {
processWithServiceLogger(logSet);
Expand All @@ -148,23 +149,24 @@ private void writeLogSetToFile(String accessLog, Set<AccessLogData> logSet) {

private void writeLogToFile() {
if (!logEntries.isEmpty()) {
for (Map.Entry<String, Set<AccessLogData>> entry : logEntries.entrySet()) {
for (Map.Entry<String, Queue<AccessLogData>> entry : logEntries.entrySet()) {
String accessLog = entry.getKey();
Set<AccessLogData> logSet = entry.getValue();
Queue<AccessLogData> logSet = entry.getValue();
writeLogSetToFile(accessLog, logSet);
}
}
}

private void processWithAccessKeyLogger(Set<AccessLogData> logSet, File file) throws IOException {
try (FileWriter writer = new FileWriter(file, true)) {
for (Iterator<AccessLogData> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {
writer.write(iterator.next().getLogMessage());
writer.write(System.getProperty("line.separator"));
private void processWithAccessKeyLogger(Queue<AccessLogData> logQueue, File file) throws IOException {
FileWriter writer = new FileWriter(file, true);
try {
while (!logQueue.isEmpty()) {
writer.write(logQueue.poll().getLogMessage());
writer.write(System.getProperty(LINE_SEPARATOR));
}
} finally {
writer.flush();
writer.close();
}
}

Expand All @@ -180,11 +182,9 @@ private AccessLogData buildAccessLogData(Invoker<?> invoker, Invocation inv) {
return logData;
}

private void processWithServiceLogger(Set<AccessLogData> logSet) {
for (Iterator<AccessLogData> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {
AccessLogData logData = iterator.next();
private void processWithServiceLogger(Queue<AccessLogData> logQueue) {
while (!logQueue.isEmpty()) {
AccessLogData logData = logQueue.poll();
LoggerFactory.getLogger(LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage());
}
}
Expand Down
Expand Up @@ -30,7 +30,7 @@

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;
import java.util.Queue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void testDefault() throws NoSuchFieldException, IllegalAccessException {

accessLogFilter.invoke(invoker, invocation);

Map<String, Set<AccessLogData>> logs = (Map<String, Set<AccessLogData>>) field.get(accessLogFilter);
Map<String, Queue<AccessLogData>> logs = (Map<String, Queue<AccessLogData>>) field.get(accessLogFilter);
assertFalse(logs.isEmpty());
assertFalse(logs.get("true").isEmpty());
AccessLogData log = logs.get("true").iterator().next();
Expand Down

0 comments on commit 25d9daa

Please sign in to comment.