/* * Copyright 1999-2019 Seata.io Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.seata.server.storage.file.store; import io.seata.common.exception.StoreException; import io.seata.common.thread.NamedThreadFactory; import io.seata.common.util.CollectionUtils; import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionManager; import io.seata.server.storage.file.FlushDiskMode; import io.seata.server.storage.file.ReloadableStore; import io.seata.server.storage.file.TransactionWriteStore; import io.seata.server.store.AbstractTransactionStoreManager; import io.seata.server.store.SessionStorable; import io.seata.server.store.StoreConfig; import io.seata.server.store.TransactionStoreManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import static io.seata.core.context.RootContext.MDC_KEY_BRANCH_ID; /** * The type File transaction store manager. * * @author slievrly */ public class FileTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager, ReloadableStore { private static final Logger LOGGER = LoggerFactory.getLogger(FileTransactionStoreManager.class); private static final int MAX_THREAD_WRITE = 1; private ExecutorService fileWriteExecutor; private volatile boolean stopping = false; private static final int MAX_SHUTDOWN_RETRY = 3; private static final int SHUTDOWN_CHECK_INTERVAL = 1 * 1000; private static final int MAX_WRITE_RETRY = 5; private static final String HIS_DATA_FILENAME_POSTFIX = ".1"; private static final AtomicLong FILE_TRX_NUM = new AtomicLong(0); private static final AtomicLong FILE_FLUSH_NUM = new AtomicLong(0); private static final int MARK_SIZE = 4; private static final int MAX_WAIT_TIME_MILLS = 2 * 1000; private static final int MAX_FLUSH_TIME_MILLS = 2 * 1000; private static final int MAX_FLUSH_NUM = 10; private static final int PER_FILE_BLOCK_SIZE = 65535 * 8; private static final long MAX_TRX_TIMEOUT_MILLS = 30 * 60 * 1000; private static volatile long trxStartTimeMills = System.currentTimeMillis(); private File currDataFile; private RandomAccessFile currRaf; private FileChannel currFileChannel; private long recoverCurrOffset = 0; private long recoverHisOffset = 0; private SessionManager sessionManager; private String currFullFileName; private String hisFullFileName; private WriteDataFileRunnable writeDataFileRunnable; private ReentrantLock writeSessionLock = new ReentrantLock(); private volatile long lastModifiedTime; private static final int MAX_WRITE_BUFFER_SIZE = StoreConfig.getFileWriteBufferCacheSize(); private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(MAX_WRITE_BUFFER_SIZE); private static final FlushDiskMode FLUSH_DISK_MODE = StoreConfig.getFlushDiskMode(); private static final int MAX_WAIT_FOR_FLUSH_TIME_MILLS = 2 * 1000; private static final int MAX_WAIT_FOR_CLOSE_TIME_MILLS = 2 * 1000; private static final int INT_BYTE_SIZE = 4; /** * Instantiates a new File transaction store manager. * * @param fullFileName the dir path * @param sessionManager the session manager * @throws IOException the io exception */ public FileTransactionStoreManager(String fullFileName, SessionManager sessionManager) throws IOException { initFile(fullFileName); fileWriteExecutor = new ThreadPoolExecutor(MAX_THREAD_WRITE, MAX_THREAD_WRITE, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("fileTransactionStore", MAX_THREAD_WRITE, true)); writeDataFileRunnable = new WriteDataFileRunnable(); fileWriteExecutor.submit(writeDataFileRunnable); this.sessionManager = sessionManager; } private void initFile(String fullFileName) throws IOException { this.currFullFileName = fullFileName; this.hisFullFileName = fullFileName + HIS_DATA_FILENAME_POSTFIX; try { currDataFile = new File(currFullFileName); if (!currDataFile.exists()) { // create parent dir first if (currDataFile.getParentFile() != null && !currDataFile.getParentFile().exists()) { currDataFile.getParentFile().mkdirs(); } currDataFile.createNewFile(); trxStartTimeMills = System.currentTimeMillis(); } else { trxStartTimeMills = currDataFile.lastModified(); } lastModifiedTime = System.currentTimeMillis(); currRaf = new RandomAccessFile(currDataFile, "rw"); currRaf.seek(currDataFile.length()); currFileChannel = currRaf.getChannel(); } catch (IOException exx) { LOGGER.error("init file error,{}", exx.getMessage(), exx); throw exx; } } @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { long curFileTrxNum; writeSessionLock.lock(); try { if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) { return false; } lastModifiedTime = System.currentTimeMillis(); curFileTrxNum = FILE_TRX_NUM.incrementAndGet(); if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) { return saveHistory(); } } catch (Exception exx) { LOGGER.error("writeSession error, {}", exx.getMessage(), exx); return false; } finally { writeSessionLock.unlock(); } flushDisk(curFileTrxNum, currFileChannel); return true; } private void flushDisk(long curFileNum, FileChannel currFileChannel) { if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) { SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel); writeDataFileRunnable.putRequest(syncFlushRequest); syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS); } else { writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel)); } } /** * get all overTimeSessionStorables * merge write file * * @throws IOException */ private boolean saveHistory() throws IOException { boolean result; try { result = findTimeoutAndSave(); CloseFileRequest request = new CloseFileRequest(currFileChannel, currRaf); writeDataFileRunnable.putRequest(request); request.waitForClose(MAX_WAIT_FOR_CLOSE_TIME_MILLS); Files.move(currDataFile.toPath(), new File(hisFullFileName).toPath(), StandardCopyOption.REPLACE_EXISTING); } catch (IOException exx) { LOGGER.error("save history data file error, {}", exx.getMessage(), exx); result = false; } finally { initFile(currFullFileName); } return result; } private boolean writeDataFrame(byte[] data) { if (data == null || data.length <= 0) { return true; } int dataLength = data.length; int bufferRemainingSize = writeBuffer.remaining(); if (bufferRemainingSize <= INT_BYTE_SIZE) { if (!flushWriteBuffer(writeBuffer)) { return false; } } bufferRemainingSize = writeBuffer.remaining(); if (bufferRemainingSize <= INT_BYTE_SIZE) { throw new IllegalStateException( String.format("Write buffer remaining size %d was too small", bufferRemainingSize)); } writeBuffer.putInt(dataLength); bufferRemainingSize = writeBuffer.remaining(); int dataPos = 0; while (dataPos < dataLength) { int dataLengthToWrite = dataLength - dataPos; dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize); writeBuffer.put(data, dataPos, dataLengthToWrite); bufferRemainingSize = writeBuffer.remaining(); if (bufferRemainingSize == 0) { if (!flushWriteBuffer(writeBuffer)) { return false; } bufferRemainingSize = writeBuffer.remaining(); } dataPos += dataLengthToWrite; } return true; } private boolean flushWriteBuffer(ByteBuffer writeBuffer) { writeBuffer.flip(); if (!writeDataFileByBuffer(writeBuffer)) { return false; } writeBuffer.clear(); return true; } private boolean findTimeoutAndSave() throws IOException { List globalSessionsOverMaxTimeout = sessionManager.findGlobalSessions( new SessionCondition(MAX_TRX_TIMEOUT_MILLS)); if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) { return true; } for (GlobalSession globalSession : globalSessionsOverMaxTimeout) { TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD); byte[] data = globalWriteStore.encode(); if (!writeDataFrame(data)) { return false; } List branchSessIonsOverMaXTimeout = globalSession.getSortedBranches(); if (branchSessIonsOverMaXTimeout != null) { for (BranchSession branchSession : branchSessIonsOverMaXTimeout) { try { MDC.put(MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId())); TransactionWriteStore branchWriteStore = new TransactionWriteStore(branchSession, LogOperation.BRANCH_ADD); data = branchWriteStore.encode(); if (!writeDataFrame(data)) { return false; } } finally { MDC.remove(MDC_KEY_BRANCH_ID); } } } } if (flushWriteBuffer(writeBuffer)) { currFileChannel.force(false); return true; } return false; } @Override public GlobalSession readSession(String xid) { throw new StoreException("unsupport for read from file, xid:" + xid); } @Override public List readSession(SessionCondition sessionCondition) { throw new StoreException("unsupport for read from file"); } @Override public void shutdown() { if (fileWriteExecutor != null) { fileWriteExecutor.shutdown(); stopping = true; int retry = 0; while (!fileWriteExecutor.isTerminated() && retry < MAX_SHUTDOWN_RETRY) { ++retry; try { Thread.sleep(SHUTDOWN_CHECK_INTERVAL); } catch (InterruptedException ignore) { } } if (retry >= MAX_SHUTDOWN_RETRY) { fileWriteExecutor.shutdownNow(); } } try { if (currFileChannel.isOpen()) { currFileChannel.force(true); } } catch (IOException e) { LOGGER.error("fileChannel force error: {}", e.getMessage(), e); } closeFile(currRaf); } @Override public List readWriteStore(int readSize, boolean isHistory) { File file = null; long currentOffset = 0; if (isHistory) { file = new File(hisFullFileName); currentOffset = recoverHisOffset; } else { file = new File(currFullFileName); currentOffset = recoverCurrOffset; } if (file.exists()) { return parseDataFile(file, readSize, currentOffset, isHistory); } return null; } @Override public boolean hasRemaining(boolean isHistory) { File file; RandomAccessFile raf = null; long currentOffset; if (isHistory) { file = new File(hisFullFileName); currentOffset = recoverHisOffset; } else { file = new File(currFullFileName); currentOffset = recoverCurrOffset; } try { raf = new RandomAccessFile(file, "r"); return currentOffset < raf.length(); } catch (IOException ignore) { } finally { closeFile(raf); } return false; } private List parseDataFile(File file, int readSize, long currentOffset, boolean isHistory) { List transactionWriteStores = new ArrayList<>(readSize); RandomAccessFile raf = null; FileChannel fileChannel = null; try { raf = new RandomAccessFile(file, "r"); raf.seek(currentOffset); fileChannel = raf.getChannel(); fileChannel.position(currentOffset); long size = raf.length(); ByteBuffer buffSize = ByteBuffer.allocate(MARK_SIZE); while (fileChannel.position() < size) { try { buffSize.clear(); int avilReadSize = fileChannel.read(buffSize); if (avilReadSize != MARK_SIZE) { break; } buffSize.flip(); int bodySize = buffSize.getInt(); byte[] byBody = new byte[bodySize]; ByteBuffer buffBody = ByteBuffer.wrap(byBody); avilReadSize = fileChannel.read(buffBody); if (avilReadSize != bodySize) { break; } TransactionWriteStore writeStore = new TransactionWriteStore(); writeStore.decode(byBody); transactionWriteStores.add(writeStore); if (transactionWriteStores.size() == readSize) { break; } } catch (Exception ex) { LOGGER.error("decode data file error:{}", ex.getMessage(), ex); break; } } return transactionWriteStores; } catch (IOException exx) { LOGGER.error("parse data file error:{},file:{}", exx.getMessage(), file.getName(), exx); return null; } finally { try { if (fileChannel != null) { if (isHistory) { recoverHisOffset = fileChannel.position(); } else { recoverCurrOffset = fileChannel.position(); } } closeFile(raf); } catch (IOException exx) { LOGGER.error("file close error{}", exx.getMessage(), exx); } } } private void closeFile(RandomAccessFile raf) { try { if (raf != null) { raf.close(); raf = null; } } catch (IOException exx) { LOGGER.error("file close error,{}", exx.getMessage(), exx); } } private boolean writeDataFile(byte[] bs) { if (bs == null || bs.length >= Integer.MAX_VALUE - 3) { return false; } if (!writeDataFrame(bs)) { return false; } return flushWriteBuffer(writeBuffer); } private boolean writeDataFileByBuffer(ByteBuffer byteBuffer) { for (int retry = 0; retry < MAX_WRITE_RETRY; retry++) { try { while (byteBuffer.hasRemaining()) { currFileChannel.write(byteBuffer); } return true; } catch (Exception exx) { LOGGER.error("write data file error:{}", exx.getMessage(), exx); } } LOGGER.error("write dataFile failed,retry more than :{}", MAX_WRITE_RETRY); return false; } interface StoreRequest { } abstract static class AbstractFlushRequest implements StoreRequest { private final long curFileTrxNum; private final FileChannel curFileChannel; protected AbstractFlushRequest(long curFileTrxNum, FileChannel curFileChannel) { this.curFileTrxNum = curFileTrxNum; this.curFileChannel = curFileChannel; } public long getCurFileTrxNum() { return curFileTrxNum; } public FileChannel getCurFileChannel() { return curFileChannel; } } class SyncFlushRequest extends AbstractFlushRequest { private final CountDownLatch countDownLatch = new CountDownLatch(1); public SyncFlushRequest(long curFileTrxNum, FileChannel curFileChannel) { super(curFileTrxNum, curFileChannel); } public void wakeup() { this.countDownLatch.countDown(); } public void waitForFlush(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("Interrupted", e); } } } class AsyncFlushRequest extends AbstractFlushRequest { public AsyncFlushRequest(long curFileTrxNum, FileChannel curFileChannel) { super(curFileTrxNum, curFileChannel); } } static class CloseFileRequest implements StoreRequest { private final CountDownLatch countDownLatch = new CountDownLatch(1); private FileChannel fileChannel; private RandomAccessFile file; public CloseFileRequest(FileChannel fileChannel, RandomAccessFile file) { this.fileChannel = fileChannel; this.file = file; } public FileChannel getFileChannel() { return fileChannel; } public RandomAccessFile getFile() { return file; } public void wakeup() { this.countDownLatch.countDown(); } public void waitForClose(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("Interrupted", e); } } } /** * The type Write data file runnable. */ class WriteDataFileRunnable implements Runnable { private LinkedBlockingQueue storeRequests = new LinkedBlockingQueue<>(); public void putRequest(final StoreRequest request) { storeRequests.add(request); } @Override public void run() { while (!stopping) { try { StoreRequest storeRequest = storeRequests.poll(MAX_WAIT_TIME_MILLS, TimeUnit.MILLISECONDS); handleStoreRequest(storeRequest); } catch (Exception exx) { LOGGER.error("write file error: {}", exx.getMessage(), exx); } } handleRestRequest(); } /** * handle the rest requests when stopping is true */ private void handleRestRequest() { int remainNums = storeRequests.size(); for (int i = 0; i < remainNums; i++) { handleStoreRequest(storeRequests.poll()); } } private void handleStoreRequest(StoreRequest storeRequest) { if (storeRequest == null) { flushOnCondition(currFileChannel); } if (storeRequest instanceof SyncFlushRequest) { syncFlush((SyncFlushRequest)storeRequest); } else if (storeRequest instanceof AsyncFlushRequest) { async((AsyncFlushRequest)storeRequest); } else if (storeRequest instanceof CloseFileRequest) { closeAndFlush((CloseFileRequest)storeRequest); } } private void closeAndFlush(CloseFileRequest req) { long diff = FILE_TRX_NUM.get() - FILE_FLUSH_NUM.get(); flush(req.getFileChannel()); FILE_FLUSH_NUM.addAndGet(diff); closeFile(req.getFile()); req.wakeup(); } private void async(AsyncFlushRequest req) { flushOnCondition(req.getCurFileChannel()); } private void syncFlush(SyncFlushRequest req) { if (req.getCurFileTrxNum() > FILE_FLUSH_NUM.get()) { long diff = FILE_TRX_NUM.get() - FILE_FLUSH_NUM.get(); flush(req.getCurFileChannel()); FILE_FLUSH_NUM.addAndGet(diff); } // notify req.wakeup(); } private void flushOnCondition(FileChannel fileChannel) { if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) { return; } long diff = FILE_TRX_NUM.get() - FILE_FLUSH_NUM.get(); if (diff == 0) { return; } if (diff % MAX_FLUSH_NUM == 0 || System.currentTimeMillis() - lastModifiedTime > MAX_FLUSH_TIME_MILLS) { flush(fileChannel); FILE_FLUSH_NUM.addAndGet(diff); } } private void flush(FileChannel fileChannel) { try { fileChannel.force(false); } catch (IOException exx) { LOGGER.error("flush error: {}", exx.getMessage(), exx); } } } }