
A Look at Flink's FileSystem

Original article
Author: code4it
Published 2019-03-02 02:49:51
From the column: 码匠的流水账

This article takes a look at Flink's FileSystem.

FileSystem

flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

@Public
public abstract class FileSystem {

    /**
     * The possible write modes. The write mode decides what happens if a file should be created,
     * but already exists.
     */
    public enum WriteMode {

        /** Creates the target file only if no file exists at that path already.
         * Does not overwrite existing files and directories. */
        NO_OVERWRITE,

        /** Creates a new target file regardless of any existing files or directories.
         * Existing files and directories will be deleted (recursively) automatically before
         * creating the new file. */
        OVERWRITE
    }

    // ------------------------------------------------------------------------

    /** Logger for all FileSystem work. */
    private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

    /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
     * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
    private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);

    /** Object used to protect calls to specific methods. */
    private static final ReentrantLock LOCK = new ReentrantLock(true);

    /** Cache for file systems, by scheme + authority. */
    private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();

    /** All available file system factories. */
    private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();

    /** Mapping of file system schemes to the corresponding factories,
     * populated in {@link FileSystem#initialize(Configuration)}. */
    private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();

    /** The default factory that is used when no scheme matches. */
    private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();

    /** The default filesystem scheme to be used, configured during process-wide initialization.
     * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
    private static URI defaultScheme;

    //......

    // ------------------------------------------------------------------------
    //  Initialization
    // ------------------------------------------------------------------------

    /**
     * Initializes the shared file system settings.
     *
     * <p>The given configuration is passed to each file system factory to initialize the respective
     * file systems. Because the configuration of file systems may be different subsequent to the call
     * of this method, this method clears the file system instance cache.
     *
     * <p>This method also reads the default file system URI from the configuration key
     * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
     * the URI has no scheme will be interpreted as relative to that URI.
     * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
     * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
     * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
     *
     * @param config the configuration from where to fetch the parameter.
     */
    public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
        LOCK.lock();
        try {
            // make sure file systems are re-instantiated after re-configuration
            CACHE.clear();
            FS_FACTORIES.clear();

            // configure all file system factories
            for (FileSystemFactory factory : RAW_FACTORIES) {
                factory.configure(config);
                String scheme = factory.getScheme();
​
                FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
                FS_FACTORIES.put(scheme, fsf);
            }

            // configure the default (fallback) factory
            FALLBACK_FACTORY.configure(config);

            // also read the default file system scheme
            final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
            if (stringifiedUri == null) {
                defaultScheme = null;
            }
            else {
                try {
                    defaultScheme = new URI(stringifiedUri);
                }
                catch (URISyntaxException e) {
                    throw new IllegalConfigurationException("The default file system scheme ('" +
                            CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
                }
            }
        }
        finally {
            LOCK.unlock();
        }
    }

    // ------------------------------------------------------------------------
    //  Obtaining File System Instances
    // ------------------------------------------------------------------------

    public static FileSystem getLocalFileSystem() {
        return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance());
    }

    public static FileSystem get(URI uri) throws IOException {
        return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
    }
​
    @Internal
    public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
        checkNotNull(fsUri, "file system URI");

        LOCK.lock();
        try {
            final URI uri;

            if (fsUri.getScheme() != null) {
                uri = fsUri;
            }
            else {
                // Apply the default fs scheme
                final URI defaultUri = getDefaultFsUri();
                URI rewrittenUri = null;

                try {
                    rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                            defaultUri.getPort(), fsUri.getPath(), null, null);
                }
                catch (URISyntaxException e) {
                    // for local URIs, we make one more try to repair the path by making it absolute
                    if (defaultUri.getScheme().equals("file")) {
                        try {
                            rewrittenUri = new URI(
                                    "file", null,
                                    new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
                                    null);
                        } catch (URISyntaxException ignored) {
                            // could not help it...
                        }
                    }
                }

                if (rewrittenUri != null) {
                    uri = rewrittenUri;
                }
                else {
                    throw new IOException("The file system URI '" + fsUri +
                            "' declares no scheme and cannot be interpreted relative to the default file system URI ("
                            + defaultUri + ").");
                }
            }

            // print a helpful pointer for malformed local URIs (happens a lot to new users)
            if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
                String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();

                throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '"
                        + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')");
            }
​
            final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

            // See if there is a file system object in the cache
            {
                FileSystem cached = CACHE.get(key);
                if (cached != null) {
                    return cached;
                }
            }// this "default" initialization makes sure that the FileSystem class works
            // even when not configured with an explicit Flink configuration, like on
            // JobManager or TaskManager setup
            if (FS_FACTORIES.isEmpty()) {
                initialize(new Configuration());
            }

            // Try to create a new file system
            final FileSystem fs;
            final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());

            if (factory != null) {
                fs = factory.create(uri);
            }
            else {
                try {
                    fs = FALLBACK_FACTORY.create(uri);
                }
                catch (UnsupportedFileSystemSchemeException e) {
                    throw new UnsupportedFileSystemSchemeException(
                            "Could not find a file system implementation for scheme '" + uri.getScheme() +
                                    "'. The scheme is not directly supported by Flink and no Hadoop file " +
                                    "system to support this scheme could be loaded.", e);
                }
            }

            CACHE.put(key, fs);
            return fs;
        }
        finally {
            LOCK.unlock();
        }
    }

    public static URI getDefaultFsUri() {
        return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
    }

    // ------------------------------------------------------------------------
    //  File System Methods
    // ------------------------------------------------------------------------

    public abstract Path getWorkingDirectory();

    public abstract Path getHomeDirectory();

    public abstract URI getUri();

    public abstract FileStatus getFileStatus(Path f) throws IOException;

    public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;

    public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

    public abstract FSDataInputStream open(Path f) throws IOException;

    public RecoverableWriter createRecoverableWriter() throws IOException {
        throw new UnsupportedOperationException("This file system does not support recoverable writers.");
    }

    public abstract FileStatus[] listStatus(Path f) throws IOException;

    public boolean exists(final Path f) throws IOException {
        try {
            return (getFileStatus(f) != null);
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public abstract boolean delete(Path f, boolean recursive) throws IOException;

    public abstract boolean mkdirs(Path f) throws IOException;

    public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;

    public abstract boolean rename(Path src, Path dst) throws IOException;

    public abstract boolean isDistributedFS();

    public abstract FileSystemKind getKind();

    // ------------------------------------------------------------------------
    //  output directory initialization
    // ------------------------------------------------------------------------

    public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (isDistributedFS()) {
            return false;
        }

        // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
        // concurrently work in this method (multiple output formats writing locally) might end
        // up deleting each other's directories and leave non-retrievable files, without necessarily
        // causing an exception. That results in very subtle issues, like output files looking as if
        // they are not getting created.

        // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
        // here can cancel faster
        try {
            OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
        }
        catch (InterruptedException e) {
            // restore the interruption state
            Thread.currentThread().interrupt();

            // leave the method - we don't have the lock anyways
            throw new IOException("The thread was interrupted while trying to initialize the output directory");
        }

        try {
            FileStatus status;
            try {
                status = getFileStatus(outPath);
            }
            catch (FileNotFoundException e) {
                // okay, the file is not there
                status = null;
            }

            // check if path exists
            if (status != null) {
                // path exists, check write mode
                switch (writeMode) {

                case NO_OVERWRITE:
                    if (status.isDir() && createDirectory) {
                        return true;
                    } else {
                        // file may not be overwritten
                        throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
                                "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
                                WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
                    }

                case OVERWRITE:
                    if (status.isDir()) {
                        if (createDirectory) {
                            // directory exists and does not need to be created
                            return true;
                        } else {
                            // we will write in a single file, delete directory
                            try {
                                delete(outPath, true);
                            }
                            catch (IOException e) {
                                throw new IOException("Could not remove existing directory '" + outPath +
                                        "' to allow overwrite by result file", e);
                            }
                        }
                    }
                    else {
                        // delete file
                        try {
                            delete(outPath, false);
                        }
                        catch (IOException e) {
                            throw new IOException("Could not remove existing file '" + outPath +
                                    "' to allow overwrite by result file/directory", e);
                        }
                    }
                    break;

                default:
                    throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                }
            }

            if (createDirectory) {
                // Output directory needs to be created
                if (!exists(outPath)) {
                    mkdirs(outPath);
                }

                // double check that the output directory exists
                try {
                    return getFileStatus(outPath).isDir();
                }
                catch (FileNotFoundException e) {
                    return false;
                }
            }
            else {
                // check that the output path does not exist and an output file
                // can be created by the output format.
                return !exists(outPath);
            }
        }
        finally {
            OUTPUT_DIRECTORY_INIT_LOCK.unlock();
        }
    }

    public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (!isDistributedFS()) {
            return false;
        }

        // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
        // concurrently work in this method (multiple output formats writing locally) might end
        // up deleting each other's directories and leave non-retrievable files, without necessarily
        // causing an exception. That results in very subtle issues, like output files looking as if
        // they are not getting created.

        // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
        // here can cancel faster
        try {
            OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
        }
        catch (InterruptedException e) {
            // restore the interruption state
            Thread.currentThread().interrupt();

            // leave the method - we don't have the lock anyways
            throw new IOException("The thread was interrupted while trying to initialize the output directory");
        }

        try {
            // check if path exists
            if (exists(outPath)) {
                // path exists, check write mode
                switch (writeMode) {

                case NO_OVERWRITE:
                    // file or directory may not be overwritten
                    throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
                            WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
                                " mode to overwrite existing files and directories.");case OVERWRITE:
                    // output path exists. We delete it and all contained files in case of a directory.
                    try {
                        delete(outPath, true);
                    } catch (IOException e) {
                        // Some other thread might already have deleted the path.
                        // If - for some other reason - the path could not be deleted,
                        // this will be handled later.
                    }
                    break;

                default:
                    throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                }
            }

            if (createDirectory) {
                // Output directory needs to be created
                try {
                    if (!exists(outPath)) {
                        mkdirs(outPath);
                    }
                } catch (IOException ioe) {
                    // Some other thread might already have created the directory.
                    // If - for some other reason - the directory could not be created
                    // and the path does not exist, this will be handled later.
                }

                // double check that the output directory exists
                return exists(outPath) && getFileStatus(outPath).isDir();
            }
            else {
                // single file case: check that the output path does not exist and
                // an output file can be created by the output format.
                return !exists(outPath);
            }
        }
        finally {
            OUTPUT_DIRECTORY_INIT_LOCK.unlock();
        }
    }

    //......
}
  • FileSystem is the abstract base class for the file systems used by Flink; a subclass may be backed by a local file system or a distributed one
  • FileSystem declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS and getKind for subclasses to implement
  • FileSystem also provides the concrete instance methods initOutPathLocalFS and initOutPathDistFS, as well as the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem and getDefaultFsUri
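The scheme-resolution step inside getUnguardedFileSystem can be sketched in plain Java: a URI with no scheme borrows the scheme, host and port from the default file system URI while keeping its own path. The helper class and method names below are illustrative, not part of Flink's API.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative helper mirroring the default-scheme step of
// FileSystem.getUnguardedFileSystem; not Flink code.
public class DefaultSchemeResolver {

    /** If 'uri' has no scheme, interpret it relative to 'defaultFsUri'. */
    public static URI resolve(URI defaultFsUri, URI uri) throws URISyntaxException {
        if (uri.getScheme() != null) {
            // already fully qualified, use as-is
            return uri;
        }
        // borrow scheme, host and port from the default file system URI
        return new URI(defaultFsUri.getScheme(), null, defaultFsUri.getHost(),
                defaultFsUri.getPort(), uri.getPath(), null, null);
    }

    public static void main(String[] args) throws URISyntaxException {
        URI defaultFs = URI.create("hdfs://localhost:9000/");
        System.out.println(resolve(defaultFs, URI.create("/user/USERNAME/in.txt")));
        // hdfs://localhost:9000/user/USERNAME/in.txt
    }
}
```

In the real implementation the resolved URI's scheme and authority then form the cache key (FSKey) used to look up or create the concrete FileSystem.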

LocalFileSystem

flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java

@Internal
public class LocalFileSystem extends FileSystem {

    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);

    /** The URI representing the local file system. */
    private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");

    /** The shared instance of the local file system. */
    private static final LocalFileSystem INSTANCE = new LocalFileSystem();

    /** Path pointing to the current working directory.
     * Because Paths are not immutable, we cannot cache the proper path here. */
    private final URI workingDir;

    /** Path pointing to the current user's home directory.
     * Because Paths are not immutable, we cannot cache the proper path here. */
    private final URI homeDir;

    /** The host name of this machine. */
    private final String hostName;

    /**
     * Constructs a new <code>LocalFileSystem</code> object.
     */
    public LocalFileSystem() {
        this.workingDir = new File(System.getProperty("user.dir")).toURI();
        this.homeDir = new File(System.getProperty("user.home")).toURI();
​
        String tmp = "unknownHost";
        try {
            tmp = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            LOG.error("Could not resolve local host", e);
        }
        this.hostName = tmp;
    }

    // ------------------------------------------------------------------------
​
    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
        return new BlockLocation[] {
                new LocalBlockLocation(hostName, file.getLen())
        };
    }
​
    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        final File path = pathToFile(f);
        if (path.exists()) {
            return new LocalFileStatus(path, this);
        }
        else {
            throw new FileNotFoundException("File " + f + " does not exist or the user running "
                    + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
        }
    }
​
    @Override
    public URI getUri() {
        return LOCAL_URI;
    }
​
    @Override
    public Path getWorkingDirectory() {
        return new Path(workingDir);
    }
​
    @Override
    public Path getHomeDirectory() {
        return new Path(homeDir);
    }
​
    @Override
    public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
        return open(f);
    }
​
    @Override
    public FSDataInputStream open(final Path f) throws IOException {
        final File file = pathToFile(f);
        return new LocalDataInputStream(file);
    }
​
    @Override
    public LocalRecoverableWriter createRecoverableWriter() throws IOException {
        return new LocalRecoverableWriter(this);
    }
​
    @Override
    public boolean exists(Path f) throws IOException {
        final File path = pathToFile(f);
        return path.exists();
    }
​
    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {
​
        final File localf = pathToFile(f);
        FileStatus[] results;

        if (!localf.exists()) {
            return null;
        }
        if (localf.isFile()) {
            return new FileStatus[] { new LocalFileStatus(localf, this) };
        }
​
        final String[] names = localf.list();
        if (names == null) {
            return null;
        }
        results = new FileStatus[names.length];
        for (int i = 0; i < names.length; i++) {
            results[i] = getFileStatus(new Path(f, names[i]));
        }

        return results;
    }
​
    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {
​
        final File file = pathToFile(f);
        if (file.isFile()) {
            return file.delete();
        } else if ((!recursive) && file.isDirectory()) {
            File[] containedFiles = file.listFiles();
            if (containedFiles == null) {
                throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
            } else if (containedFiles.length != 0) {
                throw new IOException("Directory " + file.toString() + " is not empty");
            }
        }

        return delete(file);
    }

    /**
     * Deletes the given file or directory.
     *
     * @param f
     *        the file to be deleted
     * @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while deleting the files/directories
     */
    private boolean delete(final File f) throws IOException {
        if (f.isDirectory()) {
            final File[] files = f.listFiles();
            if (files != null) {
                for (File file : files) {
                    final boolean del = delete(file);
                    if (!del) {
                        return false;
                    }
                }
            }
        } else {
            return f.delete();
        }

        // Now directory is empty
        return f.delete();
    }

    /**
     * Recursively creates the directory specified by the provided path.
     *
     * @return <code>true</code>if the directories either already existed or have been created successfully,
     *         <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while creating the directory/directories
     */
    @Override
    public boolean mkdirs(final Path f) throws IOException {
        checkNotNull(f, "path is null");
        return mkdirsInternal(pathToFile(f));
    }

    private boolean mkdirsInternal(File file) throws IOException {
        if (file.isDirectory()) {
                return true;
        }
        else if (file.exists() && !file.isDirectory()) {
            // Important: The 'exists()' check above must come before the 'isDirectory()' check to
            //            be safe when multiple parallel instances try to create the directory

            // exists and is not a directory -> is a regular file
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        }
        else {
            File parent = file.getParentFile();
            return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
        }
    }
​
    @Override
    public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
        checkNotNull(filePath, "filePath");if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
            throw new FileAlreadyExistsException("File already exists: " + filePath);
        }
​
        final Path parent = filePath.getParent();
        if (parent != null && !mkdirs(parent)) {
            throw new IOException("Mkdirs failed to create " + parent);
        }
​
        final File file = pathToFile(filePath);
        return new LocalDataOutputStream(file);
    }
​
    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        final File srcFile = pathToFile(src);
        final File dstFile = pathToFile(dst);
​
        final File dstParent = dstFile.getParentFile();

        // Files.move fails if the destination directory doesn't exist
        //noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created
        dstParent.mkdirs();

        try {
            Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return true;
        }
        catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) {
            // catch the errors that are regular "move failed" exceptions and return false
            return false;
        }
    }
​
    @Override
    public boolean isDistributedFS() {
        return false;
    }
​
    @Override
    public FileSystemKind getKind() {
        return FileSystemKind.FILE_SYSTEM;
    }

    // ------------------------------------------------------------------------

    /**
     * Converts the given Path to a File for this file system.
     *
     * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
     */
    public File pathToFile(Path path) {
        if (!path.isAbsolute()) {
            path = new Path(getWorkingDirectory(), path);
        }
        return new File(path.toUri().getPath());
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the URI that represents the local file system.
     * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
     * UNIX family platforms.
     *
     * @return The URI that represents the local file system.
     */
    public static URI getLocalFsURI() {
        return LOCAL_URI;
    }

    /**
     * Gets the shared instance of this file system.
     *
     * @return The shared instance of this file system.
     */
    public static LocalFileSystem getSharedInstance() {
        return INSTANCE;
    }
}
  • LocalFileSystem extends FileSystem and is backed by the local file system; its isDistributedFS method returns false, and its getKind method returns FileSystemKind.FILE_SYSTEM
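LocalFileSystem's recursive delete follows a simple two-phase pattern: delete all children first, then remove the now-empty directory itself. The same idea can be mirrored with plain java.io.File calls; the class below is an illustrative sketch, not Flink code.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Sketch of LocalFileSystem's recursive delete: recurse into children,
// then remove the (now empty) directory itself.
public class RecursiveDelete {

    public static boolean delete(File f) {
        if (f.isDirectory()) {
            File[] files = f.listFiles();
            if (files != null) {
                for (File file : files) {
                    // abort as soon as one child cannot be deleted
                    if (!delete(file)) {
                        return false;
                    }
                }
            }
        }
        // plain file, or a directory that is empty by now
        return f.delete();
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("rd-demo").toFile();
        File nested = new File(dir, "sub");
        nested.mkdirs();
        new File(nested, "a.txt").createNewFile();
        System.out.println(delete(dir));  // true
        System.out.println(dir.exists()); // false
    }
}
```

Note that, as in LocalFileSystem, a failure to delete any child short-circuits with false instead of throwing, leaving the partially deleted tree in place.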

HadoopFileSystem

flink-1.7.2/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java

public class HadoopFileSystem extends FileSystem {

    /** The wrapped Hadoop File System. */
    private final org.apache.hadoop.fs.FileSystem fs;

    /* This field caches the file system kind. It is lazily set because the file system
    * URL is lazily initialized. */
    private FileSystemKind fsKind;
​
​
    /**
     * Wraps the given Hadoop File System object as a Flink File System object.
     * The given Hadoop file system object is expected to be initialized already.
     *
     * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
     */
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
        this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
    }

    /**
     * Gets the underlying Hadoop FileSystem.
     * @return The underlying Hadoop FileSystem.
     */
    public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
        return this.fs;
    }

    // ------------------------------------------------------------------------
    //  file system methods
    // ------------------------------------------------------------------------
​
    @Override
    public Path getWorkingDirectory() {
        return new Path(this.fs.getWorkingDirectory().toUri());
    }

    @Override
    public Path getHomeDirectory() {
        return new Path(this.fs.getHomeDirectory().toUri());
    }
​
    @Override
    public URI getUri() {
        return fs.getUri();
    }
​
    @Override
    public FileStatus getFileStatus(final Path f) throws IOException {
        org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
        return new HadoopFileStatus(status);
    }
​
    @Override
    public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
            throws IOException {
        if (!(file instanceof HadoopFileStatus)) {
            throw new IOException("file is not an instance of DistributedFileStatus");
        }
​
        final HadoopFileStatus f = (HadoopFileStatus) file;
​
        final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
            start, len);

        // Wrap up HDFS specific block location objects
        final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
        for (int i = 0; i < distBlkLocations.length; i++) {
            distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
        }

        return distBlkLocations;
    }
​
    @Override
    public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
        final org.apache.hadoop.fs.Path path = toHadoopPath(f);
        final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
        return new HadoopDataInputStream(fdis);
    }
​
    @Override
    public HadoopDataInputStream open(final Path f) throws IOException {
        final org.apache.hadoop.fs.Path path = toHadoopPath(f);
        final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
        return new HadoopDataInputStream(fdis);
    }
​
    @Override
    @SuppressWarnings("deprecation")
    public HadoopDataOutputStream create(
            final Path f,
            final boolean overwrite,
            final int bufferSize,
            final short replication,
            final long blockSize) throws IOException {
​
        final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
                toHadoopPath(f), overwrite, bufferSize, replication, blockSize);
        return new HadoopDataOutputStream(fdos);
    }
​
    @Override
    public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
        final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
                this.fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
        return new HadoopDataOutputStream(fsDataOutputStream);
    }
​
    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {
        return this.fs.delete(toHadoopPath(f), recursive);
    }
​
    @Override
    public boolean exists(Path f) throws IOException {
        return this.fs.exists(toHadoopPath(f));
    }
​
    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {
        final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(toHadoopPath(f));
        final FileStatus[] files = new FileStatus[hadoopFiles.length];

        // Convert types
        for (int i = 0; i < files.length; i++) {
            files[i] = new HadoopFileStatus(hadoopFiles[i]);
        }
        return files;
    }
​
    @Override
    public boolean mkdirs(final Path f) throws IOException {
        return this.fs.mkdirs(toHadoopPath(f));
    }
​
    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        return this.fs.rename(toHadoopPath(src), toHadoopPath(dst));
    }
​
    @SuppressWarnings("deprecation")
    @Override
    public long getDefaultBlockSize() {
        return this.fs.getDefaultBlockSize();
    }
​
    @Override
    public boolean isDistributedFS() {
        return true;
    }
​
    @Override
    public FileSystemKind getKind() {
        if (fsKind == null) {
            fsKind = getKindForScheme(this.fs.getUri().getScheme());
        }
        return fsKind;
    }
​
    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // This writer is only supported on a subset of file systems, and on
        // specific versions. We check these schemes and versions eagerly for better error
        // messages in the constructor of the writer.
        return new HadoopRecoverableWriter(fs);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    public static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
        return new org.apache.hadoop.fs.Path(path.toUri());
    }

    /**
     * Gets the kind of the file system from its scheme.
     *
     * <p>Implementation note: Initially, especially within the Flink 1.3.x line
     * (in order to not break backwards compatibility), we must only label file systems
     * as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
     * Otherwise, we cause regression for example in the performance and cleanup handling
     * of checkpoints.
     * For that reason, we initially mark some filesystems as 'eventually consistent' or
     * as 'object stores', and leave the others as 'consistent file systems'.
     */
    static FileSystemKind getKindForScheme(String scheme) {
        scheme = scheme.toLowerCase(Locale.US);

        if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
            // the Amazon S3 storage
            return FileSystemKind.OBJECT_STORE;
        }
        else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
            // file servers instead of file systems
            // they might actually be consistent, but we have no hard guarantees
            // currently to rely on that
            return FileSystemKind.OBJECT_STORE;
        }
        else {
            // the remainder should include hdfs, kosmos, ceph, ...
            // this also includes federated HDFS (viewfs).
            return FileSystemKind.FILE_SYSTEM;
        }
    }
}
  • HadoopFileSystem extends FileSystem and is backed by an HDFS-style file system; its isDistributedFS method returns true, and its getKind method returns either FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem
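The scheme-based dispatch in getKindForScheme is easy to exercise in isolation. Below is a minimal standalone sketch: the enum values and the method body mirror the Flink source above, but KindForSchemeDemo is a hypothetical class written just for illustration, not part of Flink:

```java
import java.util.Locale;

public class KindForSchemeDemo {

    // simplified stand-in for org.apache.flink.core.fs.FileSystemKind
    enum FileSystemKind { FILE_SYSTEM, OBJECT_STORE }

    static FileSystemKind getKindForScheme(String scheme) {
        scheme = scheme.toLowerCase(Locale.US);

        if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
            // the Amazon S3 storage
            return FileSystemKind.OBJECT_STORE;
        } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
            // file servers rather than file systems; no consistency guarantees
            return FileSystemKind.OBJECT_STORE;
        } else {
            // hdfs, viewfs, kosmos, ceph, ...
            return FileSystemKind.FILE_SYSTEM;
        }
    }

    public static void main(String[] args) {
        System.out.println(getKindForScheme("hdfs"));  // FILE_SYSTEM
        System.out.println(getKindForScheme("s3a"));   // OBJECT_STORE
        System.out.println(getKindForScheme("ftp"));   // OBJECT_STORE
    }
}
```

Note that the match is prefix-based, so "s3a" and "s3n" are both classified as object stores.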

Summary

  • FileSystem is the abstract base class for the file systems Flink uses; a subclass may be backed by a local file system or a distributed one. It declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS, and getKind for subclasses to implement, provides the concrete instance methods initOutPathLocalFS and initOutPathDistFS, and exposes the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem, and getDefaultFsUri
  • LocalFileSystem extends FileSystem and is backed by the local file system; its isDistributedFS method returns false, and its getKind method returns FileSystemKind.FILE_SYSTEM
  • HadoopFileSystem extends FileSystem and is backed by an HDFS-style file system; its isDistributedFS method returns true, and its getKind method returns either FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem
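The WriteMode semantics that create honors (NO_OVERWRITE fails if the target already exists, OVERWRITE replaces it) can be sketched against plain java.nio.file. WriteModeDemo and its create helper are hypothetical illustrations of those semantics, not Flink API, and for simplicity this sketch only handles files, whereas Flink's OVERWRITE also deletes existing directories recursively:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriteModeDemo {

    // mirrors Flink's FileSystem.WriteMode
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    // illustrative helper: create a file honoring WriteMode semantics
    static void create(Path target, WriteMode mode, byte[] data) throws IOException {
        if (mode == WriteMode.NO_OVERWRITE && Files.exists(target)) {
            throw new IOException("target already exists: " + target);
        }
        // OVERWRITE: Files.write truncates and replaces existing content
        Files.write(target, data);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("writemode", ".txt");
        create(tmp, WriteMode.OVERWRITE, "v1".getBytes());
        try {
            create(tmp, WriteMode.NO_OVERWRITE, "v2".getBytes());
        } catch (IOException expected) {
            System.out.println("NO_OVERWRITE refused existing file");
        }
        Files.deleteIfExists(tmp);
    }
}
```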

doc

Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.

For infringement concerns, contact cloudcommunity@tencent.com for removal.

